4

I've written some code with Akka Streams and Alpakka that reads from Amazon SQS and indexes the events in Elasticsearch. Everything works smoothly and the performance is awesome, but I have a problem with index names. I have this code:

class ElasticSearchIndexFlow(restClient: RestClient) {

  private val elasticSettings = ElasticsearchSinkSettings(bufferSize = 10)

  def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
    ElasticsearchFlow.create[DomainEvent](index, "domain-event", elasticSettings)(
      restClient,
      DomainEventMarshaller.domainEventWrites
    )

  private def index = {
    val now = DateTime.now()
    s"de-${now.getYear}.${now.getMonthOfYear}.${now.getDayOfMonth}"
  }
}

The problem is that after some days running the flow, the index name is not changing. I imagine that Akka Streams creates under the hood a fused actor, and that the function index for getting the index name is only evaluated at the beginning of execution.

Any idea of what can I do to index events in ES with an index name according to the current date?

Jeffrey Chung
  • 19,319
  • 8
  • 34
  • 54
SergiGP
  • 669
  • 7
  • 17

1 Answers1

0

The solution to the problem is setting the index name in previous step with IncomingMessage.withIndexName

So:

def flow: Flow[(DomainEvent, Message), IncomingMessage[DomainEvent, Message], NotUsed] =
  Flow[(DomainEvent, Message)].map {
    case (domainEvent, message) =>
      IncomingMessage(Some(domainEvent.eventId), domainEvent, message)
        .withIndexName(indexName(domainEvent.ocurredOn))
}

And:

def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
  ElasticsearchFlow.create[DomainEvent]("this-index-name-is-not-used", "domain-event", elasticSettings)(
    restClient,
    DomainEventMarshaller.domainEventWrites
  )
SergiGP
  • 669
  • 7
  • 17