Newer
Older
import { Injectable } from "@nestjs/common";
import {
AckPolicy,
ConsumerConfig,
DeliverPolicy,
NatsError,
ReplayPolicy,
} from "nats";
import { NatsBaseService } from "./base.nats.service";
import { ConfigService } from "@nestjs/config";
import { CloudEventDto } from "@ocm-engine/dtos";
import { SimpleMutex } from "nats/lib/nats-base-client/util";
import { IConfAgent } from "@ocm-engine/config";
/**
 * JetStream consumer service.
 *
 * Registers a consumer (named by the `agent` configuration) on the stream
 * described by `this.streamConfig` and dispatches every decoded message to a
 * caller-supplied handler, acknowledging on success and negatively
 * acknowledging on failure.
 */
@Injectable()
export class ConsumerService extends NatsBaseService {
  //eslint-disable-next-line
  private readonly agentConfig: IConfAgent;

  constructor(configService: ConfigService) {
    super(configService);
    //TODO: no like ! move config, interfaces to separate lib
    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.agentConfig = configService.get<IConfAgent>("agent")!;
  }

  /**
   * Adds the agent consumer to `stream` and returns a handle to it.
   *
   * Both client calls are awaited so that a rejected promise is routed into
   * the `catch` below instead of escaping as an unhandled rejection.
   *
   * @param stream name of the JetStream stream to attach the consumer to
   * @returns the consumer handle obtained from `this.jsClient.consumers.get`
   * @throws Error when adding or fetching the consumer fails for any reason
   *   other than a `NatsError` whose code is "409" (treated as
   *   "consumer already exists")
   */
  private registerConsumer = async (stream: string) => {
    const consumerConfig: ConsumerConfig = {
      name: this.agentConfig.agentConsumerName,
      ack_policy: AckPolicy.Explicit,
      deliver_policy: DeliverPolicy.All,
      replay_policy: ReplayPolicy.Original,
    };

    try {
      await this.jsm.consumers.add(stream, consumerConfig);

      return await this.jsClient.consumers.get(stream, consumerConfig.name);
    } catch (error) {
      if (error instanceof NatsError && error.code === "409") {
        this.logger.log("Consumer already exists");

        return await this.jsClient.consumers.get(stream, consumerConfig.name);
      }

      throw new Error(
        `register consumer fail ${JSON.stringify(error, null, 2)}`,
      );
    }
  };

  /**
   * Consumes messages from the configured stream and passes each decoded
   * event to `handler`.
   *
   * A message is acked once `handler` resolves and nak'ed when it rejects.
   * Handlers are started without being awaited so one slow event does not
   * block the ones behind it; the number of handlers in flight is capped by
   * a single mutex sized from `agentConsumerRateLimit`.
   *
   * @param handler async callback invoked with every decoded event
   * @returns a promise that settles when the message iterator ends
   */
  async subscribe<T>(
    handler: (event: CloudEventDto<T>) => Promise<void>,
  ): Promise<void> {
    const consumer = await this.registerConsumer(this.streamConfig.name);

    const messages = await consumer.consume({
      max_messages: this.agentConfig.agentConsumerMaxMessagess,
    });

    // One limiter shared by all messages. Creating it per message (as the
    // previous revision did) gives every message its own fresh mutex, which
    // never blocks and therefore limits nothing.
    const rl = new SimpleMutex(this.agentConfig.agentConsumerRateLimit);

    for await (const message of messages) {
      const event = this.jsonCodec.decode(message.data) as CloudEventDto<T>;

      this.logger.log("event received, processing...");
      this.logger.debug(JSON.stringify(event, null, 2));

      // Awaiting the handler here would create head-of-line blocking, i.e. no
      // other message would be consumed until it finished. Not awaiting it at
      // all may lead to a large number of async operations which may exceed
      // the limits of the runtime, hence the limiter below.
      // https://github.com/nats-io/nats.deno/blob/main/examples/jetstream/07_consume_jobs.ts
      // Can the agent handle the concurrency of working with two things, for example creating schema and creating cred def ??
      // I guess load tests need to be done (I'm pretty sure the wallet can't handle such thing, as the ledgers are extremely slow)

      // Wait for a free slot before starting another handler.
      await rl.lock();

      // Intentionally fire-and-forget: the slot is released in `finally`.
      void handler(event)
        .then(() => message.ack())
        .catch((e) => {
          if (e instanceof Error) {
            this.logger.log(
              `Could not handle consuming event with reason, ${e.message}`,
            );
          }
          //TODO: we should implement dead letter queue
          return message.nak();
        })
        .finally(() => rl.unlock());
    }
  }
}