📡 Redis Pub/Sub 與 Stream:即時訊息與事件驅動
📌 Pub/Sub 發布訂閱模式
Publisher ──→ Channel "news" ──→ Subscriber A
──→ Subscriber B
──→ Subscriber C
- 發布者 向頻道發送訊息
- 訂閱者 監聽頻道接收訊息
- 訊息即時推送,不會持久化
Redis CLI 操作
# 終端 1:訂閱頻道
SUBSCRIBE news
# 等待訊息...
# 終端 2:發布訊息
PUBLISH news "Breaking: Redis 8.0 released!"
# (integer) 1 ← 表示有 1 個訂閱者收到
# 模式訂閱(萬用字元)
PSUBSCRIBE news.*
# 會收到 news.tech, news.sports 等所有子頻道
📌 .NET 中使用 Pub/Sub
發布訊息
public class NotificationPublisher
{
private readonly IConnectionMultiplexer _redis;
public NotificationPublisher(IConnectionMultiplexer redis)
{
_redis = redis;
}
public async Task PublishOrderCreated(Order order)
{
var subscriber = _redis.GetSubscriber();
var message = JsonSerializer.Serialize(new
{
EventType = "OrderCreated",
OrderId = order.Id,
Amount = order.Total,
Timestamp = DateTime.UtcNow
});
await subscriber.PublishAsync("orders", message);
Console.WriteLine($"已發布訂單事件: {order.Id}");
}
}
訂閱訊息
public class NotificationSubscriber : BackgroundService
{
private readonly IConnectionMultiplexer _redis;
private readonly ILogger<NotificationSubscriber> _logger;
public NotificationSubscriber(
IConnectionMultiplexer redis,
ILogger<NotificationSubscriber> logger)
{
_redis = redis;
_logger = logger;
}
protected override async Task ExecuteAsync(CancellationToken ct)
{
var subscriber = _redis.GetSubscriber();
// 訂閱 orders 頻道
await subscriber.SubscribeAsync("orders", (channel, message) =>
{
_logger.LogInformation(
"收到訂單事件: {Message}", message.ToString());
// 處理事件(發送 Email、更新庫存等)
ProcessOrderEvent(message!);
});
// 模式訂閱
await subscriber.SubscribeAsync("notifications.*",
(channel, message) =>
{
_logger.LogInformation(
"頻道 {Channel}: {Message}", channel, message);
});
_logger.LogInformation("Pub/Sub 訂閱已啟動");
await Task.Delay(Timeout.Infinite, ct);
}
private void ProcessOrderEvent(string message)
{
var order = JsonSerializer.Deserialize<JsonElement>(message);
// 處理邏輯...
}
}
// 註冊背景服務
builder.Services.AddHostedService<NotificationSubscriber>();
📌 Pub/Sub 的限制
| 限制 |
說明 |
| 不持久化 |
訊息發出就消失,離線的訂閱者收不到 |
| 無確認機制 |
不知道訂閱者是否成功處理 |
| 無重播 |
不能回溯歷史訊息 |
→ 需要持久化和可靠傳遞?使用 Redis Streams!
📌 Redis Streams:持久化的訊息佇列
Producer ──XADD──→ Stream "orders"
├── 1609459200000-0: {orderId:1, amount:100}
├── 1609459200001-0: {orderId:2, amount:200}
└── 1609459200002-0: {orderId:3, amount:150}
↑
Consumer Group "processors"
├── Consumer A: 處理 msg 0,1
└── Consumer B: 處理 msg 2
Redis CLI 操作
# 新增訊息到 Stream
XADD orders * orderId 1001 amount 99.99 status created
# "1609459200000-0" ← 自動生成的訊息 ID
# 讀取訊息
XRANGE orders - + # 全部訊息
XRANGE orders - + COUNT 10 # 最新 10 筆
# 建立消費者群組
XGROUP CREATE orders processors $ MKSTREAM
# 消費者讀取
XREADGROUP GROUP processors consumer-1 COUNT 1 BLOCK 5000 STREAMS orders >
# 確認已處理
XACK orders processors 1609459200000-0
📌 .NET 中使用 Streams
public class OrderStreamProducer
{
private readonly IDatabase _db;
public async Task PublishOrder(Order order)
{
await _db.StreamAddAsync("orders", new NameValueEntry[]
{
new("orderId", order.Id.ToString()),
new("amount", order.Total.ToString()),
new("status", "created"),
new("timestamp", DateTime.UtcNow.ToString("O"))
});
}
}
public class OrderStreamConsumer : BackgroundService
{
private readonly IDatabase _db;
private readonly string _groupName = "processors";
private readonly string _consumerName;
public OrderStreamConsumer(IConnectionMultiplexer redis)
{
_db = redis.GetDatabase();
_consumerName = $"consumer-{Environment.MachineName}";
EnsureGroupExists().Wait();
}
private async Task EnsureGroupExists()
{
try
{
await _db.StreamCreateConsumerGroupAsync(
"orders", _groupName, "$", true);
}
catch (RedisServerException) { /* 群組已存在 */ }
}
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var entries = await _db.StreamReadGroupAsync(
"orders", _groupName, _consumerName,
">", // 只讀未分配的新訊息
count: 10);
foreach (var entry in entries)
{
try
{
var orderId = entry.Values
.First(v => v.Name == "orderId").Value;
Console.WriteLine($"處理訂單: {orderId}");
// 處理完成,確認
await _db.StreamAcknowledgeAsync(
"orders", _groupName, entry.Id);
}
catch (Exception ex)
{
Console.WriteLine($"處理失敗: {ex.Message}");
// 不 ACK,訊息會重新分配
}
}
if (entries.Length == 0)
await Task.Delay(1000, ct); // 沒有新訊息時等待
}
}
}
📌 對比 RabbitMQ / Kafka
| 特性 |
Redis Pub/Sub |
Redis Streams |
RabbitMQ |
Kafka |
| 持久化 |
❌ |
✅ |
✅ |
✅ |
| 消費者群組 |
❌ |
✅ |
✅ |
✅ |
| 訊息確認 |
❌ |
✅ |
✅ |
✅ |
| 效能 |
極高 |
高 |
中 |
高 |
| 複雜度 |
低 |
中 |
高 |
高 |
| 適用場景 |
即時通知 |
輕量 MQ |
企業級 MQ |
大數據串流 |
📌 範例:即時通知系統
// 混合使用 Pub/Sub + Streams
public class NotificationService
{
private readonly ISubscriber _pubsub;
private readonly IDatabase _db;
// 即時推送用 Pub/Sub(在線用戶立即收到)
public async Task SendRealtimeNotification(
string userId, string message)
{
await _pubsub.PublishAsync(
$"user:{userId}:notifications", message);
}
// 持久化用 Streams(離線用戶上線後讀取)
public async Task SaveNotification(
string userId, string message)
{
await _db.StreamAddAsync(
$"notifications:{userId}",
new NameValueEntry[]
{
new("message", message),
new("timestamp", DateTime.UtcNow.ToString("O")),
new("read", "false")
},
maxLength: 100); // 保留最近 100 筆
}
}
🔑 重點整理
- Pub/Sub 適合即時推送,但訊息不持久化
- Streams 是 Redis 5.0+ 的持久化訊息佇列
- Consumer Group 讓多個消費者分攤訊息處理
- 簡單場景用 Redis,複雜場景考慮 RabbitMQ / Kafka
- 實際系統常 混合使用 Pub/Sub + Streams