Let's say I have two Kafka topics: request_topic
for my POST requests, and response_topic
for my responses.
These are the models:
case class Request(requestId: String, body: String)
case class Response(responseId: String, body: String, requestId: String)
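For completeness, I'm assuming the models are (de)serialized as JSON on their way to and from Kafka. A rough sketch of what I have in mind, using Play JSON (the helper names requestSerializer and responseDeserializer are mine, not from any library):

import org.apache.kafka.common.serialization.{Deserializer, Serializer}
import play.api.libs.json.{Format, Json}

implicit val requestFormat: Format[Request] = Json.format[Request]
implicit val responseFormat: Format[Response] = Json.format[Response]

// Hypothetical helpers wrapping Play JSON in Kafka's (De)Serializer interfaces.
// With kafka-clients 2.x, configure/close have default implementations, so only
// serialize/deserialize need to be overridden.
val requestSerializer: Serializer[Request] = new Serializer[Request] {
  override def serialize(topic: String, data: Request): Array[Byte] =
    Json.toBytes(Json.toJson(data))
}

val responseDeserializer: Deserializer[Response] = new Deserializer[Response] {
  override def deserialize(topic: String, bytes: Array[Byte]): Response =
    Json.parse(bytes).as[Response]
}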
This is my socket handler:
import akka.Done
import akka.kafka.{ConsumerSettings, Subscriptions}
import akka.kafka.scaladsl.Consumer
import akka.stream.scaladsl.{Flow, Sink, Source}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import play.api.mvc.WebSocket
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

def socket = WebSocket.accept[String, String] { req =>
  val requestId = ??? // Generate a unique requestId
  // Created once per socket connection, not once per message.
  val producer: KafkaProducer[String, Request] = ???
  val in: Sink[String, Future[Done]] = Sink.foreach[String] { msg =>
    val record = new ProducerRecord[String, Request]("request_topic", "key", Request(requestId, msg))
    Future { producer.send(record).get }
  }
  // Once produced, some stream processing app (not shown here) processes the
  // request and publishes the response to response_topic. The Request and
  // Response objects are linked by the requestId field.
  val consumerSettings: ConsumerSettings[String, Response] = ???
  val out: Source[String, _] = Consumer
    .plainSource(consumerSettings, Subscriptions.topics("response_topic"))
    .filter(cr => cr.value.requestId == requestId)
    .map(cr => someResponseString(cr.value))
  Flow.fromSinkAndSource(in, out)
}

def someResponseString(res: Response): String = ???
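In case it matters, here is roughly how I was planning to fill in the ??? placeholders. This is a minimal sketch: it assumes a local broker at localhost:9092, an ActorSystem injected by Play, and the hypothetical requestSerializer/responseDeserializer helpers from above:

import java.util.{Properties, UUID}
import akka.actor.ActorSystem
import akka.kafka.ConsumerSettings
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}

val actorSystem: ActorSystem = ??? // injected by Play in my controller

val requestId = UUID.randomUUID().toString // one id per socket connection

val producerProps = new Properties()
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
val producer: KafkaProducer[String, Request] =
  new KafkaProducer(producerProps, new StringSerializer, requestSerializer)

// A distinct groupId per socket, so each client sees the full response stream.
val consumerSettings: ConsumerSettings[String, Response] =
  ConsumerSettings(actorSystem, new StringDeserializer, responseDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId(s"ws-$requestId")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")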
Basically, for each incoming message I publish a Request object to Kafka; the request is then processed by some stream processing app (not shown here), and hopefully a Response is published back to Kafka.
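For illustration only, the contract I'm assuming from that processing app (not my actual implementation) is: consume a Request from request_topic and publish a Response carrying the same requestId to response_topic, roughly like this:

import java.util.UUID
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.Materializer
import org.apache.kafka.clients.producer.ProducerRecord

implicit val mat: Materializer = ???                                  // from the app's ActorSystem
val requestConsumerSettings: ConsumerSettings[String, Request] = ???  // analogous to the settings above
val responseProducerSettings: ProducerSettings[String, Response] = ???
def process(body: String): String = ???                               // hypothetical business logic

Consumer
  .plainSource(requestConsumerSettings, Subscriptions.topics("request_topic"))
  .map { rec =>
    val req = rec.value
    val res = Response(UUID.randomUUID().toString, process(req.body), req.requestId)
    new ProducerRecord[String, Response]("response_topic", req.requestId, res)
  }
  .runWith(Producer.plainSink(responseProducerSettings))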
I have some concerns here:
1 - Will the Alpakka Kafka connector create a new instance of the connector for each incoming message, or will it use the same instance as long as Play is running?
2 - Is it a good idea to filter responses based on the individual requestId, or should I send the whole stream back to each client and let them filter out the responses based on the requestId they are interested in?
3 - Am I wrong about everything? (I am a real newbie with WebSockets.)
Thanks in advance.