В своей предыдущей статье я обсуждал, как написать приложение для обработки данных с классическими акторами Akka.

В этой статье я хотел бы переключиться между передачами и использовать потоковую библиотеку функционального программирования FS2 для обработки журналов HTTP из файла CSV, чтобы получить номера состояний HTTP, которые есть в файле журнала.

Я обнаружил, что изучение FS2, в целом, немного сложно, особенно если учесть, что вы никогда не слышали о потоке. Поэтому, прежде чем мы начнем углубляться в кодовую базу, я хотел бы начать с определения Stream и некоторой необходимой обработки Stream в FS2.

Примечание. В приведенном ниже коде я буду использовать FS2 с типом эффекта cats.effect IO. Если вам интересно узнать больше о cats.effect IO, загляните в их документацию.

Что такое поток?

Поток, как в Википедии, означает непрерывный поток чего-либо, превращающийся из единственного источника в жидкость, воздух или газ. В области вычислений он сказал, что передает или принимает (данные) через Интернет в виде устойчивого непрерывного потока.

Простым термином, который я мог придумать, будет итератор, но на стероидах. Это означает, что вам нужно будет перебирать бесконечное количество контейнеров. В ООП вы можете думать об этом как об итераторе, который будет выполнять итерацию конечного или бесконечного количества массива. Перебирая все эти элементы, вы можете делать любые сумасшедшие вещи и взаимодействовать с внешним миром.

У потока есть много полезных функций, которых нет в List.

Во-первых, каждый из элементов в Stream оценивается лениво, что означает, что при написании кода он не вызывается с нетерпением, пока вы не дадите команду на их выполнение. Эта функция очень важна, потому что она позволяет загружать, преобразовывать и записывать большие наборы данных, не увеличивая вашу кучу.

Во-вторых, поток не изменяет данные, как это делает List. Если вы хотите сопоставить элемент A с элементом B, это не изменяет исходное значение A. Это помогает, если вы работаете с большими наборами данных и файлами в параллельной среде.

Наконец, вы можете выполнять множество операций в потоках данных, например выполнять операции ввода-вывода в промежуточном процессоре, комбинируя его с другим потоком.

Базовая операция потока

FS2 имеет тип: Stream[F[_], O]. Это означает, что вы получаете поток значений, который испускает тип O. F[_] - это тип эффекта. Тип эффекта подразумевает, что если у вас есть какое-то взаимодействие с внешним миром, запись в БД или выполнение операций ввода-вывода. Обычно это тип IO. Если у вас нет никакого эффекта, обычно используется тип Pure, который влияет только на ваш ЦП.

Вы можете составить поток, как если бы вы составляли List:

Stream(1,2,3)

Когда вы создаете Stream выше, он создает только конструктор типа Pure потока. Программа еще не запущена; вы описываете свое приложение. Это означает, что вам нужно явно выполнить его, либо преобразовав его в список, выполнив Stream(1,2,3).toList.

Создание эффективного потока

Вы можете создать Stream с эффектом в нем и выполнить его:

Stream(1,2,3).covary[IO].compile.toList.unsafeRunSync()

Приведенный выше код описывает, как вы поднимаете этот поток до типа эффекта IO. Затем вы хотите выполнить поток в тип эффекта, вызвав compile. Затем мы преобразуем эффект в List, запустив toList. К тому времени характер потока IO(List(1,2,3)). Нам все еще нужно поднять IO, запустив unsafeRunSync, чтобы получить List(1,2,3).

Безопасность ресурсов

FS2 предоставила brackets или resources для взаимодействия с файлами. Они похожи на эффект кошек IO, который может выделять ресурсы и высвобождать их впоследствии, независимо от каких-либо проблем, возникающих в распределении.

Stream.brackets {
  IO {
    new BufferedReader(new FileReader(new File("yourfile.txt")))
  }
}(f => IO(f.close()))

Приведенный выше код описывает операцию по получению ресурсов и их освобождению впоследствии.

Работа с бесконечным потоком

Чтобы создать бесконечное количество постоянного потока constant. Чтобы ограничить количество chunks или значение, которое мы хотим извлечь из источника, take:

Stream.constant(42).take(5).toList
 // res0: List(42,42,42,42,42)

Он возьмет пять элементов из бесконечного чистого потока, который испускает 42.

Параллельная обработка

Чтобы создать множественную одновременную обработку, мы можем использовать parEvalMap(nWorker:Int)(f:A => F[A1]). Он также имеет такой же синтаксис для людей, знакомых с потоком Akka mapAsync.

Он создает nWorker поток и обрабатывает поток. parEvalMap вернется в нисходящий поток по порядку. Под капотом он использует fs2.concurrent.Queue для одновременного выполнения и поддержки запроса в нисходящем направлении.

В FS2 можно использовать гораздо больше операций. Если вам интересно, ознакомьтесь с этим руководством.

Теперь мы можем пойти дальше и победить в обработке данных с помощью FS2.

Мыслительный процесс

Мы хотим получить данные из weblog.csv. Мы продезинфицировали входящие данные, чтобы отфильтровать все недопустимые IP-адреса. Затем мы хотим отправить его рабочим и обработать данные, преобразовав их в экземпляр Log. Наконец, мы подсчитываем статус HTTP в журналах и записываем его в файл out.txt.

Исполнение

Начнем сверху вниз. Вот основная функция:

object Main extends IOApp with Processor {

  def run(args: List[String]): IO[ExitCode] = {

    processData("/weblog.csv", "/out.txt", 100).compile.drain.map(_ => ExitCode.Success)
  }
}

processData предоставит source файл /weblog.csv и sink файл /out.txt с количеством параллелизмов, которые он сгенерирует. Затем, после описания функции processData, нам нужно будет выполнить ее, вызвав compile.drain.

drain удаляет все значения из потока и просто выполняет его. Как только все суммы будут отправлены из потока, в результате мы получим один Unit, потому что мы будем записывать его в функцию out.txt внутри processData.

Мы будем использовать Stream.resource для приобретения и высвобождения ресурсов:

Stream.resource(Blocker[IO]).flatMap {  blocker =>
  val inResource = getClass.getResource(in)
  val outResource = getClass.getResource(out)
  io.file
    .readAll[IO](Paths.get(inResource.toURI), blocker, 4096)
    .through(text.utf8Decode)
    .through(text.lines)
    ..... // our processing logic here
    .through(text.utf8Encode)
    .through(io.file.writeAll(Paths.get(outResource.toURI), blocker))
  
}

Приведенный выше код создает inResource and outResource to read and write data to the file. It reads the data from the file line by line, by first decoding the bytes to UTF8, and then encoding it again to UTF8 and writes incrementally to out.txt`.

Это строка логов будет выглядеть так:

10.128.2.1,[29/Nov/2017:06:58:55,GET /login.php HTTP/1.1,200

Есть ..., который мы будем заполнять для всех операций, которые мы будем выполнять в процессе мысли.

До этого это была модель Log и Date, в которую каждый воркер будет преобразовывать:

case class Log(ip: String, time: Date, url: String, status: String)
  case class Date(year: Int, month: String, date: Int, time: String)

Логика обработки

Отфильтровать неверный IP

Как только мы получим каждую строку из файла, мы отфильтруем недопустимый IP-адрес:

filter(isValidIp) // filter out valid IP

isValidIp функция выглядит так:

def isValidIp(line: String): Boolean = {
    val ipRegex: Regex = """.*?(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3}).*""".r
    ipRegex.pattern.matcher(line.split(",")(0)).matches()
  }

Отправить строки работникам

Мы отправляем каждую из отфильтрованных строк рабочим, чтобы они могли преобразовать их в модель Log.

.parEvalMapUnordered(parallelism)(convertToLog)

convertToLog функция выглядит так:

def convertToLog(line: String): IO[Log] = line.split(",").toList match {
    case ip :: time :: url :: status :: _ =>
      IO(Log(ip, convertToDate(time), url, status))
  }

Подсчитать код состояния HTTP

Как только рабочий завершит преобразование в журналы, он пройдет фазу складывания. Внутри фазы сворачивания мы группируем номер статуса HTTP в Map с status -> number of status.

flatMap { m =>
          Stream.fromIterator[IO](m.keys.map { key =>
            s"Status : ${key} has a total of ${m(key)} amount "
          }.iterator)
        }

Все вместе

Подводя итог, если мы сложим всю логику воедино, получится что-то вроде этого:

io.file
        .readAll[IO](Paths.get(inResource.toURI), blocker, 4096)
        .through(text.utf8Decode)
        .through(text.lines)
        .filter(isValidIp)
        .parEvalMapUnordered(parallelism)(convertToLog)
        .fold(Map.empty[String, Int]) { (map, currLog) =>
          val updatedStatus = map.getOrElse(currLog.status, 0) + 1
          map + (currLog.status -> updatedStatus)
        }
        .flatMap { m =>
          Stream.fromIterator[IO](m.keys.map { key =>
            s"Status : ${key} has a total of ${m(key)} amount "
          }.iterator)
        }
        .through(text.utf8Encode)
        .through(io.file.writeAll(Paths.get(outResource.toURI), blocker))

Мы также можем выполнить свертку с внешней стороны потока, а затем записать в файл out.txt. В этом смысле нам нужно будет получить и освободить ресурс для этой операции writeAll.

Резюме

В этих статьях мы коснемся сложной библиотеки потоковой обработки FS2.

Мы обсуждаем, что такое Stream и чем он отличается от List. Мы также говорим о существенной работе потока. Затем мы подробно рассмотрим, как можно создать приложение для обработки данных с помощью FS2.

Вот полный исходный код.

Если вам интересно узнать больше о FS2, ознакомьтесь с этими ресурсами:

Первоначально опубликовано на https://edward-huang.com.