NATS
NATS
NATS 是一个简单、安全且高性能的开源消息系统,适用于云原生应用程序、IoT 消息传递和微服务架构。NATS 服务器使用 Go 编程语言编写,但与服务器交互的客户端库可用于数十种主要编程语言。NATS 支持最多一次和至少一次交付。它可以在任何地方运行,从大型服务器和云实例,到边缘网关甚至物联网设备。
安装
要开始构建基于 NATS 的微服务,首先安装所需的包:
$ npm i --save nats
概述
要使用 NATS 传输器,请将以下选项对象传递给 createMicroservice() 方法:
@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
},
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
},
});
提示 Transport 枚举从 @nestjs/microservices 包导入。
选项
options 对象特定于所选的传输器。NATS 传输器公开了这里描述的属性以及以下属性:
| 属性 | 描述 |
|---|---|
queue | 您的服务器应该订阅的队列(留空 undefined 以忽略此设置)。阅读更多关于 NATS 队列组的信息见下文。 |
gracefulShutdown | 启用优雅关闭。启用时,服务器首先取消订阅所有通道,然后关闭连接。默认为 false。 |
gracePeriod | 取消订阅所有通道后等待服务器的时间(毫秒)。默认为 10000 毫秒。 |
客户端
与其他微服务传输器一样,您有多种选项来创建 NATS ClientProxy 实例。
创建实例的一种方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,请导入它并使用 register() 方法传递一个选项对象,该对象具有与上面 createMicroservice() 方法中显示的相同属性,以及一个用作注入令牌的 name 属性。阅读更多关于 ClientsModule 的信息这里。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
}
},
]),
]
...
})
其他创建客户端的选项(ClientProxyFactory 或 @Client())也可以使用。您可以在这里阅读相关信息。
请求-响应
对于请求-响应消息样式(阅读更多),NATS 传输器不使用 NATS 内置的请求-回复机制。相反,"请求"使用 publish() 方法在给定主题上发布,并带有唯一的回复主题名称,响应者监听该主题并将响应发送到回复主题。回复主题动态定向回请求者,无论任何一方的位置如何。
基于事件
对于基于事件的消息样式(阅读更多),NATS 传输器使用 NATS 内置的发布-订阅机制。发布者在主题上发送消息,任何监听该主题的活动订阅者都会收到消息。订阅者还可以注册对通配符主题的兴趣,这些主题的工作方式有点像正则表达式。这种一对多模式有时称为扇出。
队列组
NATS 提供了一个称为分布式队列的内置负载平衡功能。要创建队列订阅,请使用 queue 属性,如下所示:
@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
queue: 'cats_queue',
},
});
上下文
在更复杂的场景中,您可能需要访问有关传入请求的其他信息。使用 NATS 传输器时,您可以访问 NatsContext 对象。
@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: NatsContext) {
console.log(`Subject: ${context.getSubject()}`);
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
console.log(`Subject: ${context.getSubject()}`);
}
提示 @Payload()、@Ctx() 和 NatsContext 从 @nestjs/microservices 包导入。
通配符
订阅可以是显式主题,也可以包含通配符。
@@filename()
@MessagePattern('time.us.*')
getDate(@Payload() data: number[], @Ctx() context: NatsContext) {
console.log(`Subject: ${context.getSubject()}`); // 例如 "time.us.east"
return new Date().toLocaleTimeString(...);
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('time.us.*')
getDate(data, context) {
console.log(`Subject: ${context.getSubject()}`); // 例如 "time.us.east"
return new Date().toLocaleTimeString(...);
}
记录构建器
要配置消息选项,您可以使用 NatsRecordBuilder 类(注意:这对于基于事件的流也是可行的)。例如,要添加 x-version 头部,请使用 setHeaders 方法,如下所示:
import * as nats from 'nats';
// 在您的代码中的某处
const headers = nats.headers();
headers.set('x-version', '1.0.0');
const record = new NatsRecordBuilder(':cat:').setHeaders(headers).build();
this.client.send('replace-emoji', record).subscribe(...);
提示 NatsRecordBuilder 类从 @nestjs/microservices 包导出。
您也可以在服务器端读取这些头部,通过访问 NatsContext,如下所示:
@@filename()
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: NatsContext): string {
const headers = context.getHeaders();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
const headers = context.getHeaders();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
在某些情况下,您可能希望为多个请求配置头部,您可以将这些作为选项传递给 ClientProxyFactory:
import { Module } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';
@Module({
providers: [
{
provide: 'API_v1',
useFactory: () =>
ClientProxyFactory.create({
transport: Transport.NATS,
options: {
servers: ['nats://localhost:4222'],
headers: { 'x-version': '1.0.0' },
},
}),
},
],
})
export class ApiModule {}
实例状态更新
要获取连接和底层驱动实例状态的实时更新,您可以订阅 status 流。此流提供特定于所选驱动的状态更新。对于 NATS 驱动,status 流发出 connected、disconnected 和 reconnecting 事件。
this.client.status.subscribe((status: NatsStatus) => {
console.log(status);
});
提示 NatsStatus 类型从 @nestjs/microservices 包导入。
同样,您可以订阅服务器的 status 流以接收有关服务器状态的通知。
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: NatsStatus) => {
console.log(status);
});
监听 NATS 事件
在某些情况下,您可能希望监听微服务发出的内部事件。例如,您可以监听 error 事件以在发生错误时触发其他操作。为此,请使用 on() 方法,如下所示:
this.client.on('error', (err) => {
console.error(err);
});
同样,您可以监听服务器的内部事件:
server.on<NatsEvents>('error', (err) => {
console.error(err);
});
提示 NatsEvents 类型从 @nestjs/microservices 包导入。
底层驱动访问
对于更高级的用例,您可能需要访问底层驱动实例。这对于手动关闭连接或使用驱动特定方法等场景很有用。但是,请记住,在大多数情况下,您不应该需要直接访问驱动。
为此,您可以使用 unwrap() 方法,它返回底层驱动实例。泛型类型参数应指定您期望的驱动实例类型。
const natsConnection = this.client.unwrap<import('nats').NatsConnection>();
同样,您可以访问服务器的底层驱动实例:
const natsConnection = server.unwrap<import('nats').NatsConnection>();