Как распараллелить краулер REST API в http4s и fs2?

Я написал последовательный поисковый робот REST API в http4s и fs2 здесь:

https://gist.github.com/NicolasRouquette/656ed7a2d6984ce0995fd78a3aec2566

Это делается для запроса службы REST API для получения начального набора идентификаторов, выборки элементов для пакета идентификаторов и продолжения на основе идентификаторов перекрестных ссылок, найденных в этих элементах, до тех пор, пока не появятся новые идентификаторы для выборки и не будет возвращена карта всех извлеченные элементы.

Это работает; однако производительность неадекватная - слишком медленная!

Поскольку у меня нет доступа к серверу, я попытался поэкспериментировать с различными размерами пакетов, от 10, 50, 100, 200, 500 и даже объединить все идентификаторы в один запрос. Время запроса значительно увеличивается с размером пакета. При больших размерах (500 и все) я даже получал ответы HTTP 500 от сервера.

Я хотел бы поэкспериментировать с пакетной обработкой параллельных запросов в режиме балансировки нагрузки с использованием пула потоков; однако мне непонятно, как это сделать на основе документации fs2.

Может ли кто-нибудь дать предложения, как этого добиться?

Относительно использования http4s & fs2: Что ж, я нашел эту библиотеку довольно простой в использовании для простого программирования на стороне клиента. Учитывая упор на вспомогательные задачи, потоки и т. Д., Я полагаю, что пакетирование параллельных запросов должно каким-то образом быть выполнимым.


person Nicolas Rouquette    schedule 09.07.2017    source источник


Ответы (1)


fs2.concurrent.join позволит вам запускать несколько потоков одновременно. Конкретный раздел руководства доступен по адресу https://github.com/functional-streams-for-scala/fs2/blob/v0.9.7/docs/guide.md#concurrency

Для вашего варианта использования вы можете взять свою очередь идентификаторов, разбить их на части, создать задачу http, а затем обернуть ее в поток. Затем вы должны запустить этот поток потоков одновременно с join и объединить результаты.

def createHttpRequest(ids: Seq[ID]): Task[(ElementMap, Set[ID])] = ???

def fetch(queue: Set[ID]): Task[(ElementMap, Set[ID])] = {
  val resultStreams = Stream.emits(queue.toSeq)
    .vectorChunkN(batchSize)
    .map(createHttpRequest)
    .map(Stream.eval)

  val resultStream = fs2.concurrent.join(maxOpen)(resultStreams)
  resultStream.runFold((Map.empty[ID, Element], Set.empty[ID])) {
    case ((a, b), (_a, _b)) => (a ++ _a, b ++ _b)
  }
}
person guymers    schedule 15.09.2017