Как вывести значение потока из другой функции? Котлинские сопрограммы

У меня есть поток:

val myflow = kotlinx.coroutines.flow.flow<Message>{}

и хотите испускать значения с помощью функции:

override suspend fun sendMessage(chat: Chat, message: Message) {
    myflow.emit(message)
}

Но компилятор не позволяет мне этого сделать, есть ли обходные пути для решения этой проблемы?


person Community    schedule 07.05.2020    source источник
comment
Вы имеете в виду, что не можете получить доступ к объявленной извне и созданной переменной? Вы передаете ссылку на поток функции или, что еще лучше, инкапсулируете ссылку на поток в свой конструктор / построитель throw в качестве обязательного параметра. Вы можете поделиться с нами ошибкой?   -  person MartenCatcher    schedule 07.05.2020


Ответы (3)


Ответ Анимеш Саху в значительной степени правильный. Вы также можете вернуть Канал в виде потока (см. consumerAsFlow или asFlow на BroadcastChannel).

Но есть также вещь под названием StateFlow, которая в настоящее время разрабатывается командой Kotlin. отчасти предназначен для реализации аналогичного поведения, хотя неизвестно, когда оно будет готово.

РЕДАКТИРОВАТЬ: _2 _ и _3 _ были выпущены как часть стабильного API (https://blog.jetbrains.com/kotlin/2020/10/kotlinx-coroutines-1-4-0-introduction-stateflow-and-sharedflow/). Эти инструменты можно и нужно использовать, когда требуется управление состоянием в контексте асинхронного выполнения.

person V1raNi    schedule 07.05.2020
comment
Внесите +1 для этого, они могут стать потоком, но учтите, что они все равно будут горячими. - person Animesh Sahu; 07.05.2020
comment
Кажется, всего несколько часов назад StateFlow был представлен в выпуске сопрограмм 1.3.6 - person V1raNi; 09.05.2020
comment
Согласно stackoverflow.com/a/64148101/1015126 существуют ограничения, основанные на скорости сбора, вызванные объединением StateFlow. Будь осторожен! - person cren90; 01.10.2020

В таком случае можно использовать StateFlow. Вот пример кода.

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

val chatFlow = MutableStateFlow<String>("")

fun main() = runBlocking {

    // Observe values
    val job = launch {
        chatFlow.collect {
            print("$it ")
        }
    }

    // Change values
    arrayOf("Hey", "Hi", "Hello").forEach {
        delay(100)
        sendMessage(it)
    }

    delay(1000)

    // Cancel running job
    job.cancel()
    job.join()
}

suspend fun sendMessage(message: String) {
    chatFlow.value = message
}

You can test this code by running below snippet.

<iframe src="https://pl.kotl.in/DUBDfUnX3" style="width:600px;"></iframe>

person Shreyas Patil    schedule 23.05.2020

Поток является самодостаточным, как только блок (лямбда) внутри потока выполняется, поток завершается, вы должны выполнять операции внутри и отправлять их оттуда.

Вот аналогичная проблема с github, в которой говорится:

Afaik Flow спроектирован как автономный, воспроизводимый, холодный поток, поэтому эмиссия за пределами его собственной области не будет частью контракта. Я думаю, что вы ищете канал.

И ИМХО вы, вероятно, смотрите на каналы или в частности, ConflatedBroadcastChannel для нескольких приемников. Разница между обычным каналом и широковещательным каналом заключается в том, что несколько приемников могут прослушивать широковещательный канал, используя openSubscription, которая возвращает ReceiveChannel, связанный с BroadcastChannel.

person Animesh Sahu    schedule 07.05.2020