MQTT
MQTT
MQTT(消息队列遥测传输)是一个开源的轻量级消息传输协议,针对低延迟进行了优化。该协议提供了一种可扩展且成本效益高的方式,使用发布/订阅模型连接设备。基于 MQTT 构建的通信系统由发布服务器、代理和一个或多个客户端组成。它专为受限设备和低带宽、高延迟或不可靠的网络而设计。
安装
要开始构建基于 MQTT 的微服务,首先安装所需的包:
$ npm i --save mqtt
概述
要使用 MQTT 传输器,请将以下选项对象传递给 createMicroservice() 方法:
@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
},
});
提示 Transport 枚举从 @nestjs/microservices 包中导入。
选项
options 对象特定于所选的传输器。MQTT 传输器公开了这里描述的属性。
客户端
与其他微服务传输器一样,您有多种选项来创建 MQTT ClientProxy 实例。
创建实例的一种方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,请导入它并使用 register() 方法传递一个选项对象,该对象具有与上面 createMicroservice() 方法中显示的相同属性,以及一个用作注入令牌的 name 属性。在这里阅读更多关于 ClientsModule 的信息。
@Module({
imports: [
ClientsModule.register([
{
name: 'MATH_SERVICE',
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
}
},
]),
]
...
})
创建客户端的其他选项(ClientProxyFactory 或 @Client())也可以使用。您可以在这里阅读相关信息。
上下文
在更复杂的场景中,您可能需要访问有关传入请求的其他信息。使用 MQTT 传输器时,您可以访问 MqttContext 对象。
@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
console.log(`Topic: ${context.getTopic()}`);
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
console.log(`Topic: ${context.getTopic()}`);
}
提示 @Payload()、@Ctx() 和 MqttContext 从 @nestjs/microservices 包中导入。
要访问原始的 mqtt 数据包,请使用 MqttContext 对象的 getPacket() 方法,如下所示:
@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: MqttContext) {
console.log(context.getPacket());
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
console.log(context.getPacket());
}
通配符
订阅可以是显式主题,也可以包含通配符。有两个可用的通配符:+ 和 #。+ 是单级通配符,而 # 是多级通配符,涵盖多个主题级别。
@@filename()
@MessagePattern('sensors/+/temperature/+')
getTemperature(@Ctx() context: MqttContext) {
console.log(`Topic: ${context.getTopic()}`);
}
@@switch
@Bind(Ctx())
@MessagePattern('sensors/+/temperature/+')
getTemperature(context) {
console.log(`Topic: ${context.getTopic()}`);
}
服务质量 (QoS)
使用 @MessagePattern 或 @EventPattern 装饰器创建的任何订阅都将以 QoS 0 订阅。如果需要更高的 QoS,可以在建立连接时使用 subscribeOptions 块全局设置,如下所示:
@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
subscribeOptions: {
qos: 2
},
},
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1883',
subscribeOptions: {
qos: 2
},
},
});
如果需要特定主题的 QoS,请考虑创建自定义传输器。
记录构建器
要配置消息选项(调整 QoS 级别、设置 Retain 或 DUP 标志,或向有效负载添加其他属性),您可以使用 MqttRecordBuilder 类。例如,要将 QoS 设置为 2,请使用 setQoS 方法,如下所示:
const userProperties = { 'x-version': '1.0.0' };
const record = new MqttRecordBuilder(':cat:')
.setProperties({ userProperties })
.setQoS(1)
.build();
client.send('replace-emoji', record).subscribe(...);
提示 MqttRecordBuilder 类从 @nestjs/microservices 包中导出。
您也可以在服务器端读取这些选项,通过访问 MqttContext。
@@filename()
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: MqttContext): string {
const { properties: { userProperties } } = context.getPacket();
return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
const { properties: { userProperties } } = context.getPacket();
return userProperties['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
在某些情况下,您可能希望为多个请求配置用户属性,您可以将这些选项传递给 ClientProxyFactory。
import { Module } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';
@Module({
providers: [
{
provide: 'API_v1',
useFactory: () =>
ClientProxyFactory.create({
transport: Transport.MQTT,
options: {
url: 'mqtt://localhost:1833',
userProperties: { 'x-version': '1.0.0' },
},
}),
},
],
})
export class ApiModule {}
实例状态更新
要获取连接和底层驱动程序实例状态的实时更新,您可以订阅 status 流。此流提供特定于所选驱动程序的状态更新。对于 MQTT 驱动程序,status 流发出 connected、disconnected、reconnecting 和 closed 事件。
this.client.status.subscribe((status: MqttStatus) => {
console.log(status);
});
提示 MqttStatus 类型从 @nestjs/microservices 包中导入。
同样,您可以订阅服务器的 status 流以接收有关服务器状态的通知。
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: MqttStatus) => {
console.log(status);
});
监听 MQTT 事件
在某些情况下,您可能希望监听微服务发出的内部事件。例如,您可以监听 error 事件,以在发生错误时触发其他操作。为此,请使用 on() 方法,如下所示:
this.client.on('error', (err) => {
console.error(err);
});
同样,您可以监听服务器的内部事件:
server.on<MqttEvents>('error', (err) => {
console.error(err);
});
提示 MqttEvents 类型从 @nestjs/microservices 包中导入。
底层驱动程序访问
对于更高级的用例,您可能需要访问底层驱动程序实例。这对于手动关闭连接或使用特定于驱动程序的方法等场景很有用。但是,请记住,在大多数情况下,您不应该需要直接访问驱动程序。
为此,您可以使用 unwrap() 方法,该方法返回底层驱动程序实例。泛型类型参数应指定您期望的驱动程序实例类型。
const mqttClient = this.client.unwrap<import('mqtt').MqttClient>();
同样,您可以访问服务器的底层驱动程序实例:
const mqttClient = server.unwrap<import('mqtt').MqttClient>();