WorkflowStream
Defined in: core/src/core/stream.ts:106
A reactive stream for processing workflow events.
WorkflowStream extends the standard ReadableStream to provide specialized methods for filtering, transforming, and consuming workflow events. It supports reactive patterns and can be used to build complex event processing pipelines.
Example
// Get stream from workflow context
const stream = context.stream;

// Filter for specific events
const userEvents = stream.filter(UserEvent);

// Transform events
const processed = stream.map(event => ({
  type: event.constructor.name,
  timestamp: Date.now(),
  data: event.data
}));

// Consume events
for await (const event of stream.take(10)) {
  console.log('Received:', event);
}
Extends
ReadableStream<R>
Type Parameters
R = any
The type of data flowing through the stream
Implements
AsyncIterable<R>
Constructors
Constructor
new WorkflowStream<R>(subscribable, rootStream): WorkflowStream<R>
Defined in: core/src/core/stream.ts:141
Parameters
subscribable
Subscribable<[R], void>
rootStream
ReadableStream<R>
Returns
WorkflowStream<R>
Overrides
ReadableStream<R>.constructor
Constructor
new WorkflowStream<R>(subscribable, rootStream): WorkflowStream<R>
Defined in: core/src/core/stream.ts:145
Parameters
subscribable
Subscribable<[R], void>
rootStream
null
Returns
WorkflowStream<R>
Overrides
ReadableStream<R>.constructor
Constructor
new WorkflowStream<R>(subscribable, rootStream): WorkflowStream<R>
Defined in: core/src/core/stream.ts:146
Parameters
subscribable
null
rootStream
null | ReadableStream<R>
Returns
WorkflowStream<R>
Overrides
ReadableStream<R>.constructor
Methods
on()
on<T>(event, handler): () => void
Defined in: core/src/core/stream.ts:130
Subscribe to specific workflow events.
Type Parameters
T
Parameters
event
The event type to listen for
handler
(event) => void
Function to handle the event
Returns
Unsubscribe function
(): void
Returns
void
Example
const unsubscribe = stream.on(UserEvent, (event) => {
  console.log('User event:', event.data);
});

// Later...
unsubscribe();
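The subscribe-then-unsubscribe pattern that on() exposes can be sketched in isolation. The TinyEmitter class below is a hypothetical stand-in written for this illustration; it is not WorkflowStream and makes no claim about how the library stores its handlers. It shows only the contract described above: registering a handler returns a function that removes that handler again.

```typescript
// Hypothetical stand-in for an event source; not the library's implementation.
type Handler<T> = (event: T) => void;

class TinyEmitter<T> {
  private handlers: Handler<T>[] = [];

  // Register a handler and return a function that removes it again.
  on(handler: Handler<T>): () => void {
    this.handlers.push(handler);
    return () => {
      this.handlers = this.handlers.filter((h) => h !== handler);
    };
  }

  // Deliver an event to every handler that is currently registered.
  emit(event: T): void {
    this.handlers.forEach((handler) => handler(event));
  }
}

const emitter = new TinyEmitter<string>();
const seen: string[] = [];
const stop = emitter.on((event) => {
  seen.push(event);
});

emitter.emit("first");  // delivered to the handler
stop();                 // handler removed
emitter.emit("second"); // no handler left, so nothing is recorded
// seen is now ["first"]
```

Returning the unsubscribe function from the registration call keeps the handler reference private to the caller, so no separate off(handler) lookup is needed.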
fromReadableStream()
static fromReadableStream<T>(stream): WorkflowStream<T>
Defined in: core/src/core/stream.ts:192
Create a WorkflowStream from a standard ReadableStream.
Type Parameters
T = any
Parameters
stream
ReadableStream<WorkflowEventData<any>>
The ReadableStream to wrap
Returns
WorkflowStream<T>
A new WorkflowStream instance
fromResponse()
static fromResponse(response, eventMap): WorkflowStream<WorkflowEventData<any>>
Defined in: core/src/core/stream.ts:214
Create a WorkflowStream from an HTTP Response.
Parameters
response
Response
The HTTP Response containing workflow events
eventMap
Record<string, WorkflowEvent<any>>
Map of event unique IDs to event constructors
Returns
WorkflowStream<WorkflowEventData<any>>
A new WorkflowStream instance
toResponse()
toResponse(init?, transformer?): R extends WorkflowEventData<any> ? Response : never
Defined in: core/src/core/stream.ts:237
Convert the stream to an HTTP Response.
Parameters
init?
ResponseInit
Optional ResponseInit parameters
transformer?
JsonEncodeTransform = ...
Optional custom transformer (defaults to JSON encoding)
Returns
R extends WorkflowEventData<any> ? Response : never
HTTP Response containing the stream data
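To make the roles of JSON encoding in toResponse() and of the eventMap in fromResponse() more concrete, here is a minimal round-trip sketch. Everything in it is an assumption made for illustration: the EventDefinition and EventRecord shapes, the encodeEvents and decodeEvents helpers, and the newline-delimited JSON layout are hypothetical, and the wire format the library actually uses is not described on this page. The sketch shows only the general idea that an encoded event carries a unique ID, and that the receiving side needs a map from those IDs back to event definitions.

```typescript
// Hypothetical shapes for illustration only; the library's real types differ.
interface EventDefinition {
  uniqueId: string;
  name: string;
}

interface EventRecord {
  definition: EventDefinition;
  data: unknown;
}

// Encode: one JSON object per line, tagged with the event's unique ID.
// (Assumed layout; not the library's documented wire format.)
function encodeEvents(events: EventRecord[]): string {
  return events
    .map((e) => JSON.stringify({ uniqueId: e.definition.uniqueId, data: e.data }))
    .join("\n");
}

// Decode: look each unique ID up in the event map to recover its definition.
function decodeEvents(
  body: string,
  eventMap: Record<string, EventDefinition>
): EventRecord[] {
  return body
    .split("\n")
    .filter((line) => line.length > 0)
    .map((line) => {
      const parsed = JSON.parse(line) as { uniqueId: string; data: unknown };
      const definition = eventMap[parsed.uniqueId];
      if (definition === undefined) {
        throw new Error(`Unknown event ID: ${parsed.uniqueId}`);
      }
      return { definition, data: parsed.data };
    });
}

const userEvent: EventDefinition = { uniqueId: "user", name: "UserEvent" };
const wire = encodeEvents([{ definition: userEvent, data: { id: 1 } }]);
const decoded = decodeEvents(wire, { user: userEvent });
```

In this sketch an ID that is missing from the map raises an error; how the library treats unknown IDs is not stated here.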
forEach()
forEach(callback): Promise<void>
Defined in: core/src/core/stream.ts:314
Process each item in the stream with a callback function.
Parameters
callback
(item) => void
Function to call for each item
Returns
Promise<void>
Promise that resolves when all items are processed
Example
await stream.forEach(event => {
  console.log('Processing:', event);
});
map()
map<T>(callback): WorkflowStream<T>
Defined in: core/src/core/stream.ts:338
Transform each item in the stream.
Type Parameters
T
Parameters
callback
(item) => T
Function to transform each item
Returns
WorkflowStream<T>
A new WorkflowStream with transformed items
Example
const timestamps = stream.map(event => ({
  ...event,
  timestamp: Date.now()
}));
take()
take(limit): WorkflowStream<R>
Defined in: core/src/core/stream.ts:369
Take only the first N items from the stream.
Parameters
limit
number
Maximum number of items to take
Returns
WorkflowStream<R>
A new WorkflowStream limited to the specified number of items
Example
const firstTen = stream.take(10);
for await (const event of firstTen) {
  console.log(event);
}
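The effect of limiting a stream to its first N items can be sketched with plain async generators, independent of the library. The helpers takeFirst, naturals, and collect below are hypothetical names introduced for this illustration and are not part of WorkflowStream. The sketch shows why such a limit matters: it makes an endless source safe to consume.

```typescript
// Illustrative helper, not the library's implementation of take().
async function* takeFirst<T>(
  source: AsyncIterable<T>,
  limit: number
): AsyncGenerator<T> {
  if (limit <= 0) return;
  let count = 0;
  for await (const item of source) {
    yield item;
    count += 1;
    if (count >= limit) return; // stop pulling from the source
  }
}

// An endless source of 0, 1, 2, ...
async function* naturals(): AsyncGenerator<number> {
  let n = 0;
  while (true) yield n++;
}

// Drain an async iterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) items.push(item);
  return items;
}

// Resolves to [0, 1, 2] even though naturals() never ends on its own.
const firstThree = collect(takeFirst(naturals(), 3));
```

Returning from inside the for await loop also closes the underlying generator, so the source stops producing values once the limit is reached.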
filter()
Call Signature
filter(predicate): WorkflowStream<R>
Defined in: core/src/core/stream.ts:404
Filter the stream to include only items matching the predicate.
Parameters
predicate
R extends WorkflowEventData<any> ? WorkflowEvent<InferWorkflowEventData<R>> : never
Event type, function, or value to filter by
Returns
WorkflowStream<R>
A new WorkflowStream containing only matching items
Example
// Filter by event type
const userEvents = stream.filter(UserEvent);

// Filter by function
const importantEvents = stream.filter(event => event.priority === 'high');

// Filter by specific value
const specificEvent = stream.filter(myEventInstance);
Call Signature
filter(predicate): WorkflowStream<R>
Defined in: core/src/core/stream.ts:409
Filter the stream to include only items matching the predicate.
Parameters
predicate
R
Event type, function, or value to filter by
Returns
WorkflowStream<R>
A new WorkflowStream containing only matching items
Call Signature
filter(predicate): WorkflowStream<R>
Defined in: core/src/core/stream.ts:410
Filter the stream to include only items matching the predicate.
Parameters
predicate
(event) => boolean
Event type, function, or value to filter by
Returns
WorkflowStream<R>
A new WorkflowStream containing only matching items
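The three call signatures accept an event type, a function, or a specific value. One way to picture how such a mixed predicate could be reduced to a single test is the sketch below. It is an assumption-laden illustration, not the library's code: the Item and EventType shapes, the matches method, and the toMatcher helper are all hypothetical, and the real WorkflowEvent type is not described on this page.

```typescript
// Hypothetical shapes for illustration; not the library's types.
interface Item {
  kind: string;
  priority: string;
}

interface EventType {
  // Assumed method name: reports whether an item belongs to this type.
  matches(item: Item): boolean;
}

type Predicate = EventType | ((item: Item) => boolean) | Item;

// Reduce any of the three predicate forms to one boolean test.
function toMatcher(predicate: Predicate): (item: Item) => boolean {
  if (typeof predicate === "function") {
    return predicate; // already a test function
  }
  if ("matches" in predicate) {
    const eventType = predicate;
    return (item) => eventType.matches(item); // match by event type
  }
  const value = predicate;
  return (item) => item === value; // match one specific instance
}

const userType: EventType = { matches: (item) => item.kind === "user" };
const a: Item = { kind: "user", priority: "high" };
const b: Item = { kind: "system", priority: "low" };
const items = [a, b];

const byType = items.filter(toMatcher(userType));                          // [a]
const byFunction = items.filter(toMatcher((item) => item.priority === "low")); // [b]
const byValue = items.filter(toMatcher(b));                                // [b]
```

Matching a specific value by identity (===) is a choice made for this sketch; the comparison the library applies to a value predicate is not stated above.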
until()
Call Signature
until(predicate): WorkflowStream<R>
Defined in: core/src/core/stream.ts:452
Continue the stream until the predicate is met, then terminate.
Parameters
predicate
R extends WorkflowEventData<any> ? WorkflowEvent<InferWorkflowEventData<R>> : never
Event type, function, or value to stop at
Returns
WorkflowStream<R>
A new WorkflowStream that terminates when the predicate is met
Example
// Stop at completion event
const processingEvents = stream.until(CompletionEvent);

// Stop when condition is met
const beforeError = stream.until(event => event.type === 'error');

// Stop at specific event instance
const beforeSpecific = stream.until(myEventInstance);
Call Signature
until(predicate): WorkflowStream<R>
Defined in: core/src/core/stream.ts:457
Continue the stream until the predicate is met, then terminate.
Parameters
predicate
(item) => boolean
Event type, function, or value to stop at
Returns
WorkflowStream<R>
A new WorkflowStream that terminates when the predicate is met
Call Signature
until(item): WorkflowStream<R>
Defined in: core/src/core/stream.ts:458
Continue the stream until the predicate is met, then terminate.
Parameters
item
R
Returns
WorkflowStream<R>
A new WorkflowStream that terminates when the predicate is met
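The "continue until the predicate is met, then terminate" behavior can be sketched with async generators. The helpers takeUntil, fromArray, and collect are hypothetical names for this illustration only. One point is an explicit assumption: the description above does not say whether the item that satisfies the predicate is itself emitted. This sketch emits it and then stops, which may or may not match the library.

```typescript
// Illustrative helper, not the library's implementation of until().
async function* takeUntil<T>(
  source: AsyncIterable<T>,
  isStop: (item: T) => boolean
): AsyncGenerator<T> {
  for await (const item of source) {
    yield item; // assumption: the matching item is emitted too
    if (isStop(item)) return; // then the stream ends
  }
}

// Turn an array into an async source.
async function* fromArray<T>(items: T[]): AsyncGenerator<T> {
  for (const item of items) yield item;
}

// Drain an async iterable into an array.
async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const items: T[] = [];
  for await (const item of source) items.push(item);
  return items;
}

// Resolves to ["start", "progress", "done"]; "late" is never read.
const untilDone = collect(
  takeUntil(fromArray(["start", "progress", "done", "late"]), (s) => s === "done")
);
```

To exclude the terminating item instead, the isStop check would move before the yield.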
toArray()
toArray(): Promise<R[]>
Defined in: core/src/core/stream.ts:494
Collect all items from the stream into an array.
Returns
Promise<R[]>
Promise resolving to an array of all stream items
Example
const events = await stream.take(5).toArray();
console.log('Collected events:', events);