
I am wondering why my Glue job runs so slowly even though the query has a LIMIT clause.

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime, timedelta
from pyspark.sql.types import ArrayType, DateType, Row
from pyspark.sql.functions import UserDefinedFunction, regexp_replace, to_timestamp


# INIT GLUE JOB
print(">>> INIT GLUE JOB ...")
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# READ IN FLIGHTS, AIRPORTS, AGENTS TABLES
# NOTE: Bookmarks enabled for flights data catalog
flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "flights", transformation_ctx="flights")
flightsDF = flightsGDF.toDF()
flightsDF.createOrReplaceTempView("flights")

airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "airports")
airportsDF = airportsGDF.toDF()
airportsDF.createOrReplaceTempView("airports")

agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "agents")
agentsRawDF = agentsGDF.toDF()
agentsRawDF.createOrReplaceTempView("agents_raw")

agentsDF = spark.sql("""
    SELECT * FROM agents_raw
    WHERE type IN ('Airline', 'TravelAgent')
""") 
agentsDF.createOrReplaceTempView("agents")

# TRY TO PREPROCESS FLIGHTS
testDf = spark.sql("""
    SELECT 
        f.*, countryName, cityName, airportName, a.name AS agentName,
        CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
    FROM flights f
    LEFT JOIN agents a
    ON cast(f.agent as bigint) = a.id
    LEFT JOIN airports p
    ON cast(f.querydestinationplace as bigint) = p.airportId
    LIMIT 10
""")
df = testDf.withColumn("querydatetime", regexp_replace(testDf["querydatetime"], "-", "").cast("int"))
df = testDf.withColumn("queryoutbounddate", regexp_replace(testDf["queryoutbounddate"], "-", "").cast("int"))
df = testDf.withColumn("queryinbounddate", regexp_replace(testDf["queryinbounddate"], "-", "").cast("int"))
df = testDf.withColumn("outdeparture", to_timestamp(testDf["outdeparture"], "yyyy-MM-ddTHH:mm:ss"))
df = testDf.withColumn("outarrival", to_timestamp(testDf["outarrival"], "yyyy-MM-ddTHH:mm:ss"))
df = testDf.withColumn("indeparture", to_timestamp(testDf["indeparture"], "yyyy-MM-ddTHH:mm:ss"))
df = testDf.withColumn("inarrival", to_timestamp(testDf["inarrival"], "yyyy-MM-ddTHH:mm:ss"))
df.show()

Notice the SQL has LIMIT 10. When I run the same query in Athena, it takes just 6s ... I tried commenting out the withColumn part, but it's still been running for ~30+ minutes ... why the huge difference?

Some recent log output:

19/01/28 07:45:55 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:55 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
19/01/28 07:45:56 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:56 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
19/01/28 07:45:57 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:57 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
19/01/28 07:45:58 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:58 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root

... if it helps

Are there any general guidelines on how to debug Glue? It would be very beneficial to understand what Glue is currently processing and which parts are slow.

========

I tried filtering out data

df = flightsDF \
    .where(flightsDF.querydatetime > "2019-01-22")

Or sampling

df = flightsDF \
    .sample(False, 0.000001)

Do they help? They also seem to take a long time ...

Jiew Meng
  • *Job runs so slowly even though the query has a LIMIT clause* - `LIMIT` is largely irrelevant here, as it cannot be used to optimize the process unless broadcast joins are used. Otherwise Spark will load and shuffle all the data (some of it twice) before applying `LIMIT`. Just glancing over the names, you might want to add [broadcast hints](https://stackoverflow.com/q/37487318/6910411) to `agents` and `airports` and see how it goes then (see the sketch after these comments). – zero323 Jan 28 '19 at 11:02
  • @user6910411 can I say that in this case Glue will still load the entire dataset, join, and then apply the limit? So basically the limit purely limits the output, with no improvement to performance? – Jiew Meng Jan 28 '19 at 14:12
  • @JiewMeng you can simply run explain on your query and verify the query plan, which will tell you whether the limit happens before or after the join. If you really want to filter the data before it is even loaded into the job, try the predicate pushdown feature of Glue ETL (see the sketch after these comments). https://aws.amazon.com/blogs/big-data/work-with-partitioned-data-in-aws-glue/ has a clear explanation of how to do it. – Prabhakar Reddy Jan 28 '19 at 16:54
  • Try `flightsDF = flightsGDF.toDF().limit(10)`, do the same for the other tables, and then run the JOIN query. – Sandeep Fatangare Jan 29 '19 at 18:06
  • @JiewMeng Sounds about right. With shuffle-based joins there is really not much room for improvement - before you shuffle, you cannot know whether a partition is empty or not. – zero323 Jan 29 '19 at 19:44
  • @user6910411 you can add your comments as an answer and I will accept it :) – Jiew Meng May 10 '19 at 10:04
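
A minimal sketch of the broadcast-hint idea from zero323's comment, assuming `agentsDF` and `airportsDF` are small enough to fit in executor memory; the join is rewritten with the DataFrame API so `broadcast()` can be applied directly (table and column names are taken from the question):

from pyspark.sql.functions import broadcast, concat_ws

# Broadcast the small dimension tables so the large flights table is not shuffled for the join
joinedDF = flightsDF \
    .join(broadcast(agentsDF),
          flightsDF["agent"].cast("bigint") == agentsDF["id"], "left") \
    .join(broadcast(airportsDF),
          flightsDF["querydestinationplace"].cast("bigint") == airportsDF["airportId"], "left") \
    .withColumn("key", concat_ws("-", flightsDF["outboundlegid"], flightsDF["inboundlegid"], flightsDF["agent"]))

joinedDF.limit(10).show()

With both small tables broadcast there is no shuffle join, which is the case where, per zero323's comment, the `LIMIT` can actually help.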

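A sketch of the two suggestions in Prabhakar Reddy's comment: first inspect the physical plan with `explain()`, then, if the `flights` table is partitioned in the Glue catalog (I am assuming here that `querydatetime` is a partition column, which may not match your schema), prune partitions at read time with `push_down_predicate` instead of filtering after the full scan:

# Shows whether the LIMIT is applied before or after the joins
testDf.explain()

# Prune partitions at catalog-read time; assumes "querydatetime" is a partition column
flightsRecentGDF = glueContext.create_dynamic_frame.from_catalog(
    database="pinfare",
    table_name="flights",
    push_down_predicate="querydatetime >= '2019-01-22'",
    transformation_ctx="flights_recent")
flightsRecentGDF.toDF().createOrReplaceTempView("flights")

The blog post linked in the comment walks through partitioning and predicate pushdown in more detail.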