I have a PCollection<Row> to which I want to apply a SqlTransform that runs an analytic (window) function query:

    SELECT *, ANY_VALUE(col1) OVER (PARTITION BY col2 ORDER BY timestamp) AS some_name FROM PCOLLECTION

Here is the code:
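For clarity about the intended result, here is a minimal plain-Java sketch (no Beam involved) of what ANY_VALUE(col1) OVER (PARTITION BY col2 ORDER BY timestamp) is expected to produce. The output contains every input row plus a some_name column that holds one col1 value from that row's col2 partition. The class and record names are hypothetical. Because ANY_VALUE is nondeterministic, choosing the earliest row of each partition is only one valid choice. The sketch also assumes col1 is never null.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AnyValueOverSketch {

    // Hypothetical input row mirroring the query's columns (col1, col2, timestamp).
    record InputRow(String col1, String col2, Instant timestamp) {}

    // Output row: all input columns (SELECT *) plus the analytic column some_name.
    record OutputRow(String col1, String col2, Instant timestamp, String someName) {}

    // Emulates ANY_VALUE(col1) OVER (PARTITION BY col2 ORDER BY timestamp).
    // ANY_VALUE may return any non-null value in the window frame. This sketch
    // picks the earliest row of each partition, which falls inside every row's
    // default frame (UNBOUNDED PRECEDING to CURRENT ROW). Assumes col1 is non-null.
    static List<OutputRow> anyValueOver(List<InputRow> rows) {
        // Group rows by the PARTITION BY column.
        Map<String, List<InputRow>> partitions = new HashMap<>();
        for (InputRow r : rows) {
            partitions.computeIfAbsent(r.col2(), k -> new ArrayList<>()).add(r);
        }
        List<OutputRow> out = new ArrayList<>();
        for (List<InputRow> partition : partitions.values()) {
            // Apply the ORDER BY inside each partition.
            partition.sort(Comparator.comparing(InputRow::timestamp));
            String chosen = partition.get(0).col1();
            for (InputRow r : partition) {
                out.add(new OutputRow(r.col1(), r.col2(), r.timestamp(), chosen));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<InputRow> input = List.of(
            new InputRow("a", "p1", Instant.parse("2021-01-01T00:00:02Z")),
            new InputRow("b", "p1", Instant.parse("2021-01-01T00:00:01Z")),
            new InputRow("c", "p2", Instant.parse("2021-01-01T00:00:05Z")));
        // Row order across partitions depends on HashMap iteration order.
        for (OutputRow r : anyValueOver(input)) {
            System.out.println(r);
        }
    }
}
```

Unlike a GROUP BY aggregation, the row count is unchanged: each row simply gains the extra column.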
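    import org.apache.beam.sdk.extensions.sql.SqlTransform;
    import org.apache.beam.sdk.schemas.Schema;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.Row;

    // Schema of the rows passed to SqlTransform
    Schema appSchema =
        Schema.builder()
            .addStringField("col1")
            .addStringField("col2")
            .addDateTimeField("col3")
            .build();

    // inputClass is a PCollection<SomeClass>; convert each POJO to a Row
    PCollection<Row> transform1 = inputClass
        .apply(
            ParDo.of(new DoFn<SomeClass, Row>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                    SomeClass pojo = c.element();
                    // Create a Row with the appSchema schema
                    // and values from the current POJO
                    Row appRow =
                        Row.withSchema(appSchema)
                            .addValues(
                                pojo.getc1(),
                                pojo.getc2(),
                                pojo.getc3())
                            .build();
                    // Output the Row representing the current POJO
                    c.output(appRow);
                }
            }))
        .setRowSchema(appSchema);

    // sql holds the query shown above
    PCollection<Row> transform2 = transform1.apply(
        SqlTransform.query(sql));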
I get the following error:
    Exception in thread "main" com.google.zetasql.SqlException: Analytic functions not supported [at 1:11]
    SELECT *, ANY_VALUE(col1) OVER (PARTITION BY col2 ORDER BY timesta...
              ^
        at com.google.zetasql.Analyzer.analyzeNextStatement(Analyzer.java:216)
        at org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.analyzeNextStatement(SqlAnalyzer.java:113)
        at org.apache.beam.sdk.extensions.sql.zetasql.SqlAnalyzer.analyzeQuery(SqlAnalyzer.java:82)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLPlannerImpl.rel(ZetaSQLPlannerImpl.java:92)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRelInternal(ZetaSQLQueryPlanner.java:188)
        at org.apache.beam.sdk.extensions.sql.zetasql.ZetaSQLQueryPlanner.convertToBeamRel(ZetaSQLQueryPlanner.java:176)
        at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:112)
        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:172)
        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:110)
        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:360)
        at com.company.events.someclass.WindowFuncTest.main(WindowFuncTest.java:270)
    Caused by: com.google.zetasql.io.grpc.StatusRuntimeException: INVALID_ARGUMENT: Analytic functions not supported [at 1:11]
    SELECT *, ANY_VALUE(col1) OVER (PARTITION BY col2 ORDER BY timesta...
              ^
        at com.google.zetasql.io.grpc.stub.ClientCalls.toStatusRuntimeException(ClientCalls.java:262)
        at com.google.zetasql.io.grpc.stub.ClientCalls.getUnchecked(ClientCalls.java:243)
        at com.google.zetasql.io.grpc.stub.ClientCalls.blockingUnaryCall(ClientCalls.java:156)
        at com.google.zetasql.ZetaSqlLocalServiceGrpc$ZetaSqlLocalServiceBlockingStub.analyze(ZetaSqlLocalServiceGrpc.java:1506)
        at com.google.zetasql.Analyzer.analyzeNextStatement(Analyzer.java:214)
        ... 12 more
If this requires creating a custom class that extends PipelineOptions, I am not sure how to implement that here.
I saw here that the property for analytic functions must be enabled explicitly. However, when creating the PipelineOptions for the Beam pipeline, I don't see any such option.