Antiox – Tokio-like async primitives for TypeScript

1 points by nathanflurry


Sharing a small library we built internally here for fellow Lobsters burdened with writing async TypeScript.

We did an assessment internally at Rivet of the bugs in our TypeScript codebases. The #1 issue by far was async concurrency bugs (specifically in backend code for non-CRUD use cases).

Needless to say, very few of these concurrency issues exist in our Rust codebases, since we rely heavily on channels + tasks + Tokio's other concurrency primitives.

We built this to be as close to a 1:1 mapping from Rust as possible.

There are loads of other concurrency libraries for TypeScript, but they usually have gaping holes or weird design quirks. Tokio has been rigorously tested and is well designed, so copying it as closely as possible makes this, in my opinion, the best-designed concurrency library for TypeScript.

There's one big gotcha with TypeScript: you cannot cancel promises like you can futures. To work around this, we automatically create and pass an abort signal anywhere you would need to cancel a future (e.g. task, timeout, select). You can't fix TypeScript, but this helps you and LLMs reason about your code easily.
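To make the abort-signal idea concrete, here is a minimal sketch using only the standard AbortController. `withTimeout` and `sleep` are hypothetical names for illustration, not antiox's API. The helper creates the signal and hands it to the task, and the task opts in to cancellation by listening to it.

```typescript
// Hypothetical helper, not antiox's API: runs `task` with a fresh AbortSignal
// and aborts that signal if the task has not finished within `ms` milliseconds.
async function withTimeout<T>(
  ms: number,
  task: (signal: AbortSignal) => Promise<T>,
): Promise<T> {
  const controller = new AbortController();
  const timer = setTimeout(
    () => controller.abort(new Error(`timed out after ${ms}ms`)),
    ms,
  );
  try {
    return await task(controller.signal);
  } finally {
    // Always clear the timer so it cannot fire after the task has settled.
    clearTimeout(timer);
  }
}

// A cancellable sleep: rejects with the signal's reason as soon as it aborts.
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason);
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      reject(signal.reason);
    };
    const timer = setTimeout(() => {
      // Remove the listener on normal completion so it is not retained.
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}
```

Note that the timeout only takes effect because `sleep` observes the signal. A task that ignores the signal it is given would keep running until it finishes on its own.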

This does not attempt to bring multi-threading to JS (though others have attempted that using worker pools). This is roughly equivalent to Tokio's single-threaded scheduler.

Code snippet:

import { channel } from "antiox/sync/mpsc";
import { oneshot, OneshotSender } from "antiox/sync/oneshot";
import { unreachable } from "antiox/panic";
import { spawn } from "antiox/task";

type Msg =
  | { type: "increment"; amount: number }
  | { type: "get"; resTx: OneshotSender<number> };

const [tx, rx] = channel<Msg>(32);

spawn(async () => {
  let count = 0;
  for await (const msg of rx) {
    switch (msg.type) {
      case "increment":
        count += msg.amount;
        break;
      case "get":
        msg.resTx.send(count);
        break;
      default:
        // `unreachable(x: never)` provides compile-time exhaustiveness checking for switch statements
        unreachable(msg);
    }
  }
});

// Fire-and-forget
await tx.send({ type: "increment", amount: 5 });

// Request-response via oneshot channel
const [resTx, resRx] = oneshot<number>();
await tx.send({ type: "get", resTx });
const value = await resRx;

Currently supported modules:

rix

This is an interesting approach that doubles down on using AbortSignal for Structured Concurrency at a bigger scale. The ergonomic issues of AbortSignal are mainly these:

Reading antiox, I can find examples of these issues:

Missing removeEventListener('abort'). Even { once: true } cannot help here: if the signal is never aborted, the handler is leaked for as long as the signal lives.

	if (signal) {
		signal.addEventListener("abort", () => parentController.abort(signal.reason), { once: true });
	}
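One possible way to avoid the leak, sketched with standard DOM/Node APIs only. `linkAbort` and `runChild` are hypothetical names, not functions from antiox. The idea is to keep a reference to the handler and remove it once the child work has settled.

```typescript
// Hypothetical helper: creates a child controller that aborts when `parent` aborts.
// The returned `unlink` removes the listener, so a long-lived parent signal
// does not retain the handler after the child's work is done.
function linkAbort(parent: AbortSignal): {
  child: AbortController;
  unlink: () => void;
} {
  const child = new AbortController();
  if (parent.aborted) {
    // The parent is already aborted: propagate immediately, nothing to unlink.
    child.abort(parent.reason);
    return { child, unlink: () => {} };
  }
  const onAbort = () => child.abort(parent.reason);
  parent.addEventListener("abort", onAbort, { once: true });
  return {
    child,
    unlink: () => parent.removeEventListener("abort", onAbort),
  };
}

// Usage sketch: unlink in `finally`, whether the work succeeds or throws.
async function runChild<T>(
  parent: AbortSignal,
  work: (signal: AbortSignal) => Promise<T>,
): Promise<T> {
  const { child, unlink } = linkAbort(parent);
  try {
    return await work(child.signal);
  } finally {
    unlink();
  }
}
```

On runtimes that provide it, `AbortSignal.any([...])` is another way to derive a combined signal without wiring the listener by hand.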

AbortSignal being opt-in, meaning child tasks cannot be cancelled and joined before the parent task continues.

		const result = await select({
			fast: async (_signal) => {
				await delay(10);
				return "fast";
			},
			slow: async (_signal) => {
				await delay(200);
				return "slow";
			},
		});
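For comparison, here is a rough sketch of what "cancel and join" could look like. `selectAndJoin` and `sleep` are hypothetical and are not how antiox's `select` is implemented. The first branch to settle wins, the shared signal is aborted, and the caller only continues once every branch has settled.

```typescript
type Branch = (signal: AbortSignal) => Promise<unknown>;

// Hypothetical `selectAndJoin`: resolves with the first branch to settle,
// aborts the shared signal, then waits for all branches before returning.
async function selectAndJoin(
  branches: Record<string, Branch>,
): Promise<{ key: string; value: unknown }> {
  const controller = new AbortController();
  const running = Object.entries(branches).map(([key, run]) =>
    run(controller.signal).then((value) => ({ key, value })),
  );
  try {
    return await Promise.race(running);
  } finally {
    controller.abort(new Error("lost the race"));
    // Join: losers get a chance to observe the abort and run their cleanup.
    await Promise.allSettled(running);
  }
}

// A cancellable sleep that rejects with the signal's reason on abort.
function sleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason);
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);
      reject(signal.reason);
    };
    const timer = setTimeout(() => {
      signal.removeEventListener("abort", onAbort);
      resolve();
    }, ms);
    signal.addEventListener("abort", onAbort, { once: true });
  });
}
```

The join is only prompt when branches cooperate. A branch that ignores the signal, like the `_signal` branches above, would hold up `selectAndJoin` until it finishes by itself, which is the opt-in problem in a nutshell.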

Kevin Gibbons and I discussed similar ideas, but we were more focused on solving these ergonomic issues and enforcing SC guarantees, and ended up with this: https://lobste.rs/s/xt8q7u/adding_structured_concurrency (not a library, just some brainstorming).