Create a new IterableMapper
The input iterable, iterated over concurrently or serially by the `mapper` function.
Function called for every item in input. Returns a Promise or value.
IterableMapper options
See `IterableQueueMapper` for full class documentation.
Get the next item from the input iterable.
This is called up to `concurrency` times in parallel.
If the read queue is not full and there are still source items to read, each instance of this method starts a new instance of itself that detaches and runs asynchronously, keeping the same number of instances running.
When (items in the read queue + active runners) equals the maximum read queue length, the runner exits and is restarted when an item is read from the queue.
Generated using TypeDoc
Iterates over a source iterable / generator with specified `concurrency`, calling the `mapper` on each iterated item, and storing the `mapper` result in a queue of `maxUnread` size, before being iterated / read by the caller.

Remarks
Typical Use Case
- Sequential (`concurrency: 1`) case: allows items to be prefetched async, preserving order, while the caller processes an item
- Concurrent (`concurrency: 2+`) case: allows multiple items to be prefetched in parallel, for sources that tolerate out of order reads
- Prefetching pauses when `maxUnread` is reached and resumes as items are consumed

Error Handling
The mapper should ideally handle all errors internally to enable error handling closest to where they occur. However, if errors do escape the mapper:
When `stopOnMapperError` is `true` (default):
- Processing of pending and future items stops on the first error
- The error is thrown from the `AsyncIterator`'s `next()` call

When `stopOnMapperError` is `false`:
- Processing continues despite errors
- All errors are collected and thrown together as an `AggregateError` after all items complete

Usage
- Items are exposed to the `mapper` via an iterator or async iterator (this includes generator and async generator functions)
- The `mapper` will not be invoked once `maxUnread` is reached, until items are consumed
- The iterable sets `done` when the `input` has indicated `done` and all `mapper` promises have resolved

Example
Typical Processing Loop without `IterableMapper`

Each iteration takes 820 ms total, but we waste time waiting for I/O. We could prefetch the next read (300 ms) while processing (20 ms) and writing (500 ms), without changing the order of reads or writes.
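The baseline loop described above could be sketched as follows. `readItems` and `doSomeOperation` are hypothetical stand-ins for the caller's I/O and CPU steps, and the 300/500 ms costs from the narrative are scaled down to 30/50 ms so the sketch runs quickly.

```typescript
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical source: each read simulates I/O latency
// (narrative: 300 ms per read; scaled to 30 ms here)
async function* readItems(count: number): AsyncGenerator<number> {
  for (let i = 0; i < count; i++) {
    await sleep(30);
    yield i;
  }
}

// Hypothetical CPU step (narrative: 20 ms)
const doSomeOperation = (item: number): number => item * 2;

const written: number[] = [];
const start = Date.now();
for await (const item of readItems(5)) {
  const result = doSomeOperation(item);
  await sleep(50); // blocking write (narrative: 500 ms; scaled to 50 ms)
  written.push(result);
}
// Read and write never overlap: total is ~5 * (30 + 50) ms plus CPU time
const serialMs = Date.now() - start;
```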
Example

Using `IterableMapper` as Prefetcher with Blocking Sequential Writes

`concurrency: 1` on the prefetcher preserves the order of the reads; writes remain sequential and blocking (unchanged). This reduces iteration time to 520 ms by overlapping reads with processing/writing.
Example

Using `IterableMapper` as Prefetcher with Background Sequential Writes with `IterableQueueMapperSimple`

`concurrency: 1` on the prefetcher preserves the order of the reads. `concurrency: 1` on the flusher preserves the order of the writes, but allows the loop to iterate while the last write is completing. This reduces iteration time to about `max(max(readTime, writeTime) - cpuOpTime, cpuOpTime)` by overlapping reads and writes with the CPU processing step. In this contrived example, the loop time is reduced to 500 ms - 20 ms = 480 ms. In cases where the CPU usage time is higher, the impact can be greater.

Example
Using `IterableMapper` as Prefetcher with Out of Order Reads and Background Out of Order Writes with `IterableQueueMapperSimple`

For maximum throughput, allow out of order reads and writes with `IterableQueueMapper` (to iterate results with backpressure when there are too many unread items) or `IterableQueueMapperSimple` (to handle errors at the end without custom iteration, applying backpressure to block further enqueues when `concurrency` items are in process). This reduces iteration time to about 20 ms by overlapping reads and writes with the CPU processing step. In this contrived (but common) example we would get a 41x improvement in throughput, removing 97.5% of the time to process each item and fully utilizing the CPU time available in the JS event loop.