WebSockets
You can use WebSockets for all or some of the communication with your server. See wsLink for how to set it up on the client.
This document outlines the specific details of using WebSockets. For general usage of subscriptions, see our subscriptions guide.
Creating a WebSocket-server
# Install the `ws` package, which provides the WebSocketServer used by the server example.
yarn add ws
// server/wsServer.ts
//
// Starts a standalone WebSocket server on port 3001 and attaches the tRPC
// router to it via `applyWSSHandler`.
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { WebSocketServer } from 'ws';
import { appRouter } from './routers/app';
import { createContext } from './trpc';

const wss = new WebSocketServer({
  port: 3001,
});

const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext,
  // Enable heartbeat messages to keep connection open (disabled by default)
  keepAlive: {
    enabled: true,
    // server ping message interval in milliseconds
    pingMs: 30000,
    // connection is terminated if pong message is not received in this many milliseconds
    pongWaitMs: 5000,
  },
});

// Log the number of connected clients whenever a socket opens or closes.
wss.on('connection', (ws) => {
  console.log(`++ Connection (${wss.clients.size})`);
  ws.once('close', () => {
    console.log(`-- Connection (${wss.clients.size})`);
  });
});
console.log('WebSocket Server listening on ws://localhost:3001');

// On SIGTERM, ask connected clients to reconnect before closing the server.
process.on('SIGTERM', () => {
  console.log('SIGTERM');
  handler.broadcastReconnectNotification();
  wss.close();
});
Setting TRPCClient to use WebSockets
You can use Links to route queries and/or mutations to HTTP transport and subscriptions over WebSockets.
// client.ts
//
// Creates a tRPC client whose only link is a WebSocket link, so every
// operation issued through `client` goes over the WebSocket connection.
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';

// create persistent WebSocket connection
const wsClient = createWSClient({
  url: `ws://localhost:3001`,
});

// configure TRPCClient to use WebSockets transport
const client = createTRPCClient<AppRouter>({
  links: [
    wsLink({
      client: wsClient,
    }),
  ],
});
Authentication / connection params
If you're building a web application, you can ignore this section, as cookies are sent as part of the request.
In order to authenticate with WebSockets, you can pass connectionParams to createWSClient. These will be sent as the first message when the client establishes a WebSocket connection.
// server/context.ts
import type { CreateWSSContextFnOptions } from '@trpc/server/adapters/ws';

/**
 * Builds the per-connection context for the WebSocket adapter.
 *
 * Reads the `token` entry from the connection params sent by the client
 * (absent when no connection params were provided). The authentication step
 * itself is left as a placeholder, so the returned context is empty.
 *
 * @param opts - Options supplied by the WebSocket adapter; `opts.info.connectionParams`
 *   carries the client's connection params.
 * @returns The context object made available to procedures.
 */
export const createContext = async (opts: CreateWSSContextFnOptions) => {
  const token = opts.info.connectionParams?.token;

  // [... authenticate]

  return {};
};

/** The resolved context type produced by `createContext`. */
export type Context = Awaited<ReturnType<typeof createContext>>;
// client/trpc.ts
//
// tRPC client that authenticates its WebSocket connection by supplying
// `connectionParams` to `createWSClient`.
import { createTRPCClient, createWSClient, wsLink } from '@trpc/client';
import type { AppRouter } from './server';
import superjson from 'superjson';

const wsClient = createWSClient({
  // Port matches the WebSocket server example in this guide (ws://localhost:3001).
  url: `ws://localhost:3001`,
  // Resolved when the connection is established; the result is sent to the
  // server as the connection params message.
  connectionParams: async () => {
    return {
      token: 'supersecret',
    };
  },
});

export const trpc = createTRPCClient<AppRouter>({
  links: [wsLink({ client: wsClient, transformer: superjson })],
});
Automatic tracking of id using tracked() (recommended)
If you yield an event using our tracked()-helper and include an id, the client will automatically reconnect when it gets disconnected and send the last known ID when reconnecting as part of the lastEventId-input.
You can send an initial lastEventId when initializing the subscription and it will be automatically updated as the browser receives data.
If you're fetching data based on the lastEventId, and capturing all events is critical, you may want to use ReadableStreams or a similar pattern as an intermediary, as is done in our full-stack SSE example, to prevent newly emitted events from being ignored while yielding the original batch based on lastEventId.
import EventEmitter, { on } from 'events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

// Event source for the example: 'add' events carry the newly added post.
const ee = new EventEmitter();

/**
 * Router exposing `onPostAdd`, a subscription that yields every post emitted
 * on `ee`'s 'add' event, wrapped in `tracked()` with the post id as event id.
 */
export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
      }
      // listen for new events
      for await (const [data] of on(ee, 'add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the subscription is aborted
        signal: opts.signal,
      })) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
WebSockets RPC Specification
You can read more details by drilling into the TypeScript definitions:
query / mutation
Request
/** Message a client sends to invoke a query or mutation procedure. */
interface RequestMessage {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'query' | 'mutation';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}
Response
The response has the shape below, or is an error.
/**
 * Successful response to a query or mutation request.
 *
 * @typeParam TOutput - Output type of the invoked procedure.
 */
interface ResponseMessage<TOutput = unknown> {
  id: number | string;
  jsonrpc?: '2.0';
  result: {
    type: 'data'; // always 'data' for mutation / queries
    data: TOutput; // output from procedure
  };
}
subscription / subscription.stop
Start a subscription
/** Message a client sends to start a subscription. */
interface SubscriptionRequest {
  id: number | string;
  jsonrpc?: '2.0';
  method: 'subscription';
  params: {
    path: string;
    input?: unknown; // <-- pass input of procedure, serialized by transformer
  };
}
To cancel a subscription, call subscription.stop
/** Message a client sends to cancel a previously started subscription. */
interface SubscriptionStopRequest {
  id: number | string; // <-- id of your created subscription
  jsonrpc?: '2.0';
  method: 'subscription.stop';
}
Subscription response shape
The response has the shape below, or is an error.
/**
 * Message the server sends for a subscription. `result.type` discriminates
 * between emitted data and the started / stopped lifecycle notifications.
 *
 * @typeParam TData - Type of the data emitted by the subscription.
 */
interface SubscriptionResponse<TData = unknown> {
  id: number | string;
  jsonrpc?: '2.0';
  result:
    | {
        type: 'data';
        data: TData; // subscription emitted data
      }
    | {
        type: 'started'; // subscription started
      }
    | {
        type: 'stopped'; // subscription stopped
      };
}
Connection params
If the connection is initialized with ?connectionParams=1, the first message has to be connection params.
/**
 * First message sent by the client when the connection is initialized with
 * `?connectionParams=1`. `data` holds the connection params, or `null`.
 */
interface ConnectionParamsMessage {
  data: Record<string, string> | null;
  method: 'connectionParams';
}
Errors
See https://www.jsonrpc.org/specification#error_object or Error Formatting.
Notifications from Server to Client
{ id: null, type: 'reconnect' }
Tells clients to reconnect before the server shuts down. Sent by calling broadcastReconnectNotification() on the handler returned by applyWSSHandler (for example, handler.broadcastReconnectNotification()).