Spring Интеграция Java DSL + Activiti

Я посмотрел видео о Spring + Activiti https://m.youtube.com/watch?v=0PV_8Lew3vg< /а>. Мне нужно, чтобы процесс был немедленно перемещен в serviceTask, где Activiti вызывает мой шлюз, отправляя сообщение запроса в очередь (rabbitMQ). После отправки сообщения запроса процесс останавливается. Затем serviceTask запускается снова, как только ответное сообщение оказывается в очереди ответов. serviceTask может занять очень много времени.

Я попробовал пример с вебинара, и он работает нормально, но синхронизируется.

Это мой ActivitiDemoApplication.java

package com.example;

import java.util.Map;

import javax.sql.DataSource;

import org.activiti.engine.ProcessEngine;
import org.activiti.engine.RuntimeService;
import org.activiti.engine.TaskService;
import org.activiti.engine.impl.pvm.delegate.ActivityExecution;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.spring.integration.ActivitiInboundGateway;
import org.activiti.spring.integration.IntegrationActivityBehavior;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.integration.activiti.gateway.AsyncActivityBehaviorMessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@Configuration
@ComponentScan
// @EnableAutoConfiguration
@SpringBootApplication
public class ActivitiDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(ActivitiDemoApplication.class, args);
    }

    @Bean
    IntegrationActivityBehavior activitiDelegate(
            ActivitiInboundGateway activitiInboundGateway) {
        return new IntegrationActivityBehavior(activitiInboundGateway);
    }

    @Bean
    ActivitiInboundGateway inboundGateway(ProcessEngine processEngine) {
        return new ActivitiInboundGateway(processEngine, "processed");
    }

    @Bean
    IntegrationFlow inboundProcess(
            ActivitiInboundGateway activitiInboundGateway,
            PhotoService photoService) {
        return IntegrationFlows.from(activitiInboundGateway)
                .handle(new GenericHandler<ActivityExecution>() {
                    @Override
                    public Object handle(ActivityExecution execution,
                            Map<String, Object> headers) {
                        try {
                            photoService.Execute();
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        return MessageBuilder.withPayload(execution)
                                .setHeader("processed", (Object) true)
                                .copyHeaders(headers).build();
                    }
                }).get();
    }

    @Bean
    public DataSource database() {
        return DataSourceBuilder
                .create()
                .url("jdbc:sqlserver://localhost:1433;databaseName=activiti")
                .username("activiti")
                .password("activiti")
                .driverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .build();
    }

    @RestController
    public static class MyRestController {
        @Autowired
        private RuntimeService runtimeService;

        @RequestMapping(value = "/start-my-process", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
        public void startMyProcess() {
            ProcessInstance p = runtimeService
                    .startProcessInstanceByKey("TestProcess");
            System.out.println("id: " + p.getId());
            System.out.println("count: "
                    + runtimeService.createProcessInstanceQuery().count());
        }

    }
}

@Service
@Transactional
class PhotoService {
    @Autowired
    private TaskService taskService;

    public void Execute() throws InterruptedException {
        System.out.println("debug 1");
        Thread.currentThread().sleep(2000);
        System.out.println("debug 2");
    }
}

это мой TestProcess.bmpn20.xml

<?xml version='1.0' encoding='UTF-8'?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
    xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
    typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath"
    targetNamespace="http://www.activiti.org/processdef" xmlns:modeler="http://activiti.com/modeler"
    modeler:version="1.0ev" modeler:exportDateTime="20151228174550"
    modeler:modelId="969411" modeler:modelVersion="1"
    modeler:modelLastUpdated="1451324745996">
    <process id="TestProcess" name="TestProcess" isExecutable="true">
        <startEvent id="startEvent1" />
        <intermediateCatchEvent id="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1">
            <timerEventDefinition>
                <timeDuration>PT20S</timeDuration>
            </timerEventDefinition>
        </intermediateCatchEvent>
        <endEvent id="sid-23D49CE8-B018-4ABF-871F-07F42508C98A" />
        <sequenceFlow id="sid-53B3780A-A1F6-4059-9660-D56CECD236F1"
            sourceRef="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1" targetRef="sid-23D49CE8-B018-4ABF-871F-07F42508C98A" />
        <serviceTask id="task-integration" name="task integration"
            activiti:delegateExpression="#{activitiDelegate}" />
        <sequenceFlow id="sid-4DDAAA87-536F-4E69-BDC8-09B96DB7BD07"
            sourceRef="task-integration" targetRef="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1" />
        <sequenceFlow id="sid-3F3E9366-BAF9-40AF-9E34-E324A91291F7"
            sourceRef="startEvent1" targetRef="task-integration" />
    </process>
    <bpmndi:BPMNDiagram id="BPMNDiagram_TestProcess">
        <bpmndi:BPMNPlane bpmnElement="TestProcess" id="BPMNPlane_TestProcess">
            <bpmndi:BPMNShape bpmnElement="startEvent1"
                id="BPMNShape_startEvent1">
                <omgdc:Bounds height="30.0" width="30.0" x="100.0" y="163.0" />
            </bpmndi:BPMNShape>
            <bpmndi:BPMNShape bpmnElement="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1"
                id="BPMNShape_sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1">
                <omgdc:Bounds height="31.0" width="31.0" x="480.0" y="162.5" />
            </bpmndi:BPMNShape>
            <bpmndi:BPMNShape bpmnElement="sid-23D49CE8-B018-4ABF-871F-07F42508C98A"
                id="BPMNShape_sid-23D49CE8-B018-4ABF-871F-07F42508C98A">
                <omgdc:Bounds height="28.0" width="28.0" x="570.0" y="161.0" />
            </bpmndi:BPMNShape>
            <bpmndi:BPMNShape bpmnElement="task-integration"
                id="BPMNShape_task-integration">
                <omgdc:Bounds height="80.0" width="100.0" x="340.0" y="135.0" />
            </bpmndi:BPMNShape>
            <bpmndi:BPMNEdge bpmnElement="sid-3F3E9366-BAF9-40AF-9E34-E324A91291F7"
                id="BPMNEdge_sid-3F3E9366-BAF9-40AF-9E34-E324A91291F7">
                <omgdi:waypoint x="130.0" y="178.0" />
                <omgdi:waypoint x="162.5" y="178.0" />
                <omgdi:waypoint x="157.0" y="178.0" />
                <omgdi:waypoint x="340.0" y="161.521327014218" />
            </bpmndi:BPMNEdge>
            <bpmndi:BPMNEdge bpmnElement="sid-4DDAAA87-536F-4E69-BDC8-09B96DB7BD07"
                id="BPMNEdge_sid-4DDAAA87-536F-4E69-BDC8-09B96DB7BD07">
                <omgdi:waypoint x="440.0" y="176.4218009478673" />
                <omgdi:waypoint x="480.0062629076814" y="177.5594197983227" />
            </bpmndi:BPMNEdge>
            <bpmndi:BPMNEdge bpmnElement="sid-53B3780A-A1F6-4059-9660-D56CECD236F1"
                id="BPMNEdge_sid-53B3780A-A1F6-4059-9660-D56CECD236F1">
                <omgdi:waypoint x="512.0" y="178.5" />
                <omgdi:waypoint x="540.5" y="178.5" />
                <omgdi:waypoint x="540.5" y="175.0" />
                <omgdi:waypoint x="570.0" y="175.0" />
            </bpmndi:BPMNEdge>
        </bpmndi:BPMNPlane>
    </bpmndi:BPMNDiagram>
</definitions>

Я хотел бы сделать асинхронный процесс.

<serviceTask id="task-integration" name="task integration" activiti:delegateExpression="#{activitiDelegate}"/>

Когда Activiti вызывает мой шлюз "#{activitiDelegate}", я хочу переместить сообщение запроса в RequestQueue для выполнения "photoService.Execute()". Затем процесс запускается снова, когда от ResponseQueue получено ответное сообщение.

return MessageBuilder.withPayload(execution).setHeader("processed", (Object) true).copyHeaders(headers).build();

Я не знаю, как реализовать amqp со шлюзом.

    @Bean
    IntegrationActivityBehavior activitiDelegate(
            ActivitiInboundGateway activitiInboundGateway) {
        return new IntegrationActivityBehavior(activitiInboundGateway);
    }

    @Bean
    ActivitiInboundGateway inboundGateway(ProcessEngine processEngine) {
        return new ActivitiInboundGateway(processEngine, "processed");
    }

    @Bean
    IntegrationFlow inboundProcess(
            ActivitiInboundGateway activitiInboundGateway,
            PhotoService photoService) {
        return IntegrationFlows.from(activitiInboundGateway)
                .handle(new GenericHandler<ActivityExecution>() {
                    @Override
                    public Object handle(ActivityExecution execution,
                            Map<String, Object> headers) {
                        try {
                            photoService.Execute();
                        } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                        }
                        return MessageBuilder.withPayload(execution)
                                .setHeader("processed", (Object) true)
                                .copyHeaders(headers).build();
                    }
                }).get();
    }

Извините, но я новичок в интеграции Java Spring. Я надеюсь, что мое объяснение понятно.


person Bully    schedule 27.12.2015    source источник


Ответы (1)


Извините, ваш вопрос не ясен, особенно для тех, кто не знаком с одним из упомянутых фреймворков.

Вы говорите serviceTask, но не показываете это в коде. Я могу только догадываться, что вы говорите об этом photoService, который все равно не вызывается в IntegrationFlow ниже. Это еще больше смущает.

Единственное, что могу сказать, что ActivitiInboundGateway является заблокированным компонентом (см. ActivitiInboundGateway.execute()):

Message<?> reply = sendAndReceiveMessage(mb.build());

Это точно блокирует поведение request/reply. И мы видим его правильный вызов с вашим определением IntegrationFlow.

Пожалуйста, поделитесь дополнительной информацией, чтобы понять дело.

ОБНОВЛЕНИЕ

Я хотел бы сделать асинхронный процесс.

 <serviceTask id="task-integration" name="task integration" activiti:delegateExpression="#{activitiDelegate}"/>

Похоже, вы можете расширить IntegrationActivityBehavior:

public class AsyncIntegrationActivityBehavior extends IntegrationActivityBehavior {
   
   private TaskExecutor executor = new ThreadPoolTaskExecutor();

    @Override
    public void execute(final ActivityExecution execution) throws Exception {
        this.executor(new Runnable() {

                  public void void run() {
                     AsyncIntegrationActivityBehavior.super.execute(execution);
                       }
               });
    }

}

и используйте его для своего activitiDelegate @Bean.

person Artem Bilan    schedule 28.12.2015
comment
Спасибо за ответ. Ты прав. ActivitiInboundGateway заблокирован. Есть ли другой способ? serviceTask только отправляет сообщение запроса, и процесс немедленно останавливается. Затем какой-то получатель снова запускает процесс с ответным сообщением. Я не знаю, смогу ли я решить это в весенней интеграции или только в bpmn activiti. Например, сообщение запроса от serviceTask может быть обработано только в режиме ночной обработки. - person Bully; 09.01.2016
comment
Смотрите ОБНОВЛЕНИЕ к моему ответу. - person Artem Bilan; 09.01.2016