Введение

В Pipedrive мы используем Consul для обнаружения сервисов. Мы написали библиотеку-оболочку под названием Diplomat, которая предоставляет необходимые интерфейсы. Проблема возникает с библиотеками, когда они нуждаются в обновлении. Для этого есть разные причины, начиная от незначительных ошибок и заканчивая необходимостью заменить Consul на что-то другое. Когда доступны сотни микросервисов, это требует времени и усилий.

Один из возможных способов решения этой проблемы — перенести сменную часть из библиотеки в коляску. Библиотека содержит только статическую часть слоя связи API с sidecar. Sidecar — это просто еще один контейнер в модуле Kubernetes.

В Pipedrive мы также построили коляску под названием Spock. Вы можете настроить библиотеку Diplomat, вызвав Consul напрямую или используя вспомогательную панель Spock для обнаружения служб. В случае со Споком он заботится о звонках Консулу.

У нас была проблема с задержкой с коляской Spock в сервисах с интенсивным трафиком. Замена текущей реализации пользовательской помогла сократить задержку более чем на 200%, но об этом позже.

В следующем абзаце я быстро опишу все термины, чтобы было легче следить за следующими главами.

Глоссарий

  • Diplomat — библиотека в Pipedrive для поиска сервисов непосредственно в Consul или через приложение Spock.
  • Контейнер — строительный блок модуля Kubernetes и модуль программного обеспечения, который упаковывает код и его зависимости, обычно работающий как один процесс в модуле.
  • Consul — бесплатная сетевая платформа с открытым исходным кодом, разработанная HashiCorp.
  • gRPC — платформа удаленного вызова процедур с открытым исходным кодом.
  • MTU — максимальная единица передачи, наибольший размер пакета или кадра, который можно отправить в пакетной или основанной на кадрах сети.
  • Pod — набор из одного или нескольких контейнеров и наименьшая единица приложения Kubernetes.
  • Sidecar — отдельный контейнер, работающий вместе с контейнером приложения в модуле Kubernetes.
  • Spock – дополнительный компонент Kubernetes в Pipedrive, отвечающий за вызовы сервисов для обнаружения различных бэкэндов, например, Consul.
  • TCP. Протокол управления передачей — это стандарт, определяющий, как мы устанавливаем и поддерживаем сетевой обмен данными, с помощью которого приложения могут обмениваться данными (считается надежным).
  • UDP — протокол пользовательских дейтаграмм — это протокол связи, используемый в Интернете для особенно чувствительных ко времени передач (считающихся ненадежными).
  • UDS — сокет домена Unix, также известный как сокет IPC (межпроцессное взаимодействие), представляет собой конечную точку передачи данных для обмена данными между процессами (считается надежным).

Проблема

Нашей конечной целью было перенести все сервисы на Spock для обнаружения сервисов. В начале это шло нормально. Мы начали с небольших, менее важных сервисов. Как только мы начали мигрировать более крупные сервисы с большей нагрузкой, мы начали замечать проблемы. Задержка больших служб значительно увеличилась, как и используемое процессорное время. Было достаточно плохо, что мы отменили это изменение и не перенесли более крупные сервисы на Spock.

Подробнее о том, как мы душили наши сервисы Kubernetes NodeJS, можно прочитать здесь. В этой статье объясняется, как мы запускаем наши сервисы в Kubernetes, управляем их ресурсами, как извлечь из них максимальную пользу и на что обращать внимание.

Чтобы понять основную причину, мы измерили задержку на стороне Spock (насколько быстро Spock сам обрабатывает внутренние запросы), и все выглядело нормально — для кешированных ответов в основном это занимало менее 100 микросекунд. Итак, мы решили, что это должно быть что-то на стороне Дипломата, коммуникации или сети. Для связи между Дипломатом и Споком использовался протокол gRPC, известный своей скоростью и малой задержкой.

Мы попытались настроить все возможные ручки в клиентской библиотеке gRPC. Ничего не помогло.

Примечание: большинство сервисов в Pipedrive написаны на NodeJS. Спок написан на Голанге. Все крупные сервисы, с которыми у нас были проблемы, также написаны на NodeJS.

Кроме того, мы измерили задержку связи sidecar со службами, написанными на NodeJS, с помощью Diplomat и Spock. Это было примерно 600 микросекунд в идеальных условиях. Для служб, делающих много вызовов для обнаружения служб, это слишком много. Загрузка процессора также была заметной. Эта задержка была самой низкой, когда кластер Kubernetes находился под небольшой нагрузкой. Когда узлы Kubernetes находились под большой нагрузкой, задержка увеличивалась до 1500 микросекунд и более.

Мы также проверили латентность клиентов, написанных на Golang и BOOM. Здесь задержка составила около 200 микросекунд с незначительной нагрузкой на ЦП. При высокой загрузке Kubernetes оно достигало 800 микросекунд. Детали реализации в библиотеке NodeJS gRPC казались достаточно сложными для нашего варианта использования сервисов с высокой пропускной способностью запросов.

Решение

Мы пытались изменить поведение библиотеки всеми возможными ручками, но ничего не помогло. Поэтому мы решили вообще отказаться от gRPC и протестировать собственный простой протокол поверх сокетов домена unix, используя только стандартную библиотеку для среды выполнения NodeJS.

Существует несколько типов сокетов домена unix:

  1. Потоковые сокеты, которые работают как TCP
  2. Сокеты датаграмм, которые работают как сокеты UDP
  3. Пакетные сокеты последовательности, которые сочетают в себе элементы обоих

Сделать выбор было довольно просто: мы выбрали потоковые сокеты. Во-первых, потому что он единственный, который NodeJS поддерживает из коробки через стандартную библиотеку; Golang поддерживает все три. Еще одна более важная причина заключается в том, что потоковые сокеты работают так же, как TCP, а это означает, что при необходимости их очень легко заменить на TCP/IP вместо UDS (сокеты домена unix). Например, если однажды мы решим переместить коляску Spock в отдельный сервис, мы можем сделать это с очень небольшими изменениями кода:

  1. На стороне Diplomat при создании сетевого подключения вместо указания файла сокета пути для подключения мы указываем порт и необязательный хост:
    net.createConnection({ path: sockPath }); -› net.createConnection({ хост:'1.2.3.4', порт: 8765});
  2. На стороне Spock это означает изменение параметров net.Listen:
    net.Listen(“unix”, socketPath) -› net.Listen( «tcp», hostPortAddr)

Потоковый сокет домена Unix работает как TCP без соответствующих подтверждений, контрольных сумм, управления потоком и т. д. Таким образом, в sidecar имеет смысл использовать его, чтобы максимально сократить накладные расходы.

В качестве примечания, gRPC также может использовать UDS, но при его тестировании мы не увидели никаких изменений в задержке по сравнению с TCP/IP.

Протокол данных

Когда мы начали использовать потоковую UDS, нам понадобился протокол или согласованная структура данных для связи между библиотекой Diplomat и коляской Spock. Мы старались максимально упростить первый подход (который также был нашим последним, потому что работал так хорошо). Мы придумали что-то вроде этого:

  1. Мы решили использовать JSON для формата данных, так как он хорошо поддерживается на обоих языках и не требует внешних библиотек.
  2. Потенциально он имеет более высокую стоимость с точки зрения как ЦП, так и размера по сравнению с Protobuf, но мы хотели посмотреть, действительно ли это будет проблемой.
  3. Поскольку это потоковый протокол, нам нужно знать, как заканчивается каждое предыдущее сообщение и начинается следующее. Мы решили использовать разрывы строк в качестве разделения сообщений. Опять же, это произошло потому, что он поддерживается на обоих языках и хорошо работает с сериализованными полезными нагрузками JSON. Оба языка имеют простой способ чтения потока байтов до тех пор, пока некоторая строка не разделит байт(ы), что является разрывом строки по умолчанию.
  4. Нам также необходимо решить вопрос о том, как сопоставлять запросы с ответами. Для этого мы добавили специальное поле под названием id, которое представляет собой простой счетчик. Счетчик рассчитывается на стороне библиотеки Diplomat и в функциональных блоках библиотеки. Он не разрешает Promise, пока не получит ответ с тем же id или не истечет время ожидания.
  5. Кроме того, есть два поля:
    a. func: в основном говорит, какую функцию вызывать в Spock (как аналогия с REST, думайте об этом как о маршруте для вызова)
    b. данные: какие данные отправляются в функцию (по аналогии с REST, это может быть тело запроса или параметры запроса)
  6. Для конечного запроса полезная нагрузка выглядела примерно так:
    {
    "id": 1,
    "func": "getService",
    "data": {
    "service": "barista"
    }
    }

    Этот пример предназначен для обнаружения службы и отвечает IP-адресом и портом, с помощью которых потребители могут подключиться к бариста service.
    Пример ответа:
    {
    "id": 1,
    "data": {
    "host": "1.2.3.4",
    "port": 8765
    }
    }

Тома в Kubernetes для включения IPC

Поскольку мы используем Kubernetes и доменные сокеты Unix, нам нужен способ совместного использования UDS между контейнерами. Это можно сделать с помощью томов Kubernetes.

Прежде всего, нам нужно определить новый том в манифесте Kubernetes:

volumes:
  - name: uds
    emptyDir:
      medium: Memory

Здесь мы указываем тип носителя тома как память, что означает, что любое другое хранилище не используется. Мы делаем это, чтобы максимизировать производительность и минимизировать задержки. Эти данные существуют только в контексте, когда работают как основная служба, так и дополнительные контейнеры и происходит обмен информацией, поэтому тип носителя памяти идеально подходит.

Теперь нам нужно только смонтировать этот том в оба контейнера — основной сервис и sidecar:

volumeMounts:
  - name: uds
    mountPath: /uds
    readOnly: false

readOnly должно быть равно false, так как мы также пишем туда при отправке данных по UDS. Хотя нет необходимости указывать его в манифесте как false, это значение по умолчанию для этого свойства.

Состояние гонки при запуске между коляской и сервисным контейнером

Как повторить попытку, если коляска еще не готова? При запуске пода все контейнеры запускаются одновременно, и между ними не гарантируется порядок запуска. Поэтому мы должны позаботиться об этом в коде библиотеки Diplomat. Здесь можно сделать несколько вещей. Первая часть частично перекрывается со следующим разделом, управляющим разрывом соединения UDS. Если по какой-то причине подключение к сокету домена Unix не удается — например, он еще не существует — то в конечном итоге вызывается обработчик события close. Если это не изящное завершение работы, он должен попытаться подключиться обратно.

Кроме того, поскольку sidecar отвечает за создание файла UDS, мы можем подождать, пока он не появится при запуске службы в контейнере, создав функцию, которая не возвращает значение до создания файла и установления соединения UDS. Кроме того, можно добавить таймаут — после его превышения. Во-первых, ошибка тайм-аута возвращается и регистрируется. Во-вторых, вызывается process.exit(1);, поэтому Kubernetes может попытаться перезапустить сервисный контейнер.

Поскольку обнаружение службы является жесткой зависимостью, даже если бы мы не установили тайм-аут в коде, это сработало бы, поскольку Kubernetes в конечном итоге принудительно перезапустит сам контейнер.

Потенциальный псевдокод для ожидания наличия файла UDS без обработки тайм-аута может выглядеть примерно так:

const waitUdsListenerReady = async () => {
    while (true) {
        try {
            await fs.access(sockPath, fs.constants.R_OK | fs.constants.W_OK);
            break;
        } catch (e) {
          // waits 1 second
          // production ready code it could be dynamic
          // like Math.min(2 ** ATTEMPT_NUMBER, 5000)
            await sleep(1000);
        }
    }
};

Управление обрывом соединения UDS

До сих пор это было чрезвычайно стабильно для нас. Мы могли бы сказать, что он настолько стабилен, что нас даже не должно волновать, если соединение UDS обрывается. На самом деле это должно нас волновать, так как мы никогда не знаем, когда и почему это может произойти — мы хотим писать надежное и надежно работающее программное обеспечение (даже в крайних случаях). Даже если мы часто тестировали его и проблема никогда не появлялась, мы все равно должны учитывать все возможности.

Короче говоря, когда в библиотеке Diplomat происходит событие закрытия, оно должно подключаться обратно, если только это не корректное завершение работы модуля. Вот пример псевдокода:

// when connection ends
// does not matter if with unexpected error or not
client.on('close', (hadError) => {
    // only connect back if graceful shutdown not in progress
    if (gracefulShutdownInProgress) {
        return;
    }
    // if closed with error then backoff for a second before connecting back
    // can happen on startup if sock file is not there yet
    if (hadError) {
        setTimeout(createClient, 1000);
    } else {
        // reconnect to sidecar
        createClient();
    }
});

Как не закрыть коляску слишком рано при плавном отключении

Еще одна проблема, о которой нам нужно позаботиться, — это корректное завершение работы. Когда происходит корректное завершение работы, Kubernetes отправляет сигнал SIGTERM сразу всем контейнерам в поде. Это может вызвать проблему, когда коляска отключается раньше основного сервисного контейнера, а основной службе все еще может понадобиться коляска в этот момент.

Поскольку соединение UDS является «постоянным соединением», мы можем отслеживать, сколько соединений открыто на стороне боковой панели Spock. Теперь, если происходит корректное завершение работы и сервисные контейнеры закрываются, количество подключений должно упасть до нуля. Только тогда, если он упадет до нуля, мы изящно выйдем из контейнера коляски. Если неисправный основной сервис не закрывается корректно, об этом позаботится льготный период Kubernetes. По умолчанию это 30 секунд, что означает, что Kubernetes уничтожит контейнер, если он не будет корректно закрыт к этому времени. Это можно изменить в манифесте Kubernetes, указав свойство terminationGracePeriodSeconds.

Вот псевдокод того, как может выглядеть слушатель:

// using atomic to track active connections
var activeConnections atomic.Int32
// channel used for tracking if graceful shutdown can be proceed
var allClientsDisconnected chan struct{}
// flag to determine if graceful shutdown is running
var shuttingDown atomic.Bool

_ = os.Remove(socketAddr)
listener, err := net.Listen("unix", socketAddr)
if err != nil {
    log.Fatal(err)
}

if err := os.Chmod(socketAddr, os.ModeSocket|0666); err != nil {
    log.Fatal(err)
}

go func() {
    <-allClientsDisconnected
    listener.Close()
}()

for {
    conn, err := listener.Accept()
    if err != nil {
        log.Println(err)
        continue
    }

    activeConnections.Add(1)

    go func() {
        defer conn.Close()

        scanner := bufio.NewScanner(conn)

        for scanner.Scan() {
            _, err = conn.Write(append(process(scanner.Bytes()), '\n'))
            if err != nil {
                log.Println(err)
            }
        }

        if err := scanner.Err(); err != nil {
            log.Println(err)
        }

        curActiveConnections := activeConnections.Add(-1)

        // checking if graceful shutdown is in progress
        // checking if it was last closed connection
        if shuttingDown.Load() && curActiveConnections == 0 {
            close(allClientsDisconnected)
        }
    }()
}

Функция, вызываемая при получении сигнала SIGTERM, может выглядеть примерно так:

func WaitAllDisconnected() {
    // minimum graceful shutdown time
    time.Sleep(10 * time.Second)

    // indicates that graceful shutdown is running
    shuttingDown.Store(true)

    // if connections are already closed
    if activeConnections.Load() == 0 {
        return
    }

    // waiting until main service container is exiting
    <-allClientsDisconnected
}

В более готовом к работе коде все переменные, такие как shuttingDown, activeConnections и другие, должны быть частью структуры, сохраняющей полное состояние.

Тайм-аут на стороне клиента

Мы никогда не можем полностью доверять сервису или коляске с точки зрения потребителя. Чтобы предотвратить слишком долгое ожидание потребителя или основной службы, мы добавили в библиотеку Diplomat механизм тайм-аута для всех вызовов sidecar. Через определенный период без ответа от коляски вылетает ошибка.

Пример псевдокода функции sendRequest, который также обрабатывает время ожидания на стороне клиента:

const sendRequest = (func, data, timeout = SPOCK_DEFAULT_REQUEST_TIMEOUT_MS) => {
    // id overflow is not handled here and it's highly unlikely to happen if maxRequests is big enough
    // could just throw an error if that happens or backoff for some time to retry again
    currentId = (currentId + 1) % maxRequests;

    return new Promise((resolve, reject) => {
        // send data to sidecar
        client.write(
            `${JSON.stringify({
                id: currentId,
                func,
                data,
            })}\n`,
        );

        // capture the variable from closure
        ((currentId) => {
          // this object is used to track inflight requests
          // used for resolving/rejecting responses
          requests[currentId] = {
              resolve,
              reject,
              // client side timeout for request
              // will be cleared when answer comes before
              timeout: setTimeout(() => {
                  requests[currentId].reject(new Error('Request to Spock timed out'));
                  // cleanup used request socket in array
                  // using null as we are using fixed length arrays for less allocations
                  requests[currentId] = null;
              }, timeout),
          };
      })(currentId);
    });
};

Полученные результаты

При использовании этого решения задержка снизилась примерно до 0,1–0,2 мс. Даже когда узлы Kubernetes находились под большой нагрузкой, это время составляло около 0,6 мс. Кроме того, чем тяжелее нагрузка, тем больше разница между старым и новым решением.

Старое и новое решение с точки зрения задержки:

+----------------+--------------+--------------+-----------------+
|                | Old solution | New solution | Difference      |
+----------------+--------------+--------------+-----------------+
| Node load low  | 0.6 ms       | 0.2 ms       | ~0.4 ms / ~300% |
+----------------+--------------+--------------+-----------------+
| Node load high | 1.8 ms       | 0.6 ms       | ~1.2 ms / ~300% |
+----------------+--------------+--------------+-----------------+

Задержка имеет линейную корреляцию с нагрузкой на узел. Новое решение превосходит старое примерно на 300%.

Полная реализация примера sidecar

package uds

import (
    "bufio"
    "encoding/json"
    "errors"
    "log"
    "net"
    "os"
    "sync/atomic"
    "time"
)

// request/response structure
// response case Function is just empty
type Message struct {
    Id       uint32          `json:"id"`             // id - counter
    Function string          `json:"func,omitempty"` // function name
    Data     json.RawMessage `json:"data"`           // data to send function
}

type UdsServer struct {
    activeConnections      atomic.Int32
    shuttingDown           atomic.Bool
    allClientsDisconnected chan struct{}
}

func New() *UdsServer {
    us := UdsServer{
        allClientsDisconnected: make(chan struct{}),
    }

    return &us
}

func (us *UdsServer) StartServer(socketAddr string) {
    // let's cleanup just for case
    // helps if container crashes or is killed for some reason and file is already there
    _ = os.Remove(socketAddr)
    listener, err := net.Listen("unix", socketAddr)
    if err != nil {
        log.Fatalf("Failed to start UDS listener at %q: %v\n", socketAddr, err)
    }

    if err := os.Chmod(socketAddr, os.ModeSocket|0666); err != nil {
        log.Fatalf("Failed to change UDS socket permissions: %v\n", err)
    }

    // wait until service container has finished on graceful shutdown
    go func() {
        <-us.allClientsDisconnected
        listener.Close()
    }()

    log.Printf("UDS listener started at %s\n", socketAddr)

    for {
        conn, err := listener.Accept()
        if err != nil {
            // normal closing case at graceful shutdown when all clients disconnected
            // break out of loop
            if errors.Is(err, net.ErrClosed) {
                break
            }

            if nErr, ok := err.(net.Error); ok && nErr.Timeout() {
                // log error when timeout happens in case of connecting to client
                log.Printf("Connection timeout happened in establishing UDS connection %v\n", nErr)
            } else {
                // log unknown error
                log.Printf("Failed to establish UDS connection %v\n", err)
            }

            // continue to accept next connection
            continue
        }

        // client connected, incrementing active connections count
        us.activeConnections.Add(1)

        go func() {
            defer conn.Close()

            scanner := bufio.NewScanner(conn)

            for scanner.Scan() {
                _, err = conn.Write(append(process(scanner.Bytes()), '\n'))
                if err != nil {
                    log.Printf("Writing back to client failed with error: %v\n", err)
                }
            }

            if err := scanner.Err(); err != nil {
                if !errors.Is(err, net.ErrClosed) {
                    log.Printf("UDS scanner stopped with error: %v\n", err)
                }
            }

            log.Printf("UDS client disconnected from %s\n", conn.RemoteAddr().String())

            // client disconnected, decrementing active connections count
            activeConnections := us.activeConnections.Add(-1)

            // checking if graceful shutdown is in progress
            // checking if it was last closed connection
            if us.shuttingDown.Load() && activeConnections == 0 {
                close(us.allClientsDisconnected)
            }
        }()

        log.Printf("UDS client connected from %s\n", conn.RemoteAddr().String())
    }

    log.Printf("UDS listener closed at %s\n", socketAddr)
}

// indicates that graceful shutdown started
// if connection closes then should gracefully end
func (us *UdsServer) WaitAllDisconnected() {
    // minimum graceful shutdown time
    time.Sleep(10 * time.Second)

    // indicates that graceful shutdown is running
    us.shuttingDown.Store(true)

    // if connections are already closed
    if us.activeConnections.Load() == 0 {
        return
    }

    // waiting until main service container is exiting
    <-us.allClientsDisconnected
}

// errors we do not handle here for sake of simpler example
func process(in []byte) []byte {
    req := Message{}
    _ = json.Unmarshal(in, &req)

    funct := req.Function
    var res []byte

    switch funct {
    case "examplefunc1":
        // here call function processor
        // res = processExampleFunc1(req.Data)
        // and so on for each func
    }

    // we are reusing request struct for response
    req.Function = ""
    req.Data = res

    out, _ := json.Marshal(req)

    return out
}

Полная реализация примера клиентской библиотеки

const fs = require('fs');
const net = require('net');
const readline = require('readline');
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
// maximum tolerable inflight requests number
const maxRequests = Math.pow(2, 17) - 1; // 131071
// used for tracking inflight requests
const requests = Array.from({ length: maxRequests });
const sockPath = '/uds/sidecar.sock';
const SIDECAR_DEFAULT_REQUEST_TIMEOUT_MS = 1000;

// tracking for current ID, autoincrement
let currentId = 0;
// client is cached here
// behaves as singleton
let client;
// line reader is cached here
let rl;
// keeps track if graceful shutdown is already in progress
let gracefulShutdownInProgress = false;

// listen SIGTERM event to mark graceful shutdown in progress
process.on('SIGTERM', () => {
    gracefulShutdownInProgress = true;
});

const waitUdsListenerReady = async () => {
    while (true) {
        try {
            // check that socket file is there with read and write access
            // correct rights are set by sidecar after socket file listener is setup
            fs.accessSync(sockPath, fs.constants.R_OK | fs.constants.W_OK);
            break;
        } catch (e) {
            // waits 1 second
            // production ready code it could be dynamic
            // like Math.min(2 ** ATTEMPT_NUMBER, 5000)
            await sleep(1000);
        }
    }
};

// creates line reader on client
const createLineReader = () => {
    rl = readline.createInterface({
        input: client,
        crlfDelay: Infinity,
    });

    // if full line is read from incoming stream then line event is emitted
    rl.on('line', (line) => {
        let res;

        try {
            res = JSON.parse(line);
        } catch (e) {
            console.warn(`Sidecar sent data not in JSON format (line=${line})`);
            return;
        }

        const id = res.id;
        // retrieves cached request object by id
        const request = requests[id];

        // meaning request already timed out and is removed from object
        if (!request) {
            console.warn(`Timed out sidecar request returned (${id})`);
            return;
        }

        // in case or error reject
        if (res.error) {
            request.reject(res.error);
        } else {
            request.resolve(res.data);
        }

        // clear timeout timer
        clearTimeout(request.timeout);
        // cleanup used request socket in array
        // using null as we are using fixed length arrays for less allocations
        requests[id] = null;
    });
};

const sendRequest = (func, data, timeout = SIDECAR_DEFAULT_REQUEST_TIMEOUT_MS) => {
    // id overflow is not handled here and it's highly unlikely to happen if maxRequests is big enough
    // could just throw an error if that happens or backoff for some time to retry again
    currentId = (currentId + 1) % maxRequests;

    return new Promise((resolve, reject) => {
        // send data to sidecar
        client.write(
            `${JSON.stringify({
                id: currentId,
                func,
                data,
            })}\n`,
        );

        // capture the variable from closure
        ((currentId) => {
          // this object is used to track inflight requests
          // used for resolving/rejecting responses
          requests[currentId] = {
              resolve,
              reject,
              // client side timeout for request
              // will be cleared when answer comes before
              timeout: setTimeout(() => {
                  requests[currentId].reject(new Error('Request to sidecar timed out'));
                  // cleanup used request socket in array
                  // using null as we are using fixed length arrays for less allocations
                  requests[currentId] = null;
              }, timeout),
          };
        })(currentId);
    });
};

const createClient = async () => {
    // waits when unix domain socket file is there with correct rights
    await waitUdsListenerReady();

    client = net.createConnection({
        path: sockPath,
    });

    client.on('error', (err) => {
        console.warn(`Sidecar UDS connection got error: ${err.toString()}`);
    });

    client.on('ready', () => {
        // if client is ready setup line reader for incoming stream
        createLineReader();
    });

    client.on('end', () => {
        // if client ended then close properly line reader to not leak
        rl.close();
    });

    // when connection ends
    // does not matter if with unexpected error or not
    client.on('close', (hadError) => {
        // only connect back if graceful shutdown not in progress
        if (gracefulShutdownInProgress) {
            return;
        }
        // if closed with error then backoff for a second before connecting back
        // can happen on startup if sock file is not there yet
        if (hadError) {
            setTimeout(createClient, 1000);
        } else {
            // reconnect to sidecar
            createClient();
        }
    });

    // returns function what can be used sending requests to sidecar
    return sendRequest;
};

// behaves as singleton
module.exports = async () => {
    // only creates client once
    if (client) {
        return sendRequest;
    }

    return await createClient();
};

Заключение

В нашем случае использование кастомной реализации себя оправдало. Мы смогли установить связь между контейнерами достаточно быстро, чтобы больше не было узких мест, даже для сервисов с более высокой нагрузкой, которые делают много вызовов обнаружения сервисов.

Тем не менее, всегда есть возможности для улучшения, и если мы хотим повысить производительность в будущем и даже снизить задержки, мы можем попробовать несколько вещей:

  1. Вместо JSON мы могли бы использовать Protobuf. Это требует меньше места и ускоряет сериализацию и десериализацию. Если мы хотим пойти дальше, мы могли бы реализовать собственный двоичный протокол, который, вероятно, будет даже быстрее, чем протокол общего назначения, такой как Protobuf, поскольку мы можем точно настроить его для нашего варианта использования.
  2. Использование UDP вместо TCP (в контексте UDS это означает использование сокетов дейтаграмм). Это требует много дополнительной работы, так как мы должны сами сделать его надежным и внедрить какой-то механизм подтверждения, повторную передачу пакетов, избегая фрагментации, сохраняя размер полезной нагрузки ниже значения MTU (максимальная единица передачи), создавая какое-то управление сеансом для соединений. и многое другое. Тем не менее, это может стоить того, если мы хотим пройти последнюю лишнюю милю, чтобы выжать все возможное. Сказав это, я думаю, что для 99% случаев использования TCP более чем достаточно.