golang中使用kafka的综合指南

描述

 

介绍

kafka是一个比较流行的分布式、可拓展、高性能、可靠的流处理平台。在处理kafka的数据时,这里有确保处理效率和可靠性的多种最佳实践。本文将介绍这几种实践方式,并通过sarama实现他们。

以下是一些kafka消费的最佳实践:

选择合适的提交策略:Kafka提供两种提交策略,自动和手动。虽然自动操作很容易使用,但它可能会导致数据丢失或重复。手动提交提供了更高级别的控制,确保消息至少处理一次或恰好一次,具体取决于用例。

尽可能减少Kafka的传输次数:大批量读取消息可以显著提高吞吐量。这可以通过调整 fetch.min.bytes 和 fetch.max.wait.ms 等参数来实现。

尽可能使用消费者组:Kafka允许多个消费者组成一个消费者组来并行消费数据。这使得 Kafka 能够将数据分发给一个组中的所有消费者,从而实现高效的数据消费。

调整消费者缓冲区大小:通过调整消费者的缓冲区大小,如 receive.buffer.bytes 和 max.partition.fetch.bytes,可以根据消息的预期大小和消费者的内存容量进行调整。这可以提高消费者的表现。

处理rebalance:当新的消费者加入消费者组,或者现有的消费者离开时,Kafka会触发rebalance以重新分配负载。在此过程中,消费者停止消费数据。因此,快速有效地处理重新平衡可以提高整体吞吐量。

监控消费者:使用 Kafka 的消费者指标来监控消费者的性能。定期监控可以帮助我们识别性能瓶颈并调整消费者的配置。

选择合适的提交策略

1.自动提交

Sarama 的 ConsumerGroup 默认情况下会自动提交偏移量。这意味着它会定期提交已成功消费的消息的偏移量,这允许消费者在重新启动或消费失败时从中断的地方继续。

下面是一个自动提交的消费者组消费消息的例子:

 


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = true  
config.Consumer.Offsets.AutoCommit.Interval = 1 * time.Second  
  
ConsumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Panicf( "创建消费者组客户端时出错: %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume(ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "来自消费者的错误: %v" , err)  
    }  
}

 

根据config.Consumer.Offsets.AutoCommit.Interval 可以看到,消费者会每秒自动提交offset。

2. 手动提交

手动提交使我们更好地控制何时提交消息偏移量。下面是一个手动提交的消费者组消费消息的例子:

 


config := sarama.NewConfig()  
config.Version = sarama.V2_0_0_0 
config.Consumer.Offsets.AutoCommit.Enable = false 
  
consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID , config)  
if err != nil {  
    log.Panicf( "创建消费者组客户端时出错: %v" , err)  
}  
  
Consumer := Consumer{}  
ctx := context.Background()  
  
for {  
    err := ConsumerGroup.Consume( ctx, [] string {topic}, Consumer)  
    if err != nil {  
        log.Panicf( "Error from Consumer: %v" , err)  
    }  
}  
  


type Consumer struct {}  
  
func (consumer Consumer) Setup (_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) Cleanup(_ sarama.ConsumerGroupSession) error { return nil }  
func (consumer Consumer) ConsumeClaim(sess sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error {  
    for msg : = range Claim.Messages() {  
        
        fmt.Printf( "Message topic:%q partition:%d offset:%d
" , msg.Topic, msg.Partition, msg.Offset)  


        
        sess.MarkMessage(msg, "" )  
    }  
    return nil  
}

 

在该示例中, 使用MarkMessage手动将消息标记为已处理,最终根据Consumer.Offsets.CommitInterval 配置提交。另外这个例子省略了错误处理部分,开发时需要注意正确处理生产过程中出现的错误。

译者注:这篇文章虽然是今年5月发布,但是这里的提交方式还是有些过时了,目前sarama已经废弃了Consumer.Offsets.CommitInterval,相关配置目前在 Consumer.Offsets.AutoCommit

尽可能减少Kafka的传输次数

减少kafka的传输次数可以通过优化从kafka中读取和写入数据的方式来实现:

1. 增加批次的大小

使用kafka批量发送消息的效果优于逐个发送消息,批次越大,kafka发送数据效率就越高。但是需要权衡延迟和吞吐量之间的关系。较大的批次虽然代表着更大的吞吐量,但也会增加延迟。因为批次越大,填充批次的时间也越久。

在Go中,我们可以在使用sarama包生成消息时设置批次大小:

 


config := sarama.NewConfig()  
config.Producer.Flush.Bytes = 1024 * 1024

 

以及获取消息的批次大小

 


config := sarama.NewConfig()  
config.Consumer.Fetch.Default = 1024 * 1024

 

2. 使用长轮询

长轮询是指消费者轮询时如果Kafka中没有数据,则消费者将等待数据到达。这减少了往返次数,因为消费者不需要在没有数据时不断请求数据。

 


config := sarama.NewConfig() 
config .Consumer.MaxWaitTime = 500 *time.Millisecond

 

该配置告诉消费者在返回之前会等待500毫秒

3. 尽可能使用消费者组

消费者组是一组协同工作消费来自kafka主题的消息的消费者。消费者组允许我们在多个消费者之间分配消息,从而提供横向拓展能力。使用消费者组时,kafka负责将分区分配给组中的消费者,并确保每个分区同时仅被一个消费者消费。

接下来是sarama中消费者组的使用:

使用消费者组需要实现一个ConsumerGroupHandler接口:

该接口具有三个方法: Setup、Cleanup、 和ConsumeClaim

 


type exampleConsumerGroupHandler struct { 
} 


func  (h *exampleConsumerGroupHandler) Setup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) Cleanup(_ sarama.ConsumerGroupSession) error { 
    
    return  nil
 } 


func  (h *exampleConsumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, Claim sarama.ConsumerGroupClaim) error { 
    for message := range Claim.Messages() { 
        
        fmt.Printf( "Message: %s
" , string (message.Value)) 


        
        session.MarkMessage(message, "" ) 
    }
    返回 nil
 }

 

创建 sarama.ConsumerGroup并开始消费:

 


brokers := []string{"localhost:9092"} 
topic := "example_topic"  
groupID := "example_consumer_group"  
  


consumerGroup, err := sarama.NewConsumerGroup(brokers, groupID, config)  
if err != nil {  
    log.Fatalf("Error creating consumer group: %v", err)  
}  
defer consumerGroup.Close()  
  


handler := &exampleConsumerGroupHandler{}  
  


for {  
    err := consumerGroup.Consume(context.Background(), []string{topic}, handler)  
    if err != nil {  
        log.Printf("Error consuming messages: %v", err)  
    }  
}

 

该示例设置了一个消费组,用于消费来自“example_topic”的消息。消费者组可以通过添加更多消费者来提高处理能力。

使用消费者组时,记得处理消费期间rebalance和错误。

调整消费者缓冲区大小

在sarama中,我们可以调整消费者缓冲区的大小,以调整消费者在处理消息之前可以在内存中保存的消息数量。

默认情况下,缓冲区大小设置为256,这代表Sarama在开始处理消息之前将在内存中保存最多256条消息。如果消费者速度很慢,增加缓冲区大小可能有助于提高吞吐量。但是,更大的缓冲区也会消耗更多的内存。

以下是如何增加缓冲区大小的例子:

 


config := sarama.NewConfig()  
config.Consumer.Return.Errors = true  
config.Version = sarama.V2_1_0_0  
config.Consumer.Offsets.Initial = sarama.OffsetOldest  


config.ChannelBufferSize = 500  
  


group, err := sarama.NewConsumerGroup([]string{broker}, groupID, config)  
if err != nil {  
    panic(err)  
}  
  


ctx := context.Background()  
for {  
    topics := []string{topic}  
    handler := exampleConsumerGroupHandler{}  


    err := group.Consume(ctx, topics, &handler)  
    if err != nil {  
        panic(err)  
    }  
}

 

处理rebalance

当新消费者添加到消费者组或现有消费者离开消费者组时,kafka会重新平衡该组中的消费者。rebalance是kafka确保消费者组中的所有消费者不会消费同一分区的保证。

在sarama中,处理rebalance是通过 Setup 和CleanUp函数来完成的。

通过正确处理重新平衡事件,您可以确保应用程序正常处理消费者组的更改,例如消费者离开或加入,并且在这些事件期间不会丢失或处理两次消息。

译者注:其实更重要的是在ConsumeClaim函数在通道关闭时尽早退出,才能正确的进入CleanUp函数。

监控消费者

监控Kafka消费者对于确保系统的健康和性能至关重要,我们需要时刻关注延迟、处理时间和错误率的指标。

Golang没有内置对 Kafka 监控的支持,但有几个库和工具可以帮助我们。让我们看一下其中的一些:

Sarama的Metrics:Sarama 提供了一个指标注册表,它报告了有助于监控的各种指标,例如请求、响应的数量、请求和响应的大小等。这些指标可以使用 Prometheus 等监控系统来收集和监控。

JMX Exporter:如果您在 JVM 上运行 Kafka, 则可以使用 JMX Exporter 将kafka的 MBeans 发送给Prometheus

Kafka Exporter:Kafka Exporter是一个第三方工具,可以提供有关Kafka的更详细的指标。它可以提供消费者组延迟,这是消费kafka消息时要监控的关键指标。

Jaeger 或 OpenTelemetry:这些工具可用于分布式追踪,这有助于追踪消息如何流经系统以及可能出现瓶颈的位置。

日志:时刻关注应用程序日志,记录消费者中的任何错误或异常行为。这些日志可以帮助我们诊断问题。

消费者组命令, 可以使用kafka-consumer-groups 命令来描述消费者组的状态。

请记住,不仅要追踪这些指标,还要针对任何需要关注的场景设置警报。通过这些方法,我们可以在问题还在初始阶段时快速做出响应。

以上工作有助于确保使用kafka的应用程序健壮、可靠且高效。

  审核编辑:汤梓红

打开APP阅读更多精彩内容
声明:本文内容及配图由入驻作者撰写或者入驻合作网站授权转载。文章观点仅代表作者本人,不代表电子发烧友网立场。文章及其配图仅供工程师学习之用,如有内容侵权或者其他违规问题,请联系本站处理。 举报投诉

全部0条评论

快来发表一下你的评论吧 !

×
20
完善资料,
赚取积分