validate

JSON Schema validation for object streams using Ajv.

Install

npm install @datastream/validate

validateStream Transform

Validates each object chunk against a JSON Schema. Invalid rows are dropped by default; errors are collected in .result().

Options

| Option | Type | Default | Description |
|--------|------|---------|-------------|
| schema | object \| function | | JSON Schema object, or pre-compiled validation function |
| idxStart | number | 0 | Starting row index for error tracking |
| onErrorEnqueue | boolean | false | If true, invalid rows are kept in the stream |
| allowCoerceTypes | boolean | true | If false, emits original data without Ajv coercion |
| resultKey | string | "validate" | Key in pipeline result |

Result

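Errors are grouped by schema path. Each entry lists the affected keys, the error message, and the indexes (idx) of the rows that failed that rule: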

{
  '#/required': {
    id: '#/required',
    keys: ['email'],
    message: "must have required property 'email'",
    idx: [3, 7, 15]
  },
  '#/properties/age/type': {
    id: '#/properties/age/type',
    keys: ['age'],
    message: 'must be number',
    idx: [5]
  }
}
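
To make the shape above concrete, here is a minimal sketch of how Ajv-style error objects could be folded into it. `collectErrors` is a hypothetical helper written for illustration, not part of `@datastream/validate`, and the library's actual grouping logic may differ. The `schemaPath`, `instancePath`, `params` and `message` fields are the ones Ajv puts on its error objects.

```javascript
// Hypothetical helper, not part of @datastream/validate.
// Folds one row's Ajv-style errors into a result object keyed by schemaPath.
function collectErrors(result, errors, rowIdx) {
  for (const error of errors) {
    const id = error.schemaPath
    if (!result[id]) {
      result[id] = { id, keys: [], message: error.message, idx: [] }
    }
    const entry = result[id]
    // `required` errors name the key in params.missingProperty;
    // other keywords point at it through instancePath (e.g. '/age').
    const key = error.params.missingProperty ?? error.instancePath.split('/').pop()
    if (key && !entry.keys.includes(key)) entry.keys.push(key)
    if (!entry.idx.includes(rowIdx)) entry.idx.push(rowIdx)
  }
  return result
}

// Sample error objects, shaped like the ones Ajv reports.
const missingEmail = {
  schemaPath: '#/required',
  instancePath: '',
  params: { missingProperty: 'email' },
  message: "must have required property 'email'",
}
const ageNotNumber = {
  schemaPath: '#/properties/age/type',
  instancePath: '/age',
  params: { type: 'number' },
  message: 'must be number',
}

const result = {}
collectErrors(result, [missingEmail], 3)
collectErrors(result, [ageNotNumber], 5)
collectErrors(result, [missingEmail], 7)

console.log(result['#/required'].idx) // [ 3, 7 ]
console.log(result['#/properties/age/type'].keys) // [ 'age' ]
```

Keying on the schema path rather than on the row keeps the result small for large streams: one entry per violated rule, however many rows violate it.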

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { validateStream } from '@datastream/validate'

const schema = {
  type: 'object',
  required: ['name', 'age'],
  properties: {
    name: { type: 'string', minLength: 1 },
    age: { type: 'number', minimum: 0 },
    email: { type: 'string', format: 'email' },
  },
  additionalProperties: false,
}

const result = await pipeline([
  createReadableStream([
    { name: 'Alice', age: 30, email: 'alice@example.com' },
    { name: '', age: -1 },
    { name: 'Bob', age: 25 },
  ]),
  validateStream({ schema }),
])

console.log(result.validate)
// Errors for row index 1 (name fails minLength, age fails minimum)

Keep invalid rows

validateStream({ schema, onErrorEnqueue: true })
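
The difference between the default behaviour and `onErrorEnqueue: true` can be modelled without streams. The sketch below is a simplified synchronous stand-in, not the library's implementation: `validateRows` and `isValid` are hypothetical names, and a plain predicate takes the place of the Ajv validation function.

```javascript
// Simplified synchronous model of the row handling described above.
// `validateRows` and `isValid` are hypothetical, not library APIs.
function validateRows(rows, isValid, { onErrorEnqueue = false, idxStart = 0 } = {}) {
  const output = []
  const invalidIdx = []
  rows.forEach((row, offset) => {
    const ok = isValid(row)
    // Row indexes are counted from idxStart for error tracking.
    if (!ok) invalidIdx.push(idxStart + offset)
    // Valid rows always pass; invalid rows pass only when onErrorEnqueue is set.
    if (ok || onErrorEnqueue) output.push(row)
  })
  return { output, invalidIdx }
}

const rows = [
  { name: 'Alice', age: 30 },
  { name: '', age: -1 },
  { name: 'Bob', age: 25 },
]
// Stand-in for the schema: non-empty name, non-negative age.
const isValid = (row) => row.name.length >= 1 && row.age >= 0

const dropped = validateRows(rows, isValid)
const kept = validateRows(rows, isValid, { onErrorEnqueue: true })

console.log(dropped.output.length) // 2
console.log(kept.output.length) // 3
console.log(kept.invalidIdx) // [ 1 ]
```

In both modes the invalid row index is recorded, so keeping rows in the stream does not hide the errors from the result.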

Pre-compiled schema

For better cold-start performance, pre-compile your schema during the build step:

import { transpileSchema, validateStream } from '@datastream/validate'

const validate = transpileSchema(schema)
validateStream({ schema: validate })

transpileSchema

Pre-compiles a JSON Schema into a validation function using Ajv.

Ajv defaults

| Option | Default | Description |
|--------|---------|-------------|
| strict | true | Strict mode |
| coerceTypes | true | Coerce types to match schema |
| allErrors | true | Report all errors, not just the first |
| useDefaults | "empty" | Apply defaults for missing/empty values |
| messages | true | Include error messages |

import { transpileSchema } from '@datastream/validate'

const validate = transpileSchema(schema, { coerceTypes: false })
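
Of the defaults above, `useDefaults: "empty"` is probably the least obvious. Per the Ajv documentation, this mode applies a schema `default` not only to missing or undefined properties but also to properties equal to `null` or an empty string. The sketch below models that rule in plain JavaScript for top-level properties only; `applyDefaults` is a hypothetical helper, not an Ajv or `@datastream/validate` API, and unlike Ajv it returns a copy instead of modifying the data in place.

```javascript
// Plain-JavaScript model of Ajv's `useDefaults: "empty"` rule.
// `applyDefaults` is a hypothetical helper, not an Ajv or @datastream API.
function applyDefaults(row, properties) {
  const output = { ...row }
  for (const [key, definition] of Object.entries(properties)) {
    if (!('default' in definition)) continue
    const value = output[key]
    // "empty" mode treats undefined, null and '' as missing.
    if (value === undefined || value === null || value === '') {
      output[key] = definition.default
    }
  }
  return output
}

const properties = {
  name: { type: 'string' },
  country: { type: 'string', default: 'unknown' },
  age: { type: 'number', default: 0 },
}

console.log(applyDefaults({ name: 'Alice', country: '', age: null }, properties))
// { name: 'Alice', country: 'unknown', age: 0 }
```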