I am wondering why my Glue job runs so slowly even though the query has a LIMIT clause.
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from datetime import datetime, timedelta
from pyspark.sql.types import ArrayType, DateType, Row
from pyspark.sql.functions import UserDefinedFunction, regexp_replace, to_timestamp
# INIT GLUE JOB
print(">>> INIT GLUE JOB ...")
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)
# READ IN FLIGHTS, AIRPORTS, AGENTS TABLES
# NOTE: Bookmarks enabled for flights data catalog
flightsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "flights", transformation_ctx="flights")
flightsDF = flightsGDF.toDF()
flightsDF.createOrReplaceTempView("flights")
airportsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "airports")
airportsDF = airportsGDF.toDF()
airportsDF.createOrReplaceTempView("airports")
agentsGDF = glueContext.create_dynamic_frame.from_catalog(database = "pinfare", table_name = "agents")
agentsRawDF = agentsGDF.toDF()
agentsRawDF.createOrReplaceTempView("agents_raw")
agentsDF = spark.sql("""
SELECT * FROM agents_raw
WHERE type IN ('Airline', 'TravelAgent')
""")
agentsDF.createOrReplaceTempView("agents")
# TRY TO PREPROCESS FLIGHTS
testDf = spark.sql("""
SELECT
f.*, countryName, cityName, airportName, a.name AS agentName,
CONCAT(f.outboundlegid, '-', f.inboundlegid, '-', f.agent) AS key
FROM flights f
LEFT JOIN agents a
ON cast(f.agent as bigint) = a.id
LEFT JOIN airports p
ON cast(f.querydestinationplace as bigint) = p.airportId
LIMIT 10
""")
# Chain each withColumn on df; starting from testDf every time would keep only the last conversion
df = testDf.withColumn("querydatetime", regexp_replace(testDf["querydatetime"], "-", "").cast("int"))
df = df.withColumn("queryoutbounddate", regexp_replace(df["queryoutbounddate"], "-", "").cast("int"))
df = df.withColumn("queryinbounddate", regexp_replace(df["queryinbounddate"], "-", "").cast("int"))
# The literal T has to be quoted in Spark datetime patterns
df = df.withColumn("outdeparture", to_timestamp(df["outdeparture"], "yyyy-MM-dd'T'HH:mm:ss"))
df = df.withColumn("outarrival", to_timestamp(df["outarrival"], "yyyy-MM-dd'T'HH:mm:ss"))
df = df.withColumn("indeparture", to_timestamp(df["indeparture"], "yyyy-MM-dd'T'HH:mm:ss"))
df = df.withColumn("inarrival", to_timestamp(df["inarrival"], "yyyy-MM-dd'T'HH:mm:ss"))
df.show()
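The conversions above can be spot-checked outside Spark against a few rows of df.show(). Below is a plain-Python sketch of what each column is expected to contain after conversion. It assumes, based only on the patterns in the code, date strings like 2019-01-22 and timestamps like 2019-01-28T07:45:55; the helper names are hypothetical. Unlike Spark's datetime patterns, where a literal T must be quoted ('T'), Python's strptime treats T as a literal without quoting.

```python
from datetime import datetime


def date_to_int(s):
    # Same idea as regexp_replace(col, "-", "").cast("int") for "yyyy-MM-dd" strings
    return int(s.replace("-", ""))


def parse_iso_timestamp(s):
    # Python equivalent of the Spark pattern "yyyy-MM-dd'T'HH:mm:ss"
    return datetime.strptime(s, "%Y-%m-%dT%H:%M:%S")


# Hypothetical sample values in the assumed input shapes
query_date = date_to_int("2019-01-22")
departure = parse_iso_timestamp("2019-01-28T07:45:55")
```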
Notice that the SQL has LIMIT 10. When I run the same query in Athena, it takes just 6 seconds. I tried commenting out the withColumn part, but the job has still been running for 30+ minutes. Why the huge difference?
Some recent log output:
19/01/28 07:45:55 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:55 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
19/01/28 07:45:56 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:56 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
19/01/28 07:45:57 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:57 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
19/01/28 07:45:58 INFO Client: Application report for application_1548657105793_0002 (state: RUNNING)
19/01/28 07:45:58 DEBUG Client: client token: N/A diagnostics: N/A ApplicationMaster host: 172.31.44.1 ApplicationMaster RPC port: 0 queue: default start time: 1548660882386 final status: UNDEFINED tracking URL: http://ip-172-31-40-134.ap-southeast-1.compute.internal:20888/proxy/application_1548657105793_0002/ user: root
I'm including the log in case it helps.
Are there any general guidelines for debugging Glue? It would be very helpful to understand what Glue is currently processing and which parts are slow.
========
I tried filtering the data:
df = flightsDF.where(flightsDF.querydatetime > "2019-01-22")
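The where clause compares strings, which works as a date filter only because zero-padded yyyy-MM-dd strings sort in the same order as the dates they represent. A plain-Python check of that assumption, using hypothetical sample values in the shape implied by the code (querydatetime is elsewhere stripped of dashes and cast to int, suggesting yyyy-MM-dd):

```python
from datetime import date

# Hypothetical querydatetime values in "yyyy-MM-dd" form
values = ["2019-01-21", "2019-01-22", "2019-01-23", "2019-02-01", "2018-12-31"]

# String comparison, as in flightsDF.querydatetime > "2019-01-22"
by_string = [v for v in values if v > "2019-01-22"]

# The same filter done on real dates, for comparison
cutoff = date(2019, 1, 22)
by_date = [v for v in values if date.fromisoformat(v) > cutoff]
```

Even when the filter is logically correct, a filter on an ordinary column may still require reading the data to evaluate it; whether any of the scan can be skipped depends on the file format and on whether the column is a partition column of the catalog table.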
I also tried sampling:
df = flightsDF.sample(False, 0.000001)
Do either of these help? Both also seem to take a long time.
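On the sampling question: sampling without replacement decides row by row whether to keep each row (Spark's DataFrame.sample uses Bernoulli sampling when withReplacement is False), so even a tiny fraction still reads every input row. A plain-Python sketch of the idea, not Spark's implementation; the function name is hypothetical:

```python
import random


def bernoulli_sample(rows, fraction, seed=None):
    """Keep each row independently with probability `fraction`.

    The keep/drop decision is made per row, so every row is examined
    even when almost none are kept.
    """
    rng = random.Random(seed)
    examined = 0
    kept = []
    for row in rows:
        examined += 1
        if rng.random() < fraction:
            kept.append(row)
    return kept, examined


# Same fraction as in the question, over a hypothetical one-million-row input
kept, examined = bernoulli_sample(range(1_000_000), 0.000001, seed=42)
```

With this fraction few or no rows are kept, yet all one million rows are examined, which is why sampling by itself may not shorten the time spent reading the input.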