как заставить генсервер работать со значением частоты Elixir

Я видел много реализаций GenServer, я пытаюсь создать одну с такими спецификациями, но я не уверен, что это вариант использования GenServer.

У меня есть такое состояние, как

%{url: "abc.com/jpeg", name: "Camera1", id: :camera_one, frequency: 10}

У меня таких состояний 100, с разными значениями, мой вариант использования содержит по 5 шагов.

  1. Начинайте каждый штат как поколение{?}.
  2. Отправьте HTTP-запрос на этот URL-адрес.
  3. Получите результаты.
  4. Отправьте еще один HTTP-запрос с данными, полученными из первого запроса.
  5. Усыпите процесс. если частота 10 то на 10 секунд и так далее и через 10 секунд опять начнется с 1 шага.

Теперь, когда я запущу 100 таких воркеров, будет 100 * 2 HTTP-запросов с частотой. Я не уверен, буду ли я использовать GenServer, GenStage, Flow или даже Broadway?

Я также обеспокоен тем, что HTTP-запросы не рухнут, например, один рабочий с состоянием отправит запрос, и если частота составляет 1 секунду до того, как вернется первый запрос, другой запрос будет отправлен, если GenServer способен достаточно для обработки этих случаев? которые, я думаю, называются обратным давлением?

Я так долго спрашивал и рассматривал этот вариант использования, что меня также направили на RabbitMQ для моего варианта использования.

Любое руководство было бы очень полезным, или любой минимальный пример был бы очень благодарен.

? GenServer/ GenStage / GenStateMachine


person Junaid Farooq    schedule 04.12.2020    source источник
comment
Это точно не Broadway и не Flow. Я также не вижу никакого применения GenStage здесь. Некоторое время назад я столкнулся с похожей задачей, и я опубликовал библиотеку Tarearbol, делающую именно это (помимо других вещи), один реализует поведение, а DynamicSupervisor позади делает все остальное.   -  person Aleksei Matiushkin    schedule 06.12.2020
comment
Можно ли запустить gen-сервер, а затем заставить его двигаться по кругу, выполняя запрос, сохраняя его и снова отправляя, и усыпляя себя, и запуская снова через некоторое время. И также обработайте, если запрос не увенчался успехом, другие тоже будут соблюдаться.   -  person Junaid Farooq    schedule 06.12.2020
comment
@JunaidFarooq Что-то подобное можно сделать, как описано в этой статье.   -  person zwippie    schedule 06.12.2020


Ответы (1)


Ваша проблема сводится к уменьшению количества одновременных сетевых запросов в данный момент времени.

Простым подходом было бы иметь GenServer, который отслеживает количество исходящих запросов. Затем для каждого клиента (до 200 в вашем случае) он может проверить наличие открытого запроса, а затем действовать соответствующим образом. Вот как может выглядеть сервер:

defmodule Throttler do
  use GenServer

  #server
  @impl true
  def init(max_concurrent: max_concurrent) do
    {:ok, %{count: 0, max_concurrent: max_concurrent}}
  end

  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count < max_concurrent, do: {:reply, :ok, %{state | count: count + 1}}
  @impl true
  def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count >= max_concurrent, do: {:reply, {:error, :too_many_requests}, state}


  @impl true
  def handle_call(:finished, _from, %{count: count} = state) when count > 0, do: {:reply, :ok, %{state | count: count - 1}}
end

Итак, теперь у нас есть сервер, на котором мы можем вызвать handle_call(pid, :run), и он сообщит нам, превысили ли мы количество. Как только задача (получение URL-адреса) завершена, нам нужно вызвать handle_call(pid, :finished), чтобы сообщить серверу, что мы выполнили задачу.

На стороне клиента мы можем обернуть это в удобную вспомогательную функцию. (Обратите внимание, что это все еще находится в модуле Throttler, поэтому __MODULE__ работает)

defmodule Throttler do
  #client
  def start_link(max_concurrent: max_concurrent) when max_concurrent > 0 do
    GenServer.start_link(__MODULE__, [max_concurrent: max_concurrent])
  end
  
  def execute_async(pid, func) do
    GenServer.call(pid, :run)
    |> case do
      :ok ->
        task = Task.async(fn -> 
          try do
            func.()
          after
            GenServer.call(pid, :finished)
          end
        end)
        {:ok, task}
      {:error, reason} -> {:error, reason, func}
    end
  end
end

Здесь мы передаем функцию, которую хотим асинхронно выполнить на стороне клиента, и выполняем работу по вызову :run и :finished на стороне сервера перед выполнением. В случае успеха мы возвращаем задачу, в противном случае получаем отказ.

Соединив все вместе, вы получите код, который выглядит так:

{:ok, pid} = Throttler.start_link(max_concurrent: 3)
results = Enum.map(1..5, fn num -> 
  Throttler.execute(pid, fn ->
    IO.puts("Running command #{num}")
    :timer.sleep(:5000)
    IO.puts("Sleep complete for #{num}")
    num * 10
  end)
end)
valid_tasks = Enum.filter(results, &(match?({:ok, _func}, &1))) |> Enum.map(&elem(&1, 1))

Теперь у вас есть куча задач, которые либо были выполнены, либо провалены, и вы можете действовать соответствующим образом.

Что вы делаете в случае неудачи? Это интересная часть обратного давления :) Самое простое — это установить тайм-аут и повторить попытку, предполагая, что вы в конечном итоге сбросите давление ниже по течению. В противном случае вы можете полностью отказаться от запросов и продолжать продвигать проблему вверх по течению.

person AnilRedshift    schedule 07.12.2020