Docker 安装 kafka (bitnami/kafka:4.0)
1、拉取镜像
docker pull bitnami/kafka:4.0
2、创建挂载目录
mkdir -p /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/data
mkdir -p /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/logs
3、给挂载目录授权
chmod 777 /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/data
chmod 777 /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/logs
4、运行容器
4.1、运行命令
docker run -d
--name bitnami_kafka_4.0
--restart always
--ulimit nofile=65536:65536
-e TZ=Asia/Shanghai
-e KAFKA_ENABLE_KRAFT=yes
-e KAFKA_CFG_NODE_ID=0
-e KAFKA_CFG_PROCESS_ROLES=controller,broker
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://113.45.38.93:9092
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
-e KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data
-p 9092:9092
-v /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/data:/bitnami/kafka
-v /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/logs:/opt/bitnami/kafka/logs
--memory=512m
--cpus="1.0"
bitnami/kafka:4.0
4.2、命令解释
下面先给出这条 docker run
命令的整体概览,然后逐行详解各个参数和环境变量的作用。
这条命令启动了一个基于 Bitnami 提供的 Kafka 4.0 镜像的容器,并在 KRaft 模式(即不依赖 Zookeeper)下同时担任 controller 和 broker。它设置了时区、节点 ID、监听器及广告地址,指定数据和日志存储目录,限制了文件句柄数、内存和 CPU 使用,并在主机上映射了对应端口,保证容器重启策略为“始终重启”。
4.2.1、 基本启动与命名
docker run -d
--name bitnami_kafka_4.0
--restart always
docker run -d
:以“后台模式”(detached)启动容器,使其在后台运行,不占用当前终端。--name bitnami_kafka_4.0
:为容器指定名字,方便后续管理和运维。--restart always
:容器退出(无论退出码为何)或 Docker 守护进程重启后,都会自动重启该容器,保证 Kafka 服务的高可用性。
4.2.2、 文件句柄限制
--ulimit nofile=65536:65536
--ulimit nofile=65536:65536
:将容器内“最大打开文件数”(nofile
)软限制和硬限制都设置为 65536,避免 Kafka 在高并发情况下因文件描述符不足而崩溃。
4.2.3、时区设置
-e TZ=Asia/Shanghai
-e TZ=Asia/Shanghai
:设置容器时区为北京时间(东八区),使 Kafka 日志及监控时间戳与本地时间保持一致,便于分析与排查。
4.2.4、 KRaft 模式开启及节点角色
-e KAFKA_ENABLE_KRAFT=yes
-e KAFKA_CFG_NODE_ID=0
-e KAFKA_CFG_PROCESS_ROLES=controller,broker
-e KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093
KAFKA_ENABLE_KRAFT=yes
:启用 KRaft 模式(Kafka Raft Metadata),这是 Kafka 未来推荐的无 Zookeeper 架构。KAFKA_CFG_NODE_ID=0
:为该 Kafka 实例分配唯一的节点 ID,KRaft 模式下用于集群内部元数据选举。KAFKA_CFG_PROCESS_ROLES=controller,broker
:指定该节点同时承担 “controller” 和 “broker” 两种角色。KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@127.0.0.1:9093
:定义控制器选举的投票列表,格式为
,此处只有一个节点自身(单节点集群)。@ :
4.2.5、 监听器配置
-e KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
-e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092
-e KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
-e KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
:定义两个监听端口:PLAINTEXT
:对外提供普通客户端连接,监听容器内部9092
端口;CONTROLLER
:内部控制器通信,监听容器内部9093
端口。
KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://IP:9092
:告诉客户端连接时使用的地址和端口,通常设置为宿主机或对外 IP,方便外部服务或用户连接。KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
:将每个监听器映射到安全协议,此处均为明文(PLAINTEXT)。KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
:指定用于 controller 通信的监听器名称为CONTROLLER
。
4.2.6、 日志与数据目录挂载
-e KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data
-p 9092:9092
-v /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/data:/bitnami/kafka
-v /user/lzl/tool/docker/kafka/bitnami/bitnami_kafka_4.0/home/logs:/opt/bitnami/kafka/logs
KAFKA_CFG_LOG_DIRS=/bitnami/kafka/data
:Kafka 存储分区数据的目录。-p 9092:9092
:将宿主机的9092
端口映射到容器的9092
,使外部客户端可以访问。-v …/data:/bitnami/kafka
:把宿主机的数据目录挂载到容器中,用于持久化 Kafka topic 数据。-v …/logs:/opt/bitnami/kafka/logs
:把宿主机的日志目录挂载到容器,用于持久化 Kafka 日志文件,便于排查。
4.2.7、 资源限制
--memory=512m
--cpus="1.0"
--memory=512m
:限制容器使用最多 512MB 内存,避免单容器占用过多宿主机资源。--cpus="1.0"
:限制容器最多使用一个 CPU 核心,保障宿主机上其他服务的性能。
4.2.8、 镜像与版本
bitnami/kafka:4.0
- 使用 Bitnami 官方维护的
bitnami/kafka:4.0
镜像。Bitnami 镜像通常附带最佳实践的配置和管理脚本,方便快速部署生产级 Kafka 服务。
5、SpringBoot 整合
5.1、引入依赖
<dependency>
<groupId>org.springframework.kafkagroupId>
<artifactId>spring-kafkaartifactId>
<version>${spring.kafka.version}version>
dependency>
5.2、yml 配置
spring:
kafka:
bootstrap-servers: IP:9092
listener:
ack-mode: manual_immediate
consumer:
group-id: kafka_consumer_group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# earliest 启动时,如果找不到有效的偏移量(如消费组第一次启动或偏移量已过期),从 Topic 的最早消息开始消费
# latest(默认值) 启动时,如果找不到有效的偏移量,从 Topic 的最新消息开始消费(跳过历史消息)
# none 如果没有有效偏移量,则抛出异常
auto-offset-reset: earliest
enable-auto-commit: false
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 发送消息失败允许重试次数
retries: 5
5.3、使用 kafka 发送消息
import org.springframework.kafka.core.KafkaTemplate;
/**
* 消息发送
*/
@Slf4j
@Component
public class EventPublisher {
@Resource
private KafkaTemplate<String, String> kafkaTemplate;
public void publish(String topic, BaseEvent.EventMessage<?> eventMessage) {
String messageJson = JSONUtil.toJsonStr(eventMessage);
publish(topic, messageJson);
}
public void publish(String topic, String eventMessageJSON) {
try {
kafkaTemplate.send(topic, eventMessageJSON);
log.info("发送MQ消息 topic:{} message:{}", topic, eventMessageJSON);
} catch (Exception e) {
log.error("发送MQ消息失败 topic:{} message:{}", topic, eventMessageJSON, e);
throw e;
}
}
}
5.4、使用 kafka 消费消息
@KafkaListener(topics = {topic}, groupId = "${spring.kafka.consumer.group-id}")
public void listener(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) {
String message = record.value();
try {
log.info("监听发送消息,topic: {} message: {}", topic, message);
// 手动确认消费成功
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("监听发送消息,消费失败 topic: {} message: {}", topic, message);
}
}