I have a Spark DataFrame like the one below, with about 7k columns.
+---+----+----+----+----+----+----+
| id| 1| 2| 3|sf_1|sf_2|sf_3|
+---+----+----+----+----+----+----+
| 2|null|null|null| 102| 202| 302|
| 4|null|null|null| 104| 204| 304|
| 1|null|null|null| 101| 201| 301|
| 3|null|null|null| 103| 203| 303|
| 1| 11| 21| 31|null|null|null|
| 2| 12| 22| 32|null|null|null|
| 4| 14| 24| 34|null|null|null|
| 3| 13| 23| 33|null|null|null|
+---+----+----+----+----+----+----+
I want to transform the DataFrame into the form shown below by merging the rows that share the same id, so that the null values are filled in. With a groupBy operation I am able to merge them into a single row per id, but the performance of this aggregation is very poor because the table has 7k columns.
import pyspark.sql.functions as F
(df.groupBy('id').agg(*[F.first(x,ignorenulls=True) for x in df.columns if x!='id'])
.show())
+---+----+----+----+----+----+----+
| id| 1| 2| 3|sf_1|sf_2|sf_3|
+---+----+----+----+----+----+----+
| 1| 11| 21| 31| 101| 201| 301|
| 2| 12| 22| 32| 102| 202| 302|
| 4| 14| 24| 34| 104| 204| 304|
| 3| 13| 23| 33| 103| 203| 303|
+---+----+----+----+----+----+----+
Are there any other recommendations, optimizations, or more efficient ways of doing this? Thanks.
Update 1: after trying a self-join instead, I get the following error:
---------------------------------------------------------------------------
Py4JJavaError Traceback (most recent call last)
<ipython-input-17-b7de100341cc> in <module>
15 """.format(table_name, query, join_key)
16
---> 17 spark.sql(final_query).dropDuplicates().filter(filters).count()
~/quartic/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/dataframe.py in count(self)
583 2
584 """
--> 585 return int(self._jdf.count())
586
587 @ignore_unicode_prefix
~/quartic/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
1306
1307 for temp_arg in temp_args:
~/quartic/spark-3.0.0-bin-hadoop2.7/python/pyspark/sql/utils.py in deco(*a, **kw)
129 def deco(*a, **kw):
130 try:
--> 131 return f(*a, **kw)
132 except py4j.protocol.Py4JJavaError as e:
133 converted = convert_exception(e.java_exception)
~/quartic/spark-3.0.0-bin-hadoop2.7/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
Py4JJavaError: An error occurred while calling o148.count.
: java.lang.StackOverflowError
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:35)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at scala.collection.generic.Growable.$plus$plus$eq(Growable.scala:62)
at scala.collection.generic.Growable.$plus$plus$eq$(Growable.scala:53)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:184)
at scala.collection.mutable.ListBuffer.$plus$plus$eq(ListBuffer.scala:47)
at scala.collection.generic.GenericCompanion.apply(GenericCompanion.scala:53)
at org.apache.spark.sql.catalyst.expressions.BinaryExpression.children(Expression.scala:533)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild$lzycompute(TreeNode.scala:115)
at org.apache.spark.sql.catalyst.trees.TreeNode.containsChild(TreeNode.scala:115)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:349)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformUp$1(TreeNode.scala:330)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$mapChildren$1(TreeNode.scala:399)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:237)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:397)
at org.apache.spark.sql.catalyst.trees.TreeNode.mapChildren(TreeNode.scala:350)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:330)