Как получить доступ к брокерам kafka, защищенным kerberos, из Eclispe, работающего в Windows 7

У меня есть этот код Java, пытающийся добавить сообщения в очередь kafka

    String msgID = UUID.randomUUID().toString();

    Properties prop = new Properties();
    prop.put("metadata.broker.list", DEFAULT_BROKER);
    prop.put("serializer.class", "kafka.serializer.StringEncoder");
    prop.put("request.required.acks", "-1");
    prop.put("producer.type", "async");

    ProducerConfig config = new ProducerConfig(prop);
    Producer<String, String> producer = new Producer<String, String>(config);

    KeyedMessage<String, String> message = new KeyedMessage<String, String>(TOPIC_NAME, msgID, "stackoverflow");

    try {
        producer.send(message);
        LOG.info("Messages sent with key" + msgID);
    } catch (Exception exception) {

    }

    producer.close();

Брокеры kafka — это удаленные машины Linux, защищенные Kerberos.

когда я выполняю приведенный выше код Java на своем локальном компьютере с Windows из Eclispe, я получаю следующий вывод журнала

2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Verifying properties
2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Property metadata.broker.list is overridden to remote000001.machine.test.group:6667,remote000002.machine.test.group:6667,remote000003.machine.test.group:6667
2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Property producer.type is overridden to async
2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Property request.required.acks is overridden to -1
2016-01-06 15:48:57 INFO  VerifiableProperties:68 - Property serializer.class is overridden to kafka.serializer.StringEncoder
2016-01-06 15:48:58 TRACE Producer:36 - Added to send queue an event: KeyedMessage(stackoverflowTopic,2f0c8b29-46e0-4860-961b-0fea480d21f9,2f0c8b29-46e0-4860-961b-0fea480d21f9,stackoverflow)
2016-01-06 15:48:58 TRACE Producer:36 - Remaining queue size: 10000
2016-01-06 15:48:58 INFO  KafkaPro:63 - Messages sent with key2f0c8b29-46e0-4860-961b-0fea480d21f9
2016-01-06 15:48:58 INFO  Producer:68 - Shutting down producer
2016-01-06 15:48:58 TRACE ProducerSendThread:36 - Dequeued item for topic stackoverflowTopic, partition key: 2f0c8b29-46e0-4860-961b-0fea480d21f9, data: stackoverflow
2016-01-06 15:48:58 INFO  ProducerSendThread:68 - Begin shutting down ProducerSendThread
2016-01-06 15:48:58 DEBUG ProducerSendThread:52 - Handling 1 events
2016-01-06 15:48:58 DEBUG DefaultEventHandler:52 - Handling 1 events
2016-01-06 15:48:58 TRACE SyncProducer:36 - Instantiating Scala Sync Producer with properties: {metadata.broker.list=remote000001.machine.test.group:6667,remote000002.machine.test.group:6667,remote000003.machine.test.group:6667, request.required.acks=-1, port=6667, serializer.class=kafka.serializer.StringEncoder, host=remote000002.machine.test.group, producer.type=async}
2016-01-06 15:48:58 INFO  ClientUtils$:68 - Fetching metadata from broker id:1,host:remote000002.machine.test.group,port:6667 with correlation id 0 for 1 topic(s) Set(stackoverflowTopic)
2016-01-06 15:48:58 TRACE SyncProducer:36 - verifying sendbuffer of size 34
2016-01-06 15:48:58 INFO  SyncProducer:68 - Connected to remote000002.machine.test.group:6667 for producing
2016-01-06 15:48:58 INFO  SyncProducer:68 - Disconnecting from remote000002.machine.test.group:6667
2016-01-06 15:48:58 WARN  ClientUtils$:89 - Fetching topic metadata with correlation id 0 for topics [Set(stackoverflowTopic)] from broker [id:1,host:remote000002.machine.test.group,port:6667] failed
java.nio.channels.ClosedChannelException
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)
    at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
    at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67)
    at kafka.utils.Utils$.swallow(Utils.scala:172)
    at kafka.utils.Logging$class.swallowError(Logging.scala:106)
    at kafka.utils.Utils$.swallowError(Utils.scala:45)
    at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67)
    at kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:105)
    at kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:94)
    at kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:45)
2016-01-06 15:48:58 INFO  SyncProducer:68 - Disconnecting from remote000002.machine.test.group:6667
2016-01-06 15:48:58 TRACE SyncProducer:36 - Instantiating Scala Sync Producer with properties: {metadata.broker.list=remote000001.machine.test.group:6667,remote000002.machine.test.group:6667,remote000003.machine.test.group:6667, request.required.acks=-1, port=6667, serializer.class=kafka.serializer.StringEncoder, host=remote000001.machine.test.group, producer.type=async}
2016-01-06 15:48:58 INFO  ClientUtils$:68 - Fetching metadata from broker id:0,host:remote000001.machine.test.group,port:6667 with correlation id 0 for 1 topic(s) Set(stackoverflowTopic)
2016-01-06 15:48:58 TRACE SyncProducer:36 - verifying sendbuffer of size 34
2016-01-06 15:48:58 INFO  SyncProducer:68 - Connected to remote000001.machine.test.group:6667 for producing
2016-01-06 15:48:58 INFO  SyncProducer:68 - Disconnecting from remote000001.machine.test.group:6667
2016-01-06 15:48:58 WARN  ClientUtils$:89 - Fetching topic metadata with correlation id 0 for topics [Set(stackoverflowTopic)] from broker [id:0,host:remote000001.machine.test.group,port:6667] failed

я создал файл kafka_jaas.conf со следующим содержимым

KafkaServer {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="kafka"
   principal="myPrincipal";
};
KafkaClient {
   com.sun.security.auth.module.Krb5LoginModule required
   useTicketCache=true
   renewTicket=true
   serviceName="kafka";
};
Client {
   com.sun.security.auth.module.Krb5LoginModule required
   useKeyTab=true
   keyTab="/etc/security/keytabs/kafka.service.keytab"
   storeKey=true
   useTicketCache=false
   serviceName="zookeeper"
   principal="myPrincipal";
};

и передал его местоположение в качестве аргумента JVM

 -Djava.security.auth.login.config=C:\\kafka_jaas.conf

почему этот подход не работает?

Как вы выполняете kafka procuder, который пишет сообщения брокеру, защищенному kerberos.


person Hector    schedule 06.01.2016    source источник
comment
Чтобы получить дополнительную информацию об отладке: title="подключиться к кусту в защищенном кластере с проверкой подлинности Kerberos с помощью keytab"> stackoverflow.com/questions/31824149/   -  person Samson Scharfrichter    schedule 06.01.2016
comment
Просто предположение: клиент Zookeper создает частный билет (useKeyTab=true + useTicketCache=false), но KafkaClient не может создать другой билет (без keytab) или использовать существующий билет (useTicketCache=true, но если вы kinit в другом процессе кеш по умолчанию будет пуст...)   -  person Samson Scharfrichter    schedule 06.01.2016
comment
Спасибо, что нашли время, чтобы посмотреть на это. Я получу дополнительную отладку безопасности и рассмотрю исправление моего файла jaas conf.   -  person Hector    schedule 06.01.2016
comment
как только я прошел -Djava.security.debug=gssloginconfig,configfile,configparser,logincontext ; я все еще не получил никакого дополнительного вывода отладки. Фактически, затем я попытался передать несуществующие хосты и номера портов в моем списке брокеров производителю kafka, и это не имело никакого значения для выхода из системы, который я получил. У меня такое чувство, что я вообще ни к чему не подключаюсь с моей машины с Windows7. просто невозможно подключиться к удаленному брокеру kafka из eclipse?   -  person Hector    schedule 08.01.2016
comment
Возможно, где-то есть брандмауэр, который не позволяет вашему компьютеру подключаться к серверам Kafka через этот конкретный порт — проверьте с вашими сетевыми администраторами :-/   -  person Samson Scharfrichter    schedule 08.01.2016
comment
Вы нашли решение для этого? Я пытаюсь сделать точно так же   -  person Biplob Biswas    schedule 06.03.2018