Microservices

gRPC

学习如何在 NestJS 中使用 gRPC 传输器构建微服务,包括安装、配置、客户端设置、proto 文件定义、服务实现、流式传输、元数据处理、错误处理、实例状态更新、事件监听和底层驱动访问。

gRPC

gRPC 是一个现代的开源高性能 RPC 框架,可以在任何环境中运行。它可以通过可插拔的负载均衡、跟踪、健康检查和身份验证支持高效地连接数据中心内和跨数据中心的服务。

与传统的 HTTP REST API 一样,gRPC 基于定义服务的思想,指定可以远程调用的方法及其参数和返回类型。gRPC 默认使用 protocol buffers 作为接口定义语言 (IDL) 来描述服务接口和有效负载消息的结构。

提示 在 gRPC 中,客户端应用程序可以直接调用不同机器上的服务器应用程序上的方法,就像它是本地对象一样,使您更容易创建分布式应用程序和服务。

安装

要开始构建基于 gRPC 的微服务,首先安装所需的包:

$ npm i --save @grpc/grpc-js @grpc/proto-loader

概述

要使用 gRPC 传输器,请将以下选项对象传递给 createMicroservice() 方法:

@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.GRPC,
  options: {
    package: 'hero',
    protoPath: join(__dirname, 'hero/hero.proto'),
  },
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.GRPC,
  options: {
    package: 'hero',
    protoPath: join(__dirname, 'hero/hero.proto'),
  },
});

提示 join() 函数从 path 包导入,而 Transport 枚举从 @nestjs/microservices 包导入。

选项

options 对象特定于所选的传输器。gRPC 传输器公开了下面描述的属性。

属性描述
url连接 URL
protoPathproto 文件的绝对(或相对于根目录的)路径
packageProtobuf 包名
protoLoaderNPM 包名(如果您想使用其他 proto 加载器)
loader@grpc/proto-loader 选项。这些选项在这里有详细说明。
credentials服务器凭据(阅读更多
maxSendMessageLength最大发送消息长度
maxReceiveMessageLength最大接收消息长度
maxMetadataSize最大元数据大小
keepaliveKeepalive 选项(阅读更多
channelOptions表示通道选项的键值对(阅读更多

概述

通常,package 属性设置 protobuf 包名,而 protoPath.proto 定义文件的路径。hero.proto 文件使用 protocol buffers 语言进行结构化。以下是一个示例:

// hero/hero.proto
syntax = "proto3";

package hero;

service HeroesService {
  rpc FindOne (HeroById) returns (Hero) {}
  rpc FindMany (Empty) returns (Heroes) {}
}

message HeroById {
  int32 id = 1;
}

message Hero {
  int32 id = 1;
  string name = 2;
}

message Heroes {
  repeated Hero heroes = 1;
}

message Empty {}

在上面的示例中,我们定义了一个 HeroesService,它公开了两个方法:FindOne(返回单个 Hero 对象基于传递的 HeroById)和 FindMany(返回 Heroes 对象)。

现在,让我们创建一个 HeroesController

@@filename(heroes.controller)
import { Controller } from '@nestjs/common';
import { GrpcMethod } from '@nestjs/microservices';

interface HeroById {
  id: number;
}
interface Hero {
  id: number;
  name: string;
}

@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService', 'FindOne')
  findOne(data: HeroById): Hero {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }

  @GrpcMethod('HeroesService', 'FindMany')
  findMany(): Hero[] {
    return [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
  }
}
@@switch
import { Controller } from '@nestjs/common';
import { GrpcMethod } from '@nestjs/microservices';

@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService', 'FindOne')
  findOne(data) {
    const items = [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
    return items.find(({ id }) => id === data.id);
  }

  @GrpcMethod('HeroesService', 'FindMany')
  findMany() {
    return [
      { id: 1, name: 'John' },
      { id: 2, name: 'Doe' },
    ];
  }
}

提示 @GrpcMethod() 装饰器从 @nestjs/microservices 包导入。

@GrpcMethod() 装饰器的第一个参数是服务名称,而第二个是方法名称。可选的第三个参数是一个选项对象,用于设置方法特定的选项,如序列化器。

通常,@GrpcMethod() 装饰器不是必需的。但是,当您想要指定不同的服务或方法名称时,或者当您想要传递方法特定的选项时,它会很有用。

@Controller()
export class HeroesController {
  @GrpcMethod('HeroesService', 'FindOne')
  findHero(data: HeroById): Hero {
    // 逻辑
  }
}

同样,您可能有时需要访问 gRPC 元数据,这些元数据包含有关特定 RPC 调用的数据(例如身份验证详细信息)。

@@filename(heroes.controller)
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
  console.log(metadata);
  const items = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];
  return items.find(({ id }) => id === data.id);
}
@@switch
findOne(data, metadata, call) {
  console.log(metadata);
  const items = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];
  return items.find(({ id }) => id === data.id);
}

要访问 gRPC Metadata,我们需要从 @grpc/grpc-js 包导入它。

import { Metadata } from '@grpc/grpc-js';

此外,要访问 ServerUnaryCall,我们需要从 @grpc/grpc-js 包导入它。

import { ServerUnaryCall } from '@grpc/grpc-js';

客户端

要创建客户端实例,我们需要使用 @Client() 装饰器。

@Client({
  transport: Transport.GRPC,
  options: {
    package: 'hero',
    protoPath: join(__dirname, 'hero/hero.proto'),
  },
})
client: ClientGrpc;

与前面的微服务传输器示例相比,这里有一个小的区别。我们使用 ClientGrpc 类(而不是 ClientProxy 类),它提供了 getService() 方法。getService() 泛型方法将服务名称作为参数,并返回其实例(如果可用)。

@@filename(heroes.controller)
onModuleInit() {
  this.heroesService = this.client.getService<HeroesService>('HeroesService');
}
@@switch
onModuleInit() {
  this.heroesService = this.client.getService('HeroesService');
}

heroesService 对象公开了在 .proto 文件中定义的相同方法。注意,所有这些都是 小写(为了遵循自然约定)。基本上,我们的 gRPC HeroesService 定义包含 FindOne()FindMany() 函数。这意味着 heroesService 实例将提供 findOne()findMany() 方法。

interface HeroesService {
  findOne(data: { id: number }): Observable<any>;
  findMany(data: {}): Observable<any>;
}

所有服务方法都返回 Observable。由于 Nest 支持 RxJS 流并与它们很好地协作,我们也可以在 HTTP 处理程序内返回它们。

@@filename(heroes.controller)
@Get()
call(): Observable<any> {
  return this.heroesService.findOne({ id: 1 });
}
@@switch
@Get()
call() {
  return this.heroesService.findOne({ id: 1 });
}

完整的工作示例可在这里获得。

gRPC 流

gRPC 本身支持长期存在的流。流可以从服务方法的客户端、服务器或双方发起。有关完整定义,请查看官方 gRPC 文档。Nest 通过 Observable 流支持 gRPC 流。

服务器流

当返回类型被定义为 stream 时,服务器可以发送多个消息。下面的 .proto 文件显示了一个示例 FindMany 方法。

// hero/hero.proto
syntax = "proto3";

package hero;

service HeroesService {
  rpc FindMany (Empty) returns (stream Hero) {}
}

message Hero {
  int32 id = 1;
  string name = 2;
}

message Empty {}

提示 stream 关键字指示该方法应返回值流,而不是单个消息。

使用这种方法,服务器可以发送多个消息到客户端。但是,不要忘记返回一个 Observable

@@filename(heroes.controller)
@GrpcMethod('HeroesService', 'FindMany')
findMany(): Observable<Hero> {
  const heroes$ = new Subject<Hero>();

  const heroes = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];

  heroes.forEach(hero => heroes$.next(hero));
  heroes$.complete();

  return heroes$.asObservable();
}
@@switch
@GrpcMethod('HeroesService', 'FindMany')
findMany() {
  const heroes$ = new Subject();

  const heroes = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];

  heroes.forEach(hero => heroes$.next(hero));
  heroes$.complete();

  return heroes$.asObservable();
}

客户端流

当参数类型前缀为 stream 关键字时,客户端可以发送多个消息。下面的示例显示了一个示例 AddHeroes 方法。

// hero/hero.proto
syntax = "proto3";

package hero;

service HeroesService {
  rpc AddHeroes (stream Hero) returns (Heroes) {}
}

message Hero {
  int32 id = 1;
  string name = 2;
}

message Heroes {
  repeated Hero heroes = 1;
}

为了处理客户端流,方法处理程序参数应该是一个 Observable

@@filename(heroes.controller)
@GrpcMethod('HeroesService', 'AddHeroes')
addHeroes(heroes$: Observable<Hero>): Observable<Heroes> {
  const heroes: Hero[] = [];

  const onNext = (hero: Hero) => {
    heroes.push(hero);
  };
  const onComplete = () => {
    // 返回累积的英雄
  };
  heroes$.subscribe(onNext, null, onComplete);
}
@@switch
@GrpcMethod('HeroesService', 'AddHeroes')
addHeroes(heroes$) {
  const heroes = [];

  const onNext = (hero) => {
    heroes.push(hero);
  };
  const onComplete = () => {
    // 返回累积的英雄
  };
  heroes$.subscribe(onNext, null, onComplete);
}

双向流

可以双向传递流。

// hero/hero.proto
syntax = "proto3";

package hero;

service HeroesService {
  rpc GetHeroes (stream HeroById) returns (stream Hero) {}
}

message HeroById {
  int32 id = 1;
}

message Hero {
  int32 id = 1;
  string name = 2;
}
@@filename(heroes.controller)
@GrpcMethod('HeroesService', 'GetHeroes')
getHeroes(heroes$: Observable<HeroById>): Observable<Hero> {
  return heroes$.pipe(
    map((heroById: HeroById) => this.items.find(({ id }) => id === heroById.id)),
  );
}
@@switch
@GrpcMethod('HeroesService', 'GetHeroes')
getHeroes(heroes$) {
  return heroes$.pipe(
    map((heroById) => this.items.find(({ id }) => id === heroById.id)),
  );
}

提示 支持 map 操作符需要 rxjs 包。

根据服务方法的调用约定,heroesService.getHeroes() 方法可以启动双向流。

const heroes$ = new Subject<HeroById>();

this.heroesService.getHeroes(heroes$).subscribe({
  next: (hero: Hero) => console.log(hero),
  complete: () => console.log('completed'),
});

heroes$.next({ id: 1 });
heroes$.next({ id: 2 });
heroes$.complete();

根据上下文,客户端和/或服务器可以选择完成流。gRPC 流是双向的,这意味着客户端和服务器都可以独立地发送和完成流。

gRPC 元数据

元数据是关于特定 RPC 调用的信息,采用键值对列表的形式,其中键是字符串,值通常是字符串,但也可以是二进制数据。元数据对 gRPC 本身是不透明的 - 它允许客户端向服务器提供与调用相关的信息,反之亦然。元数据可能包括身份验证令牌、用于跟踪目的的请求标识符和标签,以及有关数据的信息,如数据集中的记录数。

要从 @GrpcMethod() 处理程序内读取元数据,请使用第二个参数(元数据),其类型为 Metadata(从 @grpc/grpc-js 包导入)。

要从处理程序发送元数据,请使用 ServerUnaryCall#sendMetadata() 方法(第三个处理程序参数)。

@@filename(heroes.controller)
@GrpcMethod('HeroesService', 'FindOne')
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
  const serverMetadata = new Metadata();
  const items = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];

  serverMetadata.add('Set-Cookie', 'yummy_cookie=choco');
  call.sendMetadata(serverMetadata);

  return items.find(({ id }) => id === data.id);
}
@@switch
@GrpcMethod('HeroesService', 'FindOne')
findOne(data, metadata, call) {
  const serverMetadata = new Metadata();
  const items = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];

  serverMetadata.add('Set-Cookie', 'yummy_cookie=choco');
  call.sendMetadata(serverMetadata);

  return items.find(({ id }) => id === data.id);
}

同样,要从客户端发送元数据,您可以使用 ClientGrpc#getService() 方法。

@@filename(heroes.controller)
onModuleInit() {
  this.heroesService = this.client.getService<HeroesService>('HeroesService');
}

getHero(): Observable<any> {
  const metadata = new Metadata();
  metadata.add('authorization', 'Bearer ' + TOKEN);

  return this.heroesService.findOne({ id: 1 }, metadata);
}
@@switch
onModuleInit() {
  this.heroesService = this.client.getService('HeroesService');
}

getHero() {
  const metadata = new Metadata();
  metadata.add('authorization', 'Bearer ' + TOKEN);

  return this.heroesService.findOne({ id: 1 }, metadata);
}

gRPC 错误

gRPC 使用错误代码来指示 RPC 的成功或失败状态。状态代码是整数值,在这里定义。当 RPC 失败时,状态代码指示失败的性质。除了状态代码之外,还可以包含可选的字符串错误消息,提供有关错误的更多详细信息。

要从 @GrpcMethod() 处理程序抛出错误,请使用 RpcException(从 @nestjs/microservices 包导入)。Nest 将自动将抛出的 RpcException 转换为具有适当状态代码的 gRPC 错误。

@@filename(heroes.controller)
@GrpcMethod('HeroesService', 'FindOne')
findOne(data: HeroById, metadata: Metadata, call: ServerUnaryCall<any, any>): Hero {
  const items = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];
  const hero = items.find(({ id }) => id === data.id);
  if (!hero) {
    throw new RpcException('Hero not found');
  }
  return hero;
}
@@switch
@GrpcMethod('HeroesService', 'FindOne')
findOne(data, metadata, call) {
  const items = [
    { id: 1, name: 'John' },
    { id: 2, name: 'Doe' },
  ];
  const hero = items.find(({ id }) => id === data.id);
  if (!hero) {
    throw new RpcException('Hero not found');
  }
  return hero;
}

提示 RpcException 类从 @nestjs/microservices 包导入。

当上述处理程序抛出 RpcException 时,客户端将收到以下错误:

{
  "error": "Hero not found",
  "code": 2,
  "details": "Hero not found"
}

默认情况下,Nest 将 RpcException 映射到 UNKNOWN gRPC 状态代码(值:2)。您可以通过将错误代码作为第二个参数传递给 RpcException 构造函数来提供更具体的错误代码。

throw new RpcException('Hero not found', status.NOT_FOUND);

提示 status 常量从 @grpc/grpc-js 包导入。

或者,您可以提供整个错误对象:

throw new RpcException({
  code: status.NOT_FOUND,
  message: 'Hero not found',
});

实例状态更新

要获取连接和底层驱动实例状态的实时更新,您可以订阅 status 流。此流提供特定于所选驱动的状态更新。对于 gRPC 驱动,status 流发出 connecteddisconnected 事件。

this.client.status.subscribe((status: GrpcStatus) => {
  console.log(status);
});

提示 GrpcStatus 类型从 @nestjs/microservices 包导入。

同样,您可以订阅服务器的 status 流以接收有关服务器状态的通知。

const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: GrpcStatus) => {
  console.log(status);
});

监听 gRPC 事件

在某些情况下,您可能希望监听微服务发出的内部事件。例如,您可以监听 error 事件以在发生错误时触发其他操作。为此,请使用 on() 方法,如下所示:

this.client.on('error', (err) => {
  console.error(err);
});

同样,您可以监听服务器的内部事件:

server.on<GrpcEvents>('error', (err) => {
  console.error(err);
});

提示 GrpcEvents 类型从 @nestjs/microservices 包导入。

底层驱动访问

对于更高级的用例,您可能需要访问底层驱动实例。这对于手动关闭连接或使用驱动特定方法等场景很有用。但是,请记住,在大多数情况下,您不应该需要直接访问驱动。

为此,您可以使用 unwrap() 方法,它返回底层驱动实例。泛型类型参数应指定您期望的驱动实例类型。

const grpcPackage = this.client.unwrap<any>();

同样,您可以访问服务器的底层驱动实例:

const grpcPackage = server.unwrap<any>();