Я должен предварить это, сказав, что я очень новичок в Haskell и библиотеке каналов, и я хотел бы понять, что вызывает высокое использование памяти этой программой в функции test
.
В частности, в сгибе, который создает значение r1
в test
, я вижу накопление значений MyRecord до тех пор, пока не будет получен окончательный результат, если только не используется deepseq
. В моем тестовом наборе данных ~ 500 000 строк / ~ 230 МБ использование памяти превышает 1,5 ГБ.
Свертка, производящая значение r2
, выполняется в постоянной памяти.
Что я хотел бы понять:
1) Что может быть причиной построения значений MyMemory в первом фолде и почему использование deepseq
исправит это? Я очень сильно бросал вещи наугад, пока не пришел к использованию deepseq
для достижения постоянного использования памяти, но хотел бы понять, почему это работает. Можно ли добиться постоянного использования памяти без использования deepseq
, при этом производя результат того же типа, что и Maybe Int?
2). Чем отличается вторая складка, из-за которой не возникает та же проблема?
Я знаю, что если бы мне пришлось работать только с целыми числами вместо кортежей, я мог бы использовать встроенную функцию sum
из Pipes.Prelude, но в конечном итоге я захочу обработать второй элемент, который содержит какие-либо ошибки синтаксического анализа.
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Test where
import Control.Arrow
import Control.DeepSeq
import Control.Monad
import Data.Aeson
import Data.Function
import Data.Maybe
import Data.Monoid
import Data.Text (Text)
import Pipes
import qualified Pipes.Aeson as PA (DecodingError(..))
import qualified Pipes.Aeson.Unchecked as PA
import qualified Pipes.ByteString as PB
import qualified Pipes.Group as PG
import qualified Pipes.Parse as PP
import qualified Pipes.Prelude as P
import System.IO
import Control.Lens
import qualified Control.Foldl as Fold
data MyRecord = MyRecord
{ myRecordField1 :: !Text
, myRecordField2 :: !Int
, myRecordField3 :: !Text
, myRecordField4 :: !Text
, myRecordField5 :: !Text
, myRecordField6 :: !Text
, myRecordField7 :: !Text
, myRecordField8 :: !Text
, myRecordField9 :: !Text
, myRecordField10 :: !Int
, myRecordField11 :: !Text
, myRecordField12 :: !Text
, myRecordField13 :: !Text
} deriving (Eq, Show)
instance FromJSON MyRecord where
parseJSON (Object o) =
MyRecord <$> o .: "field1" <*> o .: "field2" <*> o .: "field3" <*>
o .: "field4" <*>
o .: "field5" <*>
o .: "filed6" <*>
o .: "field7" <*>
o .: "field8" <*>
o .: "field9" <*>
(read <$> o .: "field10") <*>
o .: "field11" <*>
o .: "field12" <*>
o .: "field13"
parseJSON x = fail $ "MyRecord: expected Object, got: " <> show x
instance ToJSON MyRecord where
toJSON _ = undefined
test :: IO ()
test = do
withFile "some-file" ReadMode $ \hIn
{-
the pipeline is composed as follows:
1 a producer reading a file with Pipes.ByteString, splitting chunks into lines,
and parsing the lines as JSON to produce tuples of (Maybe MyRecord, Maybe
ByteString), the second element being an error if parsing failed
2 a pipe filtering that tuple on a field of Maybe MyRecord, passing matching
(Maybe MyRecord, Maybe ByteString) downstream
3 and a pipe that picks an Int field out of Maybe MyRecord, passing (Maybe Int,
Maybe ByteString downstream)
pipeline == 1 >-> 2 >-> 3
memory profiling indicates the memory build up is due to accumulation of
MyRecord "objects", and data types comprising their fields (mainly
Text/ARR_WORDS)
-}
-> do
let pipeline = f1 hIn >-> f2 >-> f3
-- need to use deepseq to avoid leaking memory
r1 <-
P.fold
(\acc (v, _) -> (+) <$> acc `deepseq` acc <*> pure (fromMaybe 0 v))
(Just 0)
id
(pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
print r1
hSeek hIn AbsoluteSeek 0
-- this works just fine as is and streams in constant memory
r2 <-
P.fold
(\acc v ->
case fst v of
Just x -> acc + x
Nothing -> acc)
0
id
(pipeline :: Producer (Maybe Int, Maybe PB.ByteString) IO ())
print r2
return ()
return ()
f1
:: (FromJSON a, MonadIO m)
=> Handle -> Producer (Maybe a, Maybe PB.ByteString) m ()
f1 hIn = PB.fromHandle hIn & asLines & resumingParser PA.decode
f2
:: Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) IO r
f2 = filterRecords (("some value" ==) . myRecordField5)
f3 :: Pipe (Maybe MyRecord, d) (Maybe Int, d) IO r
f3 = P.map (first (fmap myRecordField10))
filterRecords
:: Monad m
=> (MyRecord -> Bool)
-> Pipe (Maybe MyRecord, Maybe PB.ByteString) (Maybe MyRecord, Maybe PB.ByteString) m r
filterRecords predicate =
for cat $ \(l, e) ->
when (isNothing l || (predicate <$> l) == Just True) $ yield (l, e)
asLines
:: Monad m
=> Producer PB.ByteString m x -> Producer PB.ByteString m x
asLines p = Fold.purely PG.folds Fold.mconcat (view PB.lines p)
parseRecords
:: (Monad m, FromJSON a, ToJSON a)
=> Producer PB.ByteString m r
-> Producer a m (Either (PA.DecodingError, Producer PB.ByteString m r) r)
parseRecords = view PA.decoded
resumingParser
:: Monad m
=> PP.StateT (Producer a m r) m (Maybe (Either e b))
-> Producer a m r
-> Producer (Maybe b, Maybe a) m ()
resumingParser parser p = do
(x, p') <- lift $ PP.runStateT parser p
case x of
Nothing -> return ()
Just (Left _) -> do
(x', p'') <- lift $ PP.runStateT PP.draw p'
yield (Nothing, x')
resumingParser parser p''
Just (Right b) -> do
yield (Just b, Nothing)
resumingParser parser p'
seq (Just undefined) = ()
ноseq (undefined :: Int) () = undefined
- person user2407038   schedule 06.09.2016forceMaybe Nothing = Nothing; forceMaybe x@(Just !_) = x
. - person dfeuer   schedule 06.09.2016P.sum (pipeline >-> P.map (fromMaybe 0 . fst))
. - person Gurkenglas   schedule 07.09.2016