2

I'm using Hazelcast Change data capture (CDC) in my application. (Reason I'm using CDC because if use jdbc or other alternative feature to load data into cache its taking to much of time). So CDC will have a data sync between database and Hazelcast Jet.

StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
                .setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
                .setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
                .setTableWhitelist("tblName").build();

here I have following steps

Pipeline pipeline = Pipeline.create();

// filter records based on deleted false
StreamStage<ChangeRecord> deletedFlagRecords = pipeline.readFrom(source).withoutTimestamps()
                .filter(deletedFalse);

deletedFlagRecords.filter(idBasedFetch).writeTo(Sinks.logger());

Here I'm using StreamSource<ChangeRecord> source object as input for my pipeLine. As you know source object is a Stream type. But in my case pipeLine data process is depends upon the user input data (some metadata). If I do any updates or delete in the db. jet will updates in all the stream instances. Since my data processing is depends upon the user data I don't want to use stream type after the first step. Only this first StreamSource<ChangeRecord> source; required in the form of stream. In this next step I just want to this for batch process. So how to use source in the batch processing.

pipeLine.readFrom(source) //always return Stream type. so how to convert this into batch type. I tried one more way like read from source and Sink everything to map.

pipeLine.readFrom(source).writeTo(Sinks.map("dbStreamedData", e -> e.key(), e -> e.value()));

Again construct pipeLine readFrom from map.

pipeline.readFrom(Sources.map("dbStreamedData")).writeTo(Sinks.logger());

this is just returning null data. so Any suggestions would be helpful.

Vy Do
  • 46,709
  • 59
  • 215
  • 313
vivek
  • 115
  • 2
  • 9

2 Answers2

0

Pipeline.readFrom returns either StreamStage or BatchStage, depending on the source. Sources.map() is a batch source, it will read the map once and complete. PostgresCdcSources.postgres() is a streaming source, it will connect to the DB and will keep returning events as they happen until cancelled.

You need to pick a source depending on your use case, if this is your question.

Oliv
  • 10,221
  • 3
  • 55
  • 76
0

Using a CDC source only makes sense if you need your data to be continuously updated. E.g. react to each update in the database, or possibly load data into a Map and then run a batch job repeatedly at some time interval on an in-memory snapshot.

In this case, it's likely you want the first to happen only after the CDC source is up-to-date - after it read all current state from the database and is only receiving updates as they are made to the database. Unfortunately, at the moment (Hazelcast 5.0) there is no way to tell when this happens using Jet API.

It might be possible that you can use some domain-specific information - having a timestamp field that you query for, last inserted record is present in the map or similar.

If you want to run a single batch job on data from a database table you should use a jdbc source.

(Reason I'm using CDC because if use jdbc or other alternative feature to load data into cache its taking to much of time)

Using CDC has its overhead and this is not something we usually see. Using plain SQL query like SELECT * FROM table with the jdbc source is faster than CDC source. Maybe you don't measure time it takes to process whole current state? It it really takes more time to load data using jdbc than CDC please file an issue with a reproducer.

František Hartman
  • 14,436
  • 2
  • 40
  • 60