package main
import (
"context"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 创建Redis客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DialTimeout: time.Second * 3, // 连接超时时间
ReadTimeout: time.Second * 3, // 读超时时间
WriteTimeout: time.Second * 3, // 写超时时间
PoolSize: 3, // 最大连接池大小
IdleTimeout: time.Second * 3, // 空闲连接的最大存活时间
MaxConnAge: time.Second * 300, // 连接最大存活时间,无论连接是否空闲
PoolTimeout: time.Second * 30, // 从连接池中获取连接池超时时间
})
// 自动从连接池中获取一个连接,并检查连接
if _, err := rdb.Ping(ctx).Result(); err != nil {
log.Fatalf("无法连接到Redis: %v", err)
}
// 流键和消费者组名称
streamKey := "mystream"
groupName := "mygroup"
// 创建消费者组(如果不存在)
_, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, "<pre><code class="language-go">package main
import (
"context"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 创建Redis客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DialTimeout: time.Second * 3, // 连接超时时间
ReadTimeout: time.Second * 3, // 读超时时间
WriteTimeout: time.Second * 3, // 写超时时间
PoolSize: 3, // 最大连接池大小
IdleTimeout: time.Second * 3, // 空闲连接的最大存活时间
MaxConnAge: time.Second * 300, // 连接最大存活时间,无论连接是否空闲
PoolTimeout: time.Second * 30, // 从连接池中获取连接池超时时间
})
// 自动从连接池中获取一个连接,并检查连接
if _, err := rdb.Ping(ctx).Result(); err != nil {
log.Fatalf("无法连接到Redis: %v", err)
}
// 流键和消费者组名称
streamKey := "mystream"
groupName := "mygroup"
// 创建消费者组(如果不存在)
_, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, "<article>
<header>
<h1>代码示例</h1>
<time datetime="2025-09-17T13:31:12.000Z">2025年9月17日</time>
<div class="tags">
</div>
</header>
<section class="content">
<pre><code class="language-go">package main
import (
"context"
"fmt"
"log"
"time"
"github.com/go-redis/redis/v8"
)
func main() {
ctx := context.Background()
// 创建Redis客户端
rdb := redis.NewClient(&redis.Options{
Addr: "localhost:6379",
DialTimeout: time.Second * 3, // 连接超时时间
ReadTimeout: time.Second * 3, // 读超时时间
WriteTimeout: time.Second * 3, // 写超时时间
PoolSize: 3, // 最大连接池大小
IdleTimeout: time.Second * 3, // 空闲连接的最大存活时间
MaxConnAge: time.Second * 300, // 连接最大存活时间,无论连接是否空闲
PoolTimeout: time.Second * 30, // 从连接池中获取连接池超时时间
})
// 自动从连接池中获取一个连接,并检查连接
if _, err := rdb.Ping(ctx).Result(); err != nil {
log.Fatalf("无法连接到Redis: %v", err)
}
// 流键和消费者组名称
streamKey := "mystream"
groupName := "mygroup"
// 创建消费者组(如果不存在)
_, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, "<pre><code class="language-go">package main
import (
&quot;context&quot;
&quot;fmt&quot;
&quot;log&quot;
&quot;time&quot;
&quot;github.com/go-redis/redis/v8&quot;
)
func main() {
ctx := context.Background()
// 创建Redis客户端
rdb := redis.NewClient(&amp;redis.Options{
Addr: &quot;localhost:6379&quot;,
DialTimeout: time.Second * 3, // 连接超时时间
ReadTimeout: time.Second * 3, // 读超时时间
WriteTimeout: time.Second * 3, // 写超时时间
PoolSize: 3, // 最大连接池大小
IdleTimeout: time.Second * 3, // 空闲连接的最大存活时间
MaxConnAge: time.Second * 300, // 连接最大存活时间,无论连接是否空闲
PoolTimeout: time.Second * 30, // 从连接池中获取连接池超时时间
})
// 自动从连接池中获取一个连接,并检查连接
if _, err := rdb.Ping(ctx).Result(); err != nil {
log.Fatalf(&quot;无法连接到Redis: %v&quot;, err)
}
// 流键和消费者组名称
streamKey := &quot;mystream&quot;
groupName := &quot;mygroup&quot;
// 创建消费者组(如果不存在)
_, err := rdb.XGroupCreateMkStream(ctx, streamKey, groupName, &quot;$&quot;).Result()
if err != nil &amp;&amp; err.Error() != &quot;BUSYGROUP Consumer Group name already exists&quot; {
log.Fatalf(&quot;创建消费者组失败: %v&quot;, err)
}
// 启动多个消费者
go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-1&quot;, 10)
go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-2&quot;, 10)
go consumer(ctx, rdb, streamKey, groupName, &quot;consumer-3&quot;, 2)
// 生产消息
go producer(ctx, rdb, streamKey)
// 保持主程序运行
select {}
}
// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
for {
// 生成消息
message := map[string]interface{}{
&quot;timestamp&quot;: time.Now().String(),
&quot;message&quot;: fmt.Sprintf(&quot;消息 %d&quot;, time.Now().UnixNano()),
}
// 发送消息到流
id, err := rdb.XAdd(ctx, &amp;redis.XAddArgs{
Stream: streamKey,
Values: message,
}).Result()
if err != nil {
log.Printf(&quot;发送消息失败: %v&quot;, err)
} else {
log.Printf(&quot;发送消息成功,ID: %s&quot;, id)
}
time.Sleep(1 * time.Second)
}
}
// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
for i := 0; i &lt; maxMsg; i++ {
// 从流中读取消息
streams, err := rdb.XReadGroup(ctx, &amp;redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamKey, &quot;&gt;&quot;}, // &quot;&gt;&quot; 表示只获取从未分配给其他消费者的新消息
Count: 10, // 每次最多获取10条消息
Block: 0, // 不阻塞,立即返回
}).Result()
if err != nil {
log.Printf(&quot;消费者 %s 读取消息失败: %v&quot;, consumerName, err)
time.Sleep(1 * time.Second)
continue
}
// 处理消息
for _, stream := range streams {
for _, message := range stream.Messages {
log.Printf(&quot;消费者 %s 收到消息,ID: %s,内容: %v&quot;, consumerName, message.ID, message.Values)
// 模拟处理时间
time.Sleep(200 * time.Millisecond)
// 确认消息处理完成
if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
log.Printf(&quot;消费者 %s 确认消息失败: %v&quot;, consumerName, err)
} else {
log.Printf(&quot;消费者 %s 确认消息成功,ID: %s&quot;, consumerName, message.ID)
}
}
}
// 短暂休眠,避免CPU占用过高
time.Sleep(100 * time.Millisecond)
}
}
</code></pre>
quot;).Result()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
log.Fatalf("创建消费者组失败: %v", err)
}
// 启动多个消费者
go consumer(ctx, rdb, streamKey, groupName, "consumer-1", 10)
go consumer(ctx, rdb, streamKey, groupName, "consumer-2", 10)
go consumer(ctx, rdb, streamKey, groupName, "consumer-3", 2)
// 生产消息
go producer(ctx, rdb, streamKey)
// 保持主程序运行
select {}
}
// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
for {
// 生成消息
message := map[string]interface{}{
"timestamp": time.Now().String(),
"message": fmt.Sprintf("消息 %d", time.Now().UnixNano()),
}
// 发送消息到流
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamKey,
Values: message,
}).Result()
if err != nil {
log.Printf("发送消息失败: %v", err)
} else {
log.Printf("发送消息成功,ID: %s", id)
}
time.Sleep(1 * time.Second)
}
}
// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
for i := 0; i < maxMsg; i++ {
// 从流中读取消息
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamKey, ">"}, // ">" 表示只获取从未分配给其他消费者的新消息
Count: 10, // 每次最多获取10条消息
Block: 0, // 不阻塞,立即返回
}).Result()
if err != nil {
log.Printf("消费者 %s 读取消息失败: %v", consumerName, err)
time.Sleep(1 * time.Second)
continue
}
// 处理消息
for _, stream := range streams {
for _, message := range stream.Messages {
log.Printf("消费者 %s 收到消息,ID: %s,内容: %v", consumerName, message.ID, message.Values)
// 模拟处理时间
time.Sleep(200 * time.Millisecond)
// 确认消息处理完成
if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
log.Printf("消费者 %s 确认消息失败: %v", consumerName, err)
} else {
log.Printf("消费者 %s 确认消息成功,ID: %s", consumerName, message.ID)
}
}
}
// 短暂休眠,避免CPU占用过高
time.Sleep(100 * time.Millisecond)
}
}
</code></pre>
</section>
</article>amp;quot;).Result()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
log.Fatalf("创建消费者组失败: %v", err)
}
// 启动多个消费者
go consumer(ctx, rdb, streamKey, groupName, "consumer-1", 10)
go consumer(ctx, rdb, streamKey, groupName, "consumer-2", 10)
go consumer(ctx, rdb, streamKey, groupName, "consumer-3", 2)
// 生产消息
go producer(ctx, rdb, streamKey)
// 保持主程序运行
select {}
}
// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
for {
// 生成消息
message := map[string]interface{}{
"timestamp": time.Now().String(),
"message": fmt.Sprintf("消息 %d", time.Now().UnixNano()),
}
// 发送消息到流
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamKey,
Values: message,
}).Result()
if err != nil {
log.Printf("发送消息失败: %v", err)
} else {
log.Printf("发送消息成功,ID: %s", id)
}
time.Sleep(1 * time.Second)
}
}
// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
for i := 0; i < maxMsg; i++ {
// 从流中读取消息
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamKey, ">"}, // ">" 表示只获取从未分配给其他消费者的新消息
Count: 10, // 每次最多获取10条消息
Block: 0, // 不阻塞,立即返回
}).Result()
if err != nil {
log.Printf("消费者 %s 读取消息失败: %v", consumerName, err)
time.Sleep(1 * time.Second)
continue
}
// 处理消息
for _, stream := range streams {
for _, message := range stream.Messages {
log.Printf("消费者 %s 收到消息,ID: %s,内容: %v", consumerName, message.ID, message.Values)
// 模拟处理时间
time.Sleep(200 * time.Millisecond)
// 确认消息处理完成
if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
log.Printf("消费者 %s 确认消息失败: %v", consumerName, err)
} else {
log.Printf("消费者 %s 确认消息成功,ID: %s", consumerName, message.ID)
}
}
}
// 短暂休眠,避免CPU占用过高
time.Sleep(100 * time.Millisecond)
}
}
</code></pre>
quot;).Result()
if err != nil && err.Error() != "BUSYGROUP Consumer Group name already exists" {
log.Fatalf("创建消费者组失败: %v", err)
}
// 启动多个消费者
go consumer(ctx, rdb, streamKey, groupName, "consumer-1", 10)
go consumer(ctx, rdb, streamKey, groupName, "consumer-2", 10)
go consumer(ctx, rdb, streamKey, groupName, "consumer-3", 2)
// 生产消息
go producer(ctx, rdb, streamKey)
// 保持主程序运行
select {}
}
// 生产者函数
func producer(ctx context.Context, rdb *redis.Client, streamKey string) {
for {
// 生成消息
message := map[string]interface{}{
"timestamp": time.Now().String(),
"message": fmt.Sprintf("消息 %d", time.Now().UnixNano()),
}
// 发送消息到流
id, err := rdb.XAdd(ctx, &redis.XAddArgs{
Stream: streamKey,
Values: message,
}).Result()
if err != nil {
log.Printf("发送消息失败: %v", err)
} else {
log.Printf("发送消息成功,ID: %s", id)
}
time.Sleep(1 * time.Second)
}
}
// 消费者函数
func consumer(ctx context.Context, rdb *redis.Client, streamKey, groupName, consumerName string, maxMsg int) {
for i := 0; i < maxMsg; i++ {
// 从流中读取消息
streams, err := rdb.XReadGroup(ctx, &redis.XReadGroupArgs{
Group: groupName,
Consumer: consumerName,
Streams: []string{streamKey, ">"}, // ">" 表示只获取从未分配给其他消费者的新消息
Count: 10, // 每次最多获取10条消息
Block: 0, // 不阻塞,立即返回
}).Result()
if err != nil {
log.Printf("消费者 %s 读取消息失败: %v", consumerName, err)
time.Sleep(1 * time.Second)
continue
}
// 处理消息
for _, stream := range streams {
for _, message := range stream.Messages {
log.Printf("消费者 %s 收到消息,ID: %s,内容: %v", consumerName, message.ID, message.Values)
// 模拟处理时间
time.Sleep(200 * time.Millisecond)
// 确认消息处理完成
if _, err := rdb.XAck(ctx, streamKey, groupName, message.ID).Result(); err != nil {
log.Printf("消费者 %s 确认消息失败: %v", consumerName, err)
} else {
log.Printf("消费者 %s 确认消息成功,ID: %s", consumerName, message.ID)
}
}
}
// 短暂休眠,避免CPU占用过高
time.Sleep(100 * time.Millisecond)
}
}