8

I'd like to create a pyspark dataframe from a json file in hdfs.

the json file has the following contet:

{ "Product": { "0": "Desktop Computer", "1": "Tablet", "2": "iPhone", "3": "Laptop" }, "Price": { "0": 700, "1": 250, "2": 800, "3": 1200 } }

Then, I read this file using pyspark 2.4.4 df = spark.read.json("/path/file.json")

So, I get a result like this:

df.show(truncate=False)
+---------------------+---------------------------------+
|Price                |Product                          |
+---------------------+---------------------------------+
|[700, 250, 800, 1200]|[Desktop, Tablet, Iphone, Laptop]|
+---------------------+---------------------------------+

But I'd like a dataframe with the following structure:

+-------+--------+
|Price  |Product |
+-------+--------+
|700    |Desktop | 
|250    |Tablet  |
|800    |Iphone  |
|1200   |Laptop  |
+-------+--------+

How can I get a dataframe with the prevvious structure using pyspark?

I tried to use explode df.select(explode("Price")) but I got the following error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:

Py4JJavaError: An error occurred while calling o688.select.
: org.apache.spark.sql.AnalysisException: cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;;
'Project [explode(Price#107) AS List()]
+- LogicalRDD [Price#107, Product#108], false

    at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:97)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:289)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:288)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$3.apply(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:304)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:286)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$transformExpressionsUp$1.apply(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$1.apply(QueryPlan.scala:107)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpression$1(QueryPlan.scala:106)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:118)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1$1.apply(QueryPlan.scala:122)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
    at scala.collection.AbstractTraversable.map(Traversable.scala:104)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$1(QueryPlan.scala:122)
    at org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$2.apply(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.mapExpressions(QueryPlan.scala:127)
    at org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:95)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:89)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
    at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:84)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:92)
    at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:105)
    at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
    at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
    at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
    at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:3301)
    at org.apache.spark.sql.Dataset.select(Dataset.scala:1312)
    at sun.reflect.GeneratedMethodAccessor47.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)


During handling of the above exception, another exception occurred:

AnalysisException                         Traceback (most recent call last)
<ipython-input-46-463397adf153> in <module>
----> 1 df.select(explode("Price"))

/usr/lib/spark/python/pyspark/sql/dataframe.py in select(self, *cols)
   1200         [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
   1201         """
-> 1202         jdf = self._jdf.select(self._jcols(*cols))
   1203         return DataFrame(jdf, self.sql_ctx)
   1204 

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: "cannot resolve 'explode(`Price`)' due to data type mismatch: input to function explode should be array or map type, not struct<0:bigint,1:bigint,2:bigint,3:bigint>;;\n'Project [explode(Price#107) AS List()]\n+- LogicalRDD [Price#107, Product#108], false\n"
Master_RDA
  • 83
  • 1
  • 1
  • 3
  • 1
    Check accepted ans in this : https://stackoverflow.com/questions/41027315/pyspark-split-multiple-array-columns-into-rows – SMaZ Sep 05 '19 at 21:19

3 Answers3

13

Recreating your DataFrame:

from pyspark.sql import functions as F

df = spark.read.json("./row.json") 
df.printSchema()
#root
# |-- Price: struct (nullable = true)
# |    |-- 0: long (nullable = true)
# |    |-- 1: long (nullable = true)
# |    |-- 2: long (nullable = true)
# |    |-- 3: long (nullable = true)
# |-- Product: struct (nullable = true)
# |    |-- 0: string (nullable = true)
# |    |-- 1: string (nullable = true)
# |    |-- 2: string (nullable = true)
# |    |-- 3: string (nullable = true)

As shown above in the printSchema output, your Price and Product columns are structs. Thus explode will not work since it requires an ArrayType or MapType.

First, convert the structs to arrays using the .* notation as shown in Querying Spark SQL DataFrame with complex types:

df = df.select(
    F.array(F.expr("Price.*")).alias("Price"),
    F.array(F.expr("Product.*")).alias("Product")
)

df.printSchema()

#root
# |-- Price: array (nullable = false)
# |    |-- element: long (containsNull = true)
# |-- Product: array (nullable = false)
# |    |-- element: string (containsNull = true)

Now since you're using Spark 2.4+, you can use arrays_zip to zip the Price and Product arrays together, before using explode:

df.withColumn("price_product", F.explode(F.arrays_zip("Price", "Product")))\
    .select("price_product.Price", "price_product.Product")\
    .show()

#+-----+----------------+
#|Price|         Product|
#+-----+----------------+
#|  700|Desktop Computer|
#|  250|          Tablet|
#|  800|          iPhone|
#| 1200|          Laptop|
#+-----+----------------+

For older versions of Spark, before arrays_zip, you can explode each column separately and join the results back together:

df1 = df\
.withColumn("price_map", F.explode("Price"))\
.withColumn("id", F.monotonically_increasing_id())\
.drop("Price", "Product")

df2 = df\
.withColumn("product_map", F.explode("Product"))\
.withColumn("id", F.monotonically_increasing_id())\
.drop("Price", "Product")

df3 = df1.join(df2, "id", "outer").drop("id")

df3.show()

#+---------+----------------+
#|price_map|     product_map|
#+---------+----------------+
#|      700|Desktop Computer|
#|      250|          Tablet|
#|     1200|          Laptop|
#|      800|          iPhone|
#+---------+----------------+
pault
  • 41,343
  • 15
  • 107
  • 149
thePurplePython
  • 2,621
  • 1
  • 13
  • 34
  • 1
    I wouldn't recommend using `concat_ws` + `split` - you're assuming the separator doesn't exist in the data. In any case, you can directly unpack the elements of the struct into an `array`, without enumerating each of them. – pault Sep 05 '19 at 20:58
  • true ... that is where i was getting stuck ... how do you convert a nested ```struct``` to an ```array```? – thePurplePython Sep 05 '19 at 21:00
  • Using `.*`: [Querying Spark SQL DataFrame with complex types](https://stackoverflow.com/a/33850490/5858851). For example: `df = df.select(array(expr("Price.*")).alias("Price"), array(f.expr("Product.*")).alias("Product"))`. I think this followed by the `arrays_zip` is the way to go. – pault Sep 05 '19 at 21:06
  • thanks! i agree ... very clean code ... it was the ```array``` function i was missing – thePurplePython Sep 05 '19 at 21:21
  • the `id`s by monotonically_increasing_id() in df1 and df2 are problematic, they can easily out of sync on a large dataset. I suggest you create this id with `df = df.withColumn('id', F.monotonically_increasing_id())` before creating df1 and df2. – jxc Sep 07 '19 at 16:13
2

For Spark version without array_zip, we can also do this:

  1. First read the json file into a DataFrame

from pyspark.sql import functions as F


df=spark.read.json("your_json_file.json")
df.show(truncate=False)

+---------------------+------------------------------------------+
|Price                |Product                                   |
+---------------------+------------------------------------------+
|[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|
+---------------------+------------------------------------------+

Next, expand the struct into array:

df = df.withColumn('prc_array', F.array(F.expr('Price.*')))
df = df.withColumn('prod_array', F.array(F.expr('Product.*')))

Then create a map between the two arrays

df = df.withColumn('prc_prod_map', F.map_from_arrays('prc_array', 'prod_array'))
df.select('prc_array', 'prod_array', 'prc_prod_map').show(truncate=False)


+---------------------+------------------------------------------+-----------------------------------------------------------------------+
|prc_array            |prod_array                                |prc_prod_map                                                           |
+---------------------+------------------------------------------+-----------------------------------------------------------------------+
|[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|[700 -> Desktop Computer, 250 -> Tablet, 800 -> iPhone, 1200 -> Laptop]|
+---------------------+------------------------------------------+-----------------------------------------------------------------------+

Finally, apply explode on the map:

df = df.select(F.explode('prc_prod_map').alias('prc', 'prod'))
df.show(truncate=False)

+----+----------------+
|prc |prod            |
+----+----------------+
|700 |Desktop Computer|
|250 |Tablet          |
|800 |iPhone          |
|1200|Laptop          |
+----+----------------+

This way, we avoid the potentially time consuming join operation on two tables.

Kubra Altun
  • 365
  • 3
  • 12
niuer
  • 1,589
  • 2
  • 11
  • 14
  • This is a nice answer but it won't work if the keys are not unique or if any of the keys are null – pault Sep 06 '19 at 01:04
  • Duplicate keys don't have any problem on mapping, null keys might be an issue here. May have to fill the missing values first. – niuer Sep 06 '19 at 02:40
0

In case you are using < 2.4.4 Then following gives answers. However, for the strange schema of Json, I could not make it generic In real life example, please create a better formed json

PYSPARK VERSION

>>> from pyspark.sql import Row
>>> json_df = spark.read.json("file.json") # File in current directory
>>> json_df.show(20,False) # We only have 1 Row with two StructType columns
    +---------------------+------------------------------------------+
    |Price                |Product                                   |
    +---------------------+------------------------------------------+
    |[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|
    +---------------------+------------------------------------------+
   >>> # We convert dataframe to Row and Zip two nested Rows Assuming there 
         #will be no gap in values
    >>> spark.createDataFrame(zip(json_df.first().__getitem__(0), json_df.first().__getitem__(1)), schema=["Price", "Product"]).show(20,False)

         +-----+----------------+
         |Price|Product         |
         +-----+----------------+
         |700  |Desktop Computer|
         |250  |Tablet          |
         |800  |iPhone          |
         |1200 |Laptop          |
         +-----+----------------+

SCALA Version( without preferred Case Class Method)

    scala> val sparkDf = spark.read.json("file.json")
sparkDf: org.apache.spark.sql.DataFrame = [Price: struct<0: bigint, 1: bigint ... 2 more fields>, Product: struct<0: string, 1: string ... 2 more fields>]

scala> sparkDf.show(false)
+---------------------+------------------------------------------+
|Price                |Product                                   |
+---------------------+------------------------------------------+
|[700, 250, 800, 1200]|[Desktop Computer, Tablet, iPhone, Laptop]|
+---------------------+------------------------------------------+
scala> import spark.implicits._
import spark.implicits._

scala> (sparkDf.first.getStruct(0).toSeq.asInstanceOf[Seq[Long]], sparkDf.first.getStruct(1).toSeq.asInstanceOf[Seq[String]]).zipped.toList.toDF("Price","Product")
res6: org.apache.spark.sql.DataFrame = [Price: bigint, Product: string]

scala> // We do same thing but able to use methods of Row  use Spark Implicits to get DataSet Directly

scala> (sparkDf.first.getStruct(0).toSeq.asInstanceOf[Seq[Long]], sparkDf.first.getStruct(1).toSeq.asInstanceOf[Seq[String]]).zipped.toList.toDF("Price","Product").show(false)
+-----+----------------+
|Price|Product         |
+-----+----------------+
|700  |Desktop Computer|
|250  |Tablet          |
|800  |iPhone          |
|1200 |Laptop          |
+-----+----------------+
SanBan
  • 635
  • 5
  • 12