Skip to content

Commit f1ed450

Browse files
authored
feat(core): add reconnect logic for specifying servers (#339)
* feat(core): add reconnect logic for specifying servers - Introduced support for reconnecting to specified servers via an optional `server` parameter in `reconnect()` implementation. Signed-off-by: Alberto Ricart <[email protected]>
1 parent 6c595bc commit f1ed450

File tree

4 files changed

+150
-5
lines changed

4 files changed

+150
-5
lines changed

core/src/core.ts

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -491,15 +491,22 @@ export interface NatsConnection {
491491
* - If the reconnection policy given to the client doesn't allow reconnects, the
492492
* connection will close.
493493
*
494-
* - Messages that are inbound or outbound could be lost.
494+
* - Messages that are inbound or outbound could be lost.
495495
*
496496
* - All requests that are in flight will be rejected.
497497
*
498498
* Note that the returned promise will reject if the client is already closed, or if
499499
* it is in the process of draining. If the client is currently disconnected,
500500
* this API has no effect, as the client is already attempting to reconnect.
501+
*
502+
* If a server or list of servers is specified, the client will clear its current
503+
* server list (the one provided to connect), and set the specified list as the
504+
* new server list - previous servers are forgotten, and then attempt reconnects.
505+
* If connecting to a TLS secured server, the hostname(s) must match.
506+
*
507+
* @param server an optional server or list of servers to reconnect to.
501508
*/
502-
reconnect(): Promise<void>;
509+
reconnect(server?: string | string[]): Promise<void>;
503510
}
504511

505512
/**

core/src/nats.ts

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,13 +554,30 @@ export class NatsConnectionImpl implements NatsConnection {
554554
return this.protocol.features;
555555
}
556556

557-
reconnect(): Promise<void> {
557+
reconnect(server?: string | string[]): Promise<void> {
558558
if (this.isClosed()) {
559559
return Promise.reject(new errors.ClosedConnectionError());
560560
}
561561
if (this.isDraining()) {
562562
return Promise.reject(new errors.DrainingConnectionError());
563563
}
564+
if (server !== undefined) {
565+
if (!Array.isArray(server)) {
566+
server = [server];
567+
}
568+
for (const s of server) {
569+
if (!s || typeof s !== "string") {
570+
return Promise.reject(
571+
InvalidArgumentError.format("server", "must be a hostname"),
572+
);
573+
}
574+
}
575+
576+
this.protocol.servers.clear(true);
577+
for (const s of server) {
578+
this.protocol.servers.addServer(s);
579+
}
580+
}
564581
return this.protocol.reconnect();
565582
}
566583

core/src/servers.ts

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -218,8 +218,11 @@ export class Servers {
218218
this.currentServer = this.servers[0];
219219
}
220220

221-
clear(): void {
221+
clear(forgetTLS?: boolean): void {
222222
this.servers.length = 0;
223+
if (forgetTLS === true) {
224+
this.tlsName = "";
225+
}
223226
}
224227

225228
updateTLSName(): void {

core/tests/reconnect_test.ts

Lines changed: 119 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,14 @@
1212
* See the License for the specific language governing permissions and
1313
* limitations under the License.
1414
*/
15-
import { assert, assertEquals, assertInstanceOf, fail } from "@std/assert";
15+
import {
16+
assert,
17+
assertEquals,
18+
assertExists,
19+
assertInstanceOf,
20+
assertRejects,
21+
fail,
22+
} from "@std/assert";
1623
import { connect } from "./connect.ts";
1724
import { Lock, NatsServer } from "test_helpers";
1825
import {
@@ -469,3 +476,114 @@ Deno.test("reconnect - authentication timeout reconnects", async () => {
469476

470477
await cleanup(ns, nc);
471478
});
479+
480+
Deno.test("reconnect - reconnect to new servers", async () => {
481+
const ns = await NatsServer.start();
482+
483+
const nci = await connect({
484+
port: ns.port,
485+
debug: true,
486+
}) as NatsConnectionImpl;
487+
488+
const done = deferred();
489+
490+
(async () => {
491+
for await (const s of nci.status()) {
492+
console.log(s);
493+
if (s.type === "reconnect") {
494+
if (s.server === "demo.nats.io:4222") {
495+
done.resolve();
496+
}
497+
}
498+
}
499+
})().then();
500+
501+
await nci.reconnect("demo.nats.io");
502+
503+
assert(!nci.isClosed());
504+
assertEquals(nci.protocol.servers.getServers().length, 1);
505+
const demo = nci.protocol.servers.getServers()[0];
506+
assertEquals(demo.hostname, "demo.nats.io");
507+
assertEquals(demo.gossiped, false);
508+
509+
await done;
510+
await cleanup(ns, nci);
511+
});
512+
513+
Deno.test("reconnect - bad args", async () => {
514+
const ns = await NatsServer.start();
515+
const nc = await connect({ port: ns.port }) as NatsConnectionImpl;
516+
const server = nc.protocol.servers.getServers()[0]!;
517+
518+
async function check(s?: string | string[]): Promise<void> {
519+
await assertRejects(
520+
() => nc.reconnect(s),
521+
Error,
522+
"must be a hostname",
523+
);
524+
assertEquals(nc.protocol.servers.getServers().length, 1);
525+
assertExists(nc.protocol.servers.getServers()[0]);
526+
assertEquals(nc.protocol.servers.getServers()[0], server);
527+
}
528+
529+
const tests = [
530+
["localhost", "", "demo.nats.io"],
531+
[123 as unknown as string],
532+
["localhost", null as unknown as string, "demo.nats.io"],
533+
"",
534+
];
535+
536+
for (const t of tests) {
537+
await check(t);
538+
}
539+
540+
await cleanup(ns, nc);
541+
});
542+
543+
Deno.test("reconnect - accepts valid array", async () => {
544+
const ns = await NatsServer.start();
545+
const nc = await connect({ port: ns.port }) as NatsConnectionImpl;
546+
547+
const done = deferred();
548+
549+
(async () => {
550+
for await (const s of nc.status()) {
551+
if (s.type === "reconnect") {
552+
done.resolve();
553+
break;
554+
}
555+
}
556+
})().then();
557+
558+
await nc.reconnect(["demo.nats.io"]);
559+
560+
assertEquals(nc.protocol.servers.getServers().length, 1);
561+
assertEquals(nc.protocol.servers.getServers()[0].hostname, "demo.nats.io");
562+
563+
await done;
564+
await cleanup(ns, nc);
565+
});
566+
567+
Deno.test("reconnect - accepts string", async () => {
568+
const ns = await NatsServer.start();
569+
const nc = await connect({ port: ns.port }) as NatsConnectionImpl;
570+
571+
const done = deferred();
572+
573+
(async () => {
574+
for await (const s of nc.status()) {
575+
if (s.type === "reconnect") {
576+
done.resolve();
577+
break;
578+
}
579+
}
580+
})().then();
581+
582+
await nc.reconnect("demo.nats.io");
583+
584+
assertEquals(nc.protocol.servers.getServers().length, 1);
585+
assertEquals(nc.protocol.servers.getServers()[0].hostname, "demo.nats.io");
586+
587+
await done;
588+
await cleanup(ns, nc);
589+
});

0 commit comments

Comments
 (0)