
I have a JSON message which I am reading from a Kafka topic using Spark streaming:

{"COUNTRY_REGION": "United States",  "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7", "PARKS_CHANGE_PERC": "\\\\N",  "LAST_UPDATE_DATE": "05:31.7"}

I understand that we first need to create a schema, which I have done below, and then parse the input JSON that we get from Kafka (i.e. the value field) via the from_json function.

from pyspark.sql.types import (StructType, StructField, StringType,
                               IntegerType, DateType, BooleanType)

schema = StructType([
    StructField("COUNTRY_REGION", StringType(), True),
    StructField("PROVINCE_STATE", StringType(), True),
    StructField("ISO_3166_1", StringType(), True),
    StructField("ISO_3166_2", StringType(), True),
    StructField("DATE", DateType(), True),
    StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", IntegerType(), True),
    StructField("PARKS_CHANGE_PERC", IntegerType(), True),
    StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True),
    StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),
    StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),
    StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True),
    StructField("LAST_UPDATE_DATE", DateType(), True),
    StructField("LAST_REPORTED_FLAG", BooleanType(), True),
    StructField("SUB_REGION_2", StringType(), True),
])

from pyspark.sql.functions import from_json

json_edit = df.select(from_json("value", schema).alias("json"))

However, I realise that GROCERY_AND_PHARMACY_CHANGE_PERC, PARKS_CHANGE_PERC and LAST_UPDATE_DATE all become null:

display(json_edit)

{"COUNTRY_REGION": "United States",  "GROCERY_AND_PHARMACY_CHANGE_PERC": null, "PARKS_CHANGE_PERC": null, "LAST_UPDATE_DATE": null}

I realized that it's because of the original JSON: for example, it contains "GROCERY_AND_PHARMACY_CHANGE_PERC": "-7" when it should be "GROCERY_AND_PHARMACY_CHANGE_PERC": -7.

Is there any way to convert the string to a double/int before I parse it into the schema?

Thank you!


2 Answers


If you are not sure about the schema of the incoming data, it is better not to specify it in the first place. This kind of situation can be handled by using a schema registry with Avro as the data format when integrating Spark with Kafka.

Since the format here is JSON, we can dynamically extract the schema using the schema_of_json function and pass its output to from_json.

Here is the code in Scala; hopefully you can translate it to Python as required.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{from_json, schema_of_json}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._
spark.sparkContext.setLogLevel("ERROR")

// Infer the schema from the first record of each micro-batch,
// then parse every row of the batch with that inferred schema.
def dfOps(ds: DataFrame, n: Long): Unit =
  ds.select(from_json('value,
    schema_of_json(ds.select('value).first().getString(0))).as("json"))
    .select("json.*").show()

spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "mytopic")
  .option("startingOffsets", "earliest")
  .load()
  .select('value.cast("string"))
  .writeStream
  .foreachBatch(dfOps _)
  .start()
  .awaitTermination()
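
A rough PySpark translation of the same approach could look like the sketch below. The broker address and topic name are the same placeholders as in the Scala snippet, and the schema is re-inferred from the first message of each micro-batch:

from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

def df_ops(batch_df, batch_id):
    # Take the first message of this micro-batch, infer a schema from it,
    # then parse every row of the batch with that inferred schema.
    first_row = batch_df.select("value").first()
    if first_row is None:  # empty micro-batch, nothing to parse
        return
    inferred_schema = F.schema_of_json(F.lit(first_row[0]))
    batch_df.select(F.from_json("value", inferred_schema).alias("json")) \
        .select("json.*") \
        .show(truncate=False)

spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "mytopic") \
    .option("startingOffsets", "earliest") \
    .load() \
    .select(F.col("value").cast("string")) \
    .writeStream \
    .foreachBatch(df_ops) \
    .start() \
    .awaitTermination()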
Mohana B C

You can change the type of the three columns to StringType in the schema, parse the JSON, and then process the three columns individually afterwards:

df=...

schema = StructType([
    StructField("COUNTRY_REGION", StringType(), True),
    StructField("PROVINCE_STATE", StringType(), True),
    StructField("ISO_3166_1", StringType(), True),
    StructField("ISO_3166_2", StringType(), True),
    StructField("DATE", DateType(), True),
    StructField("GROCERY_AND_PHARMACY_CHANGE_PERC", StringType(), True),  # using StringType
    StructField("PARKS_CHANGE_PERC", StringType(), True),  # using StringType
    StructField("RESIDENTIAL_CHANGE_PERC", IntegerType(), True),
    StructField("RETAIL_AND_RECREATION_CHANGE_PERC", IntegerType(), True),
    StructField("TRANSIT_STATIONS_CHANGE_PERC", IntegerType(), True),
    StructField("WORKPLACES_CHANGE_PERC", IntegerType(), True),
    StructField("LAST_UPDATE_DATE", StringType(), True),  # using StringType
    StructField("LAST_REPORTED_FLAG", BooleanType(), True),
    StructField("SUB_REGION_2", StringType(), True),
])
df2 = df.select(from_json("value", schema).alias("json"))

After parsing the JSON string, convert the struct into separate top-level columns (select("json.*")), process the three columns using withColumn, and then repack the struct using this answer if necessary:

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

df2.select("json.*") \
    .withColumn("GROCERY_AND_PHARMACY_CHANGE_PERC",
        F.col("GROCERY_AND_PHARMACY_CHANGE_PERC").cast(IntegerType())) \
    .withColumn("PARKS_CHANGE_PERC",
        F.col("PARKS_CHANGE_PERC").cast(IntegerType())) \
    .withColumn("LAST_UPDATE_DATE",
        F.to_timestamp("LAST_UPDATE_DATE", "HH:mm.s")) \
    .withColumn("json", F.struct(*[F.col(c) for c in df2.select("json.*").columns])) \
    .drop(*df2.select("json.*").columns) \
    .show(truncate=False)

Remark: in the example data, the column LAST_UPDATE_DATE contains the string "05:31.7". The code above assumes that this is a timestamp in the format HH:mm.s. As the date part is missing, the result for this example is 1970-01-01 05:31:07. This can be fixed by using another date format in to_timestamp.
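
For example, if the value were actually minutes, seconds and a fraction of a second (one possible reading of "05:31.7"), the pattern could be adjusted like this:

# Hypothetical alternative reading of "05:31.7" as minutes:seconds.fraction
df2.select("json.*") \
    .withColumn("LAST_UPDATE_DATE",
        F.to_timestamp("LAST_UPDATE_DATE", "mm:ss.S")) \
    .show(truncate=False)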

werner
  • I keep getting a "Queries with streaming sources must be executed with writeStream.start()" error when I try to run df2 = df2.select("json.*") – Adam Aug 09 '21 at 03:49
  • I managed to do it like this: json_edit = json_edit.select("json.*") \ .withColumn("GROCERY_AND_PHARMACY_CHANGE_PERC", F.col("GROCERY_AND_PHARMACY_CHANGE_PERC").cast(IntegerType())) \ .withColumn("PARKS_CHANGE_PERC", F.col("PARKS_CHANGE_PERC").cast(IntegerType())) \ .withColumn("DATE", F.to_date("DATE","yyyy-MM-dd")) \ .withColumn("LAST_UPDATE_DATE", F.to_timestamp("LAST_UPDATE_DATE", "HH:mm.s")) \ .withColumn('json', F.struct(*[F.col(col) for col in json_edit.select('json.*').columns])) and then json_edit = json_edit.drop('json') – Adam Aug 09 '21 at 06:27
  • I can't seem to cast DATE to a date. .withColumn("DATE", F.to_date("DATE","yyyy-MM-dd")) keeps showing null. Any idea? – Adam Aug 09 '21 at 06:28
  • @Adam the format string in `to_date` probably does not match your data. Can you check if your data matches the [datetime pattern](https://spark.apache.org/docs/3.1.1/sql-ref-datetime-pattern.html) `yyyy-MM-dd`? – werner Aug 11 '21 at 15:54