Ваша проблема сводится к уменьшению количества одновременных сетевых запросов в данный момент времени.
Простым подходом было бы иметь GenServer, который отслеживает количество исходящих запросов. Затем для каждого клиента (до 200 в вашем случае) он может проверить наличие открытого запроса, а затем действовать соответствующим образом. Вот как может выглядеть сервер:
defmodule Throttler do
use GenServer
#server
@impl true
def init(max_concurrent: max_concurrent) do
{:ok, %{count: 0, max_concurrent: max_concurrent}}
end
@impl true
def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count < max_concurrent, do: {:reply, :ok, %{state | count: count + 1}}
@impl true
def handle_call(:run, _from, %{count: count, max_concurrent: max_concurrent} = state) when count >= max_concurrent, do: {:reply, {:error, :too_many_requests}, state}
@impl true
def handle_call(:finished, _from, %{count: count} = state) when count > 0, do: {:reply, :ok, %{state | count: count - 1}}
end
Итак, теперь у нас есть сервер, на котором мы можем вызвать handle_call(pid, :run)
, и он сообщит нам, превысили ли мы количество. Как только задача (получение URL-адреса) завершена, нам нужно вызвать handle_call(pid, :finished)
, чтобы сообщить серверу, что мы выполнили задачу.
На стороне клиента мы можем обернуть это в удобную вспомогательную функцию. (Обратите внимание, что это все еще находится в модуле Throttler, поэтому __MODULE__
работает)
defmodule Throttler do
#client
def start_link(max_concurrent: max_concurrent) when max_concurrent > 0 do
GenServer.start_link(__MODULE__, [max_concurrent: max_concurrent])
end
def execute_async(pid, func) do
GenServer.call(pid, :run)
|> case do
:ok ->
task = Task.async(fn ->
try do
func.()
after
GenServer.call(pid, :finished)
end
end)
{:ok, task}
{:error, reason} -> {:error, reason, func}
end
end
end
Здесь мы передаем функцию, которую хотим асинхронно выполнить на стороне клиента, и выполняем работу по вызову :run и :finished на стороне сервера перед выполнением. В случае успеха мы возвращаем задачу, в противном случае получаем отказ.
Соединив все вместе, вы получите код, который выглядит так:
{:ok, pid} = Throttler.start_link(max_concurrent: 3)
results = Enum.map(1..5, fn num ->
Throttler.execute(pid, fn ->
IO.puts("Running command #{num}")
:timer.sleep(:5000)
IO.puts("Sleep complete for #{num}")
num * 10
end)
end)
valid_tasks = Enum.filter(results, &(match?({:ok, _func}, &1))) |> Enum.map(&elem(&1, 1))
Теперь у вас есть куча задач, которые либо были выполнены, либо провалены, и вы можете действовать соответствующим образом.
Что вы делаете в случае неудачи? Это интересная часть обратного давления :) Самое простое — это установить тайм-аут и повторить попытку, предполагая, что вы в конечном итоге сбросите давление ниже по течению. В противном случае вы можете полностью отказаться от запросов и продолжать продвигать проблему вверх по течению.
person
AnilRedshift
schedule
07.12.2020
Broadway
и неFlow
. Я также не вижу никакого примененияGenStage
здесь. Некоторое время назад я столкнулся с похожей задачей, и я опубликовал библиотекуTarearbol
, делающую именно это (помимо других вещи), один реализует поведение, аDynamicSupervisor
позади делает все остальное. - person Aleksei Matiushkin   schedule 06.12.2020