Есть два способа сделать это без использования параллелизма, оба с оговорками.
Первый способ заключается в том, что если pipe1
и pipe2
- это просто простые Consumer
, которые зацикливаются навсегда, например:
p1 = for cat f -- i.e. p1 = forever $ await >>= f
p2 = for cat g -- i.e. p2 = forever $ await >>= g
... тогда простой способ решить эту проблему - просто написать:
for P.stdinLn $ \str -> do
f str
g str
Например, если p1
просто print
выполняет каждое значение:
p1 = for cat (lift . print)
... и p2
просто записывает это значение в дескриптор:
p2 = for cat (lift . hPutStrLn h)
... тогда вы бы объединили их так:
for P.stdinLn $ \str -> do
lift $ print str
lift $ hPutStrLn h str
Однако это упрощение работает только для Consumer
, которые тривиально зацикливаются. Есть еще одно решение, более общее, которое заключается в определении экземпляра ArrowChoice
для каналов. Я считаю, что Pipe
, основанные на вытягивании, не допускают правильного законопослушного экземпляра, но Pipe
, основанные на push, позволяют:
newtype Edge m r a b = Edge { unEdge :: a -> Pipe a b m r }
instance (Monad m) => Category (Edge m r) where
id = Edge push
(Edge p2) . (Edge p1) = Edge (p1 >~> p2)
instance (Monad m) => Arrow (Edge m r) where
arr f = Edge (push />/ respond . f)
first (Edge p) = Edge $ \(b, d) ->
evalStateP d $ (up \>\ unsafeHoist lift . p />/ dn) b
where
up () = do
(b, d) <- request ()
lift $ put d
return b
dn c = do
d <- lift get
respond (c, d)
instance (Monad m) => ArrowChoice (Edge m r) where
left (Edge k) = Edge (bef >=> (up \>\ (k />/ dn)))
where
bef x = case x of
Left b -> return b
Right d -> do
_ <- respond (Right d)
x2 <- request ()
bef x2
up () = do
x <- request ()
bef x
dn c = respond (Left c)
Для этого требуется новый тип, чтобы параметры типа находились в порядке, ожидаемом ArrowChoice
.
Если вы не знакомы с термином Pipe
, основанным на проталкивании, это в основном Pipe
, который начинается с самого верхнего канала, а не с самого нижнего канала, и все они имеют следующую форму:
a -> Pipe a b m r
Думайте об этом как о Pipe
, который не может «идти», пока не получит хотя бы одно значение от восходящего потока.
Эти Pipe
на основе push являются "двойными" по сравнению с обычными Pipe
на основе pull, дополненными собственным оператором композиции и идентификатором:
(>~>) :: (Monad m)
=> (a -> Pipe a b m r)
-> (b -> Pipe b c m r)
-> (a -> Pipe a c m r)
push :: (Monad m)
-> a -> Pipe a a m r
... но однонаправленный Pipes
API не экспортирует это по умолчанию. Вы можете получить эти операторы только из Pipes.Core
(и вы можете более внимательно изучить этот модуль, чтобы понять, как они работают). Этот модуль показывает, что и Pipe
на основе push, и Pipe
на основе pull являются частными случаями более общих двунаправленных версий, и понимание двунаправленного случая позволяет понять, почему они дублируют друг друга.
Если у вас есть экземпляр Arrow
для push-каналов, вы можете написать что-то вроде:
p >>> bifurcate >>> (p1 +++ p2)
where
bifurcate = Edge $ pull ~> \a -> do
yield (Left a) -- First give `p1` the value
yield (Right a) -- Then give `p2` the value
Затем вы должны использовать runEdge
, чтобы преобразовать это в канал на основе вытягивания, когда вы закончите.
У этого подхода есть один существенный недостаток, заключающийся в том, что вы не можете автоматически обновить канал, основанный на вытягивании, на канал, основанный на проталкивании (но обычно нетрудно понять, как сделать это вручную). Например, чтобы обновить Pipes.Prelude.map
до Pipe
, основанного на push-уведомлениях, вы должны написать:
mapPush :: (Monad m) => (a -> b) -> (a -> Pipe a b m r)
mapPush f a = do
yield (f a)
Pipes.Prelude.map f
Тогда это правильный тип, который нужно обернуть в Arrow
:
mapEdge :: (Monad m) => (a -> b) -> Edge m r a b
mapEdge f = Edge (mapPush f)
Конечно, еще проще было бы просто написать его с нуля:
mapEdge f = Edge $ push ~> yield . f
Используйте тот подход, который подходит вам лучше всего.
На самом деле, я придумал экземпляры Arrow
и ArrowChoice
именно потому, что пытался ответить на тот же вопрос, что и вы: как вы решаете такого рода проблемы без использования параллелизма? Я написал длинный ответ на эту более общую тему в другом ответе о переполнении стека здесь, где я описываю, как вы можете использовать эти экземпляры Arrow
и ArrowChoice
для преобразования параллельных систем в эквивалентные чистые.
person
Gabriel Gonzalez
schedule
04.11.2013