fetch

HTTP client streams with automatic pagination, rate limiting, and 429 retry.

Install

npm install @datastream/fetch

fetchSetDefaults

Set global defaults for all fetch streams.

Defaults

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| method | string | "GET" | HTTP method |
| headers | object | { Accept: 'application/json', 'Accept-Encoding': 'br, gzip, deflate' } | Request headers |
| rateLimit | number | 0.01 | Minimum seconds between requests (0.01 = 100/sec) |
| dataPath | string | | Dot-path to data array in JSON response body |
| nextPath | string | | Dot-path to next page URL in JSON response body |
| qs | object | {} | Default query string parameters |
| offsetParam | string | | Query parameter name for offset pagination |
| offsetAmount | number | | Increment per page for offset pagination |
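Because rateLimit is a minimum gap in seconds, the request ceiling is 1 / rateLimit per second: 0.01 gives 100 requests/sec and 0.1 gives 10. The following is a minimal sketch of a minimum-interval limiter that assumes requests are spaced evenly. createMinIntervalLimiter is a hypothetical helper, not an export of @datastream/fetch. The clock is passed in explicitly so the arithmetic is easy to follow.

```javascript
// Hypothetical minimum-interval limiter (illustration only, not the
// @datastream/fetch implementation). rateLimit is in seconds.
function createMinIntervalLimiter (rateLimit) {
  const intervalMs = rateLimit * 1000
  let nextAllowedMs = -Infinity // no request has been sent yet

  // Returns how many milliseconds to wait before a request issued at nowMs,
  // and reserves the slot after it for the following request.
  return function reserve (nowMs) {
    const startMs = Math.max(nowMs, nextAllowedMs)
    nextAllowedMs = startMs + intervalMs
    return startMs - nowMs
  }
}

const reserve = createMinIntervalLimiter(0.01) // 10 ms gap = 100 requests/sec
console.log(reserve(1000)) // 0  (first request goes out immediately)
console.log(reserve(1004)) // 6  (must wait until 1010)
console.log(reserve(1004)) // 16 (next free slot is 1020)
```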

Example

import { fetchSetDefaults } from '@datastream/fetch'

fetchSetDefaults({
  headers: { Authorization: 'Bearer token123' },
  rateLimit: 0.1, // 10 requests/sec
})
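The per-stream options table notes that headers are merged with the defaults. Here is a minimal sketch of one way that layering could work. It assumes that later values override earlier ones and that headers are merged key by key rather than replaced. mergeOptions and builtInDefaults are hypothetical names, and the actual merge rules of fetchSetDefaults may differ.

```javascript
// Hypothetical sketch: layering built-in defaults, global defaults, and
// per-stream options. Assumption: later values override earlier ones, and
// headers are merged key by key rather than replaced wholesale.
const builtInDefaults = {
  method: 'GET',
  headers: { Accept: 'application/json', 'Accept-Encoding': 'br, gzip, deflate' },
  rateLimit: 0.01,
  qs: {},
}

function mergeOptions (base, overrides = {}) {
  return {
    ...base,
    ...overrides,
    // Merge headers instead of letting overrides.headers replace base.headers
    headers: { ...base.headers, ...overrides.headers },
  }
}

// Stand-in for a fetchSetDefaults call (merge behavior assumed)
const globalDefaults = mergeOptions(builtInDefaults, {
  headers: { Authorization: 'Bearer token123' },
  rateLimit: 0.1,
})

// A single stream overriding one header
const streamOptions = mergeOptions(globalDefaults, {
  url: 'https://api.example.com/users',
  headers: { Accept: 'text/csv' },
})

console.log(streamOptions.headers.Accept) // text/csv
console.log(streamOptions.headers.Authorization) // Bearer token123
console.log(streamOptions.rateLimit) // 0.1
```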

fetchReadableStream (Readable)

Fetches data from one or more URLs and emits chunks. Automatically detects JSON responses and handles pagination.

Also exported as fetchResponseStream.

Options

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| url | string | | Request URL |
| method | string | "GET" | HTTP method |
| headers | object | | Request headers (merged with defaults) |
| rateLimit | number | 0.01 | Seconds between requests |
| dataPath | string | | Dot-path to data in JSON body |
| nextPath | string | | Dot-path to next URL in JSON body |
| qs | object | {} | Query string parameters |
| offsetParam | string | | Query param for offset-based pagination |
| offsetAmount | number | | Offset increment per page |

Pass an array of option objects to fetch from multiple URLs in sequence.

Pagination strategies

Link header — automatically followed when present:

Link: <https://api.example.com/users?page=2>; rel="next"
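A Link header can carry several comma-separated links, each with its own rel. Only the one marked next points to the following page. The following is a minimal sketch of extracting that URL using the RFC 8288 syntax. parseNextLink is a hypothetical helper, not the parser @datastream/fetch uses, and it ignores edge cases such as a "<" inside a quoted parameter value.

```javascript
// Hypothetical helper: return the rel="next" target from a Link header value,
// or undefined when there is none. Illustration only.
function parseNextLink (linkHeader) {
  if (!linkHeader) return undefined
  // Each link is "<url>" followed by its parameters, up to the next "<"
  for (const [, url, params] of linkHeader.matchAll(/<([^>]*)>([^<]*)/g)) {
    // rel may be quoted or bare, and may list several space-separated types
    const rel = params.match(/;\s*rel\s*=\s*(?:"([^"]*)"|([^;,\s]+))/i)
    const relTypes = rel ? (rel[1] ?? rel[2]).toLowerCase().split(/\s+/) : []
    if (relTypes.includes('next')) return url
  }
  return undefined
}

console.log(parseNextLink('<https://api.example.com/users?page=2>; rel="next"'))
// https://api.example.com/users?page=2
```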

Body path — use nextPath to extract the next URL from the JSON response:

fetchReadableStream({
  url: 'https://api.example.com/users',
  dataPath: 'data',
  nextPath: 'pagination.next_url',
})
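With body-path pagination, each response supplies both the items (at dataPath) and the next URL (at nextPath). The loop continues until the next URL is empty. Here is a minimal sketch of that loop. getPath and paginateByBodyPath are hypothetical names, and an in-memory object stands in for the API so the example runs offline.

```javascript
// Hypothetical sketch of body-path pagination (not the library's code):
// read items at dataPath, follow the URL at nextPath, stop when it is empty.
function getPath (obj, path) {
  // 'pagination.next_url' -> obj.pagination.next_url, or undefined if a step is missing
  return path.split('.').reduce((value, key) => value?.[key], obj)
}

async function* paginateByBodyPath (url, { dataPath, nextPath }, fetchJson) {
  while (url) {
    const body = await fetchJson(url)
    yield* (getPath(body, dataPath) ?? [])
    url = getPath(body, nextPath)
  }
}

// In-memory stand-in for the API so the sketch runs offline
const pages = {
  'https://api.example.com/users': {
    data: [{ id: 1 }, { id: 2 }],
    pagination: { next_url: 'https://api.example.com/users?page=2' },
  },
  'https://api.example.com/users?page=2': {
    data: [{ id: 3 }],
    pagination: { next_url: null },
  },
}
const fakeFetchJson = async (url) => pages[url]

async function collectIds () {
  const ids = []
  const options = { dataPath: 'data', nextPath: 'pagination.next_url' }
  for await (const user of paginateByBodyPath('https://api.example.com/users', options, fakeFetchJson)) {
    ids.push(user.id)
  }
  return ids
}

collectIds().then((ids) => console.log(ids)) // [ 1, 2, 3 ]
```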

Offset — use offsetParam and offsetAmount for numeric pagination:

fetchReadableStream({
  url: 'https://api.example.com/users',
  dataPath: 'results',
  offsetParam: 'offset',
  offsetAmount: 100,
})
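Offset pagination advances a single query parameter by offsetAmount on each page. The following is a minimal sketch of the URL arithmetic. It assumes that the first request uses offset 0 and that qs parameters are kept on every page. offsetPageUrl is a hypothetical helper, and the stream's actual stop condition is not covered here.

```javascript
// Hypothetical helper: page n requests offsetParam = n * offsetAmount.
// Assumes page 0 sends offset 0; not the library's code.
function offsetPageUrl (baseUrl, { qs = {}, offsetParam, offsetAmount }, pageIndex) {
  const url = new URL(baseUrl)
  for (const [key, value] of Object.entries(qs)) url.searchParams.set(key, value)
  url.searchParams.set(offsetParam, String(pageIndex * offsetAmount))
  return url.toString()
}

const options = { offsetParam: 'offset', offsetAmount: 100 }
for (let page = 0; page < 3; page++) {
  console.log(offsetPageUrl('https://api.example.com/users', options, page))
}
// https://api.example.com/users?offset=0
// https://api.example.com/users?offset=100
// https://api.example.com/users?offset=200
```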

429 retry

When the server responds with 429 Too Many Requests, the request is automatically retried after the rate limit delay.
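The retry behavior can be pictured as a loop that waits rateLimit seconds after each 429 before trying again. Here is a minimal sketch of that pattern. fetchWithRetryOn429 is a hypothetical name. The fixed delay and the maxRetries cap are assumptions made for the illustration and are not documented library behavior. A stub stands in for fetch so the example runs without a network. With a real server, the global fetch can be passed as doFetch.

```javascript
// Hypothetical sketch of "retry on 429 after the rate-limit delay".
// Assumptions: a fixed wait of rateLimit seconds between attempts, and a
// maxRetries cap (invented here) so the loop cannot run forever.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

async function fetchWithRetryOn429 (doFetch, url, { rateLimit = 0.01, maxRetries = 5 } = {}) {
  for (let attempt = 0; ; attempt++) {
    const response = await doFetch(url)
    // Hand back anything that is not a 429, or the last 429 once retries run out
    if (response.status !== 429 || attempt >= maxRetries) return response
    await sleep(rateLimit * 1000)
  }
}

// Stub that answers 429 twice, then 200
let calls = 0
const stubFetch = async () => ({ status: ++calls <= 2 ? 429 : 200 })

fetchWithRetryOn429(stubFetch, 'https://api.example.com/users').then((response) => {
  console.log(response.status, calls) // 200 3
})
```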

Example

import { pipeline } from '@datastream/core'
import { fetchReadableStream } from '@datastream/fetch'
import { objectCountStream } from '@datastream/object'

const count = objectCountStream()

const result = await pipeline([
  fetchReadableStream({
    url: 'https://api.example.com/users',
    dataPath: 'data',
    nextPath: 'meta.next',
    headers: { Authorization: 'Bearer token' },
  }),
  count,
])

console.log(result)
// { count: 450 }

Multiple URLs

fetchReadableStream([
  { url: 'https://api.example.com/users?status=active', dataPath: 'data' },
  { url: 'https://api.example.com/users?status=inactive', dataPath: 'data' },
])
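Fetching "in sequence" means that each option object's request completes, and its items are emitted, before the next URL is requested. Here is a minimal sketch of that ordering. fetchInSequence is a hypothetical name. Pagination is omitted, and dataPath is treated as a single top-level key to keep the example short.

```javascript
// Hypothetical sketch of fetching several option objects one after another
// (not the library's code; pagination omitted, dataPath is a top-level key).
async function* fetchInSequence (optionsList, fetchJson) {
  for (const { url, dataPath } of optionsList) {
    // The next request starts only after this source has been fully emitted
    const body = await fetchJson(url)
    yield* (body[dataPath] ?? [])
  }
}

const stubResponses = {
  'https://api.example.com/users?status=active': { data: [{ id: 1 }, { id: 2 }] },
  'https://api.example.com/users?status=inactive': { data: [{ id: 3 }] },
}
const requested = []
const stubFetchJson = async (url) => {
  requested.push(url) // record the order in which URLs are requested
  return stubResponses[url]
}

async function collectIds () {
  const ids = []
  const sources = [
    { url: 'https://api.example.com/users?status=active', dataPath: 'data' },
    { url: 'https://api.example.com/users?status=inactive', dataPath: 'data' },
  ]
  for await (const user of fetchInSequence(sources, stubFetchJson)) ids.push(user.id)
  return ids
}

collectIds().then((ids) => console.log(ids, requested.length)) // [ 1, 2, 3 ] 2
```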

fetchWritableStream (Writable, async)

Streams data as the body of an HTTP request. Uses duplex: "half", which fetch requires whenever the request body is a stream.

Also exported as fetchRequestStream.
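For reference, this is how the duplex option appears in a streaming-upload request built directly with the standard Fetch API (Node.js 18+ or a browser that supports streaming request bodies). This is not @datastream/fetch code. The CSV rows and upload URL are placeholders. The Request is only constructed here, so nothing is sent. In current Node.js versions, leaving out duplex for a stream body throws a TypeError.

```javascript
// Sketch: a streaming-upload Request using the standard Fetch API.
const encoder = new TextEncoder()
const csvRows = ['id,name\n', '1,Alice\n', '2,Bob\n'] // placeholder data

// A web ReadableStream that emits each CSV line as bytes
const body = new ReadableStream({
  start (controller) {
    for (const row of csvRows) controller.enqueue(encoder.encode(row))
    controller.close()
  },
})

const request = new Request('https://api.example.com/upload', {
  method: 'PUT',
  headers: { 'Content-Type': 'text/csv' },
  body,
  duplex: 'half', // required because the body is a stream
})

console.log(request.method) // PUT
// await fetch(request) would send the streamed body
```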

Example

import { pipeline, createReadableStream } from '@datastream/core'
import { fetchWritableStream } from '@datastream/fetch'
import { csvFormatStream } from '@datastream/csv'

// Example rows to upload
const data = [
  { id: 1, name: 'Alice' },
  { id: 2, name: 'Bob' },
]

await pipeline([
  createReadableStream(data),
  csvFormatStream({ header: true }),
  await fetchWritableStream({
    url: 'https://api.example.com/upload',
    method: 'PUT',
    headers: { 'Content-Type': 'text/csv' },
  }),
])