Not sure what you mean by a URL string, but strings carry the most bytes and take up the most memory when serialized ... I would run a
df_join.explain()
and check how many shuffles are being triggered in the transformations ... since it is a small data set, reduce the shuffle partitions down to something like
spark.conf.set("spark.sql.shuffle.partitions", 8)
You also want to make sure you have enough cores per executor, which you can set when launching the shell, e.g.
pyspark --master yarn --executor-cores 5
Overall the slowness could be caused by a lot of things: data volume, which deployment you are on (local, standalone, yarn [client/cluster]), which config params are set ... From what I have seen, the usual culprits for long-running jobs are the number of output partitions triggered by wide transformations (joins/aggs), too few executor cores (the launch default is 1, I believe), and the fact that PySpark/SparkR just aren't as fast because of the separate processes outside the JVM that require serialized objects to be transferred back and forth.
Also check the Spark UI under the Storage tab and make sure all partitions are 100% cached ... if only a fraction fits in memory, then you might have to increase executor memory, because partially cached DFs cause a ton of problems when retrieving the uncached partitions
pyspark --master yarn --executor-memory <N>g
Sorry for the many suggestions ... Spark is a nasty little bugger at times, and the root cause can come from a long list of issues
from pyspark.sql.functions import array
df = spark.createDataFrame([
    ("1, 2, 3",),
    ("4, 5, 6",),
    ("7, 8, 9",)
], ["string_array"])
# note: array() wraps the whole column, so each row becomes a one-element
# array whose single element is a string like "1, 2, 3" - it only *looks*
# like a three-element array in show()
df.select(array("string_array").alias("array_data")).printSchema()
df.select(array("string_array").alias("array_data")).show()
root
|-- array_data: array (nullable = false)
| |-- element: string (containsNull = true)
+----------+
|array_data|
+----------+
| [1, 2, 3]|
| [4, 5, 6]|
| [7, 8, 9]|
+----------+
jsonDF = spark.range(1).selectExpr("""
'{"myJSONValue" : [1, 2, 3]}' as jsonString""")
jsonDF.show(truncate=False)
jsonDF.printSchema()
jsonDF.select(array("jsonString").alias("json_array")).show(truncate=False)
jsonDF.select(array("jsonString").alias("json_array")).printSchema()
+---------------------------+
|jsonString |
+---------------------------+
|{"myJSONValue" : [1, 2, 3]}|
+---------------------------+
root
|-- jsonString: string (nullable = false)
+-----------------------------+
|json_array |
+-----------------------------+
|[{"myJSONValue" : [1, 2, 3]}]|
+-----------------------------+
root
|-- json_array: array (nullable = false)
| |-- element: string (containsNull = false)