Вопросы параллелизма между конвейерным и неконвейерным кодом

Я нахожусь в процессе создания библиотеки C для некоторой кодировки в интерфейсе каналов, но я натолкнулся на некоторые дизайнерские решения, которые необходимо принять.

После того, как библиотека C настроена, мы сохраняем контекст кодировщика. При этом мы можем либо закодировать, либо изменить некоторые параметры (давайте вызовем интерфейс Haskell для этой последней функции tune :: Context -> Int -> IO ()). В моем вопросе две части:

  1. Часть кодирования легко завернуть в Pipe Foo Bar IO (), но я также хотел бы раскрыть tune. Поскольку одновременное использование контекста кодирования должно быть защищено блокировкой, мне нужно будет блокировать каждую итерацию в канале и защищать tune, взяв ту же блокировку. Но теперь я чувствую, что навязываю пользователю скрытые блокировки. Я лаю не на то дерево? Как обычно разрешается подобная ситуация в экосистеме труб? В моем случае я ожидаю, что канал, частью которого является мой конкретный код, всегда будет выполняться в своем собственном потоке с одновременной настройкой, но я не хочу навязывать эту точку зрения каким-либо пользователям. Другие пакеты в экосистеме каналов, похоже, тоже не принуждают своих пользователей.
  2. Контекст кодирования, который больше не используется, необходимо правильно деинициализировать. Как в экосистеме труб гарантировать, что такие вещи (в данном случае выполнение некоторых IO действий) будут выполнены, когда труба будет уничтожена?

Конкретным примером может быть упаковка библиотеки сжатия, и в этом случае вышеприведенное может быть:

  1. Сила сжатия регулируется. Ставим трубу, и она весело бежит. Как лучше всего разрешить изменение параметра силы сжатия, пока канал продолжает работать, предполагая, что параллельный доступ к контексту кодека сжатия должен быть сериализован?
  2. Библиотека сжатия выделила кучу памяти из кучи Haskell при настройке, и нам нужно будет вызвать какую-то библиотечную функцию, чтобы очистить это, когда канал будет разорван.

Спасибо… это может быть все очевидно, но я новичок в экосистеме трубок.

Редактировать: Читая это после публикации, я совершенно уверен, что это самый расплывчатый вопрос, который я когда-либо задавал здесь. Фу! Извини ;-)


person gspr    schedule 03.09.2014    source источник
comment
Вы проверили библиотеку pipes-concurrency? В нем есть учебный модуль Которые могут оказаться полезными. Я дам этот ответ поближе сегодня вечером.   -  person Gabriel Gonzalez    schedule 03.09.2014
comment
О, я как-то совсем упустил из виду pipe-concurrency! Спасибо, мистер. Парень с трубами! Я все еще не уверен, что это полностью покрывает мой случай, но я посмотрю на это.   -  person gspr    schedule 03.09.2014
comment
@gspr Могут ли отдельные трубы изменить параметр прочности на сжатие или они могут просто прочитать его? Должны ли все каналы иметь один и тот же параметр?   -  person danidiaz    schedule 03.09.2014
comment
@danidiaz: Отдельные каналы соответствуют отдельным кодировщикам с их собственным контекстом. Их (индивидуальные) параметры задаются действием ввода-вывода, которое зависит от контекста кодировщика (и вызывает функцию библиотеки C).   -  person gspr    schedule 04.09.2014


Ответы (1)


Что касается (1), общее решение состоит в том, чтобы изменить тип вашего Pipe на:

Pipe (Either (Context, Int) Foo) Bar IO ()

Другими словами, он принимает как Foo входных данных, так и tune запросов, которые обрабатывает внутри себя.

Итак, давайте предположим, что у вас есть два одновременных Producer, соответствующих входам и запросам на настройку:

producer1 :: Producer Foo IO ()

producer2 :: Producer (Context, Int) IO ()

Вы можете использовать pipes-concurrency для создания буфера, в который они оба загружаются, например:

example = do
    (output, input) <- spawn Unbounded
    -- input  :: Input  (Either (Context, Int) Foo)
    -- output :: Output (Either (Context, Int) Foo)

    let io1 = runEffect $ producer1 >-> Pipes.Prelude.map Right >-> toOutput output
        io2 = runEffect $ producer2 >-> Pipes.Prelude.map Left  >-> toOutput output
    as <- mapM async [io1, io2]
    runEffect (fromInput >-> yourPipe >-> someConsumer)
    mapM_ wait as

Вы можете узнать больше о библиотеке pipes-concurrency, прочитав этот урок.

Заставляя все запросы на настройку проходить через один и тот же однопоточный Pipe, вы можете гарантировать, что у вас случайно не будет двух одновременных вызовов функции tune.

Что касается (2), есть два способа получить ресурс, используя pipes. Более сложный подход заключается в использовании библиотеки pipes-safe, которая предоставляет функцию bracket, которую вы можете использовать в Pipe, но это, вероятно, излишне для ваших целей и существует только для получения и высвобождения нескольких ресурсов в течение жизненного цикла канала. Более простое решение — просто использовать следующую идиому with для получения канала:

withEncoder :: (Pipe Foo Bar IO () -> IO r) -> IO r
withEncoder k = bracket acquire release $ \resource -> do
    k (createPipeFromResource resource)

Тогда пользователь просто напишет:

withEncoder $ \yourPipe -> do
    runEffect (someProducer >-> yourPipe >-> someConsumer)

При желании вы можете использовать пакет managed, который немного упрощает типы и упрощает получение нескольких ресурсов. Вы можете узнать больше об этом, прочитав эту запись в блоге. мой.

person Gabriel Gonzalez    schedule 06.09.2014
comment
Великолепно! Вы ясно поняли мои проблемы по моему сумбурному описанию! - person gspr; 07.09.2014
comment
Это потому, что у меня была точно такая же проблема раньше! :) - person Gabriel Gonzalez; 07.09.2014