I am using Apache Beam to join multiple streams along with some lookups. I have two scenarios: if the lookup table is huge, I want the lookup to be refreshed for every record processed (i.e. I will query the database with a WHERE clause per element), and if the lookup table is small, I want it reloaded/refreshed once a day as a side input.
I want to know the correct approach for this. I don't want a huge side input to eat up all of the workers' memory.
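For the first scenario (huge lookup), this is roughly what I am considering: querying Hive per element inside the DoFn instead of loading everything into a side input. This is only a sketch; mainStream, the key column, and the table name are placeholders, and I am reusing the same HiveConnection helper shown further below.

PCollection<KV<String, String>> enriched =
    mainStream.apply(
        "PerRecordLookup",
        ParDo.of(
            new DoFn<KV<String, String>, KV<String, String>>() {
              private static final long serialVersionUID = 1L;

              @ProcessElement
              public void process(
                  @Element KV<String, String> input,
                  OutputReceiver<KV<String, String>> out) {
                // Pull only the rows matching this element's key instead of the
                // whole table, so no worker has to hold the full lookup in memory.
                Map<String, String> rows =
                    HiveConnection.getHiveConnection(
                        "select * from table where key = '" + input.getKey() + "'");
                out.output(KV.of(input.getKey(), rows.get(input.getKey())));
              }
            }));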
For the second scenario (small lookup), I have used the code below to refresh the side input once a day.
PCollectionView<Map<String, String>> lkp =
    p.apply(GenerateSequence.from(0).withRate(1, Duration.standardDays(1)))
        .apply(
            Window.<Long>into(new GlobalWindows())
                .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
                .discardingFiredPanes())
        .apply(
            ParDo.of(
                new DoFn<Long, Map<String, String>>() {
                  private static final long serialVersionUID = 1L;

                  @ProcessElement
                  public void process(
                      @Element Long input, OutputReceiver<Map<String, String>> o) {
                    // Each daily tick reloads the full lookup table from Hive.
                    Map<String, String> map =
                        HiveConnection.getHiveConnection("select * from table");
                    o.output(map);
                  }
                }))
        .apply(View.<Map<String, String>>asSingleton());
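And this is how I plan to consume that side input in the main stream (again just a sketch; mainStream and the key field are placeholders for my actual pipeline):

PCollection<KV<String, String>> joined =
    mainStream.apply(
        "JoinWithDailyLookup",
        ParDo.of(
                new DoFn<KV<String, String>, KV<String, String>>() {
                  private static final long serialVersionUID = 1L;

                  @ProcessElement
                  public void process(ProcessContext c) {
                    // Reads the latest fired pane of the side input, i.e. the
                    // most recent daily snapshot of the lookup map.
                    Map<String, String> lookup = c.sideInput(lkp);
                    KV<String, String> element = c.element();
                    c.output(KV.of(element.getKey(), lookup.get(element.getKey())));
                  }
                })
            .withSideInputs(lkp));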
Kindly guide me through the best practices for this type of use case and provide some example code for better understanding.
Thanks, Gowtham