HTTP Subscription Link
httpSubscriptionLink is a terminating link that uses Server-sent Events (SSE) for subscriptions.
SSE is a good option for real-time updates, as it's a bit easier to set up than a WebSocket server.
We have prefixed this link with unstable_ because it's a new API, but it is safe to use.
Setup
If your client's environment doesn't support EventSource, you need an EventSource polyfill. For React Native specific instructions, please refer to the compatibility section.
To use httpSubscriptionLink, you need to use a splitLink to make it explicit that SSE should be used for subscriptions.
client/index.ts
import {
  createTRPCClient,
  httpBatchLink,
  loggerLink,
  splitLink,
  unstable_httpSubscriptionLink,
} from '@trpc/client';
// adjust this path to wherever your server exports its router type
import type { AppRouter } from '../server/index.js';

const trpcClient = createTRPCClient<AppRouter>({
  /**
   * @see https://trpc.io/docs/v11/client/links
   */
  links: [
    // adds pretty logs to your console in development and logs errors in production
    loggerLink(),
    splitLink({
      // uses the httpSubscriptionLink for subscriptions
      condition: (op) => op.type === 'subscription',
      true: unstable_httpSubscriptionLink({
        url: `/api/trpc`,
      }),
      false: httpBatchLink({
        url: `/api/trpc`,
      }),
    }),
  ],
});
This document outlines the specific details of using httpSubscriptionLink. For general usage of subscriptions, see our subscriptions guide.
Headers and authorization / authentication
Web apps
Same domain
If you're building a web application, cookies are sent as part of the request as long as your client is on the same domain as the server.
Cross-domain
If the client and server are not on the same domain, you can use withCredentials: true (read more on MDN).
Example:
tsx
// [...]
unstable_httpSubscriptionLink({
  url: 'https://example.com/api/trpc',
  eventSourceOptions() {
    return {
      withCredentials: true, // <---
    };
  },
});
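For a credentialed cross-origin EventSource request, the browser only accepts the response if the server answers with Access-Control-Allow-Credentials: true and an explicit, non-wildcard Access-Control-Allow-Origin. The following is a minimal sketch of that server-side rule. The helper name corsHeadersFor and the allow-list are hypothetical and not part of tRPC; wire the result into whatever CORS mechanism your server adapter uses.

```ts
// Hypothetical helper (not a tRPC API): builds the CORS response headers
// needed when the client connects with `withCredentials: true`.
type CorsHeaders = Record<string, string>;

function corsHeadersFor(
  requestOrigin: string | undefined,
  allowedOrigins: readonly string[],
): CorsHeaders {
  // Unknown or missing origins get no CORS headers, so the browser blocks them.
  if (!requestOrigin || allowedOrigins.indexOf(requestOrigin) === -1) {
    return {};
  }
  return {
    // Must echo the concrete origin: "*" is rejected for credentialed requests.
    'Access-Control-Allow-Origin': requestOrigin,
    'Access-Control-Allow-Credentials': 'true',
    // Tell caches that the response differs per requesting origin.
    Vary: 'Origin',
  };
}

// Example allow-list (assumed values for illustration only).
const allowed = ['https://app.example.com'];
const headersForApp = corsHeadersFor('https://app.example.com', allowed);
const headersForOther = corsHeadersFor('https://other.example.com', allowed);
```

Here headersForApp contains the two Access-Control-* headers plus Vary, while headersForOther is empty.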
Custom headers through ponyfill
Recommended for non-web environments
You can ponyfill EventSource and use the eventSourceOptions callback to populate headers.
tsx
import {
  createTRPCClient,
  httpBatchLink,
  splitLink,
  unstable_httpSubscriptionLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: unstable_httpSubscriptionLink({
        url: 'http://localhost:3000',
        // ponyfill EventSource
        EventSource: EventSourcePolyfill,
        // options to pass to the EventSourcePolyfill constructor
        eventSourceOptions: async ({ op }) => {
          //                          ^ Includes the operation that's being executed
          // you can use this to generate a signature for the operation
          // (getSignature is your own helper, it is not part of tRPC)
          const signature = await getSignature(op);
          return {
            headers: {
              authorization: 'Bearer supersecret',
              'x-signature': signature,
            },
          };
        },
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
    }),
  ],
});
Updating configuration on an active connection
httpSubscriptionLink uses SSE through EventSource, which automatically retries connections that hit errors such as network failures or bad response codes. However, EventSource does not re-run the eventSourceOptions() or url() options when it retries, so its configuration cannot be updated. This matters particularly when authentication has expired since the last connection.
To address this limitation, you can use a retryLink in conjunction with httpSubscriptionLink. This approach ensures that the connection is re-established with the latest configuration, including any updated authentication details.
Please note that restarting the connection recreates the EventSource from scratch, which means any previously tracked events will be lost.
tsx
import {
  createTRPCClient,
  httpBatchLink,
  retryLink,
  splitLink,
  unstable_httpSubscriptionLink,
} from '@trpc/client';
import { EventSourcePolyfill } from 'event-source-polyfill';
import type { AppRouter } from '../server/index.js';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
      true: [
        retryLink({
          retry: (opts) => {
            opts.op.type;
            //       ^? will always be 'subscription' since we're in a splitLink
            const code = opts.error.data?.code;
            if (!code) {
              // This shouldn't happen as our httpSubscriptionLink will
              // automatically retry internally when there's a non-parsable response
              console.error('No error code found, retrying', opts);
              return true;
            }
            if (code === 'UNAUTHORIZED' || code === 'FORBIDDEN') {
              console.log('Retrying due to 401/403 error');
              return true;
            }
            return false;
          },
        }),
        unstable_httpSubscriptionLink({
          url: async () => {
            // calculate the latest URL if needed...
            // (getAuthenticatedUri is your own helper)
            return getAuthenticatedUri();
          },
          // ponyfill EventSource
          EventSource: EventSourcePolyfill,
          eventSourceOptions: async () => {
            // ...or maybe renew an access token
            // (auth is your own authentication client)
            const token = await auth.getOrRenewToken();
            return {
              headers: {
                authorization: `Bearer ${token}`,
              },
            };
          },
        }),
      ],
    }),
  ],
});
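The retry decision used with retryLink above can be pulled out into a small pure function, which makes it easy to unit test without opening a connection. This is only a sketch: shouldRetrySubscription is a hypothetical name, and the set of retryable codes simply mirrors the example above.

```ts
// Hypothetical helper (not a tRPC API): decides whether a failed
// subscription should be restarted with fresh configuration.
const RETRYABLE_CODES: readonly string[] = ['UNAUTHORIZED', 'FORBIDDEN'];

function shouldRetrySubscription(code: string | undefined): boolean {
  if (!code) {
    // No parsable error code: mirror the example above and retry.
    return true;
  }
  // Only authentication/authorization failures trigger a restart.
  return RETRYABLE_CODES.indexOf(code) !== -1;
}

const retryOnUnauthorized = shouldRetrySubscription('UNAUTHORIZED');
const retryOnNotFound = shouldRetrySubscription('NOT_FOUND');
const retryOnMissingCode = shouldRetrySubscription(undefined);
```

Inside the link it could then be used as retry: (opts) => shouldRetrySubscription(opts.error.data?.code).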
Connection params
In order to authenticate with EventSource, you can define connectionParams in httpSubscriptionLink. These will be sent as part of the URL, which is why other methods are preferred.
server/context.ts
import type { CreateHTTPContextOptions } from '@trpc/server/adapters/standalone';

export const createContext = async (opts: CreateHTTPContextOptions) => {
  const token = opts.info.connectionParams?.token;

  // [... authenticate]

  return {};
};

export type Context = Awaited<ReturnType<typeof createContext>>;
client/trpc.ts
import {
  createTRPCClient,
  httpBatchLink,
  splitLink,
  unstable_httpSubscriptionLink,
} from '@trpc/client';
import type { AppRouter } from '../server/index.js';

// Initialize the tRPC client
const trpc = createTRPCClient<AppRouter>({
  links: [
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: unstable_httpSubscriptionLink({
        url: 'http://localhost:3000',
        connectionParams: async () => {
          // Will be serialized as part of the URL
          return {
            token: 'supersecret',
          };
        },
      }),
      false: httpBatchLink({
        url: 'http://localhost:3000',
      }),
    }),
  ],
});
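To see why URL-based parameters are the less preferred option, it helps to look at what a serialized token looks like. The sketch below is illustrative only: appendConnectionParams, the query key, and the /onMessage path are assumptions made for this example and do not claim to reproduce tRPC's actual wire format.

```ts
// Illustrative only: NOT tRPC's real serialization. It shows that anything
// placed in the URL ends up in plain sight (access logs, proxies, history).
function appendConnectionParams(
  baseUrl: string,
  params: Record<string, string>,
): string {
  const encoded = encodeURIComponent(JSON.stringify(params));
  const separator = baseUrl.indexOf('?') === -1 ? '?' : '&';
  return `${baseUrl}${separator}connectionParams=${encoded}`;
}

const subscriptionUrl = appendConnectionParams(
  'http://localhost:3000/onMessage', // hypothetical procedure path
  { token: 'supersecret' },
);

// Percent-encoding is trivially reversible, so the token is readable by
// anything that sees the URL.
const recovered = JSON.parse(
  decodeURIComponent(subscriptionUrl.split('connectionParams=')[1]),
) as { token: string };
```

Because recovered.token is the original secret, prefer cookies or headers where the environment allows them, and keep tokens sent this way short-lived.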
Timeout Configuration
The httpSubscriptionLink supports configuring a timeout for inactivity through the reconnectAfterInactivityMs option. If no messages (including ping messages) are received within the specified timeout period, the connection will be marked as "connecting" and automatically attempt to reconnect.
The timeout configuration is set on the server side when initializing tRPC:
server/trpc.ts
import { initTRPC } from '@trpc/server';

export const t = initTRPC.create({
  sse: {
    client: {
      reconnectAfterInactivityMs: 3_000,
    },
  },
});
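The inactivity rule described above can be modelled as a small tracker that remembers when the last message arrived. This is a conceptual sketch, not tRPC's internal implementation: createInactivityTracker is a hypothetical name, timestamps are passed in explicitly to keep the example deterministic, and treating "exactly at the timeout" as expired is an assumption.

```ts
// Conceptual model of `reconnectAfterInactivityMs` (hypothetical helper).
interface InactivityTracker {
  /** Call for every received message, including pings. */
  recordMessage(nowMs: number): void;
  /** True once the timeout has elapsed without any message. */
  shouldReconnect(nowMs: number): boolean;
}

function createInactivityTracker(
  reconnectAfterInactivityMs: number,
  startMs: number,
): InactivityTracker {
  let lastMessageAtMs = startMs;
  return {
    recordMessage(nowMs) {
      lastMessageAtMs = nowMs;
    },
    shouldReconnect(nowMs) {
      return nowMs - lastMessageAtMs >= reconnectAfterInactivityMs;
    },
  };
}

// Same 3 second timeout as the configuration above.
const tracker = createInactivityTracker(3_000, 0);
const idleAt2999 = tracker.shouldReconnect(2_999); // still within the window
const idleAt3000 = tracker.shouldReconnect(3_000); // window elapsed
tracker.recordMessage(2_500); // a ping or data message resets the window
const idleAt5000 = tracker.shouldReconnect(5_000); // only 2.5s since last message
```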
Server Ping Configuration
The server can be configured to send periodic ping messages to keep the connection alive and prevent timeout disconnections. This is particularly useful when combined with the reconnectAfterInactivityMs option.
server/trpc.ts
import { initTRPC } from '@trpc/server';

export const t = initTRPC.create({
  sse: {
    // Maximum duration of a single SSE connection in milliseconds
    // maxDurationMs: 60_00,
    ping: {
      // Enable periodic ping messages to keep connection alive
      enabled: true,
      // Send ping message every 2s
      intervalMs: 2_000,
    },
    // client: {
    //   reconnectAfterInactivityMs: 3_000
    // }
  },
});
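Since pings count as messages for the inactivity timeout, the ping interval should be shorter than reconnectAfterInactivityMs, otherwise an otherwise healthy but quiet subscription keeps reconnecting. The following sketch checks a configuration for that relationship. checkSseTiming and its warning texts are hypothetical; the default interval of 1000 ms is taken from the server options listing in this document.

```ts
// Hypothetical sanity check for the `sse` options (not a tRPC API).
interface SseTimingConfig {
  ping?: { enabled: boolean; intervalMs?: number };
  client?: { reconnectAfterInactivityMs?: number };
}

// Documented default for `ping.intervalMs`.
const DEFAULT_PING_INTERVAL_MS = 1000;

function checkSseTiming(config: SseTimingConfig): string[] {
  const warnings: string[] = [];
  const timeoutMs = config.client?.reconnectAfterInactivityMs;
  if (timeoutMs === undefined) {
    // No inactivity timeout configured, nothing to compare against.
    return warnings;
  }
  const ping = config.ping;
  if (!ping || !ping.enabled) {
    warnings.push(
      `ping is disabled: quiet subscriptions will reconnect after ${timeoutMs}ms of inactivity`,
    );
    return warnings;
  }
  const intervalMs = ping.intervalMs ?? DEFAULT_PING_INTERVAL_MS;
  if (intervalMs >= timeoutMs) {
    warnings.push(
      `ping interval (${intervalMs}ms) is not shorter than the inactivity timeout (${timeoutMs}ms)`,
    );
  }
  return warnings;
}

// The values used in this document: ping every 2s, timeout after 3s.
const documentedConfigWarnings = checkSseTiming({
  ping: { enabled: true, intervalMs: 2_000 },
  client: { reconnectAfterInactivityMs: 3_000 },
});
```

For the values shown in this document the list of warnings is empty. In practice you may also want some headroom between the two values to absorb network jitter.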
Compatibility (React Native)
The httpSubscriptionLink makes use of the EventSource API, the Streams API, and AsyncIterators. These are not natively supported by React Native and have to be ponyfilled.
To ponyfill EventSource, we recommend a polyfill that uses the networking library exposed by React Native rather than one that uses the XMLHttpRequest API. Libraries that polyfill EventSource using XMLHttpRequest fail to reconnect after the app has been in the background. Consider using the rn-eventsource-reborn package.
The Streams API can be ponyfilled using the web-streams-polyfill package.
AsyncIterators can be polyfilled using the @azure/core-asynciterator-polyfill package.
Installation
Install the required polyfills:
- npm: npm install rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- yarn: yarn add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- pnpm: pnpm add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- bun: bun add rn-eventsource-reborn web-streams-polyfill @azure/core-asynciterator-polyfill
- deno: deno add npm:rn-eventsource-reborn npm:web-streams-polyfill npm:@azure/core-asynciterator-polyfill
Add the polyfills to your project before the link is used (e.g. where you add your TRPCReact.Provider):
utils/api.tsx
import '@azure/core-asynciterator-polyfill';
// pass RNEventSource as the `EventSource` option of the link
import { RNEventSource } from 'rn-eventsource-reborn';
import { ReadableStream, TransformStream } from 'web-streams-polyfill';

globalThis.ReadableStream = globalThis.ReadableStream || ReadableStream;
globalThis.TransformStream = globalThis.TransformStream || TransformStream;
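During development it can be useful to verify that the stream globals are really in place before the link is created. The sketch below is a hypothetical helper, not part of tRPC or of the polyfill packages. It only checks the two Streams API globals assigned above; EventSource is left out on purpose, since it can be supplied through the link's EventSource option instead of a global.

```ts
// Hypothetical development-time check (not a tRPC API).
const REQUIRED_STREAM_GLOBALS: readonly string[] = [
  'ReadableStream',
  'TransformStream',
];

function missingStreamGlobals(scope: Record<string, unknown>): string[] {
  // Collect the names that are not defined on the given global-like object.
  return REQUIRED_STREAM_GLOBALS.filter(
    (name) => typeof scope[name] === 'undefined',
  );
}

// Simulated environments, so the example does not depend on the real runtime.
const bareEnvironment: Record<string, unknown> = {};
const partlyPolyfilled: Record<string, unknown> = {
  ReadableStream: function ReadableStream() {},
};

const missingInBare = missingStreamGlobals(bareEnvironment);
const missingInPartial = missingStreamGlobals(partlyPolyfilled);
```

In an app you would pass the real global object to the helper and log or throw if the returned list is not empty.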
Once the ponyfills are added, you can continue setting up the httpSubscriptionLink as described in the setup section.
httpSubscriptionLink Options
ts
type HTTPSubscriptionLinkOptions<
  TRoot extends AnyClientTypes,
  TEventSource extends EventSourceLike.AnyConstructor = typeof EventSource,
> = {
  /**
   * EventSource ponyfill
   */
  EventSource?: TEventSource;
  /**
   * EventSource options or a callback that returns them
   */
  eventSourceOptions?:
    | EventSourceLike.InitDictOf<TEventSource>
    | ((opts: {
        op: Operation;
      }) =>
        | EventSourceLike.InitDictOf<TEventSource>
        | Promise<EventSourceLike.InitDictOf<TEventSource>>);
};
SSE Options on the server
ts
export interface SSEStreamProducerOptions<TValue = unknown> {
  ping?: {
    /**
     * Enable ping comments sent from the server
     * @default false
     */
    enabled: boolean;
    /**
     * Interval in milliseconds
     * @default 1000
     */
    intervalMs?: number;
  };
  /**
   * Maximum duration in milliseconds for the request before ending the stream
   * @default undefined
   */
  maxDurationMs?: number;
  /**
   * End the request immediately after data is sent
   * Only useful for serverless runtimes that do not support streaming responses
   * @default false
   */
  emitAndEndImmediately?: boolean;
  /**
   * Client-specific options - these will be sent to the client as part of the first message
   * @default {}
   */
  client?: {
    /**
     * Timeout and reconnect after inactivity in milliseconds
     * @default undefined
     */
    reconnectAfterInactivityMs?: number;
  };
}