21

I want to repeatedly run the same sequence of operations over and over again next to a Phoenix application (without crashing the whole web-app if something brakes in the worker of course) and don't really know wether I should use a GenServer, Elixir's Tasks, an Agent or something completely different I haven't thought about so far.

When I start my Phoenix app a worker should start as well, that periodically pulls some values of a serial-connection, broadcasts them through a Phoenix channel, collects them until @save_interval is reached and then calculates the median, broadcasts that median via a different channel and writes it to an InfluxDB. Right now I have something (kind of working) like this:

def do_your_thing(serial_pid) do
  Stream.interval(@interval_live)
    |> get_new_values_from_serial(serial_pid)
    |> broadcast!("live-channel:#{@name}")
    |> Enum.take(div(@interval_save, @interval_live))
    |> calculate_medians()
    |> broadcast!("update-channel:#{@name}")
    |> write_to_database()

  do_your_thing(serial_pid) # repeat
end

I'm only starting to figure all that OTP stuff out and hope someone of you could help me stumble into the right direction here.

optikfluffel
  • 2,538
  • 4
  • 20
  • 27

1 Answers1

33

You should use a GenServer that sends itself messages after x seconds (60 seconds in the example below):

defmodule MyApp.Worker do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  def init([]) do
    schedule_work()
    {:ok, []}
  end

  def handle_info(:work, state) do
    state = do_work(state)
    schedule_work()
    {:noreply, state}
  end

  defp do_work(state) do
    # Do your work here and return state
  end

  defp schedule_work do
    Process.send_after(self(), :work, 60_000)
  end
end
José Valim
  • 50,409
  • 12
  • 130
  • 115
  • 3
    Why not a Task that does its stuff periodically in an infinite loop (possibly powered by Stream.interval or Stream.repeatedly)? If this is just a periodical pull that forwards the pulled data further to the system, it doesn't really need to be a GenServer, right. Task is still OTP compliant, and to me it seems more straightforward for the job. – sasajuric Jun 01 '15 at 09:59
  • 4
    The reason is that Task cannot receive system messages. We want to make Stream and friends aware of those but it isn't in 1.0 and possibly will only be in 1.3. – José Valim Jun 01 '15 at 13:36
  • 1
    Also you may want your Supervisors to be able to restart the worker doing the periodic work rather than just letting the infinite Stream crash the current process if something goes wrong. – Letseatlunch Jun 01 '15 at 19:39
  • José: Yeah, that's a good point. @Letseatlunch I don't think that's a problem here. A Supervisor can start/stop/restart a Task. Using a :brutal_kill shutdown option probably makes sense for such workers, since they probably won't handle a shutdown message. – sasajuric Jun 02 '15 at 06:33
  • 3
    @JoséValim Is there a better way to do this in Elixir 1.3? – knite Jul 22 '16 at 02:43
  • 1
    Thanks José! I am happy to see an answer to this question from the best possible source. – DCameronMauch Oct 04 '16 at 20:28