Мы закончили наше приключение с биткойнами, построив простой биткойн-узел, который подключается к другому одноранговому узлу в сети. Хотя наш узел на базе Elixir мог подключиться к одноранговому узлу, это соединение было в лучшем случае хрупким. Любые проблемы с начальным подключением или обменом версиями могут оставить наше приложение мертвым.

К счастью, есть способы повысить устойчивость нашего узла Эликсира. Сегодня мы проведем рефакторинг нашего узла Биткойн, чтобы использовать поведение соединения Джеймса Фиша, а не базовое поведение GenServer, которое поставляется с Elixir. Реализация этого поведения в нашем узле повысит надежность нашего процесса подключения, а также даст возможность повторно подключиться к одноранговому узлу в случае сбоя.

Поехали!

Наша отправная точка

Прежде чем мы погрузимся в рефакторинг нашего узла Биткойн для использования нового поведения подключения, мы должны рассмотреть некоторые изменения, которые я внес для упрощения модуля BitcoinNetwork.Node.

Раньше каждое сообщение, извлекаемое из входящих TCP-пакетов, собиралось в структуру BitcoinNetowkr.Protocol.Message и возвращалось в текущий процесс узла как сообщение процесса. Оглядываясь назад, можно сказать, что это решение чрезмерно сложное и обременено шаблонным кодом и накладными расходами на передачу сообщений. Вместо этого я решил последовать собственному совету и« просто использовать функцию » для обработки входящих сообщений.

def handle_info({:tcp, _port, data}, state) do
  {messages, rest} = chunk(state.rest <> data)

  case handle_messages(messages, state) do
    {:error, reason, _state} -> {:stop, reason}
    {:ok, state} -> {:noreply, %{state | rest: rest}}
  end
end

Теперь собранные Message структуры передаются вспомогательной функции handle_messages/2, которая возвращает либо :error кортеж, либо :ok кортеж с обновленным состоянием текущего узла после обработки каждого из полученных сообщений.

handle_messages/2 отфильтровывает недопустимые сообщения и запускает каждое из оставшихся сообщений с помощью вспомогательной функции handle_payload/2. Мы передаем этой функции новое поле parsed_payload, которое содержит проанализированное структурное представление входящего сообщения Биткойн:

defp handle_messages(messages, state) do
  messages
  |> Enum.filter(&Message.verify_checksum/1)
  |> Enum.reduce_while({:ok, state}, fn message, state ->
    case handle_payload(message.parsed_payload, state) do
      {:error, reason, state} -> {:halt, {:error, reason, state}}
      {:ok, state} -> {:cont, {:ok, state}}
    end
  end)
end

Обратите внимание, что мы используем Enum.reduce_while/3, чтобы дать нашим handle_payload/2 вызовам возможность изменять состояние узла перед обработкой следующего сообщения.

Если мы сталкиваемся с проблемой обработки проанализированной полезной нагрузки, мы немедленно выходим из сокращения, возвращая :halt кортеж.

Основное преимущество этого рефакторинга заключается в простоте наших handle_payload/2 методов. Вот как выглядит наш обработчик ping после рефакторинга:

defp handle_payload(%Ping{}, state) do
  with :ok <- Message.serialize("pong") |> send_message(state.socket) do
    {:ok, state}
  else
    {:error, reason} -> {:error, reason, state}
  end
end

Мы используем сопоставление с образцом для прослушивания BitcoinNetwork.Protocol.Ping сообщений. Когда мы получаем Ping, мы сериализуем и отправляем «понг» обратно нашему одноранговому узлу. Если что-то пойдет не так с отправкой ответа, мы вернем :error кортеж.

Красивый.

Подключение без подключения

Поведение Connection - это специализация поведения GenServer, предназначенная для использования для представления подключений к внешним ресурсам. Он полностью отражает API стандартного GenServer и добавляет два дополнительных обратных вызова, которые мы можем реализовать: connect/2 и disconnect/2. Как вы, наверное, догадались, эти два обратных вызова используются для подключения и отключения от нашего внешнего ресурса.

Прежде чем мы начнем использовать поведение Connection в нашем приложении, нам нужно добавить его как зависимость в наш mix.exs файл:

defp deps do
  [
    {:connection, "~> 1.0"}
  ]
end

Затем мы начнем преобразование GenServer в Connection, заменив use поведения GenServer новым поведением Connection и полностью заменив GenServer на Connection во всем модуле BitcoinNetwork.Node:

defmodule BitcoinNetwork.Node do
  use Connection

  def start_link({ip, port}) do
    Connection.start_link(__MODULE__, %{ip: ip, port: port, rest: ""})
  end

  ...

Поскольку поведение Connection является надмножеством поведения GenServer, наш узел должен работать так же, как и раньше, с учетом этих изменений. Давай попробуем.

** (Mix) Could not start application bitcoin_network: exited in: BitcoinNetwork.Application.start(:normal, [])
    ** (EXIT) an exception was raised:
        ** (ArgumentError) The module BitcoinNetwork.Node was given as
        a child to a supervisor but it does not implement child_spec/1.

Uh oh.

Поведение Connection не реализует child_spec/1 обратный вызов, как это было в нашем старом поведении GenServer, и нашему приложению больше не нравится сокращенная запись дочерней спецификации, которую мы используем в нашем BitcoinNetwork.Application супервизоре:

{BitcoinNetwork.Node,
 {Application.get_env(:bitcoin_network, :ip),
  Application.get_env(:bitcoin_network, :port)}}

Мы исправим это, превратив нашу дочернюю спецификацию в полную карту спецификаций в нашем BitcoinNetwork.Application модуле:

%{
  id: BitcoinNetwork.Node,
  start:
    {BitcoinNetwork.Node, :start_link,
     [
       {
         Application.get_env(:bitcoin_network, :ip),
         Application.get_env(:bitcoin_network, :port)
       }
     ]},
  restart: :transient
}

С этими изменениями наш биткойн-узел работает так же, как и раньше.

Подключение с помощью Connect

Пока что наш рефакторинг не очень интересный. Хотя наш биткойн-узел все еще работает, мы не добавили никаких новых функций. Давайте изменим это, конкретизируя обратный вызов connect/2, обеспечиваемый поведением Connection.

Начнем с наброска обратного вызова connect/2 в нашем модуле:

def connect(_info, state) do
end

В рамках нашего обратного вызова connect/2 мы должны обрабатывать все поведение, связанное с подключением к нашему внешнему ресурсу. Возможно, вы помните, что это ранее обрабатывалось в нашем обратном вызове init/1. Давайте начнем переносить этот код в нашу connect/2 функцию.

Первый шаг в подключении к нашему одноранговому узлу - установить TCP-соединение:

:gen_tcp.connect(IP.to_tuple(state.ip), state.port, options)

Следующим шагом является отправка нашего исходного сообщения «версия» и установление связи с партнером:

send_message(message, socket)

Если обе эти вещи пойдут хорошо, мы можем сказать, что мы успешно подключились к нашему одноранговому узлу Биткойн. В этом случае поведение Connection диктует, что мы должны вернуть кортеж :ok с новым состоянием процесса.

with {:ok, socket} <- :gen_tcp.connect(IP.to_tuple(state.ip), state.port, options),
     :ok <- send_message(message, socket) do
  {:ok, Map.put_new(state, :socket, socket)}
end

Однако, если что-то пойдет не так, у нас есть пара вариантов. Мы можем либо вернуть кортеж :stop, чтобы убить текущий процесс. Это похоже на предыдущую функциональность нашего узла. В качестве альтернативы мы можем вернуть кортеж :backoff, который инструктирует поведение подключения, чтобы повторить попытку нашего поведения подключения после указанного timeout.

Давайте попробуем повторно подключиться к нашему одноранговому узлу, если что-то пойдет не так. Для этого все, что нам нужно сделать, это добавить блок else к нашему with, который возвращает наш кортеж :backoff:

else
  _ -> {:backoff, 1000, state}

Теперь, после неудачной попытки подключения, наш биткойн-узел будет повторять попытку подключения через тысячу миллисекунд.

Ограничение повторных попыток

Наша новая логика повторных попыток подключения работает прекрасно. На самом деле, это почти работает слишком хорошо. Если мы попытаемся подключиться к несуществующему одноранговому узлу Биткойн, мы увидим, что наш узел будет пытаться повторно подключиться до конца времени. Давайте ограничим количество попыток повторения, которое может сделать наш узел, прежде чем он откажется.

Мы сделаем это, добавив поле retries к нашему начальному состоянию с начальным значением 0:

def start_link({ip, port}) do
  Connection.start_link(__MODULE__, %{
    ...
    retries: 0
  })
end

Мы также добавим атрибут модуля @max_retries, чтобы указать, сколько повторных попыток мы хотим, чтобы наш узел выполнял:

@max_retries 3

Затем мы изменим кортеж :backoff, возвращаемый нашим обратным вызовом connection/2, чтобы увеличить retries в возвращенной карте state:

{:backoff, 1000, Map.put(state, :retries, state.retries + 1)}

Наконец, мы добавим новый заголовок connect/2 функции, который определяет, когда мы достигли максимально допустимого числа повторных попыток. Когда мы достигнем этого предела, мы хотим вернуть кортеж :stop, чтобы убить текущий процесс:

def connect(_info, state = %{retries: @max_retries}) do
  {:stop, :normal, state}
end

Красивый. Теперь наш биткойн-узел перестанет пытаться подключиться к своему одноранговому узлу после трех неудачных попыток, ожидая между ними одну секунду.

Отключение с помощью Connect

Теперь, когда мы изменили способ подключения к одноранговому узлу, нам нужно подумать, что должно произойти в случае, если мы отключимся от этого узла.

Если наши handle_call/3, handle_cast/2 или handle_info/2 обратные вызовы возвращают :disconnect кортеж, наше поведение Connection вызовет наш обратный вызов disconnect/2, который определит следующий курс действий.

У нас есть несколько вариантов обработки отключения в нашем обратном вызове disconnect/2. Мы можем вернуть кортеж :connect, чтобы немедленно попытаться восстановить соединение. Точно так же мы можем вернуть кортеж :backoff, чтобы отложить повторное подключение на указанный timestamp. В качестве альтернативы мы можем вернуть :noconnect кортеж, чтобы поддерживать текущий процесс в рабочем состоянии, но не пытаться повторно подключиться к нашему одноранговому узлу. Наконец, наш обратный вызов disconnect/2 может вернуть кортеж :stop, чтобы немедленно завершить процесс нашего узла Биткойн.

Когда в будущем мы начнем подключаться к большему количеству узлов, потеря одного узла станет большой проблемой. К сожалению, потеря сверстников - это просто часть жизни. Имея это в виду, если мы обнаружим разрыв соединения, мы просто закроем наше TCP-соединение и вернем :stop кортеж из нашего disconnect/2 обратного вызова:

def disconnect(_, state) do
  :ok = :gen_tcp.close(state.socket)
  {:stop, :normal, state}
end

Затем, обрабатывая результат нашего вызова handle_messages/2, мы будем обрабатывать ошибки несколько иначе. Вместо того, чтобы возвращать :stop кортеж, когда мы получаем :error при обработке одного из наших сообщений, мы вместо этого вернем :disconnect кортеж:

case handle_messages(messages, state) do
  {:error, reason, state} -> {:disconnect, reason, %{state | rest: rest}}
  state -> {:noreply, %{state | rest: rest}}
end

Это приведет нас к нашему обратному вызову disconnect/2 с заданным reason для отключения.

Вот и все!

Последние мысли

Этот рефакторинг включал в себя довольно много движущихся частей, но в конечном итоге конечный продукт стал более чистым, простым и надежным программным обеспечением. С этими изменениями мы очень хорошо позиционировали себя, чтобы двигаться вперед и расширять проект узла Биткойн, в котором мы оказались.

Обязательно ознакомьтесь с полным кодом на Github, чтобы получить целостное представление о том, что мы сделали.

В следующий раз мы начнем расширять нашу сеть узлов, рекурсивно соединяясь с соседними узлами, которые мы получаем от нашего однорангового узла. Быть в курсе!

Первоначально опубликовано на www.petecorey.com 14 мая 2018 г.