
I have a PCollection<Row> to which I want to apply a SqlTransform that runs an analytic (window) function query:

SELECT *, ANY_VALUE(col1) OVER (PARTITION BY col2 ORDER BY timestamp) as some_name FROM PCOLLECTION
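The sketch below is plain Java with no Beam or SQL engine. It shows what the query above is meant to compute for each row. It makes three assumptions:

- The window uses the SQL default frame, RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, so rows with equal timestamps are peers that share a frame.
- ANY_VALUE skips NULLs.
- ANY_VALUE may legally return any qualifying value, and this sketch arbitrarily picks the first non-NULL col1 in timestamp order.

The `Rec` class and `anyValueOver` method are hypothetical names used only for illustration.

```java
import java.time.Instant;
import java.util.*;

public class AnyValueOverSketch {
    // Hypothetical stand-in for one input Row: col1, col2 and the ORDER BY timestamp.
    static final class Rec {
        final String col1;
        final String col2;
        final Instant ts;

        Rec(String col1, String col2, Instant ts) {
            this.col1 = col1;
            this.col2 = col2;
            this.ts = ts;
        }
    }

    // Returns one permissible value of ANY_VALUE(col1) OVER (PARTITION BY col2 ORDER BY ts)
    // for every input record, in input order. Assumes the default RANGE frame and that
    // NULL col1 values are skipped; picks the first non-NULL col1 in ts order.
    static List<String> anyValueOver(List<Rec> input) {
        String[] out = new String[input.size()];

        // PARTITION BY col2: group record indexes by their partition key.
        Map<String, List<Integer>> partitions = new LinkedHashMap<>();
        for (int i = 0; i < input.size(); i++) {
            partitions.computeIfAbsent(input.get(i).col2, k -> new ArrayList<>()).add(i);
        }

        for (List<Integer> idx : partitions.values()) {
            // ORDER BY ts within the partition (List.sort is stable).
            idx.sort(Comparator.comparing((Integer i) -> input.get(i).ts));
            String chosen = null;
            int start = 0;
            while (start < idx.size()) {
                // Rows with an equal ts are peers and share the same RANGE frame,
                // so scan the whole peer group before assigning results.
                Instant ts = input.get(idx.get(start)).ts;
                int end = start;
                while (end < idx.size() && input.get(idx.get(end)).ts.equals(ts)) {
                    if (chosen == null) {
                        chosen = input.get(idx.get(end)).col1; // stays null if col1 is NULL
                    }
                    end++;
                }
                for (int k = start; k < end; k++) {
                    out[idx.get(k)] = chosen;
                }
                start = end;
            }
        }
        return Arrays.asList(out);
    }

    public static void main(String[] args) {
        List<Rec> rows = List.of(
            new Rec("b", "p1", Instant.parse("2024-01-01T00:00:02Z")),
            new Rec("a", "p1", Instant.parse("2024-01-01T00:00:01Z")),
            new Rec("x", "p2", Instant.parse("2024-01-01T00:00:05Z")));
        System.out.println(anyValueOver(rows)); // [a, a, x]
    }
}
```

Because ANY_VALUE is nondeterministic, a real SQL engine could return a different qualifying value. The sketch only pins down which values are acceptable.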

Here is the code:

Schema appSchema =
        Schema
        .builder()
        .addStringField("col1")
        .addStringField("col2")
        .addDateTimeField("col3")
        .build();

PCollection<Row> transform1 = inputClass
    .apply(
        ParDo.of(new DoFn<SomeClass, Row>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                SomeClass pojo = c.element();

                // Create a Row with the appSchema schema
                // and values from the current POJO
                Row appRow =
                    Row
                        .withSchema(appSchema)
                        .addValues(
                            pojo.getc1(),
                            pojo.getc2(),
                            pojo.getc3())
                        .build();

                // Output the Row representing the current POJO
                c.output(appRow);
            }
        }))
    .setRowSchema(appSchema);


// <sql> is the query shown above
PCollection<Row> transform2 = transform1.apply(
    SqlTransform.query(<sql>));

I get the following error:

Exception in thread "main" com.google.zetasql.SqlException: Analytic functions not supported [at 1:11]
SELECT *, ANY_VALUE(col1) OVER (PARTITION BY col2 ORDER BY timesta...
          ^
    at com.google.zetasql.Analyzer.analyzeNextStatement(Analyzer.java:216)
    at org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.analyzeNextStatement(SqlAnalyzer.java:113)
    at org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.analyzeQuery(SqlAnalyzer.java:82)
    at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:92)
    at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:188)
    at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:176)
    at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:112)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:172)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:110)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:360)
    at com.company.events.someclass.WindowFuncTest.main(WindowFuncTest.java:270)
Caused by: com.google.zetasql.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Analytic functions not supported [at 1:11]
SELECT *, ANY_VALUE(col1) OVER (PARTITION BY col2 ORDER BY timesta...
          ^
    at com.google.zetasql.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
    at com.google.zetasql.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
    at com.google.zetasql.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
    at com.google.zetasql.ZetaSqlLocalServiceGrpc$ZetaSqlLocalServiceBlockingStub.analyze(ZetaSqlLocalServiceGrpc.java:1506)
    at com.google.zetasql.Analyzer.analyzeNextStatement(Analyzer.java:214)
    ... 12 more

If this requires a custom class extending PipelineOptions, I am not sure how to implement that here.

I saw here that support for analytic functions must be enabled explicitly. However, when creating the PipelineOptions for the Beam pipeline, there is no such option.
