I'm using Hazelcast change data capture (CDC) in my application. (I'm using CDC because loading the data into the cache with JDBC or the other alternatives takes too much time.) CDC keeps the data in sync between the database and Hazelcast Jet.
```java
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
        .setCustomProperty("plugin.name", "pgoutput")
        .setDatabaseAddress("127.0.0.1")
        .setDatabasePort(5432)
        .setDatabaseUser("postgres")
        .setDatabasePassword("root")
        .setDatabaseName("postgres")
        .setTableWhitelist("tblName")
        .build();
```
Here are the steps I have:
```java
Pipeline pipeline = Pipeline.create();
// keep only the records whose deleted flag is false
// (deletedFalse and idBasedFetch are predicates defined elsewhere)
StreamStage<ChangeRecord> deletedFlagRecords = pipeline.readFrom(source)
        .withoutTimestamps()
        .filter(deletedFalse);
deletedFlagRecords.filter(idBasedFetch).writeTo(Sinks.logger());
```
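The post does not show `deletedFalse` or `idBasedFetch`. As a hypothetical illustration only, the plain-Java sketch below (not Hazelcast code) assumes each row's value is available as a `Map<String, Object>` (in Jet that could come from `ChangeRecord.value().toMap()`) with a boolean `deleted` column and an `id` column. The column names, the `row` helper, and the set of requested ids are assumptions, not taken from the original.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

public class CdcFilters {
    // Hypothetical version of deletedFalse: keep rows whose "deleted" column is not true.
    static final Predicate<Map<String, Object>> DELETED_FALSE =
            row -> !Boolean.TRUE.equals(row.get("deleted"));

    // Hypothetical version of idBasedFetch: keep rows whose "id" was requested by the user.
    static Predicate<Map<String, Object>> idBasedFetch(Set<?> requestedIds) {
        return row -> requestedIds.contains(row.get("id"));
    }

    // Hypothetical helper: builds a row shaped like the assumed table (columns "id" and "deleted").
    static Map<String, Object> row(int id, boolean deleted) {
        Map<String, Object> m = new HashMap<>();
        m.put("id", id);
        m.put("deleted", deleted);
        return m;
    }

    // Applies both filters in the same order as the pipeline and returns the surviving ids.
    static List<Object> matchingIds(List<Map<String, Object>> rows, Set<?> requestedIds) {
        Predicate<Map<String, Object>> byId = idBasedFetch(requestedIds);
        List<Object> ids = new ArrayList<>();
        for (Map<String, Object> r : rows) {
            if (DELETED_FALSE.test(r) && byId.test(r)) {
                ids.add(r.get("id"));
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(row(1, false), row(2, true), row(3, false));
        // Row 2 is soft-deleted and row 3 was not requested, so only id 1 remains.
        System.out.println(matchingIds(rows, Set.of(1, 2))); // [1]
    }
}
```

Note that the id filter depends on user input, which is exactly the part the question wants to evaluate in batch rather than on every change event.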
Here I'm using the `StreamSource<ChangeRecord> source` object as the input for my pipeline. As you know, `source` is a stream source. In my case, however, the processing in the pipeline depends on data supplied by the user (some metadata). Whenever I update or delete something in the database, Jet pushes the change through every running stream. Because my processing depends on the user's data, I don't want to keep working with a stream after the first step: only the first step, `StreamSource<ChangeRecord> source`, needs to be a stream, and after that I just want to do batch processing. So how can I use `source` in batch processing?
```java
pipeline.readFrom(source) // always returns a StreamSourceStage
```
How can I convert this into a batch stage? I also tried another approach: read from `source` and sink everything into a map:
```java
pipeline.readFrom(source)
        .withoutTimestamps()
        .writeTo(Sinks.map("dbStreamedData", e -> e.key(), e -> e.value()));
```
Then I constructed the pipeline again, this time reading from that map:
```java
pipeline.readFrom(Sources.map("dbStreamedData")).writeTo(Sinks.logger());
```
This just returns null data. Any suggestions would be helpful.
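The map-based attempt described above can be modelled in plain Java to show one plausible reason for the empty result: a batch read only sees what is in the map at the moment it runs. This is a hypothetical sketch, not Hazelcast code: a `ConcurrentHashMap` stands in for the `dbStreamedData` IMap, and the `Change` class, the `Op` values, and the `snapshotFor` method are illustrative assumptions. Change events are applied to the map as they arrive (the streaming part), and the batch step works on a copy of the map taken when the user's request comes in.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SnapshotBatch {
    enum Op { INSERT, UPDATE, DELETE }

    // Hypothetical stand-in for a CDC change record: operation, primary key, row value.
    static final class Change {
        final Op op;
        final int key;
        final String value;

        Change(Op op, int key, String value) {
            this.op = op;
            this.key = key;
            this.value = value;
        }
    }

    // Stands in for the map that the streaming side keeps in sync with the table.
    final Map<Integer, String> view = new ConcurrentHashMap<>();

    // Streaming side: apply each change event to the view as it arrives.
    void apply(Change c) {
        if (c.op == Op.DELETE) {
            view.remove(c.key);
        } else {
            view.put(c.key, c.value);
        }
    }

    // Batch side: copy the current contents, then process only the keys the user asked for.
    Map<Integer, String> snapshotFor(List<Integer> requestedKeys) {
        Map<Integer, String> copy = new HashMap<>(view);
        Map<Integer, String> result = new HashMap<>();
        for (Integer k : requestedKeys) {
            if (copy.containsKey(k)) {
                result.put(k, copy.get(k));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        SnapshotBatch s = new SnapshotBatch();
        // A batch read taken before any change has been applied sees an empty map.
        System.out.println(s.snapshotFor(List.of(1, 2)).isEmpty()); // true
        s.apply(new Change(Op.INSERT, 1, "a"));
        s.apply(new Change(Op.INSERT, 2, "b"));
        s.apply(new Change(Op.DELETE, 2, null));
        System.out.println(s.snapshotFor(List.of(1, 2))); // {1=a}
    }
}
```

The point the sketch illustrates is ordering: if the batch read of `dbStreamedData` happens before the streaming side has written anything (for example, if both stages start together as part of the same job), it may find an empty map.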