aws

AWS service streams for S3, DynamoDB, Lambda, SNS, and SQS. Node.js only.

Install

npm install @datastream/aws

Requires the corresponding AWS SDK v3 client packages:

npm install @aws-sdk/client-s3 @aws-sdk/lib-storage
npm install @aws-sdk/client-dynamodb
npm install @aws-sdk/client-lambda
npm install @aws-sdk/client-sns
npm install @aws-sdk/client-sqs

S3

awsS3SetClient

Set a custom S3 client. By default, FIPS endpoints are enabled for US and CA regions.

import { S3Client } from '@aws-sdk/client-s3'
import { awsS3SetClient } from '@datastream/aws'

awsS3SetClient(new S3Client({ region: 'eu-west-1' }))

awsS3GetObjectStream Readable, async

Downloads an object from S3 as a stream.

Options

Accepts all GetObjectCommand parameters plus:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| client | S3Client | default client | Custom S3 client for this call |
| Bucket | string | | S3 bucket name |
| Key | string | | S3 object key |

Example

import { pipeline } from '@datastream/core'
import { awsS3GetObjectStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'

await pipeline([
  await awsS3GetObjectStream({ Bucket: 'my-bucket', Key: 'data.csv' }),
  csvParseStream(),
])

awsS3PutObjectStream PassThrough

Uploads data to S3 using multipart upload.

Options

Accepts all S3 PutObject parameters plus:

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| client | S3Client | default client | Custom S3 client |
| Bucket | string | | S3 bucket name |
| Key | string | | S3 object key |
| onProgress | function | | Upload progress callback |
| tags | object | | S3 object tags |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsS3PutObjectStream } from '@datastream/aws'
import { gzipCompressStream } from '@datastream/compress'

await pipeline([
  createReadableStream(data),
  gzipCompressStream(),
  awsS3PutObjectStream({ Bucket: 'my-bucket', Key: 'output.csv.gz' }),
])

awsS3ChecksumStream PassThrough

Computes a multipart S3 checksum while data passes through. Designed for pre-signed URL uploads in the browser.

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| ChecksumAlgorithm | string | "SHA256" | "SHA1" or "SHA256" |
| partSize | number | 17179870 | Part size in bytes |
| resultKey | string | "s3" | Key in pipeline result |

Result

{ checksum: 'base64hash-3', checksums: ['part1hash', 'part2hash', 'part3hash'], partSize: 17179870 }

DynamoDB

awsDynamoDBSetClient

import { DynamoDBClient } from '@aws-sdk/client-dynamodb'
import { awsDynamoDBSetClient } from '@datastream/aws'

awsDynamoDBSetClient(new DynamoDBClient({ region: 'us-east-1' }))

awsDynamoDBQueryStream Readable, async

Queries a DynamoDB table and auto-paginates through all results.

Options

Accepts all QueryCommand parameters:

| Option | Type | Description |
| --- | --- | --- |
| TableName | string | DynamoDB table name |
| KeyConditionExpression | string | Query key condition |
| ExpressionAttributeValues | object | Expression values |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBQueryStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsDynamoDBQueryStream({
    TableName: 'Users',
    KeyConditionExpression: 'PK = :pk',
    ExpressionAttributeValues: { ':pk': { S: 'USER#123' } },
  })),
])

awsDynamoDBScanStream Readable, async

Scans an entire DynamoDB table with automatic pagination.

import { pipeline, createReadableStream } from '@datastream/core'
import { awsDynamoDBScanStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsDynamoDBScanStream({ TableName: 'Users' })),
])

awsDynamoDBGetItemStream Readable, async

Fetches items by key in batches. Automatically retries unprocessed keys with exponential backoff.

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| TableName | string | | DynamoDB table name |
| Keys | object[] | | Array of key objects |
| retryMaxCount | number | 10 | Maximum retry attempts |

awsDynamoDBPutItemStream Writable

Writes items to DynamoDB using BatchWriteItem. Automatically batches 25 items per request and retries unprocessed items with exponential backoff.

Options

| Option | Type | Description |
| --- | --- | --- |
| TableName | string | DynamoDB table name |
| retryMaxCount | number | Maximum retry attempts (default 10) |

Example

import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsDynamoDBPutItemStream } from '@datastream/aws'

await pipeline([
  createReadableStream(items),
  createTransformStream((item, enqueue) => {
    enqueue({
      PK: { S: `USER#${item.id}` },
      SK: { S: 'PROFILE' },
      name: { S: item.name },
    })
  }),
  awsDynamoDBPutItemStream({ TableName: 'Users' }),
])

awsDynamoDBDeleteItemStream Writable

Deletes items from DynamoDB using BatchWriteItem. Automatically batches 25 items per request and retries unprocessed items with exponential backoff.

Options

| Option | Type | Description |
| --- | --- | --- |
| TableName | string | DynamoDB table name |
| retryMaxCount | number | Maximum retry attempts (default 10) |

Example

import { awsDynamoDBDeleteItemStream } from '@datastream/aws'

awsDynamoDBDeleteItemStream({ TableName: 'Users' })
// Input chunks: { PK: { S: 'USER#1' }, SK: { S: 'PROFILE' } }

Lambda

awsLambdaSetClient

import { LambdaClient } from '@aws-sdk/client-lambda'
import { awsLambdaSetClient } from '@datastream/aws'

awsLambdaSetClient(new LambdaClient({ region: 'us-east-1' }))

awsLambdaReadableStream Readable

Invokes a Lambda function with response streaming (InvokeWithResponseStream).

Also exported as awsLambdaResponseStream.

Options

Accepts InvokeWithResponseStreamCommand parameters. Pass an array to invoke multiple functions sequentially.

| Option | Type | Description |
| --- | --- | --- |
| FunctionName | string | Lambda function name or ARN |
| Payload | string | JSON payload |

Example

import { pipeline } from '@datastream/core'
import { awsLambdaReadableStream } from '@datastream/aws'
import { csvParseStream } from '@datastream/csv'

await pipeline([
  awsLambdaReadableStream({
    FunctionName: 'data-processor',
    Payload: JSON.stringify({ key: 'input.csv' }),
  }),
  csvParseStream(),
])

SNS

awsSNSSetClient

import { SNSClient } from '@aws-sdk/client-sns'
import { awsSNSSetClient } from '@datastream/aws'

awsSNSSetClient(new SNSClient({ region: 'us-east-1' }))

awsSNSPublishMessageStream Writable

Publishes messages to an SNS topic. Batches 10 messages per PublishBatchCommand.

Options

| Option | Type | Description |
| --- | --- | --- |
| TopicArn | string | SNS topic ARN |

Example

import { pipeline, createReadableStream, createTransformStream } from '@datastream/core'
import { awsSNSPublishMessageStream } from '@datastream/aws'

await pipeline([
  createReadableStream(events),
  createTransformStream((event, enqueue) => {
    enqueue({
      Id: event.id,
      Message: JSON.stringify(event),
    })
  }),
  awsSNSPublishMessageStream({ TopicArn: 'arn:aws:sns:us-east-1:123:my-topic' }),
])

SQS

awsSQSSetClient

import { SQSClient } from '@aws-sdk/client-sqs'
import { awsSQSSetClient } from '@datastream/aws'

awsSQSSetClient(new SQSClient({ region: 'us-east-1' }))

awsSQSReceiveMessageStream Readable, async

Polls an SQS queue and yields messages until the queue is empty.

Options

Accepts ReceiveMessageCommand parameters:

| Option | Type | Description |
| --- | --- | --- |
| QueueUrl | string | SQS queue URL |
| MaxNumberOfMessages | number | Max messages per poll (1-10) |

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { awsSQSReceiveMessageStream, awsSQSDeleteMessageStream } from '@datastream/aws'

await pipeline([
  createReadableStream(await awsSQSReceiveMessageStream({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  })),
  awsSQSDeleteMessageStream({
    QueueUrl: 'https://sqs.us-east-1.amazonaws.com/123/my-queue',
  }),
])

awsSQSSendMessageStream Writable

Sends messages to an SQS queue. Batches 10 messages per SendMessageBatchCommand.

Options

| Option | Type | Description |
| --- | --- | --- |
| QueueUrl | string | SQS queue URL |

awsSQSDeleteMessageStream Writable

Deletes messages from an SQS queue. Batches 10 messages per DeleteMessageBatchCommand.

Options

| Option | Type | Description |
| --- | --- | --- |
| QueueUrl | string | SQS queue URL |