
I am working with a very large dataset (Reddit) on AWS. I read in a small sample first with:

file_lzo = sc.newAPIHadoopFile("s3://mv559/reddit/sample-data/", 
                               "com.hadoop.mapreduce.LzoTextInputFormat", 
                               "org.apache.hadoop.io.LongWritable", 
                               "org.apache.hadoop.io.Text")

So I got an RDD called file_lzo. I took the first element, and the data looks like this:

[(0,
  '{"archived":false,"author":"TistedLogic","author_created_utc":1312615878,"author_flair_background_color":null,"author_flair_css_class":null,"author_flair_richtext":[],"author_flair_template_id":null,"author_flair_text":null,"author_flair_text_color":null,"author_flair_type":"text","author_fullname":"t2_5mk6v","author_patreon_flair":false,"body":"Is it still r\\/BoneAppleTea worthy if it\'s the opposite?","can_gild":true,"can_mod_post":false,"collapsed":false,"collapsed_reason":null,"controversiality":0,"created_utc":1538352000,"distinguished":null,"edited":false,"gilded":0,"gildings":{"gid_1":0,"gid_2":0,"gid_3":0},"id":"e6xucdd","is_submitter":false,"link_id":"t3_9ka1hp","no_follow":true,"parent_id":"t1_e6xu13x","permalink":"\\/r\\/Unexpected\\/comments\\/9ka1hp\\/jesus_fking_woah\\/e6xucdd\\/","removal_reason":null,"retrieved_on":1539714091,"score":2,"send_replies":true,"stickied":false,"subreddit":"Unexpected","subreddit_id":"t5_2w67q","subreddit_name_prefixed":"r\\/Unexpected","subreddit_type":"public"}')]

Then I created a DataFrame from this RDD using:

df = spark.createDataFrame(file_lzo,['idx','map_col'])
df.show(4)

It looks like this:

+-----+--------------------+
|  idx|             map_col|
+-----+--------------------+
|    0|{"archived":false...|
|70139|{"archived":false...|
|70139|{"archived":false...|
|70139|{"archived":false...|
+-----+--------------------+
only showing top 4 rows

Finally, I want the data in a DataFrame that looks like the one below (one column per JSON field), and to save it in Parquet format on S3 for the next steps.

[image: the desired results]
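
Once the fields are in their own columns, the Parquet step should just be a write call; a minimal sketch (the df_parsed name and the output path below are only placeholders):

# Sketch only: df_parsed stands for the parsed DataFrame I am after,
# and the S3 output path is just an example.
df_parsed.write.mode("overwrite").parquet("s3://mv559/reddit/sample-parquet/")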

I have tried creating a schema and then using read.json, but all of the values came out as null:

fields = [StructField("archived", BooleanType(), True), 
          StructField("author", StringType(), True),
          StructField("author_flair_css_class", StringType(), True),
          StructField("author_flair_text", StringType(), True),
          StructField("body", StringType(), True),
          StructField("can_gild", BooleanType(), True),         
          StructField("controversiality", LongType(), True),
          StructField("created_utc", StringType(), True),
          StructField("distinguished", StringType(), True),
          StructField("edited", StringType(), True),
          StructField("gilded", LongType(), True), 
          StructField("id", StringType(), True),
          StructField("is_submitter", StringType(), True),
          StructField("link_id", StringType(), True),
          StructField("parent_id", StringType(), True),
          StructField("permalink", StringType(), True),
          StructField("permalink", StringType(), True),
          StructField("removal_reason", StringType(), True),
          StructField("retrieved_on", LongType(), True), 
          StructField("score",LongType() , True),
          StructField("stickied", BooleanType(), True),  
          StructField("subreddit", StringType(), True),
          StructField("subreddit_id", StringType(), True)]

schema = StructType(fields)
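
The exact read.json call isn't shown here; a sketch of one way the schema can be applied (pulling the JSON strings out of the (offset, text) pairs first) would be:

# Sketch only: extract the JSON strings from the (offset, text) pairs read earlier,
# then read them as JSON with the schema defined above.
json_rdd = file_lzo.map(lambda kv: kv[1])
df_try = spark.read.json(json_rdd, schema=schema)

Either way, what I actually get is all nulls: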

+--------+------+----------------------+-----------------+----+--------+----------------+-----------+-------------+------+------+----+------------+-------+---------+---------+---------+--------------+------------+-----+--------+---------+------------+
|archived|author|author_flair_css_class|author_flair_text|body|can_gild|controversiality|created_utc|distinguished|edited|gilded|  id|is_submitter|link_id|parent_id|permalink|permalink|removal_reason|retrieved_on|score|stickied|subreddit|subreddit_id|
+--------+------+----------------------+-----------------+----+--------+----------------+-----------+-------------+------+------+----+------------+-------+---------+---------+---------+--------------+------------+-----+--------+---------+------------+
|    null|  null|                  null|             null|null|    null|            null|       null|         null|  null|  null|null|        null|   null|     null|     null|     null|          null|        null| null|    null|     null|        null|
|    null|  null|                  null|             null|null|    null|            null|       null|         null|  null|  null|null|        null|   null|     null|     null|     null|          null|        null| null|    null|     null|        null|
|    null|  null|                  null|             null|null|    null|            null|       null|         null|  null|  null|null|        null|   null|     null|     null|     null|          null|        null| null|    null|     null|        null|
+--------+------+----------------------+-----------------+----+--------+----------------+-----------+-------------+------+------+----+------------+-------+---------+---------+---------+--------------+------------+-----+--------+---------+------------+

1 Answer

Looking at your desired output, you could treat your JSON as a column of MapType() and then extract your columns from it.

Start by creating a DataFrame:

my_rdd = [(0, {"author":  "abc", "id": "012", "archived": "False"}),
        (1, {"author": "bcd", "id": "013", "archived": "False"}),
        (2, {"author": "cde", "id": "014", "archived": "True"}),
        (3, {"author": "edf", "id": "015", "archived": "False"})]
df = sqlContext.createDataFrame(my_rdd,['idx','map_col'])
df.show()
# +---+--------------------+
# |idx|             map_col|
# +---+--------------------+
# |  0|Map(id -> 012, au...|
# |  1|Map(id -> 013, au...|
# |  2|Map(id -> 014, au...|
# |  3|Map(id -> 015, au...|
# +---+--------------------+

Then, if you don't know in advance which keys you want to extract, collect one row and get its keys, for example:

from pyspark.sql import functions as f

one = df.select(f.col('map_col')).rdd.take(1)
my_dict = one[0][0].keys()
my_dict
# dict_keys(['id', 'author', 'archived'])

If you already know the list of keys, use that directly.
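
For example, hard-coding the keys seen in the sample above:

# Equivalent to collecting the keys as shown, just written out by hand
my_dict = ['id', 'author', 'archived']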

Then you can flatten your map column like this:

keep_cols = [f.col('map_col').getItem(k).alias(k) for k in my_dict]
df.select(keep_cols).show()
#+---+------+--------+
#| id|author|archived|
#+---+------+--------+
#|012|   abc|   False|
#|013|   bcd|   False|
#|014|   cde|    True|
#|015|   edf|   False|
#+---+------+--------+

The getItem() and alias() methods do the magic: the first extracts the selected key from the map column, and the second renames the resulting column as desired.

ndricca
  • Hi, thanks for your reply. I have tried it, and in the step to get the keys I got an error saying 'str' object has no attribute 'keys', so I cannot get the keys. By the way, can we convert this kind of RDD to my desired format directly by setting a proper schema when creating the DataFrame? Thank you very much – Hongyang Zheng Apr 24 '19 at 02:29
  • It seems your data are not stored as json but as strings, or maybe you simply need `my_dict = one[0].keys()` instead of `my_dict = one[0][0].keys()` – ndricca Apr 24 '19 at 07:50
  • If instead the JSON dict is saved as a string, you could try to change it to a dictionary using `json.loads`. Please also provide a new sample that reproduces exactly your actual data (notice how I had to change your sample in order to import it into pyspark without a SyntaxError). – ndricca Apr 24 '19 at 07:56
  • You can obtain the same dataframe as the previous `df` also by doing `from pyspark.sql import Row` and `sc.parallelize(my_rdd).map(lambda x: Row(x[1])).toDF()`. The resulting dataframe is still of MapType() and must be converted as shown previously. – ndricca Apr 24 '19 at 10:36
  • Hi, thanks for your explanation. I think maybe I was not clear enough and caused some confusion. I have updated my question so you can see my real data and my previous procedure. I will try your suggestions later on and give you feedback :) Thank you very much! – Hongyang Zheng Apr 24 '19 at 16:33
  • Hi, I tried to create a schema first and then used `read.json`, however my dataframe is all null. Could you take a look at my update and help me figure out the problem? Thank you very much :) – Hongyang Zheng Apr 25 '19 at 02:19
  • hi, your json is stored as a string; you can check this by doing `df.printSchema()`. If your Spark is 2+, you can do as suggested [here](https://stackoverflow.com/questions/51713790/how-to-cast-string-to-arraytype-of-dictionary-json-in-pyspark) (a minimal sketch of that `from_json` approach follows these comments). Tell me if you are on an earlier version of Spark, and I'll try another solution. I hope you will finally solve this issue! Please accept my answer once you succeed with this task. – ndricca Apr 30 '19 at 08:18
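
A minimal sketch of that from_json route, reusing the `schema` and `df` from the question (assuming Spark 2.1+; the `df_parsed` name is only illustrative):

from pyspark.sql import functions as f

# Parse the JSON string column with the schema defined in the question,
# then promote the parsed struct's fields to top-level columns.
df_parsed = df.withColumn('parsed', f.from_json(f.col('map_col'), schema)) \
              .select('parsed.*')
df_parsed.show(3)

From there, `df_parsed.write.parquet(...)` covers the Parquet step mentioned in the question.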