
I have a list of all the distinct values in a column, and I need to replace every value that is NOT in this list with 1.

I've tried this:

uniq = X_train3.select('street').distinct().collect()
X_test3 = X_test3.withColumn('street', F.when(array_contains('street', uniq), 1))

and I also tried this:

uniq = X_train3.select('street').distinct().collect()
X_test3 = X_test3.withColumn('street', F.when(~col('street').isin(uniq), 1))

Both result in this error:

java.lang.RuntimeException: Unsupported literal type class java.util.ArrayList [[1.0]]

This is what I did in Python (with pandas), and it works:

uniq = X_train3[cl].unique()
uniq = uniq.tolist()
X_test3['street'] = X_test3['street'].map(lambda x: 1 if x not in uniq else x)
  • See this question on how to collect column values into a list: https://stackoverflow.com/questions/38610559/convert-spark-dataframe-column-to-python-list – Shaido May 27 '19 at 08:05
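As the comment suggests, the error comes from collect() returning Row objects rather than plain values, so uniq is a list of Rows. A minimal PySpark sketch of the fix (assuming street holds scalar values):

from pyspark.sql import functions as F

# collect() returns Row objects; pull the plain values out first
uniq = [row['street'] for row in X_train3.select('street').distinct().collect()]

# keep values that are in the list, replace everything else with 1
X_test3 = X_test3.withColumn(
    'street',
    F.when(~F.col('street').isin(uniq), F.lit(1)).otherwise(F.col('street'))
)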

1 Answer


You can do this (shown in Scala; a PySpark sketch of the same join follows the code):

val new_X_test3 = X_test3
.join(X_train3
    .select("street")
    .distinct()
    .withColumnRenamed("street","street_train"), 
    col("street") === col("street_train"), 
    "leftouter")
.withColumn("street_test", 
    when(col("street_train").isNull, lit("1"))
    .otherwise(col("street")))
.drop("street","street_train")
.withColumnRenamed("street_test","street")

Also, if you're confident that the list of unique streets is very small (since you tried collecting it on the driver in your code), you can add the broadcast hint around X_train3, so the code becomes:

val new_X_test3 = X_test3
.join(broadcast(X_train3
    .select("street")
    .distinct()
    .withColumnRenamed("street","street_train")), 
    col("street") === col("street_train"), 
    "leftouter")
.withColumn("street_test", 
    when(col("street_train").isNull, lit("1"))
    .otherwise(col("street")))
.drop("street","street_train")
.withColumnRenamed("street_test","street")
Gsquare
  • Why was this downvoted? It answers the question precisely. – Gsquare May 27 '19 at 09:32
  • I didn't downvote, but it seems abundantly clear that the question is about PySpark, not Spark, and therefore your answer does not directly address the issue. – gmds May 27 '19 at 14:03