
I created a pipeline with PySpark that basically loops through a list of queries, runs each of them on a MySQL database via a JDBC connector, stores the result in a Spark DataFrame, filters out the columns that have only one value, and then saves the DataFrame as a Parquet file.

Since I'm looping through the query list with a for loop, each query and column-filtering step runs sequentially, so I'm not using all the available CPU.

What I want to accomplish is to start a new process (query + filter + Parquet persistence) whenever there is CPU available.

NOTE: I'm processing different inputs (queries) each time, which is different from what was asked here, where different processing is done on the same input. Also, I don't want to specify how many processes to run at the same time; instead, I'd like to use all the CPU available in the first process and, if there are still resources available, start a new one. If there are still resources available, start another one, and so on...

Here's the script that I'm running:

# Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull, when, count, countDistinct
from time import time


# Spark session initialization
spark = SparkSession \
    .builder \
    .appName('Filtering Columns') \
    .config('spark.driver.memory', '16g') \
    .config('spark.executor.memory', '16g') \
    .config('spark.driver.extraClassPath',
            '/path/to/mysql-connector-java-5.1.38.jar') \
    .getOrCreate()


# JDBC config
jdbc_config = {
    'url': 'jdbc:mysql://my_db_ip_address',
    'properties': {
        'user': 'my_db_user',
        'password': 'my_db_password'
    }
}


# My queries... Didn't put the real queries here, but
# they have nothing in special
queries_to_run = [
    {
        'table_name': 'table1',
        'query': '''
             (some query) as tmp
        '''
    },
    {
        'table_name': 'table2',
        'query': '''
             (some query) as tmp
        '''
    },
    {
        'table_name': 'table3',
        'query': '''
             (some query) as tmp
        '''
    },
    ...
]


# The function I'm using to filter the columns
def drop_constant_columns(df):
    cols_to_drop_map = df.select([
        when(countDistinct(column_name) == 1, True).alias(column_name)
        for column_name in df.columns
    ]).first().asDict()

    cols_to_drop = [
        col for col, should_drop in cols_to_drop_map.iteritems()
        if should_drop
    ]

    return df.drop(*cols_to_drop)


# Here's the code that loops through the queries and, for each 
# one of them:
# 1) Query a MySQL db
# 2) Store the result in a Spark DF
# 3) Filter the constant columns
# 4) Save the filtered DF in a Parquet format
for query in queries_to_run:
    print('Querying {}'.format(query['table_name']))
    df = spark.read.jdbc(table=query['query'], **jdbc_config)

    print('Filtering {}'.format(query['table_name']))
    n_cols = len(df.columns)

    start = time()
    df = drop_constant_columns(df)
    elapsed = time() - start

    n_cols_filtered = n_cols - len(df.columns)
    print('Filtered {} of {} columns in {:.2f} secs'.format(n_cols_filtered, n_cols, elapsed))

    print('Persisting {}'.format(query['table_name']))
    df.write.mode('overwrite').parquet('./{}_test.parquet'.format(query['table_name']))

I'm using PySpark 2.2.1 and Python 2.7.12 on Ubuntu 16.04.

  • Possible duplicate of [How to run 2 functions doing completely independent transformations on a single RDD in parallel using pyspark?](https://stackoverflow.com/questions/38048068/how-to-run-2-functions-doing-completely-independent-transformations-on-a-single) – Alper t. Turker May 25 '18 at 20:29
  • @user8371915 it is not a duplicate, as noted in my post after the edit – Álvaro Lemos May 25 '18 at 22:03

1 Answer


Basically you need to set the FAIR scheduling mode for the Spark context, create multiple threads, and execute a Spark action in each thread to achieve close to 100% cluster saturation (assuming your jobs are CPU bound).
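For example, a minimal sketch (it reuses spark, jdbc_config, queries_to_run and drop_constant_columns from the question; process_query is just a name I made up for the loop body):

from threading import Thread

# Enable FAIR scheduling by adding this option to the existing
# SparkSession builder from the question:
#     .config('spark.scheduler.mode', 'FAIR') \

def process_query(query):
    # Same steps as the sequential loop: read via JDBC, drop constant
    # columns, persist as Parquet
    df = spark.read.jdbc(table=query['query'], **jdbc_config)
    df = drop_constant_columns(df)
    df.write.mode('overwrite') \
        .parquet('./{}_test.parquet'.format(query['table_name']))

# One thread per query; each thread submits its own Spark jobs, so the
# scheduler can interleave them across the executors
threads = [Thread(target=process_query, args=(q,)) for q in queries_to_run]
for t in threads:
    t.start()
for t in threads:
    t.join()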

Though you mentioned that you don't want to set a limit on the number of threads, I'd recommend doing that anyway. There is only a limited number of threads you can create in your OS, and they all take valuable memory and CPU resources away from the driver. E.g. you cannot create a million threads and will have to use some kind of queuing (e.g. a combination of a semaphore and locks) anyway.
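A bounded version could use a small thread pool instead, for example (a sketch; multiprocessing.dummy.Pool is a thread-based pool available on Python 2.7, and process_query is the helper sketched above):

from multiprocessing.dummy import Pool  # thread pool, despite the module name

# Cap the number of concurrent query pipelines, e.g. at 4
pool = Pool(4)
pool.map(process_query, queries_to_run)
pool.close()
pool.join()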

On the other hand, there is a point of diminishing returns when all the executors are busy 100% of the time, the scheduler accepts no new tasks, and many Spark jobs just sit idle, waiting for executors to become available. Spark scheduling is done at the task level, i.e. if one of a job's tasks is already running on some executor, it won't be preempted.

You can experimentally figure out a number of simultaneous requests that is high enough to give the best overall processing time for all the requests.

Denis Makarenko