ошибка привязки spring cloud dataflow kafka при отправке - Magic v1 не поддерживает заголовки записей

Попытка заставить поток данных весеннего облака работать в Kubernetes и получить ошибку ниже, когда источник пытается отправить сообщение в kafka. Тот же код отлично работал, когда я использовал Rabbit MQ.Kafka Server версии Kafka 2.1.0. Я читал в других сообщениях, что это может быть связано с несовместимостью версии kafka с клиентом в комплекте с более низкой версией kafka. Интересно, как я могу заставить это работать.

КОД ДЛЯ ОТПРАВКИ СООБЩЕНИЯ НА КАНАЛ KAFKA

   public  void sendToChannelNew(List<String> list,String name)  {        
        MessageBuilder builder=MessageBuilder.withPayload(list.toString());
        builder=builder.setHeader("SOURCE",name);
        source.output().send(builder.build());
    }

ПОМ

 <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
    </dependency>

    <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>2.1.4.RELEASE</version>
    </dependency>

ТРАССИРОВКА СТЕКА

KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5a4a522a]; nested exception is java.lang.IllegalArgumentException: Magic v1 does not support record headers, failedMessage=GenericMessage [payload=byte[271962], headers={SOURCE=CLIENT
entType=application/json, timestamp=1574199924374}]' [enable DEBUG level for full stacktrace] was thrown by a user handler's exceptionCaught() method while handling the following exception:
reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5
ArgumentException: Magic v1 does not support record headers, failedMessage=GenericMessage [payload=byte[271962], headers={SOURCE=ACCOUNT_CLIENT, id=2dcee73c-9da0-72fe-d5f7-07a2dba5b099, contentType=application/json, timestamp=1574199924374}]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ProducerConfigurationMessageHandler@5a4a522a]; nested exception is java.lang.Il
rt record headers
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:189)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:179)
        at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:1077)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132)
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105)
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401)
        at com.jpmorgan.awm.wm.edpipoc.EdpiSource.sendToChannelNew(EdpiSource.java:73)
        at com.jpmorgan.awm.wm.edpipoc.EdpiSource.lambda$null$2(EdpiSource.java:134)
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:73)
        at reactor.core.publisher.MonoRunnable.call(MonoRunnable.java:32)
        at reactor.core.publisher.FluxSubscribeOnCallable$CallableSubscribeOnSubscription.run(FluxSubscribeOnCallable.java:225)
        at reactor.core.scheduler.ImmediateScheduler.schedule(ImmediateScheduler.java:47)
        at reactor.core.publisher.MonoSubscribeOnCallable.subscribe(MonoSubscribeOnCallable.java:52)
        at reactor.core.publisher.Mono.subscribe(Mono.java:3694)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:442)
        at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:244)
        at reactor.core.publisher.FluxBuffer$BufferExactSubscriber.onComplete(FluxBuffer.java:183)
        at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:248)
        at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onComplete(FluxMapFuseable.java:144)
        at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onComplete(FluxContextStart.java:122)
        at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:262)
        at reactor.core.publisher.DrainUtils.postCompleteDrain(DrainUtils.java:131)
        at reactor.core.publisher.DrainUtils.postComplete(DrainUtils.java:186)
        at reactor.core.publisher.FluxBufferPredicate$BufferPredicateSubscriber.onComplete(FluxBufferPredicate.java:285)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drainAsync(FluxFlattenIterable.java:325)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.drain(FluxFlattenIterable.java:638)
        at reactor.core.publisher.FluxFlattenIterable$FlattenIterableSubscriber.onComplete(FluxFlattenIterable.java:259)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252)
        at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136)
        at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:372)
        at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:196)
        at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:337)
        at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:334)
        at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:381)
        at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:522)
        at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:141)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:102)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:438)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:323)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:297)
        at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:253)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1436)
        at io.netty.handler.ssl.SslHandler.decodeJdkCompatible(SslHandler.java:1203)
        at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1247)
        at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:502)
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:441)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:278)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1408)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:930)
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:799)
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:427)
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:328)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
        at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: Magic v1 does not support record headers
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:410)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.appendWithOffset(MemoryRecordsBuilder.java:449)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:506)
        at org.apache.kafka.common.record.MemoryRecordsBuilder.append(MemoryRecordsBuilder.java:529)
        at org.apache.kafka.clients.producer.internals.ProducerBatch.tryAppend(ProducerBatch.java:107)
        at org.apache.kafka.clients.producer.internals.RecordAccumulator.append(RecordAccumulator.java:223)
        at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:864)
        at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:803)
        at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:444)
        at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:372)
        at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:209)
        at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:382)
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123)
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:169)

person Ajith Kannan    schedule 19.11.2019    source источник
comment
Вы уверены, что подключаетесь к правильному кластеру Kafka? Эта ошибка действительно указывает на несоответствие версии. Убедитесь, что вы используете брокера Kafka, который совместим с поддержкой заголовков. От брокеров Kafka 0.11 и выше он должен поддерживать заголовки.   -  person sobychacko    schedule 20.11.2019
comment
Хотя версия 0.11 поддерживает заголовки, она все еще очень старая; текущая версия 2.3.1; вам следует обновиться до как можно более новой версии брокера.   -  person Gary Russell    schedule 20.11.2019
comment
Спасибо, Соби и Гэри, извините, моя плохая версия 0.10.0-kafka2.1.0, пожалуйста, предложите, есть ли способ заставить это работать без обновления кака   -  person Ajith Kannan    schedule 20.11.2019


Ответы (1)


Проблема заключалась в том, что используемый брокер kakfka был старым (0.10.0), не поддерживающим заголовки. Перешел на более высокую версию и исправил это.

person Ajith Kannan    schedule 21.11.2019
comment
Привет, @Аджит Каннан! Как это изменить? - person AleGallagher; 03.08.2020
comment
Я использовал брокера kafka 0.10.0. Поэтому обновил библиотеку kafka до более высокой версии. - person Ajith Kannan; 03.08.2020