Skip to content

Commit 2a55a12

Browse files
committed
feat(jetstream): add watcherPrefix option jetstream/JetStreamManager contexts
Signed-off-by: Alberto Ricart <[email protected]>
1 parent 2e41bb0 commit 2a55a12

File tree

8 files changed

+177
-4
lines changed

8 files changed

+177
-4
lines changed

jetstream/src/jsbaseclient_api.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
import {
1717
backoff,
18+
createInbox,
1819
delay,
1920
Empty,
2021
errors,
@@ -65,6 +66,8 @@ export class BaseApiClientImpl {
6566

6667
constructor(nc: NatsConnection, opts?: JetStreamOptions) {
6768
this.nc = nc as NatsConnectionImpl;
69+
opts = opts || {} as JetStreamOptions;
70+
opts.watcherPrefix = opts.watcherPrefix || this.nc.options.inboxPrefix;
6871
this.opts = defaultJsOptions(opts);
6972
this._parseOpts();
7073
this.prefix = this.opts.apiPrefix!;
@@ -85,6 +88,9 @@ export class BaseApiClientImpl {
8588
prefix = prefix.substr(0, prefix.length - 1);
8689
}
8790
this.opts.apiPrefix = prefix;
91+
92+
// verify that watcherPrefix is valid
93+
createInbox(this.opts.watcherPrefix);
8894
}
8995

9096
async _request(

jetstream/src/jsclient.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ import { ConsumerAPIImpl } from "./jsmconsumer_api.ts";
1818

1919
import {
2020
backoff,
21+
createInbox,
2122
deferred,
2223
delay,
2324
Empty,
@@ -174,6 +175,14 @@ export class JetStreamClientImpl extends BaseApiClientImpl
174175
this.opts,
175176
{ checkAPI },
176177
) as JetStreamManagerOptions;
178+
179+
// fail early if watcherPrefix is bad
180+
try {
181+
createInbox(opts.watcherPrefix);
182+
} catch (err) {
183+
return Promise.reject(err);
184+
}
185+
177186
return jetstreamManager(this.nc, opts);
178187
}
179188

jetstream/src/jsmstream_api.ts

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -177,8 +177,7 @@ export class ConsumersImpl implements Consumers {
177177
name_prefix = name_prefix || `oc_${nuid.next()}`;
178178
minValidation("name_prefix", name_prefix);
179179
deliver_prefix = deliver_prefix ||
180-
createInbox((this.api as ConsumerAPIImpl).nc.options.inboxPrefix);
181-
minValidation("deliver_prefix", name_prefix);
180+
createInbox((this.api as ConsumerAPIImpl).getOptions().watcherPrefix);
182181

183182
const cc = Object.assign({}, opts) as ConsumerConfig;
184183
cc.ack_policy = AckPolicy.None;

jetstream/src/types.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,12 @@ export type JetStreamOptions = {
6868
* the default JetStream apiPrefix.
6969
*/
7070
domain?: string;
71+
72+
/**
73+
* Watcher prefix for inbox subscriptions - these are used for watchers
74+
* and push consumers. If not set, it uses ConnectionOptions#inboxPrefix
75+
*/
76+
watcherPrefix?: string;
7177
};
7278

7379
export type JetStreamManagerOptions = JetStreamOptions & {

jetstream/tests/jetstream_test.ts

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1374,3 +1374,81 @@ Deno.test("jetstream - jsm base timeout", async () => {
13741374

13751375
await cleanup(ns, nc);
13761376
});
1377+
1378+
Deno.test("jetstream - watcherPrefix", async () => {
1379+
const { ns, nc } = await setup(jetstreamServerConf({}));
1380+
1381+
async function transitive(js: JetStreamClient): Promise<void> {
1382+
const { watcherPrefix } = js.getOptions();
1383+
const jsm = await js.jetstreamManager();
1384+
assertEquals(watcherPrefix, jsm.getOptions().watcherPrefix);
1385+
1386+
const js2 = jsm.jetstream();
1387+
assertEquals(watcherPrefix, js2.getOptions().watcherPrefix);
1388+
}
1389+
1390+
let js = jetstream(nc);
1391+
assertEquals(js.getOptions().watcherPrefix, undefined);
1392+
1393+
const jsm = await jetstreamManager(nc);
1394+
assertEquals(jsm.getOptions().watcherPrefix, undefined);
1395+
1396+
const nc2 = await connect({ port: ns.port, inboxPrefix: "hello" });
1397+
js = jetstream(nc2);
1398+
assertEquals(js.getOptions().watcherPrefix, "hello");
1399+
await transitive(js);
1400+
1401+
js = jetstream(nc2, { watcherPrefix: "bar" });
1402+
assertEquals(js.getOptions().watcherPrefix, "bar");
1403+
await transitive(js);
1404+
1405+
assertThrows(
1406+
() => {
1407+
jetstream(nc2, { watcherPrefix: "hello.*" });
1408+
},
1409+
Error,
1410+
"'prefix' cannot have wildcards ('hello.*')",
1411+
);
1412+
1413+
await assertRejects(
1414+
() => {
1415+
return jetstreamManager(nc, { watcherPrefix: "hello.*" });
1416+
},
1417+
Error,
1418+
"'prefix' cannot have wildcards ('hello.*')",
1419+
);
1420+
1421+
await cleanup(ns, nc, nc2);
1422+
});
1423+
1424+
Deno.test("jetstream - watcher deliver_subject", async () => {
1425+
const { ns, nc } = await setup(jetstreamServerConf({}));
1426+
1427+
const jsm = await jetstreamManager(nc);
1428+
await jsm.streams.add({ name: "A", subjects: ["a"] });
1429+
1430+
async function assertDeliverTo(
1431+
js: JetStreamClient,
1432+
deliverPrefix: string,
1433+
): Promise<void> {
1434+
const pc = await js.consumers.getPushConsumer("A");
1435+
const { config: { deliver_subject } } = await pc.info(true);
1436+
assertEquals(deliver_subject?.split(".")[0], deliverPrefix);
1437+
await pc.delete();
1438+
}
1439+
1440+
// with no prefix, defaults to inbox
1441+
let js = jetstream(nc);
1442+
await assertDeliverTo(js, "_INBOX");
1443+
1444+
// respects inboxPrefix
1445+
const nc2 = await connect({ port: ns.port, inboxPrefix: "hallo" });
1446+
js = jetstream(nc2);
1447+
await assertDeliverTo(js, "hallo");
1448+
1449+
// override inboxPrefix
1450+
js = jetstream(nc2, { watcherPrefix: "hola" });
1451+
await assertDeliverTo(js, "hola");
1452+
1453+
await cleanup(ns, nc, nc2);
1454+
});

kv/tests/kv_test.ts

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2671,3 +2671,21 @@ Deno.test("kv - encoder", async () => {
26712671

26722672
await cleanup(ns, nc);
26732673
});
2674+
2675+
Deno.test("kv - watcherPrefix", async () => {
2676+
const { ns, nc } = await setup(jetstreamServerConf({}));
2677+
2678+
const js = jetstream(nc, { watcherPrefix: "hello" });
2679+
const kv = await new Kvm(js).create("A");
2680+
2681+
const iter = await kv.watch({ key: "a.>" }) as QueuedIteratorImpl<
2682+
KvWatchEntry
2683+
>;
2684+
const pci = iter._data as PushConsumer;
2685+
const { config: { deliver_subject } } = await pci.info(true);
2686+
assertEquals(deliver_subject?.split(".")[0], "hello");
2687+
2688+
iter.stop();
2689+
2690+
await cleanup(ns, nc);
2691+
});

obj/tests/objectstore_test.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,20 @@ import {
3434
headers,
3535
nanos,
3636
nuid,
37+
type QueuedIteratorImpl,
3738
} from "@nats-io/nats-core/internal";
3839
import type { NatsConnectionImpl } from "@nats-io/nats-core/internal";
39-
import type { ObjectInfo, ObjectStoreMeta } from "../src/types.ts";
40-
import { jetstreamManager, StorageType } from "@nats-io/jetstream";
40+
import type {
41+
ObjectInfo,
42+
ObjectStoreMeta,
43+
ObjectWatchInfo,
44+
} from "../src/types.ts";
45+
import {
46+
jetstream,
47+
jetstreamManager,
48+
type PushConsumer,
49+
StorageType,
50+
} from "@nats-io/jetstream/internal";
4151
import { equals } from "@std/bytes";
4252
import { digestType, Objm } from "../src/objectstore.ts";
4353
import { Base64UrlPaddedCodec } from "../src/base64.ts";
@@ -1306,3 +1316,18 @@ Deno.test("os - objm creates right number of replicas", async () => {
13061316
await nc.close();
13071317
await NatsServer.stopAll(servers, true);
13081318
});
1319+
1320+
Deno.test("objectstore - watcherPrefix", async () => {
1321+
const { ns, nc } = await setup(jetstreamServerConf({}));
1322+
const js = jetstream(nc, { watcherPrefix: "hello" });
1323+
const objm = new Objm(js);
1324+
const os = await objm.create("test");
1325+
1326+
const watches = await os.watch() as QueuedIteratorImpl<ObjectWatchInfo>;
1327+
const oc = watches._data as PushConsumer;
1328+
const { config: { deliver_subject } } = await oc.info(true);
1329+
1330+
assertEquals(deliver_subject?.split(".")[0], "hello");
1331+
1332+
await cleanup(ns, nc);
1333+
});

runtimes.md

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,38 @@ deno init
5353
deno add jsr:@nats-io/transport-deno
5454
```
5555

56+
## TypeScript Compiler Configuration (Node)
57+
58+
If you are writing your code in TypeScript under node using the `tsc` compiler,
59+
you will need to configure `tsc` properly so it can find its way around the
60+
module. You will want a configuration file that looks like this:
61+
62+
```json
63+
{
64+
"compilerOptions": {
65+
"target": "esnext",
66+
"module": "nodenext",
67+
"outDir": "lib/",
68+
"moduleResolution": "nodenext",
69+
"sourceMap": true,
70+
"declaration": true,
71+
"allowJs": true,
72+
"removeComments": false,
73+
"resolveJsonModule": true
74+
},
75+
"include": [
76+
"src/**/*"
77+
],
78+
"exclude": [
79+
"lib/**/*"
80+
]
81+
}
82+
```
83+
84+
Of key importance are the `target`, `module`, `moduleResolution`. The `nodenext`
85+
identifies that the packages can contain `exports`, and will enable the compiler
86+
and IDEs to properly resolve imports in your code.
87+
5688
## Let's add a simple program that we can run
5789

5890
Added to the `nats-dev/<runtime>/index.ts` you used above.

0 commit comments

Comments
 (0)