自定义传输器
自定义传输器
Nest 提供了多种开箱即用的传输器,以及允许开发者构建新的自定义传输策略的 API。传输器使您能够使用可插拔的通信层和非常简单的应用级消息协议通过网络连接组件(阅读完整文章)。
提示 使用 Nest 构建微服务并不一定意味着您必须使用 @nestjs/microservices 包。例如,如果您想与外部服务通信(比如用不同语言编写的其他微服务),您可能不需要 @nestjs/microservice 库提供的所有功能。实际上,如果您不需要让您声明式定义订阅者的装饰器(@EventPattern 或 @MessagePattern),运行一个独立应用程序并手动维护连接/订阅频道对于大多数用例来说应该足够了,并且会为您提供更多的灵活性。
使用自定义传输器,您可以集成任何消息系统/协议(包括 Google Cloud Pub/Sub、Amazon Kinesis 等)或扩展现有的传输器,在其基础上添加额外功能(例如,为 MQTT 添加 QoS)。
提示 为了更好地理解 Nest 微服务的工作原理以及如何扩展现有传输器的功能,我们建议阅读 NestJS Microservices in Action 和 Advanced NestJS Microservices 系列文章。
创建策略
首先,让我们定义一个表示自定义传输器的类。
import { CustomTransportStrategy, Server } from '@nestjs/microservices';
class GoogleCloudPubSubServer
extends Server
implements CustomTransportStrategy
{
/**
* 当您运行 "app.listen()" 时触发。
*/
listen(callback: () => void) {
callback();
}
/**
* 在应用程序关闭时触发。
*/
close() {}
/**
* 如果您不希望传输器用户能够注册事件监听器,
* 可以忽略此方法。大多数自定义实现不需要此方法。
*/
on(event: string, callback: Function) {
throw new Error('Method not implemented.');
}
/**
* 如果您不希望传输器用户能够检索底层原生服务器,
* 可以忽略此方法。大多数自定义实现不需要此方法。
*/
unwrap<T = never>(): T {
throw new Error('Method not implemented.');
}
}
警告 请注意,我们不会在本章中实现一个功能完整的 Google Cloud Pub/Sub 服务器,因为这需要深入了解传输器特定的技术细节。
在上面的示例中,我们声明了 GoogleCloudPubSubServer 类,并提供了 CustomTransportStrategy 接口强制要求的 listen() 和 close() 方法。此外,我们的类扩展了从 @nestjs/microservices 包导入的 Server 类,该类提供了一些有用的方法,例如 Nest 运行时用于注册消息处理器的方法。或者,如果您想扩展现有传输策略的功能,您可以扩展相应的服务器类,例如 ServerRedis。按照惯例,我们在类名中添加了 "Server" 后缀,因为它将负责订阅消息/事件(并在必要时响应它们)。
有了这个设置,我们现在可以使用自定义策略而不是使用内置传输器,如下所示:
const app = await NestFactory.createMicroservice<MicroserviceOptions>(
AppModule,
{
strategy: new GoogleCloudPubSubServer(),
},
);
基本上,我们不是传递具有 transport 和 options 属性的普通传输器选项对象,而是传递一个单一属性 strategy,其值是我们自定义传输器类的实例。
回到我们的 GoogleCloudPubSubServer 类,在实际应用中,我们会在 listen() 方法中建立与消息代理/外部服务的连接并注册订阅者/监听特定频道(然后在 close() 清理方法中移除订阅并关闭连接),但由于这需要对 Nest 微服务如何相互通信有很好的理解,我们建议阅读这个文章系列。在本章中,我们将专注于 Server 类提供的功能以及如何利用它们来构建自定义策略。
例如,假设在我们的应用程序中某处定义了以下消息处理器:
@MessagePattern('echo')
echo(@Payload() data: object) {
return data;
}
这个消息处理器将由 Nest 运行时自动注册。使用 Server 类,您可以查看已注册的消息模式,还可以访问和执行分配给它们的实际方法。为了测试这一点,让我们在调用 callback 函数之前在 listen() 方法中添加一个简单的 console.log:
listen(callback: () => void) {
console.log(this.messageHandlers);
callback();
}
应用程序重启后,您将在终端中看到以下日志:
Map { 'echo' => [AsyncFunction] { isEventHandler: false } }
提示 如果我们使用 @EventPattern 装饰器,您会看到相同的输出,但 isEventHandler 属性设置为 true。
如您所见,messageHandlers 属性是所有消息(和事件)处理器的 Map 集合,其中模式被用作键。现在,您可以使用键(例如 "echo")来获取消息处理器的引用:
async listen(callback: () => void) {
const echoHandler = this.messageHandlers.get('echo');
console.log(await echoHandler('Hello world!'));
callback();
}
一旦我们执行 echoHandler 并传递一个任意字符串作为参数(这里是 "Hello world!"),我们应该在控制台中看到它:
Hello world!
这意味着我们的方法处理器被正确执行了。
当使用 CustomTransportStrategy 与拦截器时,处理器被包装到 RxJS 流中。这意味着您需要订阅它们以执行流的底层逻辑(例如,在拦截器执行后继续进入控制器逻辑)。
下面可以看到一个示例:
async listen(callback: () => void) {
const echoHandler = this.messageHandlers.get('echo');
const streamOrResult = await echoHandler('Hello World');
if (isObservable(streamOrResult)) {
streamOrResult.subscribe();
}
callback();
}
客户端代理
正如我们在第一节中提到的,您不一定需要使用 @nestjs/microservices 包来创建微服务,但如果您决定这样做并且需要集成自定义策略,您也需要提供一个"客户端"类。
提示 同样,实现与所有 @nestjs/microservices 功能(例如流式传输)兼容的功能完整的客户端类需要对框架使用的通信技术有很好的理解。要了解更多信息,请查看这篇文章。
要与外部服务通信/发出和发布消息(或事件),您可以使用特定于库的 SDK 包,或者实现一个扩展 ClientProxy 的自定义客户端类,如下所示:
import { ClientProxy, ReadPacket, WritePacket } from '@nestjs/microservices';
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {}
async close() {}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {}
unwrap<T = never>(): T {
throw new Error('Method not implemented.');
}
}
警告 请注意,我们不会在本章中实现一个功能完整的 Google Cloud Pub/Sub 客户端,因为这需要深入了解传输器特定的技术细节。
如您所见,ClientProxy 类要求我们提供几个方法来建立和关闭连接以及发布消息(publish)和事件(dispatchEvent)。请注意,如果您不需要请求-响应通信风格支持,您可以将 publish() 方法留空。同样,如果您不需要支持基于事件的通信,请跳过 dispatchEvent() 方法。
为了观察这些方法何时以及如何执行,让我们添加多个 console.log 调用,如下所示:
class GoogleCloudPubSubClient extends ClientProxy {
async connect(): Promise<any> {
console.log('connect');
}
async close() {
console.log('close');
}
async dispatchEvent(packet: ReadPacket<any>): Promise<any> {
return console.log('event to dispatch: ', packet);
}
publish(
packet: ReadPacket<any>,
callback: (packet: WritePacket<any>) => void,
): Function {
console.log('message:', packet);
// 在实际应用中,"callback" 函数应该使用从响应者发送回来的负载执行。
// 这里,我们将简单地模拟(5秒延迟)响应通过传递与我们最初传入的相同"数据"来到达。
//
// WritePacket 上的 "isDisposed" 布尔值告诉响应不再期望进一步的数据。
// 如果未发送或为 false,这将简单地向 Observable 发出数据。
setTimeout(() => callback({
response: packet.data,
isDisposed: true,
}), 5000);
return () => console.log('teardown');
}
unwrap<T = never>(): T {
throw new Error('Method not implemented.');
}
}
有了这个设置,让我们创建一个 GoogleCloudPubSubClient 类的实例并运行 send() 方法(您可能在前面的章节中见过),订阅返回的可观察流。
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.subscribe((response) => console.log(response));
现在,您应该在终端中看到以下输出:
connect
message: { pattern: 'pattern', data: 'Hello world!' }
Hello world! // <-- 5秒后
为了测试我们的"teardown"方法(我们的 publish() 方法返回的)是否正确执行,让我们对流应用超时操作符,将其设置为2秒,以确保它在我们的 setTimeout 调用 callback 函数之前抛出异常。
const googlePubSubClient = new GoogleCloudPubSubClient();
googlePubSubClient
.send('pattern', 'Hello world!')
.pipe(timeout(2000))
.subscribe(
(response) => console.log(response),
(error) => console.error(error.message),
);
提示 timeout 操作符从 rxjs/operators 包导入。
应用 timeout 操作符后,您的终端输出应该如下所示:
connect
message: { pattern: 'pattern', data: 'Hello world!' }
teardown // <-- teardown
Timeout has occurred
要分发事件(而不是发送消息),请使用 emit() 方法:
googlePubSubClient.emit('event', 'Hello world!');
这就是您应该在控制台中看到的内容:
connect
event to dispatch: { pattern: 'event', data: 'Hello world!' }
消息序列化
如果您需要在客户端添加一些关于响应序列化的自定义逻辑,您可以使用一个扩展 ClientProxy 类或其子类之一的自定义类。对于修改成功的请求,您可以重写 serializeResponse 方法,对于修改通过此客户端的任何错误,您可以重写 serializeError 方法。要使用这个自定义类,您可以使用 customClass 属性将类本身传递给 ClientsModule.register() 方法。下面是一个自定义 ClientProxy 的示例,它将每个错误序列化为 RpcException。
@@filename(error-handling.proxy)
import { ClientTcp, RpcException } from '@nestjs/microservices';
class ErrorHandlingProxy extends ClientTCP {
serializeError(err: Error) {
return new RpcException(err);
}
}
然后在 ClientsModule 中这样使用它:
@@filename(app.module)
@Module({
imports: [
ClientsModule.register([{
name: 'CustomProxy',
customClass: ErrorHandlingProxy,
}]),
]
})
export class AppModule
提示 这是传递给 customClass 的类本身,而不是类的实例。Nest 将在底层为您创建实例,并将给定给 options 属性的任何选项传递给新的 ClientProxy。