Проблема с производительностью Kafka Producer 0.9 с небольшими сообщениями

Мы наблюдаем очень низкую производительность клиента Java Kafka Producer 0.9 при отправке небольших сообщений. Сообщения не накапливаются в более крупный пакет запросов, и поэтому каждая небольшая запись отправляется отдельно.

Что не так с конфигурацией нашего клиента? Или это какая-то другая проблема?


Использование Kafka Client 0.9.0.0. Мы не видели никаких связанных публикаций в неизданных фиксированных или неразрешенных списках Kafka 9.0.1 или 9.1, поэтому мы сосредоточились на конфигурации нашего клиента и экземпляре сервера.

Мы понимаем, что linger.ms должен побуждать клиента собирать записи в пакет.

Мы установили linger.ms на 10 (а также попробовали 100 и 1000), но это не привело к пакетному накоплению записей. При размере записи около 100 байт и размере буфера запроса 16 КБ мы ожидали, что за один запрос будет отправлено около 160 сообщений.

Трассировка на клиенте, кажется, указывает на то, что раздел может быть заполнен, несмотря на выделение нового экземпляра службы Bluemix Messaging Hub (Kafka Server 0.9). Тестовый клиент отправляет несколько сообщений в цикле без других операций ввода-вывода.


В журнале отображается повторяющаяся последовательность с подозрительной строкой: «Пробуждение отправителя, поскольку тематический раздел mytopic 0 либо заполнен, либо идет новый пакет».

Таким образом, вновь выделенный раздел должен быть по существу пустым в нашем тестовом примере, поэтому почему клиент-производитель должен получать новую партию?

2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer  - Sending record ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B@4923ab24 with callback null to topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator  - Allocating a new 16384 byte message buffer for topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer  - Waking up the sender since topic mytopic partition 0 is either full or getting a new batch  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Nodes with data ready to send: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Received produce response from node 0 with correlation id 11  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch  - Produced messages to topic-partition mytopic-0 with base offset offset 130 and error: null.  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer  - Send returned metadata: Topic='mytopic', Partition=0, Offset=130  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'

Log entries repeat like the above for each record sent

Мы предоставили следующий файл свойств:

2015-12-10 15:14:37,843 185  [main] INFO  com.isllc.client.AbstractClient  - Properties retrieved from file for Kafka client: kafka-producer.properties
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     acks=-1
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     client.id=ExploreProducer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     security.protocol=SASL_SSL

Plus we added linger.ms=10 in code.

Клиент Kafka показывает расширенный / объединенный список конфигурации (и отображение настройки linger.ms):

2015-12-10 15:14:37,970 312  [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values: 
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us-south.bluemix.net:9094, kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = [hidden]
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = ExploreProducer
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLSv1.2
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2]
    acks = -1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = SASL_SSL
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 10

Метрики Kafka после отправки 100 записей:

Duration for 100 sends 8787 ms. Sent 7687 bytes.  
    batch-size-avg = 109.87 [The average number of bytes sent per partition per-request.]  
    batch-size-max = 110.0 [The max number of bytes sent per partition per-request.]  
    buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory that is not being used (either unallocated or in the free list).]  
    buffer-exhausted-rate = 0.0 [The average per-second number of record sends that are dropped due to buffer exhaustion]  
    buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the client can use (whether or not it is currently used).]  
    bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for space allocation.]  
    byte-rate = 291.8348916277093 []  
    compression-rate = 0.0 []  
    compression-rate-avg = 0.0 [The average compression rate of record batches.]  
    connection-close-rate = 0.0 [Connections closed per second in the window.]  
    connection-count = 2.0 [The current number of active connections.]  
    connection-creation-rate = 0.05180541884681138 [New connections established per second in the window.]  
    incoming-byte-rate = 10.342564641029007 []  
    io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent doing I/O]  
    io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per select call in nanoseconds.]  
    io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread spent waiting.]  
    io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
    metadata-age = 8.096 [The age in seconds of the current producer metadata being used.]  
    network-io-rate = 5.2937784999213795 [The average number of network operations (reads or writes) on all connections per second.]  
    outgoing-byte-rate = 451.2298783403283 []  
    produce-throttle-time-avg = 0.0 [The average throttle time in ms]  
    produce-throttle-time-max = 0.0 [The maximum throttle time in ms]  
    record-error-rate = 0.0 [The average per-second number of record sends that resulted in errors]  
    record-queue-time-avg = 15.5 [The average time in ms record batches spent in the record accumulator.]  
    record-queue-time-max = 434.0 [The maximum time in ms record batches spent in the record accumulator.]  
    record-retry-rate = 0.0 []  
    record-send-rate = 2.65611304417116 [The average number of records sent per second.]  
    record-size-avg = 97.87 [The average record size]  
    record-size-max = 98.0 [The maximum record size]  
    records-per-request-avg = 1.0 [The average number of records per request.]  
    request-latency-avg = 0.0 [The average request latency in ms]  
    request-latency-max = 74.0 []  
    request-rate = 2.6468892499606897 [The average number of requests sent per second.]  
    request-size-avg = 42.0 [The average size of all requests in the window..]  
    request-size-max = 170.0 [The maximum size of any request sent in the window.]  
    requests-in-flight = 0.0 [The current number of in-flight requests awaiting a response.]  
    response-rate = 2.651196976060479 [The average number of responses received per second.]  
    select-rate = 10.989861465830819 [Number of times the I/O layer checked for new I/O to perform per second]  
    waiting-threads = 0.0 [The number of user threads blocked waiting for buffer memory to enqueue their records]  

Спасибо


person Gary Gershon    schedule 10.12.2015    source источник
comment
Вы поделились этим вопросом в списке рассылки разработчиков kafka? Я смутно помню, как пару месяцев назад читал какое-то электронное письмо о падении производительности их основной версии, потому что они давно не тестировали производительность. Хотя я предполагаю, что они провели тестирование производительности перед выпуском.   -  person Morgan Kenyon    schedule 11.12.2015
comment
Если посмотреть на клиентский код Kafka, откуда исходит сообщение Пробуждение отправителя ..., похоже, что это сообщение может немного вводить в заблуждение. Сообщение выводится, если пакет заполнен или создается новый пакет, а не если раздел заполнен, как предполагает сообщение. Я не думаю, что заполнение раздела действительно имеет смысл в этом контексте - Message Hub сохраняет до 1 ГБ данных сообщений на раздел, прежде чем он начнет отбрасывать старые сообщения для хранения новых, но при этом используется стандартная конфигурация сервера Kafka. параметры.   -  person Oliver Deakin    schedule 11.12.2015
comment
@ morganw09dev - Я видел проблему с производительностью в списке рассылки Kafka, и они решили ее. Мое исходное предположение состоит в том, что у нашей конфигурации есть проблема. Это не провал - разница в производительности при работе буферизации увеличится в 10-100 раз. :) Я дам этому форуму еще день, а затем продолжу список разработчиков. Спасибо.   -  person Gary Gershon    schedule 11.12.2015
comment
@OliverDeakin - Я также просмотрел код и видел альтернативные тесты для сообщения. Одна мысль заключалась в том, что команда Message Hub изменила диалог с сервером в процессе создания многопользовательского сервисного решения, и это могло ошибочно привести к тому, что клиент решил, что раздел заполнен. В любом случае я создал новый пустой экземпляр Message Hub. OTOH - Возможно ли, что никто другой не пытался отправлять небольшие сообщения с пакетной обработкой в ​​несколько миллисекунд с использованием клиента 0.9 Producer? Как насчет Bluemix Event Hub для Twitter? Если это не так, похоже, они должны быть!   -  person Gary Gershon    schedule 11.12.2015
comment
@GaryGershon - Мы выполнили тесты производительности с новым производителем и 100 байт здесь и увидели, что пакетная обработка работает с хорошей пропускной способностью, хотя я не думаю, что мы устанавливали linger.ms. Вы не видите пакетной обработки без этой опции конфигурации?   -  person Oliver Deakin    schedule 11.12.2015
comment
@OliverDeakin - Рад слышать, что он способен работать! Мы вообще не наблюдаем пакетной обработки и проводим тестирование с 77-байтовой записью. Удаление linger.ms ничего не меняет. Он явно что-то делает с linger.ms, поскольку установка его на 1000 (мс) приводит к тому, что цикл из 100 отправок занимает более 100 секунд, как и следовало ожидать. Все наши настройки и показатели показаны выше. Цикл отправки очень прост, форматирует сообщение и использует log4j только с консольным приложением. Вы устанавливаете другие значения конфигурации по сравнению с указанными выше? Есть ли в дикой природе более одного kafka-clients-0.9.0.0.jar?   -  person Gary Gershon    schedule 11.12.2015
comment
Мы переносим это в список рассылки пользователей Apache Kafka. Архив mail-archives.apache.org/mod_mbox/kafka-users /201512.mbox/. Если у вас есть комментарии, воспользуйтесь списком рассылки сейчас. Спасибо!   -  person Gary Gershon    schedule 13.12.2015


Ответы (1)


Guozhang Wang из списка рассылки Kafka Users смог распознать проблему, просмотрев код нашего приложения:

Гочжан,

Да, вы определили проблему!

Мы вставили .get () для отладки, но не думали о (огромных!) Побочных эффектах.

Использование асинхронного обратного вызова работает отлично.

Теперь мы можем отправить 100000 записей за 14 секунд с ноутбука в облако Bluemix - примерно в 1000 раз быстрее,

Большое спасибо!

Гэри


13 декабря 2015 г. в 14:48 Гочжан Ван написал:

Гэри,

Вы вызываете "kafkaProducer.send (record) .get ();" для каждого сообщения блок вызова get () до инициализации Future, который эффективно синхронизирует все отправленное сообщение, запрашивая ACK для каждого сообщения перед отправкой следующего сообщения, следовательно, без пакетной обработки.

Вы можете попробовать использовать «send (запись, обратный вызов)» для асинхронной отправки и позволить обратному вызову обрабатывать ошибки из возвращенных метаданных.

Guozhang

person Gary Gershon    schedule 13.12.2015