Summary

I'm developing a Spark application in Java for financial analytics. Apache Kafka supplies the streaming data, which I read with the Spark Structured Streaming API. I use groupBy() and window() to calculate the average bid and ask prices every minute. I want to calculate the percentage change of each of those two averages between the current window and the previous one.
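To make the goal concrete, here is a minimal plain-Java sketch (no Spark involved) of the calculation described above: given the per-minute averages in window order, each window's change is (current - previous) / previous * 100. The class name, method name, and sample prices are hypothetical and only illustrate the arithmetic; this is not a streaming solution.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class WindowPercentageChange {

    // Returns one entry per window. The entry is null for the first window
    // (nothing to compare with) or when the previous average is zero;
    // otherwise it is the percentage change relative to the previous window.
    public static List<Double> percentageChanges(List<Double> windowAverages) {
        List<Double> changes = new ArrayList<>();
        Double previous = null;
        for (Double current : windowAverages) {
            if (previous == null || previous == 0.0) {
                changes.add(null);
            } else {
                changes.add((current - previous) / previous * 100.0);
            }
            previous = current;
        }
        return changes;
    }

    public static void main(String[] args) {
        // Made-up average bid prices for three consecutive one-minute windows.
        List<Double> avgBidPerMinute = Arrays.asList(100.0, 110.0, 99.0);
        System.out.println(percentageChanges(avgBidPerMinute));
    }
}
```

The same function would be applied separately to the average bid and average ask series.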

Relevant code block

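`import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lag;
import static org.apache.spark.sql.functions.to_timestamp;
import static org.apache.spark.sql.functions.window;

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public static void test2() throws Exception {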
    // Create SparkSession
    SparkSession spark = SparkSession.builder()
            .appName("BidAskPriceChanges")
            .master("local")
            .getOrCreate();

    // Define the schema for the streaming data
    StructType schema = new StructType()
            .add("book", "string", false)
            .add("volume", "string", false)
            .add("high", "string",true)
            .add("last", "string", false)
            .add("low", "string", false)
            .add("vwap", "string", false)
            .add("ask", "string", false)
            .add("bid", "string", false)
            .add("created_at", "string",true);

    // Read streaming data from Kafka
    Dataset<Row> data = spark
            .readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9093")
            .option("subscribe", "topic1")
            .load()
            .selectExpr("CAST(value AS STRING) as json")
            .select(functions.from_json(functions.col("json"), schema).as("data"))
            .select("data.*");

    // Cast the numeric columns to double and parse 'created_at' as a timestamp
    data = data
            .withColumn("volume", col("volume").cast(DataTypes.DoubleType))
            .withColumn("high", col("high").cast(DataTypes.DoubleType))
            .withColumn("last", col("last").cast(DataTypes.DoubleType))
            .withColumn("low", col("low").cast(DataTypes.DoubleType))
            .withColumn("vwap", col("vwap").cast(DataTypes.DoubleType))
            .withColumn("ask", col("ask").cast(DataTypes.DoubleType))
            .withColumn("bid", col("bid").cast(DataTypes.DoubleType))
            .withColumn("created_at", to_timestamp(col("created_at"), "yyyy-MM-dd'T'HH:mm:ssZ"));
    
    
    // Define the window duration and sliding interval
    String windowDuration = "5 minutes";
    String slidingInterval = "1 minute";
    
    WindowSpec windowSpec = Window.partitionBy(window(col("created_at"), windowDuration, slidingInterval))
            .orderBy("created_at");
    Dataset<Row> windowedData = data
            .withColumn("avg_bid", avg(col("bid")).over(windowSpec))
            .withColumn("avg_ask", avg(col("ask")).over(windowSpec));

    // Compute the lagged average bid/ask prices for comparison
    Column prevAvgBid = lag(col("avg_bid"), 1).over(windowSpec);
    Column prevAvgAsk = lag(col("avg_ask"), 1).over(windowSpec);

    windowedData = windowedData.withColumn("prev_avg_bid", prevAvgBid)
            .withColumn("prev_avg_ask", prevAvgAsk);

    
    Dataset<Row> avgPrices = data
            .withWatermark("created_at", "1 minute")
            .groupBy(window(col("created_at"), "1 minute"))
            .agg(avg("bid").alias("avg_bid"), avg("ask").alias("avg_ask"))
            .withColumn("next_minute", col("window.end"))
            .withColumn("bid_percentage_change", (col("avg_bid").minus(lag("avg_bid", 1).over(Window.orderBy("window")))).divide(lag("avg_bid", 1).over(Window.orderBy("window"))).multiply(100))
            .withColumn("ask_percentage_change", (col("avg_ask").minus(lag("avg_ask", 1).over(Window.orderBy("window")))).divide(lag("avg_ask", 1).over(Window.orderBy("window"))).multiply(100));

    // Start the streaming query to write the results to the console
    StreamingQuery query = avgPrices
            .writeStream()
            .outputMode("complete")
            .format("console")
            .trigger(Trigger.ProcessingTime("1 minute"))
            .start();

    query.awaitTermination();
}`

Errors displayed

When I add the columns bid_percentage_change and ask_percentage_change, I get the error: org.apache.spark.sql.AnalysisException: Non-time-based windows are not supported on streaming DataFrames/Datasets;. If I remove those two columns, the program runs without a problem.

Additional information

I consulted additional resources before posting my question, but none offered a solution. According to one answer to a similar question, Spark Structured Streaming does not handle multiple aggregation operations. I tried implementing an approach using flatMapGroupsWithState, with no result. Here is the list of resources I have already consulted:

  1. ChatGPT
  2. Similar Stack Overflow questions and answers, e.g. "Spark - Non-time-based windows are not supported on streaming DataFrames/Datasets;".
  3. Official Spark Documentation
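
For reference, the per-key logic that a stateful approach (such as the flatMapGroupsWithState attempt mentioned above) would need to carry can be sketched in plain Java. This is only an illustration under assumptions: the class `PreviousWindowState`, its `update` method, and the sample keys are hypothetical, an in-memory map stands in for Spark's per-group state, and none of the Spark wiring is shown.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalDouble;

public class PreviousWindowState {

    // Last seen window average per key (for example, per "book").
    // Hypothetical stand-in for state that a stateful operator would keep.
    private final Map<String, Double> previousAverage = new HashMap<>();

    // Stores the current window's average for the key and returns the
    // percentage change against the previously stored average. Returns empty
    // for the first window of a key or when the previous average is zero.
    public OptionalDouble update(String key, double currentAverage) {
        Double previous = previousAverage.put(key, currentAverage);
        if (previous == null || previous == 0.0) {
            return OptionalDouble.empty();
        }
        return OptionalDouble.of((currentAverage - previous) / previous * 100.0);
    }

    public static void main(String[] args) {
        PreviousWindowState state = new PreviousWindowState();
        // Made-up key and averages, fed in window order.
        System.out.println(state.update("book_a", 200.0));
        System.out.println(state.update("book_a", 250.0));
    }
}
```

The sketch assumes windows arrive in order for each key; late or out-of-order windows would need extra handling that is not shown here.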
OneCricketeer
Diego Gallo