Spring облачная функция Cosuming rabbitmq queue - Dispatcher не имеет подписчиков для канала

Данный:

  cloud:
    stream:
      rabbit:
      bindings:
        inboundApolloLookupVehicleChannel:
          destination: fed.apollo-vehicle-lookup-test
          group: apollo-mngt-group
          consumer:
            missingQueuesFatal: true
            prefetch: 25
            autoBindDlq: true
            maxAttempts: 1
            republishToDlq: true
            requeueRejected: false
            durableSubscription: true
            maxConcurrency: 6              
      function:
        definition: consume  

а также:

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(LookupMessageChannel.class)
public class ApolloLookupServiceApplication {

    public static void main(String[] args) {
        SpringApplication.run(ApolloLookupServiceApplication.class, args);
    }
}


public interface LookupMessageChannel {

    String INBOUND_LOOKUP_VEHICLE_CHANNEL = "inboundApolloLookupVehicleChannel";

    @Input(INBOUND_LOOKUP_VEHICLE_CHANNEL)
    SubscribableChannel inboundApolloLookupVehicleChannel();

}

@Service
@MessageEndpoint
@RequiredArgsConstructor
public class ApolloVehicleLookupService {

    private final ApolloVehicleLookUpRepository apolloVehicleLookUpRepository;
    private static final Logger LOGGER = LoggerFactory.getLogger(ApolloVehicleLookupService.class);

    @Bean
    public Consumer<Flux<ApolloVehicleLookUp>> consume() {
        return stream -> stream             
                .flatMap(this.apolloVehicleLookUpRepository::save)
                .subscribe(value -> {
                        LOGGER.info("stored value: " + value.toString());
                });
    }

}

ПОМ:



    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath />
        <!-- lookup parent from repository -->
    </parent>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>14</java.version>
        <spring-cloud.version>Hoxton.SR3</spring-cloud.version>
        <os-maven-plugin.version>1.5.0.Final</os-maven-plugin.version>
        <spotify-docker-maven.version>1.2.0</spotify-docker-maven.version>      
        <os.detected.classifier>linux-x86_64</os.detected.classifier>
    </properties>

  <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-rsocket</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
  </dependencies>


Почему я получаю следующее исключение?

[payload = org.springframework.messaging.MessageDeliveryException: У диспетчера нет подписчиков на канал apollo-lookup-service-1.inboundApolloLookupVehicleChannel.; вложенное исключение - org.springframework.integration.MessageDispatchingException: Dispatcher не имеет подписчиков, failedMessage = GenericMessage [payload = byte [439]

Я просто хотел получить сообщения из этой очереди.

Любая помощь будет оценена, спасибо.


person Diego S    schedule 15.04.2020    source источник


Ответы (1)


Простое исключение означает, что ваш канал, определенный в LookupMessageChannel, не имеет подписчиков и почему он должен это делать, поскольку вы их не определили. Мне также кажется, что вы пытаетесь использовать функциональный подход, в то время как остальная часть вашего потокового приложения использует устаревшие конфигурации, которые почти не рекомендуются.

Не поймите меня неправильно, но в вашем приложении сейчас есть проблемы, поэтому сложно определить, что именно вы хотите сделать. . .

Примите во внимание следующее: это краткое руководство (5 минут вверху), чтобы познакомить вас с правильным функциональным подходом и затем задать дополнительные вопросы.

person Oleg Zhurakousky    schedule 15.04.2020
comment
Спасибо @Oleg, мы переносим сервис, и многое пошло не так. теперь аккуратнее. Я следил за быстрым запуском, и он начал работать. Вот мой новый вопрос: если я получаю одно старое сообщение, созданное для старой службы, и я копирую и вставляю его полезные данные в новую очередь, он обрабатывает его нормально, но если я перемещу все сообщение с заголовками в новую очередь i ' m получает следующее исключение: Вызвано: org.springframework.integration.MessageDispatchingException: Диспетчер не имеет подписчиков. Это из-за заголовков? Должен ли я создавать сообщение с помощью функции производителя? - person Diego S; 16.04.2020