
I've managed to plug the GCP PubSub dependency into the Flink StateFun JAR and then build the Docker image.

I've added the following to the pom.xml:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-gcp-pubsub</artifactId>
    <version>1.16.0</version>
    <!-- compile scope: with "test" the connector would not be packaged into the jar -->
    <scope>compile</scope>
</dependency>

It's not too clear how I now specify my PubSub ingress and egress in the module.yaml that we use with the StateFun image.

https://nightlies.apache.org/flink/flink-statefun-docs-master/docs/modules/overview/

For example, for a Kafka egress you use:

kind: io.statefun.kafka.v1/egress
spec:
  id: com.example/my-egress
  address: kafka-broker:9092
  deliverySemantic:
    type: exactly-once
    transactionTimeout: 15min

I can see that the official connectors have a Kind constant in the Java code, which is what you reference in your module.yaml, but I can't find anything in the docs on how to reference Flink connectors that you plug into the StateFun image yourself.

N P

1 Answer

GCP PubSub is not officially supported as a standard StateFun IO component; only Kafka and Kinesis are, for now. However, you can write your own custom ingress/egress connector relatively easily. Unfortunately, you won't be able to add a new YAML-based config item for it, as the module configurators for Kafka and Kinesis seem to be hard-coded in the runtime, so you'll have to do your configuration in code.

Looking at the source/ingress example:

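import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SourceFunctionSpec;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.IngressSpec;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSourceSpec implements StatefulFunctionModule {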

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        IngressIdentifier<TypedValue> id =
            new IngressIdentifier<>(TypedValue.class, "com.example", "custom-source");
        IngressSpec<TypedValue> spec = new SourceFunctionSpec<>(id, new FlinkSource<>());
        binder.bindIngress(spec);
        binder.bindIngressRouter(id, new CustomRouter());
    }
}

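Your job is then to replace new FlinkSource<>() with an org.apache.flink.streaming.api.functions.source.SourceFunction. Make sure the types line up: the source's output type must match the IngressIdentifier, the SourceFunctionSpec and the router, and messages routed to remote functions are expected to be TypedValue.

You could declare it like this (note that the builder's build() declares IOException, since it may look up default credentials, so inside configure() you'll need to catch it):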

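// IntegerSerializer comes from Flink's own PubSub example code, not from the connector itself.
SourceFunction<Integer> source =
    PubSubSource.newBuilder()
      .withDeserializationSchema(new IntegerSerializer())
      .withProjectName(projectName)
      .withSubscriptionName(subscriptionName)
      .withMessageRateLimit(1)
      .build();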

You'll also have to write a CustomRouter, which decides which function instance should initially handle each incoming event. You can take inspiration from a router such as this one:

public static class GreetingsStateBootstrapDataRouter implements Router<Tuple2<String, Integer>> {
  @Override
  public void route(
      Tuple2<String, Integer> message, Downstream<Tuple2<String, Integer>> downstream) {
    downstream.forward(new Address(GREETER_FUNCTION_TYPE, message.f0), message);
  }
}            
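To make the router's role concrete, here is a stdlib-only sketch (Java 16+ for records) of the decision it makes. Address is a hypothetical stand-in, not StateFun's org.apache.flink.statefun.sdk.Address, and treating each Pub/Sub message body as a UTF-8 user name is purely an illustrative assumption. The function type picks which function handles the event and the id picks the instance, so every message carrying the same id reaches the same instance and its state.

```java
import java.nio.charset.StandardCharsets;

// Stdlib-only sketch of a router's decision. Address is a HYPOTHETICAL stand-in,
// not org.apache.flink.statefun.sdk.Address.
public class RoutingSketch {

    // Function type selects the function; id selects the instance (and its state).
    public record Address(String functionType, String id) {}

    // Assumed to match GreeterFn's type name "com.example.fns/greeter".
    static final String GREETER_TYPE = "com.example.fns/greeter";

    // Assumption: each Pub/Sub message body is a UTF-8 user name used as the instance id.
    public static Address route(byte[] pubSubPayload) {
        String userName = new String(pubSubPayload, StandardCharsets.UTF_8).trim();
        if (userName.isEmpty()) {
            throw new IllegalArgumentException("message has no user name to route on");
        }
        return new Address(GREETER_TYPE, userName);
    }

    public static void main(String[] args) {
        Address first = route("alice".getBytes(StandardCharsets.UTF_8));
        Address again = route("alice\n".getBytes(StandardCharsets.UTF_8));
        Address other = route("bob".getBytes(StandardCharsets.UTF_8));

        System.out.println(first);               // Address[functionType=com.example.fns/greeter, id=alice]
        System.out.println(first.equals(again)); // true: same id, same instance
        System.out.println(first.equals(other)); // false: different instance
    }
}
```

A real CustomRouter would compute the id the same way and pass a StateFun Address to downstream.forward(...), as the router above does.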

The sink/egress side works the same way, except there is no router to provide:

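import java.util.Map;
import org.apache.flink.statefun.flink.io.datastream.SinkFunctionSpec;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.statefun.sdk.io.EgressSpec;
import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;

public class ModuleWithSinkSpec implements StatefulFunctionModule {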

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        EgressIdentifier<TypedValue> id = new EgressIdentifier<>("com.example", "custom-sink", TypedValue.class);
        EgressSpec<TypedValue> spec = new SinkFunctionSpec<>(id, new FlinkSink<>());
        binder.bindEgress(spec);
    }
}

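Then replace new FlinkSink<>() with a PubSub sink. As on the ingress side, the types must match: the egress above consumes TypedValue, which is what remote functions send, so in practice you'd swap IntegerSerializer for a SerializationSchema<TypedValue> of your own. The builder calls look like this (build() also declares IOException):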

SinkFunction<Integer> sink =
    PubSubSink.newBuilder()
        .withSerializationSchema(new IntegerSerializer())
        .withProjectName(projectName)
        .withTopicName(outputTopicName)
        .build();

A function written with the StateFun Java SDK would then send to that egress by its TypeName, like so:

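import java.util.concurrent.CompletableFuture;
import org.apache.flink.statefun.sdk.java.Context;
import org.apache.flink.statefun.sdk.java.StatefulFunction;
import org.apache.flink.statefun.sdk.java.TypeName;
import org.apache.flink.statefun.sdk.java.ValueSpec;
import org.apache.flink.statefun.sdk.java.message.EgressMessageBuilder;
import org.apache.flink.statefun.sdk.java.message.Message;

public class GreeterFn implements StatefulFunction {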

    static final TypeName TYPE = TypeName.typeNameFromString("com.example.fns/greeter");

    static final TypeName CUSTOM_EGRESS = TypeName.typeNameFromString("com.example/custom-sink");

    static final ValueSpec<Integer> SEEN = ValueSpec.named("seen").withIntType();

    @Override
    public CompletableFuture<Void> apply(Context context, Message message) {
        if (!message.is(User.TYPE)) {
            throw new IllegalStateException("Unknown type");
        }

        User user = message.as(User.TYPE);
        String name = user.getName();

        var storage = context.storage();
        var seen = storage.get(SEEN).orElse(0);
        storage.set(SEEN, seen + 1);

        context.send(
            EgressMessageBuilder.forEgress(CUSTOM_EGRESS)
                .withUtf8Value("Hello " + name + " for the " + seen + "th time!")
                .build());

        return context.done();
    }
}
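Messages that GreeterFn sends to this egress should reach the PubSub sink as TypedValue instances rather than Integers, so the sink needs a SerializationSchema<TypedValue>. A real one would implement org.apache.flink.api.common.serialization.SerializationSchema<TypedValue> and could simply return element.getValue().toByteArray(); what those bytes contain depends on how the function encoded its value. The stdlib-only sketch below models that pass-through, using a hypothetical Payload record (Java 16+) as a stand-in for TypedValue, which carries a typename and a bytes value.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Stdlib-only sketch of a pass-through serializer for the PubSub sink.
// Payload is a HYPOTHETICAL stand-in for StateFun's TypedValue (typename + bytes value).
public class EgressPayloadSketch {

    public record Payload(String typename, byte[] value) {}

    // Mirrors SerializationSchema.serialize(...): hand the raw value bytes to Pub/Sub
    // as the message data, copying them so later mutation can't change what is published.
    public static byte[] toPubSubData(Payload payload) {
        if (payload.value() == null) {
            throw new IllegalArgumentException("egress message without a value: " + payload.typename());
        }
        return Arrays.copyOf(payload.value(), payload.value().length);
    }

    public static void main(String[] args) {
        // Hypothetical typename, for illustration only.
        Payload p = new Payload("com.example/greeting", "Hello alice".getBytes(StandardCharsets.UTF_8));
        byte[] data = toPubSubData(p);
        System.out.println(new String(data, StandardCharsets.UTF_8)); // prints: Hello alice
    }
}
```

Forwarding the bytes untouched keeps the sink agnostic of the payload format; if several functions share the egress, checking the typename first is a cheap way to reject unexpected messages.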

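You'll also have to make your modules known to the runtime. StateFun discovers them through Java's ServiceLoader, so list them, one fully qualified class name per line, in a file named META-INF/services/org.apache.flink.statefun.sdk.spi.StatefulFunctionModule inside your jar (in a Maven project, under src/main/resources), like so: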

com.example.your.path.ModuleWithSourceSpec
com.example.your.path.ModuleWithSinkSpec 

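Alternatively, if you prefer annotations, you can use Google AutoService: annotate each module class with @AutoService(StatefulFunctionModule.class) and its annotation processor generates that services file for you at compile time.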

I hope it helps!

BenoitParis