I have two separate jet pipeline. and I have to execute one by one.
public static void main(String[] args) {
JetInstance instance = Jet.newJetInstance();
// this job is for stream processing
JobConfig cfg1 = new JobConfig().setName("CDC-monitor-stream");
instance.newJob(pipeLine1(), cfg1);
// this job is for batch processing
JobConfig cfg2 = new JobConfig().setName("Batch-Processing-monitor");
instance.newJobIfAbsent(pipeLine2(), cfg2).join();
}
public static Pipeline pipeLine1() {
StreamSource<ChangeRecord> source = PostgresCdcSources.postgres("source")
.setCustomProperty("plugin.name", "pgoutput").setDatabaseAddress("127.0.0.1").setDatabasePort(5432)
.setDatabaseUser("postgres").setDatabasePassword("root").setDatabaseName("postgres")
.setTableWhitelist("public.customers").build();
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(source).withoutTimestamps().writeTo(CdcSinks.map("customers", r -> r.key().toMap().get("id"),
r -> r.value().toObject(Customer.class).toString()));
return pipeline;
}
public static Pipeline pipeLine2() {
Pipeline pipeline = Pipeline.create();
pipeline.readFrom(Sources.map("customers")).filter(e -> e.getKey().equals("0001")).writeTo(Sinks.logger());
return pipeline;
}
First job is used to have a data sink between db and cache..
second job is used perform the batch processing and using Sources.map("customers")
as source.(here I'm using batch processing the data extraction is depends on the user input
like String id="0001", String userName="harry"
Or Is this possible using stream?)..
Or what is other best approach?