
Kafka Producer 0.9 performance problems with small messages


We are observing very poor performance from the Java Kafka Producer 0.9 client when sending small messages. The messages are not being accumulated into larger request batches, so each small record is sent separately.

What is wrong with our client configuration? Or is this some other issue?


We are using Kafka Client 0.9.0.0. We did not see any related postings in the Kafka unreleased 9.0.1 or 9.1 fixed or unresolved issue lists, so we are focusing on our client configuration and the server instance.

We understand that linger.ms should cause the client to accumulate records into batches.

We set linger.ms to 10 (and also tried 100 and 1000), but none of these caused records to accumulate into batches. With a record size of about 100 bytes and a request buffer size of 16K, we expected about 160 messages to be sent in a single request.
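
For context, here is a minimal sketch of how we build the producer in code (the class and method names are illustrative; the property values are taken from the configuration logged further below, and the SASL_SSL security settings are omitted):

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class ProducerSetup {
        // Illustrative helper: builds a producer with the values we logged.
        public static KafkaProducer<byte[], byte[]> create() {
            Properties props = new Properties();
            props.put("bootstrap.servers",
                    "kafka01-prod01.messagehub.services.us-south.bluemix.net:9094");
            props.put("key.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("value.serializer",
                    "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put("acks", "-1");
            props.put("client.id", "ExploreProducer");
            // Wait up to 10 ms so several ~100-byte records can accumulate
            // into one batch (we also tried 100 and 1000).
            props.put("linger.ms", "10");
            // Default 16 KB batch: should hold roughly 160 records of ~100 bytes.
            props.put("batch.size", "16384");
            return new KafkaProducer<byte[], byte[]>(props);
        }
    }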

Even though a new Bluemix Messaging Hub (Kafka Server 0.9) service instance had just been provisioned, the trace on the client seems to indicate that the partition may be full. The test client sends multiple messages in a loop with no other I/O.


The log shows a repeating sequence containing a suspicious line: "Waking up the sender since topic mytopic partition 0 is either full or getting a new batch".

So, given that the newly provisioned partition should be essentially empty in our test case, why is the producer client getting a new batch?

2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer  - Sending record ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B@4923ab24 with callback null to topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator  - Allocating a new 16384 byte message buffer for topic mytopic partition 0  
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer  - Waking up the sender since topic mytopic partition 0 is either full or getting a new batch  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Nodes with data ready to send: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]  
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender  - Received produce response from node 0 with correlation id 11  
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch  - Produced messages to topic-partition mytopic-0 with base offset offset 130 and error: null.  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer  - Send returned metadata: Topic='mytopic', Partition=0, Offset=130  
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer  - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'

Log entries repeat like the above for each record sent.

We supplied the following properties file:

2015-12-10 15:14:37,843 185  [main] INFO  com.isllc.client.AbstractClient  - Properties retrieved from file for Kafka client: kafka-producer.properties
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     acks=-1
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251  [main] INFO  com.isllc.client.AbstractClient  -     key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     client.id=ExploreProducer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252  [main] INFO  com.isllc.client.AbstractClient  -     security.protocol=SASL_SSL

Plus we added linger.ms=10 in code.

The Kafka client logs the expanded/merged configuration list (which shows the linger.ms setting):

2015-12-10 15:14:37,970 312  [main] INFO  org.apache.kafka.clients.producer.ProducerConfig  - ProducerConfig values: 
    compression.type = none
    metric.reporters = []
    metadata.max.age.ms = 300000
    metadata.fetch.timeout.ms = 60000
    reconnect.backoff.ms = 50
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us-south.bluemix.net:9094, kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
    retry.backoff.ms = 100
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    buffer.memory = 33554432
    timeout.ms = 30000
    key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.keystore.type = JKS
    ssl.trustmanager.algorithm = PKIX
    block.on.buffer.full = false
    ssl.key.password = null
    max.block.ms = 60000
    sasl.kerberos.min.time.before.relogin = 60000
    connections.max.idle.ms = 540000
    ssl.truststore.password = [hidden]
    max.in.flight.requests.per.connection = 5
    metrics.num.samples = 2
    client.id = ExploreProducer
    ssl.endpoint.identification.algorithm = null
    ssl.protocol = TLSv1.2
    request.timeout.ms = 30000
    ssl.provider = null
    ssl.enabled.protocols = [TLSv1.2]
    acks = -1
    batch.size = 16384
    ssl.keystore.location = null
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.truststore.type = JKS
    security.protocol = SASL_SSL
    retries = 0
    max.request.size = 1048576
    value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
    ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
    ssl.keystore.password = null
    ssl.keymanager.algorithm = SunX509
    metrics.sample.window.ms = 30000
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    send.buffer.bytes = 131072
    linger.ms = 10

Kafka metrics after sending 100 records:

Duration for 100 sends 8787 ms. Sent 7687 bytes.  
    batch-size-avg = 109.87 [The average number of bytes sent per partition per-request.]  
    batch-size-max = 110.0 [The max number of bytes sent per partition per-request.]  
    buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory that is not being used (either unallocated or in the free list).]  
    buffer-exhausted-rate = 0.0 [The average per-second number of record sends that are dropped due to buffer exhaustion]  
    buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the client can use (whether or not it is currently used).]  
    bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for space allocation.]  
    byte-rate = 291.8348916277093 []  
    compression-rate = 0.0 []  
    compression-rate-avg = 0.0 [The average compression rate of record batches.]  
    connection-close-rate = 0.0 [Connections closed per second in the window.]  
    connection-count = 2.0 [The current number of active connections.]  
    connection-creation-rate = 0.05180541884681138 [New connections established per second in the window.]  
    incoming-byte-rate = 10.342564641029007 []  
    io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent doing I/O]  
    io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per select call in nanoseconds.]  
    io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread spent waiting.]  
    io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
    metadata-age = 8.096 [The age in seconds of the current producer metadata being used.]  
    network-io-rate = 5.2937784999213795 [The average number of network operations (reads or writes) on all connections per second.]  
    outgoing-byte-rate = 451.2298783403283 []  
    produce-throttle-time-avg = 0.0 [The average throttle time in ms]  
    produce-throttle-time-max = 0.0 [The maximum throttle time in ms]  
    record-error-rate = 0.0 [The average per-second number of record sends that resulted in errors]  
    record-queue-time-avg = 15.5 [The average time in ms record batches spent in the record accumulator.]  
    record-queue-time-max = 434.0 [The maximum time in ms record batches spent in the record accumulator.]  
    record-retry-rate = 0.0 []  
    record-send-rate = 2.65611304417116 [The average number of records sent per second.]  
    record-size-avg = 97.87 [The average record size]  
    record-size-max = 98.0 [The maximum record size]  
    records-per-request-avg = 1.0 [The average number of records per request.]  
    request-latency-avg = 0.0 [The average request latency in ms]  
    request-latency-max = 74.0 []  
    request-rate = 2.6468892499606897 [The average number of requests sent per second.]  
    request-size-avg = 42.0 [The average size of all requests in the window..]  
    request-size-max = 170.0 [The maximum size of any request sent in the window.]  
    requests-in-flight = 0.0 [The current number of in-flight requests awaiting a response.]  
    response-rate = 2.651196976060479 [The average number of responses received per second.]  
    select-rate = 10.989861465830819 [Number of times the I/O layer checked for new I/O to perform per second]  
    waiting-threads = 0.0 [The number of user threads blocked waiting for buffer memory to enqueue their records]  

Thanks



1> Gary Gershon:

Guozhang Wang on the Kafka Users mailing list was able to identify the problem by reviewing our application code:

Guozhang,

Yes - you found the problem!

We had inserted the .get() for debugging, but didn't anticipate the (huge!) side effect.

Using the asynchronous callback works perfectly.

We are now able to send 100,000 records in 14 seconds from a laptop to the Bluemix cloud - about 1000x faster.

Thank you very much!

Gary


On Dec 13, 2015, at 2:48 PM, Guozhang Wang wrote:

Gary,

You are calling "kafkaProducer.send(record).get();" for each message; the get() call blocks until the Future is completed, which effectively synchronizes all sends by requesting the ACK for each message before sending the next one, hence no batching (see the first sketch after this message).

You can try using "send(record, callback)" for asynchronous sending and let the callback handle any errors from the returned metadata (see the second sketch below).

Guozhang
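
To make the difference concrete, here is a minimal sketch of the blocking pattern Guozhang describes (the loop and payload are illustrative; only the send(record).get() call itself comes from the thread, and exception handling is omitted):

    // Anti-pattern: blocking on the returned Future after every send.
    // get() waits for the broker ACK, so at most one record is ever in the
    // RecordAccumulator and linger.ms never gets a chance to form a batch.
    byte[] key = "records".getBytes();
    for (int i = 0; i < 100; i++) {
        byte[] value = ("Test Message " + i).getBytes(); // illustrative payload
        producer.send(new ProducerRecord<byte[], byte[]>("mytopic", key, value))
                .get(); // blocks until the ACK arrives
    }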
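And a sketch of the suggested asynchronous form, using the client's standard org.apache.kafka.clients.producer.Callback interface (the error handling shown is only a placeholder):

    // Async send: records queue up in the accumulator and are batched
    // while earlier requests are still in flight.
    producer.send(record, new Callback() {
        @Override
        public void onCompletion(RecordMetadata metadata, Exception exception) {
            if (exception != null) {
                exception.printStackTrace(); // placeholder: log/handle the failure
            }
        }
    });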
