Использование Apache Camel / Smallrye / реактивных потоков - как я могу подключить издателя к подписчику через JVM?

Ниже представлена ​​моя попытка решения реактивных потоков Apache Camel для подключения издателя к подписчику (код для маршрутов верблюда показан ниже) через JVM.

Кажется, что для того, чтобы обеспечить связь между JVM, требуется «брокерский» сервер. Поэтому я реализовал брокер Artemis и соответствующим образом изменил файлы application.properties (насколько я понимаю, как это сделать).

Кроме того, чтобы сузить фокус, мы решили использовать разъем smallrye-ampq.

Проблема:

Подписчик должен получать и регистрировать значение String (из тела):

-
-
-
:blahblahblah
:blahblahblah
:blahblahblah
-
-
-

- Вместо этого он регистрирует значения, например:

-
-
-
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
:Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]
-
-
-

Вопрос:

почему полезные данные, отправленные издателем, не достигают подписчика и какой код / ​​конфигурацию я могу изменить, чтобы исправить это?

заранее спасибо за любую помощь!

---

Маршрут "Издатель"

package aaa.bbb.ccc.jar;

import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.reactivestreams.Publisher;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;

@ApplicationScoped
public class CamelPub extends RouteBuilder {

    @Inject
    CamelContext ctx;

    CamelReactiveStreamsService crss;
    static int x = 0;

    @Outgoing("data")
    public Publisher<Exchange> source() {
        return crss.from("seda:thesource");
    }

    @Override
    public void configure() {

        crss = CamelReactiveStreams.get(ctx);
        from("timer://thetimer?period=1000")
                .process(new Processor() {
                    @Override
                    public void process(Exchange msg) throws Exception {
                        msg.getIn().setBody("blahblahblah"); //(Integer.toString(x++));
                    }
                })
                .log("....... PUB ....... camelpub - body: ${body}")
                .to("direct:thesource");
    }
}

microprofile-config.properties - издатель

injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8084
server.host=0.0.0.0

mp.messaging.outgoing.data.connector=smallrye-amqp
mp.messaging.outgoing.data.host=localhost
mp.messaging.outgoing.data.port=5672
mp.messaging.outgoing.data.username=artuser
mp.messaging.outgoing.data.password=artpassword
mp.messaging.outgoing.data.endpoint-uri:seda:thesource
mp.messaging.outgoing.data.broadcast=true
mp.messaging.outgoing.data.durable=true

соответствующий отрывок из журнала консоли (?) - издатель

...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelpub ---
2019.12.17 22:26:34 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:26:35 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:26:35 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:26:35 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:26:36 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding:  [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:26:36 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:26:36 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:26:36 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:26:37 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesource is using shared queue: seda://thesource with size: 1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: timer://thetimer?period=1000
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.191 seconds
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelPub] with qualifiers [@Any @Default]
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelPub
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:26:37 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: seda://thesource
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:26:37 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting method aaa.bbb.ccc.jar.CamelPub#source to sink data
2019.12.17 22:26:37 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container e71e38c0-91ec-4758-a310-55f1368c6a9c initialized
2019.12.17 22:26:37 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:26:37 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:26:37 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@57fbc06f
2019.12.17 22:26:38 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:26:38 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0x52928b67, L:/0:0:0:0:0:0:0:0:8084]
2019.12.17 22:26:38 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8084 (and all other host addresses) in 3668 milliseconds.
2019.12.17 22:26:39 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 1
2019.12.17 22:26:40 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 2
2019.12.17 22:26:41 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 3
2019.12.17 22:26:42 INFO route1 Thread[Camel (camel-1) thread #1 - timer://thetimer,5,main]: ....... PUB ....... camelpub - body: 4
...

маршрут "Подписчик"

package aaa.bbb.ccc.jar;

import javax.inject.Inject;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreamsService;
import javax.enterprise.context.ApplicationScoped;
import org.apache.camel.component.reactive.streams.api.CamelReactiveStreams;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.reactivestreams.Subscriber;

@ApplicationScoped
public class CamelSub extends RouteBuilder {

    public CamelSub() throws Exception {
    }

    @Inject
    CamelContext ctx;

    CamelReactiveStreamsService crss;

    @Incoming("data")
    public Subscriber<String> sink() {
    return crss.subscriber("seda:thesink", String.class);
    }    

    @Override
    public void configure() {

    crss = CamelReactiveStreams.get(ctx);

    from("seda:thesink")
        .convertBodyTo(String.class)
        .log("ooooooo SUB ooooooo camelsub - body: ${body}");
    }
}

microprofile-config.properties - подписчик

injected.value=Injected value
value=lookup value
# Microprofile server properties
server.port=8082
server.host=0.0.0.0

mp.messaging.incoming.data.connector=smallrye-amqp
mp.messaging.incoming.data.host=localhost
mp.messaging.incoming.data.port=5672
mp.messaging.incoming.data.username=artuser
mp.messaging.incoming.data.password=artpassword
mp.messaging.incoming.data.endpoint-uri:seda:thesink
mp.messaging.incoming.data.broadcast=true
mp.messaging.incoming.data.durable=true

соответствующий отрывок из журнала консоли (?) - подписчик

...
--- exec-maven-plugin:1.5.0:exec (default-cli) @ camelsub ---
2019.12.17 22:28:09 INFO io.helidon.microprofile.server.Main Thread[main,5,main]: Logging configured using classpath: /logging.properties
2019.12.17 22:28:10 INFO org.jboss.weld.Version Thread[main,5,main]: WELD-000900: 3.1.1 (Final)
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-000020: Using jandex for bean discovery
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001208: Error when validating null@6 against xsd. cvc-complex-type.4: Attribute 'bean-discovery-mode' must appear on element 'beans'.
2019.12.17 22:28:10 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-000101: Transactional services not available. Injection of @Inject UserTransaction not available. Transactional observers will be invoked synchronously.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] public org.glassfish.jersey.ext.cdi1x.internal.ProcessAllAnnotatedTypes.processAnnotatedType(@Observes ProcessAnnotatedType<?>, BeanManager) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private io.helidon.microprofile.openapi.IndexBuilder.processAnnotatedType(@Observes ProcessAnnotatedType<X>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 INFO org.jboss.weld.Event Thread[main,5,main]: WELD-000411: Observer method [BackedAnnotatedMethod] private org.apache.camel.cdi.CdiCamelExtension.processAnnotatedType(@Observes ProcessAnnotatedType<?>) receives events for all annotated types. Consider restricting events using @WithAnnotations or a generic type with bounds.
2019.12.17 22:28:10 WARN org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-001101: Member of array type or annotation type must be annotated @NonBinding:  [EnhancedAnnotatedMethodImpl] public abstract javax.enterprise.inject.Typed.value()
2019.12.17 22:28:11 INFO org.apache.camel.cdi.CdiCamelExtension Thread[main,5,main]: Camel CDI is starting Camel context [camel-1]
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) is starting
2019.12.17 22:28:11 INFO org.apache.camel.impl.engine.DefaultManagementStrategy Thread[main,5,main]: JMX is disabled
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: StreamCaching is not in use. If using streams then its recommended to enable stream caching. See more details at http://camel.apache.org/stream-caching.html
2019.12.17 22:28:11 INFO org.apache.camel.component.seda.SedaEndpoint Thread[main,5,main]: Endpoint seda://thesink is using shared queue: seda://thesink with size: 1000
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route1 started and consuming from: seda://thesink
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Total 1 routes, of which 1 are started
2019.12.17 22:28:11 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Apache Camel 3.0.0 (CamelContext: camel-1) started in 0.173 seconds
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.ReactiveMessagingExtension Thread[main,5,main]: Analyzing mediator bean: Managed Bean [class aaa.bbb.ccc.jar.CamelSub] with qualifiers [@Any @Default]
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Scanning Type: class aaa.bbb.ccc.jar.CamelSub
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Deployment done... start processing
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found incoming connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Found outgoing connectors: []
2019.12.17 22:28:11 INFO io.smallrye.reactive.messaging.impl.ConfiguredChannelFactory Thread[main,5,main]: Stream manager initializing...
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Initializing mediators
2019.12.17 22:28:12 INFO org.apache.camel.impl.DefaultCamelContext Thread[main,5,main]: Route: route2 started and consuming from: reactive-streams://ID-LAPTOP-4LR4PMVQ-1576639692145-0-1
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting mediators
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Attempt to resolve aaa.bbb.ccc.jar.CamelSub#sink
2019.12.17 22:28:12 INFO io.smallrye.reactive.messaging.extension.MediatorManager Thread[main,5,main]: Connecting aaa.bbb.ccc.jar.CamelSub#sink to `data` (org.eclipse.microprofile.reactive.streams.operators.core.PublisherBuilderImpl@3eda0aeb)
2019.12.17 22:28:12 INFO org.jboss.weld.Bootstrap Thread[main,5,main]: WELD-ENV-002003: Weld SE container c1eaa1fb-486c-4b95-b56b-0f1a7b88f741 initialized
2019.12.17 22:28:12 WARNING io.helidon.microprofile.server.Server$Builder Thread[main,5,main]: Failed to find JAX-RS resource to use
2019.12.17 22:28:12 INFO io.helidon.microprofile.security.SecurityMpService Thread[main,5,main]: Security extension for microprofile is enabled, yet security configuration is missing from config (requires providers configuration at key security.providers). Security will not have any valid provider.
2019.12.17 22:28:12 INFO io.smallrye.openapi.api.OpenApiDocument Thread[main,5,main]: OpenAPI document initialized: io.smallrye.openapi.api.models.OpenAPIImpl@77f905e3
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[main,5,main]: Version: 1.4.0
2019.12.17 22:28:12 INFO io.helidon.webserver.NettyWebServer Thread[nioEventLoopGroup-2-1,10,main]: Channel '@default' started: [id: 0xd8f72801, L:/0:0:0:0:0:0:0:0:8082]
2019.12.17 22:28:12 INFO io.helidon.microprofile.server.ServerImpl Thread[nioEventLoopGroup-2-1,10,main]: Server initialized on http://localhost:8082 (and all other host addresses) in 3310 milliseconds.
2019.12.17 22:28:13 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]
2019.12.17 22:28:14 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-292]
2019.12.17 22:28:15 INFO route1 Thread[Camel (camel-1) thread #1 - seda://thesink,5,main]: ooooooo SUB ooooooo camelsub - body: Exchange[ID-LAPTOP-4LR4PMVQ-1576639597494-0-295]    
...

ПРИМЕЧАНИЕ. Приведенные выше выходные данные должны отображать числа ... а не, например, «Обмен [ID-LAPTOP-4LR4PMVQ-1576639597494-0-289]» и т. д. :-(

по сути один и тот же maven pom.xml для каждого

<?xml version="1.0" encoding="UTF-8"?>
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
    <modelVersion>4.0.0</modelVersion>
    <groupId>aaa.bbb.ccc</groupId>
    <artifactId>[NOTE: essentially same pom.xml for both camelpub or camelsub]</artifactId>
    <version>1.0</version>
    <properties>
        <helidonVersion>1.4.0</helidonVersion>
        <package>aaa.bbb.ccc.jar</package>
        <failOnMissingWebXml>false</failOnMissingWebXml>
        <mpVersion>3.2</mpVersion>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <libs.classpath.prefix>libs</libs.classpath.prefix>
        <mainClass>io.helidon.microprofile.server.Main</mainClass>
        <jersey.version>2.29</jersey.version>
        <copied.libs.dir>${project.build.directory}/${libs.classpath.prefix}</copied.libs.dir>
        <camelversion>3.0.0</camelversion>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.eclipse.microprofile</groupId>
            <artifactId>microprofile</artifactId>
            <version>${mpVersion}</version>
            <type>pom</type>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.eclipse.microprofile.reactive.messaging</groupId>
            <artifactId>microprofile-reactive-messaging-api</artifactId>
            <version>1.0</version>
            <type>jar</type>
        </dependency>
        <dependency>
            <groupId>javax</groupId>
            <artifactId>javaee-api</artifactId>
            <version>8.0</version>
            <type>jar</type>
        </dependency> 
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camelversion}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-reactive-streams</artifactId>
            <version>${camelversion}</version>
        </dependency>       
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-cdi</artifactId>
            <version>${camelversion}</version>
        </dependency>                              
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-provider</artifactId>
            <version>1.0.8</version>
        </dependency>          
        <dependency>
            <groupId>io.smallrye.reactive</groupId>
            <artifactId>smallrye-reactive-messaging-amqp</artifactId>
            <version>1.0.8</version>
        </dependency>          
        <dependency>
            <groupId>javax.enterprise</groupId>
            <artifactId>cdi-api</artifactId>
            <version>2.0</version>
        </dependency>          
        <dependency>
            <groupId>io.helidon</groupId>
            <artifactId>helidon-bom</artifactId>
            <version>${helidonVersion}</version>
            <type>pom</type>
        </dependency>
        <dependency>
            <groupId>org.jboss</groupId>
            <artifactId>jandex</artifactId>
            <version>2.1.1.Final</version>
            <scope>runtime</scope>
            <optional>true</optional>            
        </dependency>
        <dependency>
            <groupId>javax.activation</groupId>
            <artifactId>javax.activation-api</artifactId>
            <version>1.2.0</version>
            <scope>runtime</scope>            
        </dependency>
        <dependency>
            <groupId>org.glassfish.jersey.media</groupId>
            <artifactId>jersey-media-json-binding</artifactId>
            <version>${jersey.version}</version>
        </dependency>
        <dependency>
            <groupId>io.helidon.microprofile.bundles</groupId>
            <artifactId>helidon-microprofile-3.0</artifactId>
            <version>${helidonVersion}</version>
        </dependency>
    </dependencies>
    <build>
        <finalName>[NOTE: essentially same pom.xml for both camelpub or camelsub]</finalName>
        <plugins>                             
            <plugin>
                <artifactId>maven-jar-plugin</artifactId>
                <version>2.5</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <classpathPrefix>${libs.classpath.prefix}</classpathPrefix>
                            <mainClass>${mainClass}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <artifactId>maven-dependency-plugin</artifactId>
                <version>2.9</version>
                <executions>
                    <execution>
                        <id>copy-dependencies</id>
                        <phase>prepare-package</phase>
                        <goals>
                            <goal>copy-dependencies</goal>
                        </goals>
                        <configuration>
                            <outputDirectory>${copied.libs.dir}</outputDirectory>
                            <overWriteReleases>false</overWriteReleases>
                            <overWriteSnapshots>false</overWriteSnapshots>
                            <overWriteIfNewer>true</overWriteIfNewer>
                            <overWriteIfNewer>true</overWriteIfNewer>
                            <includeScope>runtime</includeScope>
                            <excludeScope>test</excludeScope>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

docker-compose.yml (Artemis)

# A docker compose file to start an Artemis AMQP broker
# more details on https://github.com/vromero/activemq-artemis-docker.
version: '2'

services:

  artemis:
    image: vromero/activemq-artemis:2.8.0-alpine
    ports:
      - "8161:8161"
      - "61616:61616"
      - "5672:5672"
    environment:
      ARTEMIS_USERNAME: artuser
      ARTEMIS_PASSWORD: artpassword

используемые технологии

java 8
apache camel
smallrye
artemis
reactive streams/programming

(использовал эту ссылку в качестве ресурса: https://smallrye.io/smallrye-reactive-messaging/)


person sairn    schedule 04.12.2019    source источник
comment
Вы пытались использовать компонент vm вместо seda? seda работает только в одном CamelContext, следовательно, не работает в двух разных приложениях.   -  person thokuest    schedule 11.12.2019
comment
Я попробую это. Спасибо!   -  person sairn    schedule 11.12.2019
comment
Не ответ, но я предполагаю, что application.properties, если он вообще читается, читается Camel, поскольку Helidon не читает его по умолчанию. Кроме того, я отмечу, что у вас есть @Startup, которая является аннотацией EJB, а Helidon намеренно не поддерживает EJB, поэтому эта аннотация будет проигнорирована.   -  person Laird Nelson    schedule 11.12.2019
comment
Меня также интересует mp.messaging.outgoing.data.host=0.0.0.0, и сразу после этого mp.messaging.outgoing.data.host=8084 то же самое относится и к входящим свойствам, хотя я уверен, что это не будет причиной   -  person Roman Vottner    schedule 11.12.2019
comment
FWIW, @ClausIbsen имеет похожий пример в его книге Camel in Action 2 в главе 20   -  person Roman Vottner    schedule 11.12.2019
comment
спасибо, за комментарии! :-) (например, это была одна из многих итераций - я использовал localhost вместо 0.0.0.0). то, что я, кажется, не могу определить, - это то, какими именно должны быть свойства / значения проводки, чтобы обеспечить связь между издателем и подписчиком. Кажется, обмен seda: на vm: не имеет значения (?)   -  person sairn    schedule 11.12.2019
comment
также - изменение ошибочного свойства хоста на порт (для 8084). не уверен, что это была ошибка вырезания и вставки при создании этого сообщения, но сейчас проверяю. Спасибо!   -  person sairn    schedule 11.12.2019
comment
переработал файл application.properties (исправил свойство порта). удалили аннотацию @Startup из маршрутов издателя и подписчика. - Также добавьте исключения, которые я вижу сейчас - как для издателя, так и для подписчика.   -  person sairn    schedule 11.12.2019
comment
@sairn да, vm не помогает, работает только на той же ВМ.   -  person thokuest    schedule 12.12.2019
comment
кажется, что application.properties не имеет должного эффекта. переключил (пока) всю конфигурацию на microprofile-config.properties - еще раз спасибо за полезные наблюдения!   -  person sairn    schedule 18.12.2019


Ответы (3)


Проблема, которую вы указали в своем сообщении, является довольно распространенным вариантом использования с некоторыми четко определенными шаблонами для решения проблемы, что в этом случае разумно предполагает настройку какого-либо промежуточного программного обеспечения для асинхронного обмена сообщениями, такого как Apache ActiveMQ, RabbitMQ , Apache Kafka и т. д. Это дает вам идеальный способ разделить контексты Camel, как указано в статье. Зачем использовать несколько контекстов Camel? Эта концепция дополнительно объясняется в документации Apache Camel для EIP канала сообщений (EIP = Шаблон интеграции предприятия).

В вашем сообщении выше я вижу, что вы пытаетесь использовать Camel SEDA . На его странице документации указано:

Обратите внимание, что очереди видны только в пределах одного CamelContext. Если вы хотите взаимодействовать между экземплярами CamelContext (например, взаимодействовать между веб-приложениями), см. Компонент виртуальной машины.

Этот компонент не реализует никакого вида сохранения или восстановления, если виртуальная машина завершает работу, пока сообщения еще не обработаны. Если вам нужна настойчивость, надежность или распределенная SEDA, попробуйте использовать JMS или ActiveMQ.

Компонент Camel VM не будет работать для вас здесь, так как ваши многократные Контексты Camel распределены по разным серверам. Компонент виртуальной машины может работать между несколькими контекстами Camel, но все они должны работать в одной JVM для взаимодействия.

По этим причинам я не вижу возможности использовать какое-то промежуточное ПО для обмена сообщениями в этом случае.

Поскольку вы упомянули потоковую передачу, хорошим выбором может быть что-то вроде Apache Kafka. Я не работал с этим раньше и не мог больше комментировать это, но я нашел статью, в которой один парень говорит об этом (см. Реактивные потоки для Apache Kafka). Camel имеет компонент Kafka, который можно использовать для соединения всего вместе.

person Eric Green    schedule 11.12.2019
comment
Требуется брокер. Я использовал документацию по верблюжьей рыбе, найденную здесь (smallrye.io/smallrye-reactive-messaging). Хотя в документации упоминается конфигурация с участием брокера, скажем, для MQTT, KAFKA и AMPQ - для интеграции Apache Camel это не похоже. - Действительно ли брокер должен подключать верблюжий маршрут подписчика к верблюжьему маршруту издателя, когда они находятся на разных серверах / портах? - person sairn; 12.12.2019
comment
Брокер не потребуется, если вы запускаете несколько веб-приложений в одной JVM. В этом случае я бы использовал компонент Camel VM, который позволяет взаимодействовать между несколькими контекстами Camel, но только в пределах одной JVM. Однако вы упомянули, что ваши приложения Camel распределены по нескольким серверам, и в этом случае какой-то брокер обычно считается подходящим решением. Взгляните на некоторые статьи, на которые я ссылался в своем ответе. Большинство из них довольно короткие и достаточно нетехнические. Я с радостью отвечу на любые другие вопросы по мере необходимости. - person Eric Green; 12.12.2019
comment
Еще бы. Также, если вы считаете, что я дал правильный ответ на ваш вопрос, не забудьте отметить его как таковой :) - person Eric Green; 12.12.2019
comment
вы можете быть уверены. :-) - person sairn; 12.12.2019
comment
Привет, Эрик. Я только что настроил Артемис в качестве своего брокера. все еще безуспешно. Проблема заключается в отсутствии убедительной документации и / или примеров того, как подключить application.properties (для маршрутов верблюда издателя / подписчика), чтобы мой сценарий (см. выше) работал. Похоже, это то, что мне действительно нужно. Дайте мне знать, есть ли у вас какое-либо представление о правильном подключении свойств, чтобы заставить вышеуказанный сценарий работать (при условии, что брокер artemis). Уже поздно - завтра я опубликую свои изменения, связанные с Artemis / вывод на консоль, включая свойства приложения. спасибо, еще раз! - person sairn; 12.12.2019
comment
Я только что посмотрел на Артемиса. Похоже, это новая версия Apache ActiveMQ. Это хороший выбор. - person Eric Green; 12.12.2019
comment
Позвольте нам продолжить это обсуждение в чате. - person Eric Green; 12.12.2019

Не общий простой ответ верблюда, но RSocket реализует модель программирования RX по сети (поверх сокетов TCP, веб-сокетов HTTP и т. Д.).

https://github.com/rsocket/rsocket-java

Он хорошо поддерживается фреймворками приложений, такими как Spring Boot. Но это не тот простой пример работы, о котором вы просите.

person Yuri Schimke    schedule 14.12.2019
comment
Спасибо за ответ, Юрий. Согласно документации SmallRye / Camel, подход, который я использую, должен работать. На данный момент я предполагаю, что причина, по которой у меня не было прорыва, заключается в том, что я не понимаю, как следует использовать application.properties для настройки успешного соединения. (Я понимаю, что также может быть проблема с версиями зависимостей [?] ... - Я тоже мучился [не очень систематически], пытаясь решить эту проблему - но безуспешно). - person sairn; 15.12.2019

Итак, получается, что изменение сигнатуры метода с ...

@Outgoing("data")
public Publisher<Exchange> source() {
    ...
}

to...

@Outgoing("data")
public Publisher<String> source() {
    ...
}

устраняет проблему, так что теперь подписчик получает и регистрирует значение / полезную нагрузку, отправленную издателем

person sairn    schedule 06.01.2020