I have a MySQL table loaded into dataframe_mysql:
dataframe_mysql = sqlContext.read.format("jdbc").options(...).load()
dataframe_mysql.registerTempTable('dataf')
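(The JDBC options are elided above; for completeness, the full read looks roughly like this, with placeholder connection details:)

# Placeholder URL, table name, and credentials, shown only for illustration
dataframe_mysql = (sqlContext.read.format("jdbc")
                   .options(url="jdbc:mysql://host:3306/mydb",
                            driver="com.mysql.jdbc.Driver",
                            dbtable="jobs",
                            user="user",
                            password="password")
                   .load())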
groupedtbl = sqlContext.sql("""
    SELECT job_seq_id,
           first(job_dcr_content) AS firststage,
           last(job_dcr_content) AS laststage,
           first(from_stage) AS source,
           last(from_stage) AS target,
           count(jid) AS noofstages
    FROM dataf
    GROUP BY job_seq_id
    HAVING count(jid) > 1
""")
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, ArrayType
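Here fu1 and fu2 are ordinary Python functions; simplified stand-ins for my real per-row logic would be something like:

def fu1(firststage, laststage, job_seq_id):
    # stand-in for my real transformation of the first/last stage content
    return '%s->%s' % (firststage, laststage)

def fu2(dcol, job_seq_id):
    # stand-in for my real transformation of the derived column
    return dcol.upper()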
func1 = udf(fu1, StringType())
func2 = udf(fu2, StringType())
res1 = groupedtbl.withColumn('dcol', func1(groupedtbl.firststage, groupedtbl.laststage, groupedtbl.job_seq_id))
res2 = res1.withColumn('lcol', func2(res1.dcol, res1.job_seq_id))
For the above code, I see that even when I issue a limit command:
lb = res2.limit(2).collect()
or the following command to get the results for only one record:
res2.filter(res2.job_seq_id == 5843064).collect()
Instead of just doing the work needed to get the two results in the first query or the single result in the second, Spark performs a lot of unnecessary computation on other rows, which wastes time even when only two rows are required. I can see from the internal logs that even for fetching just two rows it computes hundreds of rows and then retrieves the two result rows from them. I thought the DAG mechanism should handle this, but it seems it does not. Am I wrong in this observation?
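For reference, this is how I inspect where the limit sits in the query plan (explain(True) prints both the logical and physical plans):

# In my runs the limit shows up above the aggregation and the UDF projections,
# so the full group-by still executes before any rows are discarded
res2.limit(2).explain(True)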