☕ NEW! 完成新手任務即可參加抽獎!LINE 星巴克禮券等你拿,名額有限!        🎉 推廣活動:邀請好友註冊 DevLearn,累積推薦抽 LINE 星巴克禮券! 活動詳情 →        🔥 活動期間 2026/4/1 - 5/31 |已有 0 人參加       
microservices 進階

🗄️ 微服務資料管理:Saga 與最終一致性

📌 分散式交易的挑戰

在單體架構中,一個資料庫交易就能保證資料一致性:

// 單體架構:簡單的資料庫交易
using var transaction = await _db.Database.BeginTransactionAsync();
try
{
    // 1. 建立訂單
    _db.Orders.Add(order);

    // 2. 扣減庫存
    var stock = await _db.Stocks.FindAsync(productId);
    stock.Quantity -= quantity;

    // 3. 建立付款記錄
    _db.Payments.Add(payment);

    await _db.SaveChangesAsync();
    await transaction.CommitAsync(); // 全部成功,一次提交
}
catch
{
    await transaction.RollbackAsync(); // 任何失敗,全部回滾
}

但在微服務中,每個服務有自己的資料庫,無法使用單一交易!


📌 兩階段提交 (2PC) 的問題

兩階段提交:
1. 準備階段:協調者問所有參與者 "你準備好了嗎?"
2. 提交階段:所有人都說 OK → 提交;任何人說不行 → 全部回滾

問題:
├── 同步阻塞:所有參與者必須等待最慢的那個
├── 單點故障:協調者掛了,所有參與者都卡住
├── 效能瓶頸:鎖定資源直到交易完成
└── 不適合微服務的去中心化理念

📌 Saga 模式

Saga 是一種將分散式交易拆分為一系列本地交易的模式,每個本地交易都有對應的補償操作

Choreography(編舞)模式

每個服務監聽事件並自主決定下一步。

// ── 訂單建立的 Saga(Choreography) ──

// 步驟 1:Order Service 建立訂單
public class OrderService
{
    public async Task CreateOrderAsync(CreateOrderCommand cmd)
    {
        var order = new Order { Status = OrderStatus.Pending };
        await _repo.AddAsync(order);
        await _publisher.Publish(new OrderCreatedEvent(order.Id, cmd.Items));
    }
}

// 步驟 2:Inventory Service 監聽並扣減庫存
public class OrderCreatedHandler : IConsumer<OrderCreatedEvent>
{
    public async Task Consume(ConsumeContext<OrderCreatedEvent> ctx)
    {
        try
        {
            foreach (var item in ctx.Message.Items)
                await _inventory.ReserveStockAsync(item.ProductId, item.Quantity);

            await ctx.Publish(new StockReservedEvent(ctx.Message.OrderId));
        }
        catch (InsufficientStockException)
        {
            await ctx.Publish(new StockReservationFailedEvent(ctx.Message.OrderId));
        }
    }
}

// 步驟 3:Payment Service 監聽並處理付款
public class StockReservedHandler : IConsumer<StockReservedEvent>
{
    public async Task Consume(ConsumeContext<StockReservedEvent> ctx)
    {
        try
        {
            await _payment.ChargeAsync(ctx.Message.OrderId);
            await ctx.Publish(new PaymentCompletedEvent(ctx.Message.OrderId));
        }
        catch (PaymentFailedException)
        {
            await ctx.Publish(new PaymentFailedEvent(ctx.Message.OrderId));
        }
    }
}

// 補償:庫存服務監聽付款失敗,釋放庫存
public class PaymentFailedHandler : IConsumer<PaymentFailedEvent>
{
    public async Task Consume(ConsumeContext<PaymentFailedEvent> ctx)
    {
        await _inventory.ReleaseStockAsync(ctx.Message.OrderId);
        await ctx.Publish(new StockReleasedEvent(ctx.Message.OrderId));
    }
}

Orchestration(指揮)模式

由一個中央的 Saga 協調者來控制流程。

// ── Saga 協調者 ──
public class OrderSagaOrchestrator : ISaga<OrderSagaState>
{
    public async Task HandleAsync(OrderCreatedEvent evt, ISagaContext context)
    {
        // 步驟 1:要求庫存服務預留庫存
        await context.Send(new ReserveStockCommand(evt.OrderId, evt.Items));
    }

    public async Task HandleAsync(StockReservedEvent evt, ISagaContext context)
    {
        // 步驟 2:要求付款服務扣款
        await context.Send(new ProcessPaymentCommand(evt.OrderId));
    }

    public async Task HandleAsync(PaymentCompletedEvent evt, ISagaContext context)
    {
        // 步驟 3:訂單確認完成
        await context.Send(new ConfirmOrderCommand(evt.OrderId));
    }

    // ── 補償流程 ──
    public async Task HandleAsync(PaymentFailedEvent evt, ISagaContext context)
    {
        // 補償:釋放庫存 → 取消訂單
        await context.Send(new ReleaseStockCommand(evt.OrderId));
        await context.Send(new CancelOrderCommand(evt.OrderId, "付款失敗"));
    }

    public async Task HandleAsync(StockReservationFailedEvent evt, ISagaContext context)
    {
        // 補償:直接取消訂單
        await context.Send(new CancelOrderCommand(evt.OrderId, "庫存不足"));
    }
}

📌 最終一致性 (Eventual Consistency)

微服務架構接受:資料不會立即一致,但最終會一致

強一致性(單體):下單 → 扣庫存 → 扣款 → 全部完成(同步)
最終一致性(微服務):
  t=0s  訂單建立(Pending)
  t=1s  庫存扣減(Processing)
  t=3s  付款完成(Confirmed)← 最終一致

📌 Outbox Pattern — 可靠事件發布

確保資料庫操作和事件發布的原子性。

// ── 問題:資料庫寫入成功但事件發布失敗 ──
await _db.Orders.AddAsync(order);
await _db.SaveChangesAsync();        // ✅ 成功
await _publisher.Publish(event);      // ❌ RabbitMQ 掛了!事件遺失!

// ── 解決方案:Outbox Pattern ──
public class OutboxMessage
{
    public Guid Id { get; set; }
    public string EventType { get; set; } = string.Empty;
    public string Payload { get; set; } = string.Empty;
    public DateTime CreatedAt { get; set; }
    public DateTime? ProcessedAt { get; set; }
}

// 在同一個資料庫交易中寫入訂單和 Outbox 訊息
using var transaction = await _db.Database.BeginTransactionAsync();

await _db.Orders.AddAsync(order);
await _db.OutboxMessages.AddAsync(new OutboxMessage
{
    Id = Guid.NewGuid(),
    EventType = nameof(OrderCreatedEvent),
    Payload = JsonSerializer.Serialize(new OrderCreatedEvent(order.Id)),
    CreatedAt = DateTime.UtcNow
});

await _db.SaveChangesAsync();
await transaction.CommitAsync(); // 兩者在同一個交易中,保證原子性

// ── 背景服務定期掃描 Outbox 並發布事件 ──
public class OutboxProcessor : BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken ct)
    {
        while (!ct.IsCancellationRequested)
        {
            var messages = await _db.OutboxMessages
                .Where(m => m.ProcessedAt == null)
                .OrderBy(m => m.CreatedAt)
                .Take(20)
                .ToListAsync(ct);

            foreach (var msg in messages)
            {
                await _publisher.PublishRawAsync(msg.EventType, msg.Payload);
                msg.ProcessedAt = DateTime.UtcNow;
            }

            await _db.SaveChangesAsync(ct);
            await Task.Delay(TimeSpan.FromSeconds(5), ct);
        }
    }
}

📌 CQRS(命令查詢職責分離)

// ── 寫入端(Command) ──
public class CreateOrderCommandHandler
{
    private readonly OrderDbContext _writeDb;

    public async Task HandleAsync(CreateOrderCommand cmd)
    {
        var order = Order.Create(cmd.CustomerId, cmd.Items);
        _writeDb.Orders.Add(order);
        await _writeDb.SaveChangesAsync();
        // 發布事件給讀取端
    }
}

// ── 讀取端(Query) ──
public class OrderQueryService
{
    private readonly IReadOnlyRepository _readDb; // 可以是不同的資料庫

    public async Task<OrderDetailDto> GetOrderDetailAsync(Guid orderId)
    {
        // 讀取端的資料模型是為查詢最佳化的(反正規化)
        return await _readDb.GetAsync<OrderDetailDto>(orderId);
    }
}

📌 Event Sourcing 事件溯源基礎

// 不儲存當前狀態,而是儲存所有發生過的事件
public class OrderEventStore
{
    public async Task AppendAsync(Guid orderId, IDomainEvent @event)
    {
        await _store.AppendToStreamAsync($"order-{orderId}", new EventData
        {
            EventType = @event.GetType().Name,
            Data = JsonSerializer.Serialize(@event),
            Timestamp = DateTime.UtcNow
        });
    }

    public async Task<Order> LoadAsync(Guid orderId)
    {
        var events = await _store.ReadStreamAsync($"order-{orderId}");
        var order = new Order();
        foreach (var evt in events)
        {
            order.Apply(evt); // 重播事件來重建狀態
        }
        return order;
    }
}

下一章: 我們將學習微服務的可觀測性 — 日誌、追蹤與指標的完整方案。

💡 大家的想法 · 0

載入中...
💬 即時聊天室 🟢 0 人在線
😀 😎 🤓 💻 🎮 🎸 🔥
➕ 新問題
📋 我的工單
💬 LINE 社群
🔒
需要註冊才能使用此功能
註冊帳號即可解鎖測驗、遊戲、簽到、筆記下載等所有功能,完全免費!
免費註冊