Получение результата от потокового сетевого протокола с conduit

Я реализую простой сетевой протокол с каналом; протокол представляет собой поток сообщений, каждое сообщение имеет префикс uint32, описывающий длину сообщения. (Затем данные сообщения имеют дополнительную внутреннюю структуру, но здесь это не важно, так как я могу прочитать все сообщение в память перед его разбором, поскольку ожидаемый размер сообщения невелик). Протокол одинаков в обоих направлениях: клиент отправляет сообщения, содержащие запросы, на сервер, а сервер возвращает сообщения, содержащие ответы (без параллелизма операций).

Моя идея состояла в том, чтобы построить код поверх двух простых каналов для перехода от Message (мой собственный тип, описывающий различные возможные сообщения) к ByteString и наоборот:

import qualified Data.ByteString as B
import qualified Data.ByteString.Lazy as LB

data Message = ...
parseMessage :: LB.ByteString -> Message
serializeMessage :: Message -> LB.ByteString

messageReceiver :: Conduit B.ByteString IO Message
messageReceiver = loop
  where
    loop = do
      lenBytes <- takeCE 4 =$= sinkLazy
      message <- takeCE (runGet getWord32be' lenBytes) =$= sinkLazy
      yield $ parseMessage message
      loop

messageSender :: Conduit Message IO B.ByteString
messageSender = concatMapC $ \message ->
  let messageBytes = serializeMessage message
      lenBytes = runPut $ putWord32be' (LB.length messageBytes)
  in map LB.toStrict [lenBytes, messageBytes]

Все идет нормально; или, по крайней мере, проверка типов кода, хотя я уверен, что есть более элегантный способ написать его (особенно цикл в messageReceiver). Теперь я хочу написать что-нибудь, чтобы подключиться к серверу, отправить запрос, получить ответ и отключиться. Я написал это:

runOneCommand request = do
  yield request
  response <- await
  return response

Однако я не уверен, как на самом деле подключить это к источнику сетевого клиента и приемнику таким образом, чтобы я получил значение «ответ». Я пробовал это:

appSource agent $$ messageReceiver =$= runOneCommand =$= messageSender =$= appSink agent

который не компилируется:

Couldn't match type `Data.Maybe.Maybe SSH.Agent.Message' with `()'
Expected type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.Conduit
                 SSH.Agent.Message ghc-prim:GHC.Types.IO SSH.Agent.Message
  Actual type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.ConduitM
                 SSH.Agent.Message
                 SSH.Agent.Message
                 ghc-prim:GHC.Types.IO
                 (Data.Maybe.Maybe SSH.Agent.Message)
In the return type of a call of `Main.runOneCommand'
In the first argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
  `Main.runOneCommand SSH.Agent.RequestIdentities'
In the second argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
  `Main.runOneCommand SSH.Agent.RequestIdentities
   conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
     Main.messageSender
     conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
       Data.Conduit.Network.appSink agent'
Couldn't match type `Data.Maybe.Maybe SSH.Agent.Message' with `()'
Expected type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.Conduit
                 SSH.Agent.Message ghc-prim:GHC.Types.IO SSH.Agent.Message
  Actual type: conduit-1.2.3.1:Data.Conduit.Internal.Conduit.ConduitM
                 SSH.Agent.Message
                 SSH.Agent.Message
                 ghc-prim:GHC.Types.IO
                 (Data.Maybe.Maybe SSH.Agent.Message)
In the return type of a call of `Main.runOneCommand'
In the first argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
  `Main.runOneCommand SSH.Agent.RequestIdentities'
In the second argument of `(conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=)', namely
  `Main.runOneCommand SSH.Agent.RequestIdentities
   conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
     Main.messageSender
     conduit-1.2.3.1:Data.Conduit.Internal.Conduit.=$=
       Data.Conduit.Network.appSink agent'

Предполагая, что я правильно следую типам здесь, это терпит неудачу, потому что приемник сетевого клиента ожидает возвращаемый тип (), а не Message, поэтому я предполагаю, что здесь мне нужна какая-то другая форма композиции канала, но я не знаю что.


person mithrandi    schedule 28.12.2014    source источник


Ответы (1)


После публикации этого вопроса я нашел https://stackoverflow.com/a/23925496/31490, который указал мне направление из fuseUpstream:

  response <- appSource agent $$ messageReceiver =$= runOneCommand RequestIdentities `fuseUpstream` messageSender `fuseUpstream` appSink agent

Кажется, предупреждение о пугающем типе fuseUpstream в этом ответе больше не применяется (поскольку типы Conduit были упрощены?); сравнивать:

(=$=) :: Monad m => Conduit a m b -> ConduitM b c m r -> ConduitM a c m r
fuseBoth :: Monad m => ConduitM a b m r1 -> ConduitM b c m r2 -> ConduitM a c m (r1, r2)
fuseUpstream :: Monad m => ConduitM a b m r -> Conduit b m c -> ConduitM a c m r
person mithrandi    schedule 28.12.2014