
I have a JSON document stored as a String in a table column, and I need to parse and explode it using Spark.

Edit: to make my question easier to understand: my data is an array of JSON objects. How do I write a schema for such data? Below is an example of such JSON:

[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"3"}]

This is the actual JSON, wrapped in []:

[{"ProductName":"MS Quattro plan US QA","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PLK1O3JDCGVJFM","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":140.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":140.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV4RSKT6C0TAVU","VendorServiceKey":"a044b16a-1861-4308-8086-a3a3b506fac2","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E5","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":120.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":60.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVZEOGY5DEUV29","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support 
Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":20.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":10.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null},{"ProductName":"Office 365 Enterprise E1","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PL40SXO0YBS8LW","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":84.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":84.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV097S8NKP3PLO","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support 
Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":44.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":22.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVJZL3RZL4LLZ7","VendorServiceKey":"91fd106f-4b2c-4938-95ac-f54f74e9a239","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E1","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":40.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":20.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null}]

I wrote this schema manually:

StructType([
            StructField("ProductName", StringType()),
            StructField("CartProductPromotion", StringType()),
            StructField("ProductConfigPromotions", StringType()),
            StructField("ProductKey", StringType()),
            StructField("IsConfigurable", StringType()),
            StructField("IsConfigured", StringType()),
            StructField("ProductProvisionType", StringType()),
            StructField("VendorKey", StringType()),
            StructField("ProductConfigSettingTemplateKey", StringType()),
            StructField("ProductConfigKey", StringType()),
            StructField("ParentConfigKey", StringType()),
            StructField("VendorConfigKey", StringType()),
            StructField("Quantity", StringType()),
            StructField("PromoCodes", StringType()),
            StructField("Price", StringType()),
            StructField("CustomerKey", StringType()),
            StructField("CustomerDomainPrefix", StringType()),
            StructField("CustomerContactId", StringType()),
            StructField("ParentOrderLineId", StringType()),
            StructField("ParentVendorSubscriptionId", StringType()),
            StructField("BillingFrequency", StringType()),
            StructField("BillingType", StringType()),
            StructField("DueToday", StringType()),
            StructField("originalPrice", StringType()),
            StructField("RemainingVendorSettingsName", StringType()),
            StructField("Trial", StringType()),
            StructField("Services", ArrayType(StructType([
                StructField("ServiceKey", StringType()),
                StructField("VendorServiceKey", StringType()),
                StructField("VendorSubscriptionKey", StringType()),
                StructField("Name", StringType()),
                StructField("VendorProvisionResponse", StringType()),
                StructField("ProvisionStatus", StringType()),
                StructField("SubscriptionStatus", StringType()),
                StructField("BillingFrequency", StringType()),              
                StructField("CreatedOnUtc", StringType()),
                StructField("UpdatedOnUtc", StringType()),
                StructField("Quantity", StringType()),
                StructField("Price", StringType()),

                StructField("RateCardId", StringType()),
                StructField("RateCardVersion", StringType()),
                StructField("Margin", StringType()),
                StructField("DefaultQuantity", StringType()),               
                StructField("Cost", StringType()),
                StructField("ProvisionDate", StringType()),
                StructField("ParentServiceKey", StringType()),
                StructField("ServiceConfiguration", StructType([
                    StructField("QuestionText", StringType()),
                    StructField("QuestionNumber", StringType()),
                    StructField("IsQuestionRequired", StringType()),
                    StructField("OptionText", StringType()),
                    StructField("OptionNumber", StringType()),
                    StructField("MaxAllowedServices", StringType()),
                    StructField("NextAction", StringType()),
                    StructField("NextActionQuestion", StringType())
                ])),
                StructField("IsDummy", StringType()),
                StructField("VendorKey", StringType()),
                StructField("EULADateTime", StringType())               
            ]))),
            StructField("ProfileId", StringType()),
            StructField("FloorPlanId", StringType()),
            StructField("Currency", StringType()),
            StructField("ProductSettings", StringType()),               
            StructField("CustomerSettings", StringType()),
            StructField("ResellerSettings", StringType())   ])

It does not parse my field into an array; I get null in the new field (code at the bottom).

+---+--------------------+--------------+
|key|               value|value_w_schema|
+---+--------------------+--------------+
| 10|[{"ProductName":"...|          null|
+---+--------------------+--------------+

But if I remove the [] from the string field and keep a single JSON object, wrapped in {}, then the schema works. Should I wrap the schema I wrote in another Struct or Array? Can someone point me to a good tutorial on writing these schemas?

Reproducible code:

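from pyspark.sql.types import ArrayType, StructType, StructField, StringType
import pyspark.sql.functions as psf

cart_CartProducts_schema = StructType([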
            StructField("ProductName", StringType()),
            StructField("CartProductPromotion", StringType()),
            StructField("ProductConfigPromotions", StringType()),
            StructField("ProductKey", StringType()),
            StructField("IsConfigurable", StringType()),
            StructField("IsConfigured", StringType()),
            StructField("ProductProvisionType", StringType()),
            StructField("VendorKey", StringType()),
            StructField("ProductConfigSettingTemplateKey", StringType()),
            StructField("ProductConfigKey", StringType()),
            StructField("ParentConfigKey", StringType()),
            StructField("VendorConfigKey", StringType()),
            StructField("Quantity", StringType()),
            StructField("PromoCodes", StringType()),
            StructField("Price", StringType()),
            StructField("CustomerKey", StringType()),
            StructField("CustomerDomainPrefix", StringType()),
            StructField("CustomerContactId", StringType()),
            StructField("ParentOrderLineId", StringType()),
            StructField("ParentVendorSubscriptionId", StringType()),
            StructField("BillingFrequency", StringType()),
            StructField("BillingType", StringType()),
            StructField("DueToday", StringType()),
            StructField("originalPrice", StringType()),
            StructField("RemainingVendorSettingsName", StringType()),
            StructField("Trial", StringType()),
            StructField("Services", ArrayType(StructType([
                StructField("ServiceKey", StringType()),
                StructField("VendorServiceKey", StringType()),
                StructField("VendorSubscriptionKey", StringType()),
                StructField("Name", StringType()),
                StructField("VendorProvisionResponse", StringType()),
                StructField("ProvisionStatus", StringType()),
                StructField("SubscriptionStatus", StringType()),
                StructField("BillingFrequency", StringType()),              
                StructField("CreatedOnUtc", StringType()),
                StructField("UpdatedOnUtc", StringType()),
                StructField("Quantity", StringType()),
                StructField("Price", StringType()),

                StructField("RateCardId", StringType()),
                StructField("RateCardVersion", StringType()),
                StructField("Margin", StringType()),
                StructField("DefaultQuantity", StringType()),               
                StructField("Cost", StringType()),
                StructField("ProvisionDate", StringType()),
                StructField("ParentServiceKey", StringType()),
                StructField("ServiceConfiguration", StructType([
                    StructField("QuestionText", StringType()),
                    StructField("QuestionNumber", StringType()),
                    StructField("IsQuestionRequired", StringType()),
                    StructField("OptionText", StringType()),
                    StructField("OptionNumber", StringType()),
                    StructField("MaxAllowedServices", StringType()),
                    StructField("NextAction", StringType()),
                    StructField("NextActionQuestion", StringType())
                ])),
                StructField("IsDummy", StringType()),
                StructField("VendorKey", StringType()),
                StructField("EULADateTime", StringType())               
            ]))),
            StructField("ProfileId", StringType()),
            StructField("FloorPlanId", StringType()),
            StructField("Currency", StringType()),
            StructField("ProductSettings", StringType()),               
            StructField("CustomerSettings", StringType()),
            StructField("ResellerSettings", StringType())
   ])

data = [(10,'''[{"ProductName":"MS Quattro plan US QA","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PLK1O3JDCGVJFM","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":140.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":140.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV4RSKT6C0TAVU","VendorServiceKey":"a044b16a-1861-4308-8086-a3a3b506fac2","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E5","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":120.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":60.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":"Hello1","QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVZEOGY5DEUV29","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support 
Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":20.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":10.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":"Hello2","QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null},{"ProductName":"Office 365 Enterprise E1","CartProductPromotion":null,"ProductConfigPromotions":null,"ProductKey":"PDUKVWL8XSBJUX","IsConfigurable":false,"IsConfigured":false,"ProductProvisionType":0,"VendorKey":null,"ProductConfigSettingTemplateKey":null,"ProductConfigKey":"PL40SXO0YBS8LW","ParentConfigKey":null,"VendorConfigKey":null,"Quantity":1,"PromoCodes":null,"Price":84.0,"CustomerKey":null,"CustomerDomainPrefix":null,"CustomerContactId":"6550d015-5ac1-464a-95b8-42ae4cfea05e","ParentOrderLineId":null,"ParentVendorSubscriptionId":null,"BillingFrequency":1,"BillingType":1,"DueToday":84.0,"originalPrice":null,"RemainingVendorSettingsName":null,"Trial":null,"Services":[{"ServiceKey":"SV097S8NKP3PLO","VendorServiceKey":"Quattro Support Service","VendorSubscriptionKey":null,"Name":"Quattro Support 
Service","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":44.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":22.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":"Hello3","QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null},{"ServiceKey":"SVJZL3RZL4LLZ7","VendorServiceKey":"91fd106f-4b2c-4938-95ac-f54f74e9a239","VendorSubscriptionKey":null,"Name":"Office 365 Enterprise E1","VendorProvisionResponse":null,"ProvisionStatus":1,"SubscriptionStatus":0,"BillingFrequency":1,"CreatedOnUtc":null,"UpdatedOnUtc":null,"Quantity":1.0,"Price":40.0,"RateCardId":1011294,"RateCardVersion":null,"Margin":100.0,"DefaultQuantity":null,"Cost":20.0,"ProvisionDate":null,"ParentServiceKey":null,"ServiceConfiguration":{"QuestionText":null,"QuestionNumber":null,"IsQuestionRequired":null,"OptionText":null,"OptionNumber":null,"MaxAllowedServices":null,"NextAction":null,"NextActionQuestion":null},"IsDummy":null,"VendorKey":null,"EULADateTime":null}],"ProfileId":521,"FloorPlanId":null,"Currency":"USD","ProductSettings":null,"CustomerSettings":null,"ResellerSettings":null}]''')]
df1 = spark.createDataFrame(data, ("key", "value"))
df1.show(truncate=True)

#Apply the schema to the JSON string
df2 = df1.withColumn("value_w_schema", psf.from_json(df1.value, cart_CartProducts_schema))
#df2.printSchema()
df2.show()
sumon c
  • What about the solution described here: https://stackoverflow.com/questions/48361177/spark-structured-streaming-kafka-convert-json-without-schema-infer-schema/49646835#49646835 – Aydin K. Apr 07 '20 at 18:45
  • for spark 2.4+, check schema_of_json: http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.schema_of_json. (post-process the generated schema by replacing `null` with `string`) – jxc Apr 07 '20 at 21:50
  • @jxc somehow this wasn't working. I tried: `jsonschema = psf.schema_of_json(df1.select(psf.col("value")).take(1)[0].value)` and then `df2 = df1.withColumn('value_w_schema', psf.from_json(psf.col('value'), jsonschema))` – sumon c Apr 07 '20 at 22:04
  • @jxc I then tried: `file_location = "/FileStore/tables/cart_cart-e7003.txt"`, `lineleveldtlsdf = spark.read.json(file_location)`, `lineleveldtlsdf.printSchema()`. But for some reason the schema was not usable. – sumon c Apr 07 '20 at 22:05
  • @sumonc, try this `jsonschema = spark.range(1).select(psf.schema_of_json(psf.lit(df1.select('value').first().value)).alias('val')).first().val.replace('null','string')` – jxc Apr 07 '20 at 22:16
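
The comments above suggest deriving the schema from a sample document and replacing `null` with `string`. The same idea can be sketched in plain Python, without Spark. Note that `ddl_type` is a hypothetical helper, not part of PySpark; it assumes plain alphanumeric field names and homogeneous arrays, and it maps JSON null to string. The PySpark documentation describes `from_json` as accepting a DDL-formatted type string in recent versions, so the resulting string could probably be passed as the schema, but that is worth checking against your Spark release.

```python
import json


def ddl_type(value):
    """Return a Spark-style DDL type string for a decoded JSON value.

    Hypothetical helper for illustration; not a PySpark API.
    """
    if isinstance(value, bool):  # bool must be tested before int (bool subclasses int)
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, dict):
        fields = ",".join(f"{name}:{ddl_type(item)}" for name, item in value.items())
        return f"struct<{fields}>"
    if isinstance(value, list):
        # Assumption: arrays are homogeneous, so the first element decides the type.
        return f"array<{ddl_type(value[0])}>" if value else "array<string>"
    # Strings and nulls both fall back to string, as the comment above recommends.
    return "string"


sample = '[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"3"}]'
schema_ddl = ddl_type(json.loads(sample))
print(schema_ddl)
```

The important part is the outermost `array<...>`: because the sample starts with `[`, the generated type is an array of structs rather than a bare struct.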

1 Answer


I was able to solve it using the answer given here (the second one): https://stackoverflow.com/a/58037383/4253760

My question is better phrased as: how do I write a schema for an array of JSON objects?

Problem JSON, or rather an array of JSON objects (now I think I will be able to write a schema for my complex JSON):

[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"5"}]
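
A quick way to see why a bare StructType does not match this data is to decode the string with Python's standard `json` module and look at the top-level type. This is only an illustrative check outside Spark; the variable names are made up for the example.

```python
import json

value = '[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"5"}]'
parsed = json.loads(value)

# The outermost JSON value is an array, and each element is an object.
top_level = type(parsed).__name__
element = type(parsed[0]).__name__
print(top_level, element)
```

A JSON array corresponds to ArrayType and a JSON object to StructType, so the schema needs an ArrayType on the outside with the StructType as its element type.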

Solution:

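from pyspark.sql.types import ArrayType, StructType, StructField, StringType
import pyspark.sql.functions as psf

# The top-level JSON value is an array, so the struct is wrapped in ArrayType
trial_sch = ArrayType(StructType([
    StructField("CategoryName", StringType()),
    StructField("CategoryTitle", StringType()),
    StructField("CategoryLevels", StringType())
]))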

data = [(10,'''[{"CategoryName":"cat5","CategoryTitle":"cat_title","CategoryLevels":"5"}]''')]
df1 = spark.createDataFrame(data, ("key", "value"))
df1.show(truncate=True)

#Apply the schema to the JSON string
df2 = df1.withColumn("value_w_schema", psf.from_json(df1.value, trial_sch))  #Change schema here
#df2.printSchema()
df2.select("value_w_schema").show(truncate=False)
sumon c