Microservices

RabbitMQ

学习如何在 NestJS 中使用 RabbitMQ 传输器构建微服务,包括安装、配置、客户端设置、上下文访问、记录构建器、实例状态更新、事件监听和底层驱动访问。

RabbitMQ

RabbitMQ 是一个开源且轻量级的消息代理,支持多种消息协议。RabbitMQ 可以部署在分布式和联合配置中,以满足高规模、高可用性的要求。此外,它是在云环境中部署消息传递的最流行方式,并且可以在本地使用。

安装

要开始构建基于 RabbitMQ 的微服务,首先安装所需的包:

$ npm i --save amqplib
$ npm i --save-dev @types/amqplib

概述

要使用 RabbitMQ 传输器,请将以下选项对象传递给 createMicroservice() 方法:

@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      durable: false
    },
  },
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.RMQ,
  options: {
    urls: ['amqp://localhost:5672'],
    queue: 'cats_queue',
    queueOptions: {
      durable: false
    },
  },
});

提示 Transport 枚举从 @nestjs/microservices 包导入。

选项

options 对象特定于所选的传输器。RabbitMQ 传输器公开了下面描述的属性。

属性描述
urls连接 URL
queue您的服务器将监听的队列名称
prefetchCount设置通道的预取计数
isGlobalPrefetchCount启用每个通道的预取计数
noAck如果为 false,手动确认模式启用
queueOptions其他队列选项(这里查看可用选项)
socketOptions其他套接字选项(这里查看可用选项)
headers要与每条消息一起发送的头部

客户端

与其他微服务传输器一样,您有多种选项来创建 RabbitMQ ClientProxy 实例。

创建实例的一种方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,请导入它并使用 register() 方法传递一个选项对象,该对象具有与上面 createMicroservice() 方法中显示的相同属性,以及一个用作注入令牌的 name 属性。阅读更多关于 ClientsModule 的信息这里

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'MATH_SERVICE',
        transport: Transport.RMQ,
        options: {
          urls: ['amqp://localhost:5672'],
          queue: 'cats_queue',
          queueOptions: {
            durable: false
          },
        },
      },
    ]),
  ]
  ...
})

其他创建客户端的选项(ClientProxyFactory@Client())也可以使用。您可以在这里阅读相关信息。

上下文

在更复杂的场景中,您可能需要访问有关传入请求的其他信息。使用 RabbitMQ 传输器时,您可以访问 RmqContext 对象。

@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(`Pattern: ${context.getPattern()}`);
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(`Pattern: ${context.getPattern()}`);
}

提示 @Payload()@Ctx()RmqContext@nestjs/microservices 包导入。

要访问原始 RabbitMQ 消息(带有 propertiesfieldscontent),请使用 RmqContext 对象的 getMessage() 方法,如下所示:

@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getMessage());
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(context.getMessage());
}

要检索对 RabbitMQ 通道的引用,请使用 RmqContext 对象的 getChannelRef 方法,如下所示:

@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  console.log(context.getChannelRef());
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  console.log(context.getChannelRef());
}

消息确认

要确保消息永远不会丢失,RabbitMQ 支持消息确认。确认由消费者发送回 RabbitMQ,告诉 RabbitMQ 特定消息已被接收、处理,并且 RabbitMQ 可以自由删除它。如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)而没有发送确认,RabbitMQ 将理解消息未完全处理并将重新排队。

要启用手动确认模式,请将 noAck 属性设置为 false

options: {
  urls: ['amqp://localhost:5672'],
  queue: 'cats_queue',
  noAck: false,
  queueOptions: {
    durable: false
  },
}

当手动消费者确认打开时,我们必须从工作者发送适当的确认以指示我们已完成任务。

@@filename()
@MessagePattern('notifications')
getNotifications(@Payload() data: number[], @Ctx() context: RmqContext) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();

  channel.ack(originalMsg);
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('notifications')
getNotifications(data, context) {
  const channel = context.getChannelRef();
  const originalMsg = context.getMessage();

  channel.ack(originalMsg);
}

记录构建器

要配置消息选项,您可以使用 RmqRecordBuilder 类(注意:这对于基于事件的流也是可行的)。例如,要设置 headerspriority 属性,请使用 setOptions 方法,如下所示:

const message = new RmqRecordBuilder(':cat:')
  .setOptions({
    headers: {
      ['x-version']: '1.0.0',
    },
    priority: 3,
  })
  .build();

this.client.send('replace-emoji', message).subscribe(...);

提示 RmqRecordBuilder 类从 @nestjs/microservices 包导出。

您也可以在服务器端读取这些选项,通过访问 RmqContext,如下所示:

@@filename()
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: RmqContext): string {
  const { properties } = context.getMessage();
  return properties.headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
  const { properties } = context.getMessage();
  return properties.headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}

实例状态更新

要获取连接和底层驱动实例状态的实时更新,您可以订阅 status 流。此流提供特定于所选驱动的状态更新。对于 RabbitMQ 驱动,status 流发出 connecteddisconnected 事件。

this.client.status.subscribe((status: RmqStatus) => {
  console.log(status);
});

提示 RmqStatus 类型从 @nestjs/microservices 包导入。

同样,您可以订阅服务器的 status 流以接收有关服务器状态的通知。

const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: RmqStatus) => {
  console.log(status);
});

监听 RabbitMQ 事件

在某些情况下,您可能希望监听微服务发出的内部事件。例如,您可以监听 error 事件以在发生错误时触发其他操作。为此,请使用 on() 方法,如下所示:

this.client.on('error', (err) => {
  console.error(err);
});

同样,您可以监听服务器的内部事件:

server.on<RmqEvents>('error', (err) => {
  console.error(err);
});

提示 RmqEvents 类型从 @nestjs/microservices 包导入。

底层驱动访问

对于更高级的用例,您可能需要访问底层驱动实例。这对于手动关闭连接或使用驱动特定方法等场景很有用。但是,请记住,在大多数情况下,您不应该需要直接访问驱动。

为此,您可以使用 unwrap() 方法,它返回底层驱动实例。泛型类型参数应指定您期望的驱动实例类型。

const amqpConnection = this.client.unwrap<import('amqplib').Connection>();

同样,您可以访问服务器的底层驱动实例:

const amqpConnection = server.unwrap<import('amqplib').Connection>();