Почему поток, созданный на ConflatedBroadcastChannel, может получать только последний элемент?

Следующий код печатает только 10000, то есть только последний элемент

val channel = BroadcastChannel<Int>(Channel.CONFLATED)
val flowJob = channel.asFlow().buffer(Channel.UNLIMITED).onEach {
    println(it)
}.launchIn(GlobalScope)

for (i in 0..100) {
    channel.offer(i*i)
}
flowJob.join()

Код можно запустить на игровой площадке.

Но поскольку поток запускается в отдельном потоке диспетчеризации, а значение отправляется в канал, и поскольку поток имеет неограниченный буфер, он должен получать каждый элемент до тех пор, пока не будет вызван onEach. Но почему может быть получен только последний элемент?

Это ожидаемое поведение или ошибка? Если это ожидаемое поведение, как кто-то попытается протолкнуть в поток только самые новые элементы, но весь поток, имеющий определенный буфер, может получить элемент.


person Animesh Sahu    schedule 14.10.2020    source источник
comment
Каков ваш вариант использования для этого? Я не совсем уверен, что понимаю причину объединения, если для вас проблема с отсутствующими элементами   -  person Joffrey    schedule 15.10.2020


Ответы (2)


Проблема здесь в Channel.CONFLATED. Взято из документов:

Channel that buffers at most one element and conflates all subsequent `send` and `offer` invocations,
so that the receiver always gets the most recently sent element.
Back-to-send sent elements are _conflated_ -- only the the most recently sent element is received,
while previously sent elements **are lost**.
Sender to this channel never suspends and [offer] always returns `true`.

This channel is created by `Channel(Channel.CONFLATED)` factory function invocation.

This implementation is fully lock-free.

вот почему вы получаете только самый последний (последний) элемент. Я бы использовал вместо этого UNLIMITED Channel:

val channel = Channel<Int>(Channel.UNLIMITED)
val flowJob = channel.consumeAsFlow().onEach {
    println(it)
}.launchIn(GlobalScope)

for (i in 0..100) {
    channel.offer(i*i)
}
flowJob.join()
person Adam Arold    schedule 14.10.2020
comment
Думаю, это только часть ответа. Технически, если бы сопрограмма launchIn() была запущена, даже объединенный широковещательный канал мог бы отправлять каждый элемент по мере их поступления в поток, и поток печатал бы их. Я думаю, проблема заключается в сочетании этого смешанного поведения И того факта, что сопрограмма не запускается до нажатия join(). Может быть, начало UNDISPATCHED или немного delay() перед циклом for также решит проблему. - person Joffrey; 14.10.2020
comment
Но проблема в том, что я не хотел создавать кучу хранилища элементов в самом канале ... Только поток, имеющий буфер, должен хранить такие элементы, иначе Channel должен отправлять только недавние элементы в небуферизованный поток. - person Animesh Sahu; 14.10.2020
comment
@AnimeshSahu проблема с использованием объединенного канала заключается в том, что ваши send() вызовы никогда не приостанавливаются (использование offer еще хуже), и поэтому все ваши элементы отправляются циклом for в канал (и отбрасываются из-за объединения) до того, как сопрограмма потока будет любой шанс получить элементы. Я даже тестировал с задержкой перед for циклом, и это не решает проблемы. Вам действительно понадобится приостановка (например, задержка) между каждым send, чтобы поток мог получать элементы. Тот факт, что у потока есть буфер или нет, не имеет значения, даже если он ничего не может получить. - person Joffrey; 14.10.2020
comment
Вместо фиктивной задержки используйте канал Channel.RENDEZVOUS по умолчанию, который гарантирует, что элемент будет обработан перед переходом, и не добавляет больше задержки, чем необходимо. - person Louis Wasserman; 14.10.2020
comment
Channel.RENDEZVOUS не будет работать, поскольку BroadcastChannel не примет размер 0. - person Adam Arold; 14.10.2020

Как указано в некоторых комментариях, использование Channel.CONFLATED сохранит только последнее значение, а вы предлагаете channel, даже если у вашего потока есть буфер.

Также join() будет приостановлено до тех пор, пока Job не будет завершено, в вашем случае бесконечно, поэтому вам нужен тайм-аут.

 val channel = Channel<Int>(Channel.RENDEZVOUS)
 val flowJob = channel.consumeAsFlow().onEach {
     println(it)
 }.launchIn(GlobalScope)

GlobalScope.launch{
    for (i in 0..100) {
        channel.send(i * i)
    }
    channel.close()
}
flowJob.join()

Оцените это решение (ссылка на игровую площадку) с Channel.RENDEZVOUS ваш канал будет принимать новые элементы, только если остальные уже потреблены. Вот почему мы должны использовать send вместо offer, send приостанавливается до тех пор, пока не сможет отправить элементы, а offer возвращает логическое значение, указывающее, была ли отправка успешной. Наконец, мы должны close канал, чтобы join() не приостановился до вечности.

person Róbert Nagy    schedule 15.10.2020