
Recently, I was given the task of building a REST API endpoint that sends a message to a Kafka inbound channel and then waits for the result on an outbound one. All went well until I ran into a problem with waiting for that particular message.

It's worth pointing out that, once a message arrives, it is written to a global message holder, which is just a Ruby hash under the hood. Below is the function that polls the hash until a value shows up for the given key.

def monitor_payment_hash(key)
 while @uuid.payment_create.get_message(key).nil?
   next
 end
 @uuid.payment_create.get_message(key)
end

Is it even appropriate to implement it this way? What should I try instead?
Note: the Kafka consumer runs in a separate thread.

Update

I've just headed over to the Ruby docs and stumbled upon some interesting sections on channels. As far as I'm aware, channels are the best choice for communication between rubytines (just a fancy name for goroutines, but in the Ruby ecosystem :) )
fatihyildizhan
  • At the very least I would add a `sleep` in the loop, and maybe a timeout too – Fravadona Apr 24 '21 at 09:26
  • @Fravadona, thank you for the reply! The thing is that I already tried it, but it turned out to be quite messy. That's why I've been looking for a better alternative. – ThreadedStream Apr 24 '21 at 09:38
  • This seems like a design problem that one process both sends a request and then waits for a response via Kafka: it kind of defeats the whole pub-sub ideology. What's stopping you from writing a new listener that performs the payment_create action? Does your original process need to return something to the user that indicates the payment has been created after receiving the kafka message? Can you use channels? Can you use persisted data that is constantly queried? This is quite a loaded question under the hood, I think. – benjessop Apr 24 '21 at 09:47
  • @benjessop, the request is supposed to immediately return a confirmation URL that the user is then redirected to in order to complete the payment. Could you please elaborate on creating a listener that performs the payment_create action? – ThreadedStream Apr 24 '21 at 10:01
  • This code will jam up and burn 100% CPU, and unless you have another thread, that value will never show up. Ruby doesn't have coroutines (Go calls them "goroutines"), but it does have Fibers and scheduling around that like [Ruby Async](https://github.com/socketry/async). – tadman Apr 24 '21 at 13:42
  • For inter-thread communication you can often use [Queue](https://ruby-doc.org/core-3.0.1/Queue.html). – tadman Apr 24 '21 at 13:43
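
To illustrate the Queue suggestion in the comments, here is a minimal sketch that swaps the polled hash for one blocking queue per key. `MessageMailbox`, `deliver`, and `wait_for` are hypothetical names that do not appear in the question, and the consumer thread below merely stands in for the Kafka consumer. It assumes each key is waited on by a single request thread.

```ruby
require "timeout"

# Hypothetical replacement for the global hash: one Queue per message key.
class MessageMailbox
  def initialize
    @lock = Mutex.new
    @queues = Hash.new { |hash, key| hash[key] = Queue.new }
  end

  # Called from the consumer thread when a message for `key` arrives.
  def deliver(key, message)
    queue_for(key).push(message)
  end

  # Blocks the caller, without spinning, until a message for `key` arrives
  # or `timeout` seconds pass. Returns the message, or nil on timeout.
  def wait_for(key, timeout: 5)
    Timeout.timeout(timeout) { queue_for(key).pop }
  rescue Timeout::Error
    nil
  ensure
    # Drop the per-key queue so finished requests do not accumulate.
    @lock.synchronize { @queues.delete(key) }
  end

  private

  # Looks up (or lazily creates) the queue for `key` under the lock.
  def queue_for(key)
    @lock.synchronize { @queues[key] }
  end
end

mailbox = MessageMailbox.new

# Stand-in for the Kafka consumer thread (assumption for this sketch).
consumer = Thread.new do
  sleep 0.1
  mailbox.deliver("payment-1", { url: "https://example.test/confirm" })
end

message = mailbox.wait_for("payment-1", timeout: 2)
consumer.join
```

`Queue#pop` puts the waiting thread to sleep until something is pushed, so no CPU is burned while waiting. A message delivered after the waiter has timed out would create a queue that nobody reads, so a real implementation would need some cleanup for late arrivals. On Ruby 3.2 and later, `Queue#pop` also accepts a `timeout:` keyword, which would make the `Timeout` wrapper unnecessary.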

1 Answer


I think you need a timeout and a way to force the polling to stop. You may also want an abstraction that you can extend later.

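class Poller
  DEFAULT_TIMEOUT = 60    # seconds to wait before giving up
  DEFAULT_INTERVAL = 0.1  # seconds to sleep between checks

  # Polls from_source until a message for key arrives or the timeout elapses.
  # Yields the message to the block, if one is given.
  def self.poll(key:, from_source:, options: {})
    timeout = options.fetch(:timeout, DEFAULT_TIMEOUT)
    interval = options.fetch(:interval, DEFAULT_INTERVAL)
    start_time = Time.now
    catch(:stop_polling) do
      loop do
        message = from_source.get_message(key)
        if message.nil?
          throw :stop_polling if Time.now - start_time > timeout
          sleep interval # avoid spinning at 100% CPU between checks
        else
          yield(message) if block_given?
          throw :stop_polling
        end
      end
    end
  end
end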

def monitor_payment_hash(key)
  Poller.poll key: key, from_source: @uuid.payment_create, options: {timeout: 60} do |message|
    # write to the global message holders
    # or handle message by block
    yield(message) if block_given?
  end
end

You may need to add more logic, such as retrying on timeout, polling a list of keys, or logging. I recommend studying how long polling is built in this source: https://github.com/aws/aws-sdk-ruby/blob/version-3/gems/aws-sdk-sqs/lib/aws-sdk-sqs/queue_poller.rb
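
As a rough idea of what "retry on timeout" could look like, here is a small sketch. `with_retries` is a hypothetical helper, not part of the code above, and it assumes the wrapped call returns the message on success and nil when it times out. `Poller.poll` as written does not return the message, so it would need a small change before it could be wrapped this way.

```ruby
# Hypothetical helper: calls the block up to `attempts` times and returns the
# first non-nil result, or nil if every attempt came back empty.
def with_retries(attempts: 3, pause: 0.5)
  attempts.times do |index|
    result = yield(index + 1) # pass the 1-based attempt number to the block
    return result unless result.nil?

    sleep pause unless index == attempts - 1 # no pause after the last attempt
  end
  nil
end

# Stand-in for a polling call that times out once and then succeeds.
calls = 0
outcome = with_retries(attempts: 3, pause: 0.01) do |attempt|
  calls += 1
  attempt == 2 ? :confirmed : nil
end
```

The helper stops as soon as a result comes back, so a message that arrives on the second attempt does not trigger a third one.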

Lam Phan