Группа потребителей KAFKA + FLINK 1.1.2 не работает как исключение

Когда я попытался подключиться к одной теме с 3 с разделом и 3 FlinkKafkaConsumer09 потреблять из одной темы и использовать свойство группы потребителей Kafka, как показано ниже.

props.setProperty("group.id", "myGroup");                 
props.setProperty("auto.offset.reset", "latest");

но все равно 3 потребитель получает все данные. согласно концепции группы потребителей, данные должны отправляться только одному потребителю внутри группы потребителей.

Но это хорошо работает с обычным потребителем Java. проблема с FlinkKafkaConsumer09 ?


person Sudhanshu Lenka    schedule 01.10.2016    source источник


Ответы (1)


Этот вопрос можно решить, написав на FlinkConsumer.

Шаги: 1. вы должны передать разделы как свойство, чтобы щелкнуть потребителя

проблема: в соответствии с этим у вас есть один потребитель для одного раздела

public class YourConsumer<T> extends FlinkKafkaConsumerBase<T> 
{
    public static final long DEFAULT_POLL_TIMEOUT = 100L;

    private final long pollTimeout;

    public FlinkKafkaConsumer09(String topic, DeserializationSchema<T> valueDeserializer, Properties props) {
        this(Collections.singletonList(topic), valueDeserializer, props);
    }


    public FlinkKafkaConsumer09(String topic, KeyedDeserializationSchema<T> deserializer, Properties props) {
        this(Collections.singletonList(topic), deserializer, props);
    }


    public FlinkKafkaConsumer09(List<String> topics, DeserializationSchema<T> deserializer, Properties props) {
        this(topics, new KeyedDeserializationSchemaWrapper<>(deserializer), props);
    }

    public FlinkKafkaConsumer09(List<String> topics, KeyedDeserializationSchema<T> deserializer, Properties props) {
        super(topics, deserializer);

        this.properties = checkNotNull(props, "props");
        setDeserializer(this.properties);

        // configure the polling timeout
        try {
            if (properties.containsKey(KEY_POLL_TIMEOUT)) {
                this.pollTimeout = Long.parseLong(properties.getProperty(KEY_POLL_TIMEOUT));
            } else {
                this.pollTimeout = DEFAULT_POLL_TIMEOUT;
            }
        }
        catch (Exception e) {
            throw new IllegalArgumentException("Cannot parse poll timeout for '" + KEY_POLL_TIMEOUT + '\'', e);
        }
    }

    @Override
    protected AbstractFetcher<T, ?> createFetcher(
            SourceContext<T> sourceContext,
            List<KafkaTopicPartition> thisSubtaskPartitions,
            SerializedValue<AssignerWithPeriodicWatermarks<T>> watermarksPeriodic,
            SerializedValue<AssignerWithPunctuatedWatermarks<T>> watermarksPunctuated,
            StreamingRuntimeContext runtimeContext) throws Exception {

        boolean useMetrics = !Boolean.valueOf(properties.getProperty(KEY_DISABLE_METRICS, "false"));

        return new Kafka09Fetcher<>(sourceContext, thisSubtaskPartitions,
                watermarksPeriodic, watermarksPunctuated,
                runtimeContext, deserializer,
                properties, pollTimeout, useMetrics);
    }

    @Override
    protected List<KafkaTopicPartition> getKafkaPartitions(List<String> topics) {
        // read the partitions that belong to the listed topics
        final List<KafkaTopicPartition> partitions = new ArrayList<>();
         int partition=Integer.valueOf(this.properties.get("partitions"));
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(this.properties)) {
            for (final String topic: topics) {
                // get partitions for each topic
                List<PartitionInfo> partitionsForTopic = consumer.partitionsFor(topic);
                // for non existing topics, the list might be null.
                if (partitionsForTopic != null) {
                    partitions.addAll(convertToFlinkKafkaTopicPartition(partitionsForTopic),partition);
                }
            }
        }

        if (partitions.isEmpty()) {
            throw new RuntimeException("Unable to retrieve any partitions for the requested topics " + topics);
        }

        // we now have a list of partitions which is the same for all parallel consumer instances.
        LOG.info("Got {} partitions from these topics: {}", partitions.size(), topics);

        if (LOG.isInfoEnabled()) {
            logPartitionInfo(LOG, partitions);
        }

        return partitions;
    }


    private static List<KafkaTopicPartition> convertToFlinkKafkaTopicPartition(List<PartitionInfo> partitions,int partition) {
        checkNotNull(partitions);

        List<KafkaTopicPartition> ret = new ArrayList<>(partitions.size());
        //for (PartitionInfo pi : partitions) {
            ret.add(new KafkaTopicPartition(partitions.get(partition).topic(), partitions.get(partition).partition()));
        //  }
        return ret;
    }


    private static void setDeserializer(Properties props) {
        final String deSerName = ByteArrayDeserializer.class.getCanonicalName();

        Object keyDeSer = props.get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        Object valDeSer = props.get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);

        if (keyDeSer != null && !keyDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured key DeSerializer ({})", ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG);
        }
        if (valDeSer != null && !valDeSer.equals(deSerName)) {
            LOG.warn("Ignoring configured value DeSerializer ({})", ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG);
        }

        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deSerName);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deSerName);
    }
}
person Sudhanshu Lenka    schedule 03.10.2016