代码示例

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()

    // 创建Redis客户端
    rdb := redis.NewClient(&redis.Options{
        Addr:         "localhost:6379",
        DialTimeout:  time.Second * 3,   // 连接超时时间
        ReadTimeout:  time.Second * 3,   // 读超时时间
        WriteTimeout: time.Second * 3,   // 写超时时间
        PoolSize:     3,                 // 最大连接池大小
        IdleTimeout:  time.Second * 3,   // 空闲连接的最大存活时间
        MaxConnAge:   time.Second * 300, // 连接最大存活时间,无论连接是否空闲
        PoolTimeout:  time.Second * 30,  // 从连接池中获取连接池超时时间
    })

    // 自动从连接池中获取一个连接,并检查连接
    if _, err := rdb.Ping(ctx).Result(); err != nil {
        log.Fatalf("无法连接到Redis: %v", err)
    }

    // 流键和消费者组名称
    streamKey := "mystream"
    groupName := "mygroup"

    // 创建消费者组(如果不存在)
    _, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, "<pre><code class="language-go">package main

import (
    &quot;context&quot;
    &quot;fmt&quot;
    &quot;log&quot;
    &quot;time&quot;

    &quot;github.com/go-redis/redis/v8&quot;
)

func main() {
    ctx := context.Background()

    // 创建Redis客户端
    rdb := redis.NewClient(&amp;redis.Options{
        Addr:         &quot;localhost:6379&quot;,
        DialTimeout:  time.Second * 3,   // 连接超时时间
        ReadTimeout:  time.Second * 3,   // 读超时时间
        WriteTimeout: time.Second * 3,   // 写超时时间
        PoolSize:     3,                 // 最大连接池大小
        IdleTimeout:  time.Second * 3,   // 空闲连接的最大存活时间
        MaxConnAge:   time.Second * 300, // 连接最大存活时间,无论连接是否空闲
        PoolTimeout:  time.Second * 30,  // 从连接池中获取连接池超时时间
    })

    // 自动从连接池中获取一个连接,并检查连接
    if _, err := rdb.Ping(ctx).Result(); err != nil {
        log.Fatalf(&quot;无法连接到Redis: %v&quot;, err)
    }

    // 流键和消费者组名称
    streamKey := &quot;mystream&quot;
    groupName := &quot;mygroup&quot;

    // 创建消费者组(如果不存在)
    _, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, &quot;<article>
    <header>
        <h1>代码示例</h1>
        <time datetime="2025-09-17T13:31:12.000Z">2025年9月17日</time>
        <div class="tags">
            
        </div>
    </header>
    <section class="content">
        <pre><code class="language-go">package main

import (
    &quot;context&quot;
    &quot;fmt&quot;
    &quot;log&quot;
    &quot;time&quot;

    &quot;github.com/go-redis/redis/v8&quot;
)

func main() {
    ctx := context.Background()

    // 创建Redis客户端
    rdb := redis.NewClient(&amp;redis.Options{
        Addr:         &quot;localhost:6379&quot;,
        DialTimeout:  time.Second * 3,   // 连接超时时间
        ReadTimeout:  time.Second * 3,   // 读超时时间
        WriteTimeout: time.Second * 3,   // 写超时时间
        PoolSize:     3,                 // 最大连接池大小
        IdleTimeout:  time.Second * 3,   // 空闲连接的最大存活时间
        MaxConnAge:   time.Second * 300, // 连接最大存活时间,无论连接是否空闲
        PoolTimeout:  time.Second * 30,  // 从连接池中获取连接池超时时间
    })

    // 自动从连接池中获取一个连接,并检查连接
    if _, err := rdb.Ping(ctx).Result(); err != nil {
        log.Fatalf(&quot;无法连接到Redis: %v&quot;, err)
    }

    // 流键和消费者组名称
    streamKey := &quot;mystream&quot;
    groupName := &quot;mygroup&quot;

    // 创建消费者组(如果不存在)
    _, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, &quot;&lt;pre&gt;&lt;code class=&quot;language-go&quot;&gt;package main

import (
    &amp;quot;context&amp;quot;
    &amp;quot;fmt&amp;quot;
    &amp;quot;log&amp;quot;
    &amp;quot;time&amp;quot;

    &amp;quot;github.com/go-redis/redis/v8&amp;quot;
)

func main() {
    ctx := context.Background()

    // 创建Redis客户端
    rdb := redis.NewClient(&amp;amp;redis.Options{
        Addr:         &amp;quot;localhost:6379&amp;quot;,
        DialTimeout:  time.Second * 3,   // 连接超时时间
        ReadTimeout:  time.Second * 3,   // 读超时时间
        WriteTimeout: time.Second * 3,   // 写超时时间
        PoolSize:     3,                 // 最大连接池大小
        IdleTimeout:  time.Second * 3,   // 空闲连接的最大存活时间
        MaxConnAge:   time.Second * 300, // 连接最大存活时间,无论连接是否空闲
        PoolTimeout:  time.Second * 30,  // 从连接池中获取连接池超时时间
    })

    // 自动从连接池中获取一个连接,并检查连接
    if _, err := rdb.Ping(ctx).Result(); err != nil {
        log.Fatalf(&amp;quot;无法连接到Redis: %v&amp;quot;, err)
    }

    // 流键和消费者组名称
    streamKey := &amp;quot;mystream&amp;quot;
    groupName := &amp;quot;mygroup&amp;quot;

    // 创建消费者组(如果不存在)
    _, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, &amp;quot;$&amp;quot;).Result()
    if err != nil &amp;amp;&amp;amp; err.Error() != &amp;quot;BUSYGROUP Consumer Group name already exists&amp;quot; {
        log.Fatalf(&amp;quot;创建消费者组失败: %v&amp;quot;, err)
    }

    // 启动多个消费者
    go consumer(ctx, rdb, streamKey, groupName, &amp;quot;consumer-1&amp;quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &amp;quot;consumer-2&amp;quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &amp;quot;consumer-3&amp;quot;, 2)

    // 生产消息
    go producer(ctx, rdb, streamKey)

    // 保持主程序运行
    select {}
}

// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
    for {
        // 生成消息
        message := map[string]interface{}{
            &amp;quot;timestamp&amp;quot;: time.Now().String(),
            &amp;quot;message&amp;quot;:   fmt.Sprintf(&amp;quot;消息 %d&amp;quot;, time.Now().UnixNano()),
        }

        // 发送消息到流
        id, err := rdb.XAdd(ctx, &amp;amp;redis.XAddArgs{
            Stream: streamKey,
            Values: message,
        }).Result()

        if err != nil {
            log.Printf(&amp;quot;发送消息失败: %v&amp;quot;, err)
        } else {
            log.Printf(&amp;quot;发送消息成功,ID: %s&amp;quot;, id)
        }

        time.Sleep(1 * time.Second)
    }
}

// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
    for i := 0; i &amp;lt; maxMsg; i++ {
        // 从流中读取消息
        streams, err := rdb.XReadGroup(ctx, &amp;amp;redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamKey, &amp;quot;&amp;gt;&amp;quot;}, // &amp;quot;&amp;gt;&amp;quot; 表示只获取从未分配给其他消费者的新消息
            Count:    10,                       // 每次最多获取10条消息
            Block:    0,                        // 不阻塞,立即返回
        }).Result()

        if err != nil {
            log.Printf(&amp;quot;消费者 %s 读取消息失败: %v&amp;quot;, consumerName, err)
            time.Sleep(1 * time.Second)
            continue
        }

        // 处理消息
        for _, stream := range streams {
            for _, message := range stream.Messages {
                log.Printf(&amp;quot;消费者 %s 收到消息,ID: %s,内容: %v&amp;quot;, consumerName, message.ID, message.Values)

                // 模拟处理时间
                time.Sleep(200 * time.Millisecond)

                // 确认消息处理完成
                if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
                    log.Printf(&amp;quot;消费者 %s 确认消息失败: %v&amp;quot;, consumerName, err)
                } else {
                    log.Printf(&amp;quot;消费者 %s 确认消息成功,ID: %s&amp;quot;, consumerName, message.ID)
                }
            }
        }

        // 短暂休眠,避免CPU占用过高
        time.Sleep(100 * time.Millisecond)
    }
}
&lt;/code&gt;&lt;/pre&gt;
quot;).Result()
    if err != nil &amp;&amp; err.Error() != &quot;BUSYGROUP Consumer Group name already exists&quot; {
        log.Fatalf(&quot;创建消费者组失败: %v&quot;, err)
    }

    // 启动多个消费者
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-1&quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-2&quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-3&quot;, 2)

    // 生产消息
    go producer(ctx, rdb, streamKey)

    // 保持主程序运行
    select {}
}

// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
    for {
        // 生成消息
        message := map[string]interface{}{
            &quot;timestamp&quot;: time.Now().String(),
            &quot;message&quot;:   fmt.Sprintf(&quot;消息 %d&quot;, time.Now().UnixNano()),
        }

        // 发送消息到流
        id, err := rdb.XAdd(ctx, &amp;redis.XAddArgs{
            Stream: streamKey,
            Values: message,
        }).Result()

        if err != nil {
            log.Printf(&quot;发送消息失败: %v&quot;, err)
        } else {
            log.Printf(&quot;发送消息成功,ID: %s&quot;, id)
        }

        time.Sleep(1 * time.Second)
    }
}

// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
    for i := 0; i &lt; maxMsg; i++ {
        // 从流中读取消息
        streams, err := rdb.XReadGroup(ctx, &amp;redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamKey, &quot;&gt;&quot;}, // &quot;&gt;&quot; 表示只获取从未分配给其他消费者的新消息
            Count:    10,                       // 每次最多获取10条消息
            Block:    0,                        // 不阻塞,立即返回
        }).Result()

        if err != nil {
            log.Printf(&quot;消费者 %s 读取消息失败: %v&quot;, consumerName, err)
            time.Sleep(1 * time.Second)
            continue
        }

        // 处理消息
        for _, stream := range streams {
            for _, message := range stream.Messages {
                log.Printf(&quot;消费者 %s 收到消息,ID: %s,内容: %v&quot;, consumerName, message.ID, message.Values)

                // 模拟处理时间
                time.Sleep(200 * time.Millisecond)

                // 确认消息处理完成
                if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
                    log.Printf(&quot;消费者 %s 确认消息失败: %v&quot;, consumerName, err)
                } else {
                    log.Printf(&quot;消费者 %s 确认消息成功,ID: %s&quot;, consumerName, message.ID)
                }
            }
        }

        // 短暂休眠,避免CPU占用过高
        time.Sleep(100 * time.Millisecond)
    }
}
</code></pre>

    </section>
</article>amp;quot;).Result()
    if err != nil &amp;&amp; err.Error() != &quot;BUSYGROUP Consumer Group name already exists&quot; {
        log.Fatalf(&quot;创建消费者组失败: %v&quot;, err)
    }

    // 启动多个消费者
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-1&quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-2&quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-3&quot;, 2)

    // 生产消息
    go producer(ctx, rdb, streamKey)

    // 保持主程序运行
    select {}
}

// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
    for {
        // 生成消息
        message := map[string]interface{}{
            &quot;timestamp&quot;: time.Now().String(),
            &quot;message&quot;:   fmt.Sprintf(&quot;消息 %d&quot;, time.Now().UnixNano()),
        }

        // 发送消息到流
        id, err := rdb.XAdd(ctx, &amp;redis.XAddArgs{
            Stream: streamKey,
            Values: message,
        }).Result()

        if err != nil {
            log.Printf(&quot;发送消息失败: %v&quot;, err)
        } else {
            log.Printf(&quot;发送消息成功,ID: %s&quot;, id)
        }

        time.Sleep(1 * time.Second)
    }
}

// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
    for i := 0; i &lt; maxMsg; i++ {
        // 从流中读取消息
        streams, err := rdb.XReadGroup(ctx, &amp;redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamKey, &quot;&gt;&quot;}, // &quot;&gt;&quot; 表示只获取从未分配给其他消费者的新消息
            Count:    10,                       // 每次最多获取10条消息
            Block:    0,                        // 不阻塞,立即返回
        }).Result()

        if err != nil {
            log.Printf(&quot;消费者 %s 读取消息失败: %v&quot;, consumerName, err)
            time.Sleep(1 * time.Second)
            continue
        }

        // 处理消息
        for _, stream := range streams {
            for _, message := range stream.Messages {
                log.Printf(&quot;消费者 %s 收到消息,ID: %s,内容: %v&quot;, consumerName, message.ID, message.Values)

                // 模拟处理时间
                time.Sleep(200 * time.Millisecond)

                // 确认消息处理完成
                if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
                    log.Printf(&quot;消费者 %s 确认消息失败: %v&quot;, consumerName, err)
                } else {
                    log.Printf(&quot;消费者 %s 确认消息成功,ID: %s&quot;, consumerName, message.ID)
                }
            }
        }

        // 短暂休眠,避免CPU占用过高
        time.Sleep(100 * time.Millisecond)
    }
}
</code></pre>
quot;).Result()
    if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
        log.Fatalf("创建消费者组失败: %v", err)
    }

    // 启动多个消费者
    go consumer(ctx, rdb, streamKey, groupName, "consumer-1", 10)
    go consumer(ctx, rdb, streamKey, groupName, "consumer-2", 10)
    go consumer(ctx, rdb, streamKey, groupName, "consumer-3", 2)

    // 生产消息
    go producer(ctx, rdb, streamKey)

    // 保持主程序运行
    select {}
}

// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
    for {
        // 生成消息
        message := map[string]interface{}{
            "timestamp": time.Now().String(),
            "message":   fmt.Sprintf("消息 %d", time.Now().UnixNano()),
        }

        // 发送消息到流
        id, err := rdb.XAdd(ctx, &redis.XAddArgs{
            Stream: streamKey,
            Values: message,
        }).Result()

        if err != nil {
            log.Printf("发送消息失败: %v", err)
        } else {
            log.Printf("发送消息成功,ID: %s", id)
        }

        time.Sleep(1 * time.Second)
    }
}

// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
    for i := 0; i < maxMsg; i++ {
        // 从流中读取消息
        streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamKey, ">"}, // ">" 表示只获取从未分配给其他消费者的新消息
            Count:    10,                       // 每次最多获取10条消息
            Block:    0,                        // 不阻塞,立即返回
        }).Result()

        if err != nil {
            log.Printf("消费者 %s 读取消息失败: %v", consumerName, err)
            time.Sleep(1 * time.Second)
            continue
        }

        // 处理消息
        for _, stream := range streams {
            for _, message := range stream.Messages {
                log.Printf("消费者 %s 收到消息,ID: %s,内容: %v", consumerName, message.ID, message.Values)

                // 模拟处理时间
                time.Sleep(200 * time.Millisecond)

                // 确认消息处理完成
                if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
                    log.Printf("消费者 %s 确认消息失败: %v", consumerName, err)
                } else {
                    log.Printf("消费者 %s 确认消息成功,ID: %s", consumerName, message.ID)
                }
            }
        }

        // 短暂休眠,避免CPU占用过高
        time.Sleep(100 * time.Millisecond)
    }
}