
I have a use case that seems relatively simple to solve with Spark, but I can't figure out a reliable way to do it.

I have a dataset which contains time series data for various users. All I'm looking to do is:

  • partition this dataset by user ID,
  • sort the time series data for each user, which by then should be contained within a single partition, and
  • write each partition to a single CSV file, so that I end up with one CSV file per user ID.

I tried the following code snippet, but got surprising results: I do end up with one CSV file per user ID, and some users' time series data ends up sorted, but many other users' data is unsorted.

# repr(ds) = DataFrame[userId: string, timestamp: string, c1: float, c2: float, c3: float, ...]
ds = load_dataset(user_dataset_path)
(ds.repartition("userId")
    .sortWithinPartitions("timestamp")
    .write
    .partitionBy("userId")
    .option("header", "true")
    .csv(output_path))

I'm unclear as to why this happens, and I'm not entirely sure how to accomplish what I want. I'm also not sure whether this is a bug in Spark.

I'm using Spark 2.0.2 with Python 2.7.12. Any advice would be very much appreciated!

Ivan Gozali
  • Sure, I made a gist on GitHub to detail the issue I'm seeing. Behavior is deterministic. Spark and Scala versions are also included in gist. https://gist.github.com/igozali/d327a85646abe7ab10c2ae479bed431f – Ivan Gozali Jan 24 '17 at 20:15
  • It seems that this might actually be a bug. I've filed https://issues.apache.org/jira/browse/SPARK-19352 and it is currently being worked on in this GitHub PR: https://github.com/apache/spark/pull/16724 – Ivan Gozali Feb 01 '17 at 22:41
  • For those coming later, it seems the desired behavior can be achieved by doing: ``` ds.repartition("userId") .sortWithinPartitions("userId", "timestamp") .write .partitionBy("userId") .option("header", "true") .csv(output_path) ``` See e.g. https://github.com/apache/spark/pull/16724#issuecomment-279190560 and https://github.com/apache/spark/pull/16898 – MikeGM Feb 12 '18 at 16:31
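
For Python users, the workaround in the comment above translates to something like the following. This is a minimal sketch reusing load_dataset, user_dataset_path, and output_path from the question; the key change is adding the partition column as the leading sort key:

# Sketch of the workaround from the comments: sort by the partition column
# ("userId") first, then by "timestamp", so the partitioned write does not
# disturb the timestamp ordering.
ds = load_dataset(user_dataset_path)
(ds.repartition("userId")
    .sortWithinPartitions("userId", "timestamp")
    .write
    .partitionBy("userId")
    .option("header", "true")
    .csv(output_path))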

1 Answer


The following code works for me (shown here in Scala, but the approach is the same in Python).

I get one file for each username with the rows in the output file sorted by the timestamp value.

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col
import spark.implicits._ // for the $"colName" syntax; assumes a SparkSession named spark

testDF
  .select($"username", $"timestamp", $"activity")
  .repartition(col("username"))
  .sortWithinPartitions(col("username"), col("timestamp")) // <-- both columns here
  .write
  .partitionBy("username")
  .mode(SaveMode.Overwrite)
  .option("header", "true")
  .option("delimiter", ",")
  .csv(folder + "/useractivity")

The important thing is to pass both the username and timestamp columns to sortWithinPartitions. If you sort only by timestamp, the partitioned write can re-sort each partition's rows by the partition column, and that sort is not guaranteed to preserve the timestamp order (this is the issue tracked in SPARK-19352, linked in the comments above).

Here is how one of the output files looks (I used a simple integer as my timestamp):

timestamp,activity
345,login
402,upload
515,download
600,logout
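
As a quick sanity check, you can read each user's output file back and confirm the rows are in order. This is a hypothetical snippet, not part of the original answer: it assumes the same folder variable as the Scala example and casts the timestamps to int, since CSV reads everything back as strings.

import csv
import glob

# Hypothetical check: each username gets its own directory under the output
# folder, and Spark generates the part-file names inside it.
for path in glob.glob(folder + "/useractivity/username=*/part-*.csv"):
    with open(path) as f:
        rows = list(csv.DictReader(f))
    # Cast to int because the example timestamps are integers; CSV yields strings.
    timestamps = [int(row["timestamp"]) for row in rows]
    assert timestamps == sorted(timestamps), "%s is not sorted" % path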
warrens