// File: +page.md

---
title: Introduction
slug: /
---

## What is Datastream?

Datastream is a collection of commonly used **stream patterns** for the **Web Streams API** and **Node.js Streams**. If you're iterating over an array more than once, it's time to use streams.

## A quick example

Code is better than 10,000 words, so let's jump into an example. Let's assume you want to read a CSV file, validate the data, and compress it:

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream } from '@datastream/compress'

const streams = [
  createReadableStream(csvData),
  csvParseStream({ header: true }),
  validateStream(schema),
  gzipCompressStream()
]

await pipeline(streams)
```

## Stream types

- **Readable**: The start of a pipeline that injects data into a stream.
- **PassThrough**: Does not modify the data, but listens and prepares a result that can be retrieved.
- **Transform**: Modifies data as it passes through.
- **Writable**: The end of a pipeline that stores data from the stream.

## Setup

```bash
npm install @datastream/core @datastream/{module}
```

## Why streams?

Streams let you process data incrementally, without loading everything into memory at once. This is essential for:

- **Large files**: Process gigabytes of data with constant memory usage
- **Real-time data**: Handle data as it arrives, not after it's all collected
- **Composability**: Chain simple operations into complex data pipelines
- **Performance**: Start processing before all data is available

Datastream provides ready-made stream patterns so you can focus on your data flow instead of stream plumbing.

## Next steps

Ready to dive in? Head to the [Quick Start](/docs/quick-start) guide.

// File: core-concepts/+page.md

---
title: Core Concepts
---

## Stream types

Datastream uses four stream types.
Each package export follows a naming convention that tells you the type:

| Type | Naming pattern | Purpose |
|------|---------------|---------|
| **Readable** | `*ReadableStream`, `*ReadStream` | Injects data into a pipeline — files, databases, APIs |
| **PassThrough** | `*CountStream`, `*LengthStream`, `*DetectStream` | Observes data without modifying it, collects metrics via `.result()` |
| **Transform** | `*ParseStream`, `*FormatStream`, `*CompressStream` | Modifies data as it passes through |
| **Writable** | `*WriteStream`, `*PutItemStream` | Consumes data at the end — files, databases, APIs |

## `pipeline()` vs `pipejoin()`

### `pipeline(streams[], streamOptions)`

Connects all streams, waits for completion, and returns combined results from all PassThrough streams:

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'
import { stringLengthStream } from '@datastream/string'

const count = objectCountStream()
const length = stringLengthStream()

const result = await pipeline([
  createReadableStream(['hello', 'world']),
  count,
  length,
])

console.log(result) // { count: 2, length: 10 }
```

If the last stream is Readable or Transform (no Writable at the end), `pipeline` automatically appends a no-op Writable so the pipeline completes.

### `pipejoin(streams[])`

Connects streams and returns the resulting stream. Use this when you want to consume the output manually:

```javascript
import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core'

const streams = [
  createReadableStream([1, 2, 3]),
  createTransformStream((n, enqueue) => enqueue(n * 2)),
]

const river = pipejoin(streams)
const output = await streamToArray(river) // [2, 4, 6]
```

## The `.result()` pattern

PassThrough streams collect metrics without modifying data.
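The mechanics of the pattern can be sketched directly on the web-standard `TransformStream`. This is an illustrative stand-in, not the library's implementation; `makeCountStream` is a hypothetical helper:

```javascript
// Illustrative sketch, not the library's code: a counting PassThrough
// observes each chunk, passes it through unchanged, and exposes the
// collected metric via a .result() method in the { key, value } shape.
const makeCountStream = () => {
  let count = 0
  const stream = new TransformStream({
    transform (chunk, controller) {
      count += 1                // observe the chunk
      controller.enqueue(chunk) // pass it through unmodified
    }
  })
  stream.result = () => ({ key: 'count', value: count })
  return stream
}
```

Because the chunk is always re-enqueued, any number of these observers can sit in the middle of a pipeline without affecting the data downstream.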
After the pipeline completes, retrieve results:

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
import { objectCountStream } from '@datastream/object'
import { digestStream } from '@datastream/digest'

const count = objectCountStream()
const digest = await digestStream({ algorithm: 'SHA2-256' })

const result = await pipeline([
  createReadableStream(data),
  digest,
  csvParseStream(),
  count,
])

console.log(result)
// { digest: 'SHA2-256:abc123...', count: 1000 }
```

Each PassThrough stream returns `{ key, value }` from its `.result()` method. `pipeline()` combines them into a single object. You can customize the key with the `resultKey` option:

```javascript
const count = objectCountStream({ resultKey: 'rowCount' })
// result: { rowCount: 1000 }
```

## Stream options

All stream factory functions accept a `streamOptions` parameter:

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `highWaterMark` | `number` | — | Backpressure threshold — how many chunks to buffer before pausing |
| `chunkSize` | `number` | `16384` | Size hint for chunking strategies |
| `signal` | `AbortSignal` | — | Signal to abort the pipeline |

```javascript
const controller = new AbortController()

await pipeline([
  createReadableStream(data),
  csvParseStream(),
], { signal: controller.signal })

// Abort from elsewhere:
controller.abort()
```

## Error handling

### AbortSignal

Use `AbortController` to cancel pipelines:

```javascript
const controller = new AbortController()
setTimeout(() => controller.abort(), 5000) // 5s timeout

try {
  await pipeline(streams, { signal: controller.signal })
} catch (e) {
  if (e.name === 'AbortError') {
    console.log('Pipeline was cancelled')
  }
}
```

### Validation errors

Streams like `validateStream` and the CSV cleaning streams collect errors in `.result()` rather than throwing, so the pipeline continues processing:

```javascript
const result = await pipeline([
  createReadableStream(data),
  csvParseStream(),
  validateStream({ schema }),
])

console.log(result.validate)
// { '#/required/name': { id: '#/required/name', keys: ['name'], message: '...', idx: [3, 7] } }
```

Invalid rows are dropped by default. Set `onErrorEnqueue: true` to keep them in the stream.

## Lazy options

Many CSV streams accept functions instead of values for options. This lets you wire up detection results that aren't available until runtime:

```javascript
const detect = csvDetectDelimitersStream()

csvParseStream({
  delimiterChar: () => detect.result().value.delimiterChar,
})
```

The function is called when the stream first needs the value, not at creation time.

// File: packages/aws/+page.md

---
title: aws
---

AWS service streams for S3, DynamoDB, Lambda, SNS, and SQS. Node.js only.

## Install

```bash
npm install @datastream/aws
```

Requires the corresponding AWS SDK v3 client packages:

```bash
npm install @aws-sdk/client-s3 @aws-sdk/lib-storage
npm install @aws-sdk/client-dynamodb
npm install @aws-sdk/client-lambda
npm install @aws-sdk/client-sns
npm install @aws-sdk/client-sqs
```

## S3

### `awsS3SetClient`

Set a custom S3 client. By default, FIPS endpoints are enabled for US and CA regions.

```javascript
import { S3Client } from '@aws-sdk/client-s3'
import { awsS3SetClient } from '@datastream/aws'

awsS3SetClient(new S3Client({ region: 'eu-west-1' }))
```

### `awsS3GetObjectStream`

**Readable, async.** Downloads an object from S3 as a stream.
#### Options

Accepts all `GetObjectCommand` parameters plus:

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `client` | `S3Client` | default client | Custom S3 client for this call |
| `Bucket` | `string` | — | S3 bucket name |
| `Key` | `string` | — | S3 object key |

#### Example

```javascript
import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'

await pipeline([
  await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'data.csv' }),
  csvParseStream(),
])
```

### `awsS3PutObjectStream`

**PassThrough.** Uploads data to S3 using multipart upload.

#### Options

Accepts all S3 PutObject parameters plus:

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `client` | `S3Client` | default client | Custom S3 client |
| `Bucket` | `string` | — | S3 bucket name |
| `Key` | `string` | — | S3 object key |
| `onProgress` | `function` | — | Upload progress callback |
| `tags` | `object` | — | S3 object tags |

#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsS3PutObjectStream } from '@datastream/aws'
import { gzipCompressStream } from '@datastream/compress'

await pipeline([
  createReadableStream(data),
  gzipCompressStream(),
  awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])
```

### `awsS3ChecksumStream`

**PassThrough.** Computes a multi-part S3 checksum while data passes through. Designed for pre-signed URL uploads in the browser.
#### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `ChecksumAlgorithm` | `string` | `"SHA256"` | `"SHA1"` or `"SHA256"` |
| `partSize` | `number` | `17179870` | Part size in bytes |
| `resultKey` | `string` | `"s3"` | Key in pipeline result |

#### Result

```javascript
{
  checksum: 'base64hash-3',
  checksums: ['part1hash', 'part2hash', 'part3hash'],
  partSize: 17179870
}
```

## DynamoDB

### `awsDynamoDBSetClient`

```javascript
import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { awsDynamoDBSetClient } from '@datastream/aws'

awsDynamoDBSetClient(new DynamoDBClient({ region: 'us-east-1' }))
```

### `awsDynamoDBQueryStream`

**Readable, async.** Queries a DynamoDB table and auto-paginates through all results.

#### Options

Accepts all `QueryCommand` parameters:

| Option | Type | Description |
|--------|------|-------------|
| `TableName` | `string` | DynamoDB table name |
| `KeyConditionExpression` | `string` | Query key condition |
| `ExpressionAttributeValues` | `object` | Expression values |

#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBQueryStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsDynamoDBQueryStream({
    TableName: 'Users',
    KeyConditionExpression: 'PK = :pk',
    ExpressionAttributeValues: { ':pk': { S: 'USER#123' } },
  })),
])
```

### `awsDynamoDBScanStream`

**Readable, async.** Scans an entire DynamoDB table with automatic pagination.

```javascript
import { awsDynamoDBScanStream } from '@datastream/aws'

const items = await awsDynamoDBScanStream({ TableName: 'Users' })
```

### `awsDynamoDBGetItemStream`

**Readable, async.** Batch gets items by keys. Automatically retries unprocessed keys with exponential backoff.
#### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `TableName` | `string` | — | DynamoDB table name |
| `Keys` | `object[]` | — | Array of key objects |
| `retryMaxCount` | `number` | `10` | Maximum retry attempts |

### `awsDynamoDBPutItemStream`

**Writable.** Writes items to DynamoDB using `BatchWriteItem`. Automatically batches 25 items per request and retries unprocessed items with exponential backoff.

#### Options

| Option | Type | Description |
|--------|------|-------------|
| `TableName` | `string` | DynamoDB table name |
| `retryMaxCount` | `number` | Maximum retry attempts (default 10) |

#### Example

```javascript
import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsDynamoDBPutItemStream } from '@datastream/aws'

await pipeline([
  createReadableStream(items),
  createTransformStream((item, enqueue) => {
    enqueue({
      PK: { S: `USER#${item.id}` },
      SK: { S: 'PROFILE' },
      name: { S: item.name },
    })
  }),
  awsDynamoDBPutItemStream({ TableName: 'Users' }),
])
```

### `awsDynamoDBDeleteItemStream`

**Writable.** Deletes items from DynamoDB using `BatchWriteItem`. Batches 25 items per request.

#### Options

| Option | Type | Description |
|--------|------|-------------|
| `TableName` | `string` | DynamoDB table name |
| `retryMaxCount` | `number` | Maximum retry attempts (default 10) |

#### Example

```javascript
import { awsDynamoDBDeleteItemStream } from '@datastream/aws'

awsDynamoDBDeleteItemStream({ TableName: 'Users' })
// Input chunks: { PK: { S: 'USER#1' }, SK: { S: 'PROFILE' } }
```

## Lambda

### `awsLambdaSetClient`

```javascript
import { LambdaClient } from '@aws-sdk/client-lambda'
import { awsLambdaSetClient } from '@datastream/aws'

awsLambdaSetClient(new LambdaClient({ region: 'us-east-1' }))
```

### `awsLambdaReadableStream`

**Readable.** Invokes a Lambda function with response streaming (`InvokeWithResponseStream`). Also exported as `awsLambdaResponseStream`.
#### Options

Accepts `InvokeWithResponseStreamCommand` parameters. Pass an array to invoke multiple functions sequentially.

| Option | Type | Description |
|--------|------|-------------|
| `FunctionName` | `string` | Lambda function name or ARN |
| `Payload` | `string` | JSON payload |

#### Example

```javascript
import { pipeline } from '@datastream/core'
import { awsLambdaReadableStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'

await pipeline([
  awsLambdaReadableStream({
    FunctionName: 'data-processor',
    Payload: JSON.stringify({ key: 'input.csv' }),
  }),
  csvParseStream(),
])
```

## SNS

### `awsSNSSetClient`

```javascript
import { SNSClient } from '@aws-sdk/client-sns'
import { awsSNSSetClient } from '@datastream/aws'

awsSNSSetClient(new SNSClient({ region: 'us-east-1' }))
```

### `awsSNSPublishMessageStream`

**Writable.** Publishes messages to an SNS topic. Batches 10 messages per `PublishBatchCommand`.

#### Options

| Option | Type | Description |
|--------|------|-------------|
| `TopicArn` | `string` | SNS topic ARN |

#### Example

```javascript
import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsSNSPublishMessageStream } from '@datastream/aws'

await pipeline([
  createReadableStream(events),
  createTransformStream((event, enqueue) => {
    enqueue({
      Id: event.id,
      Message: JSON.stringify(event),
    })
  }),
  awsSNSPublishMessageStream({ TopicArn: 'arn:aws:sns:us-east-1:123:my-topic' }),
])
```

## SQS

### `awsSQSSetClient`

```javascript
import { SQSClient } from '@aws-sdk/client-sqs'
import { awsSQSSetClient } from '@datastream/aws'

awsSQSSetClient(new SQSClient({ region: 'us-east-1' }))
```

### `awsSQSReceiveMessageStream`

**Readable, async.** Polls an SQS queue and yields messages until the queue is empty.
#### Options

Accepts `ReceiveMessageCommand` parameters:

| Option | Type | Description |
|--------|------|-------------|
| `QueueUrl` | `string` | SQS queue URL |
| `MaxNumberOfMessages` | `number` | Max messages per poll (1-10) |

#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { awsSQSReceiveMessageStream, awsSQSDeleteMessageStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsSQSReceiveMessageStream({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  })),
  awsSQSDeleteMessageStream({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  }),
])
```

### `awsSQSSendMessageStream`

**Writable.** Sends messages to an SQS queue. Batches 10 messages per `SendMessageBatchCommand`.

#### Options

| Option | Type | Description |
|--------|------|-------------|
| `QueueUrl` | `string` | SQS queue URL |

### `awsSQSDeleteMessageStream`

**Writable.** Deletes messages from an SQS queue. Batches 10 messages per `DeleteMessageBatchCommand`.

#### Options

| Option | Type | Description |
|--------|------|-------------|
| `QueueUrl` | `string` | SQS queue URL |

// File: packages/base64/+page.md

---
title: base64
---

Base64 encoding and decoding streams.

## Install

```bash
npm install @datastream/base64
```

## `base64EncodeStream`

**Transform.** Encodes data to base64. Handles chunk boundaries correctly by buffering partial 3-byte groups.

### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { base64EncodeStream } from '@datastream/base64'

await pipeline([
  createReadableStream('Hello, World!'),
  base64EncodeStream(),
])
```

## `base64DecodeStream`

**Transform.** Decodes base64 data back to its original form. Handles chunk boundaries by buffering partial 4-character groups.
### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { base64DecodeStream } from '@datastream/base64'

await pipeline([
  createReadableStream('SGVsbG8sIFdvcmxkIQ=='),
  base64DecodeStream(),
])
```

// File: packages/charset/+page.md

---
title: charset
---

Character set detection, decoding, and encoding streams.

## Install

```bash
npm install @datastream/charset
```

## `charsetDetectStream`

**PassThrough.** Detects the character encoding of the data passing through by analyzing byte patterns.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `resultKey` | `string` | `"charset"` | Key in pipeline result |

### Result

Returns the most likely charset with a confidence score:

```javascript
{ charset: 'UTF-8', confidence: 80 }
```

### Supported charsets

UTF-8, UTF-16BE, UTF-16LE, UTF-32BE, UTF-32LE, Shift_JIS, ISO-2022-JP, ISO-2022-CN, ISO-2022-KR, GB18030, EUC-JP, EUC-KR, Big5, ISO-8859-1, ISO-8859-2, ISO-8859-5, ISO-8859-6, ISO-8859-7, ISO-8859-8, ISO-8859-9, windows-1250, windows-1251, windows-1252, windows-1254, windows-1256, KOI8-R

### Example

```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream } from '@datastream/file'
import { charsetDetectStream, charsetDecodeStream } from '@datastream/charset'

const detect = charsetDetectStream()

const result = await pipeline([
  fileReadStream({ path: './data.csv' }),
  detect,
])

console.log(result.charset) // { charset: 'UTF-8', confidence: 80 }
```

## `charsetDecodeStream`

**Transform.** Decodes binary data to text using the specified character encoding. Uses `TextDecoderStream` internally.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `charset` | `string` | — | Character encoding name (e.g. `"UTF-8"`, `"ISO-8859-1"`) |

### Example

```javascript
import { charsetDecodeStream } from '@datastream/charset'

charsetDecodeStream({ charset: 'ISO-8859-1' })
```

## `charsetEncodeStream`

**Transform.** Encodes text to binary using the specified character encoding. Uses `TextEncoderStream` internally.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `charset` | `string` | — | Character encoding name |

### Example

```javascript
import { charsetEncodeStream } from '@datastream/charset'

charsetEncodeStream({ charset: 'UTF-8' })
```

// File: packages/compress/+page.md

---
title: compress
---

Compression and decompression streams for gzip, deflate, brotli, and zstd.

## Install

```bash
npm install @datastream/compress
```

## gzip

### `gzipCompressStream`

**Transform.**

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `-1` | Compression level (-1 to 9). -1 = default, 0 = none, 9 = best |

### `gzipDecompressStream`

**Transform.** No options required.

### Example

```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import { gzipCompressStream, gzipDecompressStream } from '@datastream/compress'

// Compress
await pipeline([
  fileReadStream({ path: './data.csv' }),
  gzipCompressStream({ quality: 9 }),
  fileWriteStream({ path: './data.csv.gz' }),
])

// Decompress
await pipeline([
  fileReadStream({ path: './data.csv.gz' }),
  gzipDecompressStream(),
  fileWriteStream({ path: './data.csv' }),
])
```

## deflate

### `deflateCompressStream`

**Transform.**

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `-1` | Compression level (-1 to 9) |

### `deflateDecompressStream`

**Transform.** No options required.
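On Node.js, the compress streams delegate to `node:zlib`. A one-shot round-trip with the zlib built-ins (plain `node:zlib` calls shown for orientation, not the library API) demonstrates the same deflate/inflate pairing that the streams perform incrementally:

```javascript
import { deflateSync, inflateSync } from 'node:zlib'

// One-shot deflate round-trip with Node's built-ins; the library's
// deflateCompressStream/deflateDecompressStream do the same work
// chunk by chunk instead of all at once.
const original = 'hello world, hello world, hello world'
const compressed = deflateSync(Buffer.from(original), { level: 9 })
const restored = inflateSync(compressed).toString()
// restored === original
```

The streaming versions matter when the input does not fit in memory; the algorithm and `level`/`quality` semantics are the same.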
## brotli

### `brotliCompressStream`

**Transform.**

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `11` | Compression level (0 to 11) |

### `brotliDecompressStream`

**Transform.** No options required.

## zstd

Node.js only. Requires Node.js with zstd support.

### `zstdCompressStream`

**Transform.**

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `quality` | `number` | `3` | Compression level |

### `zstdDecompressStream`

**Transform.** No options required.

## Platform support

| Algorithm | Node.js | Browser |
|-----------|---------|---------|
| gzip | `node:zlib` | `CompressionStream` |
| deflate | `node:zlib` | `CompressionStream` |
| brotli | `node:zlib` | `CompressionStream` |
| zstd | `node:zlib` | Not supported |

// File: packages/core/+page.md

---
title: core
---

Foundation package providing pipeline orchestration, stream factories, and utility functions.

## Install

```bash
npm install @datastream/core
```

## Pipeline

### `pipeline(streams, streamOptions)`

**async.** Connects all streams, waits for completion, and returns combined `.result()` values from all PassThrough streams. Automatically appends a no-op Writable if the last stream is Readable.

#### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `highWaterMark` | `number` | — | Backpressure threshold |
| `chunkSize` | `number` | — | Size hint for chunking |
| `signal` | `AbortSignal` | — | Abort the pipeline |

#### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'

const count = objectCountStream()

const result = await pipeline([
  createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]),
  count,
])

console.log(result) // { count: 3 }
```

### `pipejoin(streams)`

**Returns a stream.** Connects streams and returns the resulting stream.
Use this when you want to consume output manually with `streamToArray`, `streamToString`, or `for await`.

#### Example

```javascript
import { pipejoin, streamToArray, createReadableStream, createTransformStream } from '@datastream/core'

const river = pipejoin([
  createReadableStream([1, 2, 3]),
  createTransformStream((n, enqueue) => enqueue(n * 2)),
])

const output = await streamToArray(river) // [2, 4, 6]
```

### `result(streams)`

**async.** Iterates over streams and combines all `.result()` return values into a single object. Called automatically by `pipeline()`.

## Consumers

### `streamToArray(stream)`

**async.** Collects all chunks from a stream into an array.

```javascript
import { pipejoin, streamToArray, createReadableStream } from '@datastream/core'

const river = pipejoin([createReadableStream(['a', 'b', 'c'])])
const output = await streamToArray(river) // ['a', 'b', 'c']
```

### `streamToString(stream)`

**async.** Concatenates all chunks into a single string.

```javascript
const output = await streamToString(river) // 'abc'
```

### `streamToObject(stream)`

**async.** Merges all chunks into a single object using `Object.assign`.

```javascript
const river = pipejoin([createReadableStream([{ a: 1 }, { b: 2 }])])
const output = await streamToObject(river) // { a: 1, b: 2 }
```

### `streamToBuffer(stream)`

**async, Node.js only.** Collects all chunks into a `Buffer`.

## Stream Factories

### `createReadableStream(input, streamOptions)`

**Readable.** Creates a Readable stream from various input types.
#### Input types

| Type | Behavior |
|------|----------|
| `string` | Chunked at `chunkSize` (default 16KB) |
| `Array` | Each element emitted as a chunk |
| `AsyncIterable` / `Iterable` | Each yielded value emitted as a chunk |
| `ArrayBuffer` | Chunked at `chunkSize` (Node.js only) |

#### Example

```javascript
import { createReadableStream } from '@datastream/core'

// From string — auto-chunked
const stringStream = createReadableStream('hello world')

// From array — one chunk per element
const arrayStream = createReadableStream([{ a: 1 }, { a: 2 }])

// From async generator
async function* generate() {
  yield 'chunk1'
  yield 'chunk2'
}
const generatorStream = createReadableStream(generate())
```

### `createPassThroughStream(fn, flush?, streamOptions)`

**Transform (PassThrough).** Creates a stream that observes each chunk without modifying it. The chunk is automatically passed through.

#### Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `fn` | `(chunk) => void` | Called for each chunk, return value ignored |
| `flush` | `() => void` | Optional, called when stream ends |
| `streamOptions` | `object` | Stream configuration |

#### Example

```javascript
import { createPassThroughStream } from '@datastream/core'

let total = 0
const counter = createPassThroughStream((chunk) => {
  total += chunk.length
})
counter.result = () => ({ key: 'total', value: total })
```

### `createTransformStream(fn, flush?, streamOptions)`

**Transform.** Creates a stream that modifies chunks. Use `enqueue` to emit output — you can emit zero, one, or many chunks per input.
#### Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `fn` | `(chunk, enqueue) => void` | Transform each chunk, call `enqueue(output)` to emit |
| `flush` | `(enqueue) => void` | Optional, emit final chunks when stream ends |
| `streamOptions` | `object` | Stream configuration |

#### Example

```javascript
import { createTransformStream } from '@datastream/core'

// Filter: emit only matching chunks
const filter = createTransformStream((chunk, enqueue) => {
  if (chunk.age > 18) enqueue(chunk)
})

// Expand: emit multiple chunks per input
const expand = createTransformStream((chunk, enqueue) => {
  for (const item of chunk.items) {
    enqueue(item)
  }
})
```

### `createWritableStream(fn, close?, streamOptions)`

**Writable.** Creates a stream that consumes chunks at the end of a pipeline.

#### Parameters

| Parameter | Type | Description |
|-----------|------|-------------|
| `fn` | `(chunk) => void` | Called for each chunk |
| `close` | `() => void` | Optional, called when stream ends |
| `streamOptions` | `object` | Stream configuration |

#### Example

```javascript
import { createWritableStream } from '@datastream/core'

const rows = []
const collector = createWritableStream((chunk) => {
  rows.push(chunk)
})
```

## Utilities

### `isReadable(stream)`

Returns `true` if the stream is Readable.

### `isWritable(stream)`

Returns `true` if the stream is Writable.

### `makeOptions(options)`

Normalizes stream options for interoperability between Readable, Transform, and Writable streams.

| Option | Type | Description |
|--------|------|-------------|
| `highWaterMark` | `number` | Backpressure threshold |
| `chunkSize` | `number` | Chunking size hint |
| `signal` | `AbortSignal` | Abort signal |

### `timeout(ms, options)`

**async.** Returns a promise that resolves after `ms` milliseconds. Supports `AbortSignal` cancellation.
```javascript
import { timeout } from '@datastream/core'

await timeout(1000) // wait 1 second

const controller = new AbortController()
await timeout(5000, { signal: controller.signal }) // cancellable
```

### `backpressureGuage(streams)`

**Node.js only.** Measures pause/resume timing across streams. Useful for identifying bottlenecks.

```javascript
import { backpressureGuage } from '@datastream/core'

const metrics = backpressureGuage({ parse: parseStream, validate: validateStream })

// After pipeline completes:
// metrics.parse.total = { timestamp, duration }
// metrics.parse.timeline = [{ timestamp, duration }, ...]
```

// File: packages/csv/+page.md

---
title: csv
---

Parse, format, detect, clean, and coerce CSV data.

## Install

```bash
npm install @datastream/csv
```

## `csvDetectDelimitersStream`

**PassThrough.** Auto-detects the delimiter, newline, quote, and escape characters from the first chunk of data.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` | Minimum bytes to buffer before detecting |
| `resultKey` | `string` | `"csvDetectDelimiters"` | Key in pipeline result |

### Result

```javascript
{ delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```

### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream } from '@datastream/csv'

const detect = csvDetectDelimitersStream()

const result = await pipeline([
  createReadableStream('name\tage\r\nAlice\t30'),
  detect,
])

console.log(result.csvDetectDelimiters)
// { delimiterChar: '\t', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```

## `csvDetectHeaderStream`

**Transform.** Detects and strips the header row. Outputs data rows only (without the header).
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `1024` | Minimum bytes to buffer before detecting |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | quoteChar | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvDetectHeader"` | Key in pipeline result |

### Result

```javascript
{ header: ['name', 'age', 'city'] }
```

### Example

```javascript
import { csvDetectDelimitersStream, csvDetectHeaderStream } from '@datastream/csv'

const detectDelimiters = csvDetectDelimitersStream()

const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})
```

## `csvParseStream`

**Transform.** Parses CSV text into arrays of field values (string arrays). Each output chunk is one row as `string[]`.
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `chunkSize` | `number` | `2097152` (2MB) | Input buffer size before first parse |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | quoteChar | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvErrors"` | Key in pipeline result |

### Result

Parse errors collected during processing:

```javascript
{
  UnterminatedQuote: {
    id: 'UnterminatedQuote',
    message: 'Unterminated quoted field',
    idx: [5]
  }
}
```

### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'

const result = await pipeline([
  createReadableStream('a,b,c\r\n1,2,3\r\n4,5,6'),
  csvParseStream(),
])
// Chunks emitted: ['a','b','c'], ['1','2','3'], ['4','5','6']
```

## `csvFormatStream`

**Transform.** Formats objects or arrays back to CSV text.
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `header` | `boolean \| string[]` | — | `true` to auto-detect from object keys, or provide array of column names |
| `delimiterChar` | `string` | `","` | Delimiter character |
| `newlineChar` | `string` | `"\r\n"` | Newline character |
| `quoteChar` | `string` | `'"'` | Quote character |
| `escapeChar` | `string` | quoteChar | Escape character |

### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvFormatStream } from '@datastream/csv'

await pipeline([
  createReadableStream([
    { name: 'Alice', age: 30 },
    { name: 'Bob', age: 25 },
  ]),
  csvFormatStream({ header: true }),
])
// Output: "name","age"\r\n"Alice","30"\r\n"Bob","25"\r\n
```

## `csvRemoveEmptyRowsStream`

**Transform.** Removes rows where all fields are empty strings.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `onErrorEnqueue` | `boolean` | `false` | If `true`, empty rows are kept in stream (but still tracked) |
| `resultKey` | `string` | `"csvRemoveEmptyRows"` | Key in pipeline result |

### Result

```javascript
{ EmptyRow: { id: 'EmptyRow', message: 'Row is empty', idx: [3, 7] } }
```

## `csvRemoveMalformedRowsStream`

**Transform.** Removes rows with an incorrect number of fields compared to the first row or the provided header.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `headers` | `string[] \| () => string[]` | — | Expected header array, or lazy function. If not provided, uses the first row's field count |
| `onErrorEnqueue` | `boolean` | `false` | If `true`, malformed rows are kept in stream |
| `resultKey` | `string` | `"csvRemoveMalformedRows"` | Key in pipeline result |

### Result

```javascript
{ MalformedRow: { id: 'MalformedRow', message: 'Row has incorrect number of fields', idx: [2] } }
```

## `csvCoerceValuesStream`

**Transform.** Converts string field values to typed JavaScript values. Works on objects (use after `objectFromEntriesStream`).

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `columns` | `object` | — | Map of column names to types. Without this, auto-coercion is used |
| `resultKey` | `string` | `"csvCoerceValues"` | Key in pipeline result |

### Auto-coercion rules

| Input | Output |
|-------|--------|
| `""` | `null` |
| `"true"` / `"false"` | `true` / `false` |
| Numeric strings | `Number` |
| ISO 8601 date strings | `Date` |
| JSON strings (`{...}`, `[...]`) | Parsed object/array |

### Column types

When specifying `columns`, valid types are: `"number"`, `"boolean"`, `"null"`, `"date"`, `"json"`.

```javascript
csvCoerceValuesStream({
  columns: { age: 'number', active: 'boolean', birthday: 'date' }
})
```

## `csvQuotedParser` / `csvUnquotedParser`

Standalone parser functions for use outside of streams. `csvUnquotedParser` is faster but does not handle quoted fields.
```javascript import { csvQuotedParser } from '@datastream/csv' const { rows, tail, numCols, idx, errors } = csvQuotedParser( 'a,b,c\r\n1,2,3\r\n', { delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"' }, true ) // rows: [['a','b','c'], ['1','2','3']] ``` ## Full pipeline example Detect, parse, clean, coerce, and validate: ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvRemoveEmptyRowsStream, csvRemoveMalformedRowsStream, csvCoerceValuesStream, } from '@datastream/csv' import { objectFromEntriesStream } from '@datastream/object' import { validateStream } from '@datastream/validate' const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) const result = await pipeline([ createReadableStream(csvData), detectDelimiters, detectHeader, csvParseStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }), csvRemoveEmptyRowsStream(), csvRemoveMalformedRowsStream({ headers: () => detectHeader.result().value.header, }), objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }), csvCoerceValuesStream(), validateStream({ schema }), ]) ``` // File: packages/digest/+page.md --- title: digest --- Compute cryptographic hash digests while streaming data. ## Install ```bash npm install @datastream/digest ``` ## `digestStream` PassThrough, async (browser) Computes a hash digest of all data passing through. 
The stream is async in the browser (returns a Promise) because it uses `hash-wasm`. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `algorithm` | `string` | — | Hash algorithm (see table below) | | `resultKey` | `string` | `"digest"` | Key in pipeline result | ### Supported algorithms | Algorithm | Node.js | Browser | |-----------|---------|---------| | `SHA2-256` | `node:crypto` | `hash-wasm` | | `SHA2-384` | `node:crypto` | `hash-wasm` | | `SHA2-512` | `node:crypto` | `hash-wasm` | | `SHA3-256` | — | `hash-wasm` | | `SHA3-384` | — | `hash-wasm` | | `SHA3-512` | — | `hash-wasm` | ### Result ```javascript 'SHA2-256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' ``` ### Example ```javascript import { pipeline } from '@datastream/core' import { fileReadStream } from '@datastream/file' import { digestStream } from '@datastream/digest' // Node.js — synchronous const digest = digestStream({ algorithm: 'SHA2-256' }) const result = await pipeline([ fileReadStream({ path: './data.csv' }), digest, ]) console.log(result) // { digest: 'SHA2-256:e3b0c4429...' } ``` ```javascript // Browser — async, must await const digest = await digestStream({ algorithm: 'SHA2-256' }) ``` // File: packages/fetch/+page.md --- title: fetch --- HTTP client streams with automatic pagination, rate limiting, and 429 retry. ## Install ```bash npm install @datastream/fetch ``` ## `fetchSetDefaults` Set global defaults for all fetch streams. 
### Defaults | Option | Type | Default | Description | |--------|------|---------|-------------| | `method` | `string` | `"GET"` | HTTP method | | `headers` | `object` | `{ Accept: 'application/json', 'Accept-Encoding': 'br, gzip, deflate' }` | Request headers | | `rateLimit` | `number` | `0.01` | Minimum seconds between requests (0.01 = 100/sec) | | `dataPath` | `string` | — | Dot-path to data array in JSON response body | | `nextPath` | `string` | — | Dot-path to next page URL in JSON response body | | `qs` | `object` | `{}` | Default query string parameters | | `offsetParam` | `string` | — | Query parameter name for offset pagination | | `offsetAmount` | `number` | — | Increment per page for offset pagination | ### Example ```javascript import { fetchSetDefaults } from '@datastream/fetch' fetchSetDefaults({ headers: { Authorization: 'Bearer token123' }, rateLimit: 0.1, // 10 requests/sec }) ``` ## `fetchReadableStream` Readable Fetches data from one or more URLs and emits chunks. Automatically detects JSON responses and handles pagination. Also exported as `fetchResponseStream`. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `url` | `string` | — | Request URL | | `method` | `string` | `"GET"` | HTTP method | | `headers` | `object` | — | Request headers (merged with defaults) | | `rateLimit` | `number` | `0.01` | Seconds between requests | | `dataPath` | `string` | — | Dot-path to data in JSON body | | `nextPath` | `string` | — | Dot-path to next URL in JSON body | | `qs` | `object` | `{}` | Query string parameters | | `offsetParam` | `string` | — | Query param for offset-based pagination | | `offsetAmount` | `number` | — | Offset increment per page | Pass an array of option objects to fetch from multiple URLs in sequence. 
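The `dataPath` and `nextPath` options address into the parsed JSON body using dot-separated keys. A minimal sketch of how such a lookup can work — `getPath` is a hypothetical helper for illustration, not the library's actual implementation:

```javascript
// Hypothetical dot-path lookup, showing how options like
// dataPath: 'data' and nextPath: 'pagination.next_url' can resolve
// against a JSON response body.
const getPath = (obj, path) =>
  path.split('.').reduce((acc, key) => (acc == null ? undefined : acc[key]), obj)

const body = {
  data: [{ id: 1 }, { id: 2 }],
  pagination: { next_url: 'https://api.example.com/users?page=2' },
}

console.log(getPath(body, 'data'))                // → [ { id: 1 }, { id: 2 } ]
console.log(getPath(body, 'pagination.next_url')) // → 'https://api.example.com/users?page=2'
```

A path that does not exist resolves to `undefined`, which is how the stream would know there is no next page to fetch.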
### Pagination strategies

**Link header** — automatically followed when present:

```
Link: <https://api.example.com/users?page=2>; rel="next"
```

**Body path** — use `nextPath` to extract the next URL from the JSON response:

```javascript
fetchReadableStream({
  url: 'https://api.example.com/users',
  dataPath: 'data',
  nextPath: 'pagination.next_url',
})
```

**Offset** — use `offsetParam` and `offsetAmount` for numeric pagination:

```javascript
fetchReadableStream({
  url: 'https://api.example.com/users',
  dataPath: 'results',
  offsetParam: 'offset',
  offsetAmount: 100,
})
```

### 429 retry

When a `429 Too Many Requests` response is received, the request is retried automatically after the rate limit delay.

### Example

```javascript
import { pipeline } from '@datastream/core'
import { fetchReadableStream } from '@datastream/fetch'
import { objectCountStream } from '@datastream/object'

const count = objectCountStream()
const result = await pipeline([
  fetchReadableStream({
    url: 'https://api.example.com/users',
    dataPath: 'data',
    nextPath: 'meta.next',
    headers: { Authorization: 'Bearer token' },
  }),
  count,
])
console.log(result) // { count: 450 }
```

### Multiple URLs

```javascript
fetchReadableStream([
  { url: 'https://api.example.com/users?status=active', dataPath: 'data' },
  { url: 'https://api.example.com/users?status=inactive', dataPath: 'data' },
])
```

## `fetchWritableStream`

Writable, async

Streams data as the body of an HTTP request. Uses `duplex: "half"` for browser compatibility. Also exported as `fetchRequestStream`.
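Streamed request bodies rely on the platform's `fetch`/`Request` support for `ReadableStream` bodies, which requires `duplex: 'half'`. A standalone sketch of that underlying mechanism — the URL is illustrative and no network request is made here; the body is just read back to show it is consumed as a stream:

```javascript
// A ReadableStream used as an HTTP request body. Constructing the Request
// requires duplex: 'half' in runtimes that support streamed uploads
// (Node.js 18+, Chromium-based browsers).
const body = new ReadableStream({
  start(controller) {
    controller.enqueue(new TextEncoder().encode('name,age\r\n'))
    controller.enqueue(new TextEncoder().encode('Alice,30\r\n'))
    controller.close()
  },
})

const request = new Request('https://api.example.com/upload', {
  method: 'PUT',
  headers: { 'Content-Type': 'text/csv' },
  body,
  duplex: 'half',
})

// Draining the body shows the chunks arrive in order, as they were enqueued.
const text = await request.text()
```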
### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { fetchWritableStream } from '@datastream/fetch' import { csvFormatStream } from '@datastream/csv' await pipeline([ createReadableStream(data), csvFormatStream({ header: true }), await fetchWritableStream({ url: 'https://api.example.com/upload', method: 'PUT', headers: { 'Content-Type': 'text/csv' }, }), ]) ``` // File: packages/file/+page.md --- title: file --- File read and write streams for Node.js and the browser. ## Install ```bash npm install @datastream/file ``` ## `fileReadStream` Readable Reads a file as a stream. - **Node.js**: Uses `fs.createReadStream` - **Browser**: Uses `window.showOpenFilePicker` (File System Access API) ### Options | Option | Type | Description | |--------|------|-------------| | `path` | `string` | File path (Node.js) | | `types` | `object[]` | File type filter for the file picker (see below) | ### Example — Node.js ```javascript import { pipeline } from '@datastream/core' import { fileReadStream } from '@datastream/file' await pipeline([ fileReadStream({ path: './data.csv' }), ]) ``` ### Example — Browser ```javascript import { fileReadStream } from '@datastream/file' const stream = await fileReadStream({ types: [{ accept: { 'text/csv': ['.csv'] } }], }) ``` ## `fileWriteStream` Writable Writes a stream to a file. 
- **Node.js**: Uses `fs.createWriteStream` - **Browser**: Uses `window.showSaveFilePicker` (File System Access API) ### Options | Option | Type | Description | |--------|------|-------------| | `path` | `string` | File path (Node.js), suggested file name (Browser) | | `types` | `object[]` | File type filter | ### Example — Node.js ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { fileWriteStream } from '@datastream/file' await pipeline([ createReadableStream('hello world'), fileWriteStream({ path: './output.txt' }), ]) ``` ### Example — Browser ```javascript import { fileWriteStream } from '@datastream/file' const writable = await fileWriteStream({ path: 'output.csv', types: [{ accept: { 'text/csv': ['.csv'] } }], }) ``` ## File type filtering The `types` option validates file extensions (Node.js) and configures the file picker dialog (Browser): ```javascript const types = [ { accept: { 'text/csv': ['.csv'], 'application/json': ['.json'], }, }, ] ``` On Node.js, if `types` is provided and the file extension doesn't match, an `"Invalid extension"` error is thrown. // File: packages/indexeddb/+page.md --- title: indexeddb --- IndexedDB read and write streams for the browser. ## Install ```bash npm install @datastream/indexeddb ``` ## `indexedDBConnect` Opens (or creates) an IndexedDB database. Re-exported from the [idb](https://www.npmjs.com/package/idb) library. ```javascript import { indexedDBConnect } from '@datastream/indexeddb' const db = await indexedDBConnect('my-database', 1, { upgrade(db) { db.createObjectStore('records', { keyPath: 'id' }) }, }) ``` ## `indexedDBReadStream` Readable, async Reads records from an IndexedDB object store as a stream. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `db` | `IDBDatabase` | — | Database connection from `indexedDBConnect` | | `store` | `string` | — | Object store name | | `index` | `string` | — | Optional index name | | `key` | `IDBKeyRange` | — | Optional key range filter | ### Example ```javascript import { pipeline } from '@datastream/core' import { indexedDBConnect, indexedDBReadStream } from '@datastream/indexeddb' import { objectCountStream } from '@datastream/object' const db = await indexedDBConnect('my-database', 1) const count = objectCountStream() const result = await pipeline([ await indexedDBReadStream({ db, store: 'records' }), count, ]) console.log(result) // { count: 100 } ``` ## `indexedDBWriteStream` Writable, async Writes records to an IndexedDB object store. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `db` | `IDBDatabase` | — | Database connection from `indexedDBConnect` | | `store` | `string` | — | Object store name | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { indexedDBConnect, indexedDBWriteStream } from '@datastream/indexeddb' const db = await indexedDBConnect('my-database', 1) await pipeline([ createReadableStream([ { id: 1, name: 'Alice' }, { id: 2, name: 'Bob' }, ]), await indexedDBWriteStream({ db, store: 'records' }), ]) ``` // File: packages/ipfs/+page.md --- title: ipfs --- IPFS get and add streams. ## Install ```bash npm install @datastream/ipfs ``` ## `ipfsGetStream` Readable, async Retrieves content from IPFS by CID. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `node` | `IPFS` | — | IPFS node instance | | `cid` | `string` | — | Content identifier | ### Example ```javascript import { pipeline } from '@datastream/core' import { ipfsGetStream } from '@datastream/ipfs' await pipeline([ await ipfsGetStream({ node: ipfsNode, cid: 'QmHash...' }), ]) ``` ## `ipfsAddStream` PassThrough, async Adds content to IPFS and returns the CID in `.result()`. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `node` | `IPFS` | — | IPFS node instance | | `resultKey` | `string` | `"cid"` | Key in pipeline result | ### Result The CID of the stored content. ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { ipfsAddStream } from '@datastream/ipfs' const result = await pipeline([ createReadableStream('Hello IPFS!'), await ipfsAddStream({ node: ipfsNode }), ]) console.log(result) // { cid: 'QmHash...' } ``` // File: packages/object/+page.md --- title: object --- Transform, reshape, filter, and aggregate object streams. ## Install ```bash npm install @datastream/object ``` ## `objectReadableStream` Readable Creates a Readable stream from an array of objects. ```javascript import { objectReadableStream } from '@datastream/object' const stream = objectReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]) ``` ## `objectCountStream` PassThrough Counts the number of chunks that pass through. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `resultKey` | `string` | `"count"` | Key in pipeline result | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { objectCountStream } from '@datastream/object' const count = objectCountStream() const result = await pipeline([ createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]), count, ]) console.log(result) // { count: 3 } ``` ## `objectFromEntriesStream` Transform Converts arrays to objects using column names. Commonly used with `csvParseStream` output. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `string[] \| () => string[]` | — | Column names, or lazy function | ### Example ```javascript import { objectFromEntriesStream } from '@datastream/object' // Converts ['Alice', '30', 'Toronto'] → { name: 'Alice', age: '30', city: 'Toronto' } objectFromEntriesStream({ keys: ['name', 'age', 'city'] }) // With lazy keys from csvDetectHeaderStream objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }) ``` ## `objectPickStream` Transform Keeps only the specified keys on each object. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `string[]` | — | Keys to keep | ### Example ```javascript import { objectPickStream } from '@datastream/object' // { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice', age: 30 } objectPickStream({ keys: ['name', 'age'] }) ``` ## `objectOmitStream` Transform Removes the specified keys from each object. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `string[]` | — | Keys to remove | ### Example ```javascript import { objectOmitStream } from '@datastream/object' // { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice' } objectOmitStream({ keys: ['age', 'city'] }) ``` ## `objectKeyMapStream` Transform Renames keys on each object. Unmapped keys are kept as-is. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `object` | — | Map of `{ oldKey: 'newKey' }` | ### Example ```javascript import { objectKeyMapStream } from '@datastream/object' // { firstName: 'Alice', lastName: 'Smith' } → { first_name: 'Alice', last_name: 'Smith' } objectKeyMapStream({ keys: { firstName: 'first_name', lastName: 'last_name' } }) ``` ## `objectValueMapStream` Transform Maps values for a specific key using a lookup table. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `key` | `string` | — | Key whose value to map | | `values` | `object` | — | Map of `{ oldValue: newValue }` | ### Example ```javascript import { objectValueMapStream } from '@datastream/object' // { status: 'A' } → { status: 'Active' } objectValueMapStream({ key: 'status', values: { A: 'Active', I: 'Inactive' } }) ``` ## `objectKeyJoinStream` Transform Joins multiple keys into new combined keys. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `keys` | `object` | — | Map of `{ newKey: ['key1', 'key2'] }` | | `separator` | `string` | — | Separator for joined values | ### Example ```javascript import { objectKeyJoinStream } from '@datastream/object' // { first: 'Alice', last: 'Smith', age: 30 } → { name: 'Alice Smith', age: 30 } objectKeyJoinStream({ keys: { name: ['first', 'last'] }, separator: ' ' }) ``` ## `objectKeyValueStream` Transform Creates a key-value pair object from two fields. 
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `key` | `string` | — | Field to use as the key |
| `value` | `string` | — | Field to use as the value |

### Example

```javascript
import { objectKeyValueStream } from '@datastream/object'

// { code: 'CA', country: 'Canada' } → { CA: 'Canada' }
objectKeyValueStream({ key: 'code', value: 'country' })
```

## `objectKeyValuesStream`

Transform

Creates a key-values pair object — the key comes from a field, the value is a subset of the object.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `key` | `string` | — | Field to use as the key |
| `values` | `string[]` | — | Fields to include in the value object. If omitted, the entire object is used |

### Example

```javascript
import { objectKeyValuesStream } from '@datastream/object'

// { id: 'u1', name: 'Alice', age: 30 } → { u1: { name: 'Alice', age: 30 } }
objectKeyValuesStream({ key: 'id', values: ['name', 'age'] })
```

## `objectBatchStream`

Transform

Groups consecutive objects with the same key values into arrays. Use with `objectPivotLongToWideStream`.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Keys to group by |

### Example

```javascript
import { objectBatchStream } from '@datastream/object'

// Input chunks:  {id:1,k:'a'}, {id:1,k:'b'}, {id:2,k:'c'}
// Output chunks: [{id:1,k:'a'}, {id:1,k:'b'}], [{id:2,k:'c'}]
objectBatchStream({ keys: ['id'] })
```

## `objectPivotLongToWideStream`

Transform

Pivots batched arrays from long to wide format. Must be used after `objectBatchStream`.
### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Pivot key columns |
| `valueParam` | `string` | — | Column containing the values |
| `delimiter` | `string` | `" "` | Separator for combined key names |

### Example

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { objectBatchStream, objectPivotLongToWideStream } from '@datastream/object'

// Input:  [{id:1, metric:'temp', value:20}, {id:1, metric:'humidity', value:60}]
// Output: {id:1, temp:20, humidity:60}
await pipeline([
  createReadableStream(data),
  objectBatchStream({ keys: ['id'] }),
  objectPivotLongToWideStream({ keys: ['metric'], valueParam: 'value' }),
])
```

## `objectPivotWideToLongStream`

Transform

Pivots wide format to long format. Emits multiple chunks per input object.

### Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| `keys` | `string[]` | — | Columns to unpivot |
| `keyParam` | `string` | `"keyParam"` | Name for the new key column |
| `valueParam` | `string` | `"valueParam"` | Name for the new value column |

### Example

```javascript
import { objectPivotWideToLongStream } from '@datastream/object'

// Input:  { id: 1, temp: 20, humidity: 60 }
// Output: { id: 1, metric: 'temp', value: 20 }, { id: 1, metric: 'humidity', value: 60 }
objectPivotWideToLongStream({
  keys: ['temp', 'humidity'],
  keyParam: 'metric',
  valueParam: 'value',
})
```

## `objectSkipConsecutiveDuplicatesStream`

Transform

Skips consecutive duplicate objects (compared via `JSON.stringify`).

### Example

```javascript
import { objectSkipConsecutiveDuplicatesStream } from '@datastream/object'

// Input:  [{a:1}, {a:1}, {a:2}, {a:1}]
// Output: [{a:1}, {a:2}, {a:1}]
```

// File: packages/string/+page.md

---
title: string
---

String manipulation streams — split, replace, count, and measure text data.
## Install ```bash npm install @datastream/string ``` ## `stringReadableStream` Readable Creates a Readable stream from a string input. ```javascript import { stringReadableStream } from '@datastream/string' const stream = stringReadableStream('hello world') ``` ## `stringLengthStream` PassThrough Counts the total character length of all chunks. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `resultKey` | `string` | `"length"` | Key in pipeline result | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { stringLengthStream } from '@datastream/string' const length = stringLengthStream() const result = await pipeline([ createReadableStream('hello world'), length, ]) console.log(result) // { length: 11 } ``` ## `stringCountStream` PassThrough Counts occurrences of a substring across all chunks. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `substr` | `string` | — | Substring to count | | `resultKey` | `string` | `"count"` | Key in pipeline result | ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { stringCountStream } from '@datastream/string' const count = stringCountStream({ substr: '\n' }) const result = await pipeline([ createReadableStream('line1\nline2\nline3'), count, ]) console.log(result) // { count: 2 } ``` ## `stringSplitStream` Transform Splits streaming text by a separator, emitting one chunk per segment. Handles splits that cross chunk boundaries. 
### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `separator` | `string` | — | String to split on | ### Example ```javascript import { pipeline, createReadableStream, streamToArray, pipejoin } from '@datastream/core' import { stringSplitStream } from '@datastream/string' const river = pipejoin([ createReadableStream('alice,bob,charlie'), stringSplitStream({ separator: ',' }), ]) const output = await streamToArray(river) // ['alice', 'bob', 'charlie'] ``` ## `stringReplaceStream` Transform Replaces pattern matches in streaming text. Handles replacements that span chunk boundaries. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `pattern` | `string \| RegExp` | — | Pattern to search for | | `replacement` | `string` | — | Replacement string | ### Example ```javascript import { stringReplaceStream } from '@datastream/string' stringReplaceStream({ pattern: /\t/g, replacement: ',' }) ``` ## `stringMinimumFirstChunkSize` Transform Buffers data until the first chunk meets a minimum size, then passes all subsequent chunks through unchanged. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `chunkSize` | `number` | `1024` | Minimum first chunk size in characters | ## `stringMinimumChunkSize` Transform Buffers data until chunks meet a minimum size before emitting. Useful when downstream requires a minimum data size for processing. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `chunkSize` | `number` | `1024` | Minimum chunk size in characters | ## `stringSkipConsecutiveDuplicates` Transform Skips consecutive duplicate string chunks. 
```javascript import { stringSkipConsecutiveDuplicates } from '@datastream/string' // Input chunks: 'a', 'a', 'b', 'a' → Output: 'a', 'b', 'a' ``` // File: packages/validate/+page.md --- title: validate --- JSON Schema validation for object streams using Ajv. ## Install ```bash npm install @datastream/validate ``` ## `validateStream` Transform Validates each object chunk against a JSON Schema. Invalid rows are dropped by default; errors are collected in `.result()`. ### Options | Option | Type | Default | Description | |--------|------|---------|-------------| | `schema` | `object \| function` | — | JSON Schema object, or pre-compiled validation function | | `idxStart` | `number` | `0` | Starting row index for error tracking | | `onErrorEnqueue` | `boolean` | `false` | If `true`, invalid rows are kept in the stream | | `allowCoerceTypes` | `boolean` | `true` | If `false`, emits original data without Ajv coercion | | `resultKey` | `string` | `"validate"` | Key in pipeline result | ### Result Errors grouped by schema path: ```javascript { '#/required': { id: '#/required', keys: ['email'], message: "must have required property 'email'", idx: [3, 7, 15] }, '#/properties/age/type': { id: '#/properties/age/type', keys: ['age'], message: 'must be number', idx: [5] } } ``` ### Example ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { validateStream } from '@datastream/validate' const schema = { type: 'object', required: ['name', 'age'], properties: { name: { type: 'string', minLength: 1 }, age: { type: 'number', minimum: 0 }, email: { type: 'string', format: 'email' }, }, additionalProperties: false, } const result = await pipeline([ createReadableStream([ { name: 'Alice', age: 30, email: 'alice@example.com' }, { name: '', age: -1 }, { name: 'Bob', age: 25 }, ]), validateStream({ schema }), ]) console.log(result.validate) // Errors for row 1 (name minLength, age minimum) ``` ### Keep invalid rows ```javascript validateStream({ schema, 
  onErrorEnqueue: true,
})
```

### Pre-compiled schema

For better cold-start performance, pre-compile your schema during the build step:

```javascript
import { transpileSchema, validateStream } from '@datastream/validate'

const validate = transpileSchema(schema)
validateStream({ schema: validate })
```

## `transpileSchema`

Pre-compiles a JSON Schema into a validation function using Ajv.

### Ajv defaults

| Option | Default | Description |
|--------|---------|-------------|
| `strict` | `true` | Strict mode |
| `coerceTypes` | `true` | Coerce types to match schema |
| `allErrors` | `true` | Report all errors, not just the first |
| `useDefaults` | `"empty"` | Apply defaults for missing/empty values |
| `messages` | `true` | Include error messages |

```javascript
import { transpileSchema } from '@datastream/validate'

const validate = transpileSchema(schema, { coerceTypes: false })
```

// File: quick-start/+page.md

---
title: Quick Start
---

## Install

```bash
npm install @datastream/core
```

Add packages as needed:

```bash
npm install @datastream/csv @datastream/validate @datastream/compress @datastream/file
```

## Your first pipeline

Read a CSV string, parse it, and collect the results:

```javascript
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream } from '@datastream/csv'
import { objectFromEntriesStream, objectCountStream } from '@datastream/object'

const csvData = 'name,age,city\r\nAlice,30,Toronto\r\nBob,25,Vancouver\r\nCharlie,35,Montreal'

const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const count = objectCountStream()

const result = await pipeline([
  createReadableStream(csvData),
  detectDelimiters,
  detectHeader,
csvParseStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }), objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }), count, ]) console.log(result) // { csvDetectDelimiters: { delimiterChar: ',', ... }, csvDetectHeader: { header: ['name','age','city'] }, count: 3 } ``` ## Add validation Extend the pipeline with schema validation using JSON Schema: ```javascript import { pipeline, createReadableStream } from '@datastream/core' import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream } from '@datastream/csv' import { objectFromEntriesStream } from '@datastream/object' import { validateStream } from '@datastream/validate' const schema = { type: 'object', required: ['name', 'age', 'city'], properties: { name: { type: 'string' }, age: { type: 'number' }, city: { type: 'string' }, }, additionalProperties: false, } const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) const result = await pipeline([ createReadableStream(csvData), detectDelimiters, detectHeader, csvParseStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }), objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }), validateStream({ schema }), ]) console.log(result) // { csvDetectDelimiters: {...}, csvDetectHeader: {...}, csvErrors: {}, 
validate: {} } ``` ## Add file I/O Read from and write to files (Node.js): ```javascript import { pipeline } from '@datastream/core' import { fileReadStream, fileWriteStream } from '@datastream/file' import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvFormatStream } from '@datastream/csv' import { objectFromEntriesStream } from '@datastream/object' import { validateStream } from '@datastream/validate' import { gzipCompressStream } from '@datastream/compress' const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) const result = await pipeline([ fileReadStream({ path: './input.csv' }), detectDelimiters, detectHeader, csvParseStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }), objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }), validateStream({ schema }), csvFormatStream({ header: true }), gzipCompressStream(), fileWriteStream({ path: './output.csv.gz' }), ]) ``` ## Next steps - Learn about [Core Concepts](/docs/core-concepts) — stream types, pipeline patterns, and error handling - Browse [Recipes](/docs/recipes) for complete real-world examples - Explore the [core](/docs/packages/core) package API reference // File: recipes/+page.md --- title: Recipes --- Complete pipeline examples for common use cases. 
## CSV ETL — validate and compress Read a CSV file, detect format, parse, validate, re-format, compress, and write: ```javascript import { pipeline } from '@datastream/core' import { fileReadStream, fileWriteStream } from '@datastream/file' import { csvDetectDelimitersStream, csvDetectHeaderStream, csvParseStream, csvRemoveEmptyRowsStream, csvRemoveMalformedRowsStream, csvCoerceValuesStream, csvFormatStream, } from '@datastream/csv' import { objectFromEntriesStream } from '@datastream/object' import { validateStream } from '@datastream/validate' import { gzipCompressStream } from '@datastream/compress' const schema = { type: 'object', required: ['id', 'name', 'email'], properties: { id: { type: 'number' }, name: { type: 'string', minLength: 1 }, email: { type: 'string', format: 'email' }, }, additionalProperties: false, } const detectDelimiters = csvDetectDelimitersStream() const detectHeader = csvDetectHeaderStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }) const result = await pipeline([ fileReadStream({ path: './input.csv' }), detectDelimiters, detectHeader, csvParseStream({ delimiterChar: () => detectDelimiters.result().value.delimiterChar, newlineChar: () => detectDelimiters.result().value.newlineChar, quoteChar: () => detectDelimiters.result().value.quoteChar, escapeChar: () => detectDelimiters.result().value.escapeChar, }), csvRemoveEmptyRowsStream(), csvRemoveMalformedRowsStream({ headers: () => detectHeader.result().value.header, }), objectFromEntriesStream({ keys: () => detectHeader.result().value.header, }), csvCoerceValuesStream(), validateStream({ schema }), csvFormatStream({ header: true }), gzipCompressStream(), fileWriteStream({ path: './output.csv.gz' }), ]) console.log(result) // { csvDetectDelimiters: {...}, csvDetectHeader: {...}, 
csvErrors: {}, csvRemoveEmptyRows: {...}, csvRemoveMalformedRows: {...}, validate: {} }
```

## API pagination to DynamoDB

Fetch paginated API data and write to DynamoDB:

```javascript
import { pipeline, createTransformStream } from '@datastream/core'
import { fetchReadableStream } from '@datastream/fetch'
import { objectCountStream } from '@datastream/object'
import { awsDynamoDBPutItemStream } from '@datastream/aws'

const count = objectCountStream()

const result = await pipeline([
  fetchReadableStream({
    url: 'https://api.example.com/users',
    dataPath: 'data',
    nextPath: 'pagination.next_url',
  }),
  createTransformStream((item, enqueue) => {
    enqueue({
      PK: { S: `USER#${item.id}` },
      SK: { S: 'PROFILE' },
      name: { S: item.name },
      email: { S: item.email },
    })
  }),
  count,
  awsDynamoDBPutItemStream({ TableName: 'Users' }),
])

console.log(result) // { count: 450 }
```

## S3 to S3 — transform and re-upload

Read from S3, parse CSV, validate, re-format, and write back:

```javascript
import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream, awsS3PutObjectStream } from '@datastream/aws'
import {
  csvDetectDelimitersStream,
  csvDetectHeaderStream,
  csvParseStream,
  csvRemoveMalformedRowsStream,
  csvFormatStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream, gzipDecompressStream } from '@datastream/compress'

const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})

const result = await pipeline([
  await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'input.csv.gz' }),
  gzipDecompressStream(),
  detectDelimiters,
  detectHeader,
csvParseStream({
    delimiterChar: () => detectDelimiters.result().value.delimiterChar,
    newlineChar: () => detectDelimiters.result().value.newlineChar,
    quoteChar: () => detectDelimiters.result().value.quoteChar,
    escapeChar: () => detectDelimiters.result().value.escapeChar,
  }),
  csvRemoveMalformedRowsStream({
    headers: () => detectHeader.result().value.header,
  }),
  objectFromEntriesStream({
    keys: () => detectHeader.result().value.header,
  }),
  validateStream({ schema }), // schema: a JSON Schema, as defined in the CSV ETL recipe above
  csvFormatStream({ header: true }),
  gzipCompressStream(),
  awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])
```

## Browser file processing

Use the File System Access API to process files in the browser:

```javascript
import { pipeline } from '@datastream/core'
import { fileReadStream, fileWriteStream } from '@datastream/file'
import {
  csvDetectDelimitersStream,
  csvDetectHeaderStream,
  csvParseStream,
  csvFormatStream,
} from '@datastream/csv'
import { objectFromEntriesStream, objectCountStream } from '@datastream/object'

const types = [{ accept: { 'text/csv': ['.csv'] } }]

const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const count = objectCountStream()

const result = await pipeline([
  await fileReadStream({ types }),
  detectDelimiters,
  detectHeader,
  csvParseStream({
    delimiterChar: () => detectDelimiters.result().value.delimiterChar,
    newlineChar: () => detectDelimiters.result().value.newlineChar,
    quoteChar: () => detectDelimiters.result().value.quoteChar,
    escapeChar: () => detectDelimiters.result().value.escapeChar,
  }),
  objectFromEntriesStream({
    keys: () => detectHeader.result().value.header,
  }),
  count,
  csvFormatStream({ header: true }),
  await fileWriteStream({
    path: 'output.csv',
    types
}), ]) console.log(result) // { count: 500 } ``` ## Checksum and compress Calculate a digest while compressing and writing a file: ```javascript import { pipeline } from '@datastream/core' import { fileReadStream, fileWriteStream } from '@datastream/file' import { digestStream } from '@datastream/digest' import { gzipCompressStream } from '@datastream/compress' const digest = await digestStream({ algorithm: 'SHA2-256' }) const result = await pipeline([ fileReadStream({ path: './data.csv' }), digest, gzipCompressStream(), fileWriteStream({ path: './data.csv.gz' }), ]) console.log(result) // { digest: 'SHA2-256:e3b0c44298fc1c14...' } ```
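The `digest` result above uses an `ALGORITHM:hex` format. As a sanity check, Node's built-in `node:crypto` module yields the same SHA-256 hex digest for the same input bytes; incidentally, the `e3b0c44298fc1c14...` value in the example output is the SHA-256 of empty input. A minimal sketch:

```javascript
import { createHash } from 'node:crypto'

// Compute a SHA-256 hex digest and format it like the pipeline result above.
const sha256Hex = (data) => createHash('sha256').update(data).digest('hex')

console.log(`SHA2-256:${sha256Hex('')}`)
// SHA2-256:e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855
```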