diff --git a/libs/nats/src/base.nats.service.ts b/libs/nats/src/base.nats.service.ts index c0517cff10782946d00eb06aa12b2bed7264d2db..20a6bde05bc91ed932d2b05eb730636cd89e02ad 100644 --- a/libs/nats/src/base.nats.service.ts +++ b/libs/nats/src/base.nats.service.ts @@ -7,6 +7,7 @@ import { JetStreamManager, JSONCodec, NatsConnection, + NatsError, RetentionPolicy, StorageType, StreamConfig, @@ -66,7 +67,7 @@ export class NatsBaseService { this.jsClient = this.client.jetstream(); this.jsm = await this.client.jetstreamManager(); - await this.jsm.streams.add(this.streamConfig); + await this.registerStream(this.streamConfig); }, { retries: 5, @@ -89,4 +90,18 @@ export class NatsBaseService { disconnect = () => { return this.client.close(); }; + + private registerStream = async (sconfig: StreamConfig) => { + try { + return await this.jsm.streams.add(sconfig); + } catch (e) { + this.logger.log(JSON.stringify(e, null, 2)); + + if (e instanceof NatsError && e.api_error?.err_code === 10065) { + return await this.jsm.streams.get(sconfig.name); + } + + throw new Error(`RegisterStream failed.`); + } + }; } diff --git a/libs/nats/src/consumer.nats.service.ts b/libs/nats/src/consumer.nats.service.ts index 8cef00f874090de467de8a84d94c1cfb6fb48fcd..d1dfbc4eaa9973f9a42953f433415781bc9ebfea 100644 --- a/libs/nats/src/consumer.nats.service.ts +++ b/libs/nats/src/consumer.nats.service.ts @@ -38,8 +38,8 @@ export class ConsumerService extends NatsBaseService { return this.jsClient.consumers.get(stream, consumerConfig.name); } catch (error) { - if (error instanceof NatsError && error.api_error?.err_code === 10065) { - this.logger.log("Stream already exists"); + if (error instanceof NatsError && error.code === "409") { + this.logger.log("Consumer already exists"); return this.jsClient.consumers.get(stream, consumerConfig.name); }