系统环境

ubuntu 22.04 LTS
elasticsearch Version 7.12.1
kibana Versiob 7.12.1
kafka Version 2.13-2.6.2
es-kafka-1:192.168.60.163
es-kafka-2:192.168.60.164
es-kafka-3:192.168.60.165

elasticsearch软件下载地址:https://www.elastic.co/cn/downloads/elasticsearch
kafka下载地址:https://shackles.cn/Software/kafka_2.13-2.6.2.tgz
zookeeper下载地址:https://shackles.cn/Software/apache-zookeeper-3.7.1-bin.tar.gz

安装ES(所有节点执行):

dpkg -i elasticsearch-7.12.1-amd64.deb

配置elasticsearch配置文件:

cat <<EOF> /etc/elasticsearch/elasticsearch.yml
cluster.name: my-es
node.name: node3
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 192.168.60.165
discovery.seed_hosts: ["192.168.60.163", "192.168.60.164","192.168.60.165"]
cluster.initial_master_nodes: ["192.168.60.163", "192.168.60.164","192.168.60.165"]
action.destructive_requires_name: true
EOF

cluster.name:集群名称
node.name: 节点名称。一般配置为本节点的IP地址也可自定义名字。
path.data: 数据存放目录。
path.logs: 日志存放目录。
network.host: 本节点的网络IP地址
discovery.seed_hosts: 单节点安装时,设置为本节点的网络IP地址;多节点安装时,需要设置为多节点的IP地址
cluster.initial_master_nodes: 在启动时,Elasticsearch将会选择这三个节点中的一个作为主节点,并在之后的运行时维护这个列表来确保主节点总是可用。这个列表不一定需要包含所有节点,但必须至少包含一个活动节点才能初始化主节点。

安装kibana:

dpkg -i kibana-7.12.1-amd64.deb

配置kibana配置文件:

cat <<EOF> /etc/kibana/kibana.yml
server.port: 5601
server.host: "0.0.0.0"
server.name: "kibana"
elasticsearch.hosts: ["http://192.168.60.163:9200"]
kibana.index: ".kibana"
i18n.locale: "zh-CN"
EOF

server.port:指定Kibana服务器运行的端口号,这里设置为5601。
server.host:指定Kibana服务器监听的地址,这里设置为0.0.0.0,表示监听所有的IP地址。
server.name:指定Kibana服务器的名称,这里设置为"kibana"。
elasticsearch.hosts:指定连接的Elasticsearch实例的地址,这里设置为"http://192.168.60.163:9200",表示连接到IP地址为192.168.60.163、端口号为9200的Elasticsearch实例。
kibana.index:指定Kibana使用的索引名称,这里设置为".kibana"。
i18n.locale:指定使用的语言环境,这里设置为"zh-CN",表示使用中文语言环境。

安装JDK8:

apt install openjdk-8-jdk

部署zookeeper:

mkdir /apps && cd /apps
#安装包放到/apps下
tar xvf apache-zookeeper-3.7.1-bin.tar.gz
ln -sf apache-zookeeper-3.7.1-bin zookeeper && cd /apps/zookeepr/conf && cp zoo_sample.cfg zoo.cfg

cat <<EOF> /apps/zookeeper/conf/zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/data/zookeeper
clientPort=2181

server.1=192.168.60.163:2888:3888
server.2=192.168.60.164:2888:3888
server.3=192.168.60.165:2888:3888
EOF

tickTime:Zookeeper中使用的基本时间单位,以毫秒为单位。这里设置为2000ms。
initLimit:用于限制Zookeeper服务器启动时连接到Zookeeper集合中的follower最长时间限制。这里设置为10。
syncLimit:用于限制Zookeeper服务器之间保持同步的领导选举时间。这里设置为5。
dataDir:Zookeeper存储数据的目录,这里设置为/data/zookeeper。
clientPort:Zookeeper客户端连接服务器使用的端口号,这里设置为2181。

Zookeeper服务的服务器列表:
server.1=192.168.60.163:2888:3888`:表示第一个Zookeeper服务器的IP地址为192.168.60.163,使用2888端口进行连接和通信,使用3888端口进行领导选举。
server.2=192.168.60.164:2888:3888`:表示第二个Zookeeper服务器的IP地址为192.168.60.164,使用2888端口进行连接和通信,使用3888端口进行领导选举。
server.3=192.168.60.165:2888:3888`:表示第三个Zookeeper服务器的IP地址为192.168.60.165,使用2888端口进行连接和通信,使用3888端口进行领导选举。

注意:/data/zookeeper需要手动创建,server.1、server.2、server.3要和集群中的IP地址一一对应,三台的zookeeper配置文件一样可scp到其他节点。

mkdir -p /data/zookeeper

创建集群中唯一标识符:

在集群es-kafka-1创建标识符:
echo 1 > /data/zookeeper/myid
在集群es-kafka-2创建标识符:
echo 2 > /data/zookeeper/myid
在集群es-kafka-3创建标识符:
echo 3 > /data/zookeeper/myid

myid:ZooKeeper的一个文件,用于存储该 ZooKeeper 实例的唯一标识符(即 myid)

启动zookeeper(所有节点):

/apps/zookeepr/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /apps/zookeepr/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

验证zookeeper是否启动(所有节点):

root@@es-kafka-1:~#/apps/zookeepr/bin/zkServer.sh status
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /apps/zookeepr/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

mode:是follower和leader都证明node在集群中是正常的

部署kafka:

cd /apps && tar xvf kafka_2.13-2.6.2.tgz && ln -sf kafka_2.13-2.6.2 kafka

es-kafka-1配置文件:

cat <<EOF> /apps/kafka/config/server.properties
broker.id=1
listeners=PLAINTEXT://192.168.60.163:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.60.163:2181,192.168.60.164:2181,192.168.60.165:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

es-kafka-2配置文件:

cat <<EOF> /apps/kafka/config/server.properties
broker.id=2
listeners=PLAINTEXT://192.168.60.164:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.60.163:2181,192.168.60.164:2181,192.168.60.165:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

es-kafka-3配置文件:

cat <<EOF> /apps/kafka/config/server.properties
broker.id=3
listeners=PLAINTEXT://192.168.60.165:9092
num.network.threads=3
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka-logs
num.partitions=1
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
zookeeper.connect=192.168.60.163:2181,192.168.60.164:2181,192.168.60.165:2181
zookeeper.connection.timeout.ms=18000
group.initial.rebalance.delay.ms=0
EOF

broker.id:指定当前Kafka Broker的唯一标识符,这里设置为1。
listeners:指定Kafka Broker监听的地址和端口号,这里指定监听在IP地址为192.168.60.163、端口号为9092的地址上。
num.network.threads:指定Kafka Broker用于处理网络I/O的线程数,这里设置为3。
num.io.threads:指定Kafka Broker用于处理磁盘I/O的线程数,这里设置为8。
socket.send.buffer.bytes:设置Kafka Broker用于网络传输中的Socket发送缓冲区大小,这里设置为102400。
socket.receive.buffer.bytes:设置Kafka Broker用于网络传输中的Socket接收缓冲区大小,这里设置为102400。
socket.request.max.bytes:设置在Kafka Broker上发送或接收任何单个请求时所允许的最大字节数,这里设置为104857600字节。
log.dirs:指定Kafka Broker用于存储日志文件的目录,这里设置为/data/kafka-logs。
num.partitions:指定创建新主题时每个默认分区的数量,这里设置为1。
num.recovery.threads.per.data.dir:指定每个数据目录中日志恢复线程的数量,这里设置为1。
offsets.topic.replication.factor:指定储存消费者组和偏移量信息的内部主题的副本数量,这里设置为1。
transaction.state.log.replication.factor:指定事务状态日志内部主题的副本数量,这里设置为1。
transaction.state.log.min.isr:指定一个副本集合中必须确认事务状态的最小数量,这里设置为1。
log.retention.hours:指定日志文件在自动删除之前保留的时间,这里设置为168小时(即7天)。
log.segment.bytes:指定日志文件的最大大小,当文件达到此大小时,将创建一个新的日志段,这里设置为1073741824字节(即1GB)。
log.retention.check.interval.ms:指定删除旧日志文件任务之间的时间间隔,这里设置为300000ms(即5分钟)。
zookeeper.connect:指定Zookeeper的连接地址(所有Kafka Broker都将向此地址注册)。这里设置为多个Zookeeper实例的地址,用逗号分隔。
zookeeper.connection.timeout.ms:指定连接Zookeeper服务器的最长时间,这里设置为18000ms(即18秒)。
group.initial.rebalance.delay.ms:当新的消费者加入消费者组时,指定延迟重新平衡的时间。这里设置为0表示立即开始重新平衡。

启动kafka:

/apps/kafka/bin/kafka-server-start.sh -daemon /apps/kafka/config/server.properties

验证是否启动成功:

netstat -ntpl | grep 9092
tcp6       0      0 192.168.60.163:9092     :::*                    LISTEN      10604/java

下载官方kafka工具连接查看kafka状态:

官方下载地址:https://www.kafkatool.com/download.html

kafka-tools-1

kafka-tools-2

kafka-tools-3

最后修改:2023 年 06 月 01 日
如果觉得我的文章对你有用,请随意赞赏