Объединить каналы в один

Я ищу функцию, которая может сделать что-то похожее на:

merge :: MonadIO m => [Producer m a] -> Producer m a

Я бегло взглянул на stm-conduit, он выглядит похоже, но я не уверен, что он соответствует моим требованиям:

messagesSource :: MonadIO m => AmqpConn -> Ack -> Text -> Producer m (Message, Envelope)
messagesSource conn ack q = loop
  where
    loop = do
      mmsg <- liftIO $ getMsg chan ack q
      case mmsg of
        Just (m, e) -> do
          yield (m, e)
          liftIO $ ackMsg chan (envDeliveryTag e) False
          loop
        Nothing     -> loop
    chan = fst $ amqpChan conn

Как видите, этот производитель каналов подтверждает сообщение после его передачи. В простом «однопоточном» конвейере это работает хорошо, сообщение попадает в приемник, а затем подтверждается.

Однако с stm-conduit это может измениться, потому что, насколько я понимаю, производитель не будет ждать, пока сообщение будет использовано приемником, вместо этого они будут работать параллельно, и сообщение может быть подтверждено преждевременно.

Правильно ли я понимаю stm-conduit?
И как можно объединить отдельные источники в один, чтобы получить хорошую семантику одного потока?

ОБНОВЛЕНИЕ: код обновлен до реального рабочего примера AMQP в соответствии с запросом (однако он может быть немного более шумным).

ОБНОВЛЕНИЕ 2: я думаю, что то, что мне нужно, может быть альтернативным экземпляром для источников каналов, поэтому я мог бы сделать что-то вроде let src = src1 <|> src2. Можно как-нибудь?


person Alexey Raga    schedule 17.02.2016    source источник
comment
Не могли бы вы опубликовать автономный пример? Я не видел, откуда взялся тип Queue. Это TBMQueue?   -  person zakyggaps    schedule 17.02.2016
comment
Это может быть очередь amqp или раздел kafka. Я не думаю, что это имеет большое значение, но я обновлю свой вопрос одним из этих примеров.   -  person Alexey Raga    schedule 17.02.2016


Ответы (2)


mergeSources в stm-conduit поддерживает TBMChannel за кулисами. Все ваши Источники/Продюсеры сначала подключаются к TBMChannel, затем он создаст один Источник, который попытается извлечь значения из канала FIFO.

Вы можете установить границу промежуточного TBMChannel при использовании mergeSources. Допустим, вы установили границу на n, тогда первые n значений, созданных всеми источниками, будут немедленно сброшены в TBMChannel и AmqpConn, при условии, что он не заблокирован в конце AmqpConn, и ваш потребитель медленнее, чем источники (кстати, AmqpConn использует неограниченный Control.Concurrent.Chan, поэтому он не будет блокироваться). После этого TBMChannel заполняется, поэтому все источники, пытающиеся передать значение каналу, блокируются. Ваш потребитель берет значение одно за другим из комбинированного источника, поэтому оно последовательно после первых n элементов.

Чтобы убедиться, что это последовательно с самого начала, вы можете установить границу на 1, однако это может вызвать некоторые проблемы с производительностью.

person zakyggaps    schedule 17.02.2016
comment
Это в значительной степени то, что я сейчас использую, но здесь есть интересный пограничный случай. Скажем, я хочу взять только 3 элемента из потока результатов, поэтому я использую комбинатор =$= take 3. После этого мой поток завершается и мои ресурсы (соединение, канал) удаляются. Но поскольку фактические источники являются асинхронными, они понятия не имеют об этом и все равно пытаются потреблять больше сообщений и помещать их в TMChan. И когда они это делают, я получаю исключение. Есть идеи, как с этим справиться? - person Alexey Raga; 20.02.2016
comment
@AlexeyRaga Я не думаю, что можно справиться с этим пограничным случаем только с функциональностью stm-conduit. Промежуточный канал, созданный mergeSources, закрывается только тогда, когда закрыты все восходящие источники. Я думаю, вам, возможно, придется упаковать некоторый код финализации в исходный код. Я когда-то думал, что методы в другом вопросе будут работать, но теперь я сомневаюсь, что это действительно работает. - person zakyggaps; 20.02.2016

Взгляните на ZipSource. , который представляет собой оболочку нового типа, Applicative которой позволяет вам комбинировать Source так, как вы хотите.

Получив ZipSource, вы можете использовать zipSources для объединения Source в Traversable (например, списка) в Source из Traversable.

Единственное отличие от желаемого типа результата состоит в том, что это Source над Traversable значений, а не просто одно значение, но это не должно быть большой проблемой.

person Will Sewell    schedule 17.02.2016
comment
Не будет ли он по-прежнему ждать, пока все источники излучат, прежде чем создавать следующее значение? Например, если очередь a имеет 1000 msg/sec, а b имеет 1 msg/sec, я все равно хочу, чтобы источник результатов выдавал 1000 a и 1 b и так далее. - person Alexey Raga; 17.02.2016
comment
В этом случае да, это не будет применимо, потому что он будет производить вывод только после того, как все Source сгенерируют значение. Я предлагаю вам прочитать ответы на этот вопрос, который, похоже, является той же проблемой, что и у вас есть. Обратите внимание, что это похоже на то, для чего кабелепровод не очень подходит. - person Will Sewell; 17.02.2016