3

I want to run the iterations of a for loop in parallel using PySpark.

from pyspark.sql import SparkSession
spark = SparkSession.builder.master('yarn').appName('myAppName').getOrCreate()
spark.conf.set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")

data = ["a", "b", "c"]


for i in data:
    try:
        df = spark.read.parquet('gs://'+i+'-data')
        df.createOrReplaceTempView("people")
        df2 = spark.sql("""select * from people""")
        df2.show()
    except Exception as e:
        print(e)
        continue

The script above works, but it processes the inputs one at a time. I would like to process them in parallel in PySpark, as is possible in Scala.

Jeffrey Chung
Amol
  • Does this answer your question? [How to run multiple Spark jobs in parallel?](https://stackoverflow.com/questions/49568940/how-to-run-multiple-spark-jobs-in-parallel) – ernest_k Jan 10 '20 at 05:07
  • 1
    Does this answer your question? [How to run independent transformations in parallel using PySpark?](https://stackoverflow.com/questions/38048068/how-to-run-independent-transformations-in-parallel-using-pyspark) – user10938362 Jan 10 '20 at 10:26

3 Answers

6

Spark already parallelizes each job across the cluster, but if you also want to submit several jobs concurrently from your own code, you can use a plain Python thread pool (this was tested on Databricks only).

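from multiprocessing.pool import ThreadPool

data = ["a", "b", "c"]
pool = ThreadPool(10)  # up to 10 driver-side threads


def fun(x):
    try:
        # sqlContext is predefined in Databricks notebooks
        df = sqlContext.createDataFrame([(1, 2, x), (2, 5, "b"), (5, 6, "c"), (8, 19, "d")], ("st", "end", "ani"))
        df.show()
    except Exception as e:
        print(e)

# Calls fun once per element of data, spread across the pool's threads
pool.map(fun, data)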

I have changed your code a bit, but this is basically how you can run tasks in parallel. If you have some flat files that you want to process in parallel, make a list of their names and pass it to pool.map(fun, data).

Change the function fun as needed.
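As a minimal sketch of how this could be applied to the loop in the question: the helper below runs one task per input on a driver-side thread pool and keeps either the result or the exception for each input. The names run_in_parallel and count_rows are hypothetical, and count_rows assumes the gs://<name>-data path layout from the question. The threads themselves live on the driver, but each Spark action they trigger is still scheduled on the executors.

```python
from multiprocessing.pool import ThreadPool


def run_in_parallel(task, items, workers=4):
    """Call task(item) for every item on a thread pool.

    Returns a dict mapping each item to its result, or to the
    exception it raised, so one failure does not stop the others.
    """
    def safe(item):
        try:
            return item, task(item)
        except Exception as e:  # mirror the try/except in the original loop
            return item, e

    with ThreadPool(workers) as pool:
        return dict(pool.map(safe, items))


def count_rows(spark, name):
    # Hypothetical task: read one parquet dataset and count its rows.
    # `spark` is an existing SparkSession passed in by the caller.
    return spark.read.parquet("gs://" + name + "-data").count()
```

With an active session, a call might look like run_in_parallel(lambda n: count_rows(spark, n), ["a", "b", "c"]). Whether the jobs actually overlap depends on the cluster having free executor capacity.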

For more details on the multiprocessing module check the documentation.

Similarly, if you want to do it in Scala, you will need the following import:

import scala.concurrent.{Future, Await}

For a more detailed understanding check this out. The code is for Databricks but with a few changes, it will work with your environment.

Andy_101
  • Will this bring it to the driver node? Or will it execute the parallel processing in the multiple worker nodes? – thentangler Feb 03 '21 at 06:28
  • 6
    This is parallel execution in the code, not actual parallel execution. This is simple Python parallel processing; it does not interfere with the Spark parallelism. – Andy_101 Feb 03 '21 at 12:10
  • 1
    I also think this simply adds threads to the driver node. It doesn't send stuff to the worker nodes. I think Andy_101 is right. – vaudt Jul 15 '22 at 12:24
  • 1
    I actually tried this out, and it does run the jobs in parallel in worker nodes surprisingly, not just the driver! My experiment setup was using 200 executors, and running 2 jobs in series would take 20 mins, and running them in ThreadPool takes 10 mins in total. – Daniel Qiu Aug 22 '22 at 22:57
  • I think this does not work. Here's my sketch of a proof:

        import socket
        from multiprocessing.pool import ThreadPool
        pool = ThreadPool(10)
        def getsock(i):
            s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            s.connect(("8.8.8.8", 80))
            return s.getsockname()[0]
        list(pool.map(getsock, range(10)))

    This always gives the same IP address, namely that of the driver. Hence we are not executing on the workers. – vaudt Oct 05 '22 at 15:10
  • @vaudt I have tested the logic only on Databricks; the link to it has been added to the post. And yes, threads will only give parallelism on the driver node. – Andy_101 Oct 06 '22 at 05:40
  • @Andy_101. I agree with you that the code works. I also agree that it only gives parallelism on the driver node. In fact that's what my code fragment shows. With spark you usually do not want parallelism on the driver node, because that blocks the driver and doesn't use the worker nodes. – vaudt Oct 09 '22 at 15:10
  • oh. that's a good point. – Andy_101 Oct 10 '22 at 12:09
0

Here's a parallel loop in PySpark using Azure Databricks.

import socket

def getsock(i):
  s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  s.connect(("8.8.8.8", 80))
  return s.getsockname()[0]

rdd1 = sc.parallelize(list(range(10)))
parallel=rdd1.map(getsock).collect()

On platforms other than Azure Databricks you may need to create the Spark context sc yourself. On Azure Databricks the variable exists by default.

Writing it like this only makes sense if the function that is executed in parallel (getsock here) does not itself contain code that Spark already parallelizes. For instance, if getsock processed a PySpark DataFrame, that work would already be distributed, so it would not make sense to also "parallelize" the loop around it. In addition, the SparkContext and DataFrames can only be used on the driver, not inside functions that run on the workers.
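To check where such a function really runs, a variant of getsock that needs no outbound network connection can report the hostname instead. This is only a sketch: host_of and hosts_used are hypothetical names, and sc is assumed to be an existing SparkContext passed in by the caller.

```python
import socket


def host_of(_):
    # Runs wherever Spark schedules the task; returns that machine's hostname.
    return socket.gethostname()


def hosts_used(sc, n=10):
    """Return the distinct hostnames that processed n elements.

    The data is split into n partitions so that Spark is free to
    spread the elements over several executors.
    """
    rdd = sc.parallelize(range(n), n)
    return sorted(set(rdd.map(host_of).collect()))
```

On a multi-node cluster, hosts_used(sc) would typically list worker hostnames, whereas the same function called through a ThreadPool would only ever report the driver.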

vaudt
0

In PySpark, parallel processing is done using RDDs (Resilient Distributed Datasets), which are the fundamental data structure in PySpark. RDDs can be split into multiple partitions, and each partition can be processed in parallel on different nodes in a cluster. Let me use an example to explain.

Example: To parallelize dropping columns from several tables in PySpark, you can use the SparkContext.parallelize method, which turns a Python collection into a parallelized RDD. Here is an example of how you can use it:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ParallelProcessingExample").getOrCreate()
sc = spark.sparkContext

def delete_column(table_name, column_name):
    # Drop the specified column from the table.
    # NOTE: spark.sql can only be called on the driver; calling it from a
    # function that runs on executors (as foreach() below does) will fail.
    spark.sql(f"ALTER TABLE {table_name} DROP COLUMN {column_name}")

# Create a list of table and column tuples to process 
table_columns = [('table1', 'column1'), 
                 ('table2', 'column2'), 
                 ('table3', 'column3'),
                 # ... Repeat for all table/column combinations to process
                ]

# Parallelize the list into an RDD 
rdd = sc.parallelize(table_columns)

# Use the foreach() method to apply the function to each table in parallel
rdd.foreach(lambda table_column: delete_column(*table_column))

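In this example, the delete_column function takes the table name and column name as parameters and executes a SQL statement to drop the specified column from the table. The table_columns list contains tuples with the table and column names to process. The rdd variable holds the list parallelized into an RDD. The foreach() method applies the delete_column function to each table/column tuple in parallel. Note that foreach() runs delete_column on the executors, where the driver's SparkSession is not available, so the spark.sql call as written will fail there; statements like this have to be issued from the driver.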
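Because spark.sql has to be called from the driver, one alternative is to issue the statements from driver-side threads instead of from an RDD. The following is a sketch under that assumption, not the approach of the answer above: drop_columns is a hypothetical helper, and spark is an existing SparkSession passed in by the caller.

```python
from concurrent.futures import ThreadPoolExecutor


def drop_columns(spark, table_columns, workers=4):
    """Issue one ALTER TABLE ... DROP COLUMN per (table, column) pair.

    The statements are submitted concurrently from driver threads.
    Returns the pairs in input order once all statements have finished;
    an exception from any statement is re-raised here.
    """
    def drop(pair):
        table, column = pair
        spark.sql(f"ALTER TABLE {table} DROP COLUMN {column}")
        return pair

    with ThreadPoolExecutor(max_workers=workers) as executor:
        return list(executor.map(drop, table_columns))
```

A call such as drop_columns(spark, table_columns) would then replace the sc.parallelize(...).foreach(...) step. Whether DROP COLUMN is supported at all depends on the table format in use.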

I hope this helps! Let me know if you have any other questions.

References:
https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize.html
https://www.databricks.com/glossary/what-is-rdd

Question-er XDD