# Core Concepts

## Stream types

Datastream uses four stream types. Each package export follows a naming convention that tells you the type:

| Type | Naming pattern | Purpose |
| --- | --- | --- |
| Readable | `*ReadableStream`, `*ReadStream` | Injects data into a pipeline — files, databases, APIs |
| PassThrough | `*CountStream`, `*LengthStream`, `*DetectStream` | Observes data without modifying it; collects metrics via `.result()` |
| Transform | `*ParseStream`, `*FormatStream`, `*CompressStream` | Modifies data as it passes through |
| Writable | `*WriteStream`, `*PutItemStream` | Consumes data at the end — files, databases, APIs |
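These four roles map directly onto the platform's Web Streams primitives. A rough sketch of each role in plain Web Streams (not the datastream API), with a hand-rolled counter standing in for a PassThrough metric:

```javascript
// Readable: injects data into the pipeline
const readable = new ReadableStream({
  start (controller) {
    for (const word of ['hello', 'world']) controller.enqueue(word)
    controller.close()
  },
})

// PassThrough: forwards chunks unchanged while collecting a metric
let count = 0
const passThrough = new TransformStream({
  transform (chunk, controller) {
    count++
    controller.enqueue(chunk)
  },
})

// Transform: modifies chunks as they pass through
const transform = new TransformStream({
  transform (chunk, controller) {
    controller.enqueue(chunk.toUpperCase())
  },
})

// Writable: consumes chunks at the end
const output = []
const writable = new WritableStream({
  write (chunk) {
    output.push(chunk)
  },
})

await readable.pipeThrough(passThrough).pipeThrough(transform).pipeTo(writable)
// output: ['HELLO', 'WORLD'], count: 2
```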

## pipeline() vs pipejoin()

### pipeline(streams[], streamOptions)

Connects all streams, waits for completion, and returns combined results from all PassThrough streams:

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'
import { stringLengthStream } from '@datastream/string'

const count = objectCountStream()
const length = stringLengthStream()

const result = await pipeline([
  createReadableStream(['hello', 'world']),
  count,
  length,
])

console.log(result)
// { count: 2, length: 10 }
```

If the last stream is Readable or Transform (no Writable at the end), pipeline automatically appends a no-op Writable so the pipeline completes.
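In plain Web Streams terms, that appended sink is roughly a Writable that discards every chunk. A sketch of why it is needed (illustrative, not the library's actual implementation):

```javascript
const readable = new ReadableStream({
  start (controller) {
    controller.enqueue(1)
    controller.enqueue(2)
    controller.close()
  },
})

// A Transform alone has no consumer pulling chunks through.
const seen = []
const double = new TransformStream({
  transform (chunk, controller) {
    seen.push(chunk * 2)
    controller.enqueue(chunk * 2)
  },
})

// A no-op Writable discards output but drives the pipeline to completion.
const noop = new WritableStream({ write () {} })

await readable.pipeThrough(double).pipeTo(noop)
// seen: [2, 4]
```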

### pipejoin(streams[])

Connects streams and returns the resulting stream — use this when you want to consume the output manually:

```javascript
import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core'

const streams = [
  createReadableStream([1, 2, 3]),
  createTransformStream((n, enqueue) => enqueue(n * 2)),
]

const river = pipejoin(streams)
const output = await streamToArray(river)
// [2, 4, 6]
```

## The .result() pattern

PassThrough streams collect metrics without modifying data. After the pipeline completes, retrieve results:

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
import { objectCountStream } from '@datastream/object'
import { digestStream } from '@datastream/digest'

const count = objectCountStream()
const digest = await digestStream({ algorithm: 'SHA2-256' })

const result = await pipeline([
  createReadableStream(data),
  digest,
  csvParseStream(),
  count,
])

console.log(result)
// { digest: 'SHA2-256:abc123...', count: 1000 }
```

Each PassThrough stream returns { key, value } from its .result() method. pipeline() combines them into a single object. You can customize the key with the resultKey option:

```javascript
const count = objectCountStream({ resultKey: 'rowCount' })
// result: { rowCount: 1000 }
```
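The merge step itself is simple. A hypothetical sketch of how the `{ key, value }` pairs could be folded into one object (illustrative, not the library's internals):

```javascript
// Merge { key, value } from every stream that exposes a .result() method
const combineResults = (streams) =>
  Object.fromEntries(
    streams
      .filter((stream) => typeof stream.result === 'function')
      .map((stream) => {
        const { key, value } = stream.result()
        return [key, value]
      })
  )

// Stand-ins for PassThrough streams after a completed pipeline:
const fakeCount = { result: () => ({ key: 'rowCount', value: 1000 }) }
const fakeLength = { result: () => ({ key: 'length', value: 10 }) }

combineResults([{}, fakeCount, fakeLength])
// { rowCount: 1000, length: 10 }
```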

## Stream options

All stream factory functions accept a streamOptions parameter:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `highWaterMark` | `number` | | Backpressure threshold — how many chunks to buffer before pausing |
| `chunkSize` | `number` | `16384` | Size hint for chunking strategies |
| `signal` | `AbortSignal` | | Signal to abort the pipeline |

For example, to make a pipeline abortable:

```javascript
const controller = new AbortController()

await pipeline([
  createReadableStream(data),
  csvParseStream(),
], { signal: controller.signal })

// Abort from elsewhere:
controller.abort()
```

## Error handling

### AbortSignal

Use AbortController to cancel pipelines:

```javascript
const controller = new AbortController()
setTimeout(() => controller.abort(), 5000) // 5s timeout

try {
  await pipeline(streams, { signal: controller.signal })
} catch (e) {
  if (e.name === 'AbortError') {
    console.log('Pipeline was cancelled')
  }
}
```

### Validation errors

Streams like validateStream and CSV cleaning streams collect errors in `.result()` rather than throwing, so the pipeline continues processing:

```javascript
const result = await pipeline([
  createReadableStream(data),
  csvParseStream(),
  validateStream({ schema }),
])

console.log(result.validate)
// { '#/required/name': { id: '#/required/name', keys: ['name'], message: '...', idx: [3, 7] } }
```

Invalid rows are dropped by default. Set onErrorEnqueue: true to keep them in the stream.
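The drop-versus-keep behaviour can be sketched with a minimal validating Transform in plain Web Streams (hypothetical `makeValidateStream` helper; `onErrorEnqueue` here mirrors the option described above):

```javascript
const makeValidateStream = ({ isValid, onErrorEnqueue = false }) => {
  const errors = []
  const stream = new TransformStream({
    transform (row, controller) {
      if (isValid(row)) {
        controller.enqueue(row)
      } else {
        errors.push(row) // collected in the result, not thrown
        if (onErrorEnqueue) controller.enqueue(row) // keep invalid rows flowing
      }
    },
  })
  stream.result = () => ({ key: 'validate', value: errors })
  return stream
}

const validate = makeValidateStream({ isValid: (row) => 'name' in row })
const readable = new ReadableStream({
  start (controller) {
    for (const row of [{ name: 'a' }, {}, { name: 'b' }]) controller.enqueue(row)
    controller.close()
  },
})

const kept = []
await readable
  .pipeThrough(validate)
  .pipeTo(new WritableStream({ write (row) { kept.push(row) } }))
// kept: [{ name: 'a' }, { name: 'b' }]; validate.result().value: [{}]
```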

## Lazy options

Many CSV streams accept functions instead of values for options. This lets you wire up detection results that aren’t available until runtime:

```javascript
const detect = csvDetectDelimitersStream()

csvParseStream({
  delimiterChar: () => detect.result().value.delimiterChar,
})
```

The function is called when the stream first needs the value, not at creation time.
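The deferral mechanics can be sketched with a hypothetical `resolveOption` helper (not the library's actual code): an option may be a plain value or a zero-argument function, and functions are only called at the point of use:

```javascript
// Resolve an option that may be a plain value or a thunk
const resolveOption = (option) =>
  typeof option === 'function' ? option() : option

let detected // not known at stream-creation time
const options = { delimiterChar: () => detected }

detected = ';' // detection completes at runtime

resolveOption(options.delimiterChar) // ';'
resolveOption(',') // ','
```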