Class IterableMapper<Element, NewElement>

Iterates over a source iterable or generator with the specified concurrency, calling the mapper on each item and storing the mapper results in a queue of up to maxUnread size until they are iterated / read by the caller.

Remarks

Typical Use Case

  • Prefetching items from an async I/O source
  • In the simple sequential (concurrency: 1) case, allows items to be prefetched asynchronously, preserving order, while the caller processes the current item
  • Can allow parallel prefetches for sources that allow for out of order reads (concurrency: 2+)
  • Prevents the producer from racing ahead of the consumer if maxUnread is reached

Error Handling

The mapper should ideally handle all errors internally so that errors are handled as close as possible to where they occur. However, if errors do escape the mapper, the behavior depends on the stopOnMapperError option (see the sketch after the lists below):

When stopOnMapperError is true (default):

  • First error immediately stops processing
  • Error is thrown from the AsyncIterator's next() call

When stopOnMapperError is false:

  • Processing continues despite errors
  • All errors are collected and thrown together
  • Errors are thrown as AggregateError after all items complete
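
A minimal sketch of the stopOnMapperError: false case, assuming IterableMapper is in scope as in the examples below. The input values and the failure condition are purely illustrative, and the sketch assumes the collected errors surface from the iteration as a single AggregateError once all items have completed, per the behavior described above.

const mapped = new IterableMapper(
  [1, 2, 3, 4],
  async (value) => {
    if (value % 2 === 0) throw new Error(`mapper failed on ${value}`); // illustrative failure
    return value * 10;
  },
  { concurrency: 2, stopOnMapperError: false },
);

try {
  for await (const result of mapped) {
    console.log(result); // successful results are still delivered
  }
} catch (error) {
  // After all items complete, the collected errors are thrown together
  if (error instanceof AggregateError) {
    for (const mapperError of error.errors) console.error(mapperError);
  }
}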

Usage

  • Items are exposed to the mapper via an iterator or async iterator (this includes generator and async generator functions)
  • IMPORTANT: the mapper will not be invoked once maxUnread is reached, until items are consumed (illustrated in the sketch after this list)
  • The iterable will set done when the input has indicated done and all mapper promises have resolved
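
A minimal sketch of this backpressure behavior, assuming IterableMapper is in scope as in the examples below; the generator, delay, and logging are illustrative only. With concurrency: 1 and maxUnread: 3, the mapper pauses once three results are unread and resumes as the caller consumes items.

async function* ids() {
  for (let id = 1; id <= 10; id++) yield id;
}

const prefetcher = new IterableMapper(
  ids(),
  async (id) => {
    console.log(`mapper invoked for ${id}`); // pauses once maxUnread results are pending
    return id * 2;
  },
  { concurrency: 1, maxUnread: 3 },
);

for await (const value of prefetcher) {
  console.log(`consumed ${value}`); // consuming an item frees a slot, letting the mapper resume
  await new Promise((resolve) => setTimeout(resolve, 50)); // simulate a slow consumer
}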

Example

Typical Processing Loop without IterableMapper

const source = new SomeSource();
const sourceIds = [1, 2,... 1000];
const sink = new SomeSink();
for (const sourceId of sourceIds) {
  const item = await source.read(sourceId); // takes 300 ms of I/O wait, no CPU
  const outputItem = doSomeOperation(item); // takes 20 ms of CPU
  await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU
}

Each iteration takes 820ms total, but we waste time waiting for I/O. We could prefetch the next read (300ms) while processing (20ms) and writing (500ms), without changing the order of reads or writes.

Example

Using IterableMapper as Prefetcher with Blocking Sequential Writes

concurrency: 1 on the prefetcher preserves the order of the reads, and the writes remain sequential and blocking (unchanged).

const source = new SomeSource();
const sourceIds = [1, 2,... 1000];
// Pre-reads up to 10 items serially and releases them in sequential order
const sourcePrefetcher = new IterableMapper(sourceIds,
  async (sourceId) => source.read(sourceId),
  { concurrency: 1, maxUnread: 10 }
);
const sink = new SomeSink();
for await (const item of sourcePrefetcher) { // may not block for fast sources
  const outputItem = doSomeOperation(item); // takes 20 ms of CPU
  await sink.write(outputItem); // takes 500 ms of I/O wait, no CPU
}

This reduces iteration time to 520ms by overlapping reads with processing/writing.

Example

Using IterableMapper as Prefetcher with Background Sequential Writes with IterableQueueMapperSimple

concurrency: 1 on the prefetcher preserves the order of the reads. concurrency: 1 on the flusher preserves the order of the writes, but allows the loop to continue iterating while the last write is completing.

const source = new SomeSource();
const sourceIds = [1, 2,... 1000];
const sourcePrefetcher = new IterableMapper(sourceIds,
  async (sourceId) => source.read(sourceId),
  { concurrency: 1, maxUnread: 10 }
);
const sink = new SomeSink();
const flusher = new IterableQueueMapperSimple(
  async (outputItem) => sink.write(outputItem),
  { concurrency: 1 }
);
for await (const item of sourcePrefetcher) { // may not block for fast sources
  const outputItem = doSomeOperation(item); // takes 20 ms of CPU
  await flusher.enqueue(outputItem); // will periodically block for portion of write time
}
// Wait for all writes to complete
await flusher.onIdle();
// Check for errors
if (flusher.errors.length > 0) {
  // ...
}

This reduces iteration time to about max(max(readTime, writeTime) - cpuOpTime, cpuOpTime) by overlapping reads and writes with the CPU processing step. In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms. In cases where the CPU usage time is higher, the impact can be greater.

Example

Using IterableMapper as Prefetcher with Out of Order Reads and Background Out of Order Writes with IterableQueueMapperSimple

For maximum throughput, allow out of order reads and writes using IterableQueueMapper (to iterate the results with backpressure when there are too many unread items) or IterableQueueMapperSimple (to handle errors at the end without custom iteration, applying backpressure that blocks further enqueues when concurrency items are in process):

const source = new SomeSource();
const sourceIds = [1, 2,... 1000];
const sourcePrefetcher = new IterableMapper(sourceIds,
  async (sourceId) => source.read(sourceId),
  { concurrency: 10, maxUnread: 20 }
);
const sink = new SomeSink();
const flusher = new IterableQueueMapperSimple(
  async (outputItem) => sink.write(outputItem),
  { concurrency: 10 }
);
for await (const item of sourcePrefetcher) { // typically will not block
  const outputItem = doSomeOperation(item); // takes 20 ms of CPU
  await flusher.enqueue(outputItem); // typically will not block
}
// Wait for all writes to complete
await flusher.onIdle();
// Check for errors
if (flusher.errors.length > 0) {
  // ...
}

This reduces iteration time to about 20ms by overlapping reads and writes with the CPU processing step. In this contrived (but common) example we would get a 41x improvement in throughput, removing 97.5% of the time to process each item and fully utilizing the CPU time available in the JS event loop.

Type Parameters

  • Element

  • NewElement

Hierarchy

  • IterableMapper

Implements

  • AsyncIterable<NewElement>

Constructors

  • Create a new IterableMapper

    Type Parameters

    • Element

    • NewElement

    Parameters

    • input: AsyncIterable<Element> | Iterable<Element>

      Iterated over concurrently, or serially, in the mapper function.

    • mapper: Mapper<Element, NewElement>

      Function called for every item in input. Returns a Promise or value.

    • options: IterableMapperOptions = {}

      IterableMapper options

    Returns IterableMapper<Element, NewElement>

    See

    IterableQueueMapper for full class documentation
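
A minimal sketch of the constructor with explicit type parameters, where the input array and mapper are illustrative: Element is the input item type and NewElement is the mapper's result type.

const lengths = new IterableMapper<string, number>(
  ['alpha', 'beta', 'gamma'],       // input: Iterable<string>
  async (word) => word.length,      // mapper: Mapper<string, number>
  { concurrency: 2, maxUnread: 4 }, // options: IterableMapperOptions
);

for await (const length of lengths) {
  console.log(length);
}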

Properties

_activeRunners: number = 0
_asyncIterator: boolean = false
_currentIndex: number = 0
_errors: Error[] = ...
_initialRunnersCreated: boolean = false
_isIterableDone: boolean = false
_isRejected: boolean = false
_iterator: AsyncIterator<Element, any, undefined> | Iterator<Element, any, undefined>
_mapper: Mapper<Element, NewElement>
_options: Required<IterableMapperOptions>
_resolvingCount: number = 0
_unreadQueue: IterableQueue<NewElementOrError<NewElement>>

Methods

  • Returns AsyncIterator<NewElement, any, undefined>

  • Used by the iterator returned from [Symbol.asyncIterator]. Called every time an item is needed.

    Returns Promise<IteratorResult<NewElement, any>>

    Iterator result

  • Get the next item from the input iterable.

    Returns Promise<void>

    Remarks

    This is called up to concurrency times in parallel.

    If the read queue is not full, and there are source items to read, each instance of this will keep calling a new instance of itself that detaches and runs asynchronously (keeping the same number of instances running).

If the number of unread items in the queue plus the number of active runners equals maxUnread, then the runner will exit and will be restarted when an item is read from the queue.

  • Throw an exception if the wrapped NewElement is an Error

    Parameters

    • item: NewElementOrError<NewElement>

    Returns NewElement

    Element if no error
