RabbitMQ
RabbitMQ
RabbitMQ 是一个开源且轻量级的消息代理,支持多种消息协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的要求。此外,它是在云环境中部署消息传递的最流行方式,并且可以在本地使用。
安装
要开始构建基于 RabbitMQ 的微服务,首先安装所需的包:
$ npm i --save amqplib
$ npm i --save-dev @types/amqplib
概述
要使用 RabbitMQ 传输器,请将以下选项对象传递给 createMicroservice() 方法:
@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
});
提示 Transport 枚举从 @nestjs/microservices 包导入。
选项
options 对象特定于所选的传输器。RabbitMQ 传输器公开了下面描述的属性。
| 属性 | 描述 |
|---|---|
urls | 连接 URL |
queue | 您的服务器将监听的队列名称 |
prefetchCount | 设置通道的预取计数 |
isGlobalPrefetchCount | 启用每个通道的预取计数 |
noAck | 如果为 false,手动确认模式启用 |
queueOptions | 其他队列选项(这里查看可用选项) |
socketOptions | 其他套接字选项(这里查看可用选项) |
headers | 要与每条消息一起发送的头部 |
客户端
与其他微服务传输器一样,您有多种选项来创建 RabbitMQ ClientProxy 实例。
创建实例的一种方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,请导入它并使用 register() 方法传递一个选项对象,该对象具有与上面 createMicroservice() 方法中显示的相同属性,以及一个用作注入令牌的 name 属性。阅读更多关于 ClientsModule 的信息这里。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.RMQ,
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
queueOptions: {
durable: false
},
},
},
]),
]
...
})
其他创建客户端的选项(ClientProxyFactory 或 @Client())也可以使用。您可以在这里阅读相关信息。
上下文
在更复杂的场景中,您可能需要访问有关传入请求的其他信息。使用 RabbitMQ 传输器时,您可以访问 RmqContext 对象。
@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(`Pattern: ${context.getPattern()}`);
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
console.log(`Pattern: ${context.getPattern()}`);
}
提示 @Payload()、@Ctx() 和 RmqContext 从 @nestjs/microservices 包导入。
要访问原始 RabbitMQ 消息(带有 properties、fields 和 content),请使用 RmqContext 对象的 getMessage() 方法,如下所示:
@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getMessage());
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
console.log(context.getMessage());
}
要检索对 RabbitMQ 通道的引用,请使用 RmqContext 对象的 getChannelRef 方法,如下所示:
@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
console.log(context.getChannelRef());
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
console.log(context.getChannelRef());
}
消息确认
要确保消息永远不会丢失,RabbitMQ 支持消息确认。确认由消费者发送回 RabbitMQ,告诉 RabbitMQ 特定消息已被接收、处理,并且 RabbitMQ 可以自由删除它。如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而没有发送确认,RabbitMQ 将理解消息未完全处理并将重新排队。
要启用手动确认模式,请将 noAck 属性设置为 false:
options: {
urls: ['amqp://localhost:5672'],
queue: 'cats_queue',
noAck: false,
queueOptions: {
durable: false
},
}
当手动消费者确认打开时,我们必须从工作者发送适当的确认以指示我们已完成任务。
@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
const channel = context.getChannelRef();
const originalMsg = context.getMessage();
channel.ack(originalMsg);
}
记录构建器
要配置消息选项,您可以使用 RmqRecordBuilder 类(注意:这对于基于事件的流也是可行的)。例如,要设置 headers 和 priority 属性,请使用 setOptions 方法,如下所示:
const message = new RmqRecordBuilder(':cat:')
.setOptions({
headers: {
['x-version']: '1.0.0',
},
priority: 3,
})
.build();
this.client.send('replace-emoji', message).subscribe(...);
提示 RmqRecordBuilder 类从 @nestjs/microservices 包导出。
您也可以在服务器端读取这些选项,通过访问 RmqContext,如下所示:
@@filename()
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
const { properties } = context.getMessage();
return properties.headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
const { properties } = context.getMessage();
return properties.headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
实例状态更新
要获取连接和底层驱动实例状态的实时更新,您可以订阅 status 流。此流提供特定于所选驱动的状态更新。对于 RabbitMQ 驱动,status 流发出 connected 和 disconnected 事件。
this.client.status.subscribe((status: RmqStatus) => {
console.log(status);
});
提示 RmqStatus 类型从 @nestjs/microservices 包导入。
同样,您可以订阅服务器的 status 流以接收有关服务器状态的通知。
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: RmqStatus) => {
console.log(status);
});
监听 RabbitMQ 事件
在某些情况下,您可能希望监听微服务发出的内部事件。例如,您可以监听 error 事件以在发生错误时触发其他操作。为此,请使用 on() 方法,如下所示:
this.client.on('error', (err) => {
console.error(err);
});
同样,您可以监听服务器的内部事件:
server.on<RmqEvents>('error', (err) => {
console.error(err);
});
提示 RmqEvents 类型从 @nestjs/microservices 包导入。
底层驱动访问
对于更高级的用例,您可能需要访问底层驱动实例。这对于手动关闭连接或使用驱动特定方法等场景很有用。但是,请记住,在大多数情况下,您不应该需要直接访问驱动。
为此,您可以使用 unwrap() 方法,它返回底层驱动实例。泛型类型参数应指定您期望的驱动实例类型。
const amqpConnection = this.client.unwrap<import('amqplib').Connection>();
同样,您可以访问服务器的底层驱动实例:
const amqpConnection = server.unwrap<import('amqplib').Connection>();