☕ NEW! 完成新手任務即可參加抽獎!LINE 星巴克禮券等你拿,名額有限!        🎉 推廣活動:邀請好友註冊 DevLearn,累積推薦抽 LINE 星巴克禮券! 活動詳情 →        🔥 活動期間 2026/4/1 - 5/31 |已有 0 人參加       
redis 進階

📡 Redis Pub/Sub 與 Stream:即時訊息與事件驅動

📌 Pub/Sub 發布訂閱模式

Publisher ──→ Channel "news" ──→ Subscriber A
                              ──→ Subscriber B
                              ──→ Subscriber C
  • 發布者 向頻道發送訊息
  • 訂閱者 監聽頻道接收訊息
  • 訊息即時推送,不會持久化

Redis CLI 操作

# 終端 1:訂閱頻道
SUBSCRIBE news
# 等待訊息...

# 終端 2:發布訊息
PUBLISH news "Breaking: Redis 8.0 released!"
# (integer) 1  ← 表示有 1 個訂閱者收到

# 模式訂閱(萬用字元)
PSUBSCRIBE news.*
# 會收到 news.tech, news.sports 等所有子頻道

📌 .NET 中使用 Pub/Sub

發布訊息

public class NotificationPublisher
{
    private readonly IConnectionMultiplexer _redis;

    public NotificationPublisher(IConnectionMultiplexer redis)
    {
        _redis = redis;
    }

    public async Task PublishOrderCreated(Order order)
    {
        var subscriber = _redis.GetSubscriber();
        var message = JsonSerializer.Serialize(new
        {
            EventType = "OrderCreated",
            OrderId = order.Id,
            Amount = order.Total,
            Timestamp = DateTime.UtcNow
        });

        await subscriber.PublishAsync("orders", message);
        Console.WriteLine($"已發布訂單事件: {order.Id}");
    }
}

訂閱訊息

public class NotificationSubscriber : BackgroundService
{
    private readonly IConnectionMultiplexer _redis;
    private readonly ILogger<NotificationSubscriber> _logger;

    public NotificationSubscriber(
        IConnectionMultiplexer redis,
        ILogger<NotificationSubscriber> logger)
    {
        _redis = redis;
        _logger = logger;
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        var subscriber = _redis.GetSubscriber();

        // 訂閱 orders 頻道
        await subscriber.SubscribeAsync("orders", (channel, message) =>
        {
            _logger.LogInformation(
                "收到訂單事件: {Message}", message.ToString());

            // 處理事件(發送 Email、更新庫存等)
            ProcessOrderEvent(message!);
        });

        // 模式訂閱
        await subscriber.SubscribeAsync("notifications.*",
            (channel, message) =>
            {
                _logger.LogInformation(
                    "頻道 {Channel}: {Message}", channel, message);
            });

        _logger.LogInformation("Pub/Sub 訂閱已啟動");
        await Task.Delay(Timeout.Infinite, ct);
    }

    private void ProcessOrderEvent(string message)
    {
        var order = JsonSerializer.Deserialize<JsonElement>(message);
        // 處理邏輯...
    }
}

// 註冊背景服務
builder.Services.AddHostedService<NotificationSubscriber>();

📌 Pub/Sub 的限制

限制 說明
不持久化 訊息發出就消失,離線的訂閱者收不到
無確認機制 不知道訂閱者是否成功處理
無重播 不能回溯歷史訊息

→ 需要持久化和可靠傳遞?使用 Redis Streams


📌 Redis Streams:持久化的訊息佇列

Producer ──XADD──→ Stream "orders"
                    ├── 1609459200000-0: {orderId:1, amount:100}
                    ├── 1609459200001-0: {orderId:2, amount:200}
                    └── 1609459200002-0: {orderId:3, amount:150}
                              ↑
Consumer Group "processors"
  ├── Consumer A: 處理 msg 0,1
  └── Consumer B: 處理 msg 2

Redis CLI 操作

# 新增訊息到 Stream
XADD orders * orderId 1001 amount 99.99 status created
# "1609459200000-0"  ← 自動生成的訊息 ID

# 讀取訊息
XRANGE orders - +           # 全部訊息
XRANGE orders - + COUNT 10  # 最新 10 筆

# 建立消費者群組
XGROUP CREATE orders processors $ MKSTREAM

# 消費者讀取
XREADGROUP GROUP processors consumer-1 COUNT 1 BLOCK 5000 STREAMS orders >

# 確認已處理
XACK orders processors 1609459200000-0

📌 .NET 中使用 Streams

public class OrderStreamProducer
{
    private readonly IDatabase _db;

    public async Task PublishOrder(Order order)
    {
        await _db.StreamAddAsync("orders", new NameValueEntry[]
        {
            new("orderId", order.Id.ToString()),
            new("amount", order.Total.ToString()),
            new("status", "created"),
            new("timestamp", DateTime.UtcNow.ToString("O"))
        });
    }
}

public class OrderStreamConsumer : BackgroundService
{
    private readonly IDatabase _db;
    private readonly string _groupName = "processors";
    private readonly string _consumerName;

    public OrderStreamConsumer(IConnectionMultiplexer redis)
    {
        _db = redis.GetDatabase();
        _consumerName = $"consumer-{Environment.MachineName}";
        EnsureGroupExists().Wait();
    }

    private async Task EnsureGroupExists()
    {
        try
        {
            await _db.StreamCreateConsumerGroupAsync(
                "orders", _groupName, "$", true);
        }
        catch (RedisServerException) { /* 群組已存在 */ }
    }

    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var entries = await _db.StreamReadGroupAsync(
                "orders", _groupName, _consumerName,
                ">",      // 只讀未分配的新訊息
                count: 10);

            foreach (var entry in entries)
            {
                try
                {
                    var orderId = entry.Values
                        .First(v => v.Name == "orderId").Value;
                    Console.WriteLine($"處理訂單: {orderId}");

                    // 處理完成,確認
                    await _db.StreamAcknowledgeAsync(
                        "orders", _groupName, entry.Id);
                }
                catch (Exception ex)
                {
                    Console.WriteLine($"處理失敗: {ex.Message}");
                    // 不 ACK,訊息會重新分配
                }
            }

            if (entries.Length == 0)
                await Task.Delay(1000, ct);  // 沒有新訊息時等待
        }
    }
}

📌 對比 RabbitMQ / Kafka

特性 Redis Pub/Sub Redis Streams RabbitMQ Kafka
持久化
消費者群組
訊息確認
效能 極高
複雜度
適用場景 即時通知 輕量 MQ 企業級 MQ 大數據串流

📌 範例:即時通知系統

// 混合使用 Pub/Sub + Streams
public class NotificationService
{
    private readonly ISubscriber _pubsub;
    private readonly IDatabase _db;

    // 即時推送用 Pub/Sub(在線用戶立即收到)
    public async Task SendRealtimeNotification(
        string userId, string message)
    {
        await _pubsub.PublishAsync(
            $"user:{userId}:notifications", message);
    }

    // 持久化用 Streams(離線用戶上線後讀取)
    public async Task SaveNotification(
        string userId, string message)
    {
        await _db.StreamAddAsync(
            $"notifications:{userId}",
            new NameValueEntry[]
            {
                new("message", message),
                new("timestamp", DateTime.UtcNow.ToString("O")),
                new("read", "false")
            },
            maxLength: 100);  // 保留最近 100 筆
    }
}

🔑 重點整理

  1. Pub/Sub 適合即時推送,但訊息不持久化
  2. Streams 是 Redis 5.0+ 的持久化訊息佇列
  3. Consumer Group 讓多個消費者分攤訊息處理
  4. 簡單場景用 Redis,複雜場景考慮 RabbitMQ / Kafka
  5. 實際系統常 混合使用 Pub/Sub + Streams

💡 大家的想法 · 0

載入中...
💬 即時聊天室 🟢 0 人在線
😀 😎 🤓 💻 🎮 🎸 🔥
➕ 新問題
📋 我的工單
💬 LINE 社群
🔒
需要註冊才能使用此功能
註冊帳號即可解鎖測驗、遊戲、簽到、筆記下載等所有功能,完全免費!
免費註冊