1. 消息队列简介

1.1 消息队列历史

1983 年在MIT 工作的26岁的印度小伙 Vivek Ranadive突发奇想,以前我们的软件相互通信,都是点对点的,而且要实现相同的协议,能不能有一种专门用来通信的中间件,就像主板(BUS)一样,把不同的软件集成起来呢?于是他搞了一家公司(Teknekron),开发了世界上第一个消息队列软件The Information Bus(TIB)。最开始的时候,它被高盛这些公司用在金融交易里面。因为TIB 实现了发布订阅(Publish/Subscribe)模型,信息的生产者和消费者可以完全解耦,这个特性引起了电信行业特别是新闻机构的注意。1994年路透社收购了Teknekron。
TIB 的成功马上引起了业界大佬IBM 的注意,他们研发了自己的IBM MQ(IBMWesphere)。后面微软也加入了这场竟争,研发了MSMQ。这个时候,每个厂商的产品是孤立的,大家都有自己的技术壁垒。比如一个应用订阅了IBM MQ 的消息,如果有要订阅MSMQ 的消息,因为协议、API 不同,又要重复去实现。

J2EE 制定了JDBC 的规范,那么各个数据库厂商自己去实现协议,提供jar 包,在Java 里面就可以使用相同的API 做操作不同的数据库了。MQ 产品的问题也是一样的,2001 年的时候,SUN 公司发布了 JMS 规范,它想要在各大厂商的MQ 上面统一包装一层Java 的规范,大家都只需要针对API 编程就可以了,不需要关注使用了什么样的消息中间件,只要选择合适的MQ 驱动。但是JMS 只适用于Java 语言,它是跟语言绑定的,没有从根本上解决这个问题(只是一个API)。所以在2006 年的时候,AMQP (Advanced Message Queuing Protocol)规范发布了。它是跨语言和跨平台的,真正地促进了消息队列的繁荣发展。

2007 年的时候,Rabbit 技术公司基于Erlang语言开发了符合AMQP 规范RabbitMQ 1.0。从最开始用在金融行业里面,现在RabbitMQ 已经在世界各地的公司中遍地开花。国内的绝大部分大厂都在用RabbitMQ,包括头条,美团,滴滴(TMD),去哪儿,艺龙,淘宝也有用。

Kafka 是一种分布式流处理系统,由 LinkedIn 的 Jay Kreps、Neha Narkhede 和 Jonathan Ellis 于 2011 年开源。它主要用于高吞吐量、低延迟的数据传输和流处理。Kafka 的设计初衷是为了解决传统消息队列(如 RabbitMQ 和 ActiveMQ)和日志处理系统(如 Flume 和 Logstash)的局限性,为现代数据处理场景提供一个更高效、可扩展的解决方案。

Kafka 的崛起与大数据时代的出现密切相关。随着数据的生成和传输量不断增加,传统的中央化处理方式已经无法满足需求。Kafka 通过分布式架构、高吞吐量和低延迟等特点,为大数据和实时数据处理提供了一个强大的技术支持。同时Kafka 也被称为下一代分布式消息系统,由 Scala 和 Java编写,是非营利性组织ASF(Apache Software Foundation)基金会中的一个开源项目,比如:HTTP Server、Tomcat、Hadoop、ActiveMQ等开源软件都属于 Apache基金会的开源软件,类似的消息系统还有RabbitMQ、ActiveMQ、ZeroMQ。

Kafka用于构建实时数据管道和流应用程序。 它具有水平可伸缩性,容错性,快速性,可在数千家组织中同时投入生产协同工作。

1.2 Kafka 特点和优势

特点

  • 分布式: 多机实现,不允许单机
  • 分区: 一个消息.可以拆分出多个,分别存储在多个位置
  • 多副本: 防止信息丢失,可以多来几个备份
  • 多订阅者: 可以有很多应用连接kafka
  • Zookeeper: 早期版本的Kafka依赖于zookeeper, 2021年4月19日Kafka 2.8.0正式发布,此版本包括了很多重要改动,最主要的是kafka通过自我管理的仲裁来替代ZooKeeper,即Kafka将不再需要ZooKeeper!

优势

  • Kafka 通过 O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以 TB 级别以上的消息存储也能够保持长时间的稳定性能。
  • 高吞吐量:即使是非常普通的硬件Kafka也可以支持每秒数百万的消息。支持通过Kafka 服务器分区消息。
  • 分布式: Kafka 基于分布式集群实现高可用的容错机制,可以实现自动的故障转移
  • 顺序保证:在大多数使用场景下,数据处理的顺序都很重要。大部分消息队列本来就是排序的,并且能保证数据会按照特定的顺序来处理。 Kafka保证一个Partiton内的消息的有序性(分区间数据是无序的,如果对数据的顺序有要求,应将在创建主题时将分区数partitions设置为1)
  • 支持 Hadoop 并行数据加载
  • 通常用于大数据场合,传递单条消息比较大,而Rabbitmq 消息主要是传输业务的指令数据,单条数据较小
    O(1)就是最低的时空复杂度,也就是耗时/耗空间与输入数据大小无关,无论输入数据增大多少倍,耗时/耗空间都不变,哈希算法就是典型的O(1)时间复杂度,无论数据规模多大,都可以在一次计算后找到目标

1.3 Kafka 角色和流程

kafka-architecture-diagram-1

kafka-architecture-diagram-2

  • Producer:Producer即生产者,消息的产生者,是消息的入口。负责发布消息到Kafka broker。
  • Consumer:消费者,用于消费消息,即处理消息
    Broker:Broker是kafka实例,每个服务器上可以有一个或多个kafka的实例,假设每个broker对应一台服务器。每个kafka集群内的broker都有一个不重复的编号,如: broker-0、broker-1等……
  • Topic:消息的主题,可以理解为消息的分类,一个Topic相当于数据库中的一张表,一条消息相当于关系数据库的一条记录,一个Topic或者相当于Redis中列表类型的一个Key,一条消息即为列表中的一个元素。kafka的数据就保存在topic。在每个broker上都可以创建多个topic。物理上不同 topic 的消息分开存储在不同的文件夹,逻辑上一个 topic的消息虽然保存于一个或多个broker 上, 但用户只需指定消息的topic即可生产或消费数据而不必关心数据存于何处,topic 在逻辑上对record(记录、日志)进行分组保存,消费者需要订阅相应的topic 才能消费topic中的消息
  • Consumer group: 每个consumer 属于一个特定的consumer group(可为每个consumer 指定 group name,若不指定 group name 则属于默认的group),同一topic的一条消息只能被同一个consumer group 内的一个consumer 消费,类似于一对一的单播机制,但多个consumer group 可同时消费这一消息,类似于一对多的多播机制。
    Partition :是物理上的概念,每个topic 分割为一个或多个partition,即一个topic切分为多份.创建 topic时可指定 partition 数量,partition的表现形式就是一个一个的文件夹,该文件夹下存储该partition的数据和索引文件,分区的作用还可以实现负载均衡,提高kafka的吞吐量。同一个topic在不同的分区的数据是不重复的,一般Partition数不要超过节点数,注意同一个partition数据是有顺序的,但不同的partition则是无序的。
  • Replication: 同样数据的副本,包括leader和follower的副本数,基本于数据安全,建议至少2个,是Kafka的高可靠性的保障,和ES的副本有所不同,Kafka中的副本数包括主分片数,而ES中的副本数不包括主分片数,为了实现数据的高可用,比如将分区 0 的数据分散到不同的kafka 节点,每一个分区都有一个 broker 作为 Leader 和一个 broker 作为Follower,类似于ES中的主分片和副本分片,假设分区为 3, 即分三个分区0~2,副本为3,即每个分区都有一个 leader,再加两个follower,分区 0 的leader为服务器A,服务器 B 和服务器 C 则为 A 的follower,分区 1 的leader为服务器B,服务器 A 和C 则为服务器B 的follower,而分区 2 的leader 为C,服务器A 和 B 则为C 的follower。
  • AR: Assigned Replicas,分区中的所有副本的统称,包括leader和 follower,AR= lSR+ OSR
  • lSR:ln Sync Replicas,所有与leader副本保持同步的副本 follower和leader本身组成的集合,包括leader和follower,是AR的子集
  • OSR:out-of-Sync Replied,所有与leader副本同步不能同步的 follower的集合,是AR的子集

分区和副本的优势:

  • 实现存储空间的横向扩容,即将多个kafka服务器的空间组合利用
  • 提升性能,多服务器并行读写
  • 实现高可用,每个分区都有一个主分区即 leader 分布在不同的kafka 服务器,并且有对应follower 分布在和leader不同的服务器上

kafka-architecture-diagram-3

kafka_write_message_process

2. Kafka docker-compose部署

Kafka 支持单机和集群部署,生产通常为集群模式

2.1 单机ACL部署

2.1.1 环境准备

系统环境Docker版本节点IP地址Docker-compose版本Kafka版本
Ubuntu 20.04.624.0.710.84.3.126v2.23.33.8.0

2.1.2 docker-compose文件

cat > docker-compose.yaml << EOF
version: "3.3"
services:
   kafka:
     image: 'bitnami/kafka:3.8.0'
     container_name: kafka-test
     user: root
     ports:
       - '9092:9092'
       - '9093:9093'
     environment:
       #设置容器内的时区为“Asia/Shanghai”
       - TZ=Asia/Shanghai
       #Kafka KRaft 角色的逗号分隔列表
       - KAFKA_CFG_PROCESS_ROLES=broker,controller
       #开启Bitnami容器的调试模式
       - BITNAMI_DEBUG=true
       #指定Controller的监听器名称,这里设置为“CONTROLLER”,对应后面的 KAFKA_CFG_LISTENERS 中的 CONTROLLER://0.0.0.0:9093。
       - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
       #设置Kafka中创建的每个主题的默认分区数为1。这意味着如果创建一个主题时不指定分区数,它将只会有一个分区。
       - KAFKA_CFG_NUM_PARTITIONS=1
       #Kafka服务器的监听地址和端口;INTERNAL 在 9094 端口监听,供内部通信使用,CLIENT 在 9095 端口监听,供客户端使用,CONTROLLER 在 9093 端口监听,供Controller使用,EXTERNAL 在 9092 端口监听,供外部访问。
       - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
       #客户端连接Kafka时使用的地址 INTERNAL 在 0.0.0.0:9094 广播,供内部通信使用。CLIENT 在 9095 端口广播,供客户端使用。EXTERNAL 在 0.0.0.0:9092 广播,供外部访问。
       #- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://0.0.0.0:9094,CLIENT://:9095,EXTERNAL://0.0.0.0:9092
       - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://10.84.3.126:9094,CLIENT://10.84.3.126:9095,EXTERNAL://10.84.3.126:9092
       #每个监听器使用的安全协议
       - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
       #Kafka Broker之间的通信监听器为 INTERNAL。这个值需要与 KAFKA_CFG_LISTENERS 中的某个监听器名称匹配,确保Broker之间的通信使用这个监听器。
       - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
       #Apache Kafka 代理间通信协议
       - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
       #Broker之间通信使用的SASL认证机制为 PLAIN。
       - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
       #Kafka 客户端用户 默认:user
       - KAFKA_CLIENT_USERS=superAdmin
       #Kafka 客户端用户密码 默认:bitnami
       - KAFKA_CLIENT_PASSWORDS=Sec@2024...
       #Kafka 代理间通信用户
       - KAFKA_INTER_BROKER_USER=superAdmin
       #Kafka 代理间通信密码
       - KAFKA_INTER_BROKER_PASSWORD=Sec@2024...
       #Kafka 节点的唯一 ID
       - KAFKA_CFG_NODE_ID=1
       #Kafka KRaft模式下的Controller仲裁投票者列表。包括集群中所有Controller的节点ID和对应的地址,用于选举和管理Controller的任务。
       - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@10.84.3.126:9093 #多集群2@10.84.3.126:9093,3@10.84.3.45:9093
       #使用 Kafka Raft 模式(KRaft)时的 Kafka 集群 ID 可自定义
       - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
       #创建Topic时的默认分区数量
       - KAFKA_CFG_NUM_PARTITIONS=30
       #日志保留的时间,单位为小时
       - KAFKA_LOG_RETENTION_HOURS=168
       #每个日志段的最大大小(字节)1GB
       - KAFKA_CFG_SEGMENT_BYTES=1073741824
       #日志保留检查的时间间隔,以毫秒为单位。即每隔300秒(5分钟)检查1次
       - KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS=300000
       #消息的最大字节数。5000000表示每条消息最大为5MB。
       - KAFKA_CFG_MESSAGE_MAX_BYTES=5000000
     volumes:
       - './data:/bitnami/kafka/data:rw'

   kafka_map:
     image: 'dushixiang/kafka-map:latest'
     container_name: kafka-ui
     environment:
       - DEFAULT_USERNAME=superAdmin
       - DEFAULT_PASSWORD=Seca@2024...
     ports:
       - '8080:8080'
EOF

2.1.3 启动kafka服务

#在docker-compose.yaml所在目录执行
docker-compose up -d 

2.2 集群ACL部署

2.2.1 环境准备

系统环境Docker版本节点IP地址Docker-compose版本Kafka版本
Ubuntu 20.04.624.0.710.84.3.126v2.23.33.8.0
Ubuntu 20.04.624.0.710.84.3.127v2.23.33.8.0
Ubuntu 20.04.624.0.710.84.3.128v2.23.33.8.0

2.2.2 节点1 docker-compose文件

cat > docker-compose.yaml << EOF
version: "3.3"
services:
   kafka-materialmq:
     image: 'bitnami/kafka:3.8.0'
     container_name: kafka-materialmq
     user: root
     ports:
       - '9092:9092'
       - '9093:9093'
     environment:
       #设置容器内的时区为“Asia/Shanghai”
       - TZ=Asia/Shanghai
       #Kafka KRaft 角色的逗号分隔列表
       - KAFKA_CFG_PROCESS_ROLES=broker,controller
       #开启Bitnami容器的调试模式
       - BITNAMI_DEBUG=true
       #指定Controller的监听器名称,这里设置为“CONTROLLER”,对应后面的 KAFKA_CFG_LISTENERS 中的 CONTROLLER://0.0.0.0:9093。
       - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
       #设置Kafka中创建的每个主题的默认分区数为1。这意味着如果创建一个主题时不指定分区数,它将只会有一个分区。
       - KAFKA_CFG_NUM_PARTITIONS=1
       #Kafka服务器的监听地址和端口;INTERNAL 在 9094 端口监听,供内部通信使用,CLIENT 在 9095 端口监听,供客户端使用,CONTROLLER 在 9093 端口监听,供Controller使用,EXTERNAL 在 9092 端口监听,供外部访问。
       - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
       #客户端连接Kafka时使用的地址 INTERNAL 在 0.0.0.0:9094 广播,供内部通信使用。CLIENT 在 9095 端口广播,供客户端使用。EXTERNAL 在 0.0.0.0:9092 广播,供外部访问。
       - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://10.84.3.126:9094,CLIENT://10.84.3.126:9095,EXTERNAL://10.84.3.126:9092
       #每个监听器使用的安全协议
       - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
       #Kafka Broker之间的通信监听器为 INTERNAL。这个值需要与 KAFKA_CFG_LISTENERS 中的某个监听器名称匹配,确保Broker之间的通信使用这个监听器。
       - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
       #Apache Kafka 代理间通信协议
       - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
       #Broker之间通信使用的SASL认证机制为 PLAIN。
       - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
       #Kafka 客户端用户 默认:user
       - KAFKA_CLIENT_USERS=superAdmin
       #Kafka 客户端用户密码 默认:bitnami
       - KAFKA_CLIENT_PASSWORDS=Sec@2024...
       #Kafka 代理间通信用户
       - KAFKA_INTER_BROKER_USER=superAdmin
       #Kafka 代理间通信密码
       - KAFKA_INTER_BROKER_PASSWORD=Sec@2024...
       #Kafka 节点的唯一 ID
       - KAFKA_CFG_NODE_ID=1
       #Kafka KRaft模式下的Controller仲裁投票者列表。包括集群中所有Controller的节点ID和对应的地址,用于选举和管理Controller的任务。
       - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@10.84.3.126:9093,2@10.84.3.127:9093,3@10.84.3.128:9093 #多集群2@10.84.3.126:9093,3@10.84.3.45:9093
       #使用 Kafka Raft 模式(KRaft)时的 Kafka 集群 ID 可自定义
       - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
       #创建Topic时的默认分区数量
       - KAFKA_CFG_NUM_PARTITIONS=30
       #日志保留的时间,单位为小时
       - KAFKA_LOG_RETENTION_HOURS=168
       #每个日志段的最大大小(字节)1GB
       - KAFKA_CFG_SEGMENT_BYTES=1073741824
       #日志保留检查的时间间隔,以毫秒为单位。即每隔300秒(5分钟)检查1次
       - KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS=300000
       #消息的最大字节数。5000000表示每条消息最大为5MB。
       - KAFKA_CFG_MESSAGE_MAX_BYTES=5000000
     volumes:
       - './data:/bitnami/kafka/data:rw'
     networks:
      - kafka

   kafka_map:
     image: 'dushixiang/kafka-map:latest'
     container_name: kafka-ui
     environment:
       - DEFAULT_USERNAME=superAdmin
       - DEFAULT_PASSWORD=Sec@2024...
     ports:
       - '8080:8080'
     networks:
       - kafka

networks:
  kafka:
    driver: bridge
EOF

2.2.3 节点2 docker-compose文件

cat > docker-compose.yaml << EOF
version: "3.3"
services:
   kafka-materialmq:
     image: 'bitnami/kafka:3.8.0'
     container_name: kafka-materialmq
     user: root
     ports:
       - '9092:9092'
       - '9093:9093'
     environment:
       #设置容器内的时区为“Asia/Shanghai”
       - TZ=Asia/Shanghai
       #Kafka KRaft 角色的逗号分隔列表
       - KAFKA_CFG_PROCESS_ROLES=broker,controller
       #开启Bitnami容器的调试模式
       - BITNAMI_DEBUG=true
       #指定Controller的监听器名称,这里设置为“CONTROLLER”,对应后面的 KAFKA_CFG_LISTENERS 中的 CONTROLLER://0.0.0.0:9093。
       - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
       #设置Kafka中创建的每个主题的默认分区数为1。这意味着如果创建一个主题时不指定分区数,它将只会有一个分区。
       - KAFKA_CFG_NUM_PARTITIONS=1
       #Kafka服务器的监听地址和端口;INTERNAL 在 9094 端口监听,供内部通信使用,CLIENT 在 9095 端口监听,供客户端使用,CONTROLLER 在 9093 端口监听,供Controller使用,EXTERNAL 在 9092 端口监听,供外部访问。
       - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
       #客户端连接Kafka时使用的地址 INTERNAL 在 0.0.0.0:9094 广播,供内部通信使用。CLIENT 在 9095 端口广播,供客户端使用。EXTERNAL 在 0.0.0.0:9092 广播,供外部访问。
       - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://10.84.3.127:9094,CLIENT://10.84.3.127:9095,EXTERNAL://10.84.3.127:9092
       #每个监听器使用的安全协议
       - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
       #Kafka Broker之间的通信监听器为 INTERNAL。这个值需要与 KAFKA_CFG_LISTENERS 中的某个监听器名称匹配,确保Broker之间的通信使用这个监听器。
       - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
       #Apache Kafka 代理间通信协议
       - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
       #Broker之间通信使用的SASL认证机制为 PLAIN。
       - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
       #Kafka 客户端用户 默认:user
       - KAFKA_CLIENT_USERS=superAdmin
       #Kafka 客户端用户密码 默认:bitnami
       - KAFKA_CLIENT_PASSWORDS=Sec@2024...
       #Kafka 代理间通信用户
       - KAFKA_INTER_BROKER_USER=superAdmin
       #Kafka 代理间通信密码
       - KAFKA_INTER_BROKER_PASSWORD=Sec@2024...
       #Kafka 节点的唯一 ID
       - KAFKA_CFG_NODE_ID=2
       #Kafka KRaft模式下的Controller仲裁投票者列表。包括集群中所有Controller的节点ID和对应的地址,用于选举和管理Controller的任务。
       - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@10.84.3.126:9093,2@10.84.3.127:9093,3@10.84.3.128:9093 #多集群2@10.84.3.126:9093,3@10.84.3.45:9093
       #使用 Kafka Raft 模式(KRaft)时的 Kafka 集群 ID 可自定义
       - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
       #创建Topic时的默认分区数量
       - KAFKA_CFG_NUM_PARTITIONS=30
       #日志保留的时间,单位为小时
       - KAFKA_LOG_RETENTION_HOURS=168
       #每个日志段的最大大小(字节)1GB
       - KAFKA_CFG_SEGMENT_BYTES=1073741824
       #日志保留检查的时间间隔,以毫秒为单位。即每隔300秒(5分钟)检查1次
       - KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS=300000
       #消息的最大字节数。5000000表示每条消息最大为5MB。
       - KAFKA_CFG_MESSAGE_MAX_BYTES=5000000
     volumes:
       - './data:/bitnami/kafka/data:rw'
     networks:
      - kafka

networks:
  kafka:
    driver: bridge
EOF

2.2.4 节点3 docker-compose文件

cat > docker-compose.yaml << EOF
version: "3.3"
services:
   kafka-materialmq:
     image: 'bitnami/kafka:3.8.0'
     container_name: kafka-materialmq
     user: root
     ports:
       - '9092:9092'
       - '9093:9093'
     environment:
       #设置容器内的时区为“Asia/Shanghai”
       - TZ=Asia/Shanghai
       #Kafka KRaft 角色的逗号分隔列表
       - KAFKA_CFG_PROCESS_ROLES=broker,controller
       #开启Bitnami容器的调试模式
       - BITNAMI_DEBUG=true
       #指定Controller的监听器名称,这里设置为“CONTROLLER”,对应后面的 KAFKA_CFG_LISTENERS 中的 CONTROLLER://0.0.0.0:9093。
       - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
       #设置Kafka中创建的每个主题的默认分区数为1。这意味着如果创建一个主题时不指定分区数,它将只会有一个分区。
       - KAFKA_CFG_NUM_PARTITIONS=1
       #Kafka服务器的监听地址和端口;INTERNAL 在 9094 端口监听,供内部通信使用,CLIENT 在 9095 端口监听,供客户端使用,CONTROLLER 在 9093 端口监听,供Controller使用,EXTERNAL 在 9092 端口监听,供外部访问。
       - KAFKA_CFG_LISTENERS=INTERNAL://:9094,CLIENT://:9095,CONTROLLER://0.0.0.0:9093,EXTERNAL://0.0.0.0:9092
       #客户端连接Kafka时使用的地址 INTERNAL 在 0.0.0.0:9094 广播,供内部通信使用。CLIENT 在 9095 端口广播,供客户端使用。EXTERNAL 在 0.0.0.0:9092 广播,供外部访问。
       - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://10.84.3.128:9094,CLIENT://10.84.3.128:9095,EXTERNAL://10.84.3.128:9092
       #每个监听器使用的安全协议
       - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_PLAINTEXT,CLIENT:SASL_PLAINTEXT,CONTROLLER:PLAINTEXT,EXTERNAL:SASL_PLAINTEXT
       #Kafka Broker之间的通信监听器为 INTERNAL。这个值需要与 KAFKA_CFG_LISTENERS 中的某个监听器名称匹配,确保Broker之间的通信使用这个监听器。
       - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
       #Apache Kafka 代理间通信协议
       - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
       #Broker之间通信使用的SASL认证机制为 PLAIN。
       - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
       #Kafka 客户端用户 默认:user
       - KAFKA_CLIENT_USERS=superAdmin
       #Kafka 客户端用户密码 默认:bitnami
       - KAFKA_CLIENT_PASSWORDS=Sec@2024...
       #Kafka 代理间通信用户
       - KAFKA_INTER_BROKER_USER=superAdmin
       #Kafka 代理间通信密码
       - KAFKA_INTER_BROKER_PASSWORD=Sec@2024...
       #Kafka 节点的唯一 ID
       - KAFKA_CFG_NODE_ID=3
       #Kafka KRaft模式下的Controller仲裁投票者列表。包括集群中所有Controller的节点ID和对应的地址,用于选举和管理Controller的任务。
       - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=1@10.84.3.126:9093,2@10.84.3.127:9093,3@10.84.3.128:9093 #多集群2@10.84.3.126:9093,3@10.84.3.45:9093
       #使用 Kafka Raft 模式(KRaft)时的 Kafka 集群 ID 可自定义
       - KAFKA_KRAFT_CLUSTER_ID=LelM2dIFQkiUFvXCEcqRWA
       #创建Topic时的默认分区数量
       - KAFKA_CFG_NUM_PARTITIONS=30
       #日志保留的时间,单位为小时
       - KAFKA_LOG_RETENTION_HOURS=168
       #每个日志段的最大大小(字节)1GB
       - KAFKA_CFG_SEGMENT_BYTES=1073741824
       #日志保留检查的时间间隔,以毫秒为单位。即每隔300秒(5分钟)检查1次
       - KAFKA_CFG_LOG_RETENTION_CHECK_INTERVAL_MS=300000
       #消息的最大字节数。5000000表示每条消息最大为5MB。
       - KAFKA_CFG_MESSAGE_MAX_BYTES=5000000
     volumes:
       - './data:/bitnami/kafka/data:rw'
     networks:
      - kafka

networks:
  kafka:
    driver: bridge
EOF

2.2.5 启动三台kafka服务

#三台机器都需要在docker-compose.yaml所在目录执行
docker-compose up -d 

3. Kafka 读写数据

参考文档:https://kafka.apache.org/quickstart
常见命令:

  • kafka-topics.sh #消息的管理命令
  • kafka-console-producer.sh #生产者的模拟命令
  • kafka-console-consumer.sh #消费者的模拟命令

3.1 创建 Topic

创建topic名为 test,partitions(分区)为3,replication(每个分区的副本数/每个分区的分区因子)为 2

root@starlink007:/apps/kafka# docker-compose exec -T kafka-materialmq kafka-topics.sh --create --topic test --bootstrap-server 10.84.3.126:9092 --partitions 3 --replication-factor 2 --command-config /opt/bitnami/kafka/config/producer.properties
Created topic test.

3.2 获取所有 Topic

root@starlink007:/apps/kafka# docker-compose exec -T kafka-materialmq kafka-topics.sh --list --bootstrap-server 10.84.3.126:9092 --command-config /opt/bitnami/kafka/config/producer.properties
test

3.3 验证 Topic 详情

root@starlink007:/apps/kafka# docker-compose exec -T kafka-materialmq kafka-topics.sh --describe --bootstrap-server 10.84.3.126:9092 --topic test --command-config /opt/bitnami/kafka/config/producer.properties
Topic: test        TopicId: i-k5__5bTUKqiepqAPZO2Q        PartitionCount: 3        ReplicationFactor: 2        Configs: 
        Topic: test        Partition: 0        Leader: 1        Replicas: 1,2        Isr: 1        Elr:         LastKnownElr: 
        Topic: test        Partition: 1        Leader: 2        Replicas: 2,3        Isr: 2        Elr:         LastKnownElr: 
        Topic: test        Partition: 2        Leader: 3        Replicas: 3,1        Isr: 3        Elr:         LastKnownElr:
  • Topic: test 主题名称。在这个例子中,主题名为 test。
  • TopicId: i-k5__5bTUKqiepqAPZO2Q 主题的唯一标识符。在Kafka中,每个主题都有一个全局唯一的ID,用于内部管理和识别。在这个例子中,主题ID为 i-k5__5bTUKqiepqAPZO2Q。
  • PartitionCount: 3 主题的分区数量。分区是Kafka中数据的最小单位,每个分区都是一个有序的、不可变的消息队列。在这个例子中,主题 test 有3个分区
  • ReplicationFactor: 2 主题的副本因子。这是每个分区的副本数量,用于数据冗余和容错。在这个例子中,每个分区有2个副本。
  • Configs:主题的配置。这里显示了主题的任何自定义配置。在这个例子中,没有显示任何特定配置,意味着主题使用的是默认配置。
  • Partition: 分区的编号。分区编号从0开始,连续编号。
  • Leader: 当前分区的领导者。在Kafka中,每个分区都有一个Leader,负责处理所有读写请求。在这个例子中,分区0的领导者是Broker 1,分区1的领导者是Broker 2,分区2的领导者是Broker 3。
  • Replicas: 分区的所有副本,包括Leader。这里列出了所有包含该分区副本的Broker的ID。例如,分区0的副本位于Broker 1和Broker 2上。
  • Isr: In-Sync Replicas,即与Leader同步的副本。这些是能够立即接替Leader角色的副本,如果领导者失败的话。在这个例子中,每个分区的Isr都与Replicas相同,意味着所有副本都是同步的。
  • Elr: Eligible Leader Replicas,即有资格成为领导者的副本。在Kafka中,只有Isr中的副本才有资格成为Leader。在这个例子中,Elr列为空,这可能是因为kafka-topics.sh命令的版本或配置中没有显示这个信息。
  • LastKnownElr: 这是上一次已知的Elr列表,用于故障恢复时的参考。在这个例子中,LastKnownElr列也为空,原因同上。
    1.5.4 生产 Topic

3.4 生产 Topic

kafka-console-producer.sh 格式:

#发送消息命令格式:
kafka-console-producer.sh --broker-list <kafkaIP1>:<端口>,<kafkaIP2>:<端口> --topic <topic名称> --producer.config <认证文件>

范例:

root@starlink007:/apps/kafka# docker-compose exec -T kafka-materialmq kafka-console-producer.sh --bootstrap-server 10.84.3.126:9092 --topic test --producer.config /opt/bitnami/kafka/config/producer.properties
>message1 #交互式命令窗口
>message2
>message3
>

3.5 消费 Topic

kafka-console-consumer.sh 格式:

#接收消息命令格式:
kafka-console-consumer.sh --bootstrap-server <host>:<post> --topic <topic名称> --from-beginning --consumerproperty group.id=<组名称> --producer.config <认证文件>

注意:消息者先生产消息,消费都后续才启动,也能收到之前生产的消息

  • 在 Kafka 中,一个消费组(consumer group)内的每条消息只能被该组内的一个消费者(consumer)消费。Kafka 通过分区(partition)将消息分配给消费者。一个主题可以包含多个分区,每个分区内的消息只能被消费组中的一个消费者处理。例如,如果有100条消息,分布在两个分区(a分区和b分区),而消费组中有两个消费者A和B,那么A可能会消费a分区的50条消息,B则消费b分区的50条消息,从而实现负载均衡。此外,不同的消费组可以独立消费相同的消息。这意味着,即使一个消费组已经消费了这些消息,另一个消费组中的消费者仍然可以再次消费它们。
  • --from-beginning 表示消费前发布的消息也能收到,默认只能收到消费后发布的新消息
    范例:
#交互式持续接收消息,按Ctrl+C退出
root@starlink007:/apps/kafka# docker-compose exec -T kafka-materialmq kafka-console-consumer.sh --bootstrap-server 10.84.3.126:9092 --topic test --consumer.config /opt/bitnami/kafka/config/consumer.properties
message1
message2
message3

3.6 删除 Topic

#接收消息命令格式:
kafka-topics.sh --delete --bootstrap-server <host>:<post> --topic <topic名称>

范例:

#查看现有topic
root@starlink007:/apps/kafka# docker-compose exec -T kafka-materialmq kafka-topics.sh --list --bootstrap-server 10.84.3.126:9092 --command-config /opt/bitnami/kafka/config/producer.properties
__consumer_offsets
test

#删除 Topic  test
root@starlink007:/apps/kafka# docker-compose exec -T kafka-materialmq kafka-topics.sh --delete --bootstrap-server 10.84.3.126:9092 --topic test --command-config /opt/bitnami/kafka/config/producer.properties

#验证是否成功删除
root@starlink007:/apps/kafka# docker-compose exec -T kafka-materialmq kafka-topics.sh --list --bootstrap-server 10.84.3.126:9092 --command-config /opt/bitnami/kafka/config/producer.properties
__consumer_offsets

4. Web端操作kafka

4.1 连接集群

访问测试工具kafkamap地址:http://10.84.3.126:28080
用户名:superAdmin
密码:Sec@2024...

4.2 连接集群

kafka_web_connect

4.3 连接成功

kafka_web_connect_success

4.4 进入topic

kafka_web_edit

4.5 创建topic

kafka_web_create_topic

5. kafka-exporter

1、部署kafka-exporter

services:
  kafka-exporter:
    image: danielqsj/kafka-exporter:v1.8.0
    command: 
      - "--kafka.server=10.84.3.126:9092,10.84.3.127:9092,10.84.3.128:9092"
      - "--sasl.enabled"
      - "--sasl.mechanism=PLAIN"
      - "--sasl.username=superAdmin"
      - "--sasl.password=Sec@2024..."
      - "--kafka.version=3.8.0"
    ports:
      - "9308:9308"
    restart: unless-stopped

2、Prometheus端配置

2.1、配置prometheus.yaml

  - job_name: 'kafka_exporter'
    file_sd_configs:
    - files:
      - /apps/prometheus/Discovery/kafka_exporter.yml
      refresh_interval: 5s

2.2、file_sd_configs配置

- targets:
  - 10.84.3.126:9308
  labels:
    app: kafka_exporter
最后修改:2024 年 12 月 23 日
如果觉得我的文章对你有用,请随意赞赏