0

when I run

import time
start_time = time.time()
print(df_join.count())
end_time = time.time()
print((end_time - start_time))

i get

25721
19.099464416503906

when i run

start_time = time.time()
df_join.cache()
print(df_join.count())
end_time = time.time()
print((end_time - start_time))

it still running after 5 minutes. does it really take that long to cache 27 rows of data? about 15-20 columns wide, and the complexity is a url string.

EDIT 1: it turns out i have a column which its type is array of jsons. if I take that out everything works fine. unfortuantly, pyspark reads that as a string and i do not know how to tell it to be array of jsons

How can I improve it?

user1871528
  • 1,655
  • 3
  • 27
  • 41

2 Answers2

0

not sure what you mean by a url string, but strings have the most bytes and take up the most memory when serialized ... i would run a

df_join.explain()

and check how many shuffles are being triggered in the transformations ... since it is a small data set reduce down to something like

spark.conf.set("spark.sql.shuffle.partitions, 8)

also want to make sure you have enough cores per executor which you can set via launching shell at runtime like

pyspark --master yarn executor-cores 5

overall the slowness could be caused by a lot of things like data volume with what deployment (local, standalone, yarn [client/cluster]) config params are set ... typically for what i have seen the culprit for longer lasting jobs comes down to the many # of output partitions triggered by wide transformations (joins/aggs), not enough executor cores (default at launch is 1 i believe), and the fact that pyspark/sparkR just aren't as fast because of the separate processes outside the JVM that require serialized object being transferred to and from

also check the Spark UI under STORAGE TAB and make sure all partitions are 100% cached ... if only a fraction is fitting in memory then you might have to increase executor memory because partially cached DFs cause a ton of problems with retrieving uncached partitions

pyspark --master yarn --executor-memory "gb"

sorry for the many suggestions ... Spark is a nasty little bugger at times and the root cause can be a long list of issues

from pyspark.sql.functions import col, array

df = spark.createDataFrame([
    (["1, 2, 3"]),
    (["4, 5, 6"]),
    (["7, 8, 9"])
], ["string_array"])

df.select(array("string_array").alias("array_data")).printSchema()
df.select(array("string_array").alias("array_data")).show()

root
 |-- array_data: array (nullable = false)
 |    |-- element: string (containsNull = true)

+----------+
|array_data|
+----------+
| [1, 2, 3]|
| [4, 5, 6]|
| [7, 8, 9]|
+----------+

jsonDF = spark.range(1).selectExpr("""
  '{"myJSONValue" : [1, 2, 3]}' as jsonString""")
jsonDF.show(truncate=False)
jsonDF.printSchema()


jsonDF.select(array("jsonString").alias("json_array")).show(truncate=False)
jsonDF.select(array("jsonString").alias("json_array")).printSchema()


 +---------------------------+
    |jsonString                 |
    +---------------------------+
    |{"myJSONValue" : [1, 2, 3]}|
    +---------------------------+

root
 |-- jsonString: string (nullable = false)

+-----------------------------+
|json_array                   |
+-----------------------------+
|[{"myJSONValue" : [1, 2, 3]}]|
+-----------------------------+

root
 |-- json_array: array (nullable = false)
 |    |-- element: string (containsNull = false)
thePurplePython
  • 2,621
  • 1
  • 13
  • 34
  • i have updated my answer ... is the structure array or json key/value? if it is array then you can convert string to arrays and if it is json key/value then you will probably want to flatten it out depending on what you need or you can still convert to array (which is kind of strange) because json is its own file format. – thePurplePython Apr 19 '19 at 03:02
0

In general there are multiple factors here:

  • count executes a query equivalent to SELECT COUNT(1) FROM table - this allows Spark to apply drastic early optimizations to avoid fetching any data that is no strictly required to compute parent table.

    However if data is marked o be cached, caching might take precedence, and all columns present in the plan must be fetched.

  • Spark SQL uses MEMORY_AND_DISK storage level - and both allocating and / or reclaiming memory, as well as potential disk IO, are expensive.

  • Finally caching is not a free lunch - it requires expensive and extensive transformations - hence _AND_DISK storage level by default, to reduce risk of cache eviction and re-computation.

If you assume that the final data contains only a small number of rows, then the first component is the most likely culprit.

user10938362
  • 3,991
  • 2
  • 12
  • 29