🗄️ 微服務資料管理:Saga 與最終一致性
📌 分散式交易的挑戰
在單體架構中,一個資料庫交易就能保證資料一致性:
// 單體架構:簡單的資料庫交易
using var transaction = await _db.Database.BeginTransactionAsync();
try
{
// 1. 建立訂單
_db.Orders.Add(order);
// 2. 扣減庫存
var stock = await _db.Stocks.FindAsync(productId);
stock.Quantity -= quantity;
// 3. 建立付款記錄
_db.Payments.Add(payment);
await _db.SaveChangesAsync();
await transaction.CommitAsync(); // 全部成功,一次提交
}
catch
{
await transaction.RollbackAsync(); // 任何失敗,全部回滾
}
但在微服務中,每個服務有自己的資料庫,無法使用單一交易!
📌 兩階段提交 (2PC) 的問題
兩階段提交:
1. 準備階段:協調者問所有參與者 "你準備好了嗎?"
2. 提交階段:所有人都說 OK → 提交;任何人說不行 → 全部回滾
問題:
├── 同步阻塞:所有參與者必須等待最慢的那個
├── 單點故障:協調者掛了,所有參與者都卡住
├── 效能瓶頸:鎖定資源直到交易完成
└── 不適合微服務的去中心化理念
📌 Saga 模式
Saga 是一種將分散式交易拆分為一系列本地交易的模式,每個本地交易都有對應的補償操作。
Choreography(編舞)模式
每個服務監聽事件並自主決定下一步。
// ── 訂單建立的 Saga(Choreography) ──
// 步驟 1:Order Service 建立訂單
public class OrderService
{
public async Task CreateOrderAsync(CreateOrderCommand cmd)
{
var order = new Order { Status = OrderStatus.Pending };
await _repo.AddAsync(order);
await _publisher.Publish(new OrderCreatedEvent(order.Id, cmd.Items));
}
}
// 步驟 2:Inventory Service 監聽並扣減庫存
public class OrderCreatedHandler : IConsumer<OrderCreatedEvent>
{
public async Task Consume(ConsumeContext<OrderCreatedEvent> ctx)
{
try
{
foreach (var item in ctx.Message.Items)
await _inventory.ReserveStockAsync(item.ProductId, item.Quantity);
await ctx.Publish(new StockReservedEvent(ctx.Message.OrderId));
}
catch (InsufficientStockException)
{
await ctx.Publish(new StockReservationFailedEvent(ctx.Message.OrderId));
}
}
}
// 步驟 3:Payment Service 監聽並處理付款
public class StockReservedHandler : IConsumer<StockReservedEvent>
{
public async Task Consume(ConsumeContext<StockReservedEvent> ctx)
{
try
{
await _payment.ChargeAsync(ctx.Message.OrderId);
await ctx.Publish(new PaymentCompletedEvent(ctx.Message.OrderId));
}
catch (PaymentFailedException)
{
await ctx.Publish(new PaymentFailedEvent(ctx.Message.OrderId));
}
}
}
// 補償:庫存服務監聽付款失敗,釋放庫存
public class PaymentFailedHandler : IConsumer<PaymentFailedEvent>
{
public async Task Consume(ConsumeContext<PaymentFailedEvent> ctx)
{
await _inventory.ReleaseStockAsync(ctx.Message.OrderId);
await ctx.Publish(new StockReleasedEvent(ctx.Message.OrderId));
}
}
Orchestration(指揮)模式
由一個中央的 Saga 協調者來控制流程。
// ── Saga 協調者 ──
public class OrderSagaOrchestrator : ISaga<OrderSagaState>
{
public async Task HandleAsync(OrderCreatedEvent evt, ISagaContext context)
{
// 步驟 1:要求庫存服務預留庫存
await context.Send(new ReserveStockCommand(evt.OrderId, evt.Items));
}
public async Task HandleAsync(StockReservedEvent evt, ISagaContext context)
{
// 步驟 2:要求付款服務扣款
await context.Send(new ProcessPaymentCommand(evt.OrderId));
}
public async Task HandleAsync(PaymentCompletedEvent evt, ISagaContext context)
{
// 步驟 3:訂單確認完成
await context.Send(new ConfirmOrderCommand(evt.OrderId));
}
// ── 補償流程 ──
public async Task HandleAsync(PaymentFailedEvent evt, ISagaContext context)
{
// 補償:釋放庫存 → 取消訂單
await context.Send(new ReleaseStockCommand(evt.OrderId));
await context.Send(new CancelOrderCommand(evt.OrderId, "付款失敗"));
}
public async Task HandleAsync(StockReservationFailedEvent evt, ISagaContext context)
{
// 補償:直接取消訂單
await context.Send(new CancelOrderCommand(evt.OrderId, "庫存不足"));
}
}
📌 最終一致性 (Eventual Consistency)
微服務架構接受:資料不會立即一致,但最終會一致。
強一致性(單體):下單 → 扣庫存 → 扣款 → 全部完成(同步)
最終一致性(微服務):
t=0s 訂單建立(Pending)
t=1s 庫存扣減(Processing)
t=3s 付款完成(Confirmed)← 最終一致
📌 Outbox Pattern — 可靠事件發布
確保資料庫操作和事件發布的原子性。
// ── 問題:資料庫寫入成功但事件發布失敗 ──
await _db.Orders.AddAsync(order);
await _db.SaveChangesAsync(); // ✅ 成功
await _publisher.Publish(event); // ❌ RabbitMQ 掛了!事件遺失!
// ── 解決方案:Outbox Pattern ──
public class OutboxMessage
{
public Guid Id { get; set; }
public string EventType { get; set; } = string.Empty;
public string Payload { get; set; } = string.Empty;
public DateTime CreatedAt { get; set; }
public DateTime? ProcessedAt { get; set; }
}
// 在同一個資料庫交易中寫入訂單和 Outbox 訊息
using var transaction = await _db.Database.BeginTransactionAsync();
await _db.Orders.AddAsync(order);
await _db.OutboxMessages.AddAsync(new OutboxMessage
{
Id = Guid.NewGuid(),
EventType = nameof(OrderCreatedEvent),
Payload = JsonSerializer.Serialize(new OrderCreatedEvent(order.Id)),
CreatedAt = DateTime.UtcNow
});
await _db.SaveChangesAsync();
await transaction.CommitAsync(); // 兩者在同一個交易中,保證原子性
// ── 背景服務定期掃描 Outbox 並發布事件 ──
public class OutboxProcessor : BackgroundService
{
protected override async Task ExecuteAsync(CancellationToken ct)
{
while (!ct.IsCancellationRequested)
{
var messages = await _db.OutboxMessages
.Where(m => m.ProcessedAt == null)
.OrderBy(m => m.CreatedAt)
.Take(20)
.ToListAsync(ct);
foreach (var msg in messages)
{
await _publisher.PublishRawAsync(msg.EventType, msg.Payload);
msg.ProcessedAt = DateTime.UtcNow;
}
await _db.SaveChangesAsync(ct);
await Task.Delay(TimeSpan.FromSeconds(5), ct);
}
}
}
📌 CQRS(命令查詢職責分離)
// ── 寫入端(Command) ──
public class CreateOrderCommandHandler
{
private readonly OrderDbContext _writeDb;
public async Task HandleAsync(CreateOrderCommand cmd)
{
var order = Order.Create(cmd.CustomerId, cmd.Items);
_writeDb.Orders.Add(order);
await _writeDb.SaveChangesAsync();
// 發布事件給讀取端
}
}
// ── 讀取端(Query) ──
public class OrderQueryService
{
private readonly IReadOnlyRepository _readDb; // 可以是不同的資料庫
public async Task<OrderDetailDto> GetOrderDetailAsync(Guid orderId)
{
// 讀取端的資料模型是為查詢最佳化的(反正規化)
return await _readDb.GetAsync<OrderDetailDto>(orderId);
}
}
📌 Event Sourcing 事件溯源基礎
// 不儲存當前狀態,而是儲存所有發生過的事件
public class OrderEventStore
{
public async Task AppendAsync(Guid orderId, IDomainEvent @event)
{
await _store.AppendToStreamAsync($"order-{orderId}", new EventData
{
EventType = @event.GetType().Name,
Data = JsonSerializer.Serialize(@event),
Timestamp = DateTime.UtcNow
});
}
public async Task<Order> LoadAsync(Guid orderId)
{
var events = await _store.ReadStreamAsync($"order-{orderId}");
var order = new Order();
foreach (var evt in events)
{
order.Apply(evt); // 重播事件來重建狀態
}
return order;
}
}
下一章: 我們將學習微服務的可觀測性 — 日誌、追蹤與指標的完整方案。