Как сделать Pipe параллельной с библиотекой Haskell Pipe?

У меня есть код Haskell, использующий Pipes:

module Main(main) where
import Pipes

a :: Producer Int IO ()
a = each [1..10]

b :: Pipe Int Int IO ()
b = do
  x <- await
  yield (x*2)
  b

c :: Consumer Int IO ()
c = do
  x <- await
  lift $ print x
  c

main :: IO ()
main = runEffect $ a >-> b >-> c

Pipes.Concurrent учебник демонстрирует использование нескольких работников наряду с кражей работы. Как я могу сделать что-то подобное внутри b? Я хотел бы, чтобы b выполнял свою работу одновременно, используя установленное количество рабочих.

Очевидно, параллелизм бесполезен именно в этом случае, но это самый простой пример, который я смог придумать. В моем реальном случае использования я хотел бы делать несколько веб-запросов одновременно, используя ограниченное количество рабочих.


person Buttons840    schedule 01.01.2017    source источник
comment
hackage.haskell.org/ package/pipes-async-0.1.1/docs/src/ похоже, что его можно настроить так, чтобы несколько заданных каналов загружались в TQueue, поэтому мы получаем [Pipe a b] -> Pipe a b. Я разместил запрос функции... час назад ›:D   -  person Gurkenglas    schedule 01.01.2017


Ответы (1)


РЕДАКТИРОВАТЬ: я неправильно понял, о чем вы спрашивали; Возможно, вы сможете сделать это внутри трубы, но я не совсем уверен, какова будет мотивация. Я бы рекомендовал создавать многоразовые цепочки каналов и просто отправлять их с помощью рабочих, а не пытаться создавать рабочих ВНУТРИ канала. Вы потеряете любые гарантии упорядочения, что первый пришел первым, если вы встроите его в саму трубу.

Раздел по Работа Воровство — это то, что вы ищете, этот код в основном дословно взят из руководства, но давайте разберем, как это работает. Вот один из способов сделать то, что вы хотите:

module Main(main) where
import Pipes
import Pipes.Concurrent

import Control.Concurrent.Async (async, wait)
import Control.Concurrent (threadDelay)
import Control.Monad (forM)

a :: Producer Int IO ()
a = each [1..10]

b :: Pipe Int Int IO ()
b = do
  x <- await
  yield (x*2)
  b

c :: Consumer Int IO ()
c = do
  x <- await
  lift $ print x
  c

main :: IO ()
main = do
  (output, input) <- spawn unbounded
  feeder <- async $ do runEffect $ a >-> toOutput output
                       performGC

  workers <- forM [1..3] $ \i ->
    async $ do runEffect $ fromInput input  >-> b >-> c
               performGC

  mapM_ wait (feeder:workers)

Первая строка spawn unbounded из Pipes.Concurrent, она инициализирует «почтовый ящик», который имеет дескриптор для ввода и вывода. Меня это сначала смутило, но в данном случае мы посылаем сообщения НА выход и тянем их СО входа. Это напоминает канал push-pull сообщений в таких языках, как golang.

Мы указываем буфер чтобы сказать, сколько сообщений мы можем хранить, в этом случае мы устанавливаем без ограничений с неограниченным.

Итак, почтовый ящик инициализирован, теперь мы можем создать Effect, которые будут отправлять в него сообщения. Каналы почтовых ящиков реализованы с использованием STM., поэтому он может собирать сообщения асинхронно.

Давайте создадим асинхронное задание, которое загружает данные в почтовый ящик;

feeder <- async $ do runEffect $ a >-> toOutput output
                     performGC

a >-> toOutput output — это обычная композиция канала, нам нужно toOutput для преобразования вывода в канал. Обратите внимание на вызов performGC, который также является частью ввода-вывода, он позволяет Pipes.Concurrent знать, что нужно выполнить очистку после завершения задания. Мы могли бы запустить это, используя forkIO, если хотим, но в этом случае мы используем async, чтобы мы могли дождаться завершения результата позже. Итак, наш почтовый ящик должен асинхронно получать сообщения, давайте вытащим их и немного поработаем.

workers <- forM [1..3] $ \i ->
  async $ do runEffect $ fromInput input  >-> b >-> c
             performGC

Та же идея, что и раньше, но на этот раз мы просто создаем несколько из них. Мы читаем из ввода, как обычный канал, используя fromInput, а затем пропускаем его через остальную часть нашей цепочки, очищая, когда мы закончим. input гарантирует, что каждый раз, когда значение извлекается, его получает только один рабочий процесс. Когда все задания, поступающие в output, завершатся (он отслеживает все открытые задания), он закроет канал input, и рабочие закончат работу.

Если вы используете это в сценарии веб-воркера, у вас будет основной цикл, который продолжает отправлять запросы на канал toOutput output, а затем порождает столько рабочих, сколько вам нужно, которые подключаются к своему конвейеру из fromInput input.

person Chris Penner    schedule 03.01.2017