Kafka

package main

import (
    "context"
    "fmt"
    "log"
    "math/rand"
    "os"
    "os/signal"
    "strconv"
    "sync"
    "syscall"
    "time"

    "github.com/IBM/sarama"
)

const (
    kafka_url         = "dbconn.sealosbja.site:34337"
    topic             = "topic_test1"
    numPartitions     = 4
    consumerGroupName = "my-test-group"
)

var (
    signals = make(chan os.Signal, 1)
)

func main() {
    signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
    ctx, cancel := context.WithCancel(context.Background())

    CreateTopic()
    wg := sync.WaitGroup{}
    wg.Add(1)
    go SyncProducer(ctx, &wg)
    wg.Add(1)
    //go SaramaConsumer(ctx, &wg)
    go SaramaConsumerGroup(ctx, &wg)

    <-signals
    cancel()

    wg.Wait()
    log.Println("exit main...")
}

func CreateTopic() error {
    config := sarama.NewConfig()

    admin, err := sarama.NewClusterAdmin([]string{kafka_url}, config)
    if err != nil {
        return err
    }
    defer admin.Close()

    topicDetail := &sarama.TopicDetail{
        NumPartitions: numPartitions,
    }

    admin.CreateTopic(topic, topicDetail, false)
    meta, _ := admin.DescribeTopics([]string{topic})
    log.Print(meta)
    return nil
}

type consumerGroupHandler struct {
}

func (consumerGroupHandler) Setup(sarama.ConsumerGroupSession) error {
    log.Printf("do something before every batch consume...")
    return nil
}

func (consumerGroupHandler) Cleanup(sarama.ConsumerGroupSession) error {
    log.Printf("do something after leave group...")
    return nil
}

// ConsumeClaim must start a consumer loop of ConsumerGroupClaim's Messages().
func (consumerGroupHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        // 在内存中标记消息的处理状态,通常在处理每条消息时调用
        session.MarkMessage(msg, "")
        log.Printf("Message claimed: key = %s, value = %s, timestamp = %v, topic = %s", msg.Key, msg.Value, msg.Timestamp, msg.Topic)
    }

    // 将内存中的标记信息(即消费位移)提交给 Kafka 集群进行持久化。一般在处理完一批消息或者在特定的时机(如消费组会话结束时)调用
    session.Commit()
    return nil
}

// 消费者组
func SaramaConsumerGroup(ctx context.Context, mainWg *sync.WaitGroup) {
    defer mainWg.Done()
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = false
    config.Consumer.Offsets.Initial = sarama.OffsetOldest // 未找到组消费记录时从,最早的消息开始消费
    config.Consumer.Offsets.AutoCommit.Enable = false     // 禁用自动提交

    group, err := sarama.NewConsumerGroup([]string{kafka_url}, consumerGroupName, config)
    if err != nil {
        panic(err)
    }
    defer group.Close()

    // Track errors
    go func() {
        for err := range group.Errors() {
            fmt.Println("ERROR", err)
        }
    }()
    fmt.Println("Consumed start")
    // Iterate over consumer sessions.
    topics := []string{topic}
    handler := consumerGroupHandler{}
    for {
        select {
        case <-ctx.Done():
            return
        default:
            // `Consume` should be called inside an infinite loop, when a
            // server-side rebalance happens, the consumer session will need to be
            // recreated to get the new claims
            err := group.Consume(ctx, topics, handler)
            if err != nil {
                panic(err)
            }
        }
    }
}

func SyncProducer(mainCtx context.Context, mainWg *sync.WaitGroup) {
    defer mainWg.Done()
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true                 // 成功交付的消息将在success channel返回. producer.Successes()
    config.Producer.Return.Errors = true                    // 失败交付的消息将在success channel返回. producer.Errors()
    config.Producer.Partitioner = sarama.NewHashPartitioner // 使用key hash选择partition
    producer, err := sarama.NewAsyncProducer([]string{kafka_url}, config)
    if err != nil {
        panic(err)
    }

    var (
        produceWg                                   sync.WaitGroup
        enqueued, producerSuccesses, producerErrors int
    )

    produceWg.Add(1)
    go func() {
        defer func() {
            produceWg.Done()
            log.Printf("exit success count...")
        }()
        for range producer.Successes() {
            producerSuccesses++
        }
    }()

    produceWg.Add(1)
    go func() {
        defer func() {
            produceWg.Done()
            log.Printf("exit error count...")
        }()
        for err := range producer.Errors() {
            log.Println(err)
            producerErrors++
        }
    }()

ProducerLoop:
    for {
        randomNum := rand.Intn(100000) + 1
        message := &sarama.ProducerMessage{Topic: topic, Key: sarama.StringEncoder(strconv.Itoa(randomNum)), Value: sarama.StringEncoder("This is a test message.")}
        select {
        case producer.Input() <- message:
            enqueued++
            time.Sleep(3 * time.Second)
        case <-mainCtx.Done():
            if err := producer.Close(); err != nil {
                log.Fatalln(err)
            } else {
                log.Printf("producer closed...")
            }
            log.Println("exit produece loop..")
            break ProducerLoop
        }
    }

    produceWg.Wait()

    log.Printf("All messages: %d; successfully produced: %d; errors: %d\n", enqueued, producerSuccesses, producerErrors)
}

func SaramaConsumer(mainCtx context.Context, mainWg *sync.WaitGroup) {
    defer mainWg.Done()
    consumer, err := sarama.NewConsumer([]string{kafka_url}, sarama.NewConfig())
    if err != nil {
        panic(err)
    }

    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalln(err)
        }
    }()

    partitions, err := consumer.Partitions(topic)
    if err != nil {
        log.Fatalf("Failed to get partitions: %v", err)
    }

    var consumerWg sync.WaitGroup
    for _, partition := range partitions {
        currentPartition := partition
        consumerWg.Add(1)
        go func(ctx context.Context, partition int32, wg *sync.WaitGroup) {
            defer consumerWg.Done()
            partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetNewest)
            if err != nil {
                panic(err)
            }

            consumed := 0
            defer func() {
                if err := partitionConsumer.Close(); err != nil {
                    log.Fatalln(err)
                } else {
                    log.Printf("consumer %d closed...", partition)
                }
            }()

            for {
                select {
                case msg := <-partitionConsumer.Messages():
                    log.Printf("Consumer %d consumes message, key: %s, value: %s, offset: %d\n", partition, msg.Key, msg.Value, msg.Offset)
                    consumed++
                case <-ctx.Done():
                    log.Printf("Consumed: %d\n", consumed)
                    return
                }
            }
        }(mainCtx, currentPartition, &consumerWg)
    }

    <-mainCtx.Done()
    consumerWg.Wait()
    log.Printf("exit consume...")
}