Асинхронный вызов для каждого элемента в коллекции

У меня есть проблема, которую я не смог решить до сих пор, я новичок в RxKotlin, так что это может быть легко. Пожалуйста, взгляните на код:

    override fun infos(): Stream<Info> =
        client.infoAboutItem(identifier)
                .map {
                    val itemId = it.itemId ?: ""
                    val item = client.itemForId(itemId)
                    ClientInfo(client, it, source, item) as Info
                }
                .let { AccessStream(it) }

Где поток - это наша самодельная коллекция. Map — это метод, который позволяет перебирать каждый элемент в этой коллекции.

Проблема здесь в том, что

 client.itemForId(itemId)

это HTTP-вызов, возвращающий Single, который не идеален.

Я хотел бы создать асинхронный вызов внутри карты, который возвращал бы Item вместо Single, а затем передал бы его в ClientInfo. То, что я пробовал до сих пор, заключалось в использовании подписки внутри карты и использовании метода blockingGet(), но это блокирует основной поток, даже если я наблюдаю и подписываюсь на другой поток.

Таким образом, это включает в себя асинхронный вызов для каждой вещи в коллекции.

Спасибо за помощь


person John Smith    schedule 31.08.2017    source источник
comment
нужно вернуть Stream<Info> или можно изменить?   -  person A. Shevchuk    schedule 31.08.2017
comment
К сожалению, это необходимо сделать, многие вещи ожидают возврата этой коллекции.   -  person John Smith    schedule 31.08.2017
comment
Это может быть Single<Stream<Info>> или Observable<Stream<Info>>?   -  person A. Shevchuk    schedule 31.08.2017
comment
Потенциально? Почему у вас есть решение?   -  person John Smith    schedule 31.08.2017
comment
проверь мой ответ, может он тебе поможет   -  person A. Shevchuk    schedule 31.08.2017


Ответы (2)


Вы можете попробовать вернуть Observable<Stream<Info>>, и тогда это будет выглядеть так:

   override fun infos(): Observable<Stream<Info>> = 
                Observable.from(client.infoAboutItem(identifier))
                        .flatMapSingle {
                            val itemId = it.itemId ?: ""
                            client.itemForId(itemId)
                        }
                        .map { 
                            ClientInfo(client, it, source, item) as Info
                         }
                        .toList()
                        .flatMap {
                            AccessStream(it)
                        }
person A. Shevchuk    schedule 31.08.2017
comment
Я думаю, что flatMap() вокруг построения ClientInfo() должно быть map(), потому что конструктор возвращает Info, а не Observable<Info> - person Bob Dalgleish; 31.08.2017
comment
Да, ты прав! Или может быть flatMap с Observable.just() внутри, но здесь карта лучше. Спасибо, отредактирую - person A. Shevchuk; 31.08.2017

Вы должны обернуть эту дорогостоящую операцию в наблюдаемую и использовать плоскую карту, чтобы заархивировать эти данные в информацию о клиенте.

Я написал небольшой пример, чтобы показать это.

class SimpleTest {
  val testScheduler = TestScheduler()

  @Test
  fun test() {
    infos().observeOn(Schedulers.immediate())
        .subscribe { logger("Output", it.toString()) }

    testScheduler.advanceTimeBy(10, TimeUnit.MINUTES)
  }

  fun infos(): Single<List<ClientInfo>> {
    return Observable.from(infoAboutItem("some_identifier"))
        .doOnNext { logger("Next", it.toString()) }
        .flatMap { aboutItem ->
          Observable.fromCallable { itemForId(aboutItem.itemId) }
              .subscribeOn(testScheduler)
              .map { ClientInfo(aboutItem = aboutItem, item = it) }
        }
        .doOnNext { logger("Next", it.toString()) }
        .toList()
        .toSingle()
  }

  data class ClientInfo(
      val id: String = UUID.randomUUID().toString(),
      val aboutItem: AboutItem,
      val item: Item
  )

  data class AboutItem(val itemId: String = UUID.randomUUID().toString())
  data class Item(val id: String = UUID.randomUUID().toString())

  fun infoAboutItem(identifier: String): List<AboutItem> {
    return (1..10).map { AboutItem() }
  }

  fun itemForId(itemId: String): Item {
    val sleepTime = Random().nextInt(1000).toLong()
    Thread.sleep(sleepTime)
    return Item()
  }

  fun logger(tag: String, message: String): Unit {
    val formattedDate = Date(Schedulers.immediate().now()).format()
    System.out.println("$tag @ $formattedDate: $message")
  }

  fun Date.format(): String {
    return SimpleDateFormat("HH:mm:ss.SSS", Locale.US).format(this)
  }
}
person Rodrigo Henriques    schedule 31.08.2017