I created a PySpark pipeline that loops through a list of queries. For each query it hits a MySQL database through a JDBC connector, stores the result in a Spark DataFrame, drops the columns that contain only one distinct value, and saves the result as a Parquet file.
Since I'm iterating over the query list with a for loop, each query and its column filtering run sequentially, so I'm not using all the CPU available.
What I want to accomplish is to start a new process (query + filter + Parquet persistence) whenever I have CPU available.
NOTE: I'm processing a different input (query) each time, unlike what was asked here, where different operations are applied to the same input. Also, I don't want to specify how many processes to run at the same time; instead, I'd like the first process to use all the CPU available and, if there are still resources left, start a new one. If there are still resources available after that, start another one, and so on...
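One direction I've been considering (a sketch, not part of my current script): submit each job from its own driver thread and let Spark's scheduler share the executor cores among the concurrent jobs. Here `process_table` and `run_concurrently` are hypothetical stand-ins for the real query + filter + persist step:

```python
from multiprocessing.pool import ThreadPool  # available on both Python 2.7 and 3.x

def process_table(table_name):
    # Stand-in for the real work; in the actual pipeline this would be:
    #   df = spark.read.jdbc(...)
    #   df = drop_constant_columns(df)
    #   df.write.mode('overwrite').parquet(...)
    return 'done: {}'.format(table_name)

def run_concurrently(table_names, max_threads=4):
    # Driver threads are cheap here: each one only submits Spark jobs and
    # blocks on them, while the executors do the actual computation.
    pool = ThreadPool(max_threads)
    try:
        return pool.map(process_table, table_names)  # preserves input order
    finally:
        pool.close()
        pool.join()
```

But this still forces me to pick `max_threads` up front, which is exactly what I'd like to avoid: I want a new job to start only while there is spare CPU.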
Here's the script that I'm running:
# Imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import isnull, when, count, countDistinct
from time import time
# Spark session initialization
spark = SparkSession \
    .builder \
    .appName('Filtering Columns') \
    .config('spark.driver.memory', '16g') \
    .config('spark.executor.memory', '16g') \
    .config('spark.driver.extraClassPath',
            '/path/to/mysql-connector-java-5.1.38.jar') \
    .getOrCreate()
# JDBC config
jdbc_config = {
    'url': 'jdbc:mysql://my_db_ip_address',
    'properties': {
        'user': 'my_db_user',
        'password': 'my_db_password'
    }
}
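As a side note, I'm aware that a single JDBC read can itself be split into parallel partitions when the table has a numeric column — something like the sketch below (the `id` column and its bounds are assumptions, not my real schema):

```python
# Hypothetical partitioned read: Spark issues numPartitions parallel range
# queries over the (assumed) numeric `id` column instead of one big scan.
df = spark.read.jdbc(
    url=jdbc_config['url'],
    table='(some query) as tmp',
    column='id',           # assumed numeric partition column
    lowerBound=1,          # assumed key range
    upperBound=1000000,
    numPartitions=8,
    properties=jdbc_config['properties']
)
```

But that only parallelizes work within one query; I still want to overlap the independent queries with each other.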
# My queries... Didn't put the real queries here, but
# they have nothing in special
queries_to_run = [
    {
        'table_name': 'table1',
        'query': '''
            (some query) as tmp
        '''
    },
    {
        'table_name': 'table2',
        'query': '''
            (some query) as tmp
        '''
    },
    {
        'table_name': 'table3',
        'query': '''
            (some query) as tmp
        '''
    },
    ...
]
# The function I'm using to filter the columns
def drop_constant_columns(df):
    cols_to_drop_map = df.select([
        when(countDistinct(column_name) == 1, True).alias(column_name)
        for column_name in df.columns
    ]).first().asDict()
    cols_to_drop = [
        col for col, should_drop in cols_to_drop_map.iteritems()
        if should_drop
    ]
    return df.drop(*cols_to_drop)
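To make the function's intent clearer, here's the same "exactly one distinct value → drop the column" rule on plain Python rows (a hypothetical `drop_constant_columns_py` over dicts instead of a DataFrame):

```python
def drop_constant_columns_py(rows):
    # rows: list of dicts sharing the same keys.
    # A column is "constant" when all of its values are identical,
    # i.e. it has exactly one distinct value across the rows.
    if not rows:
        return rows
    constant = [
        col for col in rows[0]
        if len(set(row[col] for row in rows)) == 1
    ]
    return [
        {col: value for col, value in row.items() if col not in constant}
        for row in rows
    ]
```

(One difference: `countDistinct` ignores NULLs, so an all-NULL column has zero distinct values and survives the Spark version, while this set-based sketch would drop it.)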
# Here's the code that loops through the queries and, for each
# one of them:
# 1) Query a MySQL db
# 2) Store the result in a Spark DF
# 3) Filter the constant columns
# 4) Save the filtered DF in a Parquet format
for query in queries_to_run:
    print('Querying {}'.format(query['table_name']))
    df = spark.read.jdbc(table=query['query'], **jdbc_config)
    print('Filtering {}'.format(query['table_name']))
    n_cols = len(df.columns)
    start = time()
    df = drop_constant_columns(df)
    elapsed = time() - start
    n_cols_filtered = n_cols - len(df.columns)
    print('Filtered {} of {} columns in {:.2f} secs'.format(
        n_cols_filtered, n_cols, elapsed))
    print('Persisting {}'.format(query['table_name']))
    df.write.mode('overwrite').parquet('./{}_test.parquet'.format(query['table_name']))
I'm using PySpark 2.2.1 and Python 2.7.12 on Ubuntu 16.04.