I'm trying to extract the output from SparkNLP (using the pretrained pipeline explain_document_dl). I have spent a lot of time looking for ways (UDFs, explode, etc.) but cannot get anywhere close to a workable solution. Say I want to extract the values under result and metadata from the column entities. Each cell in that column contains an array of multiple dictionaries.
When I use df.withColumn("entity_name", explode("entities.result")), only the value from the first dictionary is extracted. The content of the entities column is a list of dictionaries.
Here is an attempt at a reproducible example to re-create the dataframe (thanks to the suggestion from @jonathan below):
# content of one cell as an example:
d = [{"annotatorType":"chunk","begin":2740,"end":2747,"result":"•Ability","metadata":{"entity":"ORG","sentence":"8","chunk":"22"},"embeddings":[],"sentence_embeddings":[]}, {"annotatorType":"chunk","begin":2740,"end":2747,"result":"Fedex","metadata":{"entity":"ORG","sentence":"8","chunk":"22"},"embeddings":[],"sentence_embeddings":[]}]
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
schema = StructType([
    StructField('annotatorType', StringType(), True),
    StructField('begin', IntegerType(), True),
    StructField('end', IntegerType(), True),
    StructField('result', StringType(), True),
    StructField('sentence', StringType(), True),
    StructField('chunk', StringType(), True),
    StructField('metadata', StructType([
        StructField('entity', StringType(), True),
        StructField('sentence', StringType(), True),
        StructField('chunk', StringType(), True),
    ]), True),
    StructField('embeddings', StringType(), True),
    StructField('sentence_embeddings', StringType(), True),
])
df = spark.createDataFrame(d, schema=schema)
df.show()
In this case of a single list of dictionaries, it works:
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end| result|sentence|chunk| metadata|embeddings|sentence_embeddings|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
| chunk| 2740|2747|•Ability| null| null|[ORG, 8, 22]| []| []|
| chunk| 2740|2747| Fedex| null| null|[ORG, 8, 22]| []| []|
+-------------+-----+----+--------+--------+-----+------------+----------+-------------------+
But I am stuck on how to apply this to a column in which some cells contain an array of multiple dictionaries (so multiple rows per original cell).
I tried to apply the same schema to the entities column, and I had to convert the column to JSON first:
ent1 = ent1.withColumn("entities2", to_json("entities"))
It works for cells whose array holds a single dictionary, but gives null for cells whose array holds multiple dictionaries (the 4th row):
ent1.withColumn("entities2", from_json("entities2", schema)).select("entities2.*").show()
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
|annotatorType|begin| end|result|sentence|chunk| metadata|embeddings|sentence_embeddings|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
| chunk| 166| 169| Lyft| null| null|[MISC, 0, 0]| []| []|
| chunk| 11| 14| Lyft| null| null|[MISC, 0, 0]| []| []|
| chunk| 52| 55| Lyft| null| null|[MISC, 1, 0]| []| []|
| null| null|null| null| null| null| null| null| null|
+-------------+-----+----+------+--------+-----+------------+----------+-------------------+
The desired output is
+-------------+-----+----+----------------+------------------------+----------+-------------------+
|annotatorType|begin| end| result | metadata |embeddings|sentence_embeddings|
+-------------+-----+----+----------------+------------------------+----------+-------------------+
| chunk| 166| 169|Lyft |[MISC] | []| []|
| chunk| 11| 14|Lyft |[MISC] | []| []|
| chunk| 52| 55|Lyft. |[MISC] | []| []|
| chunk| [..]|[..]|[Lyft,Lyft, |[MISC,MISC,MISC, | []| []|
| | | |FedEx Ground..] |ORG,LOC,ORG,ORG,ORG,ORG]| | |
+-------------+-----+----+----------------+------------------------+----------+-------------------+
I also tried converting to JSON for each row, but I lose track of the original rows and am given flattened JSON:
new_df = sqlContext.read.json(ent2.rdd.map(lambda r: r.entities2))
new_df.show()
+-------------+-----+----------+----+------------+----------------+-------------------+
|annotatorType|begin|embeddings| end| metadata| result|sentence_embeddings|
+-------------+-----+----------+----+------------+----------------+-------------------+
| chunk| 166| []| 169|[0, MISC, 0]| Lyft| []|
| chunk| 11| []| 14|[0, MISC, 0]| Lyft| []|
| chunk| 52| []| 55|[0, MISC, 1]| Lyft| []|
| chunk| 0| []| 11| [0, ORG, 0]| FedEx Ground| []|
| chunk| 717| []| 720| [1, LOC, 4]| Dock| []|
| chunk| 811| []| 816| [2, ORG, 5]| Parcel| []|
| chunk| 1080| []|1095| [3, ORG, 6]|Parcel Assistant| []|
| chunk| 1102| []|1108| [4, ORG, 7]| • Daily| []|
| chunk| 1408| []|1417| [5, ORG, 8]| Assistants| []|
+-------------+-----+----------+----+------------+----------------+-------------------+
I tried to apply a UDF to go through the array of dictionaries inside entities:
from collections import defaultdict

def flatten(my_dict):
    d_result = defaultdict(list)
    for sub in my_dict:
        val = sub['result']
        d_result["result"].append(val)
    return d_result["result"]

ent = ent.withColumn('result', flatten(df.entities))
which raises:
TypeError: Column is not iterable
I found the post Apache Spark Read JSON With Extra Columns, which closely resembles my problem, but even after converting the entities column to JSON, I still can't solve it with the solutions provided there.
Any help is appreciated! Ideally solutions in Python, but examples in Scala are helpful too!