Нашей точкой входа в TenantTaskCoordinator
является единственный метод fun <T : Any> execute(job: Callable<T>): Mono<T>
:
fun <T : Any> execute(job: Callable<T>): Mono<T> {
return Mono.create({ outsideSink ->
val _workInProgressWasDecremented = AtomicBoolean(false)
fun decrementOnce() {
if (_workInProgressWasDecremented.compareAndSet(false, true)) {
currentWorkInProgressCounter.decrementAndGet()
}
}
val workInProgress = currentWorkInProgressCounter.incrementAndGet()
if (workInProgress > maximumWorkInProgress) {
outsideSink.error(TooManyTasks("Work in progress $workInProgress exceeds $maximumWorkInProgress jobs in $name"))
decrementOnce()
} else {
val singleJob = Mono.fromCallable(job).doAfterTerminate {
decrementOnce()
}
val delayedTask = Task(name, singleJob as Mono<Any>, outsideSink as MonoSink<Any>)
outsideSink.onCancel {
delayedTask.outsideCancel()
decrementOnce()
}
taskSink.next(delayedTask)
}
})
}
Первый шаг — вернуть Mono<T>
, что просто делается с помощью Mono.create
. sink
, которое мы получаем, используется для контроля результата, наблюдаемого извне. Это также позволяет зарегистрировать обратный вызов onCancel
, который вызывается, когда восходящий поток отменяет свою подписку.
_workInProgressWasDecremented
используется для защиты и уменьшения currentWorkInProgressCounter
потокобезопасным способом. Сначала мы проверяем, не превысили ли мы сразу максимальное количество заданий в очереди. Если порог достигнут, мы уведомляем наблюдателя об ошибке с помощью outsideSink.error
.
Если у нас достаточно ресурсов для выполнения job
, мы конвертируем его в реактивный мир с помощью Mono.fromCallable
и прикрепляем обратный вызов doAfterTerminate
, который уменьшает счетчик незавершенной работы. Класс Task
связывает singleJob
и outsideSink
, чтобы они оба были доступны при обработке. Наконец, мы планируем период с task
по taskSink.next(delayedTask)
.
Состояние координатора задач
Давайте посмотрим на переменные состояния координатора задач и на то, как они инициализируются:
class TenantTaskCoordinator(private val scheduler: Scheduler,
val maximumConcurrency: Int = 1,
val maximumQueueSize: Int = 50,
val name: String = "") : AutoCloseable {
private val maximumWorkInProgress = maximumQueueSize + maximumConcurrency
private val maxBufferSize = maximumWorkInProgress * 2
val currentWorkInProgressCounter = AtomicInteger()
private lateinit var taskSink: FluxSink<Task>
private val taskSource = Flux.create<Task>({ taskSink = it }, FluxSink.OverflowStrategy.BUFFER)
private val processSinkOnErrorResume = processSinkWithLimitedConcurrency()
.onErrorResume { error: Throwable? ->
LOG.warn("name={} Error processing sink with limited concurrency", name, error)
processSinkWithLimitedConcurrency()
}
Первая интересная часть — это то, как мы настраиваем taskSink
с помощью Flux.create
. Для ясности мы явно передаем FluxSink.OverflowStrategy.BUFFER
, чтобы задачи буферизировались, если они опережают процессор. name
используется для получения более качественных сообщений журнала. Наконец, мы вызываем processSinkWithLimitedConcurrency
, чтобы начать обработку задачи, используя данный scheduler
. Интересно, что onErrorResume
перезапускает обработку на случай ошибки.
Параллельная обработка координатора задач
Самая важная и сложная для понимания часть — правильно обрабатывать задания. Мне потребовалось несколько шагов вперед и назад, пока я не получил правильный порядок реактивных вызовов API.
private fun processSinkWithLimitedConcurrency(): Flux<Any> {
return taskSource
.filter { !it.isCancelled }
.flatMap({ task ->
task.work
.doOnError(task::onError)
.doOnSuccess(task::onSuccess)
.subscribeOn(scheduler)
.timeout(task.outsideTimeout)
.onErrorReturn(task)
}, maximumConcurrency, maxBufferSize)
}
Во-первых, мы отфильтровываем задачи, которые уже отменены. Затем мы используем перегрузку flatMap
для обработки задач с заданным максимальным параллелизмом. Обратный вызов flatMap
делегирует большую часть работы упомянутому экземпляру Task
. onErrorReturn
эффективно подавляет любые ошибки, которые могут возникнуть во время выполнения task
. Давайте посмотрим, как выглядит внутренний класс Task
:
private data class Task(val name: String,
private val job: Mono<Any>,
val outsideSink: MonoSink<Any>,
@field:Volatile var isCancelled: Boolean = false) {
val work: Mono<Any> get() = if (isCancelled) Mono.empty() else job
lateinit var outsideTimeoutSink: MonoSink<Task>
val outsideTimeout = Mono.create<Task> { outsideTimeoutSink = it }
fun outsideCancel() {
isCancelled = true
outsideTimeoutSink.success(this)
}
fun onSuccess(result: Any?) {
outsideSink.success(result)
}
fun onError(error: Throwable) {
LOG.warn("Task.onError {}", this, error)
outsideSink.error(error)
}
}
Аргумент job
— это Callable
, переданный методу execute
. outsideTimeout
сигнализирует об отмене подписки на экземпляр task
. Сигнал распространяется внутри processSinkWithLimitedConcurrency
с вызовом Mono.timeout
и прерывает обработку task
. Наконец, что не менее важно, onSuccess
и onError
просто передают результат или ошибку outsideSink
, эффективно уведомляя наблюдателя о результате.
TenantTaskCoordinator
было непросто понять, учитывая требования, упомянутые в начале поста. Я доволен конечным результатом, хотя должен сказать, что было не интуитивно понять, как объединить все гайки и болты библиотеки Reactor для достижения желаемого результата.
Пётр Мионсковски,поклонник TDD, стремящийся узнать что-то новое
Личный блог Электронная почта Twitter Github Stackoverflow
Первоначально опубликовано на brightinventions.pl