Skip to content
Snippets Groups Projects
base.nats.service.ts 2.82 KiB
Newer Older
import { Injectable, Logger } from "@nestjs/common";
import { ConfigService } from "@nestjs/config";
import {
  connect,
  DiscardPolicy,
  JetStreamClient,
  JetStreamManager,
  JSONCodec,
  NatsConnection,
  RetentionPolicy,
  StorageType,
  StreamConfig,
} from "nats";
import asyncRetry from "async-retry";
import { IConfAgent, IConfNats } from "@ocm-engine/config";

@Injectable()
export class NatsBaseService {
  protected config: IConfNats;
  protected jsonCodec = JSONCodec();
  protected client: NatsConnection;
  protected jsClient: JetStreamClient;
  protected jsm: JetStreamManager;
  protected readonly logger: Logger = new Logger(NatsBaseService.name);
  protected streamConfig: StreamConfig;

  constructor(private configService: ConfigService) {
    const agentConfig = this.configService.get<IConfAgent>("agent");
    if (agentConfig?.agentIsRest) {
      return;
    }

    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    this.config = configService.get<IConfNats>("nats")!;

    //TODO: What part should be configured by env vars ?
    this.streamConfig = {
      allow_direct: false,
      allow_rollup_hdrs: false,
      deny_delete: false,
      deny_purge: false,
      discard: DiscardPolicy.Old,
      discard_new_per_subject: false,
      duplicate_window: 0,
      max_bytes: -1,
      max_msg_size: -1,
      max_msgs: -1,
      mirror_direct: false,
      num_replicas: 0,
      name: this.config.streamName,
      max_consumers: 1,
      subjects: this.config.subjects,
      retention: RetentionPolicy.Workqueue,
      sealed: false,
      storage: StorageType.File,
      max_msgs_per_subject: 10,
      max_age: 86400000000000,
    };
    this.connectToNats();
  }

  private async connectToNats() {
    await asyncRetry(
      async () => {
        this.client = await connect({ servers: this.config.servers });
        this.jsClient = this.client.jetstream();
        this.jsm = await this.client.jetstreamManager();

        await this.registerStream(this.streamConfig);
      },
      {
        retries: 5,
        onRetry: (error) => {
          this.logger.log(JSON.stringify(error, null, 2));
          this.logger.error(
            `Failed to connect to NATS, retrying...${error.message}`,
          );
        },
      },
    );

    this.logger.log("Connected to Nats");
  }

  disconnect = () => {
    return this.client.close();
  };

  private registerStream = async (sconfig: StreamConfig) => {
    try {
      return await this.jsm.streams.add(sconfig);
    } catch (e) {
      this.logger.log(JSON.stringify(e, null, 2));

      if (e instanceof NatsError && e.api_error?.err_code === 10058) {
        this.logger.log(`Trying to update ${sconfig.name} stream`);
        return await this.jsm.streams.update(sconfig.name, sconfig);
      }

      throw new Error(`RegisterStream failed.`);
    }
  };