Version: 11.x

HTTP Subscription Link

httpSubscriptionLink is a terminating link that uses Server-sent Events (SSE) for subscriptions.

SSE is a good option for real-time updates: it is somewhat easier to work with than WebSockets, and it handles reconnecting and resuming where it left off automatically.

Setup

info

If your client's environment doesn't support EventSource, you need an EventSource polyfill.
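As a rough sketch of the check described above, the helpers below detect a missing EventSource constructor and install a replacement only when needed. The names `needsEventSourcePolyfill` and `installEventSourcePolyfill` are hypothetical and not part of tRPC; the `polyfill` argument stands for whatever constructor your chosen polyfill package exports.

```typescript
// Minimal structural type for an object that may carry an EventSource constructor
type EventSourceHost = { EventSource?: unknown };

// Returns true when the host lacks a usable EventSource constructor
function needsEventSourcePolyfill(host: EventSourceHost): boolean {
  return typeof host.EventSource !== 'function';
}

// Installs the given constructor only if the host has none; returns whether it was installed
function installEventSourcePolyfill(
  host: EventSourceHost,
  polyfill: new (url: string, init?: { withCredentials?: boolean }) => unknown,
): boolean {
  if (!needsEventSourcePolyfill(host)) return false;
  host.EventSource = polyfill;
  return true;
}
```

In an application, the host would typically be the global object, and the install step would run before the tRPC client is created.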

To use httpSubscriptionLink, you need to use a splitLink to make it explicit that we want to use SSE for subscriptions.

client/index.ts
ts
import {
  createTRPCClient,
  httpBatchLink,
  loggerLink,
  splitLink,
  unstable_httpSubscriptionLink,
} from '@trpc/client';
// Assumed path: import the AppRouter type from wherever your server router is defined
import type { AppRouter } from '../server';

const trpcClient = createTRPCClient<AppRouter>({
  /**
   * @link https://trpc.io/docs/v11/client/links
   */
  links: [
    // adds pretty logs to your console in development and logs errors in production
    loggerLink(),
    splitLink({
      // uses the httpSubscriptionLink for subscriptions
      condition: (op) => op.type === 'subscription',
      true: unstable_httpSubscriptionLink({
        url: `/api/trpc`,
      }),
      false: httpBatchLink({
        url: `/api/trpc`,
      }),
    }),
  ],
});
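
To make the routing decision concrete, here is a small standalone model of what the `condition` above expresses. It is only an illustration: `Operation`, `Transport`, and `chooseTransport` are hypothetical names, and this is not how tRPC implements splitLink internally.

```typescript
type OperationType = 'query' | 'mutation' | 'subscription';

interface Operation {
  type: OperationType;
  path: string;
}

type Transport = 'sse' | 'http-batch';

// Mirrors the splitLink predicate: subscriptions take the SSE branch,
// queries and mutations take the batched HTTP branch
function chooseTransport(op: Operation): Transport {
  return op.type === 'subscription' ? 'sse' : 'http-batch';
}
```

With this predicate, `chooseTransport({ type: 'subscription', path: 'onPostAdd' })` selects the SSE branch, while any query or mutation falls through to the batched HTTP branch.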

Usage

tip

For a full example, see our full-stack SSE example.

Basic example

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure.subscription(async function* (opts) {
    // listen for new events
    for await (const [data] of on(ee, 'add')) {
      const post = data as Post;
      yield post;
    }
  }),
});
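
The `on(ee, 'add')` pattern used above can be tried in isolation with nothing but Node's `events` module. The sketch below is not tRPC code: the `Post` interface is a hypothetical stand-in for the Prisma type, and passing an AbortSignal to stop listening is an assumption added here to let the example terminate.

```typescript
import { EventEmitter, on } from 'node:events';

// Hypothetical post shape, standing in for the Prisma `Post` type
interface Post {
  id: string;
  title: string;
}

// Yields every payload emitted on 'add' until the signal is aborted
async function* listenForPosts(
  ee: EventEmitter,
  signal: AbortSignal,
): AsyncGenerator<Post> {
  try {
    for await (const [data] of on(ee, 'add', { signal })) {
      yield data as Post;
    }
  } catch (err) {
    // `on` rejects with an AbortError once the signal fires; treat that as a normal end
    if (!(err instanceof Error) || err.name !== 'AbortError') throw err;
  }
}

// Emits `count` posts and collects them through the generator
async function collectPosts(count: number): Promise<Post[]> {
  if (count <= 0) return [];
  const ee = new EventEmitter();
  const ac = new AbortController();
  const received: Post[] = [];

  // Start consuming first so the 'add' listener is attached before anything is emitted
  const consumer = (async () => {
    for await (const post of listenForPosts(ee, ac.signal)) {
      received.push(post);
      if (received.length === count) ac.abort(); // stop listening
    }
  })();

  for (let i = 1; i <= count; i++) {
    ee.emit('add', { id: String(i), title: `Post ${i}` });
  }

  await consumer;
  return received;
}
```

Events emitted while the consumer is busy are buffered by `on`, so none are lost between iterations.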

If you yield an event using our sse() helper and include an id, the browser automatically reconnects when it gets disconnected and sends the last known ID. This is part of the EventSource spec, and the ID is propagated through lastEventId in your .input().

You can send an initial lastEventId when initializing the subscription; it is updated automatically as the browser receives data.
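
For background, the sketch below shows generic SSE framing with an `id:` field and how a client-side "last event id" follows the stream. It illustrates the EventSource spec in general, not tRPC's exact wire serialization; `SseEvent`, `formatSseEvent`, and `trackLastEventId` are hypothetical names.

```typescript
interface SseEvent {
  id?: string; // must not contain line breaks
  data: unknown;
}

// Serializes one event in the text/event-stream format
function formatSseEvent(event: SseEvent): string {
  const lines: string[] = [];
  if (event.id !== undefined) lines.push(`id: ${event.id}`);
  lines.push(`data: ${JSON.stringify(event.data)}`);
  // A blank line terminates the event
  return lines.join('\n') + '\n\n';
}

// The last known id only changes when an event actually carries an id
function trackLastEventId(previous: string | null, event: SseEvent): string | null {
  return event.id ?? previous;
}
```

Events without an id leave the tracked value untouched, which is why yielding an id with each event matters for resuming.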

ts
import EventEmitter, { on } from 'events';
import type { Post } from '@prisma/client';
import { sse } from '@trpc/server';
import { z } from 'zod';
import { publicProcedure, router } from '../trpc';

const ee = new EventEmitter();

export const subRouter = router({
  onPostAdd: publicProcedure
    .input(
      z.object({
        // lastEventId is the last event id that the client has received
        // On the first call, it will be whatever was passed in the initial setup
        // If the client reconnects, it will be the last event id that the client received
        lastEventId: z.string().nullish(),
      }),
    )
    .subscription(async function* (opts) {
      if (opts.input.lastEventId) {
        // [...] get the posts since the last event id and yield them
      }
      // listen for new events
      for await (const [data] of on(ee, 'add')) {
        const post = data as Post;
        yield sse({
          // yielding the post id ensures the client can reconnect at any time and get the latest events since this id
          id: post.id,
          data: post,
        });
      }
    }),
});
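
One possible shape for the "posts since the last event id" step is sketched below against an in-memory history. This is an assumption for illustration only: `replaySince`, `TrackedEvent`, and the array-backed history are hypothetical, and a real application would more likely query its database. Replaying the full history when the id is unknown is a design choice made here, not something the source prescribes.

```typescript
// Hypothetical post shape, standing in for the Prisma `Post` type
interface Post {
  id: string;
  title: string;
}

interface TrackedEvent<T> {
  id: string;
  data: T;
}

// Yields every post that comes after `lastEventId` in `history`
function* replaySince(
  history: readonly Post[],
  lastEventId: string | null | undefined,
): Generator<TrackedEvent<Post>> {
  // No id means a first connection: nothing to replay
  if (!lastEventId) return;
  // findIndex returns -1 for an unknown id, so slice(0) replays the whole history
  const index = history.findIndex((post) => post.id === lastEventId);
  for (const post of history.slice(index + 1)) {
    yield { id: post.id, data: post };
  }
}
```

Inside a subscription, the replayed events would be yielded before entering the live `for await` loop, so a reconnecting client catches up first and then continues with new events.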

httpSubscriptionLink Options

ts
type HTTPSubscriptionLinkOptions<TRoot extends AnyClientTypes> = {
  /**
   * The URL to connect to (can be a function that returns a URL)
   */
  url: string | (() => MaybePromise<string>);
  /**
   * EventSource options
   */
  eventSourceOptions?: EventSourceInit;
  /**
   * Data transformer
   * @link https://trpc.io/docs/v11/data-transformers
   */
  transformer?: DataTransformerOptions;
};
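
Since `url` accepts either a string or a function that may return a promise, a consumer of this option has to normalize both forms. The helper below is a minimal sketch of that idea; `resolveUrl` and `UrlOption` are hypothetical names, and this is not tRPC's internal code.

```typescript
type MaybePromise<T> = T | Promise<T>;
type UrlOption = string | (() => MaybePromise<string>);

// Normalizes the three accepted forms (string, sync function, async function) to a string
async function resolveUrl(url: UrlOption): Promise<string> {
  return typeof url === 'function' ? await url() : url;
}
```

The function form is useful when the URL is only known at connection time, for example when it depends on a value loaded asynchronously.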