csv
Parse, format, detect, clean, and coerce CSV data.
Install
npm install @datastream/csv csvDetectDelimitersStream PassThrough
Auto-detects the delimiter, newline, quote, and escape characters from the first chunk of data.
Options
| Option | Type | Default | Description |
|---|---|---|---|
chunkSize | number | 1024 | Minimum bytes to buffer before detecting |
resultKey | string | "csvDetectDelimiters" | Key in pipeline result |
Result
{ delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' } Example
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream } from '@datastream/csv'
const detect = csvDetectDelimitersStream()
const result = await pipeline([
createReadableStream('name\tage\r\nAlice\t30'),
detect,
])
console.log(result.csvDetectDelimiters)
// { delimiterChar: '\t', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' } csvDetectHeaderStream Transform
Detects and strips the header row. Outputs data rows only (without the header).
Options
| Option | Type | Default | Description |
|---|---|---|---|
chunkSize | number | 1024 | Minimum bytes to buffer before detecting |
delimiterChar | string \| () => string | "," | Delimiter character or lazy function |
newlineChar | string \| () => string | "\r\n" | Newline character or lazy function |
quoteChar | string \| () => string | '"' | Quote character or lazy function |
escapeChar | string \| () => string | quoteChar | Escape character or lazy function |
parser | function | csvQuotedParser | Custom parser function |
resultKey | string | "csvDetectHeader" | Key in pipeline result |
Result
{ header: ['name', 'age', 'city'] } Example
import { csvDetectDelimitersStream, csvDetectHeaderStream } from '@datastream/csv'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}) csvParseStream Transform
Parses CSV text into arrays of field values (string arrays). Each output chunk is one row as string[].
Options
| Option | Type | Default | Description |
|---|---|---|---|
chunkSize | number | 2097152 (2MB) | Input buffer size before first parse |
delimiterChar | string \| () => string | "," | Delimiter character or lazy function |
newlineChar | string \| () => string | "\r\n" | Newline character or lazy function |
quoteChar | string \| () => string | '"' | Quote character or lazy function |
escapeChar | string \| () => string | quoteChar | Escape character or lazy function |
parser | function | csvQuotedParser | Custom parser function |
resultKey | string | "csvErrors" | Key in pipeline result |
Result
Parse errors collected during processing:
{
UnterminatedQuote: { id: 'UnterminatedQuote', message: 'Unterminated quoted field', idx: [5] }
} Example
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'
const result = await pipeline([
createReadableStream('a,b,c\r\n1,2,3\r\n4,5,6'),
csvParseStream(),
])
// Chunks emitted: ['a','b','c'], ['1','2','3'], ['4','5','6'] csvFormatStream Transform
Formats objects or arrays back to CSV text.
Options
| Option | Type | Default | Description |
|---|---|---|---|
header | boolean \| string[] | — | true to auto-detect from object keys, or provide array of column names |
delimiterChar | string | "," | Delimiter character |
newlineChar | string | "\r\n" | Newline character |
quoteChar | string | '"' | Quote character |
escapeChar | string | quoteChar | Escape character |
Example
import { pipeline, createReadableStream } from '@datastream/core'
import { csvFormatStream } from '@datastream/csv'
await pipeline([
createReadableStream([
{ name: 'Alice', age: 30 },
{ name: 'Bob', age: 25 },
]),
csvFormatStream({ header: true }),
])
// Output: "name","age"\r\n"Alice","30"\r\n"Bob","25"\r\n csvRemoveEmptyRowsStream Transform
Removes rows where all fields are empty strings.
Options
| Option | Type | Default | Description |
|---|---|---|---|
onErrorEnqueue | boolean | false | If true, empty rows are kept in stream (but still tracked) |
resultKey | string | "csvRemoveEmptyRows" | Key in pipeline result |
Result
{ EmptyRow: { id: 'EmptyRow', message: 'Row is empty', idx: [3, 7] } } csvRemoveMalformedRowsStream Transform
Removes rows with an incorrect number of fields compared to the first row or the provided header.
Options
| Option | Type | Default | Description |
|---|---|---|---|
headers | string[] \| () => string[] | — | Expected header array, or lazy function. If not provided, uses first row’s field count |
onErrorEnqueue | boolean | false | If true, malformed rows are kept in stream |
resultKey | string | "csvRemoveMalformedRows" | Key in pipeline result |
Result
{ MalformedRow: { id: 'MalformedRow', message: 'Row has incorrect number of fields', idx: [2] } } csvCoerceValuesStream Transform
Converts string field values to typed JavaScript values. Works on objects (use after objectFromEntriesStream).
Options
| Option | Type | Default | Description |
|---|---|---|---|
columns | object | — | Map of column names to types. Without this, auto-coercion is used |
resultKey | string | "csvCoerceValues" | Key in pipeline result |
Auto-coercion rules
| Input | Output |
|---|---|
"" | null |
"true" / "false" | true / false |
| Numeric strings | Number |
| ISO 8601 date strings | Date |
JSON strings ({...}, [...]) | Parsed object/array |
Column types
When specifying columns, valid types are: "number", "boolean", "null", "date", "json".
csvCoerceValuesStream({
columns: { age: 'number', active: 'boolean', birthday: 'date' }
}) csvQuotedParser / csvUnquotedParser
Standalone parser functions for use outside of streams. csvUnquotedParser is faster but does not handle quoted fields.
import { csvQuotedParser } from '@datastream/csv'
const { rows, tail, numCols, idx, errors } = csvQuotedParser(
'a,b,c\r\n1,2,3\r\n',
{ delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"' },
true
)
// rows: [['a','b','c'], ['1','2','3']] Full pipeline example
Detect, parse, clean, coerce, and validate:
import { pipeline, createReadableStream } from '@datastream/core'
import {
csvDetectDelimitersStream,
csvDetectHeaderStream,
csvParseStream,
csvRemoveEmptyRowsStream,
csvRemoveMalformedRowsStream,
csvCoerceValuesStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
})
const result = await pipeline([
createReadableStream(csvData),
detectDelimiters,
detectHeader,
csvParseStream({
delimiterChar: () => detectDelimiters.result().value.delimiterChar,
newlineChar: () => detectDelimiters.result().value.newlineChar,
quoteChar: () => detectDelimiters.result().value.quoteChar,
escapeChar: () => detectDelimiters.result().value.escapeChar,
}),
csvRemoveEmptyRowsStream(),
csvRemoveMalformedRowsStream({
headers: () => detectHeader.result().value.header,
}),
objectFromEntriesStream({
keys: () => detectHeader.result().value.header,
}),
csvCoerceValuesStream(),
validateStream({ schema }),
])