Перенаправление каналов LWT

У меня есть служба ssh, работающая на сокете Unix, и у меня есть локальный сервер TCP, для которого я хочу, чтобы он был направлен на каналы сокета unix.

В основном, когда я делаю:

$ ssh root@localhost -p 2000

Затем мой локальный TCP-сервер получает запрос и направляет его в сокет Unix, а TCP-клиент, в данном случае ssh, получает ответ из сокета Unix. Соответствующий код:

  let running_tunnel debug (tcp_ic, tcp_oc) () =
    Lwt_io.with_connection a_unix_addr begin fun (mux_ic, mux_oc) ->
      let%lwt _ = some_call with_an_arg
      and _ =

        (* Some setup code *)


      let rec forever () =
        Lwt_io.read_line tcp_ic >>= fun opening_message ->
        Lwt_io.write_from_string_exactly
        mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
        Lwt_io.read_line mux_ic >>= fun reply ->
        Lwt_io.printl reply >>= fun () ->
        Lwt_io.write_line tcp_oc reply >>= fun () ->
        forever ()
      in
      forever ()
      in
      Lwt.return_unit
    end

И такое работает. Он «зависает», когда я вызываю ssh в командной строке, но я знаю, что получаю некоторые данные, потому что заголовок ssh другой стороны правильный, SSH-2.0-OpenSSH_6.7. Я также заставляю свою сторону распечатать больше частей начального рукопожатия ssh, т. Е. Я вижу это напечатанным:

??^?W????\[email protected],ecdh-sha2-nistp256,ecdh-sha2-nistp384,ecdh-sha2-nistp521,diffie-hellman-group-exchange-sha256,diffie-hellman-group14-sha1ssh-rsa,ssh-dss>aes128-ctr,aes192-ctr,aes256-ctr,[email protected]>aes128-ctr,aes192-ctr,aes256-ctr,[email protected][email protected],[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512,[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],[email protected],hmac-sha2-256,hmac-sha2-512,hmac-sha1none,[email protected],[email protected] 

д., что кажется правильным. Я решил, что причина зависания в том, что я использую Lwt_io.read_line, поэтому вместо этого я попробовал это:

            let rec forever () =
              Lwt_io.read tcp_ic >>= fun opening_message ->
              Lwt_io.write_from_string_exactly
                mux_oc opening_message 0 (String.length opening_message) >>= fun () ->
              Lwt_io.read mux_ic >>= fun reply ->
              Lwt_io.printl reply >>= fun () ->
              Lwt_io.write tcp_oc reply >>= fun () ->
              forever ()
            in
            forever ()

Который на самом деле работал хуже, он даже не распечатывал начальное рукопожатие. Я также пробовал специальные функции {write,read}_into... с ограниченным успехом. Работая под strace/dtruce, я вижу такие конечные результаты, как:

read(0x6, "SSH-2.0-OpenSSH_6.9\r\n\0", 0x1000)       = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.9\n\0", 0x14)      = 20 0
read(0x7, "\0", 0x1000)      = -1 Err#35
write(0x7, "SSH-2.0-OpenSSH_6.9\0", 0x13)        = 19 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0)         = 1 0
read(0x7, "SSH-2.0-OpenSSH_6.7\r\n\0", 0x1000)       = 21 0
write(0x1, "SSH-2.0-OpenSSH_6.7\n\0", 0x14)      = 20 0
read(0x6, "\0", 0x1000)      = -1 Err#35
write(0x6, "SSH-2.0-OpenSSH_6.7\n\0", 0x14)      = 20 0
select(0x9, 0x7FFF5484F880, 0x7FFF5484F800, 0x7FFF5484F780, 0x0)         = 1 0
read(0x6, "\0", 0x1000)      = 1968 0
read(0x6, "\0", 0x1000)      = -1 Err#35
^C

Где 6.9 — это ssh моей локальной машины, а 6.7 — удаленная машина за сокетом Unix. Одна вещь, которая кажется мне странной, это то, как отбрасывается \r, и это изменяет количество операций чтения/записи на 1. Я не уверен, что это может быть решающей разницей.

В идеале я хотел бы какую-то абстракцию от Lwt, которая говорила бы, что всякий раз, когда есть данные, доступные на этом доступном для чтения канале (сокет TCP), записывайте их непосредственно в доступный для записи канал (сокет Unix) и наоборот.


person Edgar Aroutiounian    schedule 17.12.2015    source источник


Ответы (1)


Вариант с readline не работал, так как поток данных бинарный, а readline для текстового построчного ввода. Второй вариант с функцией Lwt_io.read не работал, так как эта функция будет читать весь ввод до конца, если вы не указали необязательный параметр count. Это означает, что управление будет передано write только после EOF на стороне считывателя. Использование Lwt_io.read с некоторым числом, например, Lwt_io.read ~count:1024 mux_ic было бы неплохой идеей. Кроме того, вы не должны забывать проверять возвращаемое значение, если вы ожидаете, что ваш поток будет конечным. read_into следует использовать с осторожностью, так как, в отличие от функции read, она не гарантирует, что будет считано именно то количество данных, которое вы запросили. Другими словами, будут короткие чтения. То же верно и для функции write_into. Версии _exactly этих функций не страдают от этой проблемы, поэтому вместо них лучше использовать их.

Есть еще одна вещь, которую вы должны учитывать. Lwt_io предоставляет интерфейс для буферизованного ввода и вывода. Это означает, что все функции в этом модуле выполняют запись и чтение в некоторый внутренний буфер или из него, вместо того, чтобы напрямую взаимодействовать с операционной системой через дескриптор устройства. Это означает, что когда вы передаете данные из одного буферизованного источника в другой буферизованный источник, у вас будут неожиданные задержки на обоих концах. Таким образом, вы должны предвидеть их, используя флеши. В противном случае это может привести к возникновению условий гонки, когда у вас двустороннее взаимодействие.

Более того, хотя буферизованный ввод-вывод значительно упрощает работу, он имеет свою цену. На самом деле у вас есть несколько ненужных слоев буферов, когда вы используете Lwt_io, вы также выделяете много ненужных данных, засоряя свою память мусором. Проблема в том, что Lwt_io имеет свой собственный внутренний буфер, который он не показывает обычному пользователю, и все функции, которые возвращают данные или потребляют данные, должны выполнять дополнительную операцию копирования во внутреннюю функцию или из нее. Например, использование Lwt_io.{read,write} сделает следующее:

  1. скопировать данные из ядра во внутренний буфер
  2. выделить строку
  3. скопировать данные из внутреннего буфера в выделенную строку
  4. (теперь часть write) скопировать данные из строки во внутренний буфер
  5. скопировать данные из внутреннего буфера в ядро.
  6. (где-то, а иногда и позже, внутри сборщика мусора) скопировать выделенную строку из вспомогательной кучи в основную (если строка была достаточно мала, чтобы поместиться в вспомогательную кучу) или скопировать ее из одного места в другое (если уплотнение алгоритм решает ее переместить, а строка все еще жива, что вполне возможно, если производители опережают потребителя и время жизни считанных данных становится достаточно большим).

Похоже, что мы можем избавиться от копий в 2, 3, 4 и 6. Мы можем использовать свой собственный буфер, и копировать в него данные из ядра, а затем копировать данные из этого ядра обратно в ядро. Мы можем даже избавиться от копий в 1 и 5, используя splice и tee системные вызовы, которые копируют данные напрямую между буферами ядра, вообще не затрагивая пространство пользователя. Но в этом случае мы потеряем возможность исследовать данные, а обычно это то, что нам нужно.

Итак, попробуем удалить все копии, кроме копий из пространства ядра. Мы можем использовать низкоуровневый интерфейс к внутреннему буферу в Lwt_io, например direct_access и недавно добавленную функцию block, но это требует знания внутренностей Lwt_io и не очень тривиально, но все же выполнимо. Вместо этого мы будем использовать более простой подход, использующий библиотеку Lwt_unix. Эта библиотека напрямую взаимодействует с ядром без каких-либо промежуточных буферов, оставляя буферизацию на наше усмотрение.

open Lwt.Infix

let bufsiz = 32768

let echo ic oc =
  let buf = Lwt_bytes.create bufsiz in
  let rec loop p =
    let p = p mod bufsiz in
    Lwt_bytes.read ic buf p (bufsiz - p) >>= function
    | 0 -> Lwt.return ()
    | n -> Lwt_bytes.write oc buf p n >>= loop in
  loop 0

Это позволит реализовать простое и быстрое дублирование данных, которое будет копировать данные с той же скоростью, что и программа cat. Тем не менее, есть еще место для улучшения. Например, для надежности следует добавить обработку ошибок (в частности, для сигнала EINTR). Также эта функция реализует синхронное копирование, где вход и выход жестко заблокированы. Иногда это не вариант. Рассмотрим следующий пример: ввод — это UDP-сокет, который может легко обойти потребителя, и данные будут удалены, даже если в среднем производитель медленнее, чем потребитель. Чтобы справиться с этим, вам нужно разделить читателей и писателей на два отдельных потока, которые взаимодействуют через некоторую эластичную очередь.

Lwt — довольно низкоуровневая библиотека, которая не решает и не должна решать эти проблемы за вас. Он предоставляет механизмы, которые можно использовать для построения решения для каждого конкретного случая. Существуют библиотеки, которые предоставляют решения для некоторых распространенных шаблонов, 0MQ и наносообщения — хорошие примеры.

Обновлять

Может быть, я слишком низкоуровневый парень, а может быть, я копаю слишком глубоко. Если вы действительно ищете высокоуровневый подход, вам следует использовать Lwt_stream, в этом случае вы можете кодировать узел, эквивалентный foo.pipe(bar).pipe(foo), как

let echo ic oc = Lwt_io.(write_chars oc (read_chars ic))

Конечно, это будет намного медленнее, но это зависит от вашей задачи.

И да, чтобы выполнить двустороннее перенаправление, вы должны просто запустить два потока, например: echo ic oc <&> echo ic oc для версии с файловыми дескрипторами, оба из которых доступны для записи. Если вы используете Lwt_io каналов, которые являются однонаправленными, как трубы, вы получите две конечные точки для каждой части. Назовем их fi и fo для ввода и вывода фронтенда соответственно, и bi, bo для бэкэнд части. Потом нужно подключить вот так: echo fo bi <&> echo bo fi, используя вторую версию echo с потоками.

Стоимость производительности

Обычно высокоуровневые абстракции требуют снижения производительности. В нашем конкретном случае при использовании первой версии эха пропускная способность превышает 1Gb в секунду. Версия с потоками имеет среднюю пропускную способность 5MB/s. В зависимости от вашей настройки это может работать или не работать. Этого более чем достаточно для обычного сеанса ssh, но это может повлиять на scp в локальных сетях.

person ivg    schedule 17.12.2015
comment
Большое тебе спасибо. В узле это просто foo.pipe(bar).pipe(foo). Буквально. В вашем примере просто сказано, что A копирует данные в B., чтобы B копировал данные в A, тогда мне нужно другое использование эха, правильно? Интересно, можно ли сказать, что запускать оба одновременно с let %lwt _ = a_to_b и _ = b_to_a. Я также видел Lwt_unix.wrap, интересно, пригодится ли он? - person Edgar Aroutiounian; 17.12.2015
comment
да, для перемещения данных в обоих направлениях вы должны использовать echo a b <&> echo b a. Lwt_unix.wrap тут ни при чем. Наконец, lwt — это библиотека низкого уровня. Он не предоставляет никаких высокоуровневых примитивов. Это то же самое, что и вызовы чтения/записи уровня C и библиотека fstreams. Существуют и другие библиотеки, которые обеспечивают постепенно более высокие уровни абстракции с меньшим контролем со стороны пользователя. foo.pipe(bar).pipe(foo) может не подходить для всех случаев использования. И вообще, если вам это нужно, то вы можете просто позвонить tee или использовать cat<foo>bar. - person ivg; 17.12.2015
comment
Я фактически обновил сообщение с ответом, который может подойти вам лучше. Вы можете использовать потоки для подключения ваших каналов. Я исключил этот вариант, так как он медленнее, но, возможно, для вашего конкретного случая это сработает. - person ivg; 17.12.2015
comment
Да! Потоковое решение работает. Нам нужно встретиться, я должен купить вам несколько напитков / колы! - person Edgar Aroutiounian; 17.12.2015
comment
Кроме того, как вы рассчитали эти значения 1 ГБ в секунду или 5 МБ/с? - person Edgar Aroutiounian; 18.12.2015