
I have a use case where we want to read JSON files from S3, then group the data based on a particular JSON node value and write it back to S3.

I am able to read the data, but I cannot find a good example of how to partition the data by a JSON key and then upload it to S3. Can anyone provide an example or point me to a tutorial that can help with this use case?

Here is the schema of my data after creating the DataFrame:

root
 |-- customer: struct (nullable = true)
 |    |-- customerId: string (nullable = true)
 |-- experiment: string (nullable = true)
 |-- expiryTime: long (nullable = true)
 |-- partitionKey: string (nullable = true)
 |-- programId: string (nullable = true)
 |-- score: double (nullable = true)
 |-- startTime: long (nullable = true)
 |-- targetSets: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- featured: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- data: struct (nullable = true)
 |    |    |    |    |    |-- asinId: string (nullable = true)
 |    |    |    |    |-- pk: string (nullable = true)
 |    |    |    |    |-- type: string (nullable = true)
 |    |    |-- reason: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- recommended: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)

I want to partition the data based on a random hash of the customerId column. But when I do this:

df.write.partitionBy("customerId").save("s3/bucket/location/to/save");

It gives this error:

org.apache.spark.sql.AnalysisException: Partition column customerId not found in schema StructType(StructField(customer,StructType(StructField(customerId,StringType,true)),true), StructField(experiment,StringType,true), StructField(expiryTime,LongType,true), StructField(partitionKey,StringType,true), StructField(programId,StringType,true), StructField(score,DoubleType,true), StructField(startTime,LongType,true), StructField(targetSets,ArrayType(StructType(StructField(featured,ArrayType(StructType(StructField(data,StructType(StructField(asinId,StringType,true)),true), StructField(pk,StringType,true), StructField(type,StringType,true)),true),true), StructField(reason,ArrayType(StringType,true),true), StructField(recommended,ArrayType(StringType,true),true)),true),true));

Please let me know how I can access the customerId column.


1 Answer


Let's take an example dataset, sample.json:

{"CUST_ID":"115734","CITY":"San Jose","STATE":"CA","ZIP":"95106"}
{"CUST_ID":"115728","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"115730","CITY":"Allentown","STATE":"PA","ZIP":"18101"}
{"CUST_ID":"114728","CITY":"San Mateo","STATE":"CA","ZIP":"94401"}
{"CUST_ID":"114726","CITY":"Somerset","STATE":"NJ","ZIP":"8873"}

Now start hacking on it with Spark:

val jsonDf = spark.read
  .format("json")
  .load("path/of/sample.json")

jsonDf.show()

+---------+-------+-----+-----+
|     CITY|CUST_ID|STATE|  ZIP|
+---------+-------+-----+-----+
| San Jose| 115734|   CA|95106|
|Allentown| 115728|   PA|18101|
|Allentown| 115730|   PA|18101|
|San Mateo| 114728|   CA|94401|
| Somerset| 114726|   NJ| 8873|
+---------+-------+-----+-----+

Then partition the dataset by the "ZIP" column and write it to S3:

jsonDf.write
  .partitionBy("ZIP")
  .save("s3/bucket/location/to/save")
  // one-liner authentication to S3
  //.save(s"s3n://$accessKey:$secretKey@$bucketName/location/to/save")

Note: For this code to run successfully, the S3 access and secret keys have to be configured properly. Check this answer for Spark/Hadoop integration with S3.
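
As one hedged example, the credentials can be supplied through the Hadoop configuration. The s3a scheme and the placeholder key strings below are assumptions, not part of the original answer; prefer an instance profile or credentials provider over hard-coding keys.

// Sketch: pass AWS credentials via the Hadoop configuration (s3a filesystem assumed)
val hadoopConf = spark.sparkContext.hadoopConfiguration
hadoopConf.set("fs.s3a.access.key", "<YOUR_ACCESS_KEY>")   // placeholder
hadoopConf.set("fs.s3a.secret.key", "<YOUR_SECRET_KEY>")   // placeholder

// With s3a configured, the write would then target an s3a path:
// jsonDf.write.partitionBy("ZIP").save("s3a://bucket/location/to/save")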

Edit: Resolution for "Partition column customerId not found in schema" (as per the comment)

customerId exists inside the customer struct, so extract customerId into a top-level column first, then partition on it:

df.withColumn("customerId", $"customer.customerId")
  .drop("customer")
  .write.partitionBy("customerId")
  .save("s3/bucket/location/to/save")