django-viewflow, как правильно обрабатывать исключения в обработчике?

Допустим, у меня есть следующий узел:

perform_disk_size_change = ( 
    flow.Handler(
        this.perform_proxmox_api_request
    ).Next(this.notify_iaas_success)
)

Что, если perform_proxmox_api_request вызовет исключение? Могу ли я указать узел исключения, чтобы любое исключение переходило к этому узлу?

perform_disk_size_change = ( 
    flow.Handler(
        this.perform_proxmox_api_request
    ).Next(this.notify_iaas_success).Except(this.notify_iaas_fail)
)

person James Lin    schedule 11.11.2019    source источник


Ответы (1)


Итак, я придумал следующее решение, не совсем уверен, что это должно быть сделано таким образом, проблема в том, что HandlerActivation выполняется немедленно, поэтому не дает мне возможности передать объект исключения для следующей активации (если это узел обработчика) , я до сих пор не знаю, как передать исключение при следующей активации без необходимости использования другого пользовательского узла.

import logging
from copy import copy
from viewflow.activation import Activation, STATUS, all_leading_canceled
from viewflow.nodes.handler import HandlerActivation
from viewflow.rest import flow
from viewflow.mixins import Edge
from viewflow.contrib import celery


log = logging.getLogger(__name__)


class HandlerActivationCatchExcept(HandlerActivation):
    def __init__(self, *args, **kwargs):
        self.caught_exception = None
        super(HandlerActivationCatchExcept, self).__init__(*args, **kwargs)

    def execute(self):
        """Run the callback."""
        try:
            self.flow_task.handler(self)
        except Exception as ex:
            self.caught_exception = ex
            log.exception('Exception caught in CatchExceptionHandler')

    @Activation.status.transition(source=STATUS.DONE, conditions=[all_leading_canceled])
    def activate_next(self):
        """Activate all outgoing edges."""
        if self.caught_exception:
            for mapping in self.flow_task.exception_mappings:
                if isinstance(self.caught_exception, mapping['cls']):
                    return mapping['node'].activate(prev_activation=self, token=self.task.token)
        else:
            return super(HandlerActivationCatchExcept, self).activate_next()


class CatchExceptionMixin(object):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.exception_mappings = []

    def _resolve(self, resolver):
        super()._resolve(resolver)
        for exception_mapping in self.exception_mappings:
            exception_mapping['node'] = resolver.get_implementation(exception_mapping['node'])

    def _outgoing(self):
        if self._next:
            yield Edge(src=self, dst=self._next, edge_class='next')
        for exception_mapping in self.exception_mappings:
            yield Edge(src=self, dst=exception_mapping['node'], edge_class='cond_false')

    def Except(self, node, exception_cls=Exception):
        self.exception_mappings.append({"cls": exception_cls, "node": node})
        return copy(self)


class CatchExceptionHandler(CatchExceptionMixin, flow.Handler):
    """
    Custom handler node to allow catching exception and route to certain node
    usage: CatchExceptionHandler(method).Except(default_node)
           CatchExceptionHandler(method).Except(specific_node, SpecificExceptionCls).Except(default_node)
    """
    activation_class = HandlerActivationCatchExcept


class JobActivationCatchException(celery.JobActivation):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.caught_exception = None

    def error(self, comments=""):
        super().error(comments)
        self.caught_exception = Exception(comments)
        self.activate_exception_node()

    @Activation.status.transition(source=STATUS.ERROR, conditions=[all_leading_canceled])
    def activate_exception_node(self):
        """Activate all outgoing edges."""
        self.flow_task.exception_mappings[0]['node'].activate(prev_activation=self, token=self.task.token)


class CatchExceptionJob(CatchExceptionMixin, celery.Job):
    """
    Custom job node to allow catching exception and route to certain node
    usage: CatchExceptionJob(method).Except(default_node)
    """
    activation_class = JobActivationCatchException


class HandleExceptionActivation(HandlerActivation):
    @classmethod
    def activate(cls, flow_task, prev_activation, token):
        """Instantiate new task."""
        task = flow_task.flow_class.task_class(
            process=prev_activation.process,
            flow_task=flow_task,
            token=token)

        task.save()
        task.previous.add(prev_activation.task)

        activation = cls()
        # adds previous activation ref
        activation.prev_activation = prev_activation
        activation.initialize(flow_task, task)
        activation.perform()

        return activation


class ExceptionHandler(flow.Handler):
    activation_class = HandleExceptionActivation

использование будет таким:

perform_disk_size_change = (
    nodes.CatchExceptionHandler(
        this.perform_proxmox_api_request
    ).Next(
        this.notify_iaas_success
    ).Except(
        this.notify_iaas_team, TechnicalExceptionCls
    ).Except(
        this.notify_customer_failed
    )
)

notify_iaas_team = ExceptionHandler(this.email_iaas)
notify_customer_decline = Handler(...)

def email_iaas(self, activation):
    # access to previous activation
    prev_activation = activation.prev_activation
    # access to previously caught exception
    exception = prev_activation.caught_exception
    ...

# OR Celery job

populate_current_info = (nodes.CatchExceptionJob(populate_proxmox_info).Next(this.determine_approval_needed).Except(this.notify_iaas_failed))
person James Lin    schedule 11.11.2019