最近要做个东西,需要从 kafka 消费大量消息,服务器上 kafka 是 0.9 版本的,应用是 springboot-1.5.6 spring-kafka 1.0.0 版本
下面是两个消费者:
@KafkaListener(topicPartitiOns={@TopicPartition(topic = "spyfool",partitiOns= {"0"})},group = "home")
public void doConsume(ConsumerRecord a){
log.info("000000 收到 kafka 消息:{},偏移量={}",a.value(),a.offset());
}
@KafkaListener(topicPartitiOns={@TopicPartition(topic = "spyfool",partitiOns= {"0"})},group = "home")
public void doConsume1(ConsumerRecord a){
log.info("111111 收到 kafka 消息:{},偏移量={}",a.value(),a.offset());
}
配置文件:
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=home
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.fetch-max-wait=10000
spring.kafka.listener.ack-mode=manual_immediate
但结果明明同组的两个消费者发生了重复消费:
111111 收到 kafka 消息:say my name!3d0e7171-cae1-44c9-8a9a-f21289160108,偏移量=42141
000000 收到 kafka 消息:say my name!3d0e7171-cae1-44c9-8a9a-f21289160108,偏移量=42141
111111 收到 kafka 消息:say my name!90851ac4-1e5f-4ac8-93c9-72acd64ce529,偏移量=42142
000000 收到 kafka 消息:say my name!90851ac4-1e5f-4ac8-93c9-72acd64ce529,偏移量=42142
111111 收到 kafka 消息:say my name!4d0fa93d-c7e7-4f44-beba-bfa6c25aee12,偏移量=42143
111111 收到 kafka 消息:say my name!d1707959-0be2-4ec1-a81d-86ef211cb73d,偏移量=42144
000000 收到 kafka 消息:say my name!4d0fa93d-c7e7-4f44-beba-bfa6c25aee12,偏移量=42143
111111 收到 kafka 消息:say my name!79cfea0d-548d-437d-bd05-8b65a27f9e9a,偏移量=42145
000000 收到 kafka 消息:say my name!d1707959-0be2-4ec1-a81d-86ef211cb73d,偏移量=42144
000000 收到 kafka 消息:say my name!79cfea0d-548d-437d-bd05-8b65a27f9e9a,偏移量=42145
百思不得其解,请问大家能指点一下吗
谢谢了!
下面是两个消费者:
@KafkaListener(topicPartitiOns={@TopicPartition(topic = "spyfool",partitiOns= {"0"})},group = "home")
public void doConsume(ConsumerRecord a){
log.info("000000 收到 kafka 消息:{},偏移量={}",a.value(),a.offset());
}
@KafkaListener(topicPartitiOns={@TopicPartition(topic = "spyfool",partitiOns= {"0"})},group = "home")
public void doConsume1(ConsumerRecord a){
log.info("111111 收到 kafka 消息:{},偏移量={}",a.value(),a.offset());
}
配置文件:
spring.kafka.consumer.bootstrap-servers=127.0.0.1:9092
spring.kafka.consumer.group-id=home
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=10
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.fetch-max-wait=10000
spring.kafka.listener.ack-mode=manual_immediate
但结果明明同组的两个消费者发生了重复消费:
111111 收到 kafka 消息:say my name!3d0e7171-cae1-44c9-8a9a-f21289160108,偏移量=42141
000000 收到 kafka 消息:say my name!3d0e7171-cae1-44c9-8a9a-f21289160108,偏移量=42141
111111 收到 kafka 消息:say my name!90851ac4-1e5f-4ac8-93c9-72acd64ce529,偏移量=42142
000000 收到 kafka 消息:say my name!90851ac4-1e5f-4ac8-93c9-72acd64ce529,偏移量=42142
111111 收到 kafka 消息:say my name!4d0fa93d-c7e7-4f44-beba-bfa6c25aee12,偏移量=42143
111111 收到 kafka 消息:say my name!d1707959-0be2-4ec1-a81d-86ef211cb73d,偏移量=42144
000000 收到 kafka 消息:say my name!4d0fa93d-c7e7-4f44-beba-bfa6c25aee12,偏移量=42143
111111 收到 kafka 消息:say my name!79cfea0d-548d-437d-bd05-8b65a27f9e9a,偏移量=42145
000000 收到 kafka 消息:say my name!d1707959-0be2-4ec1-a81d-86ef211cb73d,偏移量=42144
000000 收到 kafka 消息:say my name!79cfea0d-548d-437d-bd05-8b65a27f9e9a,偏移量=42145
百思不得其解,请问大家能指点一下吗
谢谢了!
