When sending small messages, we are seeing very poor performance from the Java Kafka Producer 0.9 client. Messages are not being accumulated into larger request batches, so each small record is sent in its own request.
Is something wrong with our client configuration, or is the problem elsewhere?
We are using Kafka Client 0.9.0.0. We did not see any related entry in the fixed or unresolved issue lists for the unreleased Kafka 0.9.0.1 or 0.9.1, so we focused on our client configuration and server instance.
We understand that linger.ms should cause the client to accumulate records into batches.
We set linger.ms to 10 (and also tried 100 and 1000), but none of these caused batches to accumulate multiple records. With a record size of about 100 bytes and a request buffer size of 16K, we expected roughly 160 messages to be sent in a single request.
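For reference, the batching-related entries we are describing would look like this in the producer properties file (linger.ms is the value we set; batch.size matches the 16K default shown in the merged configuration dump later):

```properties
# Wait up to 10 ms for additional records before sending a request
linger.ms=10
# Per-partition batch buffer in bytes (~160 records of ~100 bytes each)
batch.size=16384
```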
A trace on the client seemed to indicate that the partition might be full, even though we had just provisioned a new Bluemix Messaging Hub (Kafka Server 0.9) service instance. The test client sends a number of messages in a loop with no other I/O.
The log shows a repeating sequence with one suspicious line: "Waking up the sender since topic mytopic partition 0 is either full or getting a new batch".
So in our test case the newly provisioned partition should be essentially empty; why would the producer client be getting a new batch?
2015-12-10 15:14:41,335 3677 [main] TRACE com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00011 2015-12-10T15:14:41.335-05:00'
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - Sending record ProducerRecord(topic=mytopic, partition=null, key=[B@670b40af, value=[B@4923ab24 with callback null to topic mytopic partition 0
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.internals.RecordAccumulator - Allocating a new 16384 byte message buffer for topic mytopic partition 0
2015-12-10 15:14:41,336 3678 [main] TRACE org.apache.kafka.clients.producer.KafkaProducer - Waking up the sender since topic mytopic partition 0 is either full or getting a new batch
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Nodes with data ready to send: [Node(0, kafka01-prod01.messagehub.services.us-south.bluemix.net, 9094)]
2015-12-10 15:14:41,348 3690 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Created 1 produce requests: [ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.producer.internals.Sender$1@6d62e963, request=RequestSend(header={api_key=0,api_version=1,correlation_id=11,client_id=ExploreProducer}, body={acks=-1,timeout=30000,topic_data=[{topic=mytopic,data=[{partition=0,record_set=java.nio.HeapByteBuffer[pos=0 lim=110 cap=16384]}]}]}), createdTimeMs=1449778481348, sendTimeMs=0)]
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.Sender - Received produce response from node 0 with correlation id 11
2015-12-10 15:14:41,412 3754 [kafka-producer-network-thread | ExploreProducer] TRACE org.apache.kafka.clients.producer.internals.RecordBatch - Produced messages to topic-partition mytopic-0 with base offset offset 130 and error: null.
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - Send returned metadata: Topic='mytopic', Partition=0, Offset=130
2015-12-10 15:14:41,412 3754 [main] TRACE com.isllc.client.producer.ExploreProducer - Sending record: Topic='mytopic', Key='records', Value='Kafka 0.9 Java Client Record Test Message 00012 2015-12-10T15:14:41.412-05:00'
Log entries like the above repeat for each record sent.
We supplied the following properties file:
2015-12-10 15:14:37,843 185 [main] INFO com.isllc.client.AbstractClient - Properties retrieved from file for Kafka client: kafka-producer.properties
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - acks=-1
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - ssl.protocol=TLSv1.2
2015-12-10 15:14:37,909 251 [main] INFO com.isllc.client.AbstractClient - key.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - client.id=ExploreProducer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.identification.algorithm=HTTPS
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.password=changeit
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.type=JKS
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.enabled.protocols=TLSv1.2
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - ssl.truststore.location=/Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - bootstrap.servers=kafka01-prod01.messagehub.services.us-south.bluemix.net:9094,kafka02-prod01.messagehub.services.us-south.bluemix.net:9094,kafka03-prod01.messagehub.services.us-south.bluemix.net:9094,kafka04-prod01.messagehub.services.us-south.bluemix.net:9094,kafka05-prod01.messagehub.services.us-south.bluemix.net:9094
2015-12-10 15:14:37,910 252 [main] INFO com.isllc.client.AbstractClient - security.protocol=SASL_SSL
Plus we added linger.ms=10 in code.
The Kafka client logs the expanded/merged configuration list (and shows the linger.ms setting):
2015-12-10 15:14:37,970 312 [main] INFO org.apache.kafka.clients.producer.ProducerConfig - ProducerConfig values:
compression.type = none
metric.reporters = []
metadata.max.age.ms = 300000
metadata.fetch.timeout.ms = 60000
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9094, kafka02-prod01.messagehub.services.us-south.bluemix.net:9094, kafka03-prod01.messagehub.services.us-south.bluemix.net:9094, kafka04-prod01.messagehub.services.us-south.bluemix.net:9094, kafka05-prod01.messagehub.services.us-south.bluemix.net:9094]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
buffer.memory = 33554432
timeout.ms = 30000
key.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
block.on.buffer.full = false
ssl.key.password = null
max.block.ms = 60000
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = [hidden]
max.in.flight.requests.per.connection = 5
metrics.num.samples = 2
client.id = ExploreProducer
ssl.endpoint.identification.algorithm = null
ssl.protocol = TLSv1.2
request.timeout.ms = 30000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2]
acks = -1
batch.size = 16384
ssl.keystore.location = null
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = SASL_SSL
retries = 0
max.request.size = 1048576
value.serializer = class org.apache.kafka.common.serialization.ByteArraySerializer
ssl.truststore.location = /Library/Java/JavaVirtualMachines/jdk1.8.0_51.jdk/Contents/Home/jre/lib/security/cacerts
ssl.keystore.password = null
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
send.buffer.bytes = 131072
linger.ms = 10
Kafka metrics after sending 100 records:
Duration for 100 sends 8787 ms. Sent 7687 bytes.
batch-size-avg = 109.87 [The average number of bytes sent per partition per-request.]
batch-size-max = 110.0 [The max number of bytes sent per partition per-request.]
buffer-available-bytes = 3.3554432E7 [The total amount of buffer memory that is not being used (either unallocated or in the free list).]
buffer-exhausted-rate = 0.0 [The average per-second number of record sends that are dropped due to buffer exhaustion]
buffer-total-bytes = 3.3554432E7 [The maximum amount of buffer memory the client can use (whether or not it is currently used).]
bufferpool-wait-ratio = 0.0 [The fraction of time an appender waits for space allocation.]
byte-rate = 291.8348916277093 []
compression-rate = 0.0 []
compression-rate-avg = 0.0 [The average compression rate of record batches.]
connection-close-rate = 0.0 [Connections closed per second in the window.]
connection-count = 2.0 [The current number of active connections.]
connection-creation-rate = 0.05180541884681138 [New connections established per second in the window.]
incoming-byte-rate = 10.342564641029007 []
io-ratio = 0.0038877559207471236 [The fraction of time the I/O thread spent doing I/O]
io-time-ns-avg = 353749.2840375587 [The average length of time for I/O per select call in nanoseconds.]
io-wait-ratio = 0.21531227995769162 [The fraction of time the I/O thread spent waiting.]
io-wait-time-ns-avg = 1.9591901192488264E7 [The average length of time the I/O thread spent waiting for a socket ready for reads or writes in nanoseconds.]
metadata-age = 8.096 [The age in seconds of the current producer metadata being used.]
network-io-rate = 5.2937784999213795 [The average number of network operations (reads or writes) on all connections per second.]
outgoing-byte-rate = 451.2298783403283 []
produce-throttle-time-avg = 0.0 [The average throttle time in ms]
produce-throttle-time-max = 0.0 [The maximum throttle time in ms]
record-error-rate = 0.0 [The average per-second number of record sends that resulted in errors]
record-queue-time-avg = 15.5 [The average time in ms record batches spent in the record accumulator.]
record-queue-time-max = 434.0 [The maximum time in ms record batches spent in the record accumulator.]
record-retry-rate = 0.0 []
record-send-rate = 2.65611304417116 [The average number of records sent per second.]
record-size-avg = 97.87 [The average record size]
record-size-max = 98.0 [The maximum record size]
records-per-request-avg = 1.0 [The average number of records per request.]
request-latency-avg = 0.0 [The average request latency in ms]
request-latency-max = 74.0 []
request-rate = 2.6468892499606897 [The average number of requests sent per second.]
request-size-avg = 42.0 [The average size of all requests in the window.]
request-size-max = 170.0 [The maximum size of any request sent in the window.]
requests-in-flight = 0.0 [The current number of in-flight requests awaiting a response.]
response-rate = 2.651196976060479 [The average number of responses received per second.]
select-rate = 10.989861465830819 [Number of times the I/O layer checked for new I/O to perform per second]
waiting-threads = 0.0 [The number of user threads blocked waiting for buffer memory to enqueue their records]
Thanks!
Guozhang Wang on the Kafka Users mailing list was able to identify the problem by reviewing our application code:
Guozhang,
Yes, you found the problem!
We had inserted the .get() for debugging, but did not anticipate its (huge!) side effect.
Using the asynchronous callback works perfectly.
We are now able to send 100,000 records from a laptop to the Bluemix cloud in about 14 seconds, roughly 1000x faster.
Thank you very much!
Gary
On Dec 13, 2015, at 2:48 PM, Guozhang Wang wrote:
Gary,
You are calling "kafkaProducer.send(record).get();" for every message; the get() call blocks until the Future is completed, which effectively synchronizes all sends by requiring an ACK for each message before the next message is sent, so there is no batching.
You can try "send(record, callback)" for asynchronous sends and let the callback handle any errors reported with the returned metadata.
Guozhang
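Guozhang's diagnosis can be reproduced without Kafka at all. The sketch below uses only the JDK: ToyProducer is a hypothetical stand-in (not the Kafka API) whose background sender thread lingers 2 ms and then flushes everything in its accumulator as one "request", loosely mirroring the linger.ms/RecordAccumulator design. Calling get() after every send forces every batch down to a single record; handing off a completion callback lets records pile up into far fewer requests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Toy batching producer: a sender thread wakes every 2 ms ("linger") and
// flushes whatever has accumulated as one request.
class ToyProducer implements AutoCloseable {
    private final BlockingQueue<CompletableFuture<Integer>> accumulator = new LinkedBlockingQueue<>();
    private final AtomicInteger requests = new AtomicInteger();
    private final Thread sender;
    private volatile boolean closed = false;

    ToyProducer() {
        sender = new Thread(() -> {
            try {
                while (!closed || !accumulator.isEmpty()) {
                    Thread.sleep(2);                      // linger: let records pile up
                    List<CompletableFuture<Integer>> batch = new ArrayList<>();
                    accumulator.drainTo(batch);           // everything accumulated so far
                    if (batch.isEmpty()) continue;
                    int requestId = requests.incrementAndGet();  // one "request" per batch
                    for (CompletableFuture<Integer> f : batch) f.complete(requestId);
                }
            } catch (InterruptedException ignored) { }
        });
        sender.start();
    }

    CompletableFuture<Integer> send(String record) {
        CompletableFuture<Integer> future = new CompletableFuture<>();
        accumulator.add(future);                          // append to the accumulator
        return future;
    }

    int requestCount() { return requests.get(); }

    @Override
    public void close() throws InterruptedException {
        closed = true;                                    // sender drains what remains, then exits
        sender.join();
    }
}

public class BatchingDemo {
    public static void main(String[] args) throws Exception {
        // Pattern 1: send(record).get() blocks until the record is acked,
        // so every batch contains exactly one record -> 100 requests.
        ToyProducer syncProducer = new ToyProducer();
        for (int i = 0; i < 100; i++) syncProducer.send("msg" + i).get();
        syncProducer.close();

        // Pattern 2: fire-and-forget with a completion handler; records
        // accumulate during the linger, so far fewer requests go out.
        ToyProducer asyncProducer = new ToyProducer();
        for (int i = 0; i < 100; i++)
            asyncProducer.send("msg" + i)
                         .whenComplete((requestId, err) -> { /* handle errors here */ });
        asyncProducer.close();

        int syncCount = syncProducer.requestCount();
        int asyncCount = asyncProducer.requestCount();
        System.out.println("sync requests=" + syncCount + ", async requests=" + asyncCount);
    }
}
```

With the real client the analogous change is replacing producer.send(record).get() with producer.send(record, callback), exactly as Guozhang suggests.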