
I noticed that when I run this piece of code, which contains only one action, three jobs are launched.

from typing import List
from pyspark.sql import DataFrame
from pyspark.sql.types import StructType, StructField, StringType
from pyspark.sql.functions import avg

data: List = [("Diamant_1A", "TopDiamant", "300", "rouge"),
    ("Diamant_2B", "Diamants pour toujours", "45", "jaune"),
    ("Diamant_3C", "Mes diamants préférés", "78", "rouge"),
    ("Diamant_4D", "Diamants que j'aime", "90", "jaune"),
    ("Diamant_5E", "TopDiamant", "89", "bleu")
  ]

schema: StructType = StructType([
    StructField("reference", StringType(), True),
    StructField("marque", StringType(), True),
    StructField("prix", StringType(), True),
    StructField("couleur", StringType(), True)
  ])

dataframe: DataFrame = spark.createDataFrame(data=data, schema=schema)

dataframe_filtree: DataFrame = dataframe.filter("prix > 50")

dataframe_filtree.show()

From my understanding, I should get only one: one action corresponds to one job. I'm using Databricks, which could be part of the problem. I have two questions:

  • Why do I have 3 jobs instead of 1?
  • Can I change this behaviour?

Here is the first job: DAG for the first job

Here is the second one: DAG for the second job

And the last one: DAG for the third job

Nastasia

2 Answers


"One action corresponds to one job" is true. The caveat that often gets missed is that this holds in the RDD API.

The DataFrame and Dataset APIs are a layer of abstraction on top of the RDD API that makes your life easier. Sometimes a single action you call triggers several actions internally, and you see multiple jobs.

An example is reading a CSV with header=True. When you call an action downstream, Spark triggers an internal action that reads the first row of the CSV to resolve the header, and that shows up as an extra job.
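
For illustration, here is a minimal sketch of that case (the file path is invented, and the exact number of extra jobs depends on the Spark version and the options used):

    # Hypothetical CSV read: header resolution (and schema inference, if enabled)
    # forces Spark to scan part of the file eagerly, and those scans show up as
    # separate jobs in the Spark UI in addition to the show() below.
    csv_df = spark.read.csv(
        "/tmp/diamants.csv",   # assumed path, for illustration only
        header=True,           # reads the first row to resolve column names
        inferSchema=True       # triggers an additional pass over the data
    )
    csv_df.show()              # the single action you actually called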

Another reason is adaptive query execution. With spark.sql.adaptive.enabled set to true, Spark uses runtime stage statistics to decide the subsequent physical plan. This is useful because, for example, end users no longer have to worry about skew in Spark joins. However, it leads Spark to break the work into many jobs, which you see as skipped stages from previous jobs in your job DAGs. If you set spark.sql.adaptive.enabled to false, you will see these extra jobs disappear. But you almost always want to keep adaptive query execution enabled.
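
If you just want to observe the difference for this example (not something you would normally leave disabled, as noted above), you can toggle the flag at runtime with spark.conf.set, which takes effect for subsequent queries in the same session:

    # Disable adaptive query execution to compare the job count for the same action,
    # then re-enable it afterwards.
    spark.conf.set("spark.sql.adaptive.enabled", "false")
    dataframe_filtree.show()   # compare the job count in the Spark UI
    spark.conf.set("spark.sql.adaptive.enabled", "true")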

figs_and_nuts

@Nastasia Though I couldn't find an answer to the above question, I want to share some of my findings on my 8-core CPU system:

  1. The above program without any change:

    No. of Jobs - 3,
    No. of Tasks in each Job - 3, 4, 1 (because the default no. of partitions is 8)
    
  2. Adding a few lines to your example, as shown below, gives some insight:

     print(dataframe.rdd.getNumPartitions())
     dataframe2 = dataframe.coalesce(1)
     print(dataframe2.rdd.getNumPartitions())
     dataframe_filtree: DataFrame = dataframe2.filter("prix > 50")
     dataframe_filtree.show()
    
    No. of Jobs - 1,
    No. of Tasks in each Job - 1 (because the no. of partitions is 1)
    
  3. Changing the parameter in coalesce gave the following results:

    coalesce(2):
    No. of Jobs - 2 (why not 1?),
    No. of Tasks in each Job - 1, 1

    coalesce(6):
    No. of Jobs - 3 (why not 1 or 2?),
    No. of Tasks in each Job - 1, 4, 1
    

Obviously the number of partitions is a factor here, but something else is also deciding the number of jobs.
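
When comparing runs, it may help to print the partition count and the session settings that plausibly influence the job count next to each experiment (a small inspection sketch using standard Spark APIs; nothing here changes the query):

    # Values worth recording alongside the job counts observed in the Spark UI.
    print(dataframe_filtree.rdd.getNumPartitions())      # partitions feeding show()
    print(spark.conf.get("spark.sql.adaptive.enabled"))  # adaptive query execution on/off
    print(spark.sparkContext.defaultParallelism)         # default parallelism (8 on an 8-core node)
    dataframe_filtree.explain()                          # physical plan actually executed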

tryData