// File: +page.md
---
title: Introduction
slug: /
---
## What is Datastream?
Datastream is a collection of commonly used **stream patterns** for the **Web Streams API** and **Node.js Streams**.
If you're iterating over an array more than once, it's time to use streams.
## A quick example
Code is better than 10,000 words, so let's jump into an example.
Let's assume you want to read a CSV file, validate the data, and compress it:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream } from '@datastream/compress'
const streams = [
createReadableStream(csvData),
csvParseStream({ header: true }),
validateStream(schema),
gzipCompressStream()
]
await pipeline(streams)
```
## Stream types
- **Readable**: The start of a pipeline that injects data into a stream.
- **PassThrough**: Does not modify the data, but listens and prepares a result that can be retrieved.
- **Transform**: Modifies data as it passes through.
- **Writable**: The end of a pipeline that stores data from the stream.
## Setup
```bash
npm install @datastream/core @datastream/{module}
```
## Why streams?
Streams allow you to process data incrementally, without loading everything into memory at once. This is essential for:
- **Large files**: Process gigabytes of data with constant memory usage
- **Real-time data**: Handle data as it arrives, not after it's all collected
- **Composability**: Chain simple operations into complex data pipelines
- **Performance**: Start processing before all data is available
Datastream provides ready-made stream patterns so you can focus on your data flow instead of stream plumbing.
## Next steps
Ready to dive in? Head to the [Quick Start](/docs/quick-start) guide.
// File: core-concepts/+page.md
---
title: Core Concepts
---
## Stream types
Datastream uses four stream types. Each package export follows a naming convention that tells you the type:
| Type | Naming pattern | Purpose |
|------|---------------|---------|
| **Readable** | `*ReadableStream`, `*ReadStream` | Injects data into a pipeline — files, databases, APIs |
| **PassThrough** | `*CountStream`, `*LengthStream`, `*DetectStream` | Observes data without modifying it, collects metrics via `.result()` |
| **Transform** | `*ParseStream`, `*FormatStream`, `*CompressStream` | Modifies data as it passes through |
| **Writable** | `*WriteStream`, `*PutItemStream` | Consumes data at the end — files, databases, APIs |
## `pipeline()` vs `pipejoin()`
### `pipeline(streams[], streamOptions)`
Connects all streams, waits for completion, and returns combined results from all PassThrough streams:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'
import { stringLengthStream } from '@datastream/string'
const count = objectCountStream()
const length = stringLengthStream()
const result = await pipeline([
createReadableStream(['hello', 'world']),
count,
length,
])
console.log(result)
// { count: 2, length: 10 }
```
If the last stream is Readable or Transform (no Writable at the end), `pipeline` automatically appends a no-op Writable so the pipeline completes.
### `pipejoin(streams[])`
Connects streams and returns the resulting stream — use this when you want to consume the output manually:
```javascript
import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core'
const streams = [
createReadableStream([1, 2, 3]),
createTransformStream((n, enqueue) => enqueue(n * 2)),
]
const river = pipejoin(streams)
const output = await streamToArray(river)
// [2, 4, 6]
```
## The `.result()` pattern
PassThrough streams collect metrics without modifying data. After the pipeline completes, retrieve results:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
import { objectCountStream } from '@datastream/object'
import { digestStream } from '@datastream/digest'
const count = objectCountStream()
const digest = await digestStream({ algorithm: 'SHA2-256' })
const result = await pipeline([
createReadableStream(data),
digest,
csvParseStream(),
count,
])
console.log(result)
// { digest: 'SHA2-256:abc123...', count: 1000 }
```
Each PassThrough stream returns `{ key, value }` from its `.result()` method. `pipeline()` combines them into a single object. You can customize the key with the `resultKey` option:
```javascript
const count = objectCountStream({ resultKey: 'rowCount' })
// result: { rowCount: 1000 }
```
## Stream options
All stream factory functions accept a `streamOptions` parameter:
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `highWaterMark` | `number` | — | Backpressure threshold — how many chunks to buffer before pausing |
| `chunkSize` | `number` | `16384` | Size hint for chunking strategies |
| `signal` | `AbortSignal` | — | Signal to abort the pipeline |
```javascript
const controller = new AbortController()
await pipeline([
createReadableStream(data),
csvParseStream(),
], { signal: controller.signal })
// Abort from elsewhere:
controller.abort()
```
## Error handling
### AbortSignal
Use `AbortController` to cancel pipelines:
```javascript
const controller = new AbortController()
setTimeout(() => controller.abort(), 5000) // 5s timeout
try {
await pipeline(streams, { signal: controller.signal })
} catch (e) {
if (e.name === 'AbortError') {
console.log('Pipeline was cancelled')
}
}
```
### Validation errors
Streams like `validateStream` and CSV cleaning streams collect errors in `.result()` rather than throwing, so the pipeline continues processing:
```javascript
const result = await pipeline([
createReadableStream(data),
csvParseStream(),
validateStream({ schema }),
])
console.log(result.validate)
// { '#/required/name': { id: '#/required/name', keys: ['name'], message: '...', idx: [3, 7] } }
```
Invalid rows are dropped by default. Set `onErrorEnqueue: true` to keep them in the stream.
## Lazy options
Many CSV streams accept functions instead of values for options. This lets you wire up detection results that aren't available until runtime:
```javascript
const detect = csvDetectDelimitersStream()
csvParseStream({
delimiterChar: () => detect.result().value.delimiterChar,
})
```
The function is called when the stream first needs the value, not at creation time.
// File: packages/aws/+page.md
---
title: aws
---
AWS service streams for S3, DynamoDB, Lambda, SNS, and SQS. Node.js only.
## Install
```bash
npm install @datastream/aws
```
Requires the corresponding AWS SDK v3 client packages:
```bash
npm install @aws-sdk/client-s3 @aws-sdk/lib-storage
npm install @aws-sdk/client-dynamodb
npm install @aws-sdk/client-lambda
npm install @aws-sdk/client-sns
npm install @aws-sdk/client-sqs
```
## S3
### `awsS3SetClient`
Set a custom S3 client. By default, FIPS endpoints are enabled for US and CA regions.
```javascript
import { S3Client } from '@aws-sdk/client-s3'
import { awsS3SetClient } from '@datastream/aws'
awsS3SetClient(new S3Client({ region: 'eu-west-1' }))
```
### `awsS3GetObjectStream` Readable, async
Downloads an object from S3 as a stream.
#### Options
Accepts all `GetObjectCommand` parameters plus:
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `client` | `S3Client` | default client | Custom S3 client for this call |
| `Bucket` | `string` | — | S3 bucket name |
| `Key` | `string` | — | S3 object key |
#### Example
```javascript
import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'
await pipeline([
await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'data.csv' }),
csvParseStream(),
])
```
### `awsS3PutObjectStream` PassThrough
Uploads data to S3 using multipart upload.
#### Options
Accepts all S3 PutObject parameters plus:
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `client` | `S3Client` | default client | Custom S3 client |
| `Bucket` | `string` | — | S3 bucket name |
| `Key` | `string` | — | S3 object key |
| `onProgress` | `function` | — | Upload progress callback |
| `tags` | `object` | — | S3 object tags |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsS3PutObjectStream } from '@datastream/aws'
import { gzipCompressStream } from '@datastream/compress'
await pipeline([
createReadableStream(data),
gzipCompressStream(),
awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])
```
### `awsS3ChecksumStream` PassThrough
Computes a multi-part S3 checksum while data passes through. Designed for pre-signed URL uploads in the browser.
#### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `ChecksumAlgorithm` | `string` | `"SHA256"` | `"SHA1"` or `"SHA256"` |
| `partSize` | `number` | `17179870` | Part size in bytes |
| `resultKey` | `string` | `"s3"` | Key in pipeline result |
#### Result
```javascript
{ checksum: 'base64hash-3', checksums: ['part1hash', 'part2hash', 'part3hash'], partSize: 17179870 }
```
## DynamoDB
### `awsDynamoDBSetClient`
```javascript
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { awsDynamoDBSetClient } from '@datastream/aws'
awsDynamoDBSetClient(new DynamoDBClient({ region: 'us-east-1' }))
```
### `awsDynamoDBQueryStream` Readable, async
Queries a DynamoDB table and auto-paginates through all results.
#### Options
Accepts all `QueryCommand` parameters:
| Option | Type | Description |
|--------|------|-------------|
| `TableName` | `string` | DynamoDB table name |
| `KeyConditionExpression` | `string` | Query key condition |
| `ExpressionAttributeValues` | `object` | Expression values |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBQueryStream } from '@datastream/aws'
await pipeline([
createReadableStream(await awsDynamoDBQueryStream({
TableName: 'Users',
KeyConditionExpression: 'PK = :pk',
ExpressionAttributeValues: { ':pk': { S: 'USER#123' } },
})),
])
```
### `awsDynamoDBScanStream` Readable, async
Scans an entire DynamoDB table with automatic pagination.
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBScanStream } from '@datastream/aws'
await pipeline([
  createReadableStream(await awsDynamoDBScanStream({ TableName: 'Users' })),
])
```
### `awsDynamoDBGetItemStream` Readable, async
Batch gets items by keys. Automatically retries unprocessed keys with exponential backoff.
#### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `TableName` | `string` | — | DynamoDB table name |
| `Keys` | `object[]` | — | Array of key objects |
| `retryMaxCount` | `number` | `10` | Maximum retry attempts |
### `awsDynamoDBPutItemStream` Writable
Writes items to DynamoDB using `BatchWriteItem`. Automatically batches 25 items per request and retries unprocessed items with exponential backoff.
#### Options
| Option | Type | Description |
|--------|------|-------------|
| `TableName` | `string` | DynamoDB table name |
| `retryMaxCount` | `number` | Maximum retry attempts (default 10) |
#### Example
```javascript
import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsDynamoDBPutItemStream } from '@datastream/aws'
await pipeline([
createReadableStream(items),
createTransformStream((item, enqueue) => {
enqueue({
PK: { S: `USER#${item.id}` },
SK: { S: 'PROFILE' },
name: { S: item.name },
})
}),
awsDynamoDBPutItemStream({ TableName: 'Users' }),
])
```
### `awsDynamoDBDeleteItemStream` Writable
Deletes items from DynamoDB using `BatchWriteItem`. Batches 25 items per request.
#### Options
| Option | Type | Description |
|--------|------|-------------|
| `TableName` | `string` | DynamoDB table name |
| `retryMaxCount` | `number` | Maximum retry attempts (default 10) |
#### Example
```javascript
import { awsDynamoDBDeleteItemStream } from '@datastream/aws'
awsDynamoDBDeleteItemStream({ TableName: 'Users' })
// Input chunks: { PK: { S: 'USER#1' }, SK: { S: 'PROFILE' } }
```
## Lambda
### `awsLambdaSetClient`
```javascript
import { LambdaClient } from '@aws-sdk/client-lambda'
import { awsLambdaSetClient } from '@datastream/aws'
awsLambdaSetClient(new LambdaClient({ region: 'us-east-1' }))
```
### `awsLambdaReadableStream` Readable
Invokes a Lambda function with response streaming (`InvokeWithResponseStream`).
Also exported as `awsLambdaResponseStream`.
#### Options
Accepts `InvokeWithResponseStreamCommand` parameters. Pass an array to invoke multiple functions sequentially.
| Option | Type | Description |
|--------|------|-------------|
| `FunctionName` | `string` | Lambda function name or ARN |
| `Payload` | `string` | JSON payload |
#### Example
```javascript
import { pipeline } from '@datastream/core'
import { awsLambdaReadableStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'
await pipeline([
awsLambdaReadableStream({
FunctionName: 'data-processor',
Payload: JSON.stringify({ key: 'input.csv' }),
}),
csvParseStream(),
])
```
## SNS
### `awsSNSSetClient`
```javascript
import { SNSClient } from '@aws-sdk/client-sns'
import { awsSNSSetClient } from '@datastream/aws'
awsSNSSetClient(new SNSClient({ region: 'us-east-1' }))
```
### `awsSNSPublishMessageStream` Writable
Publishes messages to an SNS topic. Batches 10 messages per `PublishBatchCommand`.
#### Options
| Option | Type | Description |
|--------|------|-------------|
| `TopicArn` | `string` | SNS topic ARN |
#### Example
```javascript
import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsSNSPublishMessageStream } from '@datastream/aws'
await pipeline([
createReadableStream(events),
createTransformStream((event, enqueue) => {
enqueue({
Id: event.id,
Message: JSON.stringify(event),
})
}),
awsSNSPublishMessageStream({ TopicArn: 'arn:aws:sns:us-east-1:123:my-topic' }),
])
```
## SQS
### `awsSQSSetClient`
```javascript
import { SQSClient } from '@aws-sdk/client-sqs'
import { awsSQSSetClient } from '@datastream/aws'
awsSQSSetClient(new SQSClient({ region: 'us-east-1' }))
```
### `awsSQSReceiveMessageStream` Readable, async
Polls an SQS queue and yields messages until the queue is empty.
#### Options
Accepts `ReceiveMessageCommand` parameters:
| Option | Type | Description |
|--------|------|-------------|
| `QueueUrl` | `string` | SQS queue URL |
| `MaxNumberOfMessages` | `number` | Max messages per poll (1-10) |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsSQSReceiveMessageStream, awsSQSDeleteMessageStream } from '@datastream/aws'
await pipeline([
createReadableStream(await awsSQSReceiveMessageStream({
QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
})),
awsSQSDeleteMessageStream({
QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
}),
])
```
### `awsSQSSendMessageStream` Writable
Sends messages to an SQS queue. Batches 10 messages per `SendMessageBatchCommand`.
#### Options
| Option | Type | Description |
|--------|------|-------------|
| `QueueUrl` | `string` | SQS queue URL |
### `awsSQSDeleteMessageStream` Writable
Deletes messages from an SQS queue. Batches 10 messages per `DeleteMessageBatchCommand`.
#### Options
| Option | Type | Description |
|--------|------|-------------|
| `QueueUrl` | `string` | SQS queue URL |
// File: packages/base64/+page.md
---
title: base64
---
Base64 encoding and decoding streams.
## Install
```bash
npm install @datastream/base64
```
## `base64EncodeStream` Transform
Encodes data to base64. Handles chunk boundaries correctly by buffering partial 3-byte groups.
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { base64EncodeStream } from '@datastream/base64'
await pipeline([
createReadableStream('Hello, World!'),
base64EncodeStream(),
])
```
## `base64DecodeStream` Transform
Decodes base64 data back to its original form. Handles chunk boundaries by buffering partial 4-character groups.
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { base64DecodeStream } from '@datastream/base64'
await pipeline([
createReadableStream('SGVsbG8sIFdvcmxkIQ=='),
base64DecodeStream(),
])
```
// File: packages/charset/+page.md
---
title: charset
---
Character set detection, decoding, and encoding streams.
## Install
```bash
npm install @datastream/charset
```
## `charsetDetectStream` PassThrough
Detects the character encoding of the data passing through by analyzing byte patterns.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `resultKey` | `string` | `"charset"` | Key in pipeline result |
### Result
Returns the most likely charset with confidence score:
```javascript
{ charset: 'UTF-8', confidence: 80 }
```
### Supported charsets
UTF-8, UTF-16BE, UTF-16LE, UTF-32BE, UTF-32LE, Shift_JIS, ISO-2022-JP, ISO-2022-CN, ISO-2022-KR, GB18030, EUC-JP, EUC-KR, Big5, ISO-8859-1, ISO-8859-2, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, windows-1250, windows-1251, windows-1252, windows-1254, windows-1256, KOI8-R
### Example
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream } from '@datastream/file'
import { charsetDetectStream } from '@datastream/charset'
const detect = charsetDetectStream()
const result = await pipeline([
fileReadStream({ path: './data.csv' }),
detect,
])
console.log(result.charset)
// { charset: 'UTF-8', confidence: 80 }
```
## `charsetDecodeStream` Transform
Decodes binary data to text using the specified character encoding. Uses `TextDecoderStream` internally.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `charset` | `string` | — | Character encoding name (e.g. `"UTF-8"`, `"ISO-8859-1"`) |
### Example
```javascript
import { charsetDecodeStream } from '@datastream/charset'
charsetDecodeStream({ charset: 'ISO-8859-1' })
```
## `charsetEncodeStream` Transform
Encodes text to binary using the specified character encoding. Uses `TextEncoderStream` internally.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `charset` | `string` | — | Character encoding name |
### Example
```javascript
import { charsetEncodeStream } from '@datastream/charset'
charsetEncodeStream({ charset: 'UTF-8' })
```
// File: packages/compress/+page.md
---
title: compress
---
Compression and decompression streams for gzip, deflate, brotli, and zstd.
## Install
```bash
npm install @datastream/compress
```
## gzip
### `gzipCompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `-1` | Compression level (-1 to 9). -1 = default, 0 = none, 9 = best |
### `gzipDecompressStream` Transform
No options required.
### Example
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { gzipCompressStream, gzipDecompressStream } from '@datastream/compress'
// Compress
await pipeline([
fileReadStream({ path: './data.csv' }),
gzipCompressStream({ quality: 9 }),
fileWriteStream({ path: './data.csv.gz' }),
])
// Decompress
await pipeline([
fileReadStream({ path: './data.csv.gz' }),
gzipDecompressStream(),
fileWriteStream({ path: './data.csv' }),
])
```
## deflate
### `deflateCompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `-1` | Compression level (-1 to 9) |
### `deflateDecompressStream` Transform
No options required.
## brotli
### `brotliCompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `11` | Compression level (0 to 11) |
### `brotliDecompressStream` Transform
No options required.
## zstd Node.js only
Requires Node.js with zstd support.
### `zstdCompressStream` Transform
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `3` | Compression level |
### `zstdDecompressStream` Transform
No options required.
## Platform support
| Algorithm | Node.js | Browser |
|-----------|---------|---------|
| gzip | `node:zlib` | `CompressionStream` |
| deflate | `node:zlib` | `CompressionStream` |
| brotli | `node:zlib` | `CompressionStream` |
| zstd | `node:zlib` | Not supported |
// File: packages/core/+page.md
---
title: core
---
Foundation package providing pipeline orchestration, stream factories, and utility functions.
## Install
```bash
npm install @datastream/core
```
## Pipeline
### `pipeline(streams, streamOptions)` async
Connects all streams, waits for completion, and returns combined `.result()` values from all PassThrough streams. Automatically appends a no-op Writable if the last stream is Readable or Transform.
#### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `highWaterMark` | `number` | — | Backpressure threshold |
| `chunkSize` | `number` | — | Size hint for chunking |
| `signal` | `AbortSignal` | — | Abort the pipeline |
#### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'
const count = objectCountStream()
const result = await pipeline([
createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]),
count,
])
console.log(result)
// { count: 3 }
```
### `pipejoin(streams)` returns stream
Connects streams and returns the resulting stream. Use this when you want to consume output manually with `streamToArray`, `streamToString`, or `for await`.
#### Example
```javascript
import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core'
const river = pipejoin([
createReadableStream([1, 2, 3]),
createTransformStream((n, enqueue) => enqueue(n * 2)),
])
const output = await streamToArray(river)
// [2, 4, 6]
```
### `result(streams)` async
Iterates over streams and combines all `.result()` return values into a single object. Called automatically by `pipeline()`.
## Consumers
### `streamToArray(stream)` async
Collects all chunks from a stream into an array.
```javascript
import { pipejoin, streamToArray, createReadableStream } from '@datastream/core'
const river = pipejoin([createReadableStream(['a', 'b', 'c'])])
const output = await streamToArray(river)
// ['a', 'b', 'c']
```
### `streamToString(stream)` async
Concatenates all chunks into a single string.
```javascript
const output = await streamToString(river)
// 'abc'
```
### `streamToObject(stream)` async
Merges all chunks into a single object using `Object.assign`.
```javascript
const river = pipejoin([createReadableStream([{ a: 1 }, { b: 2 }])])
const output = await streamToObject(river)
// { a: 1, b: 2 }
```
### `streamToBuffer(stream)` async, Node.js only
Collects all chunks into a `Buffer`.
## Stream Factories
### `createReadableStream(input, streamOptions)` Readable
Creates a Readable stream from various input types.
#### Input types
| Type | Behavior |
|------|----------|
| `string` | Chunked at `chunkSize` (default 16KB) |
| `Array` | Each element emitted as a chunk |
| `AsyncIterable` / `Iterable` | Each yielded value emitted as a chunk |
| `ArrayBuffer` | Chunked at `chunkSize` (Node.js only) |
#### Example
```javascript
import { createReadableStream } from '@datastream/core'
// From string — auto-chunked
const fromString = createReadableStream('hello world')
// From array — one chunk per element
const fromArray = createReadableStream([{ a: 1 }, { a: 2 }])
// From async generator
async function* generate() {
  yield 'chunk1'
  yield 'chunk2'
}
const fromGenerator = createReadableStream(generate())
```
### `createPassThroughStream(fn, flush?, streamOptions)` Transform (PassThrough)
Creates a stream that observes each chunk without modifying it. The chunk is automatically passed through.
#### Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `fn` | `(chunk) => void` | Called for each chunk, return value ignored |
| `flush` | `() => void` | Optional, called when stream ends |
| `streamOptions` | `object` | Stream configuration |
#### Example
```javascript
import { createPassThroughStream } from '@datastream/core'
let total = 0
const counter = createPassThroughStream((chunk) => {
total += chunk.length
})
counter.result = () => ({ key: 'total', value: total })
```
### `createTransformStream(fn, flush?, streamOptions)` Transform
Creates a stream that modifies chunks. Use `enqueue` to emit output — you can emit zero, one, or many chunks per input.
#### Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `fn` | `(chunk, enqueue) => void` | Transform each chunk, call `enqueue(output)` to emit |
| `flush` | `(enqueue) => void` | Optional, emit final chunks when stream ends |
| `streamOptions` | `object` | Stream configuration |
#### Example
```javascript
import { createTransformStream } from '@datastream/core'
// Filter: emit only matching chunks
const filter = createTransformStream((chunk, enqueue) => {
if (chunk.age > 18) enqueue(chunk)
})
// Expand: emit multiple chunks per input
const expand = createTransformStream((chunk, enqueue) => {
for (const item of chunk.items) {
enqueue(item)
}
})
```
### `createWritableStream(fn, close?, streamOptions)` Writable
Creates a stream that consumes chunks at the end of a pipeline.
#### Parameters
| Parameter | Type | Description |
|-----------|------|-------------|
| `fn` | `(chunk) => void` | Called for each chunk |
| `close` | `() => void` | Optional, called when stream ends |
| `streamOptions` | `object` | Stream configuration |
#### Example
```javascript
import { createWritableStream } from '@datastream/core'
const rows = []
const collector = createWritableStream((chunk) => {
rows.push(chunk)
})
```
## Utilities
### `isReadable(stream)`
Returns `true` if the stream is Readable.
### `isWritable(stream)`
Returns `true` if the stream is Writable.
### `makeOptions(options)`
Normalizes stream options for interoperability between Readable, Transform, and Writable streams.
| Option | Type | Description |
|--------|------|-------------|
| `highWaterMark` | `number` | Backpressure threshold |
| `chunkSize` | `number` | Chunking size hint |
| `signal` | `AbortSignal` | Abort signal |
### `timeout(ms, options)` async
Returns a promise that resolves after `ms` milliseconds. Supports `AbortSignal` cancellation.
```javascript
import { timeout } from '@datastream/core'
await timeout(1000) // wait 1 second
const controller = new AbortController()
await timeout(5000, { signal: controller.signal }) // cancellable
```
### `backpressureGuage(streams)` Node.js only
Measures pause/resume timing across streams. Useful for identifying bottlenecks.
```javascript
import { backpressureGuage } from '@datastream/core'
const metrics = backpressureGuage({ parse: parseStream, validate: validateStream })
// After pipeline completes:
// metrics.parse.total = { timestamp, duration }
// metrics.parse.timeline = [{ timestamp, duration }, ...]
```
// File: packages/csv/+page.md
---
title: csv
---
Parse, format, detect, clean, and coerce CSV data.
## Install
```bash
npm install @datastream/csv
```
## `csvDetectDelimitersStream` PassThrough
Auto-detects the delimiter, newline, quote, and escape characters from the first chunk of data.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` | Minimum bytes to buffer before detecting |
| `resultKey` | `string` | `"csvDetectDelimiters"` | Key in pipeline result |
### Result
```javascript
{ delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream } from '@datastream/csv'
const detect = csvDetectDelimitersStream()
const result = await pipeline([
createReadableStream('name\tage\r\nAlice\t30'),
detect,
])
console.log(result.csvDetectDelimiters)
// { delimiterChar: '\t', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```
## `csvDetectHeaderStream` Transform
Detects and strips the header row. Outputs data rows only (without the header).
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` | Minimum bytes to buffer before detecting |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | quoteChar | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvDetectHeader"` | Key in pipeline result |
### Result
```javascript
{ header: ['name', 'age', 'city'] }
```
### Example
```javascript
import { csvDetectDelimitersStream, csvDetectHeaderStream } from '@datastream/csv'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
```
## `csvParseStream` Transform
Parses CSV text into arrays of field values (string arrays). Each output chunk is one row as `string[]`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `2097152` (2MB) | Input buffer size before first parse |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | quoteChar | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvErrors"` | Key in pipeline result |
### Result
Parse errors collected during processing:
```javascript
{
UnterminatedQuote: { id: 'UnterminatedQuote', message: 'Unterminated quoted field', idx: [5] }
}
```
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
const result = await pipeline([
createReadableStream('a,b,c\r\n1,2,3\r\n4,5,6'),
csvParseStream(),
])
// Chunks emitted: ['a','b','c'], ['1','2','3'], ['4','5','6']
```
## `csvFormatStream` Transform
Formats objects or arrays back to CSV text.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `header` | `boolean \| string[]` | — | `true` to auto-detect from object keys, or provide array of column names |
| `delimiterChar` | `string` | `","` | Delimiter character |
| `newlineChar` | `string` | `"\r\n"` | Newline character |
| `quoteChar` | `string` | `'"'` | Quote character |
| `escapeChar` | `string` | quoteChar | Escape character |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvFormatStream } from '@datastream/csv'
await pipeline([
createReadableStream([
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
]),
csvFormatStream({ header: true }),
])
// Output: "name","age"\r\n"Alice","30"\r\n"Bob","25"\r\n
```
## `csvRemoveEmptyRowsStream` Transform
Removes rows where all fields are empty strings.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `onErrorEnqueue` | `boolean` | `false` | If `true`, empty rows are kept in stream (but still tracked) |
| `resultKey` | `string` | `"csvRemoveEmptyRows"` | Key in pipeline result |
### Result
```javascript
{ EmptyRow: { id: 'EmptyRow', message: 'Row is empty', idx: [3, 7] } }
```
## `csvRemoveMalformedRowsStream` Transform
Removes rows with an incorrect number of fields compared to the first row or the provided header.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `headers` | `string[] \| () => string[]` | — | Expected header array, or lazy function. If not provided, uses first row's field count |
| `onErrorEnqueue` | `boolean` | `false` | If `true`, malformed rows are kept in stream |
| `resultKey` | `string` | `"csvRemoveMalformedRows"` | Key in pipeline result |
### Result
```javascript
{ MalformedRow: { id: 'MalformedRow', message: 'Row has incorrect number of fields', idx: [2] } }
```
## `csvCoerceValuesStream` Transform
Converts string field values to typed JavaScript values. Works on objects (use after `objectFromEntriesStream`).
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `columns` | `object` | — | Map of column names to types. Without this, auto-coercion is used |
| `resultKey` | `string` | `"csvCoerceValues"` | Key in pipeline result |
### Auto-coercion rules
| Input | Output |
|-------|--------|
| `""` | `null` |
| `"true"` / `"false"` | `true` / `false` |
| Numeric strings | `Number` |
| ISO 8601 date strings | `Date` |
| JSON strings (`{...}`, `[...]`) | Parsed object/array |
### Column types
When specifying `columns`, valid types are: `"number"`, `"boolean"`, `"null"`, `"date"`, `"json"`.
```javascript
csvCoerceValuesStream({
columns: { age: 'number', active: 'boolean', birthday: 'date' }
})
```
## `csvQuotedParser` / `csvUnquotedParser`
Standalone parser functions for use outside of streams. `csvUnquotedParser` is faster but does not handle quoted fields.
```javascript
import { csvQuotedParser } from '@datastream/csv'
const { rows, tail, numCols, idx, errors } = csvQuotedParser(
'a,b,c\r\n1,2,3\r\n',
{ delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"' },
true
)
// rows: [['a','b','c'], ['1','2','3']]
```
## Full pipeline example
Detect, parse, clean, coerce, and validate:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import {
csvDetectDelimitersStream,
csvDetectHeaderStream,
csvParseStream,
csvRemoveEmptyRowsStream,
csvRemoveMalformedRowsStream,
csvCoerceValuesStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const result = await pipeline([
createReadableStream(csvData),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
csvRemoveEmptyRowsStream(),
csvRemoveMalformedRowsStream({
headers: () => detectHeader.result().value.header,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
csvCoerceValuesStream(),
validateStream({ schema }),
])
```
// File: packages/digest/+page.md
---
title: digest
---
Compute cryptographic hash digests while streaming data.
## Install
```bash
npm install @datastream/digest
```
## `digestStream` PassThrough, async (browser)
Computes a hash digest of all data passing through. In the browser, `digestStream` returns a Promise and must be awaited, because it uses `hash-wasm`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `algorithm` | `string` | — | Hash algorithm (see table below) |
| `resultKey` | `string` | `"digest"` | Key in pipeline result |
### Supported algorithms
| Algorithm | Node.js | Browser |
|-----------|---------|---------|
| `SHA2-256` | `node:crypto` | `hash-wasm` |
| `SHA2-384` | `node:crypto` | `hash-wasm` |
| `SHA2-512` | `node:crypto` | `hash-wasm` |
| `SHA3-256` | — | `hash-wasm` |
| `SHA3-384` | — | `hash-wasm` |
| `SHA3-512` | — | `hash-wasm` |
### Result
```javascript
'SHA2-256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
```
### Example
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream } from '@datastream/file'
import { digestStream } from '@datastream/digest'
// Node.js — synchronous
const digest = digestStream({ algorithm: 'SHA2-256' })
const result = await pipeline([
fileReadStream({ path: './data.csv' }),
digest,
])
console.log(result)
// { digest: 'SHA2-256:e3b0c4429...' }
```
```javascript
// Browser — async, must await
const digest = await digestStream({ algorithm: 'SHA2-256' })
```
// File: packages/fetch/+page.md
---
title: fetch
---
HTTP client streams with automatic pagination, rate limiting, and 429 retry.
## Install
```bash
npm install @datastream/fetch
```
## `fetchSetDefaults`
Set global defaults for all fetch streams.
### Defaults
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `method` | `string` | `"GET"` | HTTP method |
| `headers` | `object` | `{ Accept: 'application/json', 'Accept-Encoding': 'br, gzip, deflate' }` | Request headers |
| `rateLimit` | `number` | `0.01` | Minimum seconds between requests (0.01 = 100/sec) |
| `dataPath` | `string` | — | Dot-path to data array in JSON response body |
| `nextPath` | `string` | — | Dot-path to next page URL in JSON response body |
| `qs` | `object` | `{}` | Default query string parameters |
| `offsetParam` | `string` | — | Query parameter name for offset pagination |
| `offsetAmount` | `number` | — | Increment per page for offset pagination |
### Example
```javascript
import { fetchSetDefaults } from '@datastream/fetch'
fetchSetDefaults({
headers: { Authorization: 'Bearer token123' },
rateLimit: 0.1, // 10 requests/sec
})
```
## `fetchReadableStream` Readable
Fetches data from one or more URLs and emits chunks. Automatically detects JSON responses and handles pagination.
Also exported as `fetchResponseStream`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `url` | `string` | — | Request URL |
| `method` | `string` | `"GET"` | HTTP method |
| `headers` | `object` | — | Request headers (merged with defaults) |
| `rateLimit` | `number` | `0.01` | Seconds between requests |
| `dataPath` | `string` | — | Dot-path to data in JSON body |
| `nextPath` | `string` | — | Dot-path to next URL in JSON body |
| `qs` | `object` | `{}` | Query string parameters |
| `offsetParam` | `string` | — | Query param for offset-based pagination |
| `offsetAmount` | `number` | — | Offset increment per page |
Pass an array of option objects to fetch from multiple URLs in sequence.
### Pagination strategies
**Link header** — automatically followed when present:
```
Link: <https://api.example.com/users?page=2>; rel="next"
```
**Body path** — use `nextPath` to extract the next URL from the JSON response:
```javascript
fetchReadableStream({
url: 'https://api.example.com/users',
dataPath: 'data',
nextPath: 'pagination.next_url',
})
```
**Offset** — use `offsetParam` and `offsetAmount` for numeric pagination:
```javascript
fetchReadableStream({
url: 'https://api.example.com/users',
dataPath: 'results',
offsetParam: 'offset',
offsetAmount: 100,
})
```
### 429 retry
When receiving a `429 Too Many Requests` response, the request is automatically retried after respecting the rate limit delay.
### Example
```javascript
import { pipeline } from '@datastream/core'
import { fetchReadableStream } from '@datastream/fetch'
import { objectCountStream } from '@datastream/object'
const count = objectCountStream()
const result = await pipeline([
fetchReadableStream({
url: 'https://api.example.com/users',
dataPath: 'data',
nextPath: 'meta.next',
headers: { Authorization: 'Bearer token' },
}),
count,
])
console.log(result)
// { count: 450 }
```
### Multiple URLs
```javascript
fetchReadableStream([
{ url: 'https://api.example.com/users?status=active', dataPath: 'data' },
{ url: 'https://api.example.com/users?status=inactive', dataPath: 'data' },
])
```
## `fetchWritableStream` Writable, async
Streams data as the body of an HTTP request. Uses `duplex: "half"` for browser compatibility.
Also exported as `fetchRequestStream`.
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { fetchWritableStream } from '@datastream/fetch'
import { csvFormatStream } from '@datastream/csv'
await pipeline([
createReadableStream(data),
csvFormatStream({ header: true }),
await fetchWritableStream({
url: 'https://api.example.com/upload',
method: 'PUT',
headers: { 'Content-Type': 'text/csv' },
}),
])
```
// File: packages/file/+page.md
---
title: file
---
File read and write streams for Node.js and the browser.
## Install
```bash
npm install @datastream/file
```
## `fileReadStream` Readable
Reads a file as a stream.
- **Node.js**: Uses `fs.createReadStream`
- **Browser**: Uses `window.showOpenFilePicker` (File System Access API)
### Options
| Option | Type | Description |
|--------|------|-------------|
| `path` | `string` | File path (Node.js) |
| `types` | `object[]` | File type filter for the file picker (see below) |
### Example — Node.js
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream } from '@datastream/file'
await pipeline([
fileReadStream({ path: './data.csv' }),
])
```
### Example — Browser
```javascript
import { fileReadStream } from '@datastream/file'
const stream = await fileReadStream({
types: [{ accept: { 'text/csv': ['.csv'] } }],
})
```
## `fileWriteStream` Writable
Writes a stream to a file.
- **Node.js**: Uses `fs.createWriteStream`
- **Browser**: Uses `window.showSaveFilePicker` (File System Access API)
### Options
| Option | Type | Description |
|--------|------|-------------|
| `path` | `string` | File path (Node.js), suggested file name (Browser) |
| `types` | `object[]` | File type filter |
### Example — Node.js
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { fileWriteStream } from '@datastream/file'
await pipeline([
createReadableStream('hello world'),
fileWriteStream({ path: './output.txt' }),
])
```
### Example — Browser
```javascript
import { fileWriteStream } from '@datastream/file'
const writable = await fileWriteStream({
path: 'output.csv',
types: [{ accept: { 'text/csv': ['.csv'] } }],
})
```
## File type filtering
The `types` option validates file extensions (Node.js) and configures the file picker dialog (Browser):
```javascript
const types = [
{
accept: {
'text/csv': ['.csv'],
'application/json': ['.json'],
},
},
]
```
On Node.js, if `types` is provided and the file extension doesn't match, an `"Invalid extension"` error is thrown.
// File: packages/indexeddb/+page.md
---
title: indexeddb
---
IndexedDB read and write streams for the browser.
## Install
```bash
npm install @datastream/indexeddb
```
## `indexedDBConnect`
Opens (or creates) an IndexedDB database. Re-exported from the [idb](https://www.npmjs.com/package/idb) library.
```javascript
import { indexedDBConnect } from '@datastream/indexeddb'
const db = await indexedDBConnect('my-database', 1, {
upgrade(db) {
db.createObjectStore('records', { keyPath: 'id' })
},
})
```
## `indexedDBReadStream` Readable, async
Reads records from an IndexedDB object store as a stream.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `db` | `IDBDatabase` | — | Database connection from `indexedDBConnect` |
| `store` | `string` | — | Object store name |
| `index` | `string` | — | Optional index name |
| `key` | `IDBKeyRange` | — | Optional key range filter |
### Example
```javascript
import { pipeline } from '@datastream/core'
import { indexedDBConnect, indexedDBReadStream } from '@datastream/indexeddb'
import { objectCountStream } from '@datastream/object'
const db = await indexedDBConnect('my-database', 1)
const count = objectCountStream()
const result = await pipeline([
await indexedDBReadStream({ db, store: 'records' }),
count,
])
console.log(result)
// { count: 100 }
```
## `indexedDBWriteStream` Writable, async
Writes records to an IndexedDB object store.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `db` | `IDBDatabase` | — | Database connection from `indexedDBConnect` |
| `store` | `string` | — | Object store name |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { indexedDBConnect, indexedDBWriteStream } from '@datastream/indexeddb'
const db = await indexedDBConnect('my-database', 1)
await pipeline([
createReadableStream([
{ id: 1, name: 'Alice' },
{ id: 2, name: 'Bob' },
]),
await indexedDBWriteStream({ db, store: 'records' }),
])
```
// File: packages/ipfs/+page.md
---
title: ipfs
---
IPFS get and add streams.
## Install
```bash
npm install @datastream/ipfs
```
## `ipfsGetStream` Readable, async
Retrieves content from IPFS by CID.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `node` | `IPFS` | — | IPFS node instance |
| `cid` | `string` | — | Content identifier |
### Example
```javascript
import { pipeline } from '@datastream/core'
import { ipfsGetStream } from '@datastream/ipfs'
await pipeline([
await ipfsGetStream({ node: ipfsNode, cid: 'QmHash...' }),
])
```
## `ipfsAddStream` PassThrough, async
Adds content to IPFS and returns the CID in `.result()`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `node` | `IPFS` | — | IPFS node instance |
| `resultKey` | `string` | `"cid"` | Key in pipeline result |
### Result
The CID of the stored content.
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { ipfsAddStream } from '@datastream/ipfs'
const result = await pipeline([
createReadableStream('Hello IPFS!'),
await ipfsAddStream({ node: ipfsNode }),
])
console.log(result)
// { cid: 'QmHash...' }
```
// File: packages/object/+page.md
---
title: object
---
Transform, reshape, filter, and aggregate object streams.
## Install
```bash
npm install @datastream/object
```
## `objectReadableStream` Readable
Creates a Readable stream from an array of objects.
```javascript
import { objectReadableStream } from '@datastream/object'
const stream = objectReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }])
```
## `objectCountStream` PassThrough
Counts the number of chunks that pass through.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `resultKey` | `string` | `"count"` | Key in pipeline result |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'
const count = objectCountStream()
const result = await pipeline([
createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]),
count,
])
console.log(result)
// { count: 3 }
```
## `objectFromEntriesStream` Transform
Converts arrays to objects using column names. Commonly used with `csvParseStream` output.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[] \| () => string[]` | — | Column names, or lazy function |
### Example
```javascript
import { objectFromEntriesStream } from '@datastream/object'
// Converts ['Alice', '30', 'Toronto'] → { name: 'Alice', age: '30', city: 'Toronto' }
objectFromEntriesStream({ keys: ['name', 'age', 'city'] })
// With lazy keys from csvDetectHeaderStream
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
})
```
## `objectPickStream` Transform
Keeps only the specified keys on each object.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Keys to keep |
### Example
```javascript
import { objectPickStream } from '@datastream/object'
// { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice', age: 30 }
objectPickStream({ keys: ['name', 'age'] })
```
## `objectOmitStream` Transform
Removes the specified keys from each object.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Keys to remove |
### Example
```javascript
import { objectOmitStream } from '@datastream/object'
// { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice' }
objectOmitStream({ keys: ['age', 'city'] })
```
## `objectKeyMapStream` Transform
Renames keys on each object. Unmapped keys are kept as-is.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `object` | — | Map of `{ oldKey: 'newKey' }` |
### Example
```javascript
import { objectKeyMapStream } from '@datastream/object'
// { firstName: 'Alice', lastName: 'Smith' } → { first_name: 'Alice', last_name: 'Smith' }
objectKeyMapStream({ keys: { firstName: 'first_name', lastName: 'last_name' } })
```
## `objectValueMapStream` Transform
Maps values for a specific key using a lookup table.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `key` | `string` | — | Key whose value to map |
| `values` | `object` | — | Map of `{ oldValue: newValue }` |
### Example
```javascript
import { objectValueMapStream } from '@datastream/object'
// { status: 'A' } → { status: 'Active' }
objectValueMapStream({ key: 'status', values: { A: 'Active', I: 'Inactive' } })
```
## `objectKeyJoinStream` Transform
Joins multiple keys into new combined keys.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `object` | — | Map of `{ newKey: ['key1', 'key2'] }` |
| `separator` | `string` | — | Separator for joined values |
### Example
```javascript
import { objectKeyJoinStream } from '@datastream/object'
// { first: 'Alice', last: 'Smith', age: 30 } → { name: 'Alice Smith', age: 30 }
objectKeyJoinStream({ keys: { name: ['first', 'last'] }, separator: ' ' })
```
## `objectKeyValueStream` Transform
Creates a key-value pair object from two fields.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `key` | `string` | — | Field to use as the key |
| `value` | `string` | — | Field to use as the value |
### Example
```javascript
import { objectKeyValueStream } from '@datastream/object'
// { code: 'CA', country: 'Canada' } → { CA: 'Canada' }
objectKeyValueStream({ key: 'code', value: 'country' })
```
## `objectKeyValuesStream` Transform
Creates a key-values pair object — the key comes from a field, the value is a subset of the object.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `key` | `string` | — | Field to use as the key |
| `values` | `string[]` | — | Fields to include in value object. If omitted, entire object is used |
### Example
```javascript
import { objectKeyValuesStream } from '@datastream/object'
// { id: 'u1', name: 'Alice', age: 30 } → { u1: { name: 'Alice', age: 30 } }
objectKeyValuesStream({ key: 'id', values: ['name', 'age'] })
```
## `objectBatchStream` Transform
Groups consecutive objects with the same key values into arrays. Use with `objectPivotLongToWideStream`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Keys to group by |
### Example
```javascript
import { objectBatchStream } from '@datastream/object'
// Input: [{id:1,k:'a'}, {id:1,k:'b'}, {id:2,k:'c'}]
// Output chunks: [{id:1,k:'a'}, {id:1,k:'b'}], [{id:2,k:'c'}]
objectBatchStream({ keys: ['id'] })
```
## `objectPivotLongToWideStream` Transform
Pivots batched arrays from long to wide format. Must be used after `objectBatchStream`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Pivot key columns |
| `valueParam` | `string` | — | Column containing the values |
| `delimiter` | `string` | `" "` | Separator for combined key names |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectBatchStream, objectPivotLongToWideStream } from '@datastream/object'
// Input: [{id:1, metric:'temp', value:20}, {id:1, metric:'humidity', value:60}]
// Output: {id:1, temp:20, humidity:60}
await pipeline([
createReadableStream(data),
objectBatchStream({ keys: ['id'] }),
objectPivotLongToWideStream({ keys: ['metric'], valueParam: 'value' }),
])
```
## `objectPivotWideToLongStream` Transform
Pivots wide format to long format. Emits multiple chunks per input object.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Columns to unpivot |
| `keyParam` | `string` | `"keyParam"` | Name for the new key column |
| `valueParam` | `string` | `"valueParam"` | Name for the new value column |
### Example
```javascript
import { objectPivotWideToLongStream } from '@datastream/object'
// Input: { id: 1, temp: 20, humidity: 60 }
// Output: { id: 1, keyParam: 'temp', valueParam: 20 }, { id: 1, keyParam: 'humidity', valueParam: 60 }
objectPivotWideToLongStream({
keys: ['temp', 'humidity'],
keyParam: 'metric',
valueParam: 'value',
})
```
## `objectSkipConsecutiveDuplicatesStream` Transform
Skips consecutive duplicate objects (compared via `JSON.stringify`).
### Example
```javascript
import { objectSkipConsecutiveDuplicatesStream } from '@datastream/object'
// Input: [{a:1}, {a:1}, {a:2}, {a:1}]
// Output: [{a:1}, {a:2}, {a:1}]
objectSkipConsecutiveDuplicatesStream()
```
// File: packages/string/+page.md
---
title: string
---
String manipulation streams — split, replace, count, and measure text data.
## Install
```bash
npm install @datastream/string
```
## `stringReadableStream` Readable
Creates a Readable stream from a string input.
```javascript
import { stringReadableStream } from '@datastream/string'
const stream = stringReadableStream('hello world')
```
## `stringLengthStream` PassThrough
Counts the total character length of all chunks.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `resultKey` | `string` | `"length"` | Key in pipeline result |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { stringLengthStream } from '@datastream/string'
const length = stringLengthStream()
const result = await pipeline([
createReadableStream('hello world'),
length,
])
console.log(result)
// { length: 11 }
```
## `stringCountStream` PassThrough
Counts occurrences of a substring across all chunks.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `substr` | `string` | — | Substring to count |
| `resultKey` | `string` | `"count"` | Key in pipeline result |
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { stringCountStream } from '@datastream/string'
const count = stringCountStream({ substr: '\n' })
const result = await pipeline([
createReadableStream('line1\nline2\nline3'),
count,
])
console.log(result)
// { count: 2 }
```
## `stringSplitStream` Transform
Splits streaming text by a separator, emitting one chunk per segment. Handles splits that cross chunk boundaries.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `separator` | `string` | — | String to split on |
### Example
```javascript
import { createReadableStream, streamToArray, pipejoin } from '@datastream/core'
import { stringSplitStream } from '@datastream/string'
const stream = pipejoin([
createReadableStream('alice,bob,charlie'),
stringSplitStream({ separator: ',' }),
])
const output = await streamToArray(stream)
// ['alice', 'bob', 'charlie']
```
## `stringReplaceStream` Transform
Replaces pattern matches in streaming text. Handles replacements that span chunk boundaries.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `pattern` | `string \| RegExp` | — | Pattern to search for |
| `replacement` | `string` | — | Replacement string |
### Example
```javascript
import { stringReplaceStream } from '@datastream/string'
stringReplaceStream({ pattern: /\t/g, replacement: ',' })
```
## `stringMinimumFirstChunkSize` Transform
Buffers data until the first chunk meets a minimum size, then passes all subsequent chunks through unchanged.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` | Minimum first chunk size in characters |
## `stringMinimumChunkSize` Transform
Buffers data until chunks meet a minimum size before emitting. Useful when a downstream stage needs a minimum amount of data per chunk.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` | Minimum chunk size in characters |
## `stringSkipConsecutiveDuplicates` Transform
Skips consecutive duplicate string chunks.
```javascript
import { stringSkipConsecutiveDuplicates } from '@datastream/string'
// Input chunks: 'a', 'a', 'b', 'a' → Output: 'a', 'b', 'a'
stringSkipConsecutiveDuplicates()
```
// File: packages/validate/+page.md
---
title: validate
---
JSON Schema validation for object streams using Ajv.
## Install
```bash
npm install @datastream/validate
```
## `validateStream` Transform
Validates each object chunk against a JSON Schema. Invalid rows are dropped by default; errors are collected in `.result()`.
### Options
| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `schema` | `object \| function` | — | JSON Schema object, or pre-compiled validation function |
| `idxStart` | `number` | `0` | Starting row index for error tracking |
| `onErrorEnqueue` | `boolean` | `false` | If `true`, invalid rows are kept in the stream |
| `allowCoerceTypes` | `boolean` | `true` | If `false`, emits original data without Ajv coercion |
| `resultKey` | `string` | `"validate"` | Key in pipeline result |
### Result
Errors grouped by schema path:
```javascript
{
'#/required': {
id: '#/required',
keys: ['email'],
message: "must have required property 'email'",
idx: [3, 7, 15]
},
'#/properties/age/type': {
id: '#/properties/age/type',
keys: ['age'],
message: 'must be number',
idx: [5]
}
}
```
### Example
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { validateStream } from '@datastream/validate'
const schema = {
type: 'object',
required: ['name', 'age'],
properties: {
name: { type: 'string', minLength: 1 },
age: { type: 'number', minimum: 0 },
email: { type: 'string', format: 'email' },
},
additionalProperties: false,
}
const result = await pipeline([
createReadableStream([
{ name: 'Alice', age: 30, email: 'alice@example.com' },
{ name: '', age: -1 },
{ name: 'Bob', age: 25 },
]),
validateStream({ schema }),
])
console.log(result.validate)
// Errors for row 1 (name minLength, age minimum)
```
### Keep invalid rows
```javascript
validateStream({ schema, onErrorEnqueue: true })
```
### Pre-compiled schema
For better cold-start performance, pre-compile your schema during the build step:
```javascript
import { transpileSchema, validateStream } from '@datastream/validate'
const validate = transpileSchema(schema)
validateStream({ schema: validate })
```
## `transpileSchema`
Pre-compiles a JSON Schema into a validation function using Ajv.
### Ajv defaults
| Option | Default | Description |
|--------|---------|-------------|
| `strict` | `true` | Strict mode |
| `coerceTypes` | `true` | Coerce types to match schema |
| `allErrors` | `true` | Report all errors, not just the first |
| `useDefaults` | `"empty"` | Apply defaults for missing/empty values |
| `messages` | `true` | Include error messages |
```javascript
import { transpileSchema } from '@datastream/validate'
const validate = transpileSchema(schema, { coerceTypes: false })
```
// File: quick-start/+page.md
---
title: Quick Start
---
## Install
```bash
npm install @datastream/core
```
Add packages as needed:
```bash
npm install @datastream/csv @datastream/validate @datastream/compress @datastream/file
```
## Your first pipeline
Read a CSV string, parse it, and collect the results:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream } from '@datastream/csv'
import { objectFromEntriesStream, objectCountStream } from '@datastream/object'
const csvData = 'name,age,city\r\nAlice,30,Toronto\r\nBob,25,Vancouver\r\nCharlie,35,Montreal'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const count = objectCountStream()
const result = await pipeline([
createReadableStream(csvData),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
count,
])
console.log(result)
// { csvDetectDelimiters: { delimiterChar: ',', ... }, csvDetectHeader: { header: ['name','age','city'] }, count: 3 }
```
## Add validation
Extend the pipeline with schema validation using JSON Schema:
```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream } from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
const schema = {
type: 'object',
required: ['name', 'age', 'city'],
properties: {
name: { type: 'string' },
age: { type: 'number' },
city: { type: 'string' },
},
additionalProperties: false,
}
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const result = await pipeline([
createReadableStream(csvData),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
validateStream({ schema }),
])
console.log(result)
// { csvDetectDelimiters: {...}, csvDetectHeader: {...}, csvErrors: {}, validate: {} }
```
## Add file I/O
Read from and write to files (Node.js):
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvFormatStream } from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream } from '@datastream/compress'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const result = await pipeline([
fileReadStream({ path: './input.csv' }),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
validateStream({ schema }),
csvFormatStream({ header: true }),
gzipCompressStream(),
fileWriteStream({ path: './output.csv.gz' }),
])
```
## Next steps
- Learn about [Core Concepts](/docs/core-concepts) — stream types, pipeline patterns, and error handling
- Browse [Recipes](/docs/recipes) for complete real-world examples
- Explore the [core](/docs/packages/core) package API reference
// File: recipes/+page.md
---
title: Recipes
---
Complete pipeline examples for common use cases.
## CSV ETL — validate and compress
Read a CSV file, detect format, parse, validate, re-format, compress, and write:
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import {
csvDetectDelimitersStream,
csvDetectHeaderStream,
csvParseStream,
csvRemoveEmptyRowsStream,
csvRemoveMalformedRowsStream,
csvCoerceValuesStream,
csvFormatStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream } from '@datastream/compress'
const schema = {
type: 'object',
required: ['id', 'name', 'email'],
properties: {
id: { type: 'number' },
name: { type: 'string', minLength: 1 },
email: { type: 'string', format: 'email' },
},
additionalProperties: false,
}
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const result = await pipeline([
fileReadStream({ path: './input.csv' }),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
csvRemoveEmptyRowsStream(),
csvRemoveMalformedRowsStream({
headers: () => detectHeader.result().value.header,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
csvCoerceValuesStream(),
validateStream({ schema }),
csvFormatStream({ header: true }),
gzipCompressStream(),
fileWriteStream({ path: './output.csv.gz' }),
])
console.log(result)
// { csvDetectDelimiters: {...}, csvDetectHeader: {...}, csvErrors: {}, csvRemoveEmptyRows: {...}, csvRemoveMalformedRows: {...}, validate: {} }
```
## API pagination to DynamoDB
Fetch paginated API data and write to DynamoDB:
```javascript
import { pipeline, createTransformStream } from '@datastream/core'
import { fetchReadableStream } from '@datastream/fetch'
import { objectCountStream } from '@datastream/object'
import { awsDynamoDBPutItemStream } from '@datastream/aws'
const count = objectCountStream()
const result = await pipeline([
fetchReadableStream({
url: 'https://api.example.com/users',
dataPath: 'data',
nextPath: 'pagination.next_url',
}),
createTransformStream((item, enqueue) => {
enqueue({
PK: { S: `USER#${item.id}` },
SK: { S: 'PROFILE' },
name: { S: item.name },
email: { S: item.email },
})
}),
count,
awsDynamoDBPutItemStream({ TableName: 'Users' }),
])
console.log(result)
// { count: 450 }
```
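The inline transform above just reshapes each record into DynamoDB's AttributeValue format (`{ S: … }` wrappers for string values). Pulling that mapping into a plain function keeps it unit-testable; a minimal sketch, using the same hypothetical user record shape as the example:

```javascript
// Map an API user record to a DynamoDB item in AttributeValue format.
// String values use the { S: ... } wrapper expected by PutItem.
function toDynamoItem (item) {
  return {
    PK: { S: `USER#${item.id}` },
    SK: { S: 'PROFILE' },
    name: { S: item.name },
    email: { S: item.email }
  }
}

const dynamoItem = toDynamoItem({ id: 42, name: 'Ada', email: 'ada@example.com' })
console.log(dynamoItem.PK) // { S: 'USER#42' }
```

The same function then drops straight into `createTransformStream((item, enqueue) => enqueue(toDynamoItem(item)))`.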
## S3 to S3 — transform and re-upload
Read a gzipped CSV from S3, decompress, parse, validate, re-format, re-compress, and write back:
```javascript
import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream, awsS3PutObjectStream } from '@datastream/aws'
import {
csvDetectDelimitersStream,
csvDetectHeaderStream,
csvParseStream,
csvRemoveMalformedRowsStream,
csvFormatStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream, gzipDecompressStream } from '@datastream/compress'
// JSON Schema describing one row, as in the CSV ETL recipe above
const schema = {
  type: 'object',
  required: ['id', 'name', 'email'],
  properties: {
    id: { type: 'number' },
    name: { type: 'string', minLength: 1 },
    email: { type: 'string', format: 'email' },
  },
  additionalProperties: false,
}
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const result = await pipeline([
await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'input.csv.gz' }),
gzipDecompressStream(),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
csvRemoveMalformedRowsStream({
headers: () => detectHeader.result().value.header,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
validateStream({ schema }),
csvFormatStream({ header: true }),
gzipCompressStream(),
awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])
```
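The decompress-then-recompress bracket around the pipeline is a plain gzip round trip. With Node's built-in `zlib` the same idea, outside of any stream, looks like:

```javascript
import { gzipSync, gunzipSync } from 'node:zlib'

// Round trip: bytes -> gzip -> gunzip -> original bytes
const original = Buffer.from('id,name\n1,Ada\n')
const compressed = gzipSync(original)
const restored = gunzipSync(compressed)
console.log(restored.toString()) // 'id,name\n1,Ada\n'
```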
## Browser file processing
Use the File System Access API to process files in the browser:
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import {
csvDetectDelimitersStream,
csvDetectHeaderStream,
csvParseStream,
csvFormatStream,
} from '@datastream/csv'
import { objectFromEntriesStream, objectCountStream } from '@datastream/object'
const types = [{ accept: { 'text/csv': ['.csv'] } }]
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const count = objectCountStream()
const result = await pipeline([
await fileReadStream({ types }),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
count,
csvFormatStream({ header: true }),
await fileWriteStream({ path: 'output.csv', types }),
])
console.log(result)
// { count: 500 }
```
## Checksum and compress
Calculate a digest while compressing and writing a file:
```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { digestStream } from '@datastream/digest'
import { gzipCompressStream } from '@datastream/compress'
const digest = await digestStream({ algorithm: 'SHA2-256' })
const result = await pipeline([
fileReadStream({ path: './data.csv' }),
digest,
gzipCompressStream(),
fileWriteStream({ path: './data.csv.gz' }),
])
console.log(result)
// { digest: 'SHA2-256:e3b0c44298fc1c14...' }
```
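You can sanity-check the digest independently with Node's built-in `crypto` module. Note that the pipeline computes it over the raw bytes, since `digestStream` sits upstream of the gzip step. A quick sketch:

```javascript
import { createHash } from 'node:crypto'

// SHA-256 over the uncompressed bytes should match the pipeline's digest
const hash = createHash('sha256')
hash.update('') // replace '' with the file's contents
const hex = hash.digest('hex')
console.log(`SHA2-256:${hex}`) // for empty input: SHA2-256:e3b0c44298fc1c14...
```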