Server-Sent Events

Type-safe SSE with @EventStream(), discriminated event unions, and OpenAPI schemas.

tsgonest provides @EventStream() — a type-safe, iterator-based alternative to NestJS's @Sse() decorator. It gives you typed event payloads, discriminated unions, compile-time validation and serialization, automatic OpenAPI schemas, and proper resource cleanup.

Quick start

src/notifications/notification.controller.ts
import { Controller } from '@nestjs/common';
import { EventStream, SseEvent } from '@tsgonest/runtime';

interface NotificationDto {
  id: string;
  /** @format date-time */
  timestamp: string;
  message: string;
}

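@Controller('notifications')
export class NotificationController {
  // NotificationService is your own provider (not shown here); its watch()
  // method is assumed to return an AsyncIterable<NotificationDto>.
  constructor(private readonly service: NotificationService) {}

  @EventStream('stream')
  async *stream(): AsyncGenerator<SseEvent<'notification', NotificationDto>> {
    // Forward each notification to the client as a typed SSE event.
    for await (const n of this.service.watch()) {
      yield { event: 'notification', data: n };
    }
  }
}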

This produces a GET /notifications/stream SSE endpoint with:

  • Compile-time validation and fast serialization of NotificationDto
  • OpenAPI text/event-stream response with typed itemSchema
  • Proper generator cleanup on client disconnect

SseEvent and SseEvents types

SseEvent<E, T>

A typed SSE event with a generic event name E (string literal discriminant) and data payload T:

import { SseEvent } from '@tsgonest/runtime';

// Single event type
type PingEvent = SseEvent<'ping', { ts: number }>;
// → { event: 'ping', data: { ts: number }, id?: string, retry?: number }

| Field | Type | Description |
| --- | --- | --- |
| event | E | Event name (maps to the SSE event: field) |
| data | T | Typed payload (validated and serialized by tsgonest) |
| id | string? | SSE id: field (optional) |
| retry | number? | Client reconnection interval in ms (optional) |
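
To make the field mapping concrete, here is a minimal sketch of how one such event could be rendered in the standard SSE text format. The local SseEvent type only mirrors the shape documented above, and toWireFormat is a hypothetical helper written for illustration; it is not part of @tsgonest/runtime and may differ from what the runtime actually emits.

```typescript
// Local stand-in mirroring the documented shape.
// In real code, import SseEvent from '@tsgonest/runtime' instead.
type SseEvent<E extends string, T> = {
  event: E;
  data: T;
  id?: string;
  retry?: number;
};

// Hypothetical helper (an assumption for illustration): renders one event as an SSE frame.
function toWireFormat<E extends string, T>(e: SseEvent<E, T>): string {
  const lines: string[] = [`event: ${e.event}`];
  if (e.id !== undefined) lines.push(`id: ${e.id}`);
  if (e.retry !== undefined) lines.push(`retry: ${e.retry}`);
  // JSON.stringify without an indent argument emits no raw newlines,
  // so a single data: line is enough here.
  lines.push(`data: ${JSON.stringify(e.data)}`);
  // A blank line terminates the frame.
  return lines.join('\n') + '\n\n';
}

const frame = toWireFormat({ event: 'ping', data: { ts: 1 }, id: '42', retry: 5000 });
// frame === 'event: ping\nid: 42\nretry: 5000\ndata: {"ts":1}\n\n'
```

The optional id and retry fields only appear in the frame when they are set, which matches their optional status in the type.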

SseEvents<M>

Converts a record of event names → payload types into a discriminated union:

import { SseEvents } from '@tsgonest/runtime';

type UserEvents = SseEvents<{
  created: UserDto;
  updated: UserDto;
  deleted: { id: string };
}>;
// Equivalent to:
// | SseEvent<'created', UserDto>
// | SseEvent<'updated', UserDto>
// | SseEvent<'deleted', { id: string }>

TypeScript narrows data when you check event:

function handle(e: UserEvents) {
  if (e.event === 'deleted') {
    e.data.id; // ✅ narrowed to { id: string }
  }
}
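
For readers curious how a record can become a discriminated union, the sketch below shows one plausible definition using a mapped type indexed by its own keys. This is an assumption about the technique, not the library's source; the UserDto fields and the summarize function are hypothetical.

```typescript
// Local stand-ins; the real types come from '@tsgonest/runtime'.
type SseEvent<E extends string, T> = { event: E; data: T; id?: string; retry?: number };

// One plausible definition (an assumption): map each key to its event type,
// then index by all keys to collapse the mapped type into a union.
type SseEvents<M extends Record<string, unknown>> = {
  [K in keyof M & string]: SseEvent<K, M[K]>;
}[keyof M & string];

// Hypothetical DTO shape for illustration.
interface UserDto {
  id: string;
  name: string;
}

type UserEvents = SseEvents<{
  created: UserDto;
  updated: UserDto;
  deleted: { id: string };
}>;

// An exhaustive switch: the never assignment fails to compile
// if a new event name is added to the map but not handled here.
function summarize(e: UserEvents): string {
  switch (e.event) {
    case 'created':
    case 'updated':
      return `${e.event}: ${e.data.name}`;
    case 'deleted':
      return `deleted: ${e.data.id}`;
    default: {
      const unreachable: never = e;
      return unreachable;
    }
  }
}

const summary = summarize({ event: 'deleted', data: { id: 'u1' } });
// summary === 'deleted: u1'
```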

@EventStream decorator

import { EventStream } from '@tsgonest/runtime';

@EventStream(path?, options?)
ParameterTypeDefaultDescription
pathstring?'/'Route sub-path
options.heartbeatnumber?0 (disabled)Keep-alive interval in ms

The decorator sets NestJS-compatible routing metadata (path, method=GET, __sse__), so NestJS's router handles the SSE protocol natively (headers, SseStream, socket tuning).

Discriminated event unions

The most powerful pattern is a controller method that yields multiple event types:

src/users/user-event.controller.ts
import { Controller } from '@nestjs/common';
import { EventStream, SseEvents } from '@tsgonest/runtime';

type UserEvents = SseEvents<{
  created: UserDto;
  updated: UserDto;
  deleted: { id: string };
}>;

@Controller('users')
export class UserEventController {
  @EventStream('events', { heartbeat: 30_000 })
  async *streamEvents(): AsyncGenerator<UserEvents> {
    const cursor = await this.db.watchChanges();
    try {
      for await (const change of cursor) {
        yield { event: change.type, data: change.doc };
      }
    } finally {
      await cursor.close(); // runs on client disconnect
    }
  }
}

tsgonest statically analyzes the AsyncGenerator<UserEvents> return type and:

  1. Extracts each event variant: created, updated, deleted
  2. Generates per-event validation: assertUserDto for created/updated, inline validation for deleted
  3. Generates per-event serialization: fast stringifyUserDto for created/updated
  4. Produces a discriminated OpenAPI schema: oneOf with event as discriminator

OpenAPI output

Discriminated union (multiple event types)

For the UserEvents example above, tsgonest generates the following response schema (the updated variant has the same shape as created and is not shown):

{
  "200": {
    "description": "Server-Sent Events stream",
    "content": {
      "text/event-stream": {
        "schema": {
          "type": "string",
          "format": "event-stream",
          "itemSchema": {
            "oneOf": [
              {
                "type": "object",
                "required": ["data", "event"],
                "properties": {
                  "event": { "type": "string", "const": "created" },
                  "data": {
                    "type": "string",
                    "contentMediaType": "application/json",
                    "contentSchema": { "$ref": "#/components/schemas/UserDto" }
                  },
                  "id": { "type": "string" },
                  "retry": { "type": "integer", "minimum": 0 }
                }
              },
              {
                "type": "object",
                "required": ["data", "event"],
                "properties": {
                  "event": { "type": "string", "const": "deleted" },
                  "data": {
                    "type": "string",
                    "contentMediaType": "application/json",
                    "contentSchema": {
                      "type": "object",
                      "properties": { "id": { "type": "string" } },
                      "required": ["id"]
                    }
                  },
                  "id": { "type": "string" },
                  "retry": { "type": "integer", "minimum": 0 }
                }
              },
              {
                "type": "object",
                "required": ["data", "event"],
                "properties": {
                  "event": { "type": "string", "const": "error" },
                  "data": { "type": "string" },
                  "id": { "type": "string" },
                  "retry": { "type": "integer", "minimum": 0 }
                }
              }
            ],
            "discriminator": { "propertyName": "event" }
          }
        }
      }
    }
  }
}

Key details:

  • Each variant is an object with event (const discriminant) and data (typed via contentSchema)
  • An error variant (event: "error", data: string) is always appended automatically
  • The discriminator on "event" enables efficient client-side dispatch
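
As a sketch of what client-side dispatch on the discriminator could look like: in the schema above, data arrives as a string whose content is JSON for the typed variants and plain text for the error variant. The RawFrame type, the decodeFrame function, and the UserDto fields below are hypothetical names chosen for illustration, and the JSON.parse casts stand in for whatever validation a real client would apply.

```typescript
// Hypothetical DTO shape for illustration.
interface UserDto {
  id: string;
  name: string;
}

// A received frame before decoding: data is still text.
interface RawFrame {
  event: string;
  data: string;
}

type Decoded =
  | { event: 'created'; data: UserDto }
  | { event: 'updated'; data: UserDto }
  | { event: 'deleted'; data: { id: string } }
  | { event: 'error'; data: string };

// Dispatches on the event discriminator and parses data accordingly.
function decodeFrame(frame: RawFrame): Decoded {
  switch (frame.event) {
    case 'created':
      return { event: 'created', data: JSON.parse(frame.data) as UserDto };
    case 'updated':
      return { event: 'updated', data: JSON.parse(frame.data) as UserDto };
    case 'deleted':
      return { event: 'deleted', data: JSON.parse(frame.data) as { id: string } };
    case 'error':
      // The error variant carries a plain string, not JSON.
      return { event: 'error', data: frame.data };
    default:
      throw new Error(`Unknown event: ${frame.event}`);
  }
}

const deleted = decodeFrame({ event: 'deleted', data: '{"id":"u1"}' });
const failed = decodeFrame({ event: 'error', data: 'Something went wrong' });
```

In a browser, frames like these would typically come from EventSource listeners registered per event name; the decoding step stays the same.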

Single event type

@EventStream('stream')
async *stream(): AsyncGenerator<SseEvent<'notification', NotificationDto>> { ... }

Produces a oneOf with two entries: the notification variant and the error variant, still with a discriminator.

Non-discriminated (generic string)

@EventStream('stream')
async *stream(): AsyncGenerator<SseEvent<string, UserDto>> { ... }

When the event name is a generic string (not a literal), the schema uses oneOf without a discriminator. The event variant has type: "string" for the event property instead of a const.

Heartbeat keep-alive

Proxies and load balancers often close idle SSE connections. Use the heartbeat option to emit empty keep-alive frames:

@EventStream('events', { heartbeat: 30_000 }) // every 30 seconds
async *events(): AsyncGenerator<UserEvents> { ... }

The interceptor emits empty-data frames (\n) at the configured interval, which is enough to keep connections alive through most proxies without triggering client-side event handlers.

Resource cleanup

When a client disconnects, the TsgonestSseInterceptor calls iterator.return() on the async generator. This triggers finally blocks, allowing you to clean up resources:

@EventStream('events')
async *events(): AsyncGenerator<SseEvents<{ update: DataDto }>> {
  const subscription = this.eventBus.subscribe();
  try {
    for await (const event of subscription) {
      yield { event: 'update', data: event };
    }
  } finally {
    // ✅ Always runs — even on client disconnect
    subscription.unsubscribe();
  }
}
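
The mechanism can be observed without NestJS. The sketch below uses plain async generators to show that calling return() on a generator suspended at a yield runs its finally block. The ticker and simulateDisconnect functions are hypothetical stand-ins for a handler and for the interceptor's disconnect handling; they are not tsgonest code.

```typescript
// Stand-in for a handler: yields forever and releases its resource in finally.
async function* ticker(log: string[]): AsyncGenerator<{ event: 'tick'; data: number }> {
  let n = 0;
  try {
    while (true) {
      yield { event: 'tick', data: n++ };
    }
  } finally {
    log.push('cleanup');
  }
}

// Simulates a consumer that reads two events and then "disconnects".
async function simulateDisconnect(): Promise<string[]> {
  const log: string[] = [];
  const iterator = ticker(log);
  for (let i = 0; i < 2; i++) {
    const result = await iterator.next();
    if (!result.done) log.push(`got ${result.value.data}`);
  }
  // The generator is suspended at a yield, so return() resumes it
  // with a return completion and the finally block runs.
  await iterator.return(undefined);
  return log;
}

simulateDisconnect().then((log) => console.log(log));
// logs: [ 'got 0', 'got 1', 'cleanup' ]
```

One caveat of JavaScript async generators in general: return() takes effect only once the generator is suspended at a yield. If the generator is blocked awaiting an upstream source that never produces, cleanup waits until that await settles, so sources that can be cancelled or timed out are worth considering.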

Error handling

Errors thrown inside the generator are caught by the interceptor and emitted as a typed error event:

event: error
data: Something went wrong

This matches the error variant in the OpenAPI schema. After emitting the error, the stream completes gracefully. Validation errors from assertTypeName() are also caught and emitted as error events.
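
The behaviour described above can be approximated in a few lines. The sketch below is an assumption about the general shape of such a wrapper, not TsgonestSseInterceptor's actual implementation: it drains a generator, and if the generator throws, it records a final error event carrying the message and then completes normally. The names StreamEvent, collectWithErrorEvent, and flaky are hypothetical.

```typescript
type StreamEvent = { event: string; data: unknown };

// Hypothetical stand-in for the interceptor's error handling:
// collects events and converts a thrown error into a final error event.
async function collectWithErrorEvent(
  source: AsyncIterable<StreamEvent>,
): Promise<StreamEvent[]> {
  const sent: StreamEvent[] = [];
  try {
    for await (const e of source) {
      sent.push(e);
    }
  } catch (err) {
    const message = err instanceof Error ? err.message : String(err);
    sent.push({ event: 'error', data: message });
  }
  // Reaching this point means the stream completes instead of crashing.
  return sent;
}

// A generator that yields one event and then fails.
async function* flaky(): AsyncGenerator<StreamEvent> {
  yield { event: 'update', data: { n: 1 } };
  throw new Error('Something went wrong');
}

collectWithErrorEvent(flaky()).then((sent) => console.log(sent));
```

Events yielded before the failure are still delivered; only the event that would have followed the throw is replaced by the error event.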

Compile-time injection

tsgonest's controller rewriter handles @EventStream routes differently from regular routes:

  1. No return wrapping: generators yield values, so return statements are not wrapped with stringify()
  2. Per-event transform metadata: Reflect.defineMetadata is injected after the method's __decorate call with a map of event name → [assertFn, stringifyFn] pairs
  3. Interceptor injection: TsgonestSseInterceptor is added as a class-level decorator (via UseInterceptors)

dist/user-event.controller.js (compiled)
// tsgonest injects companion imports
import { assertUserDto, stringifyUserDto } from './user.dto.UserDto.tsgonest.js';

// Per-event transforms injected after __decorate
Reflect.defineMetadata("__tsgonest_sse_transforms__", {
  "created": [assertUserDto, stringifyUserDto],
  "updated": [assertUserDto, stringifyUserDto],
  "deleted": [null, null]
}, UserEventController.prototype, "streamEvents");

The interceptor reads this metadata at request time and applies the correct assert/stringify pair for each yielded event based on its event field.
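
A rough sketch of that lookup follows. It is written under assumptions: the hand-written assertUserDto and stringifyUserDto stand in for the generated companions, the UserDto fields are hypothetical, and falling back to JSON.stringify when a pair is [null, null] is a guess at reasonable behaviour rather than something the source confirms.

```typescript
type AssertFn = (value: unknown) => unknown;
type StringifyFn = (value: any) => string;
type Transforms = Record<string, [AssertFn | null, StringifyFn | null]>;

// Hypothetical DTO shape for illustration.
interface UserDto {
  id: string;
  name: string;
}

// Hand-written stand-ins for the generated companion functions.
const assertUserDto: AssertFn = (v) => {
  const u = v as Partial<UserDto> | null;
  if (!u || typeof u.id !== 'string' || typeof u.name !== 'string') {
    throw new Error('invalid UserDto');
  }
  return v;
};
const stringifyUserDto: StringifyFn = (u: UserDto) =>
  `{"id":${JSON.stringify(u.id)},"name":${JSON.stringify(u.name)}}`;

// Same shape as the injected metadata map.
const transforms: Transforms = {
  created: [assertUserDto, stringifyUserDto],
  updated: [assertUserDto, stringifyUserDto],
  deleted: [null, null],
};

// Picks the pair for the yielded event's name and applies it.
// Assumption: a null entry means "skip" (assert) or "use JSON.stringify" (stringify).
function serializeEvent(e: { event: string; data: unknown }): string {
  const [assertFn, stringifyFn] = transforms[e.event] ?? [null, null];
  const checked = assertFn ? assertFn(e.data) : e.data;
  return stringifyFn ? stringifyFn(checked) : JSON.stringify(checked);
}

const createdJson = serializeEvent({ event: 'created', data: { id: 'u1', name: 'Ada' } });
const deletedJson = serializeEvent({ event: 'deleted', data: { id: 'u1' } });
```

Keying the map by event name keeps the per-event cost to one property lookup, which is why the event field has to be a string literal for the typed path to apply.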

Mixing @EventStream with regular routes

A single controller can have both @EventStream and regular routes:

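import { Controller, Get } from '@nestjs/common';
import { EventStream, SseEvent } from '@tsgonest/runtime';

@Controller('dashboard')
export class DashboardController {
  // Regular JSON route
  @Get('stats')
  getStats(): StatsDto { ... }

  // SSE route on the same controller
  @EventStream('live')
  async *live(): AsyncGenerator<SseEvent<'stats', StatsDto>> {
    // ...
  }
}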

tsgonest injects both TsgonestSerializeInterceptor (for getStats) and TsgonestSseInterceptor (for live) as class-level decorators. Each interceptor only activates on its respective routes.

@EventStream vs @Sse

| | @EventStream (tsgonest) | @Sse (NestJS) |
| --- | --- | --- |
| Return type | AsyncGenerator<SseEvent<E, T>> | Observable<MessageEvent> |
| Type safety | Typed data per event name | data: string \| object |
| Validation | Compile-time per-event | None |
| Serialization | Compile-time fast stringify | JSON.stringify |
| OpenAPI | Discriminated oneOf + error variant | Plain itemSchema |
| Resource cleanup | finally blocks via iterator.return() | Manual RxJS finalize() operator |
| Heartbeat | Built-in heartbeat option | Manual implementation |
| Error handling | Typed error event, graceful completion | Unhandled → connection drop |

@Sse() continues to work as before. Use @EventStream() for new SSE endpoints where you want type safety, validation, and proper OpenAPI documentation.
