Kafka从上手到实践-Kafka集群:启动Kafka集群

这一章节来真正启动Kafka集群,先给出一份Broker的配置项列表,将以下信息复制三份,分别配置三台阿里云ECS上的Broker配置文件:

############################# Server Basics #############################
broker.id=0
delete.topic.enable=true
auto.create.topics.enable=true

############################# Socket Server Settings #############################
listeners=EXTERNAL://阿里云ECS内网IP:9092,INTERNAL://阿里云ECS内网IP:9093
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
advertised.listeners=EXTERNAL://阿里云ECS外网IP:9092,INTERNAL://阿里云ECS内网IP:9093
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

############################# Log Basics #############################
log.dirs=/root/kafka_2.12-2.0.0/data/kafka
num.partitions=1
num.recovery.threads.per.data.dir=1
default.replication.factor=3
min.insync.replicas=2
offsets.topic.replication.factor=2
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

############################# Log Retention Policy #############################
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
log.segment.ms=604800000

############################# Zookeeper #############################
zookeeper.connect=zookeeper.server.1:2181,zookeeper.server.2:2181,zookeeper.server.3:2181
zookeeper.connection.timeout.ms=6000

############################# Group Coordinator Settings #############################
group.initial.rebalance.delay.ms=0

############################# Message #############################
message.max.bytes=1048576
fetch.message.max.bytes=1048576

以上列表有两点需要修改的地方:

  • broker.id需要修改,不同Broker的ID不能相同。
  • 阿里云ECS的内/外网IP需要配置正确。

然后使用如下命令分别启动Kafka Broker:

kafka_2.12-2.0.0/bin/kafka-server-start.sh kafka_2.12-2.0.0/config/server.properties &

三个Broker没有异常信息,大概率说明我们的Kafka集群部署成功了,下面来验证一下。首先我们创建一个Topic:

kafka_2.12-2.0.0/bin sh kafka-topics.sh --zookeeper zookeeper.server.1:2181 --topic my_topic_in_cluster --create --partitions 3 --replication-factor 2

上面的命令有这样几个信息:

  • 连接Zookeeper时,连Zookeeper集群中的任意一个Zookeeper即可。
  • 创建的Topicmy_topic_in_cluster有三个Partition,每个Partition有两个Replica,也就是每条发送到这个Topic的Message会保存六份。

如果Kafka集群是成功的,那么理论上这六个Partition会被两两均匀分配到三个Broker中。

连接到部署Broker-0的阿里云ECS,进入Kafka的data目录:

cd /kafka_2.12-2.0.0/data/kafka
/kafka_2.12-2.0.0/data/kafka# ls

__consumer_offsets-0   __consumer_offsets-3   __consumer_offsets-6
__consumer_offsets-1   __consumer_offsets-30  __consumer_offsets-7
__consumer_offsets-10  __consumer_offsets-31  __consumer_offsets-8
__consumer_offsets-11  __consumer_offsets-32  __consumer_offsets-9
__consumer_offsets-12  __consumer_offsets-33  
__consumer_offsets-13  __consumer_offsets-34  
__consumer_offsets-14  __consumer_offsets-35  
__consumer_offsets-15  __consumer_offsets-36  cleaner-offset-checkpoint
__consumer_offsets-16  __consumer_offsets-37  configured-topic-0
__consumer_offsets-17  __consumer_offsets-38  configured-topic-1
__consumer_offsets-18  __consumer_offsets-39  configured-topic-2
__consumer_offsets-19  __consumer_offsets-4   first_topic-0
__consumer_offsets-2   __consumer_offsets-40  first_topic-1
__consumer_offsets-20  __consumer_offsets-41  first_topic-2
__consumer_offsets-21  __consumer_offsets-42  log-start-offset-checkpoint
__consumer_offsets-22  __consumer_offsets-43  meta.properties
__consumer_offsets-23  __consumer_offsets-44  my_topic_in_cluster-0
__consumer_offsets-24  __consumer_offsets-45  my_topic_in_cluster-2
__consumer_offsets-25  __consumer_offsets-46  recovery-point-offset-checkpoint
__consumer_offsets-26  __consumer_offsets-47  replication-offset-checkpoint
__consumer_offsets-27  __consumer_offsets-48  with_keys_topic-0
__consumer_offsets-28  __consumer_offsets-49  with_keys_topic-1
__consumer_offsets-29  __consumer_offsets-5   with_keys_topic-2

可以看到Broker-0中分配了my_topic_in_cluster的Partition-0和Partition-2。

同理,连接到部署Broker-1的阿里云ECS,进入Kafka的data目录:

cd /kafka_2.12-2.0.0/data/kafka
/kafka_2.12-2.0.0/data/kafka# ls

meta.properties   my_topic_in_cluster-0
my_topic_in_cluster-1   cleaner-offset-checkpoint    
recovery-point-offset-checkpoint  log-start-offset-checkpoint 
replication-offset-checkpoint

可以看到Broker-1中分配了my_topic_in_cluster的Partition-0和Partition-1。

同理,连接到部署Broker-2的阿里云ECS,进入Kafka的data目录:

cd /kafka_2.12-2.0.0/data/kafka
/kafka_2.12-2.0.0/data/kafka# ls

meta.properties   my_topic_in_cluster-1
my_topic_in_cluster-2   cleaner-offset-checkpoint    
recovery-point-offset-checkpoint  log-start-offset-checkpoint 
replication-offset-checkpoint

可以看到Broker-2中分配了my_topic_in_cluster的Partition-1和Partition-2。

从上面的结果可以说明我们的Kafka集群是部署成功的。

小结

这一章节带大家实践运行Kafka集群,通过查看每个Broker的Data目录印证之前章节对Partition介绍的内容。下一章节会带大家搭建管理Zookeeper和Kafka的UI工具。希望能给小伙伴们带来帮助。

分享到: