This is a classic case for Spark's multithreaded scheduling capabilities: set spark.scheduler.mode=FAIR and assign jobs to pools.
What you need to do is:
- create your list of partitions beforehand
- use this list as an iterable
- for each item in the list, trigger one Spark job in a different pool
- no need to use a separate s3distcp
An example is shown below.
Before doing spark-submit:
# Create a list of all *possible* partitions, e.g. for S3 prefixes like:
#   s3://my_bucket/my_table/year=2019/month=02/day=20
#   ...
#   s3://my_bucket/my_table/year=2020/month=03/day=15
#   ...
#   s3://my_bucket/my_table/year=2020/month=09/day=01

# We set `TARGET_PREFIX` as:
TARGET_PREFIX="s3://my_bucket/my_table"

# And create the list (down to the day=nn part) by looping twice.
# Add one more loop level if the table is partitioned down to the hour.
aws s3 ls "${TARGET_PREFIX}/" | grep PRE | awk '{print $2}' | while read -r year_part; do
  full_year_part="${TARGET_PREFIX}/${year_part}"
  aws s3 ls "${full_year_part}" | grep PRE | awk '{print $2}' | while read -r month_part; do
    full_month_part="${full_year_part}${month_part}"
    aws s3 ls "${full_month_part}" | grep PRE | awk -v pref="${full_month_part}" '{print pref $2}'
  done
done
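If you prefer to stay in Python, roughly the same listing can be done with boto3; this is a sketch of my own (not part of the original recipe), reusing the example bucket/table names:
import boto3

# Hedged sketch: list the day-level "directories" under s3://my_bucket/my_table/
# using the Delimiter trick, instead of the shell loop above.
s3 = boto3.client("s3")
bucket, root = "my_bucket", "my_table/"

def child_prefixes(prefix):
    """Yield the immediate sub-prefixes (year=, month=, day=) under `prefix`."""
    paginator = s3.get_paginator("list_objects_v2")
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix, Delimiter="/"):
        for cp in page.get("CommonPrefixes", []):
            yield cp["Prefix"]

day_prefixes = [
    "s3://{}/{}".format(bucket, day)
    for year in child_prefixes(root)
    for month in child_prefixes(year)
    for day in child_prefixes(month)
]
Either way, the end result is the same flat list of day-level prefixes.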
Once done, we run the shell script and save the result in a file:
bash build_year_month_day.sh > s3_<my_table_day_partition>_file.dat
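The file then contains one day-level prefix per line, for example (using the example prefixes from the comments above):
s3://my_bucket/my_table/year=2019/month=02/day=20/
...
s3://my_bucket/my_table/year=2020/month=09/day=01/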
Now we are ready to run Spark with multiple threads.
The Spark code needs two things (other than scheduler.mode=FAIR):
1. creating an iterator from the file created above (s3_<my_table_day_partition>_file.dat)
2. sc.setLocalProperty("spark.scheduler.pool", ...)
Here is how it is done.
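One assumption in the snippets below: a SparkSession/SparkContext named spark/sc already exists in the driver. A minimal sketch of that bootstrap (the app name is illustrative; FAIR mode itself is also passed via spark-submit at the end):
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("per_partition_backfill")        # illustrative name
    .config("spark.scheduler.mode", "FAIR")   # same setting as the spark-submit --conf below
    # Optional: pools named via spark.scheduler.pool are created on demand with default
    # settings; to give them explicit weights/minShare, point Spark at an allocation file:
    # .config("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")
    .getOrCreate()
)
sc = spark.sparkContext   # the `sc` used by process_in_pool() below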
A. We read the file in our Spark app (Python):
year_month_date_index_file = "s3_<my_table_day_partition>_file.dat"
with open(year_month_date_index_file, 'r') as f:
    content = f.read()
content_iter = [(idx, c) for idx, c in enumerate(content.split("\n")) if c]
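For reference, content_iter is now a list of (index, s3_prefix) tuples, e.g. (using the example prefixes above):
# [(0, 's3://my_bucket/my_table/year=2019/month=02/day=20/'),
#  ...,
#  (N, 's3://my_bucket/my_table/year=2020/month=09/day=01/')]
The index is what later becomes the pool id.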
B. And use slices of 100 days to fire 100 threads at a time:
import threading
from itertools import islice

# The number of threads per batch can be increased or decreased
strt = 0
stp = 100
while strt < len(content_iter):
    threads_lst = []
    path_slices = islice(content_iter, strt, stp)
    for s3path in path_slices:
        print("PROCESSING FOR PATH {}".format(s3path))
        pool_index = int(s3path[0])  # Spark needs a pool id
        my_addr = s3path[1]
        # Call `process_in_pool` in its own thread; pool_index is a mandatory argument
        agg_by_day_thread = threading.Thread(target=process_in_pool, args=(pool_index, <additional_args>))
        agg_by_day_thread.start()  # start the thread
        threads_lst.append(agg_by_day_thread)
    for process in threads_lst:
        process.join()  # wait for all threads in this batch to finish
    strt = stp
    stp += 100
Two things to notice:
path_slices = islice(content_iter, strt, stp)
=> returns a slice of size (stp - strt)
pool_index = int(s3path[0])
=> the index from content_iter; we use this to assign a pool id.
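For example (a quick sanity check, not part of the original code):
from itertools import islice
print(list(islice(range(10), 0, 3)))   # -> [0, 1, 2], i.e. stp - strt = 3 elements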
Now the meat of the code:
def process_in_pool(pool_id, <other_arguments>):
    sc.setLocalProperty("spark.scheduler.pool", "pool_id_{}".format(str(int(pool_id) % 100)))
As you can see, we want to restrict the threads to 100 pools, so we set spark.scheduler.pool to pool_index % 100.
Write your actual transformation/action in this `process_in_pool()` function, and once done, exit the function by freeing that pool:
    ...
    sc.setLocalProperty("spark.scheduler.pool", None)
    return
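To make that concrete, here is one possible shape of process_in_pool(); the parquet read, the column name, and the output prefix are placeholders I am assuming, not part of the original recipe:
from pyspark.sql import functions as F

def process_in_pool(pool_id, s3_day_prefix):   # second argument name is illustrative
    # Pin every job submitted from this thread to one of the 100 FAIR pools
    sc.setLocalProperty("spark.scheduler.pool", "pool_id_{}".format(str(int(pool_id) % 100)))
    try:
        # Placeholder transformation/action: read one day partition and aggregate it
        df = spark.read.parquet(s3_day_prefix)                          # assumes parquet data
        daily = df.groupBy("some_key").agg(F.count("*").alias("cnt"))   # hypothetical column
        out_path = s3_day_prefix.replace("my_table", "my_table_agg")    # hypothetical output prefix
        daily.write.mode("overwrite").parquet(out_path)
    finally:
        # Free the pool before the thread exits
        sc.setLocalProperty("spark.scheduler.pool", None)
The try/finally just guarantees the pool property is cleared even if the job for that day fails.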
Finally, run your spark-submit like:
spark-submit \
  --conf spark.scheduler.mode=FAIR \
  --<other options> \
  my_spark_app.py
If tuned with the right executor/core/memory settings, you should see a huge performance gain.
The same can be done in Scala, or with concurrent.futures instead of raw threads, but that's for another day.