这一节我们来认识一下在Kafka中有着超然地位的Zookeeper。

Zookeeper初识

ZooKeeper 分布式服务框架是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper的目标就是封装好复杂、易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

Zookeeper有以下一些特点:

  • Zookeeper的内部数据结构是树状结构的。
  • 每个节点称为zNode。
  • 每个zNode都有一个唯一路径(path)。
  • zNode分长久存在的和临时存在的。
  • 每个zNode都可以存储数据。
  • zNode不能重命名。
  • 每个zNode的任何变化都可以被监控。
閱讀全文 »

这一节来看看如何使用Java编写Kafka Consumer。

Java Consumer

首先创建Consumer需要的配置信息,最基本的有五个信息:

  • Kafka集群的地址。
  • 发送的Message中Key的序列化方式。
  • 发送的Message中Value的序列化方式。
  • 指定Consumer Group。
  • 指定拉取Message范围的策略。
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_1");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // earliest, none
閱讀全文 »

这一节来看看如何使用Java编写Kafka Producer。

Create Kafka Project

创建Maven工程,在POM文件中加入如下两个依赖:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>

第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。

閱讀全文 »

Reseting Offset

在实际的业务场景中,经常需要重复消费Topic中的Message,所以来看看如何重置Offset。

首先重置Offset可以通过如下的命令:

kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group consumer_group_1 --reset-offsets [options] --execute --topic xxxx

Kafka为我们提供了6种重置Offset的方式,也就是命令中的options

  • --to-earliest:重置到最早的Offset。
  • --to-latest:重置到最后的Offset。
  • --to-offset <Long: offset>:重置到指定的Offset。
  • --to-current:重置到当前的Offset。
  • --to-datetime <String: datetime>:重置到指定时间的Offset,时间格式为YYYY-MM-DDTHH:mm:SS.sss
  • --shift-by <Long: number-of-offsets>:左移或右移Offset。
閱讀全文 »

Consumer CLI

这一节来看看使用命令行启动Consumer接收消息,通过如下的命令启动Consumer:

kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic first_topic

  • kafka-console-consumer.sh是启动Consumer的命令。
  • --bootstrap-server指定要连接的Broker地址。
  • --topic指定Topic名称,既要从哪个Topic里读数据。

閱讀全文 »