Как сделать разветвленный Conduit?

Я хочу, чтобы одни и те же данные были разделены на две «ветки», которые обрабатывались отдельно, а затем «объединялись»...

                                +----------+
                +---------+  -->| doublber |---   +--------+
   +--------+   |         |--   +----------+   -->|        |   +------+
   | source |-->| splitter|                       | summer |-->| sink |
   +--------+   |         |--   +----------+   -->|        |   +------+
                +---------+  -->| delayer  |---   +--------+
                                +----------+

Как мне это сделать?

Моя попытка:

import Data.Conduit
import Control.Monad.IO.Class
import qualified Data.Conduit.List as CL
-- import Data.Conduit.Internal (zipSources)
import Control.Arrow ((>>>))

source :: Source IO Int
source = do
    x <- liftIO $ getLine
    yield (read x)
    source

splitter :: Conduit Int IO (Int, Int)
splitter = CL.map $ \x -> (x,x)

doubler = CL.map (* 2)

delayer :: Conduit Int IO Int
delayer = do
    yield 0
    CL.map id

twoConduitBranches :: Monad m => Conduit a m b -> Conduit c m d -> Conduit (a,b) m (c,d)
twoConduitBranches q w = awaitForever $ \(x, y) -> do
    out1 <- undefined q x
    out2 <- undefined w y
    yield (out1, out2)


summer :: Conduit (Int,Int) IO Int
summer = CL.map $ \(x,y) -> x + y

sink :: Sink Int IO ()
sink = CL.mapM_ (show >>> putStrLn) 

-- combosrc = zipSources (source $= delayer) (source $= doubler)
main = source $= splitter $= twoConduitBranches doubler delayer $= summer $$ sink

Что мне написать вместо undefineds?


person Vi.    schedule 09.07.2014    source источник


Ответы (1)


Вы можете сделать это, но это уродливо, и, надеюсь, реализация прояснит, почему это уродливо, а не встроенная функция conduit:

twoConduitBranches :: Monad m => Conduit a m c -> Conduit b m d -> Conduit (a,b) m (c,d)
twoConduitBranches q w = getZipConduit
      (ZipConduit (CL.map fst =$= q =$= CL.map Left)
    <* ZipConduit (CL.map snd =$= w =$= CL.map Right)) =$= collapse
  where
    collapse = do
        v1 <- await
        case v1 of
            Nothing -> return ()
            Just (Left _) -> error "out of sequence 1"
            Just (Right d) -> do
                v2 <- await
                case v2 of
                    Nothing -> error "mismatched count"
                    Just (Right _) -> error "out of sequence 2"
                    Just (Left c) -> do
                        yield (c, d)
                        collapse

(Примечание: я немного подправил вашу подпись типа, я предполагаю, что это та подпись типа, которую вы действительно хотели.)

Вот подход: превратить q в Conduit, который берет первое значение из каждого входящего кортежа, а затем обернуть его вывод Left. Точно так же мы берем второе значение из каждого входящего кортежа и передаем его в w, а затем оборачиваем вывод в Right.

Теперь, когда эти Conduit имеют один и тот же тип (они принимают одни и те же входные кортежи и генерируют одни и те же значения Choose), мы объединяем их с помощью ZipConduit, который разделяет ввод между всеми компонентами и объединяет вывод в один поток.

Этот поток является потоком Either c d, а не желаемым (c, d). Чтобы сделать это окончательное преобразование, мы используем collapse. Он извлекает значения Right и Left, а затем объединяет их в один кортеж, который получается.

Эта функция предполагает, что последовательность выходных значений всегда будет состоять из одного значения из w, а затем одного из q. Если что-то еще произойдет, он выдаст исключение. Проблема в том, что ничто не указывает на то, что они на самом деле будут генерировать продукцию с одинаковой скоростью. На самом деле канал специально разработан, чтобы избежать этого предположения!

Таким образом, если вы знаете, что ваши два компонента всегда будут производить вывод с одинаковой скоростью, эта функция будет работать. Но это не будет правдой вообще говоря.

person Michael Snoyman    schedule 10.07.2014