
I am trying to make my scripts more efficient.

At the moment, I have 10 scripts - they all read data in, process it and output it.

They all read from the same main DB tables though and just do different things with the data.

So I have consolidated to one script, with the idea that I only read data once, rather than 10 times.

Should that not result in a faster execution? Because it doesn't.

Below is an example of the structure I am using.

Any help would be amazing.

Thanks

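from pyspark.sql import functions as sqlfunc  # provides sqlfunc.sum used below

'''
TABLE DEFINITION AND CACHING
'''
# create_session and yday_date are assumed to be defined elsewhere in the script
spark_session = create_session('usage CF')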
usage_logs = spark_session.sql("Select * from db.table where dt = " + yday_date ).cache()
user_logs = spark_session.sql("Select * from db2.table2 where dt = " + yday_date ).cache()
usercat = spark_session.sql("Select * from db3.table3 where dt = " + yday_date ).cache()
radius_logs = spark_session.sql("Select * from db.table4 where dt = " + yday_date )
radius = radius_logs.select('emsisdn2', 'sessionid2', 'custavp1').cache()


'''
usage CF
'''
usage = usage_logs.select('field1', 'field2', 'field3')
conditions = [usage.sid == radius.sessionid2]
df3 = usage.join(radius, conditions, how='left')
df4 = df3.groupBy('field1', 'field2').agg(sqlfunc.sum('field3').alias('bytesdl'))
df4.createOrReplaceTempView('usage')  # returns None, so there is nothing to assign
usage_table_output = spark_session.sql(' insert overwrite table outputdb.outputtbl partition(dt = ' + yday_date + ') select "usage" as type, * from usage ')

'''
user CF
'''
user = usage_logs.filter((usage_logs.vslsessid == '0')).select('field1', 'field2', 'field3', 'field4')
conditionsx = [user.sessionid == radius.sessionid2]
user_joined = user.join(radius, conditionsx, how='left')
user_output = user_joined.groupBy('field1', 'field2', 'field3').agg(sqlfunc.sum('field4').alias('bytesdl'))
user_output.createOrReplaceTempView('user')  # returns None, so there is nothing to assign
user_table_output = spark_session.sql(' insert overwrite table outputdb.outputtbl2 partition(dt = ' + yday_date + ') select "user" as type, * from user')
kikee1222
  • To get any performance gains, you need to use the same dataframe multiple times ( > 1), i.e., you need to actually perform multiple [actions](https://spark.apache.org/docs/latest/rdd-programming-guide.html#actions) on them. – Shaido Apr 16 '20 at 08:05
  • They are used lots of times with different aggregations. This is just a snippet – kikee1222 Apr 16 '20 at 08:30
  • Then it should be correct. Could it be that the dataframes are very small, i.e., that reading them is actually not that time consuming or a bottleneck? If the time taken for the aggregations is a lot more than the database reading time, then the gain from the cache could be irrelevant. – Shaido Apr 16 '20 at 08:42
  • The dataframe for usage_logs is 15TB for the day, which is why I was expecting a pretty massive performance jump. I'll keep going :) – kikee1222 Apr 16 '20 at 08:49
  • You could look into optimizing the `join` which usually is very slow for large data, see here for some ideas: https://stackoverflow.com/questions/37842595/what-is-an-optimized-way-of-joining-large-tables-in-spark-sql – Shaido Apr 16 '20 at 09:20
  • I'd first look at the query plan and see where the holdup is. Like @Shaido-ReinstateMonica said, your bottleneck is probably somewhere else. – moon Apr 30 '20 at 02:00
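
As a minimal sketch of the points raised in the comments, and not a confirmed fix for this job: `cache()` is lazy, so a DataFrame is only stored when the first action runs on it, and caching `Select *` over a 15TB table stores every column even when only a few are used later. The hypothetical helper `cache_needed_columns` below (not part of PySpark or of the question's script) projects to the needed columns, marks the result for caching, and forces materialization with `count()`. The column names in the usage comment are the question's own placeholders.

```python
def cache_needed_columns(df, columns):
    """Project to the columns actually used, cache, and materialize.

    `df` is expected to be a pyspark.sql.DataFrame. Only its public
    methods select(), cache(), and count() are called, so no pyspark
    import is needed in this helper itself.
    """
    # select() and cache() are lazy: nothing is read from the table yet.
    slim = df.select(*columns).cache()
    # count() is an action: it runs the read and fills the cache.
    row_count = slim.count()
    return slim, row_count


# Usage with the names from the question (requires a running Spark session,
# so it is left commented out; the column list is an assumption):
# usage_slim, n = cache_needed_columns(
#     usage_logs, ['field1', 'field2', 'field3', 'sid'])
# usage_slim.explain()  # prints the physical plan for inspection
```

Calling `explain()` on a DataFrame derived from the cached one prints the physical plan; an `InMemoryTableScan` node there indicates that the cached data is being read rather than the source table. Whether any of this helps depends on where the time actually goes, which the plan and the Spark UI should show.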
