Kafka从上手到实践-Kafka CLI:Consumer CLI

Consumer CLI

这一节来看看使用命令行启动Consumer接收消息,通过如下的命令启动Consumer:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic

  • kafka-console-consumer.sh是启动Consumer的命令。
  • --bootstrap-server指定要连接的Broker地址。
  • --topic指定Topic名称,既要从哪个Topic里读数据。

如上图所示,左边启动的是Consumer,右边启动的是Producer。Producer发送的消息可以实时的被Consumer接收到。但是有一个问题,那就是在上一节中,我们已经给first_topic这个Topic发送了一些数据。但是现在Consumer启动后并没有收到。这是因为通过上面的命令启动的Consumer接收的是最新的消息,如果想接收所有的消息,还需要带一个参数:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning

--from-beginning表示启动的Consumer要接收所有的消息。

前文中说过,Consumer一般都是以组的形式存在,所以可以再加一个参数来创建一个Consumer Group:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --group consumer_group_1

--group可以指定Consumer Group的名称。

如上图所示,左边启动了三个Consumer,这三个Consumer都在同一个名为consumer_group_1的组里。因为first_topic这个Topic有三个Partitions,所以当一个Consumer Group中有三个Consumer时,他们的收到的信息不会重复。

又如上图所示,左边启动了三个Consumer,但是前两个在consumer_group_1的组里,最后一个在consumer_group_2的组里,所以前两个Consumer是以轮询的方式收到消息的,而最后一个Consumer可以收到全部的消息。

上面两个示例也充分证明了前文中所说的,不同的Consumer Group可以消费同一个Topic中相同的Partition的消息,但是Consumer Group内的Consumer不能消费同一个Topic中相同的Partition的消息

上面的命令是显示的创建Consumer Group。上文中说到过,Kafka中,Consumer都是以组的形式连接Broker消费数据的。那么如果只有一个Consumer的情况下,是否有Consumer Group呢?其实,当只有一个Consumer时,也会自动创建一个Consumer Group。我们可以通过另外一组Consumer Group CLI来看一下:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

console-consumer-40439
console-consumer-81216
consumer_group_1c
console-consumer-14387
consumer_group_2
consumer_group_1
console-consumer-40563

可以看到,已经存在的Consumer Group中,除了我们之前创建的,还有以console-consumer-xxxxx这种命名格式存在的Consumer Group。这就是当我们只启动一个Consumer时Kafka自动为这个Consumer创建的Consumer Group。这里可以做个实验,先启动一个Consumer:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic

然后再来看看Consumer Group是否有增加:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --list

console-consumer-96752
console-consumer-40439
console-consumer-81216
consumer_group_1c
console-consumer-14387
consumer_group_2
consumer_group_1
console-consumer-40563

我们看到增加了一个Consumer Groupconsole-consumer-96752

Consumer Group列表看完了,再来看看某一个Consumer Group的详细信息,比如查看consumer_group_1的详细信息。可以使用如下命令:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1

Consumer group 'consumer_group_1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
first_topic     0          17              17              0               -               -               -
first_topic     2          17              17              0               -               -               -
first_topic     1          18              18              0               -               -

首先会告诉我们该Consumer Group中是否有正在活跃的Consumer,目前没有启动任何Consumer,所以提示我们Consumer group 'consumer_group_1' has no active members.

然后会列出该Consumer Group消费的Topic、Partition情况、Offset情况、延迟(LAG)情况、处于活跃状态的Consumer信息。

可以看到consumer_group_1这个Consumer Group正在消费first_topic这个Topic中的Message,一共从三个Partition中消费了52条Messages,并且目前已经消费了全部的数据,因为每个Partition的延迟都是0,说明没有还未接收的Message。

现在我们再往first_topic中发送一条Message,再来看看情况如何:

kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic first_topic --producer-property acks=1
>this is another message.

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1

Consumer group 'consumer_group_1' has no active members.

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
first_topic     0          17              17              0               -               -               -
first_topic     2          17              17              0               -               -               -
first_topic     1          18              19              1               -               -               -

可以看到Partition 1 的LOG-END-OFFSET是19,而CURRENT-OFFSET是18,并且Partition 1 的LAG是1,说明现在first-topic一共接收到了19条Message,而consumer-group-1只消费了18条,有1条延迟。

我们再启动consumer_group_1中的Consumer,然后再看看数据:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic --from-beginning --group consumer_group_1

this is another message.

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --describe --group consumer_group_1

TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                     HOST            CLIENT-ID
first_topic     0          17              17              0               consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1
first_topic     1          19              19              0               consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1
first_topic     2          17              17              0               consumer-1-4ec288ac-e202-40b1-a2ec-d43abc49b38d /172.17.222.157 consumer-1

可以看到,目前有一个处于活跃的Consumer,并且Messages全部被消费。

总结

这一章节主要介绍了如何使用Kafka的Consumer CLI接收Producer生产的Message。同时能更直观的印证之前介绍概念时提到的内容,比如Consumer Group的机制、Offset机制等。下一章节进一步介绍Offset的操作以及Config CLI。希望能给小伙伴们带来帮助。

分享到: