Vertx plus Kotlin coroutines зависает навсегда

Я переписываю некоторый асинхронный код Java Vertx, используя сопрограммы Kotlin в учебных целях. Однако, когда я пытаюсь протестировать простой HTTP-вызов, тест на основе сопрограмм зависает навсегда, и я действительно не понимаю, в чем проблема. Вот репродуктор:

@RunWith(VertxUnitRunner::class)
class HelloWorldTest {

    private val vertx: Vertx = Vertx.vertx()

    @Before
    fun setUp(context: TestContext) {
        // HelloWorldVerticle is a simple http server that replies "Hello, World!" to whatever call
        vertx.deployVerticle(HelloWorldVerticle::class.java!!.getName(), context.asyncAssertSuccess())
    }

    // ORIGINAL ASYNC TEST HERE. IT WORKS AS EXPECTED
    @Test
    fun testAsync(context: TestContext) {
        val atc = context.async()
        vertx.createHttpClient().getNow(8080, "localhost", "/") { response ->
            response.handler { body ->
                context.assertTrue(body.toString().equals("Hello, World!"))
                atc.complete()
            }
        }
    }

    // First attempt, it hangs forever, the response is never called
    @Test
    fun testSync1(context: TestContext) = runBlocking<Unit> {
        val atc = context.async()
        val body = await<HttpClientResponse> {
            vertx.createHttpClient().getNow(8080, "localhost", "/", { response -> response.handler {it}} )
        }
        context.assertTrue(body.toString().equals("Hello, World!"))
        atc.complete()
    }

    // Second attempt, it hangs forever, the response is never called
    @Test
    fun testSync2(context: TestContext) = runBlocking<Unit> {
        val atc = context.async()
        val response = await<HttpClientResponse> {
                vertx.createHttpClient().getNow(8080, "localhost", "/", it )
        }
        response.handler { body ->
            context.assertTrue(body.toString().equals("Hello, World!"))
            atc.complete()
        }
    }

    suspend fun <T> await(callback: (Handler<T>) -> Unit) =
            suspendCoroutine<T> { cont ->
                callback(Handler { result: T ->
                    cont.resume(result)
                })
            }
}

Все ли способны разобраться в проблеме?


person Francesco Cina    schedule 03.08.2017    source источник


Ответы (1)


Мне кажется, у вашего кода несколько проблем:

  1. вы можете запустить тест до того, как http-сервер будет развернут
  2. Я считаю, что, поскольку вы выполняете свой код внутри runBlocking, вы блокируете цикл обработки событий от завершения запроса.
  3. Наконец, я посоветую вам использовать метод HttpClienctResponse::bodyHandler вместо HttpClientResponse::handler, поскольку обработчик может получать частичные данные.

Вот альтернативное решение, которое отлично работает:

import io.vertx.core.AbstractVerticle
import io.vertx.core.Future
import io.vertx.core.Handler
import io.vertx.core.Vertx
import io.vertx.core.buffer.Buffer
import io.vertx.core.http.HttpClientResponse
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.EmptyCoroutineContext
import kotlin.coroutines.experimental.startCoroutine
import kotlin.coroutines.experimental.suspendCoroutine

inline suspend fun <T> await(crossinline callback: (Handler<T>) -> Unit) =
        suspendCoroutine<T> { cont ->
            callback(Handler { result: T ->
                cont.resume(result)
            })
        }

fun <T : Any> async(code: suspend () -> T) = Future.future<T>().apply {
    code.startCoroutine(object : Continuation<T> {
        override val context = EmptyCoroutineContext
        override fun resume(value: T) = complete()
        override fun resumeWithException(exception: Throwable) = fail(exception)
    })
}

fun main(args: Array<String>) {
    async {
        val vertx: Vertx = Vertx.vertx()

        //0. take the current context
        val ctx = vertx.getOrCreateContext()

        //1. deploy the http server
        await<Unit> { cont ->
            vertx.deployVerticle(object : AbstractVerticle() {
                override fun start() {
                    vertx.createHttpServer()
                            .requestHandler { it.response().end("Hello World") }
                            .listen(7777) { ctx.runOnContext { cont.handle(Unit) } }
                    //note that it is important tp complete the handler in the correct context
                }
            })
        }

        //2. send request
        val response: HttpClientResponse = await { vertx.createHttpClient().getNow(7777, "localhost", "/", it) }

        //3. await response
        val body = await<Buffer> { response.bodyHandler(it) }
        println("received $body")
    }
}
person bennyl    schedule 04.08.2017
comment
Я принял ваш ответ, потому что проблема действительно заключалась в блокировке цикла событий; однако ваш пункт 1 неверен, на самом деле context.asyncAssertSuccess () в методе setUp () гарантирует, что тесты выполняются только тогда, когда сервер готов. - person Francesco Cina; 17.08.2017