"because programming should not be about solving the same problems over and over again"
Installation
# install from npmjs.com
npm install itr8
# install directly from github
npm install mrft/itr8
Usage in NodeJS or with bundlers
// in an EcmaScript module - ESM
import { pipe, itr8Range, forEach, filter, map } from "itr8";
// in a CommonJS module - CJS
const { pipe, itr8Range, forEach, filter, map } = require("itr8/cjs");
Usage in the browser
<script type="module">
import {
pipe,
itr8Range,
forEach,
filter,
map,
} from "https://cdn.jsdelivr.net/npm/itr8@latest";
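// or "https://cdn.skypack.dev/itr8@latest" or "https://unpkg.com/itr8@latest"
// or use the minified bundle instead ("https://<some-cdn>/itr8@latest/itr8.min.js"), for example:
// import {
// pipe,
// itr8Range,
// forEach,
// filter,
// map,
// } from "https://cdn.jsdelivr.net/npm/itr8@latest/itr8.min.js";
// (use only one of the two imports: importing the same names twice in one module is a syntax error)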
</script>
The itr8 source code can be found on GitHub.
The full itr8 documentation can be found at the itr8 GitHub site.
import {
// interface
itr8FromIterable,
itr8Range,
itr8ToArray,
forEach,
// operators
map,
filter,
skip,
take,
lineByLine,
// utils
pipe,
compose,
} from "itr8";
import { createReadStream } from "fs"; // needed for the file example further down
const inputArray = [
1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20,
];
const resultArray = pipe(
itr8FromIterable(inputArray), // the input iterator (= inputArray[Symbol.iterator]())
map((x) => x * 2), // => 2 4 6 8 10 12 14 16 ... [34 36 38 ...]
filter((x) => x % 3 === 0), // => 6 12 18 24 30 [36 42 ...]
skip(2), // => 18 24 30 [36 42 ...]
take(3), // => 18 24 30
itr8ToArray,
);
// 1) you should be aware that the values between square brackets will never actually be calculated
// because only as many elements as needed will be pulled from the iterator!
// (If you'd have written the same with array.filter, array.map etc. you'd have created
// many 'intermediate arrays' - using more memory - and the calculations would have been
// executed needlessly for all elements in the array)
// 2) you should also be aware that if 'itr8ToArray' was not there, nothing would have happened!
// only when you start doing next() calls on the iterator something would actually be executed
// 3) due to the current lack of a pipe operator in JS we are using a pipe function so we can
// read from left to right (or top to bottom), instead of right to left when applying functions
// to the result of the previous function.
// 4) this pipe function can be used for any functions, as long as they expect one argument each,
// and as long as the output of each one matches the expected input of the next
// Now assume you need to apply the same algorithm to an asynchronous source (like a file or an api).
// In plain javascript you'd have to either load the entire file into an array (which could be
// problematic for large files), or rewrite your algorithm (which would typically have used
// array.map, array.filter) with other tools that help you with asynchronous code.
// But with itr8 that would simply become (assuming one number per line):
const resultArray2 = await pipe(
itr8FromIterable(createReadStream("test.js", "utf8")), // readable streams are async iterables!
lineByLine(), // interpret each line in the file as a single element
map(Number.parseFloat), // turn the text into a number
map((x) => x * 2), // => 2 4 6 8 10 12 14 16 ... [34 36 38 ...]
filter((x) => x % 3 === 0), // => 6 12 18 24 30 [36 42 ...]
skip(2), // => 18 24 30 [36 42 ...]
take(3), // => 18 24 30
itr8ToArray,
);
// as you can see the actual algorithm did not change at all, we just had to 'await' the result
// because we are now using an asynchronous iterator as input!!!
// But we'd want to be able to reuse that algorithm, so it would be nice if we can store it as a
// function and apply it when needed.
// This is where the 'compose' function comes in handy.
// It will help you to easily create new 'operators' by combining existing operators.
const myOperator = () =>
compose(
map((x) => x * 2), // => 2 4 6 8 10 12 14 16 ... [34 36 38 ...]
filter((x) => x % 3 === 0), // => 6 12 18 24 30 [36 42 ...]
skip(2), // => 18 24 30 [36 42 ...]
take(3), // => 18 24 30
);
// 1) this is essentially the same as the totally unreadable
// (iterator) => take(3)(skip(2)(filter((x) => x % 3 === 0)(map((x) => x * 2)(iterator))))
// 2) the compose function is also generic, so it will work for any functions expecting a single
// parameter
// And with that, we can now simply do
const resultArray3 = pipe(
itr8FromIterable(inputArray), // the input iterator
myOperator(),
itr8ToArray,
);
// if we need to execute some code for every element of the resulting iterator, use forEach
// (which supports both synchronous and asynchronous handlers, and allows you to easily set the
// allowed concurrency as well!)
pipe(
itr8FromIterable(inputArray), // the input iterator
myOperator(),
forEach(
async (e) => {
const descr = await getElementFromDisk(e); // getElementFromDisk stands for your own async function
console.log("element = ", descr);
},
{ concurrency: 1 }, // these options are optional (default concurrency is 1)
),
);
// So now we know that we can easily apply any operator to any iterator.
// You just need to be aware of the various ways to create an iterator:
// 1) create an iterator to start from with a utility function
const myIterator = () => itr8Range(0, 10_000_000);
// 2) OR create your own Iterator or AsyncIterator (for example with a generator function)
function* myGeneratorFunction() {
for (let i = 0; i < 10_000_000; i++) {
yield i;
}
}
// 3) 'itr8FromIterable' is an easy way to get an iterator from many built-in data structures
const myOwnIterator = () => itr8FromIterable(myGeneratorFunction());
// we can use standard JS 'for ... of' to loop over an iterable, which works because every operator
// returns an IterableIterator (an iterator that returns itself when being asked to return an iterator)
// (but forEach is more powerful !)
// This means that it will actually execute code and start 'draining' the iterator
for (let x of pipe(
itr8Range(1, 1000),
filter((x) => x % 3 === 0),
)) {
console.log(x);
}
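The laziness described in note 1) of the example above can be checked with a small sketch. It does not use itr8: `lazyMap`, `lazyFilter` and `lazyTake` are hypothetical helpers written with plain generators, assumed here only to count how often the mapping function runs compared to the eager array methods.

```typescript
// Hypothetical helpers built on plain generators; NOT the itr8 implementation.
function* lazyMap<T, U>(it: Iterable<T>, fn: (x: T) => U): IterableIterator<U> {
  for (const x of it) yield fn(x);
}
function* lazyFilter<T>(it: Iterable<T>, pred: (x: T) => boolean): IterableIterator<T> {
  for (const x of it) if (pred(x)) yield x;
}
function* lazyTake<T>(it: Iterable<T>, n: number): IterableIterator<T> {
  if (n <= 0) return;
  let taken = 0;
  for (const x of it) {
    yield x;
    if (++taken >= n) return; // stop pulling from the source as soon as we have enough
  }
}

const input = Array.from({ length: 20 }, (_, i) => i + 1); // 1..20

// Eager: every step builds a full intermediate array.
let eagerCalls = 0;
const eager = input
  .map((x) => { eagerCalls++; return x * 2; })
  .filter((x) => x % 3 === 0)
  .slice(0, 3);

// Lazy: elements are pulled one by one, only as long as the consumer asks for more.
let lazyCalls = 0;
const lazy = Array.from(
  lazyTake(
    lazyFilter(lazyMap(input, (x) => { lazyCalls++; return x * 2; }), (x) => x % 3 === 0),
    3,
  ),
);

console.log(eager, lazy); // both are [ 6, 12, 18 ]
console.log(eagerCalls, lazyCalls); // 20 9
```

The results are identical, but the lazy version stops calling the mapping function once the third matching element has been found.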
You can find some more documentation further down in this file, or go straight to the GitHub site about itr8.
More working examples will be added to this replit playground in the future.
An experiment to create a unified interface over both synchronous and asynchronous iterators such that the same iterator-operators (cfr. RxJS operators like filter, map, ...) can be used in various contexts (plain arrays, NodeJS streams, Observables, page-by-page database queries by writing an async generator function, page-by-page API queries, streams of events, ...).
This makes the code much more declarative (describing what to do rather than how to do it).
While working on this library, it became clear to me that many, many problems can simply be seen as transforming one set of data into another set (which might be longer or shorter).
The library can also be used as a base for CSP (Communicating Sequential Processes). By sharing the itr8Pushable iterator between 2 processes, one process can use it to push information onto the channel, and the other one can use the (async) next() call to pull the next message from the channel. I have added an example about CSP below.
So unlike OO, where a new interface has to be invented for every new problem encountered, we basically agree on a simple protocol on how data will be delivered to and returned by all processing units, so we can focus on functionality rather than the interface.
DISCLAIMER: This is work in progress (including the docs), and although a fair amount of functionality seems to work, things might still change along the way... It is mainly tested on NodeJS 16 currently, but should also work in the browser.
If you ever found yourself in one of these situations, this library might be useful for you:
Instead of simply writing array.filter(...).map(...), you had to add a bunch of code to make it work like you wanted (maybe using sindresorhus' promise-fun here and there to keep things under control). As a result, your code suddenly becomes hard to read, even if the problem you needed to solve was actually quite simple.
So, while all these libraries have their merit, none of them covered my needs well enough, so at a certain point things became clear enough in my head to write my own library.
To quickly show that async iterators can easily be used for CSP in javascript, and that this combines nicely with itr8, I have rewritten an example I found in the README of jfet97/csp library using itr8. This way we don't need the while loop (forEach will take care of that), we don't modify the message but create a new one to send to the other side. itr8Pushable is used as the 'channel', and map, tap and delay transIterators implement the same functionality as is in the other example. Also, there are no awaits in the code, because all the async plumbing is handled by the itr8 library so you can focus on the functionality.
type TBall = { hits: number; status: string };
const wiff = itr8Pushable<TBall>();
const waff = itr8Pushable<TBall>();
const createBall = (): TBall => ({ hits: 0, status: "" });
const createBat = async (
inbound: AsyncIterableIterator<TBall> & TPushable,
outbound: AsyncIterableIterator<TBall> & TPushable,
) => {
pipe(
inbound,
map(({ hits, status }) => ({
hits: hits + 1,
status: status === "wiff!" ? "waff!" : "wiff!",
})),
tap(({ hits, status }) => {
console.log(`🎾 Ball hit ${hits} time(s), ${status}`);
}),
delay(500), // assume it's going to take a bit to hit the ball
forEach((b) => outbound.push(b)), // smash the ball back
);
};
createBat(waff, wiff); // create a bat that will wiff waffs
createBat(wiff, waff); // create a bat that will waff wiffs
waff.push(createBall());
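The channel idea behind the example above (one side pushes, the other side pulls with an async next() call) can be sketched without itr8. `createChannel` below is a hypothetical, minimal stand-in and is not how itr8Pushable is actually implemented; it only assumes that pushed values are buffered until somebody asks for them.

```typescript
// Hypothetical minimal channel; NOT the real itr8Pushable implementation.
type Channel<T> = AsyncIterableIterator<T> & { push(value: T): void; done(): void };

function createChannel<T>(): Channel<T> {
  const buffer: T[] = []; // values pushed before anyone asked for them
  const waiting: Array<(r: IteratorResult<T>) => void> = []; // next() calls awaiting a value
  let closed = false;

  const channel: Channel<T> = {
    push(value: T): void {
      if (closed) return;
      const resolve = waiting.shift();
      if (resolve) resolve({ done: false, value }); // hand over to a waiting consumer
      else buffer.push(value); // or keep it until a consumer shows up
    },
    done(): void {
      closed = true;
      for (const resolve of waiting.splice(0)) resolve({ done: true, value: undefined });
    },
    async next(): Promise<IteratorResult<T>> {
      if (buffer.length > 0) return { done: false, value: buffer.shift() as T };
      if (closed) return { done: true, value: undefined };
      return new Promise<IteratorResult<T>>((resolve) => {
        waiting.push(resolve);
      });
    },
    [Symbol.asyncIterator]() {
      return channel;
    },
  };
  return channel;
}

async function demo(): Promise<string[]> {
  const channel = createChannel<string>();
  const producer = async () => {
    channel.push("ping");
    await Promise.resolve(); // give the consumer a chance to run
    channel.push("pong");
    channel.done();
  };
  void producer();

  const received: string[] = [];
  for await (const message of channel) received.push(message); // the consumer side pulls
  return received;
}

demo().then((received) => console.log(received)); // [ 'ping', 'pong' ]
```

Because the consumer is a plain async iterator, anything that accepts an async iterator can read from such a channel.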
The ROADMAP.md contains various ideas about possible additions to the library and how this library could evolve in the future.
Check https://mrft.github.io/itr8/modules.html to find developer documentation about all operators, interface functions and utility functions.
A transIterator is simply a function with an iterator as its single argument, which returns another iterator. So it transforms iterators, which is why I have called it transIterator (~transducers). The cool thing about it is that you can easily chain them together, by using the output of the first one as input of the next one and so on. You could compare this to piping in Linux, where each tool typically reads from stdin and writes to stdout, and the pipe symbol makes sure that the output of the first program is used as input of the next one.
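As a minimal sketch of that idea in plain TypeScript (no itr8 involved; `multiplesOf3` and `double` are hypothetical names), two transIterators can be written as generator functions and chained by feeding the output of one into the other:

```typescript
// Each function takes an iterator and returns another iterator: a transIterator.
function* multiplesOf3(it: Iterator<number>): IterableIterator<number> {
  for (let n = it.next(); !n.done; n = it.next()) {
    if (n.value % 3 === 0) yield n.value;
  }
}

function* double(it: Iterator<number>): IterableIterator<number> {
  for (let n = it.next(); !n.done; n = it.next()) {
    yield n.value * 2;
  }
}

// Chaining: the output of the first is the input of the next, like `a | b` in a shell.
const source = [1, 2, 3, 4, 5, 6][Symbol.iterator]();
const output = Array.from(double(multiplesOf3(source)));

console.log(output); // [ 6, 12 ]
```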
An operator is 'a function that generates a transIterator'. So for example filter(...) is an operator, because when called with an argument (the filter function) the result of that will be another function which is the transIterator.
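The two-step call can be made visible with a hand-written sketch. `filterSketch` is a hypothetical stand-in, not itr8's filter: calling it with a predicate yields a transIterator, and only calling that transIterator with an iterator yields the output iterator.

```typescript
type TransIterator<TIn, TOut> = (it: Iterator<TIn>) => IterableIterator<TOut>;

// Hypothetical operator: a function that GENERATES a transIterator.
function filterSketch<T>(predicate: (x: T) => boolean): TransIterator<T, T> {
  return function* (it: Iterator<T>): IterableIterator<T> {
    for (let n = it.next(); !n.done; n = it.next()) {
      if (predicate(n.value)) yield n.value;
    }
  };
}

// Step 1: calling the operator gives us a transIterator (nothing is iterated yet).
const onlyEven = filterSketch((x: number) => x % 2 === 0);

// Step 2: calling the transIterator with an iterator gives us a new iterator.
const evens = Array.from(onlyEven([1, 2, 3, 4][Symbol.iterator]()));

console.log(evens); // [ 2, 4 ]
```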
There are multiple options for writing your own operators. You can either build a new operator by chaining a bunch of existing operators together, or you can write your own (ideally with the powerMap operator).
Let's use the same example as is used in the RxJS tutorial: a filterNil operator.
It can be created with the filter operator, like this:
const filterNil = () => filter((x) => x !== undefined && x !== null);
Another example: a 'regroup' operator can be created by combining flatten and groupPer. This is where the compose(...) method will come in handy.
So to turn [ [ 1, 2 ], [ 3, 4 ], [ 5, 6 ] ] into [ [ 1, 2, 3 ], [ 4, 5, 6 ] ]
You'll want the regroup(3) operator (3 being the new 'rowSize').
const regroup = (rowSize: number) => compose(flatten(), groupPer(rowSize));
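To see what this composition does without installing anything, here is a sketch with hand-rolled stand-ins. `compose2`, `flattenSketch` and `groupPerSketch` are hypothetical and simplified: they work on synchronous iterables only, `compose2` combines exactly two functions, and emitting a shorter last group is an assumption of this sketch rather than documented itr8 behaviour.

```typescript
type TransIt<A, B> = (it: Iterable<A>) => IterableIterator<B>;

// Hypothetical two-function compose: apply f first, then g.
const compose2 = <A, B, C>(f: TransIt<A, B>, g: TransIt<B, C>): TransIt<A, C> =>
  (it) => g(f(it));

function flattenSketch<T>(): TransIt<T[], T> {
  return function* (it: Iterable<T[]>): IterableIterator<T> {
    for (const row of it) yield* row; // emit every element of every row
  };
}

function groupPerSketch<T>(size: number): TransIt<T, T[]> {
  return function* (it: Iterable<T>): IterableIterator<T[]> {
    let group: T[] = [];
    for (const x of it) {
      group.push(x);
      if (group.length === size) {
        yield group;
        group = [];
      }
    }
    if (group.length > 0) yield group; // assumption: a shorter last group is still emitted
  };
}

const regroupSketch = (rowSize: number) =>
  compose2(flattenSketch<number>(), groupPerSketch<number>(rowSize));

const regrouped = Array.from(regroupSketch(3)([[1, 2], [3, 4], [5, 6]]));
console.log(JSON.stringify(regrouped)); // [[1,2,3],[4,5,6]]
```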
We'll explain to you how to use the powerMap, which makes it easy to write operators whose output transIterators correctly handle both synchronous and asynchronous iterators.
When writing operators, we also have the thenable and forLoop utility functions, which let you write the same code regardless of whether the input is a normal value or a promise.
Let's show you the code right away:
const filterNil = () =>
powerMap<any, any>(
(nextIn, state) => {
if (nextIn.done) {
return { done: true };
} else if (nextIn.value === undefined || nextIn.value === null) {
// nil, so it's not done, but don't return a value
return { done: false };
} else {
// not nil, so it's not done, and return the value
return { done: false, value: nextIn.value };
}
},
() => null, // no state needed
);
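Now what are nextIn and state?
- nextIn is the result of a next() call on the incoming iterator, so it is an object of the form { done: <true or false>, value: <current value> }.
- state holds whatever needs to be remembered from one step to the next; its first value comes from the second argument of powerMap (a function returning the initial state).
What does that function return?
It returns an object that looks a lot like ...drum roll... the result of a next call of an iterator (but it's not entirely the same!). Make sure you've read and understood the iterator protocol!
- { done: true } when the output iterator will return no more elements.
- { done: false }, without a value, when the current input element produces no output, or { done: false, state: <newState> } if you need to update the state.
- { done: false, value: <output value> } when there is a value to return (again with an optional state property if you need to pass state to the next step).
- { done: false, iterable: <your iterable> } when one input element should produce a series of output values. Any Iterable (or IterableIterator) will do. (That means in practice that you can use a simple array, but a generator function will work as well, or some iterator that is the result of some input with a few itr8 operators applied to it.)
Knowing all this we can break down the example:
- if the input iterator is done, the output iterator is done as well
- if the input value is undefined or null, we are not done, but we don't return a value
- in all other cases we are not done, and we pass the input value through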
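To make this return protocol tangible, here is a deliberately simplified sketch of a function that interprets such return values. `simplePowerMap` is hypothetical: it handles synchronous iterators only and is not how itr8's powerMap is implemented; `filterNilSketch` and `duplicateSketch` are illustrative names as well.

```typescript
// The possible return values of the step function, as described above.
type StepResult<TOut, TState> =
  | { done: true }
  | { done: false; value?: TOut; iterable?: Iterable<TOut>; state?: TState };

// Hypothetical, sync-only interpretation of the protocol (NOT itr8's powerMap).
function simplePowerMap<TIn, TOut, TState>(
  nextFn: (nextIn: IteratorResult<TIn>, state: TState) => StepResult<TOut, TState>,
  initialState: () => TState,
) {
  return function* (input: Iterator<TIn>): IterableIterator<TOut> {
    let state = initialState();
    while (true) {
      const result = nextFn(input.next(), state);
      if (result.done) return; // { done: true }: the output iterator ends
      if ("state" in result) state = result.state as TState; // optional state update
      if (result.iterable !== undefined) yield* result.iterable; // several output values
      else if ("value" in result) yield result.value as TOut; // exactly one output value
      // otherwise: { done: false } without a value, so nothing is emitted for this input
    }
  };
}

const filterNilSketch = () =>
  simplePowerMap<number | null | undefined, number, null>(
    (nextIn) => {
      if (nextIn.done) return { done: true };
      if (nextIn.value === undefined || nextIn.value === null) return { done: false };
      return { done: false, value: nextIn.value };
    },
    () => null,
  );

const duplicateSketch = () =>
  simplePowerMap<number, number, null>(
    (nextIn) =>
      nextIn.done ? { done: true } : { done: false, iterable: [nextIn.value, nextIn.value] },
    () => null,
  );

const withoutNils = Array.from(filterNilSketch()([1, null, 2, undefined, 3][Symbol.iterator]()));
const duplicated = Array.from(duplicateSketch()([1, 2][Symbol.iterator]()));
console.log(withoutNils, duplicated); // [ 1, 2, 3 ] [ 1, 1, 2, 2 ]
```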
Let's write an operator that repeats each value from the input iterator n times on the output iterator:
const opr8RepeatEach = <TIn>(count: number) =>
powerMap<TIn, TIn>(
(nextIn, _state) => {
if (nextIn.done) {
return { done: true };
}
return {
done: false,
iterable: (function* () {
for (let i = 0; i < count; i++) {
yield nextIn.value;
}
})(),
};
},
() => undefined,
);
As you can see, we use the 'iterable' property here, and in order to easily generate an IterableIterator, we use an 'immediately invoked function expression'. This is important since a generator function generates an IterableIterator, so it should be called!
You could also assign an array, because that is an iterable too. Beware, though, that creating an intermediate array will use more memory (especially if the count is high)! Here is the same example with an array:
const opr8RepeatEach = <TIn>(count: number) =>
powerMap<TIn, TIn>(
(nextIn, _state) => {
if (nextIn.done) {
return { done: true };
}
return {
done: false,
iterable: Array.from(Array(count)).map((x) => nextIn.value),
};
},
() => undefined,
);
What if we have an iterator that holds numbers and we want to calculate the total sum? This can only be done by holding on to the 'sum so far', because with each new element we need to add the current value to the value we already have calculated.
const total = () =>
powerMap<number, number, { done: boolean; total: number }>(
(nextIn, state) => {
if (state.done) {
return { done: true };
} else if (nextIn.done) {
return {
done: false,
value: state.total,
state: { ...state, done: true },
};
}
return {
done: false,
state: { ...state, total: state.total + nextIn.value },
};
},
() => ({ done: false, total: 0 }),
);
Here you can see that we also specified the second argument of the powerMap function, which is the initialState (not done, and the total so far = 0).
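So this iterator will only return a value on the output iterator once the input iterator has finished. Hence the 'done' flag on the state to indicate that we've seen the last element of the input iterator.
- When the state says we are done (the call after the one that sent the total to the output iterator), we return { done: true }.
- When the input iterator is done, we return the total as the value and set the done flag on the state.
- In all other cases we don't return a value yet, but we update the state with the new total.
If you have read the examples above, and you still don't see how to write your operator, it could be that it cannot be written with the powerMap operator. (But to be honest, at first I thought I had to write debounce and throttle by combining forEach with an itr8Pushable, but later I realized that was not necessary, and I made a proper version with the powerMap operator which was passive again.)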
But if you are convinced that is the case, I advise you to look at the source code of 'prefetch' or 'mostRecent'. Prefetch and mostRecent are actually returning a custom built iterator.
As long as your operator returns a function transforming the input iterator into another iterator, you're good (and to be itr8-approved: always support both sync and async iterators as input, and if possible, make sure that if the input iterator is synchronous, the output iterator is synchronous as well).
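One way to meet the 'sync in, sync out' rule is to check whether next() returned a promise and only go asynchronous when it did. The sketch below is an assumption about how this could be done by hand, not itr8's actual code; `isPromiseLike` and `mapSketch` are hypothetical names, and for brevity the returned object only implements next().

```typescript
// Hypothetical helper: is this value a promise (something with a then method)?
const isPromiseLike = (x: unknown): x is PromiseLike<unknown> =>
  typeof (x as { then?: unknown } | null | undefined)?.then === "function";

// Hypothetical operator supporting both sync and async input iterators.
function mapSketch<A, B>(fn: (a: A) => B) {
  return (input: Iterator<A> | AsyncIterator<A>) => {
    // The actual logic is written once, on a plain (already resolved) next() result.
    const step = (r: IteratorResult<A>): IteratorResult<B> =>
      r.done ? { done: true, value: undefined } : { done: false, value: fn(r.value) };
    return {
      next() {
        const r = input.next();
        // Async input gives a promise, so chain on it; sync input is handled immediately.
        return isPromiseLike(r) ? r.then(step) : step(r);
      },
    };
  };
}

const doubleSketch = mapSketch((x: number) => x * 2);

// Synchronous input: next() returns a plain result object.
const syncFirst = doubleSketch([1, 2][Symbol.iterator]()).next();

// Asynchronous input: next() returns a promise of a result object.
async function* asyncSource() {
  yield 1;
  yield 2;
}
const asyncFirst = doubleSketch(asyncSource()).next();

console.log(isPromiseLike(syncFirst), isPromiseLike(asyncFirst)); // false true
```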
The function given to the powerMap operator can also be ASYNC (useful if you can only know the new state after an async operation).
The function given to the powerMap operator can also return an ASYNC iterator (useful if you can only know each new element after another async operation).
This section contains background info and random thoughts on how I ended up writing this library, so you can skip this...
The idea for this came from my search for a unified way to handle gets from an API, data from files, data in arrays, and I got frustrated that I needed to use HighlandJS for the file thing, but that I couldn't easily use that same library for handling stuff I got from an API in NodeJS as well as in the browser.
Now all anyone has to do is write a generator function for their use case, and they are good to go.
It took me a while and a lot of reading to have a clear picture in my head of how I could unify everything I found online in one single solution.
Some things that I read that helped me to get a better understanding:
Quote: "So, a transducer is a function that transforms a reducer into another reducer, opening the doors of composition."
And composition is what we all want, but there doesn't seem to be a single way to do things:
There does not seem to be an easy way to treat all these as essentially the same. It also makes sense to use things that are part of the "standard" as much as possible, as that will cause less friction when using it in multiple environments.
So here is another version of the schema on rxjs.dev that tries to explain what RxJS Observables are:
| | Single | Multiple |
|---|---|---|
| Pull | Function | Iterator |
| Push | Callback function | |
And the more I think about it, the less this schema makes sense to me at all, so maybe the schema should be something 3-dimensional and not 2-dimensional like:
My conclusion was: you could build something similar based on Iterators, and then you could create operators that work on both sync and async iterators, which solves all these problems with a single solution. In my view, if we create a set of functions that take some arguments as input, and that produce functions transforming an existing iterator into a new iterator, we have all we need. And since it is pull-based, it doesn't matter if the producer is faster than the consumer; it also means we can handle any situation that RxJS can handle, because we can definitely handle all 'reactive' cases where the producer is slower than the consumer.
But because not all iterators can produce the data synchronously, we will need to make sure all of them can handle both synchronous and asynchronous iterators (where a 'next()' call will return a Promise). This will allow us to support streams, as well as Observables (+ all the basics like arrays). Actually, NodeJS streams are already AsyncIterables right now, so we don't even need a helper function to use them as a source!
So we start with a few utility functions that make an iterator from their input (built-in iterables like array, string, stream, the arguments object, or other things that could easily be iterated over, like Observables, ...). Also check Built-In Iterables on MDN.
itr8FromIterable(myArray);
itr8FromIterable(myString);
// streams are AsyncIterables, way easier than stream.on('data', ...)!!!
itr8FromIterable(someStream);
// or if you're into Observables
itr8FromObservable(someObservable);
We could then write a bunch of 'operators' which are simply functions taking an iterator (+ some arguments) as input and returning another iterator as output.
// the general shape of an operator: some arguments in, a transIterator out
type Operator = (...args: any[]) => (itr: Iterator<any>) => Iterator<any>;
In this section I will list various libraries, projects and articles that I found mostly after having written this library, which are exploring or building upon similar ideas.
Synchronous test for 1e+7 items:
┌────────────┬──────────┬─────────┐
│ (index) │ duration │ length │
├────────────┼──────────┼─────────┤
│ iter-ops │ 292 │ 5000000 │
│ rxjs │ 339 │ 5000000 │
│ rxjs + sub │ 1550 │ 5000000 │
│ itr8 │ 2000 │ 5000000 │
└────────────┴──────────┴─────────┘
Asynchronous test for 1e+7 items:
┌────────────┬──────────┬─────────┐
│ (index) │ duration │ length │
├────────────┼──────────┼─────────┤
│ iter-ops │ 7403 │ 5000000 │
│ rxjs │ 3248 │ 5000000 │
│ rxjs + sub │ 7788 │ 5000000 │
│ itr8 │ 6860 │ 5000000 │
└────────────┴──────────┴─────────┘