
WorkflowStream

Defined in: core/src/core/stream.ts:106

A reactive stream for processing workflow events.

WorkflowStream extends the standard ReadableStream with methods for filtering, transforming, and consuming workflow events. Because filter, map, take, and until each return a new WorkflowStream, calls can be chained to build event processing pipelines.

// Get stream from workflow context
const stream = context.stream;
// Filter for specific events
const userEvents = stream.filter(UserEvent);
// Transform events
const processed = stream.map(event => ({
  type: event.constructor.name,
  timestamp: Date.now(),
  data: event.data
}));
// Consume events
for await (const event of stream.take(10)) {
  console.log('Received:', event);
}
  • ReadableStream<R>

R = any

The type of data flowing through the stream

  • AsyncIterable<R>

new WorkflowStream<R>(subscribable, rootStream): WorkflowStream<R>

Defined in: core/src/core/stream.ts:141

Subscribable<[R], void>

ReadableStream<R>

WorkflowStream<R>

ReadableStream<R>.constructor

new WorkflowStream<R>(subscribable, rootStream): WorkflowStream<R>

Defined in: core/src/core/stream.ts:145

Subscribable<[R], void>

null

WorkflowStream<R>

ReadableStream<R>.constructor

new WorkflowStream<R>(subscribable, rootStream): WorkflowStream<R>

Defined in: core/src/core/stream.ts:146

null

null | ReadableStream<R>

WorkflowStream<R>

ReadableStream<R>.constructor

on<T>(event, handler): () => void

Defined in: core/src/core/stream.ts:130

Subscribe to specific workflow events.

T

WorkflowEvent<T>

The event type to listen for

(event) => void

Function to handle the event

Unsubscribe function

(): void

void

const unsubscribe = stream.on(UserEvent, (event) => {
  console.log('User event:', event.data);
});
// Later...
unsubscribe();
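
The subscribe-and-return-an-unsubscribe-function pattern used by on can be illustrated in isolation. The sketch below is a standalone stand-in, not the library's implementation: TinyEmitter, Handler, and the string payloads are hypothetical names chosen for the example.

```typescript
// Hypothetical handler type; the real on() takes an event type and a handler.
type Handler<T> = (payload: T) => void;

// Minimal emitter showing the "on() returns an unsubscribe function" pattern.
class TinyEmitter<T> {
  private handlers = new Set<Handler<T>>();

  // Register a handler and return a function that removes it again.
  on(handler: Handler<T>): () => void {
    this.handlers.add(handler);
    return () => {
      this.handlers.delete(handler);
    };
  }

  // Deliver a payload to every handler that is currently registered.
  emit(payload: T): void {
    this.handlers.forEach((handler) => handler(payload));
  }
}

const emitter = new TinyEmitter<string>();
const seen: string[] = [];

const unsubscribe = emitter.on((payload) => {
  seen.push(payload);
});

emitter.emit("first");  // delivered to the handler
unsubscribe();          // handler removed
emitter.emit("second"); // no handler left, so nothing is recorded
```

Returning the unsubscribe function from the registration call means the caller does not need to keep a reference to the handler in order to remove it later.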

static fromReadableStream<T>(stream): WorkflowStream<T>

Defined in: core/src/core/stream.ts:192

Create a WorkflowStream from a standard ReadableStream.

T = any

ReadableStream<WorkflowEventData<any>>

The ReadableStream to wrap

WorkflowStream<T>

A new WorkflowStream instance


static fromResponse(response, eventMap): WorkflowStream<WorkflowEventData<any>>

Defined in: core/src/core/stream.ts:214

Create a WorkflowStream from an HTTP Response.

Response

The HTTP Response containing workflow events

Record<string, WorkflowEvent<any>>

Map of event unique IDs to event constructors

WorkflowStream<WorkflowEventData<any>>

A new WorkflowStream instance


toResponse(init?, transformer?): R extends WorkflowEventData<any> ? Response : never

Defined in: core/src/core/stream.ts:237

Convert the stream to an HTTP Response.

ResponseInit

Optional ResponseInit parameters

JsonEncodeTransform = ...

Optional custom transformer (defaults to JSON encoding)

R extends WorkflowEventData<any> ? Response : never

HTTP Response containing the stream data
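
To make the idea of a JSON-encoding transformer concrete, here is a rough sketch built only on the standard ReadableStream, TransformStream, and Response APIs. The newline-delimited JSON framing and the helper name toJsonResponse are assumptions made for illustration; the actual wire format produced by JsonEncodeTransform is not described here and may differ.

```typescript
// Sketch only: newline-delimited JSON is an assumed wire format, and
// toJsonResponse is a hypothetical helper, not the library's toResponse.
function toJsonResponse<T>(source: ReadableStream<T>, init?: ResponseInit): Response {
  const encoder = new TextEncoder();
  // Serialize each item to one JSON line and encode it as bytes.
  const encode = new TransformStream<T, Uint8Array>({
    transform(item, controller) {
      controller.enqueue(encoder.encode(JSON.stringify(item) + "\n"));
    },
  });
  return new Response(source.pipeThrough(encode), init);
}

// A small finite source standing in for a stream of workflow event data.
const sampleEvents = new ReadableStream<{ type: string; data: number }>({
  start(controller) {
    controller.enqueue({ type: "start", data: 1 });
    controller.enqueue({ type: "stop", data: 2 });
    controller.close();
  },
});

const jsonResponse = toJsonResponse(sampleEvents, {
  headers: { "Content-Type": "application/x-ndjson" },
});
```

The ResponseInit argument is passed through untouched, which is how a caller would set status codes or headers on the resulting response.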


forEach(callback): Promise<void>

Defined in: core/src/core/stream.ts:314

Process each item in the stream with a callback function.

(item) => void

Function to call for each item

Promise<void>

Promise that resolves when all items are processed

await stream.forEach(event => {
  console.log('Processing:', event);
});

map<T>(callback): WorkflowStream<T>

Defined in: core/src/core/stream.ts:338

Transform each item in the stream.

T

(item) => T

Function to transform each item

WorkflowStream<T>

A new WorkflowStream with transformed items

const timestamps = stream.map(event => ({
  ...event,
  timestamp: Date.now()
}));
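
For readers who want to see what a per-item transform looks like on a plain ReadableStream, the following sketch uses the standard TransformStream. It is an approximation of the behavior described above, not the library's code; mapStream, fromArray, collect, and demoMap are hypothetical helpers.

```typescript
// Build a finite stream from an array (helper for the demo).
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      items.forEach((item) => controller.enqueue(item));
      controller.close();
    },
  });
}

// Read a stream to the end and return its items.
async function collect<T>(source: ReadableStream<T>): Promise<T[]> {
  const reader = source.getReader();
  const items: T[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    items.push(result.value);
  }
  return items;
}

// Hypothetical map: apply fn to every item and emit the result.
function mapStream<A, B>(source: ReadableStream<A>, fn: (item: A) => B): ReadableStream<B> {
  return source.pipeThrough(
    new TransformStream<A, B>({
      transform(item, controller) {
        controller.enqueue(fn(item));
      },
    }),
  );
}

async function demoMap(): Promise<string[]> {
  const source = fromArray([{ id: 1 }, { id: 2 }]);
  const labelled = mapStream(source, (item) => `event-${item.id}`);
  return collect(labelled); // resolves to ["event-1", "event-2"]
}
```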

take(limit): WorkflowStream<R>

Defined in: core/src/core/stream.ts:369

Take only the first N items from the stream.

number

Maximum number of items to take

WorkflowStream<R>

A new WorkflowStream limited to the specified number of items

const firstTen = stream.take(10);
for await (const event of firstTen) {
  console.log(event);
}
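
Limiting a stream matters most when the source never ends on its own. The sketch below shows one way to express "first N items" over a standard ReadableStream, under the assumption that the source should be cancelled once the limit is reached; takeStream, counter, and demoTake are hypothetical names, and the library may implement take differently.

```typescript
// An endless source that emits 0, 1, 2, ... on demand.
function counter(): ReadableStream<number> {
  let next = 0;
  return new ReadableStream<number>({
    pull(controller) {
      controller.enqueue(next++);
    },
  });
}

// Hypothetical take: forward at most `limit` items, then close and cancel the source.
function takeStream<T>(source: ReadableStream<T>, limit: number): ReadableStream<T> {
  const reader = source.getReader();
  let taken = 0;
  return new ReadableStream<T>({
    async pull(controller) {
      if (taken >= limit) {
        controller.close();
        await reader.cancel(); // tell the source that no more items are needed
        return;
      }
      const result = await reader.read();
      if (result.done) {
        controller.close();
        return;
      }
      taken += 1;
      controller.enqueue(result.value);
    },
    cancel(reason) {
      return reader.cancel(reason);
    },
  });
}

async function demoTake(): Promise<number[]> {
  const reader = takeStream(counter(), 3).getReader();
  const items: number[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    items.push(result.value);
  }
  return items; // resolves to [0, 1, 2]
}
```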

filter(predicate): WorkflowStream<R>

Defined in: core/src/core/stream.ts:404

Filter the stream to include only items matching the predicate.

R extends WorkflowEventData<any> ? WorkflowEvent<InferWorkflowEventData<R>> : never

Event type, function, or value to filter by

WorkflowStream<R>

A new WorkflowStream containing only matching items

// Filter by event type
const userEvents = stream.filter(UserEvent);
// Filter by function
const importantEvents = stream.filter(event => event.priority === 'high');
// Filter by specific value
const specificEvent = stream.filter(myEventInstance);
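
The function form of the predicate can be sketched on a plain ReadableStream as follows. This covers only the `(event) => boolean` case; how the library matches an event type or a specific value is not shown here. The names filterStream, PriorityItem, and demoFilter are hypothetical.

```typescript
// Hypothetical item shape, mirroring the `priority` field used in the example above.
interface PriorityItem {
  id: number;
  priority: "high" | "low";
}

// Hypothetical filter: forward only the items for which keep() returns true.
function filterStream<T>(source: ReadableStream<T>, keep: (item: T) => boolean): ReadableStream<T> {
  return source.pipeThrough(
    new TransformStream<T, T>({
      transform(item, controller) {
        if (keep(item)) controller.enqueue(item);
      },
    }),
  );
}

async function demoFilter(): Promise<number[]> {
  const source = new ReadableStream<PriorityItem>({
    start(controller) {
      controller.enqueue({ id: 1, priority: "high" });
      controller.enqueue({ id: 2, priority: "low" });
      controller.enqueue({ id: 3, priority: "high" });
      controller.close();
    },
  });
  const important = filterStream(source, (item) => item.priority === "high");

  const reader = important.getReader();
  const ids: number[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    ids.push(result.value.id);
  }
  return ids; // resolves to [1, 3]
}
```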

filter(predicate): WorkflowStream<R>

Defined in: core/src/core/stream.ts:409

Filter the stream to include only items matching the predicate.

R

Event type, function, or value to filter by

WorkflowStream<R>

A new WorkflowStream containing only matching items


filter(predicate): WorkflowStream<R>

Defined in: core/src/core/stream.ts:410

Filter the stream to include only items matching the predicate.

(event) => boolean

Event type, function, or value to filter by

WorkflowStream<R>

A new WorkflowStream containing only matching items


until(predicate): WorkflowStream<R>

Defined in: core/src/core/stream.ts:452

Continue the stream until the predicate is met, then terminate.

R extends WorkflowEventData<any> ? WorkflowEvent<InferWorkflowEventData<R>> : never

Event type, function, or value to stop at

WorkflowStream<R>

A new WorkflowStream that terminates when the predicate is met

// Stop at completion event
const processingEvents = stream.until(CompletionEvent);
// Stop when condition is met
const beforeError = stream.until(event => event.type === 'error');
// Stop at specific event instance
const beforeSpecific = stream.until(myEventInstance);
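
A stop-on-condition stream can be approximated with the standard TransformStream and its terminate() method. One detail the description above leaves open is whether the matching item itself is emitted; this sketch assumes it is, and that assumption should be checked against the library before relying on it. The names untilStream and demoUntil are hypothetical.

```typescript
// Hypothetical until: forward items, and end the stream once stop() returns true.
// Assumption: the matching item is emitted before the stream terminates.
function untilStream<T>(source: ReadableStream<T>, stop: (item: T) => boolean): ReadableStream<T> {
  return source.pipeThrough(
    new TransformStream<T, T>({
      transform(item, controller) {
        controller.enqueue(item);
        if (stop(item)) controller.terminate(); // close the readable side
      },
    }),
  );
}

async function demoUntil(): Promise<string[]> {
  const source = new ReadableStream<string>({
    start(controller) {
      ["start", "progress", "done", "late"].forEach((name) => controller.enqueue(name));
      controller.close();
    },
  });
  const bounded = untilStream(source, (name) => name === "done");

  const reader = bounded.getReader();
  const names: string[] = [];
  while (true) {
    const result = await reader.read();
    if (result.done) break;
    names.push(result.value);
  }
  return names; // resolves to ["start", "progress", "done"]; "late" is never emitted
}
```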

until(predicate): WorkflowStream<R>

Defined in: core/src/core/stream.ts:457

Continue the stream until the predicate is met, then terminate.

(item) => boolean

Event type, function, or value to stop at

WorkflowStream<R>

A new WorkflowStream that terminates when the predicate is met


until(item): WorkflowStream<R>

Defined in: core/src/core/stream.ts:458

Continue the stream until the predicate is met, then terminate.

R

WorkflowStream<R>

A new WorkflowStream that terminates when the predicate is met


toArray(): Promise<R[]>

Defined in: core/src/core/stream.ts:494

Collect all items from the stream into an array.

Promise<R[]>

Promise resolving to an array of all stream items

const events = await stream.take(5).toArray();
console.log('Collected events:', events);
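
Collecting into an array only completes when the stream ends, which is presumably why the example above bounds the stream with take(5) first. The sketch below shows the underlying read loop on a standard ReadableStream; collectAll and demoToArray are hypothetical names, not the library's implementation.

```typescript
// Hypothetical toArray: read until the stream reports done, then return the items.
// On a stream that never closes, the returned promise never resolves.
async function collectAll<T>(source: ReadableStream<T>): Promise<T[]> {
  const reader = source.getReader();
  const items: T[] = [];
  try {
    while (true) {
      const result = await reader.read();
      if (result.done) break;
      items.push(result.value);
    }
  } finally {
    reader.releaseLock(); // leave the stream unlocked even if a read fails
  }
  return items;
}

async function demoToArray(): Promise<string[]> {
  const source = new ReadableStream<string>({
    start(controller) {
      controller.enqueue("a");
      controller.enqueue("b");
      controller.close();
    },
  });
  return collectAll(source); // resolves to ["a", "b"]
}
```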