Kafka生产环境应用方案

描述

Kafka生产环境应用方案:高可用集群部署与运维实战

架构图

 

┌─────────────────────────────────────────────────────────────────────────────────┐
│                           Kafka生产环境架构                                        │
├─────────────────────────────────────────────────────────────────────────────────┤
│                                                                                 │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                         │
│  │  Producer1  │    │  Producer2  │    │  Producer3  │                         │
│  └─────────────┘    └─────────────┘    └─────────────┘                         │
│           │                 │                 │                                │
│           └─────────────────┼─────────────────┘                                │
│                             │                                                  │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                      Kafka Cluster                                     │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐   │   │
│  │  │   Broker1   │  │   Broker2   │  │   Broker3   │  │   Broker4   │   │   │
│  │  │192.168.1.11 │  │192.168.1.12 │  │192.168.1.13 │  │192.168.1.14 │   │   │
│  │  │   Port:9092 │  │   Port:9092 │  │   Port:9092 │  │   Port:9092 │   │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘  └─────────────┘   │   │
│  └─────────────────────────────────────────────────────────────────────────┐   │
│                             │                                             │   │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                      ZooKeeper Cluster                                 │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                    │   │
│  │  │     ZK1     │  │     ZK2     │  │     ZK3     │                    │   │
│  │  │192.168.1.21 │  │192.168.1.22 │  │192.168.1.23 │                    │   │
│  │  │  Port:2181  │  │  Port:2181  │  │  Port:2181  │                    │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘                    │   │
│  └─────────────────────────────────────────────────────────────────────────┘   │
│                             │                                                  │
│  ┌─────────────┐    ┌─────────────┐    ┌─────────────┐                         │
│  │  Consumer1  │    │  Consumer2  │    │  Consumer3  │                         │
│  │ (Group A)   │    │ (Group B)   │    │ (Group C)   │                         │
│  └─────────────┘    └─────────────┘    └─────────────┘                         │
│                                                                                 │
│  ┌─────────────────────────────────────────────────────────────────────────┐   │
│  │                      监控系统                                            │   │
│  │  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐                    │   │
│  │  │ Prometheus  │  │   Grafana   │  │   Kafka     │                    │   │
│  │  │   Metrics   │  │  Dashboard  │  │   Manager   │                    │   │
│  │  └─────────────┘  └─────────────┘  └─────────────┘                    │   │
│  └─────────────────────────────────────────────────────────────────────────┘   │
└─────────────────────────────────────────────────────────────────────────────────┘

 

引言

Apache Kafka作为分布式流处理平台,在现代大数据架构中扮演着消息中间件的核心角色。本文将从运维工程师的角度,详细介绍Kafka在生产环境中的部署方案、配置优化、监控运维等关键技术。通过实战案例和代码示例,帮助运维团队构建稳定、高效的Kafka集群。

1. Kafka集群自动化部署

1.1 ZooKeeper集群部署脚本

 

#!/bin/bash
# ZooKeeper集群自动化部署脚本
set -e

ZK_VERSION="3.8.1"
ZK_NODES=("192.168.1.21""192.168.1.22""192.168.1.23")
ZK_DATA_DIR="/data/zookeeper"
ZK_LOG_DIR="/logs/zookeeper"

# 创建ZooKeeper用户
useradd -r -s /bin/false zookeeper

# 下载安装ZooKeeper
install_zookeeper() {
    cd /tmp
    wget https://archive.apache.org/dist/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz
    tar -xzf apache-zookeeper-${ZK_VERSION}-bin.tar.gz
    mv apache-zookeeper-${ZK_VERSION}-bin /opt/zookeeper
    chown -R zookeeper:zookeeper /opt/zookeeper
}

# 配置ZooKeeper
configure_zookeeper() {
    local node_id=$1
    local node_ip=$2
    
    # 创建数据目录
    mkdir -p ${ZK_DATA_DIR}${ZK_LOG_DIR}
    chown -R zookeeper:zookeeper ${ZK_DATA_DIR}${ZK_LOG_DIR}
    
    # 设置节点ID
    echo${node_id} > ${ZK_DATA_DIR}/myid
    
    # 生成配置文件
    cat > /opt/zookeeper/conf/zoo.cfg << EOF
tickTime=2000
initLimit=10
syncLimit=5
dataDir=${ZK_DATA_DIR}
dataLogDir=${ZK_LOG_DIR}
clientPort=2181
maxClientCnxns=60

# 集群配置
server.1=192.168.1.213888
server.2=192.168.1.223888
server.3=192.168.1.233888

# 性能优化
autopurge.snapRetainCount=10
autopurge.purgeInterval=1
EOF
}

# 启动ZooKeeper服务
start_zookeeper() {
    # 创建systemd服务文件
    cat > /etc/systemd/system/zookeeper.service << EOF
[Unit]
Description=Apache ZooKeeper server
Documentation=http://zookeeper.apache.org
Requires=network.target remote-fs.target
After=network.target remote-fs.target

[Service]
Type=forking
User=zookeeper
Group=zookeeper
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeper/bin/zkServer.sh restart
TimeoutSec=30
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

    systemctl daemon-reload
    systemctl enable zookeeper
    systemctl start zookeeper
}

# 执行部署
install_zookeeper
configure_zookeeper $1$2
start_zookeeper

 

ZooKeeper作为Kafka的协调服务,需要奇数个节点组成集群以保证高可用性。通过自动化脚本可以快速部署标准化的ZooKeeper环境。

1.2 Kafka集群部署配置

 

#!/bin/bash
# Kafka集群部署脚本
KAFKA_VERSION="2.8.2"
KAFKA_NODES=("192.168.1.11""192.168.1.12""192.168.1.13""192.168.1.14")
KAFKA_DATA_DIR="/data/kafka"
KAFKA_LOG_DIR="/logs/kafka"

# 安装Kafka
install_kafka() {
    cd /tmp
    wget https://archive.apache.org/dist/kafka/2.8.2/kafka_2.13-${KAFKA_VERSION}.tgz
    tar -xzf kafka_2.13-${KAFKA_VERSION}.tgz
    mv kafka_2.13-${KAFKA_VERSION} /opt/kafka
    
    # 创建kafka用户
    useradd -r -s /bin/false kafka
    chown -R kafka:kafka /opt/kafka
    
    # 创建数据目录
    mkdir -p ${KAFKA_DATA_DIR}${KAFKA_LOG_DIR}
    chown -R kafka:kafka ${KAFKA_DATA_DIR}${KAFKA_LOG_DIR}
}

# 生成Kafka服务器配置
generate_kafka_config() {
    local broker_id=$1
    local node_ip=$2
    
    cat > /opt/kafka/config/server.properties << EOF
# 服务器基础配置
broker.id=${broker_id}
listeners=PLAINTEXT://${node_ip}:9092
advertised.listeners=PLAINTEXT://${node_ip}:9092
num.network.threads=8
num.io.threads=16
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600

# 日志配置
log.dirs=${KAFKA_DATA_DIR}
num.partitions=3
num.recovery.threads.per.data.dir=2
offsets.topic.replication.factor=3
transaction.state.log.replication.factor=3
transaction.state.log.min.isr=2
log.retention.hours=168
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000

# ZooKeeper配置
zookeeper.connect=192.168.1.21:2181,192.168.1.22:2181,192.168.1.23:2181/kafka
zookeeper.connection.timeout.ms=18000

# 性能优化配置
replica.fetch.max.bytes=1048576
message.max.bytes=1000000
replica.socket.timeout.ms=30000
replica.socket.receive.buffer.bytes=65536
replica.fetch.wait.max.ms=500
replica.high.watermark.checkpoint.interval.ms=5000
fetch.purgatory.purge.interval.requests=1000
producer.purgatory.purge.interval.requests=1000
delete.topic.enable=true

# JVM配置
export KAFKA_HEAP_OPTS="-Xmx6G -Xms6G"
export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
EOF
}

# 创建Kafka系统服务
create_kafka_service() {
    cat > /etc/systemd/system/kafka.service << EOF
[Unit]
Description=Apache Kafka server (broker)
Documentation=http://kafka.apache.org/documentation.html
Requires=network.target remote-fs.target
After=network.target remote-fs.target zookeeper.service

[Service]
Type=simple
User=kafka
Group=kafka
Environment=JAVA_HOME=/usr/lib/jvm/java-11-openjdk
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
TimeoutSec=30
Restart=on-failure

[Install]
WantedBy=multi-user.target
EOF

    systemctl daemon-reload
    systemctl enable kafka
    systemctl start kafka
}

# 执行部署
install_kafka
generate_kafka_config $1$2
create_kafka_service

 

2. 生产环境性能优化

2.1 生产者性能调优

 

#!/usr/bin/env python3
# Kafka生产者性能优化配置
from kafka import KafkaProducer
import json
import time
import threading
from concurrent.futures import ThreadPoolExecutor

classOptimizedKafkaProducer:
    def__init__(self, bootstrap_servers, topic):
        self.topic = topic
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            # 性能优化配置
            batch_size=16384,              # 批处理大小
            linger_ms=10,                  # 延迟发送时间
            buffer_memory=33554432,        # 缓冲区大小32MB
            compression_type='snappy',     # 压缩算法
            max_in_flight_requests_per_connection=5,
            retries=3,                     # 重试次数
            retry_backoff_ms=100,
            request_timeout_ms=30000,
            # 序列化配置
            value_serializer=lambda v: json.dumps(v).encode('utf-8'),
            key_serializer=lambda k: str(k).encode('utf-8')
        )
    
    defsend_message_sync(self, key, value):
        """同步发送消息"""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            record_metadata = future.get(timeout=10)
            return {
                'topic': record_metadata.topic,
                'partition': record_metadata.partition,
                'offset': record_metadata.offset
            }
        except Exception as e:
            print(f"发送消息失败: {e}")
            returnNone
    
    defsend_message_async(self, key, value, callback=None):
        """异步发送消息"""
        try:
            future = self.producer.send(self.topic, key=key, value=value)
            if callback:
                future.add_callback(callback)
            return future
        except Exception as e:
            print(f"发送消息失败: {e}")
            returnNone
    
    defbatch_send_performance_test(self, message_count=100000):
        """批量发送性能测试"""
        start_time = time.time()
        
        # 使用线程池并发发送
        with ThreadPoolExecutor(max_workers=10) as executor:
            futures = []
            for i inrange(message_count):
                message = {
                    'id': i,
                    'timestamp': time.time(),
                    'data': f'test_message_{i}',
                    'source': 'performance_test'
                }
                future = executor.submit(self.send_message_async, str(i), message)
                futures.append(future)
            
            # 等待所有消息发送完成
            for future in futures:
                try:
                    future.result(timeout=30)
                except Exception as e:
                    print(f"消息发送异常: {e}")
        
        # 确保所有消息都发送出去
        self.producer.flush()
        
        end_time = time.time()
        duration = end_time - start_time
        throughput = message_count / duration
        
        print(f"发送 {message_count} 条消息")
        print(f"总耗时: {duration:.2f} 秒")
        print(f"吞吐量: {throughput:.2f} 消息/秒")
        
    defclose(self):
        self.producer.close()

# 使用示例
if __name__ == "__main__":
    producer = OptimizedKafkaProducer(
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092'],
        topic='performance_test'
    )
    
    # 执行性能测试
    producer.batch_send_performance_test(50000)
    producer.close()

 

2.2 消费者性能优化

 

#!/usr/bin/env python3
# Kafka消费者性能优化配置
from kafka import KafkaConsumer
import json
import time
import threading
from concurrent.futures import ThreadPoolExecutor

classOptimizedKafkaConsumer:
    def__init__(self, topics, group_id, bootstrap_servers):
        self.topics = topics
        self.group_id = group_id
        self.consumer = KafkaConsumer(
            *topics,
            bootstrap_servers=bootstrap_servers,
            group_id=group_id,
            # 性能优化配置
            fetch_min_bytes=1024,          # 最小拉取字节数
            fetch_max_wait_ms=500,         # 最大等待时间
            max_poll_records=500,          # 单次拉取最大记录数
            max_poll_interval_ms=300000,   # 最大轮询间隔
            session_timeout_ms=30000,      # 会话超时时间
            heartbeat_interval_ms=10000,   # 心跳间隔
            # 消费策略
            auto_offset_reset='earliest',
            enable_auto_commit=False,      # 手动提交偏移量
            # 反序列化配置
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            key_deserializer=lambda m: m.decode('utf-8') if m elseNone
        )
        
    defconsume_messages_batch(self, batch_size=100, timeout=5000):
        """批量消费消息"""
        message_batch = []
        
        try:
            # 批量拉取消息
            message_pack = self.consumer.poll(timeout_ms=timeout)
            
            for topic_partition, messages in message_pack.items():
                for message in messages:
                    message_batch.append({
                        'topic': message.topic,
                        'partition': message.partition,
                        'offset': message.offset,
                        'key': message.key,
                        'value': message.value,
                        'timestamp': message.timestamp
                    })
                    
                    iflen(message_batch) >= batch_size:
                        # 处理批量消息
                        self.process_message_batch(message_batch)
                        message_batch = []
            
            # 处理剩余消息
            if message_batch:
                self.process_message_batch(message_batch)
                
            # 手动提交偏移量
            self.consumer.commit()
            
        except Exception as e:
            print(f"消费消息异常: {e}")
            
    defprocess_message_batch(self, messages):
        """批量处理消息"""
        with ThreadPoolExecutor(max_workers=5) as executor:
            futures = []
            for message in messages:
                future = executor.submit(self.process_single_message, message)
                futures.append(future)
            
            # 等待所有消息处理完成
            for future in futures:
                try:
                    future.result(timeout=30)
                except Exception as e:
                    print(f"处理消息异常: {e}")
                    
    defprocess_single_message(self, message):
        """处理单条消息"""
        try:
            # 模拟业务处理
            time.sleep(0.001)
            
            # 记录处理日志
            print(f"处理消息: Topic={message['topic']}, "
                  f"Partition={message['partition']}, "
                  f"Offset={message['offset']}")
                  
        except Exception as e:
            print(f"处理单条消息异常: {e}")
            
    defstart_consuming(self):
        """开始消费消息"""
        print(f"开始消费主题: {self.topics}")
        
        try:
            whileTrue:
                self.consume_messages_batch()
                
        except KeyboardInterrupt:
            print("停止消费")
        finally:
            self.consumer.close()

# 使用示例
if __name__ == "__main__":
    consumer = OptimizedKafkaConsumer(
        topics=['performance_test'],
        group_id='performance_consumer_group',
        bootstrap_servers=['192.168.1.11:9092', '192.168.1.12:9092']
    )
    
    consumer.start_consuming()

 

3. 监控与运维自动化

3.1 Kafka集群监控脚本

 

#!/bin/bash
# Kafka集群监控脚本
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
ALERT_EMAIL="admin@company.com"
LOG_FILE="/var/log/kafka_monitor.log"

# 检查Kafka集群状态
check_kafka_cluster() {
    echo"$(date): 检查Kafka集群状态" >> $LOG_FILE
    
    # 检查broker列表
    broker_list=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server ${KAFKA_BROKERS} 2>/dev/null | grep -c "id:")
    
    if [ "$broker_list" -lt 3 ]; then
        echo"ALERT: Kafka集群可用broker不足: $broker_list" | mail -s "Kafka Cluster Alert"$ALERT_EMAIL
        echo"$(date): ALERT - 可用broker不足: $broker_list" >> $LOG_FILE
    fi
}

# 检查主题状态
check_topic_health() {
    echo"$(date): 检查主题健康状态" >> $LOG_FILE
    
    # 获取主题列表
    topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --list)
    
    for topic in$topics; do
        # 检查主题描述
        topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic $topic)
        
        # 检查是否有离线分区
        offline_partitions=$(echo"$topic_desc" | grep -c "Leader: -1")
        
        if [ "$offline_partitions" -gt 0 ]; then
            echo"ALERT: 主题 $topic 有 $offline_partitions 个离线分区" | mail -s "Kafka Topic Alert"$ALERT_EMAIL
            echo"$(date): ALERT - 主题 $topic 离线分区: $offline_partitions" >> $LOG_FILE
        fi
    done
}

# 检查消费者组延迟
check_consumer_lag() {
    echo"$(date): 检查消费者组延迟" >> $LOG_FILE
    
    # 获取消费者组列表
    consumer_groups=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --list)
    
    for group in$consumer_groups; do
        # 获取消费者组详情
        group_desc=$(${KAFKA_HOME}/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKERS} --describe --group $group)
        
        # 检查延迟
        max_lag=$(echo"$group_desc" | awk 'NR>1 {print $5}' | grep -v "-" | sort -n | tail -1)
        
        if [ -n "$max_lag" ] && [ "$max_lag" -gt 10000 ]; then
            echo"ALERT: 消费者组 $group 最大延迟: $max_lag" | mail -s "Kafka Consumer Lag Alert"$ALERT_EMAIL
            echo"$(date): ALERT - 消费者组 $group 延迟过高: $max_lag" >> $LOG_FILE
        fi
    done
}

# 收集性能指标
collect_metrics() {
    echo"$(date): 收集Kafka性能指标" >> $LOG_FILE
    
    # 收集JVM指标
    for broker in 192.168.1.11 192.168.1.12 192.168.1.13; do
        kafka_pid=$(ssh $broker"pgrep -f kafka")
        if [ -n "$kafka_pid" ]; then
            # 内存使用率
            memory_usage=$(ssh $broker"ps -p $kafka_pid -o %mem --no-headers")
            echo"$(date): Broker $broker 内存使用率: $memory_usage%" >> $LOG_FILE
            
            # CPU使用率
            cpu_usage=$(ssh $broker"ps -p $kafka_pid -o %cpu --no-headers")
            echo"$(date): Broker $broker CPU使用率: $cpu_usage%" >> $LOG_FILE
        fi
    done
}

# 主监控循环
whiletrue; do
    check_kafka_cluster
    check_topic_health
    check_consumer_lag
    collect_metrics
    
    sleep 300  # 5分钟检查一次
done

 

3.2 自动化运维脚本

 

#!/usr/bin/env python3
# Kafka自动化运维脚本
import subprocess
import json
import smtplib
from email.mime.text import MIMEText
from datetime import datetime
import logging

classKafkaOperations:
    def__init__(self, kafka_home, brokers):
        self.kafka_home = kafka_home
        self.brokers = brokers
        self.logger = self.setup_logger()
        
    defsetup_logger(self):
        """设置日志记录"""
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('/var/log/kafka_operations.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)
    
    defcreate_topic(self, topic_name, partitions=3, replication_factor=2):
        """创建主题"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--create",
                "--topic", topic_name,
                "--partitions", str(partitions),
                "--replication-factor", str(replication_factor)
            ]
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0:
                self.logger.info(f"成功创建主题: {topic_name}")
                returnTrue
            else:
                self.logger.error(f"创建主题失败: {result.stderr}")
                returnFalse
                
        except Exception as e:
            self.logger.error(f"创建主题异常: {e}")
            returnFalse
    
    defdelete_topic(self, topic_name):
        """删除主题"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--delete",
                "--topic", topic_name
            ]
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0:
                self.logger.info(f"成功删除主题: {topic_name}")
                returnTrue
            else:
                self.logger.error(f"删除主题失败: {result.stderr}")
                returnFalse
                
        except Exception as e:
            self.logger.error(f"删除主题异常: {e}")
            returnFalse
    
    defincrease_partitions(self, topic_name, new_partition_count):
        """增加分区数"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--alter",
                "--topic", topic_name,
                "--partitions", str(new_partition_count)
            ]
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0:
                self.logger.info(f"成功增加主题 {topic_name} 分区数到 {new_partition_count}")
                returnTrue
            else:
                self.logger.error(f"增加分区失败: {result.stderr}")
                returnFalse
                
        except Exception as e:
            self.logger.error(f"增加分区异常: {e}")
            returnFalse
    
    defrebalance_partitions(self, topic_name):
        """重新平衡分区"""
        try:
            # 生成重平衡计划
            reassignment_file = f"/tmp/reassignment-{topic_name}.json"
            
            # 获取当前分区分配
            cmd_current = [
                f"{self.kafka_home}/bin/kafka-topics.sh",
                "--bootstrap-server", self.brokers,
                "--describe",
                "--topic", topic_name
            ]
            
            current_result = subprocess.run(cmd_current, capture_output=True, text=True)
            
            if current_result.returncode == 0:
                # 生成重平衡计划
                cmd_generate = [
                    f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
                    "--bootstrap-server", self.brokers,
                    "--topics-to-move-json-file", "/tmp/topics.json",
                    "--broker-list", "0,1,2,3",
                    "--generate"
                ]
                
                # 执行重平衡
                cmd_execute = [
                    f"{self.kafka_home}/bin/kafka-reassign-partitions.sh",
                    "--bootstrap-server", self.brokers,
                    "--reassignment-json-file", reassignment_file,
                    "--execute"
                ]
                
                self.logger.info(f"开始重平衡主题: {topic_name}")
                returnTrue
            else:
                self.logger.error(f"获取主题信息失败: {current_result.stderr}")
                returnFalse
                
        except Exception as e:
            self.logger.error(f"重平衡异常: {e}")
            returnFalse
    
    defbackup_consumer_offsets(self, group_id):
        """备份消费者偏移量"""
        try:
            cmd = [
                f"{self.kafka_home}/bin/kafka-consumer-groups.sh",
                "--bootstrap-server", self.brokers,
                "--describe",
                "--group", group_id
            ]
            
            result = subprocess.run(cmd, capture_output=True, text=True)
            
            if result.returncode == 0:
                backup_file = f"/backup/consumer_offsets_{group_id}_{datetime.now().strftime('%Y%m%d_%H%M%S')}.txt"
                withopen(backup_file, 'w') as f:
                    f.write(result.stdout)
                
                self.logger.info(f"成功备份消费者组 {group_id} 偏移量到 {backup_file}")
                returnTrue
            else:
                self.logger.error(f"备份偏移量失败: {result.stderr}")
                returnFalse
                
        except Exception as e:
            self.logger.error(f"备份偏移量异常: {e}")
            returnFalse

# 使用示例
if __name__ == "__main__":
    kafka_ops = KafkaOperations(
        kafka_home="/opt/kafka",
        brokers="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"
    )
    
    # 创建主题
    kafka_ops.create_topic("test_topic", partitions=6, replication_factor=3)
    
    # 增加分区
    kafka_ops.increase_partitions("test_topic", 12)
    
    # 备份消费者偏移量
    kafka_ops.backup_consumer_offsets("test_consumer_group")

 

4. 高可用与故障恢复

4.1 集群健康检查

 

#!/bin/bash
# Kafka集群健康检查与自动恢复
KAFKA_HOME="/opt/kafka"
KAFKA_BROKERS="192.168.1.11:9092,192.168.1.12:9092,192.168.1.13:9092"

# 检查并修复不同步副本
check_and_fix_isr() {
    echo"检查不同步副本..."
    
    # 获取所有主题
    topics=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --list)
    
    for topic in$topics; do
        # 检查主题详情
        topic_desc=$(${KAFKA_HOME}/bin/kafka-topics.sh --bootstrap-server ${KAFKA_BROKERS} --describe --topic $topic)
        
        # 检查ISR不足的分区
        isr_issues=$(echo"$topic_desc" | grep -E "Isr:|Replicas:" | awk '{
            if ($1 == "Replicas:") replicas = NF-1;
            if ($1 == "Isr:") isr = NF-1;
            if (isr < replicas) print "ISR不足"
        }')
        
        if [ -n "$isr_issues" ]; then
            echo"主题 $topic 存在ISR不足问题,尝试修复..."
            
            # 触发首选副本选举
            ${KAFKA_HOME}/bin/kafka-leader-election.sh --bootstrap-server ${KAFKA_BROKERS} --election-type preferred --topic $topic
        fi
    done
}

# 自动故障恢复
auto_recovery() {
    echo"执行自动故障恢复..."
    
    # 重启失败的broker
    for broker in 192.168.1.11 192.168.1.12 192.168.1.13; do
        if ! ssh $broker"systemctl is-active kafka" > /dev/null 2>&1; then
            echo"重启broker: $broker"
            ssh $broker"systemctl restart kafka"
            sleep 30
        fi
    done
    
    # 检查并修复ISR
    check_and_fix_isr
    
    # 验证集群状态
    validate_cluster_state
}

validate_cluster_state() {
    echo"验证集群状态..."
    
    # 检查所有broker是否在线
    online_brokers=$(${KAFKA_HOME}/bin/kafka-broker-api-versions.sh --bootstrap-server ${KAFKA_BROKERS} 2>/dev/null | grep -c "id:")
    
    if [ "$online_brokers" -eq 3 ]; then
        echo"集群恢复正常,所有broker在线"
    else
        echo"集群恢复失败,在线broker数量: $online_brokers"
        return 1
    fi
}

# 执行健康检查和恢复
auto_recovery

 

总结

Kafka生产环境部署涉及多个关键环节:集群架构设计、性能参数调优、监控体系建设、自动化运维等。通过本文介绍的方案,运维工程师可以构建稳定、高效的Kafka集群。关键要点包括:合理的集群规模规划、科学的配置参数调优、完善的监控告警机制、可靠的故障恢复策略。在实际生产环境中,还需要根据具体业务场景进行针对性优化,持续监控和改进系统性能,确保消息队列服务的稳定性和可靠性。

 

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分