S3 to S3 — transform and re-upload

Read a gzipped CSV from S3, decompress it, parse, validate, re-format, re-compress, and write the result back to S3:

import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream, awsS3PutObjectStream } from '@datastream/aws'
import {
  csvDetectDelimitersStream,
  csvDetectHeaderStream,
  csvParseStream,
  csvRemoveMalformedRowsStream,
  csvFormatStream,
} from '@datastream/csv'
import { objectFromEntriesStream } from '@datastream/object'
import { validateStream } from '@datastream/validate'
import { gzipCompressStream, gzipDecompressStream } from '@datastream/compress'

// Placeholder JSON Schema used by validateStream below — replace with the
// schema for your own rows. (The original example referenced an undefined
// `schema` variable, which would throw a ReferenceError.)
const schema = {
  type: 'object',
  additionalProperties: true,
}

const detectDelimiters = csvDetectDelimitersStream()

// The delimiters aren't known until detection has consumed some input, so
// each option is a lazy getter that reads the detector's result at pull
// time. Defined once and shared by the header-detection and parse stages.
const delimiterOptions = {
  delimiterChar: () => detectDelimiters.result().value.delimiterChar,
  newlineChar: () => detectDelimiters.result().value.newlineChar,
  quoteChar: () => detectDelimiters.result().value.quoteChar,
  escapeChar: () => detectDelimiters.result().value.escapeChar,
}

const detectHeader = csvDetectHeaderStream({ ...delimiterOptions })

// Stage order matters: decompress before detection/parsing, compress after
// formatting. Header values are likewise read lazily from detectHeader.
const result = await pipeline([
  await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'input.csv.gz' }),
  gzipDecompressStream(),
  detectDelimiters,
  detectHeader,
  csvParseStream({ ...delimiterOptions }),
  csvRemoveMalformedRowsStream({
    headers: () => detectHeader.result().value.header,
  }),
  objectFromEntriesStream({
    keys: () => detectHeader.result().value.header,
  }),
  validateStream({ schema }),
  csvFormatStream({ header: true }),
  gzipCompressStream(),
  awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])