I'm trying to extract the output of Spark NLP (using the pretrained pipeline 'explain_document_dl'). I have spent a lot of time looking for ways (UDFs, explode, etc.) but cannot get anywhere close to a workable solution. Say I want to extract the values under result and metadata from the column entities. That column holds an array containing multiple dictionaries.

When I use df.withColumn("entity_name", explode("entities.result")), only the value from the first dictionary is extracted.

The content of the "entities" column is a list of dictionaries.

My attempt at a reproducible example that re-creates the dataframe (thanks to the suggestion from @jonathan below):

# content of one cell as an example:
d = [{"annotatorType":"chunk","begin":2740,"end":2747,"result":"•Ability","metadata":{"entity":"ORG","sentence":"8","chunk":"22"},"embeddings":[],"sentence_embeddings":[]}, {"annotatorType":"chunk","begin":2740,"end":2747,"result":"Fedex","metadata":{"entity":"ORG","sentence":"8","chunk":"22"},"embeddings":[],"sentence_embeddings":[]}]

from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from array import array
schema = StructType([StructField('annotatorType', StringType(), True),
                     StructField('begin', IntegerType(), True),
                     StructField('end', IntegerType(), True),
                     StructField('result', StringType(), True),
                     StructField('sentence', StringType(), True),
                     StructField('chunk', StringType(), True),
                     StructField('metadata', StructType((StructField('entity', StringType(), True),
                                                      StructField('sentence', StringType(), True),
                                                      StructField('chunk', StringType(), True)
                                                      )), True),
                     StructField('embeddings', StringType(), True),
                     StructField('sentence_embeddings', StringType(), True)
                    ]
                   )

df = spark.createDataFrame(d, schema=schema)
df.show()

For a single list of dictionaries, it works:

+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end|  result|sentence|chunk|    metadata|embeddings|sentence_embeddings|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
|        chunk| 2740|2747|•Ability|    null| null|[ORG, 8, 22]|        []|                 []|
|        chunk| 2740|2747|   Fedex|    null| null|[ORG, 8, 22]|        []|                 []|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+

But I am stuck on how to apply this to a column in which some cells contain an array of multiple dictionaries (so one original cell maps to multiple rows).

I tried to apply the same schema to the entities column, and I had to convert the column to json first.

ent1 = ent1.withColumn("entities2", to_json("entities"))

[Screenshot: the dataframe with the new entities2 column]

It works for cells with an array of 1 dictionary, but gives null to cells that have an array of multiple dictionaries (the 4th row):

ent1.withColumn("entities2", from_json("entities2", schema)).select("entities2.*").show()

+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end|result|sentence|chunk|    metadata|embeddings|sentence_embeddings|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
|        chunk|  166| 169|  Lyft|    null| null|[MISC, 0, 0]|        []|                 []|
|        chunk|   11|  14|  Lyft|    null| null|[MISC, 0, 0]|        []|                 []|
|        chunk|   52|  55|  Lyft|    null| null|[MISC, 1, 0]|        []|                 []|
|         null| null|null|  null|    null| null|        null|      null|               null|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+

The desired output is

+-------------+-----+----+----------------+------------------------+----------+-------------------+
|annotatorType|begin| end|         result |    metadata            |embeddings|sentence_embeddings|
+-------------+-----+----+----------------+------------------------+----------+-------------------+
|        chunk|  166| 169|Lyft            |[MISC]                  |        []|                 []|
|        chunk|   11|  14|Lyft            |[MISC]                  |        []|                 []|
|        chunk|   52|  55|Lyft.           |[MISC]                  |        []|                 []|
|        chunk| [..]|[..]|[Lyft,Lyft,     |[MISC,MISC,MISC,        |        []|                 []| 
|             |     |    |FedEx Ground..] |ORG,LOC,ORG,ORG,ORG,ORG]|          |                   |     
+-------------+-----+----+----------------+------------------------+----------+-------------------+

I also tried converting each row to JSON, but I lost track of the original row and got flattened JSON:

new_df = sqlContext.read.json(ent2.rdd.map(lambda r: r.entities2))
new_df.show()
+-------------+-----+----------+----+------------+----------------+-------------------+
|annotatorType|begin|embeddings| end|    metadata|          result|sentence_embeddings|
+-------------+-----+----------+----+------------+----------------+-------------------+
|        chunk|  166|        []| 169|[0, MISC, 0]|            Lyft|                 []|
|        chunk|   11|        []|  14|[0, MISC, 0]|            Lyft|                 []|
|        chunk|   52|        []|  55|[0, MISC, 1]|            Lyft|                 []|
|        chunk|    0|        []|  11| [0, ORG, 0]|    FedEx Ground|                 []|
|        chunk|  717|        []| 720| [1, LOC, 4]|            Dock|                 []|
|        chunk|  811|        []| 816| [2, ORG, 5]|          Parcel|                 []|
|        chunk| 1080|        []|1095| [3, ORG, 6]|Parcel Assistant|                 []|
|        chunk| 1102|        []|1108| [4, ORG, 7]|         • Daily|                 []|
|        chunk| 1408|        []|1417| [5, ORG, 8]|      Assistants|                 []|
+-------------+-----+----------+----+------------+----------------+-------------------+

I tried to apply a UDF to go through the array of dictionaries inside "entities":

def flatten(my_dict):
    d_result = defaultdict(list)
    for sub in my_dict:
        val = sub['result']
        d_result["result"].append(val)
    return d_result["result"]
ent = ent.withColumn('result', flatten(df.entities))

TypeError: Column is not iterable

I found the post "Apache Spark Read JSON With Extra Columns", which closely resembles my problem, but even after converting the entities column to JSON I can't solve it with the solutions provided there.

Any help is appreciated! Ideally solutions in Python, but examples in Scala are helpful too!

Peggy
  • Please read [how to create reproducible Spark example](https://stackoverflow.com/q/48427185/) and [edit] your question accordingly. Spark NLP uses a lot of custom schemas, and knowing upstream steps is crucial here. – 10465355 Jun 24 '19 at 16:58
  • I updated with my attempt to re-create the spark dataframe, but I could only get NULL. I'm not familiar with spark dataframe so perhaps something is missing. I provided the schema as reference. – Peggy Jun 24 '19 at 18:26

1 Answer

You get null because the schema variable doesn't exactly match the list of dictionaries you are passing in as data:

    from pyspark.shell import *
    from pyspark.sql.types import *

    schema = StructType([StructField('result', StringType(), True),
                 StructField('metadata', StructType((StructField('entity', StringType(), True),
                                                     StructField('sentence', StringType(), True),
                                                     StructField('chunk', StringType(), True))), True)])

    # d1 is the list of dictionaries from the question (named `d` there)
    df = spark.createDataFrame(d1, schema=schema)
    df.show()
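
To make the mismatch concrete, here is a minimal pure-Python sketch (no Spark required). It assumes the nulls come from cells whose JSON text is an array with more than one object, while the schema describes a single object. The sample strings and the `cell_shape` helper are made up for illustration.

```python
import json

# Hypothetical cell contents, shaped like the entities2 column in the question.
cell_single = '[{"result": "Lyft", "metadata": {"entity": "MISC"}}]'
cell_multi = ('[{"result": "FedEx Ground", "metadata": {"entity": "ORG"}},'
              ' {"result": "Dock", "metadata": {"entity": "LOC"}}]')


def cell_shape(cell_json):
    """Return the top-level JSON type name and how many objects it holds."""
    parsed = json.loads(cell_json)
    if isinstance(parsed, list):
        return "array", len(parsed)
    return "object", 1


print(cell_shape(cell_single))
print(cell_shape(cell_multi))
```

Both cells are arrays at the top level, so a schema for the whole cell would plausibly need to be an array of structs (for example `ArrayType(schema)` passed to `from_json`) rather than a bare struct. This has not been checked against the original data.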

If you prefer a customised solution, you can try a pure Python/pandas approach:

    import pandas as pd
    from pyspark.shell import *

    result = []
    metadata_entity = []
    for row in d1:
        result.append(row.get('result'))
        metadata_entity.append(row.get('metadata').get('entity'))

    schema = {'result': [result], 'metadata.entity': [metadata_entity]}
    pandas_df = pd.DataFrame(schema)

    df = spark.createDataFrame(pandas_df)
    df.show()

    # specific columns; the column name contains a dot, so it is escaped with backticks
    df.select('result', '`metadata.entity`').show()

EDIT

IMHO, after reading all the approaches you have tried, I think sc.parallelize still does the trick for pretty complex cases. I don't have your original variable, but I can OCR your image and take it from there, although the Classroom Teacher and Instructional values are no longer there. Hopefully it will still be useful.

You can always create a mock dataframe with the structure you need and get its schema.

For complex cases with nested data types, you may use SparkContext and read the resulting JSON format:

    import itertools

    from pyspark.shell import *
    from pyspark.sql.functions import *
    from pyspark.sql.types import *

    # assume two lists under two dictionary keys, to stand in for four cells
    # since I only have entities2, I just replicate it
    sample = {
        'single_list': [{'annotatorType': 'chunk', 'begin': '166', 'end': '169', 'result': 'Lyft',
                         'metadata': {'entity': 'MISC', 'sentence': '0', 'chunk': '0'}, 'embeddings': [],
                         'sentence_embeddings': []},
                        {'annotatorType': 'chunk', 'begin': '11', 'end': '14', 'result': 'Lyft',
                         'metadata': {'entity': 'MISC', 'sentence': '0', 'chunk': '0'}, 'embeddings': [],
                         'sentence_embeddings': []},
                        {'annotatorType': 'chunk', 'begin': '52', 'end': '55', 'result': 'Lyft',
                         'metadata': {'entity': 'MISC', 'sentence': '1', 'chunk': '0'}, 'embeddings': [],
                         'sentence_embeddings': []}],
        'frankenstein': [
            {'annotatorType': 'chunk', 'begin': '0', 'end': '11', 'result': 'FedEx Ground',
             'metadata': {'entity': 'ORG', 'sentence': '0', 'chunk': '0'}, 'embeddings': [],
             'sentence_embeddings': []},
            {'annotatorType': 'chunk', 'begin': '717', 'end': '720', 'result': 'Dock',
             'metadata': {'entity': 'LOC', 'sentence': '4', 'chunk': '1'}, 'embeddings': [],
             'sentence_embeddings': []},
            {'annotatorType': 'chunk', 'begin': '811', 'end': '816', 'result': 'Parcel',
             'metadata': {'entity': 'ORG', 'sentence': '5', 'chunk': '2'}, 'embeddings': [],
             'sentence_embeddings': []},
            {'annotatorType': 'chunk', 'begin': '1080', 'end': '1095', 'result': 'Parcel Assistant',
             'metadata': {'entity': 'ORG', 'sentence': '6', 'chunk': '3'}, 'embeddings': [],
             'sentence_embeddings': []},
            {'annotatorType': 'chunk', 'begin': '1102', 'end': '1108', 'result': '* Daily',
             'metadata': {'entity': 'ORG', 'sentence': '7', 'chunk': '4'}, 'embeddings': [],
             'sentence_embeddings': []},
            {'annotatorType': 'chunk', 'begin': '1408', 'end': '1417', 'result': 'Assistants',
             'metadata': {'entity': 'ORG', 'sentence': '8', 'chunk': '5'}, 'embeddings': [],
             'sentence_embeddings': []}]
    }

    # since they are structurally different, get two dataframes
    df_single_list = spark.read.json(sc.parallelize(sample.get('single_list')))
    df_frankenstein = spark.read.json(sc.parallelize(sample.get('frankenstein')))

    # blank line so the first table border prints cleanly
    print('\n')

    # list to create a dataframe schema
    annotatorType = []
    begin = []
    embeddings = []
    end = []
    metadata = []
    result = []
    sentence_embeddings = []

    # PEP 8 would prefer a named function (or a UDF) over a lambda here;
    # a dictionary of per-column actions would probably avoid the IF statements
    function_metadata = lambda x: [x.entity]
    for k, i in enumerate(df_frankenstein.columns):
        if i == 'annotatorType':
            annotatorType.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'begin':
            begin.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'embeddings':
            embeddings.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'end':
            end.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'metadata':
            _temp = list(map(function_metadata, df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect()))
            metadata.append(list(itertools.chain.from_iterable(_temp)))
        if i == 'result':
            result.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())
        if i == 'sentence_embeddings':
            sentence_embeddings.append(df_frankenstein.select(i).rdd.flatMap(lambda x: x).collect())

    # headers
    annotatorType_header = 'annotatorType'
    begin_header = 'begin'
    embeddings_header = 'embeddings'
    end_header = 'end'
    metadata_header = 'metadata'
    result_header = 'result'
    sentence_embeddings_header = 'sentence_embeddings'
    metadata_entity_header = 'metadata.entity'

    frankenstein_schema = StructType(
        [StructField(annotatorType_header, ArrayType(StringType())),
         StructField(begin_header, ArrayType(StringType())),
         StructField(embeddings_header, ArrayType(StringType())),
         StructField(end_header, ArrayType(StringType())),
         StructField(metadata_header, ArrayType(StringType())),
         StructField(result_header, ArrayType(StringType())),
         StructField(sentence_embeddings_header, ArrayType(StringType()))
         ])

    # list of lists of lists of lists of ... lists
    frankenstein_list = [[annotatorType, begin, embeddings, end, metadata, result, sentence_embeddings]]
    df_frankenstein = spark.createDataFrame(frankenstein_list, schema=frankenstein_schema)

    print(df_single_list.schema)
    print(df_frankenstein.schema)

    # let's see how it is
    df_single_list.select(
        annotatorType_header,
        begin_header,
        end_header,
        result_header,
        array(metadata_entity_header),
        embeddings_header,
        sentence_embeddings_header).show()

    # let's see again
    df_frankenstein.select(
        annotatorType_header,
        begin_header,
        end_header,
        result_header,
        metadata_header,
        embeddings_header,
        sentence_embeddings_header).show()

Output:

    StructType(List(StructField(annotatorType,StringType,true),StructField(begin,StringType,true),StructField(embeddings,ArrayType(StringType,true),true),StructField(end,StringType,true),StructField(metadata,StructType(List(StructField(chunk,StringType,true),StructField(entity,StringType,true),StructField(sentence,StringType,true))),true),StructField(result,StringType,true),StructField(sentence_embeddings,ArrayType(StringType,true),true)))
    StructType(List(StructField(annotatorType,ArrayType(StringType,true),true),StructField(begin,ArrayType(StringType,true),true),StructField(embeddings,ArrayType(StringType,true),true),StructField(end,ArrayType(StringType,true),true),StructField(metadata,ArrayType(StringType,true),true),StructField(result,ArrayType(StringType,true),true),StructField(sentence_embeddings,ArrayType(StringType,true),true)))

    +-------------+-----+---+------+----------------------+----------+-------------------+
    |annotatorType|begin|end|result|array(metadata.entity)|embeddings|sentence_embeddings|
    +-------------+-----+---+------+----------------------+----------+-------------------+
    |        chunk|  166|169|  Lyft|                [MISC]|        []|                 []|
    |        chunk|   11| 14|  Lyft|                [MISC]|        []|                 []|
    |        chunk|   52| 55|  Lyft|                [MISC]|        []|                 []|
    +-------------+-----+---+------+----------------------+----------+-------------------+
    +--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
    |       annotatorType|               begin|                 end|              result|            metadata|          embeddings| sentence_embeddings|
    +--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
    |[[chunk, chunk, c...|[[0, 717, 811, 10...|[[11, 720, 816, 1...|[[FedEx Ground, D...|[[ORG, LOC, ORG, ...|[[[], [], [], [],...|[[[], [], [], [],...|
    +--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+

You will have to select from each dataframe separately because their data types differ, but the content is ready to consume (if I understood your requirement from the desired output).
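
If the goal is one output row per original row, the per-cell logic could be sketched in plain Python as below. This is an illustration rather than the method used above: `collapse_annotations` and the sample `cell` are hypothetical, and the cell is assumed to be a list of dictionaries with `result` and `metadata` keys.

```python
def collapse_annotations(annotations):
    """Collapse one cell (a list of annotation dicts) into parallel lists."""
    results = []
    entities = []
    for annotation in annotations:
        results.append(annotation.get("result"))
        # metadata may be missing or None; fall back to an empty dict
        entities.append((annotation.get("metadata") or {}).get("entity"))
    return {"result": results, "entity": entities}


# Hypothetical cell, using a few values from the sample above.
cell = [
    {"result": "FedEx Ground", "metadata": {"entity": "ORG", "sentence": "0", "chunk": "0"}},
    {"result": "Dock", "metadata": {"entity": "LOC", "sentence": "4", "chunk": "1"}},
    {"result": "Parcel", "metadata": {"entity": "ORG", "sentence": "5", "chunk": "2"}},
]

collapsed = collapse_annotations(cell)
print(collapsed["result"])
print(collapsed["entity"])
```

A function like this could in principle be wrapped with `pyspark.sql.functions.udf` and a matching return type, then applied to the entities column. Inside a real UDF the array elements typically arrive as Row objects, which lack `.get`, so the lookups would need adapting.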

( ͡° ͜ʖ ͡°)

jslipknot
  • Thank you Jonathan! I also discovered that I can json_schema = spark.read.json(df.rdd.map(lambda row: row.entities2)).schema instead of writing it out. But my challenge remains- for cells that have multiple dictionaries, I cannot iterate over it with an UDF or using the from_json function to get the values out... any ideas? – Peggy Jun 25 '19 at 21:14
  • Most welcome Peggy. I made a minor update on the `sc.parallelize` method. The last part is just a quick (and non-standard) way to show the column as a list, but I believe it's nothing related to the real solution developed beforehand. I was just trying to reproduce the output. If I'm not getting the idea, please, let me know (: – jslipknot Jun 25 '19 at 23:39
  • You used OCR on my screen shot to re-create my example? You're my hero. In the example you used, `sample` is actually just one row in the data frame (under `entities` column). So I need to take that cell, expand to the intermediate data frame (first `output`), then put it back together (second `output`) to match the original row. Can I combine those steps into an UDF? I updated the desired output in the post. – Peggy Jun 26 '19 at 03:25
  • lol nothing to brag about. it's just a basic PyTesseract and trimming/replacing the last 9 chars with a closing square bracket (: I will update the answer accordingly. Is your "desire output" the final one? Or do you need another column in there? I can create random content and use an UDF to combine it all. If it's not sensitive information and you think you can share it, then please, let me know the final output and its structure. Anyway, meanwhile I will mock it with a random complex structure and some meaningless data – jslipknot Jun 26 '19 at 18:53
  • let's see if this custom schema is useful for you to continue with next steps. Sadly there's been always this restriction on union definition in set theory. In Spark it's the same and that makes it difficult to link both dataframes per column because data types are just different. Hope it helps – jslipknot Jun 27 '19 at 02:39
  • The desired output is the one from the post. So I think what I can do is iterate over each row and apply the solution you suggested, and combine them back to a data frame. I think I can work with this!! Thank you so much!!! – Peggy Jun 27 '19 at 15:33
  • This is so absurdly convoluted for simply wanting to get meaningful data out of spark NLP outputs – Garglesoap Dec 28 '21 at 21:03