Kafka in Production: High-Availability Cluster Deployment and Operations
Architecture Diagram
Kafka Production Environment Architecture

Producers:       Producer1 | Producer2 | Producer3
                       |
Kafka Cluster:   Broker1 192.168.1.11:9092 | Broker2 192.168.1.12:9092
                 Broker3 192.168.1.13:9092 | Broker4 192.168.1.14:9092
                       |
ZooKeeper:       ZK1 192.168.1.21:2181 | ZK2 192.168.1.22:2181 | ZK3 192.168.1.23:2181
                       |
Consumers:       Consumer1 (Group A) | Consumer2 (Group B) | Consumer3 (Group C)

Monitoring:      Prometheus Metrics | Grafana Dashboard | Kafka Manager
Introduction
As a distributed streaming platform, Apache Kafka plays the central messaging role in modern big-data architectures. This article looks at Kafka from an operations engineer's perspective and covers production deployment, configuration tuning, monitoring, and day-to-day operations. Through hands-on cases and code examples, it aims to help operations teams build a stable, efficient Kafka cluster.
1. Automated Kafka Cluster Deployment
1.1 ZooKeeper Cluster Deployment Script
#!/bin/bash
# Automated ZooKeeper cluster deployment script
set -e
ZK_VERSION="3.8.1"
ZK_NODES=("192.168.1.21" "192.168.1.22" "192.168.1.23")
ZK_DATA_DIR="/data/zookeeper"
ZK_LOG_DIR="/logs/zookeeper"
# Create the zookeeper user
useradd -r -s /bin/false zookeeper
# Download and install ZooKeeper
install_zookeeper() {
cd /tmp
wget https://archive.apache.org/dist/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz
tar -xzf apache-zookeeper-${ZK_VERSION}-bin.tar.gz
mv apache-zookeeper-${ZK_VERSION}-bin /opt/zookeeper
chown -R zookeeper:zookeeper /opt/zookeeper
}
# Configure ZooKeeper
configure_zookeeper() {
local node_id=$1
local node_ip=$2
# Create data and log directories
mkdir -p ${ZK_DATA_DIR} ${ZK_LOG_DIR}
chown -R zookeeper:zookeeper ${ZK_DATA_DIR} ${ZK_LOG_DIR}
# Write the node ID
echo ${node_id} > ${ZK_DATA_DIR}/myid
# Generate the configuration file
cat > /opt/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=${ZK_DATA_DIR}
dataLogDir=${ZK_LOG_DIR}
clientPort=2181
maxClientCnxns=60
# Ensemble configuration
server.1=192.168.1.21:2888:3888
server.2=192.168.1.22:2888:3888
server.3=192.168.1.23:2888:3888
# Performance tuning
autopurge.snapRetainCount=10
autopurge.purgeInterval=1
EOF
}
# Start the ZooKeeper service
start_zookeeper() {
# Create the systemd service file
cat > /etc/systemd/system/zookeeper.service << EOF
[Unit]
Description=Apache ZooKeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target
[Service]
Type=forking
User=zookeeper
Group=zookeeper
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
TimeoutSec=30
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable zookeeper
systemctl start zookeeper
}
# Run the deployment
install_zookeeper
configure_zookeeper $1 $2
start_zookeeper
ZooKeeper acts as Kafka's coordination service and should run an odd number of nodes so the ensemble can maintain a quorum and stay highly available. The automated script above produces a standardized ZooKeeper environment quickly.
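After the service is running on every node, it is worth confirming that the ensemble actually formed a quorum. The snippet below is a minimal sketch that queries each node with the srvr four-letter command (whitelisted by default in ZooKeeper 3.5+) and prints its role; the node IPs match the ones used above.
#!/bin/bash
# Minimal quorum check: one node should report "Mode: leader",
# the others "Mode: follower". Node list matches the deployment above.
for node in 192.168.1.21 192.168.1.22 192.168.1.23; do
    mode=$(echo srvr | nc -w 2 ${node} 2181 | grep "Mode:")
    echo "${node}: ${mode:-unreachable}"
done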
1.2 Kafka Cluster Deployment and Configuration
#!/bin/bash
# Kafka cluster deployment script
KAFKA_VERSION="2.8.2"
KAFKA_NODES=("192.168.1.11" "192.168.1.12" "192.168.1.13" "192.168.1.14")
KAFKA_DATA_DIR="/data/kafka"
KAFKA_LOG_DIR="/logs/kafka"
# Install Kafka
install_kafka() {
cd /tmp
wget https://archive.apache.org/dist/kafka/${KAFKA_VERSION}/kafka_2.13-${KAFKA_VERSION}.tgz
tar -xzf kafka_2.13-${KAFKA_VERSION}.tgz
mv kafka_2.13-${KAFKA_VERSION} /opt/kafka
# Create the kafka user
useradd -r -s /bin/false kafka
chown -R kafka:kafka /opt/kafka
# Create data and log directories
mkdir -p ${KAFKA_DATA_DIR} ${KAFKA_LOG_DIR}
chown -R kafka:kafka ${KAFKA_DATA_DIR} ${KAFKA_LOG_DIR}
}
# Generate the Kafka broker configuration
generate_kafka_config() {
local broker_id=$1
local node_ip=$2
cat > /opt/kafka/config/server.properties << EOF
# Basic broker settings
broker.id=${broker_id}
listeners=PLAINTEXT://${node_ip}:9092
advertised.listeners=PLAINTEXT://${node_ip}:9092
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
# Log settings
log.dirs=${KAFKA_DATA_DIR}
num.partitions=3
num.recovery.threads.per.data.dir=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
# ZooKeeper settings
zookeeper.connect=192.168.1.21:2181,192.168.1.22:2181,192.168.1.23:2181/kafka
zookeeper.connection.timeout.ms=18000
# Performance tuning
replica.fetch.max.bytes=1048576
message.max.bytes=1000000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
fetch.purgatory.purge.interval.requests=1000
producer.purgatory.purge.interval.requests=1000
delete.topic.enable=true
EOF
# JVM options do not belong in server.properties; write them to an
# environment file that the systemd unit below loads at startup
cat > /etc/default/kafka << EOF
KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
EOF
}
# Create the Kafka systemd service
create_kafka_service() {
cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service
[Service]
Type=simple
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk
EnvironmentFile=-/etc/default/kafka
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF
systemctl daemon-reload
systemctl enable kafka
systemctl start kafka
}
# Run the deployment
install_kafka
generate_kafka_config $1 $2
create_kafka_service
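Once all four brokers are up, a quick smoke test confirms that every broker registered in ZooKeeper and that replicated topics can be created. This is a sketch rather than part of the deployment script; the topic name smoke-test is arbitrary and the paths follow the installation above.
#!/bin/bash
# List registered broker IDs under the /kafka chroot (one entry per broker)
/opt/kafka/bin/zookeeper-shell.sh 192.168.1.21:2181/kafka ls /brokers/ids
# Create and describe a replicated test topic to verify leadership and ISR
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.11:9092 \
    --create --topic smoke-test --partitions 4 --replication-factor 3
/opt/kafka/bin/kafka-topics.sh --bootstrap-server 192.168.1.11:9092 \
    --describe --topic smoke-test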
2. Production Performance Tuning
2.1 Producer Performance Tuning
#!/usr/bin/env python3
# Optimized Kafka producer configuration
from kafka import KafkaProducer
import json
import time
from concurrent.futures import ThreadPoolExecutor


class OptimizedKafkaProducer:
    def __init__(self, bootstrap_servers, topic):
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # Performance tuning
            batch_size=16384,                 # batch size in bytes
            linger_ms=10,                     # wait up to 10 ms to fill a batch
            buffer_memory=33554432,           # 32 MB send buffer
            compression_type='snappy',        # compression codec
            max_in_flight_requests_per_connection=5,
            retries=3,                        # retry count
            retry_backoff_ms=100,
            request_timeout_ms=30000,
            # Serialization
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: str(k).encode('utf-8')
        )

    def send_message_sync(self, key, value):
        """Send a message synchronously"""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            record_metadata = future.get(timeout=10)
            return {
                'topic': record_metadata.topic,
                'partition': record_metadata.partition,
                'offset': record_metadata.offset
            }
        except Exception as e:
            print(f"Failed to send message: {e}")
            return None

    def send_message_async(self, key, value, callback=None):
        """Send a message asynchronously"""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            if callback:
                future.add_callback(callback)
            return future
        except Exception as e:
            print(f"Failed to send message: {e}")
            return None

    def batch_send_performance_test(self, message_count=100000):
        """Bulk-send performance test"""
        start_time = time.time()
        # Submit sends concurrently from a thread pool
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for i in range(message_count):
                message = {
                    'id': i,
                    'timestamp': time.time(),
                    'data': f'test_message_{i}',
                    'source': 'performance_test'
                }
                future = executor.submit(self.send_message_async, str(i), message)
                futures.append(future)
            # Wait for every submission to complete
            for future in futures:
                try:
                    future.result(timeout=30)
                except Exception as e:
                    print(f"Send error: {e}")
        # Make sure all buffered messages are actually delivered
        self.producer.flush()
        end_time = time.time()
        duration = end_time - start_time
        throughput = message_count / duration
        print(f"Sent {message_count} messages")
        print(f"Elapsed time: {duration:.2f} s")
        print(f"Throughput: {throughput:.2f} messages/s")

    def close(self):
        self.producer.close()


# Usage example
if __name__ == "__main__":
    producer = OptimizedKafkaProducer(
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092'],
        topic='performance_test'
    )
    # Run the performance test
    producer.batch_send_performance_test(50000)
    producer.close()
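As a baseline to compare the Python client against, Kafka ships with a built-in load generator. The sketch below assumes the same cluster and topic as the example above; the record size and count are illustrative values.
#!/bin/bash
# Built-in producer benchmark with settings that mirror the Python client
/opt/kafka/bin/kafka-producer-perf-test.sh \
    --topic performance_test \
    --num-records 50000 \
    --record-size 200 \
    --throughput -1 \
    --producer-props bootstrap.servers=192.168.1.11:9092,192.168.1.12:9092 \
        acks=1 batch.size=16384 linger.ms=10 compression.type=snappy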
2.2 Consumer Performance Tuning
#!/usr/bin/env python3
# Optimized Kafka consumer configuration
from kafka import KafkaConsumer
import json
import time
from concurrent.futures import ThreadPoolExecutor


class OptimizedKafkaConsumer:
    def __init__(self, topics, group_id, bootstrap_servers):
        self.topics = topics
        self.group_id = group_id
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # Performance tuning
            fetch_min_bytes=1024,            # minimum bytes per fetch
            fetch_max_wait_ms=500,           # maximum wait per fetch
            max_poll_records=500,            # records per poll
            max_poll_interval_ms=300000,     # maximum interval between polls
            session_timeout_ms=30000,        # session timeout
            heartbeat_interval_ms=10000,     # heartbeat interval
            # Consumption strategy
            auto_offset_reset='earliest',
            enable_auto_commit=False,        # commit offsets manually
            # Deserialization
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m else None
        )

    def consume_messages_batch(self, batch_size=100, timeout=5000):
        """Consume messages in batches"""
        message_batch = []
        try:
            # Pull a batch of messages
            message_pack = self.consumer.poll(timeout_ms=timeout)
            for topic_partition, messages in message_pack.items():
                for message in messages:
                    message_batch.append({
                        'topic': message.topic,
                        'partition': message.partition,
                        'offset': message.offset,
                        'key': message.key,
                        'value': message.value,
                        'timestamp': message.timestamp
                    })
                    if len(message_batch) >= batch_size:
                        # Process a full batch
                        self.process_message_batch(message_batch)
                        message_batch = []
            # Process whatever is left
            if message_batch:
                self.process_message_batch(message_batch)
            # Commit offsets manually after processing
            self.consumer.commit()
        except Exception as e:
            print(f"Error while consuming: {e}")

    def process_message_batch(self, messages):
        """Process a batch of messages"""
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = []
            for message in messages:
                future = executor.submit(self.process_single_message, message)
                futures.append(future)
            # Wait for the whole batch to finish
            for future in futures:
                try:
                    future.result(timeout=30)
                except Exception as e:
                    print(f"Error while processing message: {e}")

    def process_single_message(self, message):
        """Process a single message"""
        try:
            # Simulate business logic
            time.sleep(0.001)
            # Log the processed record
            print(f"Processed: Topic={message['topic']}, "
                  f"Partition={message['partition']}, "
                  f"Offset={message['offset']}")
        except Exception as e:
            print(f"Error processing single message: {e}")

    def start_consuming(self):
        """Start the consume loop"""
        print(f"Consuming topics: {self.topics}")
        try:
            while True:
                self.consume_messages_batch()
        except KeyboardInterrupt:
            print("Stopping consumer")
        finally:
            self.consumer.close()


# Usage example
if __name__ == "__main__":
    consumer = OptimizedKafkaConsumer(
        topics=['performance_test'],
        group_id='performance_consumer_group',
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092']
    )
    consumer.start_consuming()
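The matching built-in consumer benchmark gives a fetch-path baseline that is independent of the Python client. The values below are illustrative and reuse the topic and brokers from the previous examples; the group name is a placeholder.
#!/bin/bash
# Built-in consumer benchmark against the same topic
/opt/kafka/bin/kafka-consumer-perf-test.sh \
    --bootstrap-server 192.168.1.11:9092,192.168.1.12:9092 \
    --topic performance_test \
    --group perf_baseline_group \
    --messages 50000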
3. Monitoring and Operations Automation
3.1 Kafka Cluster Monitoring Script
#!/bin/bash
# Kafka cluster monitoring script
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
ALERT_EMAIL="admin@company.com"
LOG_FILE="/var/log/kafka_monitor.log"

# Check overall cluster availability
check_kafka_cluster() {
    echo "$(date): checking Kafka cluster status" >> $LOG_FILE
    # Count reachable brokers
    broker_list=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server ${KAFKA_BROKERS} 2>/dev/null | grep -c "id:")
    if [ "$broker_list" -lt 3 ]; then
        echo "ALERT: not enough live Kafka brokers: $broker_list" | mail -s "Kafka Cluster Alert" $ALERT_EMAIL
        echo "$(date): ALERT - live brokers below threshold: $broker_list" >> $LOG_FILE
    fi
}

# Check topic health
check_topic_health() {
    echo "$(date): checking topic health" >> $LOG_FILE
    # List all topics
    topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --list)
    for topic in $topics; do
        # Describe the topic
        topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic $topic)
        # Count partitions without a leader
        offline_partitions=$(echo "$topic_desc" | grep -c "Leader: -1")
        if [ "$offline_partitions" -gt 0 ]; then
            echo "ALERT: topic $topic has $offline_partitions offline partition(s)" | mail -s "Kafka Topic Alert" $ALERT_EMAIL
            echo "$(date): ALERT - topic $topic offline partitions: $offline_partitions" >> $LOG_FILE
        fi
    done
}

# Check consumer group lag
check_consumer_lag() {
    echo "$(date): checking consumer group lag" >> $LOG_FILE
    # List consumer groups
    consumer_groups=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --list)
    for group in $consumer_groups; do
        # Describe the group
        group_desc=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group $group)
        # LAG is the 6th column of the describe output in Kafka 2.x
        max_lag=$(echo "$group_desc" | awk 'NR>1 && $6 ~ /^[0-9]+$/ {print $6}' | sort -n | tail -1)
        if [ -n "$max_lag" ] && [ "$max_lag" -gt 10000 ]; then
            echo "ALERT: consumer group $group max lag: $max_lag" | mail -s "Kafka Consumer Lag Alert" $ALERT_EMAIL
            echo "$(date): ALERT - consumer group $group lag too high: $max_lag" >> $LOG_FILE
        fi
    done
}

# Collect basic performance metrics
collect_metrics() {
    echo "$(date): collecting Kafka metrics" >> $LOG_FILE
    # Collect per-broker JVM process metrics over SSH
    for broker in 192.168.1.11 192.168.1.12 192.168.1.13; do
        kafka_pid=$(ssh $broker "pgrep -f kafka")
        if [ -n "$kafka_pid" ]; then
            # Memory usage
            memory_usage=$(ssh $broker "ps -p $kafka_pid -o %mem --no-headers")
            echo "$(date): broker $broker memory usage: $memory_usage%" >> $LOG_FILE
            # CPU usage
            cpu_usage=$(ssh $broker "ps -p $kafka_pid -o %cpu --no-headers")
            echo "$(date): broker $broker CPU usage: $cpu_usage%" >> $LOG_FILE
        fi
    done
}

# Main monitoring loop
while true; do
    check_kafka_cluster
    check_topic_health
    check_consumer_lag
    collect_metrics
    sleep 300  # run every 5 minutes
done
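The architecture diagram also shows Prometheus and Grafana; the process-level metrics above are no substitute for broker JMX metrics. One common approach is the Prometheus JMX exporter Java agent. The sketch below is assumption-heavy: the agent jar path, port 7071, and the rule set are placeholders to adapt, and the KAFKA_OPTS line reuses the /etc/default/kafka environment file introduced in the deployment script above.
#!/bin/bash
# Hypothetical JMX exporter setup for one broker (paths and port are assumptions)
mkdir -p /opt/jmx_exporter
cat > /opt/jmx_exporter/kafka.yml << 'EOF'
lowercaseOutputName: true
rules:
  # Expose broker metrics such as MessagesInPerSec and UnderReplicatedPartitions
  - pattern: kafka.server<type=(.+), name=(.+)><>Value
    name: kafka_server_$1_$2
  - pattern: kafka.server<type=(.+), name=(.+)><>Count
    name: kafka_server_$1_$2_total
EOF
# Attach the agent via the environment file loaded by the kafka systemd unit
echo 'KAFKA_OPTS="-javaagent:/opt/jmx_exporter/jmx_prometheus_javaagent.jar=7071:/opt/jmx_exporter/kafka.yml"' >> /etc/default/kafka
systemctl restart kafka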
3.2 Automated Operations Script
#!/usr/bin/env python3
# Kafka automated operations script
import subprocess
import json
import os
import logging
from datetime import datetime


class KafkaOperations:
    def __init__(self, kafka_home, brokers):
        self.kafka_home = kafka_home
        self.brokers = brokers
        self.logger = self.setup_logger()

    def setup_logger(self):
        """Configure logging"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/kafka_operations.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)

    def create_topic(self, topic_name, partitions=3, replication_factor=2):
        """Create a topic"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--create",
                "--topic", topic_name,
                "--partitions", str(partitions),
                "--replication-factor", str(replication_factor)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Created topic: {topic_name}")
                return True
            else:
                self.logger.error(f"Failed to create topic: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Error creating topic: {e}")
            return False

    def delete_topic(self, topic_name):
        """Delete a topic"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--delete",
                "--topic", topic_name
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Deleted topic: {topic_name}")
                return True
            else:
                self.logger.error(f"Failed to delete topic: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Error deleting topic: {e}")
            return False

    def increase_partitions(self, topic_name, new_partition_count):
        """Increase the partition count of a topic"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--alter",
                "--topic", topic_name,
                "--partitions", str(new_partition_count)
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                self.logger.info(f"Increased partitions of {topic_name} to {new_partition_count}")
                return True
            else:
                self.logger.error(f"Failed to increase partitions: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Error increasing partitions: {e}")
            return False

    def rebalance_partitions(self, topic_name, broker_list="0,1,2,3"):
        """Rebalance the partitions of a topic across brokers"""
        try:
            topics_file = f"/tmp/topics-{topic_name}.json"
            reassignment_file = f"/tmp/reassignment-{topic_name}.json"
            # Describe which topics to move
            with open(topics_file, 'w') as f:
                json.dump({"version": 1, "topics": [{"topic": topic_name}]}, f)
            # Generate a reassignment plan
            cmd_generate = [
                f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
                "--bootstrap-server", self.brokers,
                "--topics-to-move-json-file", topics_file,
                "--broker-list", broker_list,
                "--generate"
            ]
            gen_result = subprocess.run(cmd_generate, capture_output=True, text=True)
            if gen_result.returncode != 0:
                self.logger.error(f"Failed to generate reassignment plan: {gen_result.stderr}")
                return False
            # The proposed plan is the JSON line following the "Proposed" header
            lines = gen_result.stdout.splitlines()
            proposed_idx = next(i for i, line in enumerate(lines)
                                if line.startswith("Proposed partition reassignment"))
            with open(reassignment_file, 'w') as f:
                f.write(lines[proposed_idx + 1])
            # Execute the reassignment
            cmd_execute = [
                f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
                "--bootstrap-server", self.brokers,
                "--reassignment-json-file", reassignment_file,
                "--execute"
            ]
            exec_result = subprocess.run(cmd_execute, capture_output=True, text=True)
            if exec_result.returncode == 0:
                self.logger.info(f"Started rebalancing topic: {topic_name}")
                return True
            else:
                self.logger.error(f"Failed to execute reassignment: {exec_result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Error during rebalance: {e}")
            return False

    def backup_consumer_offsets(self, group_id):
        """Back up the offsets of a consumer group"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
                "--bootstrap-server", self.brokers,
                "--describe",
                "--group", group_id
            ]
            result = subprocess.run(cmd, capture_output=True, text=True)
            if result.returncode == 0:
                os.makedirs("/backup", exist_ok=True)
                backup_file = f"/backup/consumer_offsets_{group_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
                with open(backup_file, 'w') as f:
                    f.write(result.stdout)
                self.logger.info(f"Backed up offsets of group {group_id} to {backup_file}")
                return True
            else:
                self.logger.error(f"Failed to back up offsets: {result.stderr}")
                return False
        except Exception as e:
            self.logger.error(f"Error backing up offsets: {e}")
            return False


# Usage example
if __name__ == "__main__":
    kafka_ops = KafkaOperations(
        kafka_home="/opt/kafka",
        brokers="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
    )
    # Create a topic
    kafka_ops.create_topic("test_topic", partitions=6, replication_factor=3)
    # Increase its partition count
    kafka_ops.increase_partitions("test_topic", 12)
    # Back up consumer offsets
    kafka_ops.backup_consumer_offsets("test_consumer_group")
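A backup is only half of the story: restoring a group to a known position after an incident is typically done with the reset-offsets mode of the same tool. The sketch below is an illustrative manual recovery step, not part of the script above; the group, topic, and timestamp are placeholders.
#!/bin/bash
# Dry-run first, then re-run with --execute once the proposed offsets look right
/opt/kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server 192.168.1.11:9092 \
    --group test_consumer_group \
    --topic test_topic \
    --reset-offsets --to-datetime 2024-01-01T00:00:00.000 \
    --dry-run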
4. High Availability and Failure Recovery
4.1 Cluster Health Checks
#!/bin/bash
# Kafka cluster health check and automatic recovery
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"

# Detect under-replicated partitions and trigger preferred leader election
check_and_fix_isr() {
    echo "Checking for under-replicated partitions..."
    isr_problem=0
    # List all topics
    topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --list)
    for topic in $topics; do
        # Describe the topic
        topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic $topic)
        # Compare replica count with ISR size on each partition line
        isr_issues=$(echo "$topic_desc" | grep "Isr:" | awk '{
            replicas = 0; isr = 0;
            for (i = 1; i <= NF; i++) {
                if ($i == "Replicas:") replicas = split($(i+1), r, ",");
                if ($i == "Isr:") isr = split($(i+1), s, ",");
            }
            if (isr < replicas) print "under-replicated"
        }')
        if [ -n "$isr_issues" ]; then
            echo "Topic $topic has under-replicated partitions"
            isr_problem=1
        fi
    done
    if [ "$isr_problem" -eq 1 ]; then
        # Move leadership back to the preferred replicas across the cluster
        ${KAFKA_HOME}/bin/kafka-leader-election.sh --bootstrap-server ${KAFKA_BROKERS} --election-type preferred --all-topic-partitions
    fi
}

# Automatic failure recovery
auto_recovery() {
    echo "Running automatic recovery..."
    # Restart any broker whose service is down
    for broker in 192.168.1.11 192.168.1.12 192.168.1.13; do
        if ! ssh $broker "systemctl is-active kafka" > /dev/null 2>&1; then
            echo "Restarting broker: $broker"
            ssh $broker "systemctl restart kafka"
            sleep 30
        fi
    done
    # Check and repair ISR
    check_and_fix_isr
    # Verify the cluster state
    validate_cluster_state
}

validate_cluster_state() {
    echo "Validating cluster state..."
    # Count online brokers
    online_brokers=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server ${KAFKA_BROKERS} 2>/dev/null | grep -c "id:")
    if [ "$online_brokers" -eq 3 ]; then
        echo "Cluster recovered, all brokers online"
    else
        echo "Cluster recovery failed, online brokers: $online_brokers"
        return 1
    fi
}

# Run the health check and recovery
auto_recovery
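Rather than running this script by hand, it can be scheduled on a management host. Below is a minimal sketch using cron, assuming the script has been saved as /opt/kafka/scripts/kafka_health_check.sh (a hypothetical path).
#!/bin/bash
# Run the health check every 10 minutes and keep a log of its output
cat > /etc/cron.d/kafka-health-check << 'EOF'
*/10 * * * * root /opt/kafka/scripts/kafka_health_check.sh >> /var/log/kafka_health.log 2>&1
EOF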
Summary
Deploying Kafka in production touches several key areas: cluster architecture design, performance parameter tuning, the monitoring stack, and operations automation. With the approach described in this article, operations engineers can build a stable, efficient Kafka cluster. The key points are sensible cluster sizing, well-reasoned configuration tuning, a complete monitoring and alerting setup, and reliable failure-recovery procedures. In a real production environment you still need to optimize for your specific workload and keep monitoring and improving the system so that the messaging service remains stable and reliable.