Server-Sent Events
Type-safe SSE with @EventStream(), discriminated event unions, and OpenAPI schemas.
tsgonest provides @EventStream() — a type-safe, iterator-based alternative to NestJS's @Sse() decorator. It gives you typed event payloads, discriminated unions, compile-time validation and serialization, automatic OpenAPI schemas, and proper resource cleanup.
Quick start
import { Controller } from '@nestjs/common';
import { EventStream, SseEvent } from '@tsgonest/runtime';
interface NotificationDto {
id: string;
/** @format date-time */
timestamp: string;
message: string;
}
@Controller('notifications')
export class NotificationController {
@EventStream('stream')
async *stream(): AsyncGenerator<SseEvent<'notification', NotificationDto>> {
for await (const n of this.service.watch()) {
yield { event: 'notification', data: n };
}
}
}
This produces a GET /notifications/stream SSE endpoint with:
- Compile-time validation and fast serialization of NotificationDto
- OpenAPI text/event-stream response with typed itemSchema
- Proper generator cleanup on client disconnect
SseEvent and SseEvents types
SseEvent<E, T>
A typed SSE event with a generic event name E (string literal discriminant) and data payload T:
import { SseEvent } from '@tsgonest/runtime';
// Single event type
type PingEvent = SseEvent<'ping', { ts: number }>;
// → { event: 'ping', data: { ts: number }, id?: string, retry?: number }
| Field | Type | Description |
|---|---|---|
| event | E | Event name (maps to SSE event: field) |
| data | T | Typed payload (validated + serialized by tsgonest) |
| id | string? | SSE id: field (optional) |
| retry | number? | Client reconnection interval in ms (optional) |
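To make the field mapping concrete, here is a minimal sketch of how one such event could be written to the wire in the standard SSE text format. The local SseEvent type only mirrors the shape shown above, and formatSseFrame is a hypothetical helper for illustration, not part of @tsgonest/runtime; it uses JSON.stringify where tsgonest would use its generated serializers.

```typescript
// Local stand-in for the SseEvent shape described in the table above.
type SseEvent<E extends string, T> = {
  event: E;
  data: T;
  id?: string;
  retry?: number;
};

// Hypothetical helper: renders one event as an SSE frame.
// Each field becomes a "name: value" line; a blank line ends the frame.
function formatSseFrame<E extends string, T>(e: SseEvent<E, T>): string {
  const lines: string[] = [];
  if (e.id !== undefined) lines.push(`id: ${e.id}`);
  if (e.retry !== undefined) lines.push(`retry: ${e.retry}`);
  lines.push(`event: ${e.event}`);
  // JSON.stringify without an indent argument emits no raw newlines,
  // so the payload fits on a single data line.
  lines.push(`data: ${JSON.stringify(e.data)}`);
  return lines.join('\n') + '\n\n';
}

const frame = formatSseFrame({ event: 'ping', data: { ts: 1 }, id: '42' });
```

The optional id and retry lines are emitted only when set, which matches their optional status in the type.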
SseEvents<M>
Converts a record of event names → payload types into a discriminated union:
import { SseEvents } from '@tsgonest/runtime';
type UserEvents = SseEvents<{
created: UserDto;
updated: UserDto;
deleted: { id: string };
}>;
// Equivalent to:
// | SseEvent<'created', UserDto>
// | SseEvent<'updated', UserDto>
// | SseEvent<'deleted', { id: string }>
TypeScript narrows data when you check event:
function handle(e: UserEvents) {
if (e.event === 'deleted') {
e.data.id; // ✅ narrowed to { id: string }
}
}
@EventStream decorator
import { EventStream } from '@tsgonest/runtime';
@EventStream(path?, options?)
| Parameter | Type | Default | Description |
|---|---|---|---|
| path | string? | '/' | Route sub-path |
| options.heartbeat | number? | 0 (disabled) | Keep-alive interval in ms |
The decorator sets NestJS-compatible routing metadata (path, method=GET, __sse__), so NestJS's router handles the SSE protocol natively (headers, SseStream, socket tuning).
Discriminated event unions
The most powerful pattern is a controller method that yields multiple event types:
import { Controller } from '@nestjs/common';
import { EventStream, SseEvents } from '@tsgonest/runtime';
type UserEvents = SseEvents<{
created: UserDto;
updated: UserDto;
deleted: { id: string };
}>;
@Controller('users')
export class UserEventController {
@EventStream('events', { heartbeat: 30_000 })
async *streamEvents(): AsyncGenerator<UserEvents> {
const cursor = await this.db.watchChanges();
try {
for await (const change of cursor) {
yield { event: change.type, data: change.doc };
}
} finally {
await cursor.close(); // runs on client disconnect
}
}
}
tsgonest statically analyzes the AsyncGenerator<UserEvents> return type and:
- Extracts each event variant: created, updated, deleted
- Generates per-event validation: assertUserDto for created/updated, inline validation for deleted
- Generates per-event serialization: fast stringifyUserDto for created/updated
- Produces a discriminated OpenAPI schema: oneOf with event as discriminator
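The union that tsgonest analyzes can be approximated in plain TypeScript. The sketch below is an assumption about how SseEvent and SseEvents might be defined (the library's actual definitions may differ), with a placeholder UserDto and a hypothetical describeEvent function. It shows that an exhaustive switch on event gives each variant its own payload type.

```typescript
// Assumed shape; mirrors the documented fields, not the library source.
type SseEvent<E extends string, T> = { event: E; data: T; id?: string; retry?: number };

// Mapped type indexed by its own keys: yields one SseEvent variant per entry.
type SseEvents<M> = { [K in keyof M & string]: SseEvent<K, M[K]> }[keyof M & string];

// Placeholder DTO for illustration only.
interface UserDto {
  id: string;
  name: string;
}

type UserEvents = SseEvents<{
  created: UserDto;
  updated: UserDto;
  deleted: { id: string };
}>;

function describeEvent(e: UserEvents): string {
  switch (e.event) {
    case 'created':
    case 'updated':
      return `${e.event}: ${e.data.name}`; // data is UserDto here
    case 'deleted':
      return `deleted: ${e.data.id}`; // data is { id: string } here
    default: {
      // Exhaustiveness check: this assignment stops type-checking
      // if a new variant is added to the union without a case above.
      const unreachable: never = e;
      return unreachable;
    }
  }
}
```

Adding a fourth entry to the record without a matching case turns the never assignment into a compile error, which is the main practical benefit of the discriminated form.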
OpenAPI output
Discriminated union (multiple event types)
For the UserEvents example above, tsgonest generates the following response schema (the updated variant, which also carries UserDto, is not shown):
{
"200": {
"description": "Server-Sent Events stream",
"content": {
"text/event-stream": {
"schema": {
"type": "string",
"format": "event-stream",
"itemSchema": {
"oneOf": [
{
"type": "object",
"required": ["data", "event"],
"properties": {
"event": { "type": "string", "const": "created" },
"data": {
"type": "string",
"contentMediaType": "application/json",
"contentSchema": { "$ref": "#/components/schemas/UserDto" }
},
"id": { "type": "string" },
"retry": { "type": "integer", "minimum": 0 }
}
},
{
"type": "object",
"required": ["data", "event"],
"properties": {
"event": { "type": "string", "const": "deleted" },
"data": {
"type": "string",
"contentMediaType": "application/json",
"contentSchema": {
"type": "object",
"properties": { "id": { "type": "string" } },
"required": ["id"]
}
},
"id": { "type": "string" },
"retry": { "type": "integer", "minimum": 0 }
}
},
{
"type": "object",
"required": ["data", "event"],
"properties": {
"event": { "type": "string", "const": "error" },
"data": { "type": "string" },
"id": { "type": "string" },
"retry": { "type": "integer", "minimum": 0 }
}
}
],
"discriminator": { "propertyName": "event" }
}
}
}
}
}
}
Key details:
- Each variant is an object with event (const discriminant) and data (typed via contentSchema)
- An error variant (event: "error", data: string) is always appended automatically
- The discriminator on "event" enables efficient client-side dispatch
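On the client, the event discriminator lets a consumer pick the right decoder before touching data. The sketch below assumes a placeholder UserDto and a hypothetical decode helper. It takes the event name and the raw data string, as an EventSource listener would receive them, and returns a typed union. Following the schema, the error variant's data is kept as a plain string rather than parsed as JSON.

```typescript
// Placeholder DTO for illustration only.
interface UserDto {
  id: string;
  name: string;
}

type ClientEvent =
  | { event: 'created'; data: UserDto }
  | { event: 'updated'; data: UserDto }
  | { event: 'deleted'; data: { id: string } }
  | { event: 'error'; data: string };

// Hypothetical decoder: dispatches on the event name, then parses the payload.
// The casts trust the server; a real client might validate the parsed value.
function decode(event: string, raw: string): ClientEvent | undefined {
  switch (event) {
    case 'created':
      return { event: 'created', data: JSON.parse(raw) as UserDto };
    case 'updated':
      return { event: 'updated', data: JSON.parse(raw) as UserDto };
    case 'deleted':
      return { event: 'deleted', data: JSON.parse(raw) as { id: string } };
    case 'error':
      return { event: 'error', data: raw }; // plain string, not JSON
    default:
      return undefined; // unknown event names are ignored
  }
}
```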
Single event type
@EventStream('stream')
async *stream(): AsyncGenerator<SseEvent<'notification', NotificationDto>> { ... }
Produces a oneOf with two entries: the notification variant and the error variant, still with a discriminator.
Non-discriminated (generic string)
@EventStream('stream')
async *stream(): AsyncGenerator<SseEvent<string, UserDto>> { ... }
When the event name is a generic string (not a literal), the schema uses oneOf without a discriminator. The event variant has type: "string" for the event property instead of a const.
Heartbeat keep-alive
Proxies and load balancers often close idle SSE connections. Use the heartbeat option to emit empty keep-alive frames:
@EventStream('events', { heartbeat: 30_000 }) // every 30 seconds
async *events(): AsyncGenerator<UserEvents> { ... }
The interceptor emits empty-data frames (\n) at the configured interval, which is enough to keep connections alive through most proxies without triggering client-side event handlers.
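Keep-alive frames stay invisible to handlers because of the SSE parsing rules: a blank line dispatches an event only if at least one data: line was seen since the last dispatch. The sketch below is a deliberately simplified parser that illustrates this rule. parseSse is a hypothetical name, it handles only the event: and data: fields with \n line endings, and it is not how browsers implement EventSource.

```typescript
type ParsedEvent = { event: string; data: string };

// Per the SSE format, a single leading space after the colon is dropped.
function fieldValue(line: string, prefixLength: number): string {
  const value = line.slice(prefixLength);
  return value.startsWith(' ') ? value.slice(1) : value;
}

function parseSse(stream: string): ParsedEvent[] {
  const out: ParsedEvent[] = [];
  let eventName = '';
  let dataLines: string[] = [];
  for (const line of stream.split('\n')) {
    if (line === '') {
      // Blank line: dispatch only if data was collected, then reset the buffers.
      if (dataLines.length > 0) {
        out.push({ event: eventName || 'message', data: dataLines.join('\n') });
      }
      eventName = '';
      dataLines = [];
    } else if (line.startsWith(':')) {
      continue; // comment line, ignored
    } else if (line.startsWith('event:')) {
      eventName = fieldValue(line, 'event:'.length);
    } else if (line.startsWith('data:')) {
      dataLines.push(fieldValue(line, 'data:'.length));
    }
  }
  return out;
}

// Two empty keep-alive lines, one real event, then another keep-alive line.
const events = parseSse('\n\nevent: created\ndata: {"id":"1"}\n\n\n');
```

Only the created event reaches the result; the surrounding blank lines reset empty buffers and dispatch nothing.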
Resource cleanup
When a client disconnects, the TsgonestSseInterceptor calls iterator.return() on the async generator. This triggers finally blocks, allowing you to clean up resources:
@EventStream('events')
async *events(): AsyncGenerator<SseEvents<{ update: DataDto }>> {
const subscription = this.eventBus.subscribe();
try {
for await (const event of subscription) {
yield { event: 'update', data: event };
}
} finally {
// ✅ Always runs — even on client disconnect
subscription.unsubscribe();
}
}
Error handling
Errors thrown inside the generator are caught by the interceptor and emitted as a typed error event:
event: error
data: Something went wrong
This matches the error variant in the OpenAPI schema. After emitting the error, the stream completes gracefully. Validation errors from assertTypeName() are also caught and emitted as error events.
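The cleanup and error behavior can be pictured as a small consumer loop. The following is a sketch under assumptions, not the actual TsgonestSseInterceptor: pump, write, and isClosed are hypothetical names, and payload serialization is reduced to JSON.stringify. It shows a thrown error becoming an error frame, and return() running the generator's finally block when the client goes away.

```typescript
type Frame = { event: string; data: unknown };

// Hypothetical consumer: drains the generator into `write` until it finishes,
// throws, or `isClosed` reports that the client has disconnected.
async function pump(
  gen: AsyncGenerator<Frame>,
  write: (chunk: string) => void,
  isClosed: () => boolean,
): Promise<void> {
  try {
    while (!isClosed()) {
      const next = await gen.next();
      if (next.done) break;
      write(`event: ${next.value.event}\ndata: ${JSON.stringify(next.value.data)}\n\n`);
    }
  } catch (err) {
    // A throw inside the generator surfaces here and becomes an error frame.
    const message = err instanceof Error ? err.message : String(err);
    write(`event: error\ndata: ${message}\n\n`);
  } finally {
    // On a suspended generator this runs its finally blocks;
    // on a generator that already finished or threw, it does nothing.
    await gen.return(undefined);
  }
}
```

Because return() sits in the outer finally, cleanup happens on all three exit paths: normal completion, a thrown error, and client disconnect.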
Compile-time injection
tsgonest's controller rewriter handles @EventStream routes differently from regular routes:
- No return wrapping: generators yield values, so return statements are not wrapped with stringify()
- Per-event transform metadata: Reflect.defineMetadata is injected after the method's __decorate call with a map of event name → [assertFn, stringifyFn] pairs
- Interceptor injection: TsgonestSseInterceptor is added as a class-level decorator (via UseInterceptors)
// tsgonest injects companion imports
import { assertUserDto, stringifyUserDto } from './user.dto.UserDto.tsgonest.js';
// Per-event transforms injected after __decorate
Reflect.defineMetadata("__tsgonest_sse_transforms__", {
"created": [assertUserDto, stringifyUserDto],
"updated": [assertUserDto, stringifyUserDto],
"deleted": [null, null]
}, UserEventController.prototype, "streamEvents");
The interceptor reads this metadata at request time and applies the correct assert/stringify pair for each yielded event based on its event field.
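How an interceptor might use such a map can be sketched as follows. Everything here is illustrative: Transforms, encodeEvent, and the stand-in assertUser and stringifyUser functions are hypothetical, and falling back to JSON.stringify for a null entry is an assumption rather than documented behavior.

```typescript
type AssertFn = (value: unknown) => unknown;
type StringifyFn = (value: unknown) => string;
type Transforms = Record<string, [AssertFn | null, StringifyFn | null]>;

// Stand-ins for generated companion functions (hypothetical).
const assertUser: AssertFn = (v) => {
  if (typeof v !== 'object' || v === null || typeof (v as { id?: unknown }).id !== 'string') {
    throw new Error('invalid UserDto');
  }
  return v;
};
const stringifyUser: StringifyFn = (v) => JSON.stringify(v);

const transforms: Transforms = {
  created: [assertUser, stringifyUser],
  deleted: [null, null],
};

// Looks up the pair by event name, validates the payload, then serializes it.
function encodeEvent(e: { event: string; data: unknown }, map: Transforms): string {
  const pair = map[e.event] as Transforms[string] | undefined;
  const assertFn = pair ? pair[0] : null;
  const stringifyFn = pair ? pair[1] : null;
  const checked = assertFn ? assertFn(e.data) : e.data;
  // Assumed fallback when no generated serializer is registered.
  return stringifyFn ? stringifyFn(checked) : JSON.stringify(checked);
}
```

Keying the map by event name keeps the per-event lookup to a single property access, so the dispatch cost does not grow with the number of variants.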
Mixing @EventStream with regular routes
A single controller can have both @EventStream and regular routes:
@Controller('dashboard')
export class DashboardController {
@Get('stats')
getStats(): StatsDto { ... }
@EventStream('live')
async *live(): AsyncGenerator<SseEvent<'stats', StatsDto>> {
// ...
}
}
tsgonest injects both TsgonestSerializeInterceptor (for getStats) and TsgonestSseInterceptor (for live) as class-level decorators. Each interceptor only activates on its respective routes.
@EventStream vs @Sse
| | @EventStream (tsgonest) | @Sse (NestJS) |
|---|---|---|
| Return type | AsyncGenerator<SseEvent<E, T>> | Observable<MessageEvent> |
| Type safety | Typed data per event name | data: string \| object |
| Validation | Compile-time per-event | None |
| Serialization | Compile-time fast stringify | JSON.stringify |
| OpenAPI | Discriminated oneOf + error variant | Plain itemSchema |
| Resource cleanup | finally blocks via iterator.return() | Manual, via the RxJS finalize() operator |
| Heartbeat | Built-in heartbeat option | Manual implementation |
| Error handling | Typed error event, graceful complete | Unhandled → connection drop |
@Sse() continues to work as before. Use @EventStream() for new SSE endpoints where you want type safety, validation, and proper OpenAPI documentation.