Нашей точкой входа в TenantTaskCoordinator является единственный метод fun <T : Any> execute(job: Callable<T>): Mono<T>:

fun <T : Any> execute(job: Callable<T>): Mono<T> {
        return Mono.create({ outsideSink ->
            val _workInProgressWasDecremented = AtomicBoolean(false)
            fun decrementOnce() {
                if (_workInProgressWasDecremented.compareAndSet(false, true)) {
                    currentWorkInProgressCounter.decrementAndGet()
                }
            }

            val workInProgress = currentWorkInProgressCounter.incrementAndGet()
            if (workInProgress > maximumWorkInProgress) {
                outsideSink.error(TooManyTasks("Work in progress $workInProgress exceeds $maximumWorkInProgress jobs in $name"))
                decrementOnce()
            } else {
                val singleJob = Mono.fromCallable(job).doAfterTerminate {
                    decrementOnce()
                }

                val delayedTask = Task(name, singleJob as Mono<Any>, outsideSink as MonoSink<Any>)

                outsideSink.onCancel {
                    delayedTask.outsideCancel()
                    decrementOnce()
                }
                
                taskSink.next(delayedTask)
            }
        })
    }

Первый шаг — вернуть Mono<T>, что просто делается с помощью Mono.create. sink, которое мы получаем, используется для контроля результата, наблюдаемого извне. Это также позволяет зарегистрировать обратный вызов onCancel, который вызывается, когда восходящий поток отменяет свою подписку.

_workInProgressWasDecremented используется для защиты и уменьшения currentWorkInProgressCounter потокобезопасным способом. Сначала мы проверяем, не превысили ли мы сразу максимальное количество заданий в очереди. Если порог достигнут, мы уведомляем наблюдателя об ошибке с помощью outsideSink.error.

Если у нас достаточно ресурсов для выполнения job, мы конвертируем его в реактивный мир с помощью Mono.fromCallable и прикрепляем обратный вызов doAfterTerminate, который уменьшает счетчик незавершенной работы. Класс Task связывает singleJob и outsideSink, чтобы они оба были доступны при обработке. Наконец, мы планируем период с task по taskSink.next(delayedTask).

Состояние координатора задач

Давайте посмотрим на переменные состояния координатора задач и на то, как они инициализируются:

class TenantTaskCoordinator(private val scheduler: Scheduler,
                            val maximumConcurrency: Int = 1,
                            val maximumQueueSize: Int = 50,
                            val name: String = "") : AutoCloseable {

    private val maximumWorkInProgress = maximumQueueSize + maximumConcurrency

    private val maxBufferSize = maximumWorkInProgress * 2

    val currentWorkInProgressCounter = AtomicInteger()

    private lateinit var taskSink: FluxSink<Task>

    private val taskSource = Flux.create<Task>({ taskSink = it }, FluxSink.OverflowStrategy.BUFFER)

    private val processSinkOnErrorResume = processSinkWithLimitedConcurrency()
        .onErrorResume { error: Throwable? ->
            LOG.warn("name={} Error processing sink with limited concurrency", name, error)
            processSinkWithLimitedConcurrency()
        }

Первая интересная часть — это то, как мы настраиваем taskSink с помощью Flux.create. Для ясности мы явно передаем FluxSink.OverflowStrategy.BUFFER, чтобы задачи буферизировались, если они опережают процессор. name используется для получения более качественных сообщений журнала. Наконец, мы вызываем processSinkWithLimitedConcurrency, чтобы начать обработку задачи, используя данный scheduler. Интересно, что onErrorResume перезапускает обработку на случай ошибки.

Параллельная обработка координатора задач

Самая важная и сложная для понимания часть — правильно обрабатывать задания. Мне потребовалось несколько шагов вперед и назад, пока я не получил правильный порядок реактивных вызовов API.

private fun processSinkWithLimitedConcurrency(): Flux<Any> {
        return taskSource
            .filter { !it.isCancelled }
            .flatMap({ task ->
                task.work
                    .doOnError(task::onError)
                    .doOnSuccess(task::onSuccess)
                    .subscribeOn(scheduler)
                    .timeout(task.outsideTimeout)
                    .onErrorReturn(task)
            }, maximumConcurrency, maxBufferSize)
    }

Во-первых, мы отфильтровываем задачи, которые уже отменены. Затем мы используем перегрузку flatMap для обработки задач с заданным максимальным параллелизмом. Обратный вызов flatMap делегирует большую часть работы упомянутому экземпляру Task. onErrorReturn эффективно подавляет любые ошибки, которые могут возникнуть во время выполнения task. Давайте посмотрим, как выглядит внутренний класс Task:

private data class Task(val name: String,
                            private val job: Mono<Any>,
                            val outsideSink: MonoSink<Any>,
                            @field:Volatile var isCancelled: Boolean = false) {

        val work: Mono<Any> get() = if (isCancelled) Mono.empty() else job

        lateinit var outsideTimeoutSink: MonoSink<Task>
        val outsideTimeout = Mono.create<Task> { outsideTimeoutSink = it }

        fun outsideCancel() {
            isCancelled = true
            outsideTimeoutSink.success(this)
        }

        fun onSuccess(result: Any?) {
            outsideSink.success(result)
        }
        
        fun onError(error: Throwable) {
            LOG.warn("Task.onError {}", this, error)
            outsideSink.error(error)
        }
    }

Аргумент job — это Callable, переданный методу execute. outsideTimeout сигнализирует об отмене подписки на экземпляр task. Сигнал распространяется внутри processSinkWithLimitedConcurrency с вызовом Mono.timeout и прерывает обработку task. Наконец, что не менее важно, onSuccess и onError просто передают результат или ошибку outsideSink, эффективно уведомляя наблюдателя о результате.

TenantTaskCoordinator было непросто понять, учитывая требования, упомянутые в начале поста. Я доволен конечным результатом, хотя должен сказать, что было не интуитивно понять, как объединить все гайки и болты библиотеки Reactor для достижения желаемого результата.

Пётр Мионсковски,поклонник TDD, стремящийся узнать что-то новое

Личный блог Электронная почта Twitter Github Stackoverflow

Первоначально опубликовано на brightinventions.pl