代码示例

package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/go-redis/redis/v8"
)

func main() {
    ctx := context.Background()

    // 创建Redis客户端
    rdb := redis.NewClient(&redis.Options{
        Addr:         "localhost:6379",
        DialTimeout:  time.Second * 3,   // 连接超时时间
        ReadTimeout:  time.Second * 3,   // 读超时时间
        WriteTimeout: time.Second * 3,   // 写超时时间
        PoolSize:     3,                 // 最大连接池大小
        IdleTimeout:  time.Second * 3,   // 空闲连接的最大存活时间
        MaxConnAge:   time.Second * 300, // 连接最大存活时间,无论连接是否空闲
        PoolTimeout:  time.Second * 30,  // 从连接池中获取连接池超时时间
    })

    // 自动从连接池中获取一个连接,并检查连接
    if _, err := rdb.Ping(ctx).Result(); err != nil {
        log.Fatalf("无法连接到Redis: %v", err)
    }

    // 流键和消费者组名称
    streamKey := "mystream"
    groupName := "mygroup"

    // 创建消费者组(如果不存在)
    _, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, "<pre><code class="language-go">package main

import (
    &quot;context&quot;
    &quot;fmt&quot;
    &quot;log&quot;
    &quot;time&quot;

    &quot;github.com/go-redis/redis/v8&quot;
)

func main() {
    ctx := context.Background()

    // 创建Redis客户端
    rdb := redis.NewClient(&amp;redis.Options{
        Addr:         &quot;localhost:6379&quot;,
        DialTimeout:  time.Second * 3,   // 连接超时时间
        ReadTimeout:  time.Second * 3,   // 读超时时间
        WriteTimeout: time.Second * 3,   // 写超时时间
        PoolSize:     3,                 // 最大连接池大小
        IdleTimeout:  time.Second * 3,   // 空闲连接的最大存活时间
        MaxConnAge:   time.Second * 300, // 连接最大存活时间,无论连接是否空闲
        PoolTimeout:  time.Second * 30,  // 从连接池中获取连接池超时时间
    })

    // 自动从连接池中获取一个连接,并检查连接
    if _, err := rdb.Ping(ctx).Result(); err != nil {
        log.Fatalf(&quot;无法连接到Redis: %v&quot;, err)
    }

    // 流键和消费者组名称
    streamKey := &quot;mystream&quot;
    groupName := &quot;mygroup&quot;

    // 创建消费者组(如果不存在)
    _, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, &quot;<div class="article-page-layout">
    <div class="article-main-column">
        <article>
            <header>
                <div class="article-header-wrapper">
                    <h1>代码示例</h1>
                    <div class="article-header-actions">
                        <button type="button"
                                class="article-presenter-btn"
                                data-presenter-toggle
                                title="Presenter mode"
                                aria-label="Open presenter mode">
                            <svg width="18" height="18" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
                                <rect x="2" y="4" width="20" height="14" rx="2"></rect>
                                <path d="M8 20h8"></path>
                                <path d="M12 18v2"></path>
                            </svg>
                            <span>Presenter</span>
                        </button>
                        <a href="assets/markdown/代码示例.md"
                           download="代码示例.md"
                           class="markdown-export-btn"
                           title="Download Markdown source"
                           aria-label="Export Markdown">
                            <svg width="20" height="20" viewBox="0 0 24 24" fill="none" stroke="currentColor" stroke-width="2" stroke-linecap="round" stroke-linejoin="round">
                                <path d="M21 15v4a2 2 0 0 1-2 2H5a2 2 0 0 1-2-2v-4"></path>
                                <polyline points="7 10 12 15 17 10"></polyline>
                                <line x1="12" y1="15" x2="12" y2="3"></line>
                            </svg>
                        </a>
                    </div>
                </div>
                <time datetime="2025-09-17T13:31:12.000Z">2025年9月17日</time>
                <div class="tags">
                    
                </div>
            </header>
            <section class="content">
                <pre><code class="language-go">package main

import (
    &quot;context&quot;
    &quot;fmt&quot;
    &quot;log&quot;
    &quot;time&quot;

    &quot;github.com/go-redis/redis/v8&quot;
)

func main() {
    ctx := context.Background()

    // 创建Redis客户端
    rdb := redis.NewClient(&amp;redis.Options{
        Addr:         &quot;localhost:6379&quot;,
        DialTimeout:  time.Second * 3,   // 连接超时时间
        ReadTimeout:  time.Second * 3,   // 读超时时间
        WriteTimeout: time.Second * 3,   // 写超时时间
        PoolSize:     3,                 // 最大连接池大小
        IdleTimeout:  time.Second * 3,   // 空闲连接的最大存活时间
        MaxConnAge:   time.Second * 300, // 连接最大存活时间,无论连接是否空闲
        PoolTimeout:  time.Second * 30,  // 从连接池中获取连接池超时时间
    })

    // 自动从连接池中获取一个连接,并检查连接
    if _, err := rdb.Ping(ctx).Result(); err != nil {
        log.Fatalf(&quot;无法连接到Redis: %v&quot;, err)
    }

    // 流键和消费者组名称
    streamKey := &quot;mystream&quot;
    groupName := &quot;mygroup&quot;

    // 创建消费者组(如果不存在)
    _, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, &quot;&lt;pre&gt;&lt;code class=&quot;language-go&quot;&gt;package main

import (
    &amp;quot;context&amp;quot;
    &amp;quot;fmt&amp;quot;
    &amp;quot;log&amp;quot;
    &amp;quot;time&amp;quot;

    &amp;quot;github.com/go-redis/redis/v8&amp;quot;
)

func main() {
    ctx := context.Background()

    // 创建Redis客户端
    rdb := redis.NewClient(&amp;amp;redis.Options{
        Addr:         &amp;quot;localhost:6379&amp;quot;,
        DialTimeout:  time.Second * 3,   // 连接超时时间
        ReadTimeout:  time.Second * 3,   // 读超时时间
        WriteTimeout: time.Second * 3,   // 写超时时间
        PoolSize:     3,                 // 最大连接池大小
        IdleTimeout:  time.Second * 3,   // 空闲连接的最大存活时间
        MaxConnAge:   time.Second * 300, // 连接最大存活时间,无论连接是否空闲
        PoolTimeout:  time.Second * 30,  // 从连接池中获取连接池超时时间
    })

    // 自动从连接池中获取一个连接,并检查连接
    if _, err := rdb.Ping(ctx).Result(); err != nil {
        log.Fatalf(&amp;quot;无法连接到Redis: %v&amp;quot;, err)
    }

    // 流键和消费者组名称
    streamKey := &amp;quot;mystream&amp;quot;
    groupName := &amp;quot;mygroup&amp;quot;

    // 创建消费者组(如果不存在)
    _, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, &amp;quot;$&amp;quot;).Result()
    if err != nil &amp;amp;&amp;amp; err.Error() != &amp;quot;BUSYGROUP Consumer Group name already exists&amp;quot; {
        log.Fatalf(&amp;quot;创建消费者组失败: %v&amp;quot;, err)
    }

    // 启动多个消费者
    go consumer(ctx, rdb, streamKey, groupName, &amp;quot;consumer-1&amp;quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &amp;quot;consumer-2&amp;quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &amp;quot;consumer-3&amp;quot;, 2)

    // 生产消息
    go producer(ctx, rdb, streamKey)

    // 保持主程序运行
    select {}
}

// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
    for {
        // 生成消息
        message := map[string]interface{}{
            &amp;quot;timestamp&amp;quot;: time.Now().String(),
            &amp;quot;message&amp;quot;:   fmt.Sprintf(&amp;quot;消息 %d&amp;quot;, time.Now().UnixNano()),
        }

        // 发送消息到流
        id, err := rdb.XAdd(ctx, &amp;amp;redis.XAddArgs{
            Stream: streamKey,
            Values: message,
        }).Result()

        if err != nil {
            log.Printf(&amp;quot;发送消息失败: %v&amp;quot;, err)
        } else {
            log.Printf(&amp;quot;发送消息成功,ID: %s&amp;quot;, id)
        }

        time.Sleep(1 * time.Second)
    }
}

// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
    for i := 0; i &amp;lt; maxMsg; i++ {
        // 从流中读取消息
        streams, err := rdb.XReadGroup(ctx, &amp;amp;redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamKey, &amp;quot;&amp;gt;&amp;quot;}, // &amp;quot;&amp;gt;&amp;quot; 表示只获取从未分配给其他消费者的新消息
            Count:    10,                       // 每次最多获取10条消息
            Block:    0,                        // 不阻塞,立即返回
        }).Result()

        if err != nil {
            log.Printf(&amp;quot;消费者 %s 读取消息失败: %v&amp;quot;, consumerName, err)
            time.Sleep(1 * time.Second)
            continue
        }

        // 处理消息
        for _, stream := range streams {
            for _, message := range stream.Messages {
                log.Printf(&amp;quot;消费者 %s 收到消息,ID: %s,内容: %v&amp;quot;, consumerName, message.ID, message.Values)

                // 模拟处理时间
                time.Sleep(200 * time.Millisecond)

                // 确认消息处理完成
                if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
                    log.Printf(&amp;quot;消费者 %s 确认消息失败: %v&amp;quot;, consumerName, err)
                } else {
                    log.Printf(&amp;quot;消费者 %s 确认消息成功,ID: %s&amp;quot;, consumerName, message.ID)
                }
            }
        }

        // 短暂休眠,避免CPU占用过高
        time.Sleep(100 * time.Millisecond)
    }
}
&lt;/code&gt;&lt;/pre&gt;
quot;).Result()
    if err != nil &amp;&amp; err.Error() != &quot;BUSYGROUP Consumer Group name already exists&quot; {
        log.Fatalf(&quot;创建消费者组失败: %v&quot;, err)
    }

    // 启动多个消费者
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-1&quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-2&quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-3&quot;, 2)

    // 生产消息
    go producer(ctx, rdb, streamKey)

    // 保持主程序运行
    select {}
}

// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
    for {
        // 生成消息
        message := map[string]interface{}{
            &quot;timestamp&quot;: time.Now().String(),
            &quot;message&quot;:   fmt.Sprintf(&quot;消息 %d&quot;, time.Now().UnixNano()),
        }

        // 发送消息到流
        id, err := rdb.XAdd(ctx, &amp;redis.XAddArgs{
            Stream: streamKey,
            Values: message,
        }).Result()

        if err != nil {
            log.Printf(&quot;发送消息失败: %v&quot;, err)
        } else {
            log.Printf(&quot;发送消息成功,ID: %s&quot;, id)
        }

        time.Sleep(1 * time.Second)
    }
}

// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
    for i := 0; i &lt; maxMsg; i++ {
        // 从流中读取消息
        streams, err := rdb.XReadGroup(ctx, &amp;redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamKey, &quot;&gt;&quot;}, // &quot;&gt;&quot; 表示只获取从未分配给其他消费者的新消息
            Count:    10,                       // 每次最多获取10条消息
            Block:    0,                        // 不阻塞,立即返回
        }).Result()

        if err != nil {
            log.Printf(&quot;消费者 %s 读取消息失败: %v&quot;, consumerName, err)
            time.Sleep(1 * time.Second)
            continue
        }

        // 处理消息
        for _, stream := range streams {
            for _, message := range stream.Messages {
                log.Printf(&quot;消费者 %s 收到消息,ID: %s,内容: %v&quot;, consumerName, message.ID, message.Values)

                // 模拟处理时间
                time.Sleep(200 * time.Millisecond)

                // 确认消息处理完成
                if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
                    log.Printf(&quot;消费者 %s 确认消息失败: %v&quot;, consumerName, err)
                } else {
                    log.Printf(&quot;消费者 %s 确认消息成功,ID: %s&quot;, consumerName, message.ID)
                }
            }
        }

        // 短暂休眠,避免CPU占用过高
        time.Sleep(100 * time.Millisecond)
    }
}
</code></pre>

            </section>
        </article>

        
        <section class="comments-section" id="comments">
            <h2 class="comments-title">评论</h2>
            <div class="utterances-container">
                <script src="https://utteranc.es/client.js"
        repo="shinerio/shinerio.github.io"
        issue-term="代码示例"
        label="blog-comment"
        theme="preferred-color-scheme"
        crossorigin="anonymous"
        async>
</script>
            </div>
        </section>
        
    </div>

    
</div>amp;quot;).Result()
    if err != nil &amp;&amp; err.Error() != &quot;BUSYGROUP Consumer Group name already exists&quot; {
        log.Fatalf(&quot;创建消费者组失败: %v&quot;, err)
    }

    // 启动多个消费者
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-1&quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-2&quot;, 10)
    go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-3&quot;, 2)

    // 生产消息
    go producer(ctx, rdb, streamKey)

    // 保持主程序运行
    select {}
}

// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
    for {
        // 生成消息
        message := map[string]interface{}{
            &quot;timestamp&quot;: time.Now().String(),
            &quot;message&quot;:   fmt.Sprintf(&quot;消息 %d&quot;, time.Now().UnixNano()),
        }

        // 发送消息到流
        id, err := rdb.XAdd(ctx, &amp;redis.XAddArgs{
            Stream: streamKey,
            Values: message,
        }).Result()

        if err != nil {
            log.Printf(&quot;发送消息失败: %v&quot;, err)
        } else {
            log.Printf(&quot;发送消息成功,ID: %s&quot;, id)
        }

        time.Sleep(1 * time.Second)
    }
}

// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
    for i := 0; i &lt; maxMsg; i++ {
        // 从流中读取消息
        streams, err := rdb.XReadGroup(ctx, &amp;redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamKey, &quot;&gt;&quot;}, // &quot;&gt;&quot; 表示只获取从未分配给其他消费者的新消息
            Count:    10,                       // 每次最多获取10条消息
            Block:    0,                        // 不阻塞,立即返回
        }).Result()

        if err != nil {
            log.Printf(&quot;消费者 %s 读取消息失败: %v&quot;, consumerName, err)
            time.Sleep(1 * time.Second)
            continue
        }

        // 处理消息
        for _, stream := range streams {
            for _, message := range stream.Messages {
                log.Printf(&quot;消费者 %s 收到消息,ID: %s,内容: %v&quot;, consumerName, message.ID, message.Values)

                // 模拟处理时间
                time.Sleep(200 * time.Millisecond)

                // 确认消息处理完成
                if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
                    log.Printf(&quot;消费者 %s 确认消息失败: %v&quot;, consumerName, err)
                } else {
                    log.Printf(&quot;消费者 %s 确认消息成功,ID: %s&quot;, consumerName, message.ID)
                }
            }
        }

        // 短暂休眠,避免CPU占用过高
        time.Sleep(100 * time.Millisecond)
    }
}
</code></pre>
quot;).Result()
    if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
        log.Fatalf("创建消费者组失败: %v", err)
    }

    // 启动多个消费者
    go consumer(ctx, rdb, streamKey, groupName, "consumer-1", 10)
    go consumer(ctx, rdb, streamKey, groupName, "consumer-2", 10)
    go consumer(ctx, rdb, streamKey, groupName, "consumer-3", 2)

    // 生产消息
    go producer(ctx, rdb, streamKey)

    // 保持主程序运行
    select {}
}

// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
    for {
        // 生成消息
        message := map[string]interface{}{
            "timestamp": time.Now().String(),
            "message":   fmt.Sprintf("消息 %d", time.Now().UnixNano()),
        }

        // 发送消息到流
        id, err := rdb.XAdd(ctx, &redis.XAddArgs{
            Stream: streamKey,
            Values: message,
        }).Result()

        if err != nil {
            log.Printf("发送消息失败: %v", err)
        } else {
            log.Printf("发送消息成功,ID: %s", id)
        }

        time.Sleep(1 * time.Second)
    }
}

// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
    for i := 0; i < maxMsg; i++ {
        // 从流中读取消息
        streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
            Group:    groupName,
            Consumer: consumerName,
            Streams:  []string{streamKey, ">"}, // ">" 表示只获取从未分配给其他消费者的新消息
            Count:    10,                       // 每次最多获取10条消息
            Block:    0,                        // 不阻塞,立即返回
        }).Result()

        if err != nil {
            log.Printf("消费者 %s 读取消息失败: %v", consumerName, err)
            time.Sleep(1 * time.Second)
            continue
        }

        // 处理消息
        for _, stream := range streams {
            for _, message := range stream.Messages {
                log.Printf("消费者 %s 收到消息,ID: %s,内容: %v", consumerName, message.ID, message.Values)

                // 模拟处理时间
                time.Sleep(200 * time.Millisecond)

                // 确认消息处理完成
                if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
                    log.Printf("消费者 %s 确认消息失败: %v", consumerName, err)
                } else {
                    log.Printf("消费者 %s 确认消息成功,ID: %s", consumerName, message.ID)
                }
            }
        }

        // 短暂休眠,避免CPU占用过高
        time.Sleep(100 * time.Millisecond)
    }
}

评论