在分布式系统中,分布式事务的实现依赖于框架或服务的协调能力。不同的框架/服务基于TCC、Saga、事务消息等模式,提供了开箱即用的解决方案。以下从通用框架/服务和Node.js生态特有的实现两方面介绍,并结合Node.js示例说明具体用法。
一、支持分布式事务的主流框架/服务分类
分布式事务框架/服务通常按实现模式分类,常见类型及代表如下:
事务模式 |
核心思想 |
主流框架/服务 |
适用场景 |
TCC模式 |
Try-Confirm-Cancel:分阶段提交,预留资源→确认/取消 |
Seata(Java)、tcc-transaction(多语言)、tcc-transaction-nodejs(Node.js) |
高实时性场景(如金融转账) |
Saga模式 |
拆分本地事务+补偿事务,最终一致性 |
Saga.js(Node.js)、node-saga、AWS Step Functions(托管服务) |
长流程场景(如电商下单、物流) |
事务消息模式 |
基于消息队列的最终一致性(本地消息表+消息确认) |
RocketMQ事务消息、RabbitMQ结合本地表实现、Bull(Node.js消息队列)+自定义逻辑 |
异步通信为主的场景(如订单通知) |
托管服务 |
云厂商提供的分布式事务协调服务 |
AWS Step Functions、阿里云Seata托管版、Azure Durable Functions |
无需维护底层,适合云原生架构 |
二、Node.js生态中的分布式事务框架及示例
Node.js作为异步非阻塞的语言,在微服务中应用广泛,其生态中支持分布式事务的框架虽不如Java丰富,但针对Saga、TCC等模式已有成熟工具。以下以Saga模式和TCC模式为例,结合具体框架说明实现方式。
1. Saga模式:saga.js(轻量级Saga框架)
saga.js
是Node.js中专注于Saga模式的轻量级框架,通过定义本地事务和补偿事务,自动协调执行顺序,支持失败时的补偿回滚。
核心特性:
- 声明式定义事务步骤和补偿逻辑;
- 支持异步操作(Promise);
- 内置失败重试和补偿触发机制。
示例:Node.js实现电商下单Saga事务
假设场景:下单流程涉及3个服务(订单、支付、库存),用saga.js
实现编排式Saga(中央协调)。
步骤1:安装依赖
步骤2:定义本地事务和补偿事务
每个服务的本地事务和补偿事务通过函数定义(返回Promise):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| const orderService = { createOrder: async (orderId, userId, productId) => { console.log(`订单服务:创建订单 ${orderId},用户 ${userId},商品 ${productId}`); await db.orders.create({ id: orderId, userId, productId, status: 'PENDING' }); return { orderId, status: 'PENDING' }; }, cancelOrder: async (orderId) => { console.log(`订单服务:取消订单 ${orderId}`); await db.orders.update({ status: 'CANCELLED' }, { where: { id: orderId } }); } };
const paymentService = { deductBalance: async (userId, amount) => { console.log(`支付服务:用户 ${userId} 扣减余额 ${amount}`); const user = await db.users.findByPk(userId); if (user.balance < amount) { throw new Error(`用户 ${userId} 余额不足`); } await db.users.update({ balance: user.balance - amount }, { where: { id: userId } }); }, refund: async (userId, amount) => { console.log(`支付服务:用户 ${userId} 退款 ${amount}`); const user = await db.users.findByPk(userId); await db.users.update({ balance: user.balance + amount }, { where: { id: userId } }); } };
const inventoryService = { deductStock: async (productId, quantity) => { console.log(`库存服务:商品 ${productId} 扣减库存 ${quantity}`); const product = await db.products.findByPk(productId); if (product.stock < quantity) { throw new Error(`商品 ${productId} 库存不足`); } await db.products.update({ stock: product.stock - quantity }, { where: { id: productId } }); }, restoreStock: async (productId, quantity) => { console.log(`库存服务:商品 ${productId} 恢复库存 ${quantity}`); const product = await db.products.findByPk(productId); await db.products.update({ stock: product.stock + quantity }, { where: { id: productId } }); } };
|
步骤3:用saga.js定义Saga流程
通过saga.js
的Saga
类定义事务步骤,指定每个步骤的执行函数和补偿函数:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| const { Saga } = require('saga.js');
const orderSaga = new Saga();
orderSaga.step( async (args) => { const { orderId, userId, productId } = args; return await orderService.createOrder(orderId, userId, productId); }, async (result) => { await orderService.cancelOrder(result.orderId); } ) .step( async (args, prevResult) => { const { userId, amount } = args; await paymentService.deductBalance(userId, amount); return { userId, amount }; }, async (result) => { await paymentService.refund(result.userId, result.amount); } ) .step( async (args) => { const { productId, quantity } = args; await inventoryService.deductStock(productId, quantity); return { productId, quantity }; }, async (result) => { await inventoryService.restoreStock(result.productId, result.quantity); } );
|
步骤4:执行Saga事务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| const runSaga = async () => { const args = { orderId: 'order_123', userId: 'user_456', productId: 'product_789', amount: 100, quantity: 2 };
try { await orderSaga.execute(args); console.log('Saga事务执行成功:订单创建完成'); } catch (error) { console.error('Saga事务执行失败,已触发补偿:', error.message); } };
runSaga();
|
执行结果分析:
- 正常流程:依次执行“创建订单→扣减余额→扣减库存”,全部成功则事务完成;
- 失败场景(如步骤2支付失败):
saga.js
自动触发步骤1的补偿函数(取消订单),实现回滚。
2. TCC模式:tcc-transaction-nodejs(TCC框架)
tcc-transaction-nodejs
是Node.js中实现TCC模式的框架,通过Try-Confirm-Cancel三阶段操作,保证分布式事务的一致性。
核心特性:
- 支持分布式事务上下文传递;
- 基于Redis存储事务日志(用于崩溃恢复);
- 自动处理Confirm/Cancel的触发。
示例:Node.js实现转账TCC事务
场景:用户A向用户B转账100元,涉及两个账户服务。
步骤1:安装依赖
1
| npm install tcc-transaction-nodejs ioredis
|
步骤2:初始化TCC框架(配置Redis存储)
1 2 3 4 5 6 7 8 9 10 11
| const { TCC } = require('tcc-transaction-nodejs'); const Redis = require('ioredis');
const redis = new Redis({ host: 'localhost', port: 6379 });
const tcc = new TCC({ redis, serviceName: 'transfer-service' });
|
步骤3:定义TCC三阶段接口
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| const accountService = { tryFreeze: async (ctx, userId, amount) => { console.log(`Try:冻结用户 ${userId} 的 ${amount} 元`); await db.accounts.update( { balance: db.sequelize.literal(`balance - ${amount}`), freeze: db.sequelize.literal(`freeze + ${amount}`) }, { where: { id: userId } } ); },
confirm: async (ctx, userId, amount) => { console.log(`Confirm:确认扣减用户 ${userId} 的 ${amount} 元`); await db.accounts.update( { freeze: db.sequelize.literal(`freeze - ${amount}`) }, { where: { id: userId } } ); },
cancel: async (ctx, userId, amount) => { console.log(`Cancel:取消冻结用户 ${userId} 的 ${amount} 元`); await db.accounts.update( { balance: db.sequelize.literal(`balance + ${amount}`), freeze: db.sequelize.literal(`freeze - ${amount}`) }, { where: { id: userId } } ); } };
|
步骤4:定义TCC事务并执行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| const transferTCC = async () => { const userIdA = 'user_A'; const userIdB = 'user_B'; const amount = 100;
const ctx = await tcc.begin();
try { await tcc.call(ctx, accountService.tryFreeze, userIdA, amount); await tcc.call(ctx, accountService.tryFreeze, userIdB, 0);
await tcc.commit(ctx); console.log('TCC事务成功:转账完成'); } catch (error) { await tcc.rollback(ctx); console.error('TCC事务失败,已回滚:', error.message); } };
transferTCC();
|
执行逻辑:
- Try阶段:冻结A的100元(预留资源),确保B可收款;
- 若Try全部成功,TCC框架自动调用Confirm阶段(A扣减100,B增加100);
- 若Try失败,框架调用Cancel阶段(解冻A的100元),释放资源。
3. 事务消息模式:Bull(消息队列)+ 本地消息表
Node.js中常用的消息队列Bull
(基于Redis)可结合“本地消息表”实现事务消息模式,通过消息的可靠投递保证最终一致性。
核心思路:
- 本地事务执行时,同时写入“消息表”(记录待发送消息);
- 事务提交后,通过定时任务将消息表中的消息发送到队列;
- 接收方消费消息并执行本地事务,确认后删除消息表记录。
示例:订单创建后发送通知(最终一致性)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
| const Queue = require('bull'); const { sequelize } = require('./db');
const notificationQueue = new Queue('order-notification', { redis: { host: 'localhost', port: 6379 } });
const Message = sequelize.define('Message', { id: { type: Sequelize.STRING, primaryKey: true }, content: Sequelize.JSON, status: Sequelize.STRING });
const createOrderWithMessage = async (orderData) => { const transaction = await sequelize.transaction(); try { const order = await Order.create(orderData, { transaction });
await Message.create({ id: `msg_${order.id}`, content: { orderId: order.id, userId: order.userId, message: '订单已创建' }, status: 'PENDING' }, { transaction });
await transaction.commit(); return order; } catch (error) { await transaction.rollback(); throw error; } };
const sendPendingMessages = async () => { const pendingMessages = await Message.findAll({ where: { status: 'PENDING' } }); for (const msg of pendingMessages) { try { await notificationQueue.add(msg.content); await msg.update({ status: 'SENT' }); } catch (error) { console.error('消息发送失败,将重试:', msg.id); } } };
setInterval(sendPendingMessages, 10000);
notificationQueue.process(async (job) => { const { orderId, userId, message } = job.data; console.log(`通知服务:向用户 ${userId} 发送订单 ${orderId} 通知:${message}`); });
|
三、云服务:AWS Step Functions(托管Saga服务)
对于无需维护底层框架的场景,可使用AWS Step Functions(托管状态机服务),通过JSON定义Saga流程,Node.js通过API调用触发,适合云原生微服务。
核心优势:
- 可视化定义事务流程;
- 自动处理失败重试和补偿;
- 无需维护消息队列或协调器。
示例:Node.js调用AWS Step Functions执行Saga
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66
| const AWS = require('aws-sdk'); const stepfunctions = new AWS.StepFunctions();
const stateMachineDefinition = { Comment: "电商下单Saga流程", StartAt: "CreateOrder", States: { CreateOrder: { Type: "Task", Resource: "arn:aws:lambda:us-east-1:123456789:function:create-order", Next: "DeductBalance", Catch: [{ ErrorEquals: ["States.ALL"], Next: "CancelOrder" }] }, DeductBalance: { Type: "Task", Resource: "arn:aws:lambda:us-east-1:123456789:function:deduct-balance", Next: "DeductStock", Catch: [{ ErrorEquals: ["States.ALL"], Next: "CancelOrder" }] }, DeductStock: { Type: "Task", Resource: "arn:aws:lambda:us-east-1:123456789:function:deduct-stock", End: true, Catch: [{ ErrorEquals: ["States.ALL"], Next: "RefundAndCancel" }] }, CancelOrder: { Type: "Task", Resource: "arn:aws:lambda:us-east-1:123456789:function:cancel-order", End: true }, RefundAndCancel: { Type: "Task", Resource: "arn:aws:lambda:us-east-1:123456789:function:refund-and-cancel", End: true } } };
const startSaga = async () => { const params = { stateMachineArn: "arn:aws:states:us-east-1:123456789:stateMachine:OrderSaga", input: JSON.stringify({ orderId: "order_123", userId: "user_456", productId: "product_789" }) };
const result = await stepfunctions.startExecution(params).promise(); console.log("Saga流程启动:", result.executionArn); };
startSaga();
|
四、总结与选型建议
Node.js中分布式事务框架的选型需结合场景:
- 短流程、低延迟:优先TCC模式(
tcc-transaction-nodejs
),适合金融转账等实时场景;
- 长流程、多服务:优先Saga模式(
saga.js
),适合电商下单、物流等;
- 异步通信为主:用
Bull
+本地消息表实现事务消息,保证最终一致性;
- 云原生架构:选择AWS Step Functions等托管服务,减少运维成本。
无论哪种方案,需重点关注幂等性(避免重复执行)、日志追踪(便于问题排查)和失败重试策略(应对网络抖动)。