I could not find a Hazelcast Jet source connector for Apache Pulsar. Has anybody tried this? I would appreciate any directions, pointers, sources, or considerations if I have to write a custom stream connector for Pulsar as a source for Jet.
3

vvra
-
I establish a Pulsar connection and consumer in `SourceBuilder.timestampedStream`, add the messages read from the Pulsar consumer (`consumer.receive(10, TimeUnit.MILLISECONDS)`) to the buffer in `fillBufferFn`, and finally close the consumer and the connection to Pulsar in `destroyFn`. It worked. However, how should I handle `createSnapshotFn` and `restoreSnapshotFn`? As I haven't worked much with Kafka, I am not quite able to map a similar approach onto Pulsar. Any references? – vvra Jan 06 '20 at 16:14
-
`createSnapshotFn` should return the current message cursor so that it is stored in the snapshot, and `restoreSnapshotFn` should restore that cursor and set it on the client so that it consumes messages from where it left off. – eminn Jan 08 '20 at 08:45
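To make the cursor idea above concrete, here is a minimal, library-free sketch. It does not use the Jet or Pulsar APIs: `FakeTopic` and `SourceContext` are hypothetical stand-ins, and an `int` position plays the role of the Pulsar message cursor. The method names only mirror the roles of `fillBufferFn`, `createSnapshotFn` and `restoreSnapshotFn`; in a real source the saved state would presumably be a Pulsar message ID rather than an index.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class CursorSnapshotSketch {

    /** Hypothetical stand-in for a topic: an append-only list of payloads. */
    static final class FakeTopic {
        private final List<String> messages = new ArrayList<>();
        void publish(String payload) { messages.add(payload); }
        int size() { return messages.size(); }
        String get(int position) { return messages.get(position); }
    }

    /** Hypothetical context object, i.e. what the source's create function would build. */
    static final class SourceContext {
        private final FakeTopic topic;
        private int cursor; // position of the next message to emit

        SourceContext(FakeTopic topic) { this.topic = topic; }

        /** Role of fillBufferFn: emit up to maxBatch messages and advance the cursor. */
        void fillBuffer(List<String> buffer, int maxBatch) {
            for (int i = 0; i < maxBatch && cursor < topic.size(); i++) {
                buffer.add(topic.get(cursor++));
            }
        }

        /** Role of createSnapshotFn: return the state that should be saved. */
        int createSnapshot() { return cursor; }

        /** Role of restoreSnapshotFn: take the saved state back (assumed to be one item here). */
        void restoreSnapshot(List<Integer> savedStates) {
            cursor = savedStates.get(0);
        }
    }

    public static void main(String[] args) {
        FakeTopic topic = new FakeTopic();
        for (int i = 0; i < 5; i++) {
            topic.publish("msg-" + i);
        }

        List<String> out = new ArrayList<>();
        SourceContext first = new SourceContext(topic);
        first.fillBuffer(out, 3);              // emits msg-0..msg-2
        int snapshot = first.createSnapshot(); // cursor is now 3

        // Simulate a job restart: a fresh context that gets the saved cursor back.
        SourceContext restarted = new SourceContext(topic);
        restarted.restoreSnapshot(Collections.singletonList(snapshot));
        restarted.fillBuffer(out, 10);         // continues with msg-3, msg-4

        System.out.println(out); // prints [msg-0, msg-1, msg-2, msg-3, msg-4]
    }
}
```

Because the restarted context resumes at the saved position, nothing is skipped and nothing is emitted twice in this model. In a real job, messages emitted after the last snapshot would be replayed after a failure.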
2 Answers
2
The initial version of the Jet connector for Apache Pulsar was recently implemented here. It hasn't been extensively tested yet. For details, you can look at the design document, which states the connector's strengths and limitations, and at the tutorial. If anything about these is confusing, feel free to ask again.

Ufuk Yılmaz
-
Thanks Ufuk, I will do a use case implementation with Pulsar in a Jet job. – vvra Apr 20 '20 at 10:46
1
Hazelcast Jet doesn't have any connector for Apache Pulsar as of now (version 4.0). If you'd like to contribute one, you can have a look at the Source Builder class and its section in the reference manual as a starting point.
Also, please check out the existing implementations of various connectors in the Hazelcast Jet extension modules repository, which use the source builder API, and contribute yours there.
-
I wrote a Pulsar source using Kafka-connect as a reference. The problem I face is this: if I use a Pulsar `reader` to read messages, any messages written to Pulsar while the reader is absent are not read, although new incoming messages are read. Also, if the job is restarted, this approach does not reread all messages from the start. With the other approach, a Pulsar `consumer`, every time the Jet job is restarted it starts from the first message because the messages are not acknowledged, so no message is left unprocessed, but duplicate processing is possible. Any comments? – vvra Jan 08 '20 at 15:05
-
You should use the Pulsar Reader interface with a user-provided start point at the beginning. If Jet snapshots are enabled, the source should store the current message ID, and after a failure it needs to restore the message ID from the snapshot. According to the [Pulsar Reader](https://pulsar.apache.org/docs/en/concepts-clients/#non-partitioned-topics-only) documentation, you can use it only for non-partitioned topics. – eminn Jan 09 '20 at 19:29
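The effect of the reader's start point can be illustrated with a small, library-free model. This is only a sketch under assumptions: `FakeReader` is hypothetical, a `long` index stands in for a Pulsar message ID, and `EARLIEST` / `atLatest` merely play the roles of Pulsar's `MessageId.earliest` and `MessageId.latest`. It shows why a reader that starts at the latest position misses the backlog, and how a reader created from a saved ID continues right after the last processed message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class ReaderStartPointSketch {

    /** Hypothetical marker meaning "start before the first message". */
    static final long EARLIEST = -1L;

    /** Hypothetical reader over an in-memory log; it never acknowledges anything. */
    static final class FakeReader {
        private final List<String> log;
        private int next; // index of the next message to return

        /** startAfter is the ID of the last processed message (exclusive start). */
        FakeReader(List<String> log, long startAfter) {
            this.log = log;
            this.next = (int) (startAfter + 1);
        }

        /** Starts after whatever is currently the last message in the log. */
        static FakeReader atLatest(List<String> log) {
            return new FakeReader(log, log.size() - 1);
        }

        boolean hasMessageAvailable() { return next < log.size(); }
        String readNext() { return log.get(next++); }
        long lastReadId() { return next - 1; }
    }

    /** Reads everything that is currently available from the reader. */
    static List<String> drain(FakeReader reader) {
        List<String> out = new ArrayList<>();
        while (reader.hasMessageAvailable()) {
            out.add(reader.readNext());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>(Arrays.asList("a", "b", "c"));

        // A reader positioned at the latest message only sees what arrives afterwards.
        FakeReader latest = FakeReader.atLatest(log);
        log.add("d");
        System.out.println(drain(latest)); // prints [d]

        // A reader positioned at the earliest message sees the whole backlog.
        FakeReader earliest = new FakeReader(log, EARLIEST);
        earliest.readNext(); // "a"
        earliest.readNext(); // "b"
        long saved = earliest.lastReadId(); // the value a snapshot would store

        // After a restart, a new reader resumes right after the saved ID.
        FakeReader resumed = new FakeReader(log, saved);
        System.out.println(drain(resumed)); // prints [c, d]
    }
}
```

The start position is exclusive in this model, so the message with the saved ID is not delivered again. Whether a real reader treats its start ID as inclusive or exclusive should be checked against the Pulsar client version in use.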
-
So, in that case, does the snapshot have to be persisted? Consider situations where the streaming app can be cold rebooted. – vvra Jan 11 '20 at 11:27
-
Hazelcast Jet stores the snapshot data in a [Hazelcast IMap](https://docs.hazelcast.org/docs/3.12.5/manual/html-single/index.html#map), which keeps the data in memory. With the [Lossless Cluster Restart](https://hazelcast.com/product-features/lossless-cluster-restart/) feature, the data can be persisted to disk. – eminn Jan 13 '20 at 09:33