
I have a Spark job that works perfectly, but I wanted to parallelize the searches and operations on two DataFrames before joining them. I decided to use threads, but after calling `start` and `join`, the Spark functions no longer work.

```python
T_df_1 = ThreadWithReturnValue(target=thread_df_1, args=(arg1, arg2),)
T_df_2 = ThreadWithReturnValue(target=thread_df_2, args=(arg3, arg4, args),)

T_df_1.start()
T_df_2.start()

df_1 = T_df_1.join()
df_2 = T_df_2.join()

print(f'df_1 dim = {[df_1.count(), len(df_1.columns)]}')
```
Error message:

```
ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Error from Python:Traceback (most recent call last):
  File "/tmp/xxx.fy", line 464, in <module>
    print(f'df_1 dim = {[df_1.count(),len(df_1.columns)]}')
AttributeError: 'NoneType' object has no attribute 'count'
```
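The `NoneType` error suggests `join()` returned `None`, which is what the base `threading.Thread.join()` always does. Since the `ThreadWithReturnValue` class isn't shown in the question, here is a minimal sketch of the common implementation of such a subclass, whose `join()` captures and returns the target's return value:

```python
import threading

class ThreadWithReturnValue(threading.Thread):
    """Thread subclass whose join() returns the target's return value.

    Note: threading.Thread.join() itself always returns None; the override
    below is what makes `df = t.join()` work.
    """

    def __init__(self, target=None, args=()):
        super().__init__(target=target, args=args)
        self._return = None
        self._target_fn = target
        self._target_args = args

    def run(self):
        # Store the target's result instead of discarding it.
        if self._target_fn is not None:
            self._return = self._target_fn(*self._target_args)

    def join(self, timeout=None):
        # Wait for the thread to finish, then hand back the stored result.
        super().join(timeout)
        return self._return


t = ThreadWithReturnValue(target=lambda a, b: a + b, args=(1, 2))
t.start()
result = t.join()  # 3, not None
```

If the class in the actual job does not override both `run` and `join` this way, `T_df_1.join()` will return `None` and produce exactly the `AttributeError` above.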
  • Does this answer your question? [How to get the return value from a thread in Python?](https://stackoverflow.com/questions/6893968/how-to-get-the-return-value-from-a-thread-in-python) – luk2302 Oct 26 '22 at 15:11
  • `join` does not return anything. – luk2302 Oct 26 '22 at 15:11
  • I did it locally and it works, but on AWS I had this problem. – Fabiano Briao Oct 26 '22 at 16:07
  • I would try to examine the [execution plan](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.explain.html) of your original job without multiple treads and check if something could be improved using [cache](https://spark.apache.org/docs/3.1.2/api/python/reference/api/pyspark.sql.DataFrame.cache.html) or [hints](https://spark.apache.org/docs/3.3.0/sql-ref-syntax-qry-select-hints.html). Usually [Spark's Catalyst Optimizer](https://www.databricks.com/de/glossary/catalyst-optimizer) does a good job. – werner Oct 27 '22 at 20:23
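As an alternative to a custom `Thread` subclass, `concurrent.futures.ThreadPoolExecutor` returns each function's result via `Future.result()`, with no overriding needed. A minimal sketch, using stand-in functions in place of the real `thread_df_1`/`thread_df_2` (which would build the Spark DataFrames):

```python
from concurrent.futures import ThreadPoolExecutor

# Stand-ins for the real DataFrame-building functions from the question.
def thread_df_1(arg1, arg2):
    return f"df_1({arg1},{arg2})"

def thread_df_2(arg3, arg4, arg5):
    return f"df_2({arg3},{arg4},{arg5})"

with ThreadPoolExecutor(max_workers=2) as pool:
    f1 = pool.submit(thread_df_1, "a", "b")
    f2 = pool.submit(thread_df_2, "c", "d", "e")
    # result() blocks until the function finishes and returns its value;
    # it also re-raises any exception the function raised in the worker.
    df_1 = f1.result()
    df_2 = f2.result()
```

Note that driver-side threads only parallelize the job *submission*; the actual Spark work is still scheduled by the cluster, which is why examining the execution plan first, as suggested above, is worthwhile.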

0 Answers