# csv

Parse, format, detect, clean, and coerce CSV data.

## Install

```bash
npm install @datastream/csv
```

## csvDetectDelimitersStream (PassThrough)

Auto-detects the delimiter, newline, quote, and escape characters from the first chunk of data.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `chunkSize` | `number` | `1024` | Minimum bytes to buffer before detecting |
| `resultKey` | `string` | `"csvDetectDelimiters"` | Key in the pipeline result |

### Result

```js
{ delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```

### Example

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvDetectDelimitersStream } from '@datastream/csv'

const detect = csvDetectDelimitersStream()

const result = await pipeline([
  createReadableStream('name\tage\r\nAlice\t30'),
  detect,
])

console.log(result.csvDetectDelimiters)
// { delimiterChar: '\t', newlineChar: '\r\n', quoteChar: '"', escapeChar: '"' }
```

## csvDetectHeaderStream (Transform)

Detects the header row and strips it from the stream, emitting only the data rows.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `chunkSize` | `number` | `1024` | Minimum bytes to buffer before detecting |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | `quoteChar` | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvDetectHeader"` | Key in the pipeline result |

### Result

```js
{ header: ['name', 'age', 'city'] }
```

### Example

```js
import { csvDetectDelimitersStream, csvDetectHeaderStream } from '@datastream/csv'

const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})
```
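
The lazy option functions defer reading the detected characters until the header stream needs them. A minimal run, assuming the same pipeline behavior and result shapes shown in the first example:

```js
import { pipeline, createReadableStream } from '@datastream/core'

const result = await pipeline([
  createReadableStream('name\tage\r\nAlice\t30'),
  detectDelimiters,
  detectHeader,
])

console.log(result.csvDetectHeader)
// { header: ['name', 'age'] }
```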

## csvParseStream (Transform)

Parses CSV text into rows of string field values; each output chunk is one row as a `string[]`.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `chunkSize` | `number` | `2097152` (2 MB) | Input buffer size before first parse |
| `delimiterChar` | `string \| () => string` | `","` | Delimiter character or lazy function |
| `newlineChar` | `string \| () => string` | `"\r\n"` | Newline character or lazy function |
| `quoteChar` | `string \| () => string` | `'"'` | Quote character or lazy function |
| `escapeChar` | `string \| () => string` | `quoteChar` | Escape character or lazy function |
| `parser` | `function` | `csvQuotedParser` | Custom parser function |
| `resultKey` | `string` | `"csvErrors"` | Key in the pipeline result |

### Result

Parse errors collected during processing:

```js
{
  UnterminatedQuote: { id: 'UnterminatedQuote', message: 'Unterminated quoted field', idx: [5] }
}
```

### Example

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream } from '@datastream/csv'

const result = await pipeline([
  createReadableStream('a,b,c\r\n1,2,3\r\n4,5,6'),
  csvParseStream(),
])

// Chunks emitted: ['a','b','c'], ['1','2','3'], ['4','5','6']
```

## csvFormatStream (Transform)

Formats objects or arrays back to CSV text.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `header` | `boolean \| string[]` | `true` | `true` auto-detects column names from object keys; or provide an array of column names |
| `delimiterChar` | `string` | `","` | Delimiter character |
| `newlineChar` | `string` | `"\r\n"` | Newline character |
| `quoteChar` | `string` | `'"'` | Quote character |
| `escapeChar` | `string` | `quoteChar` | Escape character |

### Example

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvFormatStream } from '@datastream/csv'

await pipeline([
  createReadableStream([
    { name: 'Alice', age: 30 },
    { name: 'Bob', age: 25 },
  ]),
  csvFormatStream({ header: true }),
])

// Output: "name","age"\r\n"Alice","30"\r\n"Bob","25"\r\n
```

## csvRemoveEmptyRowsStream (Transform)

Removes rows where all fields are empty strings.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `onErrorEnqueue` | `boolean` | `false` | If `true`, empty rows are kept in the stream (but still tracked) |
| `resultKey` | `string` | `"csvRemoveEmptyRows"` | Key in the pipeline result |

### Result

```js
{ EmptyRow: { id: 'EmptyRow', message: 'Row is empty', idx: [3, 7] } }
```
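
A minimal sketch, assuming the stream consumes the `string[]` rows emitted by `csvParseStream` (a row of only delimiters parses to all-empty fields):

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream, csvRemoveEmptyRowsStream } from '@datastream/csv'

const result = await pipeline([
  createReadableStream('a,b\r\n1,2\r\n,\r\n3,4'),
  csvParseStream(),
  csvRemoveEmptyRowsStream(),
])

// Chunks emitted: ['a','b'], ['1','2'], ['3','4']
// result.csvRemoveEmptyRows records the dropped row under EmptyRow.idx
```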

## csvRemoveMalformedRowsStream (Transform)

Removes rows with an incorrect number of fields compared to the first row or the provided header.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `headers` | `string[] \| () => string[]` | | Expected header array, or a lazy function. If not provided, the first row's field count is used |
| `onErrorEnqueue` | `boolean` | `false` | If `true`, malformed rows are kept in the stream |
| `resultKey` | `string` | `"csvRemoveMalformedRows"` | Key in the pipeline result |

### Result

```js
{ MalformedRow: { id: 'MalformedRow', message: 'Row has incorrect number of fields', idx: [2] } }
```
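
A sketch without the `headers` option, so the expected field count is taken from the first row:

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream, csvRemoveMalformedRowsStream } from '@datastream/csv'

const result = await pipeline([
  createReadableStream('a,b\r\n1,2,3\r\n4,5'),
  csvParseStream(),
  csvRemoveMalformedRowsStream(),
])

// Chunks emitted: ['a','b'], ['4','5'] (the three-field row is dropped)
// result.csvRemoveMalformedRows records the dropped row under MalformedRow.idx
```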

## csvCoerceValuesStream (Transform)

Converts string field values to typed JavaScript values. Operates on objects, so place it after `objectFromEntriesStream`.

### Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `columns` | `object` | | Map of column names to types. Without this, auto-coercion is used |
| `resultKey` | `string` | `"csvCoerceValues"` | Key in the pipeline result |

### Auto-coercion rules

| Input | Output |
| --- | --- |
| `""` | `null` |
| `"true"` / `"false"` | `true` / `false` |
| Numeric strings | `Number` |
| ISO 8601 date strings | `Date` |
| JSON strings (`{...}`, `[...]`) | Parsed object/array |

### Column types

When specifying `columns`, valid types are: `"number"`, `"boolean"`, `"null"`, `"date"`, `"json"`.

```js
csvCoerceValuesStream({
  columns: { age: 'number', active: 'boolean', birthday: 'date' }
})
```
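
A sketch of auto-coercion in a pipeline. This assumes `objectFromEntriesStream`'s `keys` option also accepts a plain array (the full pipeline below passes it a lazy function):

```js
import { pipeline, createReadableStream } from '@datastream/core'
import { csvParseStream, csvCoerceValuesStream } from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'

await pipeline([
  createReadableStream('Alice,30,true\r\nBob,25,false'),
  csvParseStream(),
  objectFromEntriesStream({ keys: ['name', 'age', 'active'] }),
  csvCoerceValuesStream(), // no columns option, so auto-coercion applies
])

// Chunks emitted: { name: 'Alice', age: 30, active: true },
//                 { name: 'Bob', age: 25, active: false }
```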

## csvQuotedParser / csvUnquotedParser

Standalone parser functions for use outside of streams. `csvUnquotedParser` is faster but does not handle quoted fields.

```js
import { csvQuotedParser } from '@datastream/csv'

const { rows, tail, numCols, idx, errors } = csvQuotedParser(
  'a,b,c\r\n1,2,3\r\n',
  { delimiterChar: ',', newlineChar: '\r\n', quoteChar: '"' },
  true
)
// rows: [['a','b','c'], ['1','2','3']]
```
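
Presumably `csvUnquotedParser` takes the same arguments and returns the same shape; treat this sketch as an assumption rather than a documented signature:

```js
import { csvUnquotedParser } from '@datastream/csv'

// Assumption: same signature and return shape as csvQuotedParser,
// minus any quote handling
const { rows } = csvUnquotedParser(
  'a,b,c\r\n1,2,3\r\n',
  { delimiterChar: ',', newlineChar: '\r\n' },
  true
)
```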

## Full pipeline example

Detect, parse, clean, coerce, and validate:

```js
import { pipeline, createReadableStream } from '@datastream/core'
import {
  csvDetectDelimitersStream,
  csvDetectHeaderStream,
  csvParseStream,
  csvRemoveEmptyRowsStream,
  csvRemoveMalformedRowsStream,
  csvCoerceValuesStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'

const detectDelimiters = csvDetectDelimitersStream()
const detectHeader = csvDetectHeaderStream({
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
})

const result = await pipeline([
  createReadableStream(csvData),
  detectDelimiters,
  detectHeader,
  csvParseStream({
    delimiterChar: () => detectDelimiters.result().value.delimiterChar,
    newlineChar: () => detectDelimiters.result().value.newlineChar,
    quoteChar: () => detectDelimiters.result().value.quoteChar,
    escapeChar: () => detectDelimiters.result().value.escapeChar,
  }),
  csvRemoveEmptyRowsStream(),
  csvRemoveMalformedRowsStream({
    headers: () => detectHeader.result().value.header,
  }),
  objectFromEntriesStream({
    keys: () => detectHeader.result().value.header,
  }),
  csvCoerceValuesStream(),
  validateStream({ schema }),
])
```
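
Given the `resultKey` defaults documented above, the pipeline result should then expose each stage's findings:

```js
console.log(result.csvErrors)              // parse errors, if any
console.log(result.csvRemoveEmptyRows)     // empty rows removed
console.log(result.csvRemoveMalformedRows) // malformed rows removed
```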