
I have a DataFrame to which I want to add a column of distinct uuid4() values. My code:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import StringType

from uuid import uuid4

spark_session = SparkSession.builder.getOrCreate()

df = spark_session.createDataFrame([
        [1, 1, 'teste'],
        [2, 2, 'teste'],
        [3, 0, 'teste'],
        [4, 5, 'teste'],
    ],
    list('abc'))


df = df.withColumn("_tmp", f.lit(1))

uuids = [str(uuid4()) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, StringType())
df1 = df1.withColumn("_tmp", f.lit(1))


df2 = df.join(df1, "_tmp", "inner").drop("_tmp")
df2.show()

But I've got this ERROR:

Py4JJavaError: An error occurred while calling o1571.showString.
: org.apache.spark.sql.AnalysisException: Detected implicit cartesian product for INNER join between logical plans

I already tried with an alias and with monotonically_increasing_id as the join column, but I see here that I cannot trust monotonically_increasing_id as a merge column.
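
That attempt was roughly along these lines (a sketch of the idea, not my exact code; the _row_id column name is just illustrative):

# Tag both DataFrames with monotonically_increasing_id and join on it.
# Not reliable: the ids are only guaranteed to be increasing and unique,
# not consecutive, so the two sides don't necessarily line up row by row.
df_a = df.withColumn("_row_id", f.monotonically_increasing_id())
df_b = df1.withColumn("_row_id", f.monotonically_increasing_id())
df2 = df_a.join(df_b, "_row_id", "inner").drop("_row_id")

I'm expecting: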

+---+---+-----+------+
|  a|  b|    c| value|
+---+---+-----+------+
|  1|  1|teste| uuid4|
|  2|  2|teste| uuid4|
|  3|  0|teste| uuid4|
|  4|  5|teste| uuid4|
+---+---+-----+------+

What's the correct approach in this case?

bcosta12
  • You can register uuid4() as a udf and call it inside Spark (see the sketch after these comments). If you want to stick to your approach, use row_number() to give you a row number and join on it. – Tetlanesh Mar 11 '20 at 16:04
  • Spark is wisely [warning](https://github.com/apache/spark/blob/5be0d04f16dce1f2a3ad94669b6cff7460287397/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala#L1441) you that your join was identified as a cartesian product. That is because every row of the left dataset will match every row of the right dataset, creating a cartesian product. And indeed that is expected, since you are joining df with df1 on a column which has the same value in both datasets; that is equivalent to a crossJoin. – abiratsis Mar 11 '20 at 16:44
  • @AlexandrosBiratsis, so what approach do you recommend? – bcosta12 Mar 11 '20 at 16:54
  • @Tetlanesh works with row_number! – bcosta12 Mar 11 '20 at 16:55
  • The simplest approach is to use Spark SQL's uuid function inside an expression, like this: `df.withColumn("uuid", f.expr("uuid()"))`. – Michał Jabłoński Oct 07 '21 at 15:06
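
A minimal sketch of the udf approach from the first comment (assuming Spark 2.3+ for asNondeterministic(); the uuid_udf name and the uuid column name are just illustrative):

from pyspark.sql import functions as f
from pyspark.sql.types import StringType
from uuid import uuid4

# Wrap uuid4 in a Python UDF; marking it non-deterministic stops the optimizer
# from assuming repeated calls return the same value
uuid_udf = f.udf(lambda: str(uuid4()), StringType()).asNondeterministic()

df = df.withColumn("uuid", uuid_udf())
df.show(truncate=False)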

1 Answer


I used row_number as @Tetlanesh suggested. For that I had to create an ID column to ensure that row_number counts every row of the Window. The more efficient option, though, turned out to be Spark SQL's built-in uuid() function:

from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from uuid import uuid4
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

from pyspark.sql.functions import row_number


spark_session = SparkSession.builder.getOrCreate()

df = spark_session.createDataFrame([
        [1, 1, 'teste'],
        [2, 2, 'teste'],
        [3, 0, 'teste'],
        [4, 5, 'teste'],
    ],
    list('abc'))

df = df.alias("_tmp")
df.registerTempTable("_tmp")  # or df.createOrReplaceTempView("_tmp") on newer Spark versions

df2 = spark_session.sql("select *, uuid() as uuid from _tmp")

df2.show()
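
The same built-in uuid() can also be called through expr() without registering a temp view (a minimal sketch; the uuid column name is arbitrary):

# Call Spark SQL's uuid() directly from the DataFrame API; one value per row
df2 = df.withColumn("uuid", f.expr("uuid()"))
df2.show()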

Another approach is using windows, but it's not as efficient as the first one, because Window.orderBy without a partitionBy moves all rows into a single partition:


df = df.withColumn("_id", f.lit(1))
df = df.withColumn("_tmp", row_number().over(Window.orderBy('_id')))

uuids = [(str(uuid4()), 1) for _ in range(df.count())]
df1 = spark_session.createDataFrame(uuids, ['uuid', '_id'])
df1 = df1.withColumn("_tmp", row_number().over(Window.orderBy('_id')))


df2 = df.join(df1, "_tmp", "inner").drop('_id')

df2.show()

Both approaches output:

+---+---+-----+------+
|  a|  b|    c|  uuid|
+---+---+-----+------+
|  1|  1|teste| uuid4|
|  2|  2|teste| uuid4|
|  3|  0|teste| uuid4|
|  4|  5|teste| uuid4|
+---+---+-----+------+
bcosta12