Kafka
Kafka
Kafka 是一个开源的分布式事件流平台,被数千家公司用于高性能数据管道、流分析、数据集成和关键任务应用程序。
安装
要开始构建基于 Kafka 的微服务,首先安装所需的包:
$ npm i --save kafkajs
概述
要使用 Kafka 传输器,请将以下选项对象传递给 createMicroservice() 方法:
@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
}
}
});
提示 Transport 枚举从 @nestjs/microservices 包导入。
选项
options 对象特定于所选的传输器。Kafka 传输器公开了下面描述的属性。
| 属性 | 描述 |
|---|---|
client | 客户端配置选项(阅读更多) |
consumer | 消费者配置选项(阅读更多) |
run | 运行配置选项(阅读更多) |
subscribe | 订阅配置选项(阅读更多) |
producer | 生产者配置选项(阅读更多) |
send | 发送配置选项(阅读更多) |
postfixId | 后缀 ID 用于客户端/消费者/生产者(默认:随机生成) |
客户端
与其他微服务传输器一样,您有多种选项来创建 Kafka ClientProxy 实例。
创建实例的一种方法是使用 ClientsModule。要使用 ClientsModule 创建客户端实例,请导入它并使用 register() 方法传递一个选项对象,该对象具有与上面 createMicroservice() 方法中显示的相同属性,以及一个用作注入令牌的 name 属性。阅读更多关于 ClientsModule 的信息这里。
@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
}
}
},
]),
]
...
})
其他创建客户端的选项(ClientProxyFactory 或 @Client())也可以使用。您可以在这里阅读相关信息。
消息模式
Kafka 传输器利用两种类型的主题来处理请求-响应和基于事件的通信。
请求-响应
请求-响应通信机制非常适合在微服务之间交换消息。使用此方法,您可以确保服务实际接收到消息(不需要手动配置确认)。但是,请求-响应范式并不总是最佳选择。例如,使用基于日志的持久性的流传输器(如 Kafka 或 NATS streaming)针对解决不同范围的问题进行了优化,更符合事件源、CQRS 或其他架构模式。
要启用请求-响应消息类型,Kafka 创建两个主题。回复主题名称来自请求主题名称,并添加 .reply 后缀。例如,如果您的请求主题名为 cats,则 Kafka 传输器将自动创建 cats.reply 主题来发布响应消息。
提示 对于请求-响应消息类型,回复主题应该有单个分区。要了解更多信息,请阅读这里。
基于事件
基于事件的通信非常适合当您只想发布事件而不等待任何响应时。在这种情况下,您希望拥有事件源、CQRS 或者只是想通知另一个服务发生了某些事情。
消息响应序列化
传出响应通过 JSON.stringify() 序列化。如果您想使用不同的序列化方法,您可以在 @MessagePattern 装饰器中实现 Serializer 接口或传递它作为选项。阅读更多。
传入消息反序列化
传入消息通过 JSON.parse() 反序列化。如果您想使用不同的反序列化方法,您可以在 @MessagePattern 装饰器中实现 Deserializer 接口或传递它作为选项。阅读更多。
上下文
在更复杂的场景中,您可能需要访问有关传入请求的其他信息。使用 Kafka 传输器时,您可以访问 KafkaContext 对象。
@@filename()
@MessagePattern('hero.kill.dragon')
killDragon(@Payload() message: KillDragonMessage, @Ctx() context: KafkaContext) {
const originalMessage = context.getMessage();
const partition = context.getPartition();
const { headers, timestamp } = originalMessage;
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('hero.kill.dragon')
killDragon(message, context) {
const originalMessage = context.getMessage();
const partition = context.getPartition();
const { headers, timestamp } = originalMessage;
}
提示 @Payload()、@Ctx()、KafkaContext 和 KillDragonMessage 从 @nestjs/microservices 包导入。
命名约定
Kafka 微服务组件在客户端和消费者之间附加其各自角色的描述,以避免冲突。默认情况下,ClientKafka 组件附加 -client 后缀,ServerKafka 组件附加 -server 后缀。下面提供了这些值的自定义方式。
要自定义此行为,请在创建微服务时提供 postfixId。
@@filename(main)
const app = await NestFactory.createMicroservice<MicroserviceOptions>(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
postfixId: '-my-service'
}
});
@@switch
const app = await NestFactory.createMicroservice(AppModule, {
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
postfixId: '-my-service'
}
});
使用上述设置,ServerKafka 组件将附加 -my-service 而不是默认的 -server。
要为 ClientKafka 组件自定义此行为,请在创建客户端时提供 postfixId。
@Module({
imports: [
ClientsModule.register([
{
name: 'HERO_SERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'hero',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'hero-consumer'
},
postfixId: '-my-client', // 应用于 clientId、consumerGroupId 和 producerId
}
},
]),
]
...
})
记录构建器
要配置消息选项,您可以使用 KafkaRecordBuilder 类(注意:这对于基于事件的流也是可行的)。例如,要设置 headers 和 partition,请使用 setHeaders 和 setPartition 方法,如下所示:
const message = new KafkaRecordBuilder(':cat:')
.setPartition(0)
.setHeaders({
['x-version']: '1.0.0',
})
.build();
this.client.send('replace-emoji', message).subscribe(...);
提示 KafkaRecordBuilder 类从 @nestjs/microservices 包导出。
您也可以在服务器端读取这些值,通过访问 KafkaContext,如下所示:
@@filename()
@MessagePattern('replace-emoji')
replaceEmoji(@Payload() data: string, @Ctx() context: KafkaContext): string {
const { headers } = context.getMessage();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
@@switch
@Bind(Payload(), Ctx())
@MessagePattern('replace-emoji')
replaceEmoji(data, context) {
const { headers } = context.getMessage();
return headers['x-version'] === '1.0.0' ? '🐱' : '🐈';
}
在某些情况下,您可能希望为多个请求配置头部,您可以将这些作为选项传递给 ClientProxyFactory:
import { Module } from '@nestjs/common';
import { ClientProxyFactory, Transport } from '@nestjs/microservices';
@Module({
providers: [
{
provide: 'API_v1',
useFactory: () =>
ClientProxyFactory.create({
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
send: {
headers: { 'x-version': '1.0.0' },
},
},
}),
},
],
})
export class ApiModule {}
提交偏移量
当消息被接收时,如果您想手动提交偏移量,您可以使用 KafkaContext 对象的 getConsumer 方法来访问原始 Kafka 消费者,然后调用 commitOffsets 方法。
@@filename()
@EventPattern('user.created')
async handleUserCreated(@Payload() data: any, @Ctx() context: KafkaContext) {
// 业务逻辑
const consumer = context.getConsumer();
const { offset } = context.getMessage();
const partition = context.getPartition();
const topic = context.getTopic();
await consumer.commitOffsets([{ topic, partition, offset: offset + 1 }]);
}
@@switch
@Bind(Payload(), Ctx())
@EventPattern('user.created')
async handleUserCreated(data, context) {
// 业务逻辑
const consumer = context.getConsumer();
const { offset } = context.getMessage();
const partition = context.getPartition();
const topic = context.getTopic();
await consumer.commitOffsets([{ topic, partition, offset: offset + 1 }]);
}
心跳
Kafka 依赖心跳机制来检测消费者故障。如果在 session.timeout.ms 内没有收到心跳,消费者将被视为死亡,其分区将重新分配给组中的其他消费者。当消息处理时间超过心跳间隔时,您可以使用 KafkaContext 对象的 getHeartbeat 方法来访问 heartbeat 回调:
@@filename()
@EventPattern('user.created')
async handleUserCreated(@Payload() data: any, @Ctx() context: KafkaContext) {
const heartbeat = context.getHeartbeat();
// 执行一些缓慢的业务逻辑
await doWorkPart1();
// 发送心跳以保持消费者活跃
await heartbeat();
// 完成剩余的业务逻辑
await doWorkPart2();
}
@@switch
@Bind(Payload(), Ctx())
@EventPattern('user.created')
async handleUserCreated(data, context) {
const heartbeat = context.getHeartbeat();
// 执行一些缓慢的业务逻辑
await doWorkPart1();
// 发送心跳以保持消费者活跃
await heartbeat();
// 完成剩余的业务逻辑
await doWorkPart2();
}
实例状态更新
要获取连接和底层驱动实例状态的实时更新,您可以订阅 status 流。此流提供特定于所选驱动的状态更新。对于 Kafka 驱动,status 流发出 connected 和 disconnected 事件。
this.client.status.subscribe((status: KafkaStatus) => {
console.log(status);
});
提示 KafkaStatus 类型从 @nestjs/microservices 包导入。
同样,您可以订阅服务器的 status 流以接收有关服务器状态的通知。
const server = app.connectMicroservice<MicroserviceOptions>(...);
server.status.subscribe((status: KafkaStatus) => {
console.log(status);
});
监听 Kafka 事件
在某些情况下,您可能希望监听微服务发出的内部事件。例如,您可以监听 error 事件以在发生错误时触发其他操作。为此,请使用 on() 方法,如下所示:
this.client.on('error', (err) => {
console.error(err);
});
同样,您可以监听服务器的内部事件:
server.on<KafkaEvents>('error', (err) => {
console.error(err);
});
提示 KafkaEvents 类型从 @nestjs/microservices 包导入。
底层驱动访问
对于更高级的用例,您可能需要访问底层驱动实例。这对于手动关闭连接或使用驱动特定方法等场景很有用。但是,请记住,在大多数情况下,您不应该需要直接访问驱动。
为此,您可以使用 unwrap() 方法,它返回底层驱动实例。泛型类型参数应指定您期望的驱动实例类型。
const kafkaJS = this.client.unwrap<import('kafkajs').Kafka>();
同样,您可以访问服务器的底层驱动实例:
const kafkaJS = server.unwrap<import('kafkajs').Kafka>();