nodejs分布式事务实现示例

在分布式系统中,分布式事务的实现依赖于框架或服务的协调能力。不同的框架/服务基于TCC、Saga、事务消息等模式,提供了开箱即用的解决方案。以下从通用框架/服务Node.js生态特有的实现两方面介绍,并结合Node.js示例说明具体用法。

一、支持分布式事务的主流框架/服务分类

分布式事务框架/服务通常按实现模式分类,常见类型及代表如下:

事务模式 核心思想 主流框架/服务 适用场景
TCC模式 Try-Confirm-Cancel:分阶段提交,预留资源→确认/取消 Seata(Java)、tcc-transaction(多语言)、tcc-transaction-nodejs(Node.js) 高实时性场景(如金融转账)
Saga模式 拆分本地事务+补偿事务,最终一致性 Saga.js(Node.js)、node-saga、AWS Step Functions(托管服务) 长流程场景(如电商下单、物流)
事务消息模式 基于消息队列的最终一致性(本地消息表+消息确认) RocketMQ事务消息、RabbitMQ结合本地表实现、Bull(Node.js消息队列)+自定义逻辑 异步通信为主的场景(如订单通知)
托管服务 云厂商提供的分布式事务协调服务 AWS Step Functions、阿里云Seata托管版、Azure Durable Functions 无需维护底层,适合云原生架构

二、Node.js生态中的分布式事务框架及示例

Node.js作为异步非阻塞的语言,在微服务中应用广泛,其生态中支持分布式事务的框架虽不如Java丰富,但针对Saga、TCC等模式已有成熟工具。以下以Saga模式TCC模式为例,结合具体框架说明实现方式。

1. Saga模式:saga.js(轻量级Saga框架)

saga.js是Node.js中专注于Saga模式的轻量级框架,通过定义本地事务和补偿事务,自动协调执行顺序,支持失败时的补偿回滚。

核心特性

  • 声明式定义事务步骤和补偿逻辑;
  • 支持异步操作(Promise);
  • 内置失败重试和补偿触发机制。

示例:Node.js实现电商下单Saga事务
假设场景:下单流程涉及3个服务(订单、支付、库存),用saga.js实现编排式Saga(中央协调)。

步骤1:安装依赖
1
npm install saga.js
步骤2:定义本地事务和补偿事务

每个服务的本地事务和补偿事务通过函数定义(返回Promise):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 订单服务:本地事务和补偿事务
const orderService = {
// 本地事务:创建订单(状态:待支付)
createOrder: async (orderId, userId, productId) => {
console.log(`订单服务:创建订单 ${orderId},用户 ${userId},商品 ${productId}`);
// 实际中调用数据库插入操作(如使用Prisma/Sequelize)
await db.orders.create({ id: orderId, userId, productId, status: 'PENDING' });
return { orderId, status: 'PENDING' };
},
// 补偿事务:取消订单(状态:已取消)
cancelOrder: async (orderId) => {
console.log(`订单服务:取消订单 ${orderId}`);
await db.orders.update({ status: 'CANCELLED' }, { where: { id: orderId } });
}
};

// 支付服务:本地事务和补偿事务
const paymentService = {
// 本地事务:扣减用户余额
deductBalance: async (userId, amount) => {
console.log(`支付服务:用户 ${userId} 扣减余额 ${amount}`);
const user = await db.users.findByPk(userId);
if (user.balance < amount) {
throw new Error(`用户 ${userId} 余额不足`); // 模拟支付失败
}
await db.users.update({ balance: user.balance - amount }, { where: { id: userId } });
},
// 补偿事务:退款(恢复余额)
refund: async (userId, amount) => {
console.log(`支付服务:用户 ${userId} 退款 ${amount}`);
const user = await db.users.findByPk(userId);
await db.users.update({ balance: user.balance + amount }, { where: { id: userId } });
}
};

// 库存服务:本地事务和补偿事务
const inventoryService = {
// 本地事务:扣减库存
deductStock: async (productId, quantity) => {
console.log(`库存服务:商品 ${productId} 扣减库存 ${quantity}`);
const product = await db.products.findByPk(productId);
if (product.stock < quantity) {
throw new Error(`商品 ${productId} 库存不足`); // 模拟库存不足失败
}
await db.products.update({ stock: product.stock - quantity }, { where: { id: productId } });
},
// 补偿事务:恢复库存
restoreStock: async (productId, quantity) => {
console.log(`库存服务:商品 ${productId} 恢复库存 ${quantity}`);
const product = await db.products.findByPk(productId);
await db.products.update({ stock: product.stock + quantity }, { where: { id: productId } });
}
};
步骤3:用saga.js定义Saga流程

通过saga.jsSaga类定义事务步骤,指定每个步骤的执行函数和补偿函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
const { Saga } = require('saga.js');

// 创建Saga实例(协调者)
const orderSaga = new Saga();

// 定义Saga步骤:按顺序执行,每个步骤包含执行函数和补偿函数
orderSaga.step(
// 步骤1:创建订单(执行函数)
async (args) => {
const { orderId, userId, productId } = args;
return await orderService.createOrder(orderId, userId, productId);
},
// 步骤1的补偿函数(参数:步骤1的返回结果)
async (result) => {
await orderService.cancelOrder(result.orderId);
}
)
.step(
// 步骤2:扣减余额(依赖步骤1的结果)
async (args, prevResult) => { // prevResult是步骤1的返回值
const { userId, amount } = args;
await paymentService.deductBalance(userId, amount);
return { userId, amount };
},
// 步骤2的补偿函数
async (result) => {
await paymentService.refund(result.userId, result.amount);
}
)
.step(
// 步骤3:扣减库存
async (args) => {
const { productId, quantity } = args;
await inventoryService.deductStock(productId, quantity);
return { productId, quantity };
},
// 步骤3的补偿函数
async (result) => {
await inventoryService.restoreStock(result.productId, result.quantity);
}
);
步骤4:执行Saga事务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 执行Saga事务(传入参数)
const runSaga = async () => {
const args = {
orderId: 'order_123',
userId: 'user_456',
productId: 'product_789',
amount: 100, // 支付金额
quantity: 2 // 购买数量
};

try {
// 执行Saga事务
await orderSaga.execute(args);
console.log('Saga事务执行成功:订单创建完成');
} catch (error) {
console.error('Saga事务执行失败,已触发补偿:', error.message);
}
};

runSaga();

执行结果分析

  • 正常流程:依次执行“创建订单→扣减余额→扣减库存”,全部成功则事务完成;
  • 失败场景(如步骤2支付失败):saga.js自动触发步骤1的补偿函数(取消订单),实现回滚。

2. TCC模式:tcc-transaction-nodejs(TCC框架)

tcc-transaction-nodejs是Node.js中实现TCC模式的框架,通过Try-Confirm-Cancel三阶段操作,保证分布式事务的一致性。

核心特性

  • 支持分布式事务上下文传递;
  • 基于Redis存储事务日志(用于崩溃恢复);
  • 自动处理Confirm/Cancel的触发。

示例:Node.js实现转账TCC事务
场景:用户A向用户B转账100元,涉及两个账户服务。

步骤1:安装依赖
1
npm install tcc-transaction-nodejs ioredis
步骤2:初始化TCC框架(配置Redis存储)
1
2
3
4
5
6
7
8
9
10
11
const { TCC } = require('tcc-transaction-nodejs');
const Redis = require('ioredis');

// 初始化Redis(存储事务日志)
const redis = new Redis({ host: 'localhost', port: 6379 });

// 初始化TCC框架
const tcc = new TCC({
redis,
serviceName: 'transfer-service' // 当前服务名称
});
步骤3:定义TCC三阶段接口
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// 账户服务:TCC接口
const accountService = {
// Try阶段:冻结用户A的100元(预留资源)
tryFreeze: async (ctx, userId, amount) => {
console.log(`Try:冻结用户 ${userId}${amount} 元`);
// 实际操作:更新数据库,增加冻结金额(如:balance=balance-amount, freeze=freeze+amount)
await db.accounts.update(
{ balance: db.sequelize.literal(`balance - ${amount}`), freeze: db.sequelize.literal(`freeze + ${amount}`) },
{ where: { id: userId } }
);
},

// Confirm阶段:确认扣减(将冻结金额真正扣除)
confirm: async (ctx, userId, amount) => {
console.log(`Confirm:确认扣减用户 ${userId}${amount} 元`);
await db.accounts.update(
{ freeze: db.sequelize.literal(`freeze - ${amount}`) }, // 解冻并扣减
{ where: { id: userId } }
);
},

// Cancel阶段:取消冻结(释放资源)
cancel: async (ctx, userId, amount) => {
console.log(`Cancel:取消冻结用户 ${userId}${amount} 元`);
await db.accounts.update(
{ balance: db.sequelize.literal(`balance + ${amount}`), freeze: db.sequelize.literal(`freeze - ${amount}`) },
{ where: { id: userId } }
);
}
};
步骤4:定义TCC事务并执行
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// 定义转账TCC事务
const transferTCC = async () => {
const userIdA = 'user_A';
const userIdB = 'user_B';
const amount = 100;

// 创建TCC事务上下文
const ctx = await tcc.begin();

try {
// 调用用户A的Try阶段(冻结金额)
await tcc.call(ctx, accountService.tryFreeze, userIdA, amount);
// 调用用户B的Try阶段(预留收款金额,此处简化为直接冻结B的0元,实际可能是增加可收款额度)
await tcc.call(ctx, accountService.tryFreeze, userIdB, 0); // B的Try阶段无实际冻结

// 所有Try成功,提交事务(触发Confirm)
await tcc.commit(ctx);
console.log('TCC事务成功:转账完成');
} catch (error) {
// Try失败,回滚事务(触发Cancel)
await tcc.rollback(ctx);
console.error('TCC事务失败,已回滚:', error.message);
}
};

transferTCC();

执行逻辑

  • Try阶段:冻结A的100元(预留资源),确保B可收款;
  • 若Try全部成功,TCC框架自动调用Confirm阶段(A扣减100,B增加100);
  • 若Try失败,框架调用Cancel阶段(解冻A的100元),释放资源。

3. 事务消息模式:Bull(消息队列)+ 本地消息表

Node.js中常用的消息队列Bull(基于Redis)可结合“本地消息表”实现事务消息模式,通过消息的可靠投递保证最终一致性。

核心思路

  1. 本地事务执行时,同时写入“消息表”(记录待发送消息);
  2. 事务提交后,通过定时任务将消息表中的消息发送到队列;
  3. 接收方消费消息并执行本地事务,确认后删除消息表记录。

示例:订单创建后发送通知(最终一致性)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
const Queue = require('bull');
const { sequelize } = require('./db'); // 假设使用Sequelize ORM

// 1. 初始化Bull队列
const notificationQueue = new Queue('order-notification', {
redis: { host: 'localhost', port: 6379 }
});

// 2. 定义本地消息表模型(记录待发送消息)
const Message = sequelize.define('Message', {
id: { type: Sequelize.STRING, primaryKey: true },
content: Sequelize.JSON, // 消息内容(如订单信息)
status: Sequelize.STRING // 状态:PENDING/SENT
});

// 3. 本地事务+消息表写入
const createOrderWithMessage = async (orderData) => {
// 开启本地事务
const transaction = await sequelize.transaction();
try {
// 步骤1:创建订单(本地事务)
const order = await Order.create(orderData, { transaction });

// 步骤2:写入消息表(待发送通知)
await Message.create({
id: `msg_${order.id}`,
content: { orderId: order.id, userId: order.userId, message: '订单已创建' },
status: 'PENDING'
}, { transaction });

// 提交事务:订单和消息表同时生效
await transaction.commit();
return order;
} catch (error) {
await transaction.rollback();
throw error;
}
};

// 4. 定时任务:发送消息表中的PENDING消息
const sendPendingMessages = async () => {
const pendingMessages = await Message.findAll({ where: { status: 'PENDING' } });
for (const msg of pendingMessages) {
try {
// 发送消息到队列
await notificationQueue.add(msg.content);
// 更新消息状态为SENT
await msg.update({ status: 'SENT' });
} catch (error) {
console.error('消息发送失败,将重试:', msg.id);
}
}
};

// 每10秒执行一次定时任务
setInterval(sendPendingMessages, 10000);

// 5. 消费队列消息(通知服务)
notificationQueue.process(async (job) => {
const { orderId, userId, message } = job.data;
console.log(`通知服务:向用户 ${userId} 发送订单 ${orderId} 通知:${message}`);
// 执行通知逻辑(如发送短信/推送)
});

三、云服务:AWS Step Functions(托管Saga服务)

对于无需维护底层框架的场景,可使用AWS Step Functions(托管状态机服务),通过JSON定义Saga流程,Node.js通过API调用触发,适合云原生微服务。

核心优势

  • 可视化定义事务流程;
  • 自动处理失败重试和补偿;
  • 无需维护消息队列或协调器。

示例:Node.js调用AWS Step Functions执行Saga

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
const AWS = require('aws-sdk');
const stepfunctions = new AWS.StepFunctions();

// 定义Saga流程(状态机JSON)
const stateMachineDefinition = {
Comment: "电商下单Saga流程",
StartAt: "CreateOrder",
States: {
CreateOrder: {
Type: "Task",
Resource: "arn:aws:lambda:us-east-1:123456789:function:create-order", // 订单Lambda
Next: "DeductBalance",
Catch: [{ // 失败时触发补偿
ErrorEquals: ["States.ALL"],
Next: "CancelOrder"
}]
},
DeductBalance: {
Type: "Task",
Resource: "arn:aws:lambda:us-east-1:123456789:function:deduct-balance", // 支付Lambda
Next: "DeductStock",
Catch: [{
ErrorEquals: ["States.ALL"],
Next: "CancelOrder" // 失败时回滚订单
}]
},
DeductStock: {
Type: "Task",
Resource: "arn:aws:lambda:us-east-1:123456789:function:deduct-stock", // 库存Lambda
End: true,
Catch: [{
ErrorEquals: ["States.ALL"],
Next: "RefundAndCancel" // 失败时回滚支付和订单
}]
},
// 补偿步骤:取消订单
CancelOrder: {
Type: "Task",
Resource: "arn:aws:lambda:us-east-1:123456789:function:cancel-order",
End: true
},
// 补偿步骤:退款+取消订单
RefundAndCancel: {
Type: "Task",
Resource: "arn:aws:lambda:us-east-1:123456789:function:refund-and-cancel",
End: true
}
}
};

// 启动状态机执行
const startSaga = async () => {
const params = {
stateMachineArn: "arn:aws:states:us-east-1:123456789:stateMachine:OrderSaga",
input: JSON.stringify({
orderId: "order_123",
userId: "user_456",
productId: "product_789"
})
};

const result = await stepfunctions.startExecution(params).promise();
console.log("Saga流程启动:", result.executionArn);
};

startSaga();

四、总结与选型建议

Node.js中分布式事务框架的选型需结合场景:

  • 短流程、低延迟:优先TCC模式(tcc-transaction-nodejs),适合金融转账等实时场景;
  • 长流程、多服务:优先Saga模式(saga.js),适合电商下单、物流等;
  • 异步通信为主:用Bull+本地消息表实现事务消息,保证最终一致性;
  • 云原生架构:选择AWS Step Functions等托管服务,减少运维成本。

无论哪种方案,需重点关注幂等性(避免重复执行)、日志追踪(便于问题排查)和失败重试策略(应对网络抖动)。