我目前正在使用汇合3.0.1平台.我试图在两个不同的工作者上创建2个连接器,但尝试创建一个新连接器正在为它创建一个新组.
Two connectors were created using below details: 1) POST http://devmetric.com:8083/connectors { "name": "connector1", "config": { "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector", "tasks.max": "1", "topics": "dev.ps_primary_delivery", "elasticsearch.cluster.name": "ad_metrics_store", "elasticsearch.hosts": "devkafka1.com:9300", "elasticsearch.bulk.size": "100", "tenants": "tenant1" } } 2) POST http://devkafka01.com:8083/connectors { "name": "connector2", "config": { "connector.class": "com.xxx.kafka.connect.sink.DeliverySinkConnector", "tasks.max": "1", "topics": "dev.ps_primary_delivery", "elasticsearch.cluster.name": "ad_metrics_store", "elasticsearch.hosts": "devkafka.com:9300", "elasticsearch.bulk.size": "100", "tenants": "tenant1" } }
但是他们都是在不同的群组ID下创建的.在此之后,我询问现有的团体.
$ sh ./bin/kafka-consumer-groups --bootstrap-server devmetric.com:9091 --new-consumer --list Result was: connect-connector2 connect-connector1
这些组是由Kafka自动创建的,而不是由我提供的.我在worker.properties中给了不同的group.id. 但是我希望两个连接器都在同一个组下,以便它们并行工作以共享消息.截至目前,我有一个关于主题"dev.ps_primary_delivery"的100万个数据,我希望两个连接器各自获得50万个.
请让我知道如何做到这一点.
我认为需要澄清一些......
group.id
在worker.properties文件中不引用使用者组.它是一个"工作组" - 同一工作组中的多个工作人员将在它们之间拆分工作 - 因此,如果同一连接器有许多任务(例如JDBC连接器为每个表都有一个任务),那么这些任务将分配给所有工作组集团的工人.
接收器确实具有消费者群体的消费者群体.该组的group.id始终是"connect - "+连接器名称.在您的情况下,根据您的连接器名称,您有"connect-connector1"和"connect-connector2".这也意味着两个连接器在同一组中的唯一方式是......如果它们具有相同的名称.但名称是唯一的,因此您不能在同一组中使用两个连接器.原因是...
连接器本身并不真正获得事件,它们只是启动了一堆任务.每个任务都有消费者,这些消费者是连接器使用者组的一部分,每个任务将独立处理主题和分区的子集.因此,在同一组中有两个连接器,基本上意味着它们的所有任务都属于同一组 - 所以为什么需要两个连接器?只需为该一个连接器配置更多主题和更多任务,就可以了.
唯一的例外是您使用的连接器未正确使用任务或仅限于一项任务.在这种情况下 - 要么他们有充分的理由,要么(更有可能)有人需要改进他们的连接器......