object

Transform, reshape, filter, and aggregate object streams.

Install

npm install @datastream/object

objectReadableStream Readable

Creates a Readable stream from an array of objects.

import { objectReadableStream } from '@datastream/object'

const stream = objectReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }])

objectCountStream PassThrough

Counts the number of chunks that pass through.

Options

OptionTypeDefaultDescription
resultKeystring"count"Key in pipeline result

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { objectCountStream } from '@datastream/object'

const count = objectCountStream()
const result = await pipeline([
  createReadableStream([{ a: 1 }, { a: 2 }, { a: 3 }]),
  count,
])

console.log(result)
// { count: 3 }

objectFromEntriesStream Transform

Converts arrays to objects using column names. Commonly used with csvParseStream output.

Options

OptionTypeDefaultDescription
keysstring[] \| () => string[]Column names, or lazy function

Example

import { objectFromEntriesStream } from '@datastream/object'

// Converts ['Alice', '30', 'Toronto'] → { name: 'Alice', age: '30', city: 'Toronto' }
objectFromEntriesStream({ keys: ['name', 'age', 'city'] })

// With lazy keys from csvDetectHeaderStream
objectFromEntriesStream({
  keys: () => detectHeader.result().value.header,
})

objectPickStream Transform

Keeps only the specified keys on each object.

Options

OptionTypeDefaultDescription
keysstring[]Keys to keep

Example

import { objectPickStream } from '@datastream/object'

// { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice', age: 30 }
objectPickStream({ keys: ['name', 'age'] })

objectOmitStream Transform

Removes the specified keys from each object.

Options

OptionTypeDefaultDescription
keysstring[]Keys to remove

Example

import { objectOmitStream } from '@datastream/object'

// { name: 'Alice', age: 30, city: 'Toronto' } → { name: 'Alice' }
objectOmitStream({ keys: ['age', 'city'] })

objectKeyMapStream Transform

Renames keys on each object. Unmapped keys are kept as-is.

Options

OptionTypeDefaultDescription
keysobjectMap of { oldKey: 'newKey' }

Example

import { objectKeyMapStream } from '@datastream/object'

// { firstName: 'Alice', lastName: 'Smith' } → { first_name: 'Alice', last_name: 'Smith' }
objectKeyMapStream({ keys: { firstName: 'first_name', lastName: 'last_name' } })

objectValueMapStream Transform

Maps values for a specific key using a lookup table.

Options

OptionTypeDefaultDescription
keystringKey whose value to map
valuesobjectMap of { oldValue: newValue }

Example

import { objectValueMapStream } from '@datastream/object'

// { status: 'A' } → { status: 'Active' }
objectValueMapStream({ key: 'status', values: { A: 'Active', I: 'Inactive' } })

objectKeyJoinStream Transform

Joins multiple keys into new combined keys.

Options

OptionTypeDefaultDescription
keysobjectMap of { newKey: ['key1', 'key2'] }
separatorstringSeparator for joined values

Example

import { objectKeyJoinStream } from '@datastream/object'

// { first: 'Alice', last: 'Smith', age: 30 } → { name: 'Alice Smith', age: 30 }
objectKeyJoinStream({ keys: { name: ['first', 'last'] }, separator: ' ' })

objectKeyValueStream Transform

Creates a key-value pair object from two fields.

Options

OptionTypeDefaultDescription
keystringField to use as the key
valuestringField to use as the value

Example

import { objectKeyValueStream } from '@datastream/object'

// { code: 'CA', country: 'Canada' } → { CA: 'Canada' }
objectKeyValueStream({ key: 'code', value: 'country' })

objectKeyValuesStream Transform

Creates a key-values pair object — the key comes from a field, the value is a subset of the object.

Options

OptionTypeDefaultDescription
keystringField to use as the key
valuesstring[]Fields to include in value object. If omitted, entire object is used

Example

import { objectKeyValuesStream } from '@datastream/object'

// { id: 'u1', name: 'Alice', age: 30 } → { u1: { name: 'Alice', age: 30 } }
objectKeyValuesStream({ key: 'id', values: ['name', 'age'] })

objectBatchStream Transform

Groups consecutive objects with the same key values into arrays. Use with objectPivotLongToWideStream.

Options

OptionTypeDefaultDescription
keysstring[]Keys to group by

Example

import { objectBatchStream } from '@datastream/object'

// Input: [{id:1,k:'a'}, {id:1,k:'b'}, {id:2,k:'c'}]
// Output: [[{id:1,k:'a'}, {id:1,k:'b'}]], [[{id:2,k:'c'}]]
objectBatchStream({ keys: ['id'] })

objectPivotLongToWideStream Transform

Pivots batched arrays from long to wide format. Must be used after objectBatchStream.

Options

OptionTypeDefaultDescription
keysstring[]Pivot key columns
valueParamstringColumn containing the values
delimiterstring" "Separator for combined key names

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { objectBatchStream, objectPivotLongToWideStream } from '@datastream/object'

// Input: [{id:1, metric:'temp', value:20}, {id:1, metric:'humidity', value:60}]
// Output: {id:1, temp:20, humidity:60}
await pipeline([
  createReadableStream(data),
  objectBatchStream({ keys: ['id'] }),
  objectPivotLongToWideStream({ keys: ['metric'], valueParam: 'value' }),
])

objectPivotWideToLongStream Transform

Pivots wide format to long format. Emits multiple chunks per input object.

Options

OptionTypeDefaultDescription
keysstring[]Columns to unpivot
keyParamstring"keyParam"Name for the new key column
valueParamstring"valueParam"Name for the new value column

Example

import { objectPivotWideToLongStream } from '@datastream/object'

// Input: { id: 1, temp: 20, humidity: 60 }
// Output: { id: 1, keyParam: 'temp', valueParam: 20 }, { id: 1, keyParam: 'humidity', valueParam: 60 }
objectPivotWideToLongStream({
  keys: ['temp', 'humidity'],
  keyParam: 'metric',
  valueParam: 'value',
})

objectSkipConsecutiveDuplicatesStream Transform

Skips consecutive duplicate objects (compared via JSON.stringify).

Example

import { objectSkipConsecutiveDuplicatesStream } from '@datastream/object'

// Input: [{a:1}, {a:1}, {a:2}, {a:1}]
// Output: [{a:1}, {a:2}, {a:1}]