# core

Foundation package providing pipeline orchestration, stream factories, and utility functions.

## Install

```bash
npm install @datastream/core
```

## Pipeline

### `pipeline(streams, streamOptions)` (async)

Connects all streams, waits for completion, and returns the combined `.result()` values from all PassThrough streams. Automatically appends a no-op Writable if the last stream is Readable.

#### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `highWaterMark` | `number` | | Backpressure threshold |
| `chunkSize` | `number` | | Size hint for chunking |
| `signal` | `AbortSignal` | | Abort the pipeline |

#### Example

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'

const count = objectCountStream()

const result = await pipeline([
  createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]),
  count,
])

console.log(result)
// { count: 3 }
```

### `pipejoin(streams)` (returns stream)

Connects streams and returns the resulting stream. Use this when you want to consume the output manually with `streamToArray`, `streamToString`, or `for await`.

#### Example

```js
import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core'

const river = pipejoin([
  createReadableStream([1, 2, 3]),
  createTransformStream((n, enqueue) => enqueue(n * 2)),
])

const output = await streamToArray(river)
// [2, 4, 6]
```

### `result(streams)` (async)

Iterates over the streams and combines all `.result()` return values into a single object. Called automatically by `pipeline()`.

## Consumers

### `streamToArray(stream)` (async)

Collects all chunks from a stream into an array.

```js
import { pipejoin, streamToArray, createReadableStream } from '@datastream/core'

const river = pipejoin([createReadableStream(['a', 'b', 'c'])])
const output = await streamToArray(river)
// ['a', 'b', 'c']
```

### `streamToString(stream)` (async)

Concatenates all chunks into a single string.

```js
const output = await streamToString(river)
// 'abc'
```

### `streamToObject(stream)` (async)

Merges all chunks into a single object using `Object.assign`.

```js
const river = pipejoin([createReadableStream([{ a: 1 }, { b: 2 }])])
const output = await streamToObject(river)
// { a: 1, b: 2 }
```

### `streamToBuffer(stream)` (async, Node.js only)

Collects all chunks into a `Buffer`.

## Stream Factories

### `createReadableStream(input, streamOptions)` (Readable)

Creates a Readable stream from various input types.

#### Input types

| Type | Behavior |
| --- | --- |
| `string` | Chunked at `chunkSize` (default 16KB) |
| `Array` | Each element emitted as a chunk |
| `AsyncIterable` / `Iterable` | Each yielded value emitted as a chunk |
| `ArrayBuffer` | Chunked at `chunkSize` (Node.js only) |

#### Example

```js
import { createReadableStream } from '@datastream/core'

// From string — auto-chunked
const stringStream = createReadableStream('hello world')

// From array — one chunk per element
const arrayStream = createReadableStream([{ a: 1 }, { a: 2 }])

// From async generator
async function * generate () {
  yield 'chunk1'
  yield 'chunk2'
}
const generatorStream = createReadableStream(generate())
```

### `createPassThroughStream(fn, flush?, streamOptions)` (Transform, PassThrough)

Creates a stream that observes each chunk without modifying it. The chunk is automatically passed through.

#### Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `fn` | `(chunk) => void` | Called for each chunk; return value ignored |
| `flush` | `() => void` | Optional; called when the stream ends |
| `streamOptions` | `object` | Stream configuration |

#### Example

```js
import { createPassThroughStream } from '@datastream/core'

let total = 0
const counter = createPassThroughStream((chunk) => {
  total += chunk.length
})
counter.result = () => ({ key: 'total', value: total })
```

### `createTransformStream(fn, flush?, streamOptions)` (Transform)

Creates a stream that modifies chunks. Use `enqueue` to emit output — you can emit zero, one, or many chunks per input.

#### Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `fn` | `(chunk, enqueue) => void` | Transform each chunk; call `enqueue(output)` to emit |
| `flush` | `(enqueue) => void` | Optional; emit final chunks when the stream ends |
| `streamOptions` | `object` | Stream configuration |

#### Example

```js
import { createTransformStream } from '@datastream/core'

// Filter: emit only matching chunks
const filter = createTransformStream((chunk, enqueue) => {
  if (chunk.age > 18) enqueue(chunk)
})

// Expand: emit multiple chunks per input
const expand = createTransformStream((chunk, enqueue) => {
  for (const item of chunk.items) {
    enqueue(item)
  }
})
```

### `createWritableStream(fn, close?, streamOptions)` (Writable)

Creates a stream that consumes chunks at the end of a pipeline.

#### Parameters

| Parameter | Type | Description |
| --- | --- | --- |
| `fn` | `(chunk) => void` | Called for each chunk |
| `close` | `() => void` | Optional; called when the stream ends |
| `streamOptions` | `object` | Stream configuration |

#### Example

```js
import { createWritableStream } from '@datastream/core'

const rows = []
const collector = createWritableStream((chunk) => {
  rows.push(chunk)
})
```

## Utilities

### `isReadable(stream)`

Returns `true` if the stream is Readable.

### `isWritable(stream)`

Returns `true` if the stream is Writable.

### `makeOptions(options)`

Normalizes stream options for interoperability between Readable, Transform, and Writable streams.

| Option | Type | Description |
| --- | --- | --- |
| `highWaterMark` | `number` | Backpressure threshold |
| `chunkSize` | `number` | Chunking size hint |
| `signal` | `AbortSignal` | Abort signal |

### `timeout(ms, options)` (async)

Returns a promise that resolves after `ms` milliseconds. Supports `AbortSignal` cancellation.

```js
import { timeout } from '@datastream/core'

await timeout(1000) // wait 1 second

const controller = new AbortController()
await timeout(5000, { signal: controller.signal }) // cancellable
```

### `backpressureGuage(streams)` (Node.js only)

Measures pause/resume timing across streams. Useful for identifying bottlenecks.

```js
import { backpressureGuage } from '@datastream/core'

const metrics = backpressureGuage({ parse: parseStream, validate: validateStream })
// After the pipeline completes:
// metrics.parse.total = { timestamp, duration }
// metrics.parse.timeline = [{ timestamp, duration }, ...]
```