Subscriptions
Introduction
Subscriptions are a type of real-time event stream between the client and server. Use subscriptions when you need to push real-time updates to the client.
With tRPC's subscriptions, the client establishes and maintains a persistent connection to the server, and it automatically attempts to reconnect and recover gracefully if disconnected, with the help of tracked() events.
WebSockets or Server-sent Events?
You can use either WebSockets or Server-sent Events (SSE) to set up real-time subscriptions in tRPC.
- For WebSockets, see the WebSockets page
- For SSE, see the httpSubscriptionLink
If you are unsure which one to use, we recommend using SSE for subscriptions as it's easier to set up and doesn't require setting up a WebSocket server.
Reference projects
| Type | Example Type | Link |
|---|---|---|
| WebSockets | Bare-minimum Node.js WebSockets example | /examples/standalone-server |
| SSE | Full-stack SSE implementation | github.com/trpc/examples-next-sse-chat |
| WebSockets | Full-stack WebSockets implementation | github.com/trpc/examples-next-prisma-websockets-starter |
Basic example
For a full example, see our full-stack SSE example.
// server.ts
import EventEmitter, { on } from 'node:events';
import { initTRPC } from '@trpc/server';

const t = initTRPC.create();

type Post = { id: string; title: string };

// Emitter the subscription listens on. 'add' events are assumed to carry a
// Post as their first argument (see the cast inside the subscription).
const ee = new EventEmitter();

export const appRouter = t.router({
  /**
   * Subscription that yields the payload of every 'add' event emitted on `ee`.
   */
  onPostAdd: t.procedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add', {
      // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
      signal: opts.signal,
    })) {
      // `on()` yields the array of arguments passed to `emit`; the first one is the post
      const post = data as Post;
      yield post;
    }
  }),
});
Automatic tracking of id using tracked() (recommended)
If you yield an event using our tracked()-helper and include an id, the client will automatically reconnect when it gets disconnected and send the last known ID.
You can send an initial lastEventId when initializing the subscription and it will be automatically updated as the browser receives data.
- For SSE, this is part of the EventSource spec and will be propagated through lastEventId in your .input().
- For WebSockets, our wsLink will automatically send the last known ID and update it as the browser receives data.
If you're fetching data based on the lastEventId, and capturing all events is critical, make sure you set up the event listener before fetching events from your database, as is done in our full-stack SSE example. This prevents newly emitted events from being missed while you are still yielding the original batch based on lastEventId.
import EventEmitter, { on } from 'node:events';
import { initTRPC, tracked } from '@trpc/server';
import { z } from 'zod';

/**
 * EventEmitter with a convenience method that exposes an event as an
 * async iterable by delegating to `on()` from `node:events`.
 */
class IterableEventEmitter extends EventEmitter {
  /**
   * @param eventName - name of the event to iterate over
   * @param opts - optional settings; `signal` is forwarded to `on()`
   * @returns the async iterator returned by `on(this, eventName, opts)`
   */
  toIterable(eventName: string, opts?: { signal?: AbortSignal }) {
    return on(this, eventName, opts);
  }
}

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

const ee = new IterableEventEmitter();

export const subRouter = router({
  /**
   * Subscription that yields each added post wrapped in `tracked()`,
   * using the post id as the event id.
   */
  onPostAdd: publicProcedure
    .input(
      z
        .object({
          // lastEventId is the last event id that the client has received
          // On the first call, it will be whatever was passed in the initial setup
          // If the client reconnects, it will be the last event id that the client received
          lastEventId: z.string().nullish(),
        })
        .optional(),
    )
    .subscription(async function* (opts) {
      // We start by subscribing to the ee so that we don't miss any new events while fetching
      const iterable = ee.toIterable('add', {
        // Passing the AbortSignal from the request automatically cancels the event emitter when the request is aborted
        signal: opts.signal,
      });

      if (opts.input?.lastEventId) {
        // [...] get the posts since the last event id and yield them
        // const items = await db.post.findMany({ ... })
        // for (const item of items) {
        //   yield tracked(item.id, item);
        // }
      }

      // listen for new events from the iterable we set up above
      for await (const [data] of iterable) {
        const post = data as Post;
        // tracking the post id ensures the client can reconnect at any time and get the latest events since this id
        yield tracked(post.id, post);
      }
    }),
});
Pull data in a loop
This recipe is useful when you want to periodically check for new data from a source like a database and push it to the client.
// server.ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from './trpc';

// NOTE(review): `db` is not defined in this snippet. It is presumably your
// database client (the query shape resembles Prisma) - import it from
// wherever your application creates it.

/**
 * Resolves after `ms` milliseconds. Used to pause between polls.
 */
const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

export const subRouter = router({
  /**
   * Polls the database once per second and yields every post created after
   * the last one that was sent, using the post's `createdAt` as the event id.
   */
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        // The id is the createdAt of the post
        lastEventId: z.coerce.date().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      // `opts.signal` is an AbortSignal that will be aborted when the client disconnects.
      let lastEventId = opts.input?.lastEventId ?? null;

      // We use a `while` loop that checks `!opts.signal.aborted`
      while (!opts.signal!.aborted) {
        const posts = await db.post.findMany({
          // If we have a `lastEventId`, we only fetch posts created after it.
          where: lastEventId
            ? {
                createdAt: {
                  gt: lastEventId,
                },
              }
            : undefined,
          orderBy: {
            createdAt: 'asc',
          },
        });

        for (const post of posts) {
          // `tracked` is a helper that sends an `id` with each event.
          // This allows the client to resume from the last received event upon reconnection.
          yield tracked(post.createdAt.toJSON(), post);
          lastEventId = post.createdAt;
        }

        // Wait for a bit before polling again to avoid hammering the database.
        await sleep(1_000);
      }
    }),
});
Stopping a subscription from the server
If you need to stop a subscription from the server, simply return in the generator function.
// Imports mirror the other examples on this page so the snippet stands on its own.
import { z } from 'zod';
import { publicProcedure, router } from './trpc';

export const subRouter = router({
  /**
   * Demonstrates ending a subscription from the server: the generator
   * counts up from `lastEventId` (or 0) and returns once the counter passes 100.
   */
  onPostAdd: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;

      while (!opts.signal!.aborted) {
        const idx = index++;

        if (idx > 100) {
          // With this, the subscription will stop and the client will disconnect
          return;
        }

        // Short pause between iterations
        await new Promise((resolve) => setTimeout(resolve, 10));
      }
    }),
});
On the client, you just .unsubscribe() the subscription.
Cleanup of side effects
If you need to clean up any side effects of your subscription, you can use the try...finally pattern, as tRPC invokes the .return() of the generator instance when the subscription stops for any reason.
import EventEmitter, { on } from 'node:events';
import { initTRPC } from '@trpc/server';

type Post = { id: string; title: string };

const t = initTRPC.create();
const publicProcedure = t.procedure;
const router = t.router;

const ee = new EventEmitter();

export const subRouter = router({
  /**
   * Yields every post emitted on the 'add' event and uses `try...finally`
   * to release a side effect (a pending timer) when the generator ends.
   */
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    // Handle of the most recently scheduled timer, if any
    let timeout: ReturnType<typeof setTimeout> | undefined;

    try {
      for await (const [data] of on(ee, 'add', {
        signal: opts.signal,
      })) {
        timeout = setTimeout(() => console.log('Pretend like this is useful'));
        const post = data as Post;
        yield post;
      }
    } finally {
      // Runs whether the loop finishes, throws, or the generator is returned early
      if (timeout) clearTimeout(timeout);
    }
  }),
});
Error handling
Throwing an error in a generator function propagates to tRPC's onError() on the backend.
If the error thrown is a 5xx error, the client will automatically attempt to reconnect based on the last event id that is tracked using tracked(). For other errors, the subscription will be cancelled and the error will propagate to the onError() callback.
Output validation
Since subscriptions are async iterators, you have to go through the iterator to validate the output.
Example with Zod v4
// zAsyncIterable.ts
import type { TrackedEnvelope } from '@trpc/server';
import { isTrackedEnvelope, tracked } from '@trpc/server';
import { z } from 'zod';

/**
 * Type guard: true when `value` is a non-null object that has a
 * `Symbol.asyncIterator` property.
 */
function isAsyncIterable<TValue, TReturn = unknown>(
  value: unknown,
): value is AsyncIterable<TValue, TReturn> {
  return !!value && typeof value === 'object' && Symbol.asyncIterator in value;
}

// Schema that accepts any value for which `isTrackedEnvelope` returns true
const trackedEnvelopeSchema =
  z.custom<TrackedEnvelope<unknown>>(isTrackedEnvelope);

/**
 * A Zod schema helper designed specifically for validating async iterables. This schema ensures that:
 * 1. The value being validated is an async iterable.
 * 2. Each item yielded by the async iterable conforms to a specified type.
 * 3. The return value of the async iterable, if any, also conforms to a specified type.
 */
export function zAsyncIterable<
  TYieldIn,
  TYieldOut,
  TReturnIn = void,
  TReturnOut = void,
  Tracked extends boolean = false,
>(opts: {
  /**
   * Validate the value yielded by the async generator
   */
  yield: z.ZodType<TYieldOut, TYieldIn>;
  /**
   * Validate the return value of the async generator
   * @remarks not applicable for subscriptions
   */
  return?: z.ZodType<TReturnOut, TReturnIn>;
  /**
   * Whether the yielded values are tracked
   * @remarks only applicable for subscriptions
   */
  tracked?: Tracked;
}) {
  return z
    .custom<
      AsyncIterable<
        Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
        TReturnIn
      >
    >((val) => isAsyncIterable(val))
    .transform(async function* (iter) {
      const iterator = iter[Symbol.asyncIterator]();

      try {
        let next;
        while ((next = await iterator.next()) && !next.done) {
          if (opts.tracked) {
            // Unwrap the envelope, validate the payload, then re-wrap it with the same id
            const [id, data] = trackedEnvelopeSchema.parse(next.value);
            yield tracked(id, await opts.yield.parseAsync(data));
            continue;
          }
          yield opts.yield.parseAsync(next.value);
        }
        // Once `done` is true, `next.value` holds the source iterator's return value
        if (opts.return) {
          return await opts.return.parseAsync(next.value);
        }
        return;
      } finally {
        // Always give the source iterator a chance to clean up
        await iterator.return?.();
      }
    }) as z.ZodType<
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldIn> : TYieldIn,
      TReturnIn,
      unknown
    >,
    AsyncIterable<
      Tracked extends true ? TrackedEnvelope<TYieldOut> : TYieldOut,
      TReturnOut,
      unknown
    >
  >;
}
Now you can use this helper to validate the output of your subscription procedures:
// _app.ts
import { tracked } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from './trpc';
import { zAsyncIterable } from './zAsyncIterable';

export const appRouter = router({
  /**
   * Yields an incrementing counter once per second, starting after
   * `lastEventId` (or 0). Each value is wrapped in `tracked()` and the
   * output is validated by the `zAsyncIterable` schema.
   */
  mySubscription: publicProcedure
    .input(
      z.object({
        lastEventId: z.coerce.number().min(0).optional(),
      }),
    )
    .output(
      zAsyncIterable({
        yield: z.object({
          count: z.number(),
        }),
        tracked: true,
      }),
    )
    .subscription(async function* (opts) {
      let index = opts.input.lastEventId ?? 0;

      while (true) {
        index++;
        // The event id is the stringified counter
        yield tracked(String(index), {
          count: index,
        });

        await new Promise((resolve) => setTimeout(resolve, 1000));
      }
    }),
});