Version: 11.x

WebSockets

You can use WebSockets for all or some of the communication with your server. See wsLink for how to set it up on the client.

Using Subscriptions

Adding a subscription procedure

server/router.ts
tsx
import { EventEmitter } from 'events';
import { initTRPC } from '@trpc/server';
import { observable } from '@trpc/server/observable';
import { z } from 'zod';

// shape of the emitted data; assumed here to mirror the `add` input below
type Post = { id?: string; text: string };

// create a global event emitter (could be replaced by redis, etc)
const ee = new EventEmitter();

const t = initTRPC.create();

export const appRouter = t.router({
  onAdd: t.procedure.subscription(() => {
    // return an `observable` with a callback which is triggered immediately
    return observable<Post>((emit) => {
      const onAdd = (data: Post) => {
        // emit data to client
        emit.next(data);
      };
      // trigger `onAdd()` when `add` is triggered in our event emitter
      ee.on('add', onAdd);
      // unsubscribe function when client disconnects or stops subscribing
      return () => {
        ee.off('add', onAdd);
      };
    });
  }),
  add: t.procedure
    .input(
      z.object({
        id: z.string().uuid().optional(),
        text: z.string().min(1),
      }),
    )
    .mutation(async (opts) => {
      const post = { ...opts.input }; /* [..] add to db */
      // notify every active `onAdd` subscription
      ee.emit('add', post);
      return post;
    }),
});
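
The cleanup function returned from the observable callback is what prevents listeners from piling up on the emitter. The following is a minimal, dependency-free sketch of that pattern. `miniObservable` is a hypothetical stand-in written for this illustration; it is not tRPC's `observable` implementation, and the `Post` type is assumed.

```typescript
import { EventEmitter } from 'node:events';

// Assumed shape of the emitted data, for illustration only.
type Post = { id?: string; text: string };

type Emit<T> = { next: (value: T) => void };
type Teardown = () => void;

// Hypothetical stand-in for an observable: runs `setup` on subscribe
// and hands the teardown function back to the caller.
function miniObservable<T>(setup: (emit: Emit<T>) => Teardown) {
  return {
    subscribe(onData: (value: T) => void): Teardown {
      return setup({ next: onData });
    },
  };
}

const ee = new EventEmitter();

const onAddObservable = miniObservable<Post>((emit) => {
  const onAdd = (data: Post) => emit.next(data);
  ee.on('add', onAdd);
  // Without this teardown, each finished subscription would leave a listener behind.
  return () => {
    ee.off('add', onAdd);
  };
});

const received: Post[] = [];
const unsubscribe = onAddObservable.subscribe((post) => {
  received.push(post);
});

ee.emit('add', { text: 'first' }); // delivered to the subscriber
unsubscribe();
ee.emit('add', { text: 'second' }); // not delivered: the listener was removed
```

After `unsubscribe()` runs, the emitter has no `add` listeners left, which mirrors what should happen when a client disconnects or stops subscribing.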

Creating a WebSocket-server

bash
yarn add ws
server/wsServer.ts
ts
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import ws from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';

const wss = new ws.Server({
  port: 3001,
});

const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext,
  // Enable heartbeat messages to keep connection open (disabled by default)
  keepAlive: {
    enabled: true,
    // server ping message interval in milliseconds
    pingMs: 30000,
    // connection is terminated if pong message is not received in this many milliseconds
    pongWaitMs: 5000,
  },
});

// `socket` avoids shadowing the `ws` import
wss.on('connection', (socket) => {
  console.log(`➕➕ Connection (${wss.clients.size})`);
  socket.once('close', () => {
    console.log(`➖➖ Connection (${wss.clients.size})`);
  });
});
console.log('✅ WebSocket Server listening on ws://localhost:3001');

// ask clients to reconnect before the server shuts down
process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});

Setting TRPCClient to use WebSockets

tip

You can use Links to route queries and/or mutations to HTTP transport and subscriptions over WebSockets.

client.ts
tsx
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from '../path/to/server/trpc';

// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
  links: [
    wsLink({
      client: wsClient,
    }),
  ],
});

Authentication / connection params

tip

If you're building a web application, you can skip this section, because cookies are sent as part of the request.

To authenticate over WebSockets, pass connectionParams to createWSClient. They are sent as the first message when the client establishes a WebSocket connection.

server/context.ts
ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';

export const createContext = async (opts: CreateWSSContextFnOptions) => {
  const token = opts.info.connectionParams?.token;

  // [... authenticate]

  return {};
};

export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
ts
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import superjson from 'superjson';
import type { AppRouter } from '~/server/routers/_app';

const wsClient = createWSClient({
  url: `ws://localhost:3000`,
  // sent as the first message once the connection is established
  connectionParams: async () => {
    return {
      token: 'supersecret',
    };
  },
});

export const trpc = createTRPCClient<AppRouter>({
  // the transformer must match the one configured on the server
  links: [wsLink({ client: wsClient, transformer: superjson })],
});

Using React

See /examples/next-prisma-starter-websockets.

WebSockets RPC Specification

You can read more details by drilling into the TypeScript definitions:

query / mutation

Request

ts
{
  id: number | string;
  jsonrpc?: '2.0'; // optional
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}
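
As an illustration of the request envelope above, here is a small sketch that builds and serializes one. `buildRequest` and the `nextId` counter are hypothetical helpers invented for this example, and it assumes no transformer is configured, so the input is passed through as plain JSON.

```typescript
// Mirrors the request shape documented above.
type RequestMessage = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown;
  };
};

// Hypothetical id source: any value unique per in-flight request would do.
let nextId = 0;

// Hypothetical helper that assembles the envelope for a query or mutation.
function buildRequest(
  method: RequestMessage['method'],
  path: string,
  input?: unknown,
): RequestMessage {
  const params: RequestMessage['params'] = { path };
  if (input !== undefined) {
    params.input = input; // assumes no transformer, so input is sent as-is
  }
  return { id: ++nextId, jsonrpc: '2.0', method, params };
}

// Envelope for calling the `add` mutation from the router example.
const requestMessage = buildRequest('mutation', 'add', { text: 'hello' });

// This string is what would be written to the socket.
const wirePayload = JSON.stringify(requestMessage);
```

The `id` is what lets the client match a later response to this request, so it should not be reused while the request is still pending.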

Response

The shape below, or an error.

ts
{
  id: number | string;
  jsonrpc?: '2.0'; // only defined if included in request
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  }
}
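
A client has to tell a successful response apart from an error before using the data. The sketch below shows one way to do that. `unwrapResponse` is a hypothetical helper, the error type follows the JSON-RPC 2.0 error object (`code`, `message`, optional `data`), and the incoming JSON is trusted without runtime validation, which real code may not want.

```typescript
// Mirrors the success shape documented above.
type SuccessResponse<TOutput> = {
  id: number | string;
  jsonrpc?: '2.0';
  result: { type: 'data'; data: TOutput };
};

// Assumed error envelope, based on the JSON-RPC 2.0 error object.
type ErrorResponse = {
  id: number | string;
  jsonrpc?: '2.0';
  error: { code: number; message: string; data?: unknown };
};

// Hypothetical helper: returns the procedure output or throws on an error response.
function unwrapResponse<TOutput>(raw: string): TOutput {
  // No runtime validation here; the cast is an assumption of this sketch.
  const message = JSON.parse(raw) as SuccessResponse<TOutput> | ErrorResponse;
  if ('error' in message) {
    throw new Error(`RPC error ${message.error.code}: ${message.error.message}`);
  }
  return message.result.data;
}

// Example: a successful response to a query or mutation.
const post = unwrapResponse<{ text: string }>(
  '{"id":1,"result":{"type":"data","data":{"text":"hello"}}}',
);
```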

subscription / subscription.stop

Start a subscription

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}

To cancel a subscription, send a subscription.stop message that reuses the id of the subscription:

ts
{
  id: number | string; // <-- id of your created subscription
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}
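
The start and stop messages are tied together only by their `id`. The following sketch makes that pairing explicit. `startMessage` and `stopMessage` are hypothetical helpers for illustration, and the `onAdd` path is borrowed from the router example.

```typescript
// Mirrors the two message shapes documented above.
type SubscriptionStart = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: { path: string; input?: unknown };
};

type SubscriptionStop = {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription.stop';
};

// Hypothetical helper: envelope that starts a subscription.
function startMessage(
  id: number | string,
  path: string,
  input?: unknown,
): SubscriptionStart {
  const params = input === undefined ? { path } : { path, input };
  return { id, jsonrpc: '2.0', method: 'subscription', params };
}

// Hypothetical helper: envelope that stops the subscription with the given id.
function stopMessage(id: number | string): SubscriptionStop {
  return { id, jsonrpc: '2.0', method: 'subscription.stop' };
}

const start = startMessage(7, 'onAdd');
// The stop message carries no params; the id alone identifies the subscription.
const stop = stopMessage(start.id);
```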

Subscription response shape

One of the shapes below, or an error.

ts
{
  id: number | string;
  jsonrpc?: '2.0';
  result: (
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // subscription started
      }
    | {
        type: 'stopped'; // subscription stopped
      }
  )
}
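
Because `result.type` is a discriminant, a client can handle the three variants with a single switch. The sketch below folds a stream of messages into a small state object. `SubscriptionState` and `applyMessage` are hypothetical names for this illustration and are not part of tRPC.

```typescript
// Mirrors the subscription response shape documented above.
type SubscriptionMessage<TData> = {
  id: number | string;
  jsonrpc?: '2.0';
  result:
    | { type: 'data'; data: TData }
    | { type: 'started' }
    | { type: 'stopped' };
};

// Hypothetical client-side state for one subscription.
type SubscriptionState<TData> = {
  status: 'idle' | 'active' | 'stopped';
  items: TData[];
};

// Hypothetical reducer: returns the next state for one incoming message.
function applyMessage<TData>(
  state: SubscriptionState<TData>,
  message: SubscriptionMessage<TData>,
): SubscriptionState<TData> {
  const { result } = message;
  switch (result.type) {
    case 'started':
      return { ...state, status: 'active' };
    case 'data':
      return { ...state, items: [...state.items, result.data] };
    case 'stopped':
      return { ...state, status: 'stopped' };
  }
}

const initialState: SubscriptionState<string> = { status: 'idle', items: [] };

const incoming: SubscriptionMessage<string>[] = [
  { id: 1, result: { type: 'started' } },
  { id: 1, result: { type: 'data', data: 'a' } },
  { id: 1, result: { type: 'data', data: 'b' } },
  { id: 1, result: { type: 'stopped' } },
];

const finalState = incoming.reduce(
  (state, message) => applyMessage(state, message),
  initialState,
);
```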

Connection params

If the connection is initialized with ?connectionParams=1, the first message has to be connection params.

ts
{
  data: Record<string, string> | null;
  method: 'connectionParams';
}
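
For anyone writing a client by hand, the handshake described above can be sketched as two steps: flag the URL, then send the params as the first message. `withConnectionParamsFlag` and `firstMessage` are hypothetical helpers for this illustration, and the token value is the placeholder from the earlier client example.

```typescript
// Mirrors the message shape documented above.
type ConnectionParamsMessage = {
  data: Record<string, string> | null;
  method: 'connectionParams';
};

// Hypothetical helper: adds the query flag that announces connection params.
function withConnectionParamsFlag(url: string): string {
  const parsed = new URL(url);
  parsed.searchParams.set('connectionParams', '1');
  return parsed.toString();
}

// Hypothetical helper: serializes the message that must be sent first.
function firstMessage(params: Record<string, string> | null): string {
  const message: ConnectionParamsMessage = {
    method: 'connectionParams',
    data: params,
  };
  return JSON.stringify(message);
}

const flaggedUrl = withConnectionParamsFlag('ws://localhost:3001');
const handshakePayload = firstMessage({ token: 'supersecret' });
```

A client would open the socket at `flaggedUrl` and write `handshakePayload` before any query, mutation, or subscription message.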

Errors

See https://www.jsonrpc.org/specification#error_object or Error Formatting.

Notifications from Server to Client

{ id: null, type: 'reconnect' }

Tells clients to reconnect before shutting down the server. Invoked by wssHandler.broadcastReconnectNotification().