Microservices

Kafka

学习如何在 NestJS 中使用 Kafka 传输器构建微服务,包括安装、配置、客户端设置、消息模式、上下文访问、记录构建器、实例状态更新、事件监听和底层驱动访问。

Kafka

Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。

安装

要开始构建基于 Kafka 的微服务,首先安装所需的包:

$ npm i --save kafkajs

概述

要使用 Kafka 传输器,请将以下选项对象传递给 createMicroservice() 方法:

@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    }
  }
});

提示 Transport 枚举从 @nestjs/microservices 包导入。

选项

options 对象特定于所选的传输器。Kafka 传输器公开了下面描述的属性。

属性描述
client客户端配置选项(阅读更多
consumer消费者配置选项(阅读更多
run运行配置选项(阅读更多
subscribe订阅配置选项(阅读更多
producer生产者配置选项(阅读更多
send发送配置选项(阅读更多
postfixId后缀 ID 用于客户端/消费者/生产者(默认:随机生成)

客户端

与其他微服务传输器一样,您有多种选项来创建 Kafka ClientProxy 实例。

创建实例的一种方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,请导入它并使用 register() 方法传递一个选项对象,该对象具有与上面 createMicroservice() 方法中显示的相同属性,以及一个用作注入令牌的 name 属性。阅读更多关于 ClientsModule 的信息这里

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'HERO_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          }
        }
      },
    ]),
  ]
  ...
})

其他创建客户端的选项(ClientProxyFactory@Client())也可以使用。您可以在这里阅读相关信息。

消息模式

Kafka 传输器利用两种类型的主题来处理请求-响应和基于事件的通信。

请求-响应

请求-响应通信机制非常适合在微服务之间交换消息。使用此方法,您可以确保服务实际接收到消息(不需要手动配置确认)。但是,请求-响应范式并不总是最佳选择。例如,使用基于日志的持久性的流传输器(如 KafkaNATS streaming)针对解决不同范围的问题进行了优化,更符合事件源、CQRS 或其他架构模式。

要启用请求-响应消息类型,Kafka 创建两个主题。回复主题名称来自请求主题名称,并添加 .reply 后缀。例如,如果您的请求主题名为 cats,则 Kafka 传输器将自动创建 cats.reply 主题来发布响应消息。

提示 对于请求-响应消息类型,回复主题应该有单个分区。要了解更多信息,请阅读这里

基于事件

基于事件的通信非常适合当您只想发布事件而不等待任何响应时。在这种情况下,您希望拥有事件源CQRS 或者只是想通知另一个服务发生了某些事情。

消息响应序列化

传出响应通过 JSON.stringify() 序列化。如果您想使用不同的序列化方法,您可以在 @MessagePattern 装饰器中实现 Serializer 接口或传递它作为选项。阅读更多

传入消息反序列化

传入消息通过 JSON.parse() 反序列化。如果您想使用不同的反序列化方法,您可以在 @MessagePattern 装饰器中实现 Deserializer 接口或传递它作为选项。阅读更多

上下文

在更复杂的场景中,您可能需要访问有关传入请求的其他信息。使用 Kafka 传输器时,您可以访问 KafkaContext 对象。

@@filename()
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
  const originalMessage = context.getMessage();
  const partition = context.getPartition();
  const { headers, timestamp } = originalMessage;
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('hero.kill.dragon')
killDragon(message, context) {
  const originalMessage = context.getMessage();
  const partition = context.getPartition();
  const { headers, timestamp } = originalMessage;
}

提示 @Payload()@Ctx()KafkaContextKillDragonMessage@nestjs/microservices 包导入。

命名约定

Kafka 微服务组件在客户端和消费者之间附加其各自角色的描述,以避免冲突。默认情况下,ClientKafka 组件附加 -client 后缀,ServerKafka 组件附加 -server 后缀。下面提供了这些值的自定义方式。

要自定义此行为,请在创建微服务时提供 postfixId

@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    postfixId: '-my-service'
  }
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
  transport: Transport.KAFKA,
  options: {
    client: {
      brokers: ['localhost:9092'],
    },
    postfixId: '-my-service'
  }
});

使用上述设置,ServerKafka 组件将附加 -my-service 而不是默认的 -server

要为 ClientKafka 组件自定义此行为,请在创建客户端时提供 postfixId

@Module({
  imports: [
    ClientsModule.register([
      {
        name: 'HERO_SERVICE',
        transport: Transport.KAFKA,
        options: {
          client: {
            clientId: 'hero',
            brokers: ['localhost:9092'],
          },
          consumer: {
            groupId: 'hero-consumer'
          },
          postfixId: '-my-client', // 应用于 clientId、consumerGroupId 和 producerId
        }
      },
    ]),
  ]
  ...
})

记录构建器

要配置消息选项,您可以使用 KafkaRecordBuilder 类(注意:这对于基于事件的流也是可行的)。例如,要设置 headerspartition,请使用 setHeaderssetPartition 方法,如下所示:

const message = new KafkaRecordBuilder(':cat:')
  .setPartition(0)
  .setHeaders({
    ['x-version']: '1.0.0',
  })
  .build();

this.client.send('replace-emoji', message).subscribe(...);

提示 KafkaRecordBuilder 类从 @nestjs/microservices 包导出。

您也可以在服务器端读取这些值,通过访问 KafkaContext,如下所示:

@@filename()
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: KafkaContext): string {
  const { headers } = context.getMessage();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
  const { headers } = context.getMessage();
  return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}

在某些情况下,您可能希望为多个请求配置头部,您可以将这些作为选项传递给 ClientProxyFactory

import { Module } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';

@Module({
  providers: [
    {
      provide: 'API_v1',
      useFactory: () =>
        ClientProxyFactory.create({
          transport: Transport.KAFKA,
          options: {
            client: {
              brokers: ['localhost:9092'],
            },
            send: {
              headers: { 'x-version': '1.0.0' },
            },
          },
        }),
    },
  ],
})
export class ApiModule {}

提交偏移量

当消息被接收时,如果您想手动提交偏移量,您可以使用 KafkaContext 对象的 getConsumer 方法来访问原始 Kafka 消费者,然后调用 commitOffsets 方法。

@@filename()
@EventPattern('user.created')
async handleUserCreated(@Payload() data: any, @Ctx() context: KafkaContext) {
  // 业务逻辑
  const consumer = context.getConsumer();
  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  await consumer.commitOffsets([{ topic, partition, offset: offset + 1 }]);
}
@@switch
@Bind(Payload(), Ctx())
@EventPattern('user.created')
async handleUserCreated(data, context) {
  // 业务逻辑
  const consumer = context.getConsumer();
  const { offset } = context.getMessage();
  const partition = context.getPartition();
  const topic = context.getTopic();
  await consumer.commitOffsets([{ topic, partition, offset: offset + 1 }]);
}

心跳

Kafka 依赖心跳机制来检测消费者故障。如果在 session.timeout.ms 内没有收到心跳,消费者将被视为死亡,其分区将重新分配给组中的其他消费者。当消息处理时间超过心跳间隔时,您可以使用 KafkaContext 对象的 getHeartbeat 方法来访问 heartbeat 回调:

@@filename()
@EventPattern('user.created')
async handleUserCreated(@Payload() data: any, @Ctx() context: KafkaContext) {
  const heartbeat = context.getHeartbeat();

  // 执行一些缓慢的业务逻辑
  await doWorkPart1();

  // 发送心跳以保持消费者活跃
  await heartbeat();

  // 完成剩余的业务逻辑
  await doWorkPart2();
}
@@switch
@Bind(Payload(), Ctx())
@EventPattern('user.created')
async handleUserCreated(data, context) {
  const heartbeat = context.getHeartbeat();

  // 执行一些缓慢的业务逻辑
  await doWorkPart1();

  // 发送心跳以保持消费者活跃
  await heartbeat();

  // 完成剩余的业务逻辑
  await doWorkPart2();
}

实例状态更新

要获取连接和底层驱动实例状态的实时更新,您可以订阅 status 流。此流提供特定于所选驱动的状态更新。对于 Kafka 驱动,status 流发出 connecteddisconnected 事件。

this.client.status.subscribe((status: KafkaStatus) => {
  console.log(status);
});

提示 KafkaStatus 类型从 @nestjs/microservices 包导入。

同样,您可以订阅服务器的 status 流以接收有关服务器状态的通知。

const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: KafkaStatus) => {
  console.log(status);
});

监听 Kafka 事件

在某些情况下,您可能希望监听微服务发出的内部事件。例如,您可以监听 error 事件以在发生错误时触发其他操作。为此,请使用 on() 方法,如下所示:

this.client.on('error', (err) => {
  console.error(err);
});

同样,您可以监听服务器的内部事件:

server.on<KafkaEvents>('error', (err) => {
  console.error(err);
});

提示 KafkaEvents 类型从 @nestjs/microservices 包导入。

底层驱动访问

对于更高级的用例,您可能需要访问底层驱动实例。这对于手动关闭连接或使用驱动特定方法等场景很有用。但是,请记住,在大多数情况下,您不应该需要直接访问驱动。

为此,您可以使用 unwrap() 方法,它返回底层驱动实例。泛型类型参数应指定您期望的驱动实例类型。

const kafkaJS = this.client.unwrap<import('kafkajs').Kafka>();

同样,您可以访问服务器的底层驱动实例:

const kafkaJS = server.unwrap<import('kafkajs').Kafka>();