I've written some code with Akka Streams and Alpakka that reads from Amazon SQS and indexes the events in Elasticsearch. Everything works smoothly and the performance is awesome, but I have a problem with index names. I have this code:
class ElasticSearchIndexFlow(restClient: RestClient) {

  private val elasticSettings = ElasticsearchSinkSettings(bufferSize = 10)

  def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
    ElasticsearchFlow.create[DomainEvent](index, "domain-event", elasticSettings)(
      restClient,
      DomainEventMarshaller.domainEventWrites
    )

  private def index = {
    val now = DateTime.now()
    s"de-${now.getYear}.${now.getMonthOfYear}.${now.getDayOfMonth}"
  }
}
The problem is that after the flow has been running for several days, the index name does not change. I suspect that Akka Streams creates a fused actor under the hood, and that the index function that builds the index name is evaluated only once, when the stream starts.
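To illustrate what I suspect is happening, here is a minimal sketch in plain Scala, with no Akka or Alpakka involved. Every name in it (`IndexNameDemo`, `indexName`, `fixedIndexer`, `perEventIndexer`, `MutableClock`) is hypothetical and not part of any library. It assumes that `ElasticsearchFlow.create` receives the index name as an ordinary `String`, so the `index` expression would be evaluated once when the flow is built rather than once per message.

```scala
import java.time.{Clock, Instant, LocalDate, ZoneId, ZoneOffset}

object IndexNameDemo {

  // Hypothetical stand-in for the `index` helper above, using java.time
  // and an injectable clock so the date can be changed by hand.
  def indexName(clock: Clock): String = {
    val today = LocalDate.now(clock)
    s"de-${today.getYear}.${today.getMonthValue}.${today.getDayOfMonth}"
  }

  // By-value String parameter: the caller's expression is evaluated once,
  // before this method runs, and the returned function reuses that value.
  def fixedIndexer(index: String): String => String =
    event => s"$index/$event"

  // Function parameter: the name is recomputed for every event.
  def perEventIndexer(index: () => String): String => String =
    event => s"${index()}/$event"

  // Test clock whose current instant can be moved forward manually.
  final class MutableClock(var now: Instant) extends Clock {
    override def getZone: ZoneId = ZoneOffset.UTC
    override def withZone(zone: ZoneId): Clock = this
    override def instant(): Instant = now
  }

  def main(args: Array[String]): Unit = {
    val clock    = new MutableClock(Instant.parse("2018-05-01T12:00:00Z"))
    val fixed    = fixedIndexer(indexName(clock))          // name captured here
    val perEvent = perEventIndexer(() => indexName(clock)) // name computed per call

    // Simulate the stream still running on the next day.
    clock.now = Instant.parse("2018-05-02T12:00:00Z")

    println(fixed("e1"))    // de-2018.5.1/e1
    println(perEvent("e1")) // de-2018.5.2/e1
  }
}
```

If that assumption holds, my flow behaves like `fixedIndexer`, and what I am looking for is a way to get the `perEventIndexer` behaviour out of the Elasticsearch flow.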
Any idea what I can do to index events in ES under an index name that follows the current date?