
I'm trying to read in data from a Spark streaming data source, window it by event time, and then run a custom Python function over the windowed data (the function uses non-standard Python libraries).

My data frame looks something like this:

| Time                    | Value |
| 2018-01-01 12:23:50.200 | 1234  |
| 2018-01-01 12:23:51.200 |   33  |
| 2018-01-01 12:23:53.200 |  998  |
|           ...           |  ...  |

The windowing seems to work nicely with Spark SQL, using something like this:

windowed_df = df.groupBy(window("Time", "10 seconds"))

..., and the Spark Structured Streaming docs have a section on windowing by event time, so I think this should work fine in a streaming query too.
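
For reference, this is roughly what I have in mind, as a minimal sketch: I'm assuming an existing SparkSession `spark`, and I'm using the built-in rate source here purely so the snippet is self-contained (my real source is different); the `withWatermark` call follows the event-time windowing section of the docs, and the 1-minute threshold is arbitrary.

from pyspark.sql.functions import window

# placeholder source: the built-in rate source, renamed to match my schema
df = spark.readStream.format("rate").load() \
    .withColumnRenamed("timestamp", "Time") \
    .withColumnRenamed("value", "Value")

# event-time window with a watermark
windowed_df = df \
    .withWatermark("Time", "1 minute") \
    .groupBy(window("Time", "10 seconds"))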

So far, so good.

Separately, I've been able to use Spark Streaming (DStreams) to apply my custom transformation to an incoming stream. At the moment it assumes the data arrives in correctly windowed chunks, which is exactly the assumption I'm trying to get rid of. The code looks something like this:

def my_analysis(input_rdd):
    # convert RDD to native types (would also be possible from a DataFrame)
    # run through various Python libs
    # construct new RDD with results - 1 row, multiple values (could construct new DataFrame here instead)

my_dstream\
    .map(deserialize_from_string)\
    .transform(my_analysis)\
    .map(serialize_to_string)\
    .foreachRDD(write_to_sink)
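
To make the above concrete, here is a minimal, runnable stand-in for `my_analysis`, assuming each record has already been deserialized into a `(time, value)` tuple; the arithmetic is just a placeholder for the real library calls:

def my_analysis(input_rdd):
    # pull the chunk into native Python types (the real code feeds these
    # into various Python libraries instead)
    values = input_rdd.map(lambda record: record[1]).collect()

    # build a single result row for the whole chunk; count/sum are
    # placeholders for the actual analysis results
    result = (len(values), sum(values))

    # return a new one-row RDD with the results
    return input_rdd.context.parallelize([result])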

I'd essentially now want to combine the two, so do something like:

df\
    .groupBy(window("Time", "10 seconds"))\
    .transform(my_analysis)\  # how do I do this with pyspark.sql.group.GroupedData?
    .writeStream  # ...

# OR:

my_dstream\
    .map(deserialize_from_string)\
    .window_by_event_time("10 seconds")\  # how do I do this with a DStream?
    .transform(my_analysis)\
    .map(serialize_to_string)\
    .foreachRDD(write_to_sink)

Any idea how I might be able to accomplish the above?

Things I've tried:

  • The functions I can run on windowed_df seem very limited; basically, IPython tab completion suggests I can only do aggregations here (min/max/avg/agg with pyspark.sql.functions). agg seems most useful, but the best I've found so far in that area is collect_list, something like this:
    windowed_df.agg(collect_list("Value")).sort("window").show(20, False)
    ... but that way I lose the timestamps (a possible workaround using structs is sketched after this list).

  • Custom aggregation functions (UDAF) are not supported in PySpark (SPARK-10915)
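
The struct idea is sketched below on a static DataFrame (batch rather than streaming, just for testing): collecting `(Time, Value)` structs instead of bare values keeps the timestamps inside each window, at the cost of pulling the whole group into a single row.

from pyspark.sql.functions import window, collect_list, struct

# keep the timestamp alongside each value by collecting structs
with_timestamps = df \
    .groupBy(window("Time", "10 seconds")) \
    .agg(collect_list(struct("Time", "Value")).alias("points"))

with_timestamps.sort("window").show(20, False)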

Other things I've looked at:

Comment: Considering that `DStream`s have no concept of event time or triggers at all? Stateful streams and a bunch of Spark wizardry can mimic that (see how Beam works), but it is a lot of work. `collect_list` is straightforward but inefficient, and you can easily collect `structs` (take a look at sklearn `gapply`). The difference between a `UDAF` and the "vectorized thing" is that the latter is not designed for optimized map-side reduction; it is just another group-by-key (like collect_list) with more efficient serialization. If you train a model, then a UDAF won't be useful anyway. – zero323 Jan 30 '18 at 22:08
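
If the "vectorized thing" in the comment refers to the grouped-map pandas UDFs that landed in Spark 2.3 (my assumption), then a sketch on a static DataFrame might look like the following; `my_analysis_pd`, its output schema, and the body are hypothetical placeholders for my real analysis, and I haven't verified whether this works on a streaming DataFrame:

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType, window

# hypothetical grouped-map UDF; the schema and body are placeholders
@pandas_udf("window_start timestamp, n long, total long", PandasUDFType.GROUPED_MAP)
def my_analysis_pd(pdf):
    # pdf contains the Time/Value rows of one 10-second window as a pandas DataFrame
    return pd.DataFrame({
        "window_start": [pdf["Time"].min()],
        "n": [len(pdf)],
        "total": [int(pdf["Value"].sum())],  # stand-in for the real analysis
    })

result = df.groupBy(window("Time", "10 seconds")).apply(my_analysis_pd)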
