Newer
Older
import { Injectable, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import {
connect,
DiscardPolicy,
JetStreamClient,
JetStreamManager,
JSONCodec,
NatsConnection,
Zdravko Iliev
committed
NatsError,
RetentionPolicy,
StorageType,
StreamConfig,
} from "nats";
import asyncRetry from "async-retry";
import { IConfAgent, IConfNats } from "@ocm-engine/config";
@Injectable()
export class NatsBaseService {
protected config: IConfNats;
protected jsonCodec = JSONCodec();
protected client: NatsConnection;
protected jsClient: JetStreamClient;
protected jsm: JetStreamManager;
protected readonly logger: Logger = new Logger(NatsBaseService.name);
protected streamConfig: StreamConfig;
constructor(private configService: ConfigService) {
const agentConfig = this.configService.get<IConfAgent>("agent");
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
if (agentConfig?.agentIsRest) {
return;
}
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this.config = configService.get<IConfNats>("nats")!;
//TODO: What part should be configured by env vars ?
this.streamConfig = {
allow_direct: false,
allow_rollup_hdrs: false,
deny_delete: false,
deny_purge: false,
discard: DiscardPolicy.Old,
discard_new_per_subject: false,
duplicate_window: 0,
max_bytes: -1,
max_msg_size: -1,
max_msgs: -1,
mirror_direct: false,
num_replicas: 0,
name: this.config.streamName,
max_consumers: 1,
subjects: this.config.subjects,
retention: RetentionPolicy.Workqueue,
sealed: false,
storage: StorageType.File,
max_msgs_per_subject: 10,
max_age: 86400000000000,
};
this.connectToNats();
}
private async connectToNats() {
await asyncRetry(
async () => {
this.client = await connect({ servers: this.config.servers });
this.jsClient = this.client.jetstream();
this.jsm = await this.client.jetstreamManager();
Zdravko Iliev
committed
await this.registerStream(this.streamConfig);
},
{
retries: 5,
onRetry: (error) => {
this.logger.log(JSON.stringify(error, null, 2));
this.logger.error(
`Failed to connect to NATS, retrying...${error.message}`,
);
},
},
);
this.logger.log("Connected to Nats");
}
disconnect = () => {
return this.client.close();
};
Zdravko Iliev
committed
private registerStream = async (sconfig: StreamConfig) => {
try {
return await this.jsm.streams.add(sconfig);
} catch (e) {
this.logger.log(JSON.stringify(e, null, 2));
if (e instanceof NatsError && e.api_error?.err_code === 10058) {
this.logger.log(`Trying to update ${sconfig.name} stream`);
return await this.jsm.streams.update(sconfig.name, sconfig);
}
throw new Error(`RegisterStream failed.`);
}
};