РЕДАКТИРОВАТЬ: я неправильно понял, о чем вы спрашивали; Возможно, вы сможете сделать это внутри трубы, но я не совсем уверен, какова будет мотивация. Я бы рекомендовал создавать многоразовые цепочки каналов и просто отправлять их с помощью рабочих, а не пытаться создавать рабочих ВНУТРИ канала. Вы потеряете любые гарантии упорядочения, что первый пришел первым, если вы встроите его в саму трубу.
Раздел по Работа Воровство — это то, что вы ищете, этот код в основном дословно взят из руководства, но давайте разберем, как это работает. Вот один из способов сделать то, что вы хотите:
module Main(main) where
import Pipes
import Pipes.Concurrent
import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)
a :: Producer Int IO ()
a = each [1..10]
b :: Pipe Int Int IO ()
b = do
x <- await
yield (x*2)
b
c :: Consumer Int IO ()
c = do
x <- await
lift $ print x
c
main :: IO ()
main = do
(output, input) <- spawn unbounded
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
mapM_ wait (feeder:workers)
Первая строка spawn unbounded
из Pipes.Concurrent, она инициализирует «почтовый ящик», который имеет дескриптор для ввода и вывода. Меня это сначала смутило, но в данном случае мы посылаем сообщения НА выход и тянем их СО входа. Это напоминает канал push-pull сообщений в таких языках, как golang.
Мы указываем буфер чтобы сказать, сколько сообщений мы можем хранить, в этом случае мы устанавливаем без ограничений с неограниченным.
Итак, почтовый ящик инициализирован, теперь мы можем создать Effect
, которые будут отправлять в него сообщения. Каналы почтовых ящиков реализованы с использованием STM., поэтому он может собирать сообщения асинхронно.
Давайте создадим асинхронное задание, которое загружает данные в почтовый ящик;
feeder <- async $ do runEffect $ a >-> toOutput output
performGC
a >-> toOutput output
— это обычная композиция канала, нам нужно toOutput
для преобразования вывода в канал. Обратите внимание на вызов performGC
, который также является частью ввода-вывода, он позволяет Pipes.Concurrent знать, что нужно выполнить очистку после завершения задания. Мы могли бы запустить это, используя forkIO
, если хотим, но в этом случае мы используем async
, чтобы мы могли дождаться завершения результата позже. Итак, наш почтовый ящик должен асинхронно получать сообщения, давайте вытащим их и немного поработаем.
workers <- forM [1..3] $ \i ->
async $ do runEffect $ fromInput input >-> b >-> c
performGC
Та же идея, что и раньше, но на этот раз мы просто создаем несколько из них. Мы читаем из ввода, как обычный канал, используя fromInput
, а затем пропускаем его через остальную часть нашей цепочки, очищая, когда мы закончим. input
гарантирует, что каждый раз, когда значение извлекается, его получает только один рабочий процесс. Когда все задания, поступающие в output
, завершатся (он отслеживает все открытые задания), он закроет канал input
, и рабочие закончат работу.
Если вы используете это в сценарии веб-воркера, у вас будет основной цикл, который продолжает отправлять запросы на канал toOutput output
, а затем порождает столько рабочих, сколько вам нужно, которые подключаются к своему конвейеру из fromInput input
.
person
Chris Penner
schedule
03.01.2017
[Pipe a b] -> Pipe a b
. Я разместил запрос функции... час назад ›:D - person Gurkenglas   schedule 01.01.2017