Skip to content
Snippets Groups Projects
Unverified Commit d6b7b55a authored by Zdravko Iliev's avatar Zdravko Iliev
Browse files

fix: fix stream already exists

parent 3294ed43
No related branches found
No related tags found
1 merge request!31ci: issuer doesn't see any new connections in GET /connections or GET /connections/:id
Pipeline #64822 canceled with stage
in 52 seconds
This commit is part of merge request !31. Comments created here will be created in the context of that merge request.
...@@ -7,6 +7,7 @@ import { ...@@ -7,6 +7,7 @@ import {
JetStreamManager, JetStreamManager,
JSONCodec, JSONCodec,
NatsConnection, NatsConnection,
NatsError,
RetentionPolicy, RetentionPolicy,
StorageType, StorageType,
StreamConfig, StreamConfig,
...@@ -66,7 +67,7 @@ export class NatsBaseService { ...@@ -66,7 +67,7 @@ export class NatsBaseService {
this.jsClient = this.client.jetstream(); this.jsClient = this.client.jetstream();
this.jsm = await this.client.jetstreamManager(); this.jsm = await this.client.jetstreamManager();
await this.jsm.streams.add(this.streamConfig); await this.registerStream(this.streamConfig);
}, },
{ {
retries: 5, retries: 5,
...@@ -89,4 +90,18 @@ export class NatsBaseService { ...@@ -89,4 +90,18 @@ export class NatsBaseService {
disconnect = () => { disconnect = () => {
return this.client.close(); return this.client.close();
}; };
private registerStream = async (sconfig: StreamConfig) => {
try {
return await this.jsm.streams.add(sconfig);
} catch (e) {
this.logger.log(JSON.stringify(e, null, 2));
if (e instanceof NatsError && e.api_error?.err_code === 10065) {
return await this.jsm.streams.get(sconfig.name);
}
throw new Error(`RegisterStream failed.`);
}
};
} }
...@@ -38,8 +38,8 @@ export class ConsumerService extends NatsBaseService { ...@@ -38,8 +38,8 @@ export class ConsumerService extends NatsBaseService {
return this.jsClient.consumers.get(stream, consumerConfig.name); return this.jsClient.consumers.get(stream, consumerConfig.name);
} catch (error) { } catch (error) {
if (error instanceof NatsError && error.api_error?.err_code === 10065) { if (error instanceof NatsError && error.code === "409") {
this.logger.log("Stream already exists"); this.logger.log("Consumer already exists");
return this.jsClient.consumers.get(stream, consumerConfig.name); return this.jsClient.consumers.get(stream, consumerConfig.name);
} }
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment