0

I have to derive a value from one of the columns (coming from a JSON file and this value is an array of arrays), and use the derived value as a key to select the next column which contains JSON data.

Example:

Consider the following two elements in my record:

categories: [["Movies & TV", "Movies"]]
salesRank: {
  "Appliances": null, "Arts_ Crafts & Sewing": null, 
  "Automotive": null, "Baby": null, "Beauty": null, 
  "Books": null, "Camera & Photo": null, 
  "Cell Phones & Accessories": null, "Clothing": null, 
  "Computers & Accessories": null, "Electronics": null,
  "Gift Cards Store": null, "Grocery & Gourmet Food": null, 
  "Health & Personal Care": null, "Home & Kitchen": null, 
  "Home Improvement": null, "Industrial & Scientific": null, 
  "Jewelry": null, "Kitchen & Dining": null, "Magazines": null, 
  "Movies & TV": 1084845, "Music": null, "Musical Instruments": null, 
  "Office Products": null, "Patio_ Lawn & Garden": null, 
  "Pet Supplies": null, "Prime Pantry": null, "Shoes": null, 
  "Software": null, "Sports & Outdoors": null, "Toys & Games": null, 
  "Video Games": null, "Watches": null}

I am picking the category for my record (inserted into rdbms) with col('categories').getItem(0).getItem(0).alias("categories")

This will give me "Movies & TV"

Now I need to get the salesRank. This should be the value for salesRank of "Movies & TV". How can I get this?

Owen Kelvin
  • 14,054
  • 10
  • 41
  • 74
ymo
  • 23
  • 4

1 Answers1

0

I will try to replicate your dataframe.

df = spark.createDataFrame(
     [
         (1, '{"a": true, "b":98, "c":"xxx1"}', [['a','b']]),
         (2, '{"a": false, "b":98, "c":"xxx2"}', [['c','a']]),
         (3, '{"a": true, "b":99, "c":"xxx3"}', [['b','c']]),
     ],
     schema=['id', 'salesRank', 'categories'],
 )
df.show()

+---+--------------------+----------+
| id|           salesRank|categories|
+---+--------------------+----------+
|  1|{"a": true, "b":9...|  [[a, b]]|
|  2|{"a": false, "b":...|  [[c, a]]|
|  3|{"a": true, "b":9...|  [[b, c]]|
+---+--------------------+----------+

Lets parse your salesRank column I will make every key as a column of the dataframe and lets also get first value from your categories.

from pyspark.sql.types import StructType, StructField, StringType

schema = StructType(
    [
        StructField('a', StringType(), True), # define the data type of values 
        StructField('b', DoubleType(), True), # of your json here, means `a`
        StructField('c', StringType(), True)  # holds stringType & so on for b,c

    ]
)



df.withColumn("salesRank_expanded", from_json("salesRank", schema))\
    .select(F.col('id'), F.col('salesRank'),F.col('salesRank_expanded.*'),\
 F.col('categories'),(F.col('categories').getItem(0).getItem(0)).alias('key')).show()


+---+--------------------+-----+----+----+----------+---+
| id|           salesRank|    a|   b|   c|categories|key|
+---+--------------------+-----+----+----+----------+---+
|  1|{"a": true, "b":9...| true|98.0|xxx1|  [[a, b]]|  a|
|  2|{"a": false, "b":...|false|98.0|xxx2|  [[c, a]]|  c|
|  3|{"a": true, "b":9...| true|99.0|xxx3|  [[b, c]]|  b|
+---+--------------------+-----+----+----+----------+---+

So I have converted your json keys to columns and extracted first element of your categories. Now we just need to add values for columns present in key column.

To do this i referred to this SO answer.

from pyspark.sql.functions import col, lit, when
from functools import reduce

data_cols = ['a','b','c'] #all keys of your json dict 

out = reduce(
    lambda acc, x: when(col("key") == x, col(x)).otherwise(acc), 
    data_cols,
    lit(None)
)

df.withColumn("salesRank_expanded", from_json("salesRank", schema))\
    .select(F.col('id'), F.col('salesRank'),F.col('salesRank_expanded.*'),\
 F.col('categories'),(F.col('categories').getItem(0).getItem(0)).alias('key'))\
.withColumn('value',out).show()

And the final output looks like this, all the values of the keys of your json you want in the column value:

+---+--------------------+-----+----+----+----------+---+-----+
| id|           salesRank|    a|   b|   c|categories|key|value|
+---+--------------------+-----+----+----+----------+---+-----+
|  1|{"a": true, "b":9...| true|98.0|xxx1|  [[a, b]]|  a| true|
|  2|{"a": false, "b":...|false|98.0|xxx2|  [[c, a]]|  c| xxx2|
|  3|{"a": true, "b":9...| true|99.0|xxx3|  [[b, c]]|  b| 99.0|
+---+--------------------+-----+----+----+----------+---+-----+

EDIT

Turns out there is a way to make this schema independent. There is a function get_json_object which lets solve this problem.

from pyspark.sql.functions import get_json_object

out2 = reduce(
    lambda acc, x: when(col("key") == x, get_json_object(F.col('salesRank'), f"$.{x}").alias(f"{x}")).otherwise(acc), 
    data_cols,
    lit(None)
)


df.select(F.col('id'), F.col('salesRank'), F.col('categories'),
  (F.col('categories').getItem(0).getItem(0)).alias('key'))\
              .withColumn('value',out2).show()


+---+--------------------+----------+---+-----+
| id|           salesRank|categories|key|value|
+---+--------------------+----------+---+-----+
|  1|{"a": true, "b":9...|  [[a, b]]|  a| true|
|  2|{"a": false, "b":...|  [[c, a]]|  c| xxx2|
|  3|{"a": true, "b":9...|  [[b, c]]|  b|   99|
+---+--------------------+----------+---+-----+
Siddhant Tandon
  • 651
  • 4
  • 15
  • This still needs prior knowledge of the schema in the column of the dataframe. Is there a way to go about this more dynamically? – ymo Oct 27 '20 at 13:21
  • assuming the data types of the json dicts of your column `salesRank` remain consistent, you can simply get the schema from the very first json dict. no ? – Siddhant Tandon Oct 27 '20 at 17:13