Questions tagged [spark-shuffle]

20 questions
3 votes · 1 answer

What is spark spill (disk and memory both)?

As per the documentation: Shuffle spill (memory) is the size of the deserialized form of the shuffled data in memory. Shuffle spill (disk) is the size of the serialized form of the data on disk. My understanding of shuffle is this: Every…
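For context, these metrics come from wide transformations: shuffle data is buffered in execution memory and, when that fills, sorted runs are serialized to disk. A minimal sketch (dataset size and output path are assumed, not from the question) of a groupBy large enough to spill; the spill (memory) and spill (disk) figures then appear per task on the stage page of the Spark UI:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().appName("spill-demo").getOrCreate()
    import spark.implicits._

    // groupBy is a wide transformation: each task buffers shuffle data in
    // execution memory (deserialized) and spills sorted, serialized runs
    // to disk when that memory fills up.
    val counts = spark.range(0L, 500000000L)
      .withColumn("key", ($"id" % 10000000).cast("string"))
      .groupBy("key")
      .count()

    counts.write.mode("overwrite").parquet("/tmp/spill-demo") // hypothetical path
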
3 votes · 1 answer

Understanding the shuffle in spark

Shuffling in Spark is (as per my understanding): identify the partition the records have to go to (hashing and modulo), serialize the data that needs to go to the same partition, transmit the data. The data gets deserialized and read by the…
asked by figs_and_nuts
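For reference, the "hashing and modulo" step is exactly what Spark's HashPartitioner does; a minimal sketch of the same computation (key and partition count are made up):

    // Spark's HashPartitioner maps a record to a reduce-side partition by
    // hashing the key and taking a non-negative modulo.
    def partitionFor(key: Any, numPartitions: Int): Int = {
      val mod = key.hashCode % numPartitions
      if (mod < 0) mod + numPartitions else mod // keep the result non-negative
    }

    partitionFor("user42", 4) // the same key always lands in the same partition
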
3 votes · 1 answer

What is the difference between spark.shuffle.partition and spark.repartition in spark?

What I understand is: when we repartition any dataframe with value n, the data will remain on those n partitions until you hit a shuffle stage or another repartition or coalesce. The shuffle setting only comes into play when you…
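The setting the question presumably means is spark.sql.shuffle.partitions; a short sketch of the distinction, assuming a dataframe df with a column key and adaptive query execution disabled:

    // Partition count for every shuffle the SQL engine performs implicitly.
    spark.conf.set("spark.sql.shuffle.partitions", "200")

    val df10 = df.repartition(10)             // explicit shuffle: exactly 10 partitions
    val grouped = df10.groupBy("key").count() // implicit shuffle: back to 200
    println(grouped.rdd.getNumPartitions)     // 200
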
2 votes · 0 answers

filter data in tfrecord with spark/scala without aggregate steps?

I have a very large tfrecord directory and need to filter it on some column to generate new tfrecord files. The code looks like this: val df = spark.read.format("tfrecords").option("recordType",…
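Since a filter is a narrow transformation, it needs no aggregation or shuffle at all; a sketch using the question's tfrecords data source (the recordType value, filter column, and paths are assumed, as the original snippet is truncated):

    val df = spark.read
      .format("tfrecords")
      .option("recordType", "Example") // assumed; the question's value is cut off
      .load("/data/tfrecords/in")      // hypothetical input dir

    // filter is narrow: each input partition is processed independently, no shuffle
    df.filter(df("label") === 1)       // assumed filter column
      .write
      .format("tfrecords")
      .option("recordType", "Example")
      .save("/data/tfrecords/out")     // hypothetical output dir
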
2 votes · 1 answer

How does spark calculate the number of reducers in a hash shuffle?

I am trying to understand hash shuffle in Spark. I am reading this article. Hash Shuffle: Each mapper task creates a separate file for each separate reducer, resulting in M * R total files on the cluster, where M is the number of “mappers” and R is…
asked by figs_and_nuts
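The arithmetic in the quoted passage is direct: with, say, M = 1,000 map tasks and R = 200 reducers, hash shuffle creates 1,000 × 200 = 200,000 files. R itself is just the partition count requested by the wide operation, as in this sketch (assuming a SparkContext sc; paths and numbers are illustrative):

    // M is roughly the number of partitions of the parent RDD.
    val pairs = sc.textFile("/data/in", minPartitions = 1000) // M ≈ 1000 map tasks
      .map(line => (line.split(",")(0), 1))

    // R is the numPartitions argument of the wide transformation.
    val reduced = pairs.reduceByKey(_ + _, numPartitions = 200) // R = 200 reducers
    // legacy hash shuffle: M * R = 200,000 files; sort-based shuffle writes
    // one data file (plus an index) per map task instead.
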
1 vote · 0 answers

HashPartitioning dataframes to achieve co-partitioning during join in PySpark

I am trying to figure out the best way to achieve co-partitioning on my two datasets to eliminate join-related shuffles. I'm working with 2 dataframes A and B, where A contains minimal user data including a field for the event IDs they interacted with,…
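A common approach (sketched here with assumed dataframe names and an assumed join key of event_id): repartition both sides on the join key with the same partition count, so both inputs arrive at the join already hash-partitioned the same way and the join itself adds no shuffle:

    import org.apache.spark.sql.functions.col

    val n = 200 // assumed; must be identical on both sides

    val aPart = dfA.repartition(n, col("event_id"))
    val bPart = dfB.repartition(n, col("event_id"))

    // Catalyst sees matching hash partitioning on the join key on both sides,
    // so no additional exchange is inserted for the join.
    val joined = aPart.join(bPart, Seq("event_id"))
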
1 vote · 1 answer

How to avoid unnecessary shuffle in pyspark?

I have two CSVs: df_sales and df_products. I want to use pyspark to: Join df_sales and df_products on product_id. df_merged = df_sales.join(df_products,df_sales.product_id==df_products.product_id,"inner") Compute the summation of df_sales.num_pieces_sold…
asked by figs_and_nuts
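If df_products is small (a typical dimension table), a broadcast join removes the shuffle of the large sales side entirely; a sketch in Scala (the question is PySpark, where broadcast lives in pyspark.sql.functions; dataframe names are adapted):

    import org.apache.spark.sql.functions.{broadcast, sum}

    // Ship the small products table to every executor instead of shuffling sales.
    val merged = dfSales.join(broadcast(dfProducts), Seq("product_id"))

    // The aggregation still shuffles, but only the much smaller partial sums.
    val totals = merged.groupBy("product_id").agg(sum("num_pieces_sold"))
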
1 vote · 0 answers

Spark shuffle service on local shared dir with Ceph on kubernetes

We run Spark 3.X on Kubernetes; executor pods share the same readWriteMany Ceph volume. So all Spark workers write shuffle data to the same volume (I guess into different dirs), available to any worker. On the other side, Spark is sharing shuffle data…
asked by Thomas Decaux
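For context, the directory executors write shuffle and spill files into is governed by spark.local.dir (or the SPARK_LOCAL_DIRS environment variable on some deployments); a sketch of pointing it at the shared mount, with the path assumed:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("shared-shuffle-dir")
      // Shuffle and spill files land under this directory on each executor.
      .config("spark.local.dir", "/mnt/ceph/spark-local") // hypothetical Ceph mount
      .getOrCreate()
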
1 vote · 3 answers

Count words from a list within array columns without invoking a shuffle

I'm coming from this post: "pyspark: count number of occurrences of distinct elements in lists", where the OP asked about getting the counts for distinct items from array columns. What if I already know the vocabulary in advance and want to get a…
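One shuffle-free way to get per-row counts for a known vocabulary is the higher-order array functions available since Spark 3.0, which are narrow (evaluated row by row); a sketch with an assumed array column words and an assumed vocabulary:

    import org.apache.spark.sql.functions.{col, filter, size}

    val vocabulary = Seq("apple", "banana") // assumed, known in advance

    // One count column per word; no shuffle is triggered.
    val counted = vocabulary.foldLeft(df) { (acc, word) =>
      acc.withColumn(s"count_$word", size(filter(col("words"), x => x === word)))
    }
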
0 votes · 1 answer

Does Spark shuffle write all intermediate data to disk?

Does Spark shuffle write all intermediate data to disk, or only that which will not fit in memory ("spill")? In particular, if the intermediate data is small, will anything be written to disk, or will the shuffle be performed entirely using memory…
asked by Denziloe
0 votes · 0 answers

No space left on device error in Spark Scala

I have a spark program that basically reads files from S3, processes them, and writes output back to S3, then loads a new set of files and does the same thing. I also have 2 small data frames which I create at the start of the program and keep it…
0 votes · 0 answers

org.apache.spark.shuffle.FetchFailedException: The relative remote executor is dead

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: ResultStage 9 (runJob at FileFormatWriter.scala:237) has failed the maximum allowable number of times: 4. Most recent failure reason:…
asked by 湘晗刚
0 votes · 0 answers

How to use ShuffleDriverComponents to initiate service for shuffling

ShuffleDriverComponents description says: "This method should prepare the module with its shuffle components i.e. registering against an external file servers or shuffle services, or creating tables in a shuffle storage data database." I'm preparing…
asked by Brave
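For orientation, ShuffleDriverComponents belongs to the pluggable shuffle storage API (org.apache.spark.shuffle.api) added in Spark 3.0 and is wired in through a ShuffleDataIO implementation named by spark.shuffle.sort.io.plugin.class. A skeletal sketch of the driver side (the initialization body is entirely hypothetical; the signatures reflect Spark 3.x as best I can tell):

    import java.util.{Collections, Map => JMap}
    import org.apache.spark.shuffle.api.ShuffleDriverComponents

    class MyDriverComponents extends ShuffleDriverComponents {
      // Runs once on the driver: the place to register with an external
      // shuffle service or create backing storage. The returned map is added
      // to the executors' configuration.
      override def initializeApplication(): JMap[String, String] = {
        // hypothetical: register against a remote service / create tables here
        Collections.emptyMap[String, String]()
      }

      // Runs at application shutdown: release whatever was set up above.
      override def cleanupApplication(): Unit = ()
    }
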
0 votes · 0 answers

Spark shuffle reads reduced but executor compute time takes longer

I recently implemented a change where my Spark job reads a fraction of the data it originally needed. When I test, the change shows up as substantially reduced shuffle read sizes. However, the job consistently takes ~30% longer than production,…
0 votes · 0 answers

Is it possible to avoid Shuffle while getting distinct data from Spark DataFrame

Let's assume we have data gathered from Kafka with 3 partitions (from one topic). The Kafka keys and values are presented in the table below: | key | value …
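In general a global distinct must shuffle to bring all duplicates of a key together; the shuffle can only be skipped when duplicates are guaranteed never to cross partitions, e.g. Kafka data consistently keyed per partition. A sketch of per-partition deduplication under exactly that assumption (column name assumed; note the per-partition key set is held in memory):

    // Safe ONLY if every duplicate of a key lives in the same partition.
    val deduped = df.rdd.mapPartitions { rows =>
      val seen = scala.collection.mutable.HashSet.empty[Any]
      rows.filter(r => seen.add(r.getAs[Any]("key"))) // keeps first occurrence per partition
    }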