Декодирование потока JSON, где одни значения нужны раньше других

Допустим, у нас есть такой объект JSON (со строкой байтов, закодированной в base64):

TaggedImage = TaggedImage {  id :: Text, image :: ByteString }

Теперь мы хотим получить image из источника и сохранить его в месте, используя информацию в теге id. Итак, это означает, что id должен анализироваться заранее (чтобы определить местоположение изображения), а image анализируется в потоковом режиме. Это просто сделать?

Я планирую использовать pipes-aeson, aws (для хранения S3) и pipes для выполнения потокового декодирования от производителя Websocket с ведром S3 в качестве потребителя (которое нельзя создать, пока мы не проанализируем id, чтобы определить местоположение ведра S3). Глядя на decoded, я не могу понять, могу ли я действительно сделать то, о чем просил выше. Это моя первая попытка потоковой передачи в JSON и каналах. Таким образом, помощь будет очень признательна.

Простой пример с чтением и записью в файловую систему также подойдет для замены Websocket producer и S3 consumer.

Дополнение

Поскольку пары ключ-значение JSON неупорядочены в соответствии с RFC, а массивы упорядочены, image данные могут предшествовать id, кажется, для типа данных, который я определил выше. Таким образом, это также может помочь изменить его на массив JSON (кортеж в Haskell, который, похоже, aeson TH преобразует в упорядоченный массив). Пожалуйста, не стесняйтесь изменять определение типа данных, если это необходимо, чтобы наложить порядок декодирования. Например, тип данных может быть изменен на:

TaggedImage = TaggedImage (Text,ByteString)

person Sal    schedule 28.05.2016    source источник
comment
Я предполагаю, что вы хотите передать изображение прямо на диск?   -  person ErikR    schedule 28.05.2016
comment
@ErikR, да, для примера это должно подойти. Расположение на диске зависит от id.   -  person Sal    schedule 28.05.2016
comment
Я не думаю, что decoded может транслировать напрямую на диск. Каждое проанализированное значение JSON верхнего уровня анализируется полностью, прежде чем оно будет передано вам.   -  person ErikR    schedule 28.05.2016
comment
@ErikR, да, подпись decoded показывает то же самое. Будет ли ручной парсер работать вместо decoded? Например, создать поток из строки байтов и передать его непосредственно в pipes после ручного разбора id в упорядоченном массиве JSON? Не уверен, что aeson позволяет это.   -  person Sal    schedule 28.05.2016
comment
Вам нужно будет написать свой собственный парсер JSON. Было бы проще реализовать простой линейный протокол с двумя командами: Id ... и chunk .... Когда вы видите строку id ..., начинается новый файл. Когда вы видите строку chunk ..., это означает, что этот закодированный фрагмент (base64) нужно записать в текущий файл. Возможно, пустая строка chunk означает конец файла.   -  person ErikR    schedule 28.05.2016


Ответы (1)


Я полагаю, что вы не сможете повторно использовать библиотеку pipes-aeson, потому что она не предоставляет способа потоковой передачи по вложенному полю декодированной записи JSON и не поддерживает курсороподобную навигацию по структуре. Это означает, что вам нужно будет проанализировать скелет записи JSON вручную.

Кроме того, необходимо проделать некоторую работу, чтобы обернуть base64-bytestring в pipes-подобный API с этим типом:

-- Convert a base64-encoded stream to a raw byte stream
decodeBase64
    :: Producer ByteString m r
    -- ^ Base64-encoded bytes
    -> Producer ByteString m (Either SomeException (Producer ByteString m r)) 
    -- ^ Raw bytes

Обратите внимание, что результат возвращает Producer для оставшейся части строки байтов (т. е. всего после байтов, закодированных в base64), если декодирование завершается успешно. Это позволяет возобновить синтаксический анализ там, где заканчиваются байты изображения.

Однако если предположить, что у вас есть функция decodeBase64, то грубая схема того, как будет работать код, будет состоять из трех частей:

  • Разобрать префикс записи перед байтами изображения с помощью парсера binary, адаптированного к pipes
  • Используйте функцию decodeBase64 для потоковой передачи байтов декодированного изображения.
  • Разобрать суффикс записи после байтов изображения также с помощью парсера binary, адаптированного к pipes

Другими словами, типы и реализация будут выглядеть примерно так:

-- This would match the "{ 'id' : 'foo', 'image' : '" prefix of the JSON record
skipPrefix :: Data.Binary.Get ()

skipPrefix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipPrefix’ = execStateT (Pipes.Binary.decodeGet skipPrefix)

— This would match the "' }" suffix of the JSON record
skipSuffix :: Data.Binary.Get ()

skipSuffix’ :: Monad m => Producer ByteString m r -> m (Either DecodingError (Producer ByteString m r))
skipSuffix’ = execStateT (Pipes.Binary.decodeGet skipSuffix)

streamImage
    ::  Monad m
    =>  Producer ByteString m r
    ->  Producer ByteString m (Either SomeException (Producer ByteString m r))
streamImage p0 = do
    e0 <- lift (skipPrefix’ p0)
    case e0 of
        Left exc -> return (Left (toException exc))
        Right p1 -> do
            e1 <- decodeBase64 p1
            case e1 of
                Left exc -> return (Left exc)
                Right p2 -> do
                    e2 <- lift (skipSuffix’ p2)
                    case e2 of
                        Left exc -> return (Left (toException exc))
                        Right p3 -> return (Right p3)

Другими словами, streamImage будет принимать Producer в качестве входных данных, начинающихся с первого символа записи JSON, и будет передавать байты декодированного изображения, извлеченные из этой записи. Если декодирование прошло успешно, он вернет оставшуюся часть потока байтов сразу после записи JSON.

person Gabriel Gonzalez    schedule 29.05.2016