这一节我们来真正搭建一个Zookeeper集群。

搭建Zookeeper集群

首先要做的就是再租赁两个服务器,参照搭建单机Kafka章节中的步骤,租赁阿里云服务器、安装JDK、下载配置Kafka、配置安全组规则。

Zookeeper配置信息

搭建单机Kafka章节中,启动的是单机Zookeeper,所以/root/kafka_2.12-2.0.0/config目录下的zookeeper.properties配置文件中只配置了dataDir,也就是存储各种数据、日志、快照的路径。

在搭建Zookeeper时,就需要额外再配置一些参数了。同样打开/root/kafka_2.12-2.0.0/config目录下的zookeeper.properties配置文件,额外添加如下内容:

maxClientCnxns=0
tickTime=2000
initLimit=10
syncLimit=5
quorumListenOnAllIPs=true
server.1=zookeeper.server.1:2888:3888
server.2=zookeeper.server.2:2888:3888
server.3=zookeeper.server.3:2888:3888

阅读全文 »

这一节来看看Zookeeper的命令行工具。

Zookeeper CLI

在第七章节搭建单机Kafka中,我们已经发现了,Kafka是自带Zookeeper的,而且在启动Kafka之前,要先启动Zookeeper,相当于启动了单机Zookeeper,所以我们先说Zookeeper CLI,后面说Zookeeper集群时再具体说配置参数。

展示zNode

首先打开终端,连接至我们的服务器,进入/root/kafka_2.12-2.0.0/bin目录,执行如下命令:

sh zookeeper-shell.sh 127.0.0.1:2181

这是Zookeeper CLI Client连接Zookeeper的命令,当看到如下信息时,说明连接成功:

Connecting to 127.0.0.1:2181
Welcome to ZooKeeper!
JLine support is disabled

先来来看看目前Zookeeper里都有哪些zNode:

ls /

[cluster, controller_epoch, controller, brokers, zookeeper, kafka-manager, admin, isr_change_notification, consumers, log_dir_event_notification, latest_producer_id_block, config]

ls命令和Linux中的作用一样,在Zookeeper中是展示某个zNode下的所有zNode。这里的/表示根zNode。可以看到已经有很多zNode注册在了Zookeeper。再来看看brokers下还有哪些zNode:

ls /brokers

[ids, topics, seqid]

阅读全文 »

这一节我们来认识一下在Kafka中有着超然地位的Zookeeper。

Zookeeper初识

ZooKeeper 分布式服务框架是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。简化分布式应用协调及其管理的难度,提供高性能的分布式服务。ZooKeeper的目标就是封装好复杂、易出错的关键服务,将简单易用的接口和性能高效、功能稳定的系统提供给用户。

Zookeeper有以下一些特点:

  • Zookeeper的内部数据结构是树状结构的。
  • 每个节点称为zNode。
  • 每个zNode都有一个唯一路径(path)。
  • zNode分长久存在的和临时存在的。
  • 每个zNode都可以存储数据。
  • zNode不能重命名。
  • 每个zNode的任何变化都可以被监控。
阅读全文 »

这一节来看看如何使用Java编写Kafka Consumer。

Java Consumer

首先创建Consumer需要的配置信息,最基本的有五个信息:

  • Kafka集群的地址。
  • 发送的Message中Key的序列化方式。
  • 发送的Message中Value的序列化方式。
  • 指定Consumer Group。
  • 指定拉取Message范围的策略。
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "IP:Port");
    properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "consumer_group_1");
    properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // earliest, none
阅读全文 »

这一节来看看如何使用Java编写Kafka Producer。

Create Kafka Project

创建Maven工程,在POM文件中加入如下两个依赖:

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.0.0</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
</dependency>

第一个是Kafka的依赖包,用于创建Producer、ProducerRecord、Consumer等。第二个是Log4J的依赖包,用于输出日志。

阅读全文 »