Haskell: разделение каналов (трансляция) без использования спавна

Этот вопрос немного кодовый и много нового.

Я использую потрясающую библиотеку pipes в Haskell, и я хотел бы разделить канал для отправки одних и тех же данных по нескольким каналам (сделать широковещательную рассылку). В Pipes.Concurrent учебнике предлагается использовать spawn для создания почтовых ящиков, используя моноидный статус Output. Например, мы могли бы сделать что-то вроде этого:

main = do
 (output1, input1) <- spawn Unbounded
 (output2, input2) <- spawn Unbounded
 let effect1 = fromInput input1 >-> pipe1
 let effect2 = fromInput input2 >-> pipe2
 let effect3 = P.stdinLn >-> toOutput (output1 <> output2)
 ...

Действительно ли необходима эта косвенность через почтовые ящики? Можем ли мы вместо этого написать что-то вроде этого?

main = do
 let effect3 = P.stdinLn >-> (pipe1 <> pipe2)
 ...

Вышеприведенное не компилируется, потому что Pipe не имеет экземпляра Monoid. Есть ли для этого веская причина? Действительно ли первый метод самый чистый способ разбить трубу?


person emchristiansen    schedule 03.11.2013    source источник


Ответы (1)


Есть два способа сделать это без использования параллелизма, оба с оговорками.

Первый способ заключается в том, что если pipe1 и pipe2 - это просто простые Consumer, которые зацикливаются навсегда, например:

p1 = for cat f  -- i.e. p1 = forever $ await >>= f
p2 = for cat g  -- i.e. p2 = forever $ await >>= g

... тогда простой способ решить эту проблему - просто написать:

for P.stdinLn $ \str -> do
    f str
    g str

Например, если p1 просто print выполняет каждое значение:

p1 = for cat (lift . print)

... и p2 просто записывает это значение в дескриптор:

p2 = for cat (lift . hPutStrLn h)

... тогда вы бы объединили их так:

for P.stdinLn $ \str -> do
    lift $ print str
    lift $ hPutStrLn h str

Однако это упрощение работает только для Consumer, которые тривиально зацикливаются. Есть еще одно решение, более общее, которое заключается в определении экземпляра ArrowChoice для каналов. Я считаю, что Pipe, основанные на вытягивании, не допускают правильного законопослушного экземпляра, но Pipe, основанные на push, позволяют:

newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }

instance (Monad m) => Category (Edge m r) where
    id = Edge push
    (Edge p2) . (Edge p1) = Edge (p1 >~> p2)

instance (Monad m) => Arrow (Edge m r) where
    arr f = Edge (push />/ respond . f)
    first (Edge p) = Edge $ \(b, d) ->
        evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b
      where
        up () = do
            (b, d) <- request ()
            lift $ put d
            return b
        dn c = do
            d <- lift get
            respond (c, d)

instance (Monad m) => ArrowChoice (Edge m r) where
    left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
      where
          bef x = case x of
              Left b -> return b
              Right d -> do
                  _ <- respond (Right d)
                  x2 <- request ()
                  bef x2
          up () = do
              x <- request ()
              bef x
          dn c = respond (Left c)

Для этого требуется новый тип, чтобы параметры типа находились в порядке, ожидаемом ArrowChoice.

Если вы не знакомы с термином Pipe, основанным на проталкивании, это в основном Pipe, который начинается с самого верхнего канала, а не с самого нижнего канала, и все они имеют следующую форму:

a -> Pipe a b m r

Думайте об этом как о Pipe, который не может «идти», пока не получит хотя бы одно значение от восходящего потока.

Эти Pipe на основе push являются "двойными" по сравнению с обычными Pipe на основе pull, дополненными собственным оператором композиции и идентификатором:

(>~>) :: (Monad m)
      => (a -> Pipe a b m r)
      -> (b -> Pipe b c m r)
      -> (a -> Pipe a c m r)

push  :: (Monad m)
      ->  a -> Pipe a a m r

... но однонаправленный Pipes API не экспортирует это по умолчанию. Вы можете получить эти операторы только из Pipes.Core (и вы можете более внимательно изучить этот модуль, чтобы понять, как они работают). Этот модуль показывает, что и Pipe на основе push, и Pipe на основе pull являются частными случаями более общих двунаправленных версий, и понимание двунаправленного случая позволяет понять, почему они дублируют друг друга.

Если у вас есть экземпляр Arrow для push-каналов, вы можете написать что-то вроде:

p >>> bifurcate >>> (p1 +++ p2)
  where
    bifurcate = Edge $ pull ~> \a -> do
        yield (Left  a)  -- First give `p1` the value
        yield (Right a)  -- Then give `p2` the value

Затем вы должны использовать runEdge, чтобы преобразовать это в канал на основе вытягивания, когда вы закончите.

У этого подхода есть один существенный недостаток, заключающийся в том, что вы не можете автоматически обновить канал, основанный на вытягивании, на канал, основанный на проталкивании (но обычно нетрудно понять, как сделать это вручную). Например, чтобы обновить Pipes.Prelude.map до Pipe, основанного на push-уведомлениях, вы должны написать:

mapPush :: (Monad m) => (a -> b) -> (a -> Pipe a b m r)
mapPush f a = do
    yield (f a)
    Pipes.Prelude.map f

Тогда это правильный тип, который нужно обернуть в Arrow:

mapEdge :: (Monad m) => (a -> b) -> Edge m r a b
mapEdge f = Edge (mapPush f)

Конечно, еще проще было бы просто написать его с нуля:

mapEdge f = Edge $ push ~> yield . f

Используйте тот подход, который подходит вам лучше всего.

На самом деле, я придумал экземпляры Arrow и ArrowChoice именно потому, что пытался ответить на тот же вопрос, что и вы: как вы решаете такого рода проблемы без использования параллелизма? Я написал длинный ответ на эту более общую тему в другом ответе о переполнении стека здесь, где я описываю, как вы можете использовать эти экземпляры Arrow и ArrowChoice для преобразования параллельных систем в эквивалентные чистые.

person Gabriel Gonzalez    schedule 04.11.2013