Create a new IterableMapper

Parameters
- input: Iterated over, concurrently or serially, by the mapper function.
- mapper: Function called for every item in input. Returns a Promise or value.
- options: IterableMapper options.

See IterableQueueMapper for full class documentation.
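A minimal construction sketch. The package import path, the array input, and the concurrency / maxUnread values are assumptions for illustration, not taken from this page:

```typescript
// Package name assumed; adjust the import to wherever IterableMapper is exported.
import { IterableMapper } from '@shutterstock/p-map-iterable';

// input: an array here, but any iterable or async iterable works.
// mapper: called for every item; may return a value or a Promise.
// options: IterableMapper options (values below are arbitrary examples).
const doubled = new IterableMapper(
  [1, 2, 3],
  async (n: number) => n * 2,
  { concurrency: 1, maxUnread: 4 },
);

for await (const value of doubled) {
  console.log(value); // 2, 4, 6 - concurrency: 1 preserves input order
}
```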
Private
_activePrivate
_asyncPrivate
_currentPrivate
Readonly
_errorsPrivate
_initialPrivate
_isPrivate
_isPrivate
_iteratorPrivate
_mapperPrivate
_optionsPrivate
_resolvingPrivate
_unreadPrivate
arePrivate
bubblePrivate
source (Private)
Get the next item from the input iterable. This is called up to concurrency times in parallel. If the read queue is not full, and there are source items to read, each instance of this will start a new instance of itself that detaches and runs asynchronously (keeping the same number of instances running). If the read queue length plus the number of runners equals the maximum read queue length, the runner will exit and will be restarted when an item is read from the queue.
Iterates over a source iterable / generator with specified concurrency, calling the mapper on each iterated item, and storing the mapper result in a queue of maxUnread size, before being iterated / read by the caller.

Remarks
Typical Use Case
- The sequential (concurrency: 1) case allows items to be prefetched async, preserving order, while the caller processes an item
- Higher concurrency (concurrency: 2+) allows multiple items to be prefetched at once
- Prefetching pauses when maxUnread is reached and resumes as items are consumed

Error Handling
The mapper should ideally handle all errors internally to enable error handling closest to where they occur. However, if errors do escape the mapper:
- When stopOnMapperError is true (default): the first error is thrown from the AsyncIterator's next() call
- When stopOnMapperError is false: errors are accumulated and thrown as an AggregateError after all items complete (both modes are sketched below)
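A sketch of both modes with a mapper that intentionally fails on one item. The import path and option values are assumptions, but the throwing behavior follows the bullets above:

```typescript
// Package name assumed; adjust the import to your project.
import { IterableMapper } from '@shutterstock/p-map-iterable';

// Mapper that fails on one item so both error modes can be observed.
const mapper = async (n: number) => {
  if (n === 3) throw new Error(`mapper failed on ${n}`);
  return n * 10;
};

// stopOnMapperError: true (the default): the first mapper error is rethrown
// from the AsyncIterator's next() call, i.e. out of the for await loop.
try {
  for await (const value of new IterableMapper([1, 2, 3, 4], mapper, { concurrency: 1 })) {
    console.log(value);
  }
} catch (err) {
  console.error('stopped on first mapper error:', err);
}

// stopOnMapperError: false: errors are accumulated and surfaced as an
// AggregateError after all items have completed.
try {
  const results = new IterableMapper([1, 2, 3, 4], mapper, {
    concurrency: 1,
    stopOnMapperError: false,
  });
  for await (const value of results) {
    console.log(value);
  }
} catch (err) {
  if (err instanceof AggregateError) {
    console.error('collected mapper errors:', err.errors);
  }
}
```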
Usage
- Items from the input are exposed to the mapper via an iterator or async iterator (this includes generator and async generator functions)
- The mapper will not be invoked when maxUnread is reached, until items are consumed
- Iteration is done when the input has indicated done and all mapper promises have resolved (see the sketch below)
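A sketch of feeding a generator function's iterator into the mapper. The import path and option values are assumptions:

```typescript
// Package name assumed; adjust the import to your project.
import { IterableMapper } from '@shutterstock/p-map-iterable';

// Any iterator or async iterator can be the input, including (async) generators.
async function* ids(): AsyncGenerator<number> {
  for (let id = 1; id <= 5; id++) {
    yield id;
  }
}

const records = new IterableMapper(ids(), async (id: number) => `record-${id}`, {
  concurrency: 1,
  maxUnread: 2, // mapper pauses when 2 results are unread, resumes as items are consumed
});

// The loop completes (done) once the generator is exhausted and every
// pending mapper promise has resolved.
for await (const record of records) {
  console.log(record);
}
```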
Example
Typical Processing Loop without IterableMapper

Each iteration takes 820ms total, but we waste time waiting for I/O. We could prefetch the next read (300ms) while processing (20ms) and writing (500ms), without changing the order of reads or writes.
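A sketch of this baseline loop. The source, sink, doSomeOperation, and sourceIds names are hypothetical stand-ins; only the 300ms / 20ms / 500ms timings come from the text above:

```typescript
// Hypothetical source and sink used for illustration only.
const delay = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

const source = {
  async read(id: number): Promise<string> {
    await delay(300); // ~300ms of I/O wait, no CPU
    return `item-${id}`;
  },
};

const sink = {
  async write(_item: string): Promise<void> {
    await delay(500); // ~500ms of I/O wait, no CPU
  },
};

// Stand-in for ~20ms of CPU work.
function doSomeOperation(item: string): string {
  return item.toUpperCase();
}

const sourceIds = [1, 2, 3, 4, 5];

for (const sourceId of sourceIds) {
  const item = await source.read(sourceId);  // wait ~300ms
  const outputItem = doSomeOperation(item);  // ~20ms of CPU
  await sink.write(outputItem);              // wait ~500ms
}
// Each iteration: ~300 + 20 + 500 = ~820ms, most of it idle I/O wait.
```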
Example
Using IterableMapper as Prefetcher with Blocking Sequential Writes

concurrency: 1 on the prefetcher preserves the order of the reads, and writes are sequential and blocking (unchanged). This reduces iteration time to 520ms by overlapping reads with processing/writing.
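A sketch of the prefetcher variant, reusing the hypothetical source, sink, and doSomeOperation from the previous sketch. The import path and the maxUnread value are assumptions:

```typescript
// Package name assumed; adjust the import to your project.
import { IterableMapper } from '@shutterstock/p-map-iterable';

// Hypothetical helpers from the previous sketch (~300ms read, ~20ms CPU, ~500ms write).
declare const source: { read(id: number): Promise<string> };
declare const sink: { write(item: string): Promise<void> };
declare function doSomeOperation(item: string): string;

const sourceIds = [1, 2, 3, 4, 5];

// concurrency: 1 preserves read order; maxUnread: 8 is an arbitrary example value.
const sourcePrefetcher = new IterableMapper(
  sourceIds,
  async (sourceId: number) => source.read(sourceId),
  { concurrency: 1, maxUnread: 8 },
);

for await (const item of sourcePrefetcher) {
  // The next read is already in flight while this item is processed and written.
  const outputItem = doSomeOperation(item);  // ~20ms of CPU
  await sink.write(outputItem);              // ~500ms blocking write (unchanged)
}
// Iteration time drops to ~max(readTime, cpuOpTime + writeTime) = ~520ms.
```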
Example
Using IterableMapper as Prefetcher with Background Sequential Writes with IterableQueueMapperSimple

concurrency: 1 on the prefetcher preserves the order of the reads. concurrency: 1 on the flusher preserves the order of the writes, but allows the loop to iterate while the last write is completing. This reduces iteration time to about max(max(readTime, writeTime) - cpuOpTime, cpuOpTime) by overlapping reads and writes with the CPU processing step. In this contrived example, the loop time is reduced to 500ms - 20ms = 480ms. In cases where the CPU usage time is higher, the impact can be greater.
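A sketch of this variant, again reusing the hypothetical helpers from the first sketch. The import path, the maxUnread value, and the enqueue() / onIdle() / errors names on IterableQueueMapperSimple are assumptions to confirm against the library's documentation:

```typescript
// Package name assumed; adjust the import to your project.
import { IterableMapper, IterableQueueMapperSimple } from '@shutterstock/p-map-iterable';

// Hypothetical helpers from the first sketch (~300ms read, ~20ms CPU, ~500ms write).
declare const source: { read(id: number): Promise<string> };
declare const sink: { write(item: string): Promise<void> };
declare function doSomeOperation(item: string): string;

const sourceIds = [1, 2, 3, 4, 5];

// concurrency: 1 preserves the order of reads; maxUnread: 8 is an arbitrary example value.
const sourcePrefetcher = new IterableMapper(
  sourceIds,
  async (sourceId: number) => source.read(sourceId),
  { concurrency: 1, maxUnread: 8 },
);

// concurrency: 1 preserves the order of writes while letting the loop continue
// while the last write completes in the background.
const flusher = new IterableQueueMapperSimple(
  async (outputItem: string) => sink.write(outputItem),
  { concurrency: 1 },
);

for await (const item of sourcePrefetcher) {
  const outputItem = doSomeOperation(item);  // ~20ms of CPU
  await flusher.enqueue(outputItem);         // blocks only when the flusher is saturated
}

// onIdle() / errors are assumed names for draining pending writes and
// collecting any write errors at the end; confirm against the library docs.
await flusher.onIdle();
if (flusher.errors.length > 0) {
  console.error('some writes failed:', flusher.errors);
}
// Per-iteration time: ~max(max(readTime, writeTime) - cpuOpTime, cpuOpTime) = ~480ms here.
```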
Example
Using IterableMapper as Prefetcher with Out of Order Reads and Background Out of Order Writes with IterableQueueMapperSimple

For maximum throughput, allow out of order reads and writes with IterableQueueMapper (to iterate results with backpressure when there are too many unread items) or IterableQueueMapperSimple (to handle errors at the end without custom iteration, applying backpressure to block further enqueues when concurrency items are in process). This reduces iteration time to about 20ms by overlapping reads and writes with the CPU processing step. In this contrived (but common) example we would get a 41x improvement in throughput, removing 97.5% of the time to process each item and fully utilizing the CPU time available in the JS event loop.
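A sketch of the out of order variant with IterableQueueMapperSimple as the background flusher, under the same assumptions as the previous sketch (hypothetical helpers, assumed import path and enqueue() / onIdle() / errors names, arbitrary concurrency and maxUnread values):

```typescript
// Package name assumed; adjust the import to your project.
import { IterableMapper, IterableQueueMapperSimple } from '@shutterstock/p-map-iterable';

// Hypothetical helpers from the first sketch (~300ms read, ~20ms CPU, ~500ms write).
declare const source: { read(id: number): Promise<string> };
declare const sink: { write(item: string): Promise<void> };
declare function doSomeOperation(item: string): string;

const sourceIds = [1, 2, 3, 4, 5];

// concurrency > 1 allows out of order reads; maxUnread bounds how far ahead we prefetch.
const sourcePrefetcher = new IterableMapper(
  sourceIds,
  async (sourceId: number) => source.read(sourceId),
  { concurrency: 10, maxUnread: 20 },
);

// concurrency > 1 allows out of order background writes; enqueue applies
// backpressure once `concurrency` writes are already in flight.
const flusher = new IterableQueueMapperSimple(
  async (outputItem: string) => sink.write(outputItem),
  { concurrency: 10 },
);

for await (const item of sourcePrefetcher) {
  const outputItem = doSomeOperation(item);  // ~20ms of CPU, now the dominant per-item cost
  await flusher.enqueue(outputItem);
}

// Drain remaining writes and surface any errors collected at the end.
await flusher.onIdle();
if (flusher.errors.length > 0) {
  console.error('some writes failed:', flusher.errors);
}
```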