This is a bunch of ideas of things to add or change.
IN PROGRESS
Stumbling upon vitaly-t's iter-ops library, I got curious about performance, and started working on some performance improvements for the (very limited) test.
These are the current results (2023-08-22): itr8 is roughly 4 times slower than iter-ops for the synchronous test, but noticeably faster for the asynchronous test.
Synchronous test for 1e+7 items:
library | duration | length |
---|---|---|
array | 362 | 5000000 |
iter-ops | 224 | 5000000 |
rxjs | 531 | 5000000 |
rxjs + sub | 1183 | 5000000 |
itr8 | 862 | 5000000 |
Asynchronous test for 1e+7 items:
library | duration | length |
---|---|---|
iter-ops | 5383 | 5000000 |
rxjs | 2281 | 5000000 |
rxjs + sub | 5884 | 5000000 |
itr8 | 3741 | 5000000 |
I must point out though: itr8's map and filter functions are more powerful than the ones in iter-ops, because the mapping and filter functions can also be async, which is not the case in most other libraries!
In order to simplify writing code that supports both sync and async inputs, I started out with a utility function called thenable(variable). This makes writing the code easier, but it is not a good fit for iterators, because once we've established (during the first next() call) whether the resulting iterator will be synchronous or asynchronous, we can stick to that conclusion for all the other items of that iterator, allowing for some optimization.
Currently I kind of hand-crafted the optimizations in the filter and map functions (both of which use the also optimized powerMap operator internally). I'm still thinking about a way to do the same without the manual labour behind it.
Anyway, after these optimizations, the results are pretty decent, although not on par with iter-ops yet for purely synchronous code.
** IN PROGRESS ** 2023-08-22: Implemented in some places, but not everywhere and largely untested
The return(value) method indicates that the caller does not intend to make any more next calls. This way the iterator can run any cleanup methods.
The throw(exception) indicates the caller detects an error condition, which also indicates that no more next calls will follow.
My first intuition says that we'd need to improve the powerMap operator, catching any errors in the handler in order to call throw() on the input iterator, and we also need to call the incoming iterator's return() when we detect that the output iterator is done (if the output iterator is done, the input iterator can clean up as well).
We'll also need to propagate these calls to the incoming iterator.
If we create a generic test to check if these functions are called, we can add tests for all operators.
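To make the idea concrete, here is a minimal sketch of an output iterator that forwards return() and throw() to its input iterator, so the source gets a chance to run its cleanup logic. This is not the actual powerMap implementation; the name mapIterator and the exact cleanup behavior are just assumptions for illustration.

```ts
// Hedged sketch (not itr8 code): a map-like iterator that propagates return()/throw().
function mapIterator<TIn, TOut>(
  itIn: Iterator<TIn>,
  mapFn: (x: TIn) => TOut,
): Iterator<TOut> {
  return {
    next(): IteratorResult<TOut> {
      const n = itIn.next();
      return n.done
        ? { done: true, value: undefined }
        : { done: false, value: mapFn(n.value) };
    },
    // the caller won't call next() anymore, so the input iterator can clean up as well
    return(value?: unknown): IteratorResult<TOut> {
      itIn.return?.(value);
      return { done: true, value: undefined };
    },
    // the caller detected an error condition: propagate it, or at least close the input
    throw(error?: unknown): IteratorResult<TOut> {
      if (itIn.throw) itIn.throw(error);
      else itIn.return?.();
      return { done: true, value: undefined };
    },
  };
}
```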
It's not clear to me how iterators are supposed to behave on errors. Some information about it might be found in some github issues online:
A response to the first issue above:
The most useful thing to look at is for await. The behavior of for await is, if next returns a rejected promise, the loop will immediately stop, and will not call .return on the iterator (as it would if the loop early-exited any other way). In other words, for await treats next returning a rejected promise as indicating that the iterator is done.
Also, when using an async generator, the only way for next to give a rejected promise is if the body of the generator throws an uncaught exception, in which case the generator is finished and any subsequent call to .next will resolve with { done: true }.
Now, of course, you can implement a different behavior if you want to. But if you want to match the behavior of the things in the language, "if the promise returned by .next() rejects then the iterator is finished" is the principle to follow, and I think that would be a reasonable thing to write down without getting into all of the edge cases around async-from-sync and so on.
I wanted to create a retry() operator, naively assuming that if the next call's promise is rejected we could simply call next again. But every next call is supposed to return a new promise for the next element, never the same thing twice (you can even call next() a few times without waiting for the earlier promises to resolve). This makes a generic retry mechanism in the form of a transIterator impossible.
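To illustrate the quoted behavior with a small, self-contained example (a sketch, not itr8 code): when next() returns a rejected promise, for await stops without calling return() on the iterator.

```ts
// Hand-written async iterator whose third next() call rejects.
const source: AsyncIterableIterator<number> = {
  [Symbol.asyncIterator]() { return this; },
  next: (() => {
    let i = 0;
    return async (): Promise<IteratorResult<number>> => {
      if (i >= 2) throw new Error("boom"); // i.e. the returned promise rejects
      return { done: false, value: i++ };
    };
  })(),
  async return(value?: any): Promise<IteratorResult<number>> {
    console.log("return() called"); // you will NOT see this when next() rejects
    return { done: true, value };
  },
};

(async () => {
  try {
    for await (const x of source) console.log(x); // logs 0 and 1
  } catch (e) {
    console.log("for await stopped:", (e as Error).message); // "boom"
  }
})();
```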
** DONE **
Currently we use module: "CommonJS" in tsconfig.json, but ideally it should be ES2015 or something similar (an ECMAScript module instead of a CommonJS module), so that the compiled TypeScript code can be used unmodified both in NodeJS and in the browser, without forcing users to put build tools like webpack or browserify in between.
We could release a 'dual' module, by building the CJS files into a cjs subdirectory and adding another package.json file in there, in order to make sure that .js files under this folder are interpreted by NodeJS as CommonJS modules instead of ECMAScript modules. Then, someone who is still using CommonJS modules could require("itr8/cjs"), whereas everyone else would simply import { ... } from "itr8".
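To make this concrete, the layout could look roughly like this. This is a hypothetical sketch with made-up paths, not the actual itr8 package.json; the nested package.json trick (a `"type": "commonjs"` file in the cjs subdirectory) is standard NodeJS behavior.

```jsonc
// package.json (root) - hypothetical sketch, paths are made up
{
  "type": "module",
  "exports": {
    ".": {
      "import": "./index.js",
      "require": "./cjs/index.js"
    },
    "./cjs": "./cjs/index.js"
  }
}

// cjs/package.json - makes NodeJS interpret the .js files under this folder as CommonJS
{
  "type": "commonjs"
}
```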
TODO: either move the gzip-related operators to the "peer" folder (because they are currently NodeJS-specific), or use a JS-only implementation of gzip (I remember finding one). I could also drop support for NodeJS < 18 and older browsers and use the more modern Compression Streams API. So maybe it would always be better to keep it out of the core and put it in 'peer'.
** DONE ** in version 0.4.5
I first implemented a .pipe function on every iterator that the library would return, but that makes the iterators returned by the lib 'special' instead of being simple and plain (sync or async) iterators. This does not make sense if we state that we want to embrace this standard. It is also not necessary, because we only need a generic pipe(...) function instead of an object method. That way, we can pipe everything (not just things related to this library), and it will be easy to replace once the pipe syntax is finally added to javascript (possibly '|>').
So instead of writing
itr8FromIterator(standardIterator).pipe(...)
it will simply become:
pipe(standardIterator, ...);
// This is fully generic
const plusOne = (x) => x + 1;
const timesThree = (x) => x * 3;
pipe(4, plusOne, timesThree) === timesThree(plusOne(4)); // returns the actual result (15)
and a compose function will also be added to easily create (without executing it) a new function that is the result of applying the second one to the output of the first one.
const plusOneTimesThree = compose(plusOne, timesThree); // returns a function!
plusOneTimesThree(4) === 15;
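For reference, this is roughly what such generic pipe and compose functions boil down to. This is only a minimal, loosely typed sketch of the core idea, not the actual itr8 implementation.

```ts
// Minimal sketch: pipe applies the functions immediately, compose returns a new function.
const pipeSketch = (input: unknown, ...fns: Array<(x: any) => any>) =>
  fns.reduce((acc, fn) => fn(acc), input);

const composeSketch = (...fns: Array<(x: any) => any>) =>
  (input: unknown) => fns.reduce((acc, fn) => fn(acc), input);

// pipeSketch(4, plusOne, timesThree) === 15
// composeSketch(plusOne, timesThree)(4) === 15
```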
The current state of the pipe operator proposal in JavaScript suggests that the same thing would become:
4 |> plusOne(%) |> timesThree(%) === timesThree(plusOne(4));
This is unfortunate, as I was hoping that the proposal would gravitate towards the F# pipe operator, which does not need a placeholder token (the % in the examples below) to indicate where the value should be passed in.
4 |> plusOne |> timesThree === timesThree(plusOne(4));
It will make using itr8 a lot uglier than with the F# pipe operator...
# Hack pipes
itr8Range(0,10) |> map(plusOne)(%) |> map(timesThree)(%) |> filter(x => x > 10)(%);
# F# pipes
itr8Range(0,10) |> map(plusOne) |> map(timesThree) |> filter(x => x > 10);
but if that bothers anyone, they can keep using the current pipe(...) function.
Just an idea: when a user does npm i -g mrft/itr8, he should be able to run itr8 on the command line. Then all operators should be available to him, so it can be used as a CLI tool to manipulate data.
By default stdin could be used for input (maybe parsed lineByLine by default?)
Think something like:
FILE="myLargeFile.jsonl.gz"
zcat "$FILE" | itr8 "[ filter(l => l.length), map(l => JSON.parse(l)), map(o => o.key)]"
# or we could make it more 'CLI-like' and allow parameters to be used with the names of the operators
zcat "$FILE" | itr8 --filter "l => l.length" --map "JSON.parse" --map "o => o.key"
# or maybe use a, b, c as default param names and only write the function body for function arguments?
zcat "$FILE" | itr8 --filter "a.length" --map "JSON.parse(a)" --map "a.key"
The way transducers have been implemented in Ramda and a few other libraries has always confused me (as in you really have to dig into it to really understand how they work).
James Long has a great explanation about separation of concerns in his article about Transducers.js. He argues that iterate, transform and build are three separate problems, and that transducers decouple this in such a way that the transform can be used on any data structure. To quote him:
These are completely separate concerns, and yet most transformations in JavaScript are tightly coupled with specific data structures. Transducers decouples this and you can apply all the available transformations on any data structure.
He also mentions this in the final notes of the post introducing his transducers.js library:
Lazy sequences are something I think should be added to transducers.js in the future. (edit: well, this paragraph isn't exactly true, but we'll have to explain laziness more in the future)
I think that itr8 also decouples this, but in a different way: with transducers you'd need to implement another protocol (the most important method being 'step').
The transformer protocol requires you to add these methods on any data structure that wants to play along, or to define a transformation:
{
  '@@transducer/init': () => 'some initial value',
  '@@transducer/result': (result) => result,
  '@@transducer/step': (acc, cur) => /* combine cur into acc and return it */ acc,
}
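For comparison, this is roughly what a 'map' transformation looks like in that style (a sketch of the general transducer pattern, not Ramda's exact code): it is a function that takes the next transformer and wraps its step function, which is exactly the "function that gets another function as input" aspect discussed further down.

```ts
// A transformer as described above (simplified typing).
type Transformer<Acc, In> = {
  "@@transducer/init": () => Acc;
  "@@transducer/result": (acc: Acc) => Acc;
  "@@transducer/step": (acc: Acc, cur: In) => Acc;
};

// A 'map' transducer: wraps the step function of the next transformer in the chain.
const mapTransducer =
  <A, B>(mapFn: (x: A) => B) =>
  <Acc>(next: Transformer<Acc, B>): Transformer<Acc, A> => ({
    "@@transducer/init": next["@@transducer/init"],
    "@@transducer/result": next["@@transducer/result"],
    "@@transducer/step": (acc, cur) => next["@@transducer/step"](acc, mapFn(cur)),
  });
```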
In itr8, we assume that anything can be made iterable (even push-streams, by buffering). The thing about push-streams is: if a stream is push-based and the receiving end can't handle the speed, you'd get into trouble eventually anyway, so we can safely assume that any push-based stream can be buffered and pulled from, because the puller will be fast enough, and the buffer will always be near-empty.
With this assumption in mind, we actually don't decouple iteration from transformation, but we do decouple iteration from 'build'. First of all: maybe you don't want to 'build something' in the end, but simply perform actions based on each item, and secondly: it feels like everything can easily be made iterable, and I think it's equally easy to build something from an iterator in most cases. Besides: with transducers it also looks as if you somehow have to make it compatible with some kind of spec to make it iterable, so we might as well agree that that protocol will always be the iterator protocol. The 'step' function feels very much like the 'next' function of an iterator anyway.
So for me personally, I feel like there are fewer new things to learn - given that you know how the iterator protocol works (it's simple, well-supported and widely used, so we might as well embrace it).
So instead of trying to be entirely agnostic about the source, I think we end up with something that's even easier to understand (maybe that's just me of course) because we assume the source to always be the same thing, an iterator. It gives us one less degree of freedom, which makes for one less thing to explain or worry about in my opinion.
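As a side note, here is a minimal sketch of the "any push-based stream can be buffered and pulled from" assumption above. It is a hypothetical helper, not the itr8 API, and it assumes a single consumer and a simple callback-style subscribe function.

```ts
// Hedged sketch: turn a push-based source into a pull-based async iterator by buffering.
function pushToPull<T>(
  subscribe: (onData: (x: T) => void, onEnd: () => void) => void,
): AsyncIterableIterator<T> {
  const buffer: T[] = [];
  let ended = false;
  let wakeUp: (() => void) | undefined;

  subscribe(
    (x) => { buffer.push(x); wakeUp?.(); },
    () => { ended = true; wakeUp?.(); },
  );

  return {
    [Symbol.asyncIterator]() { return this; },
    async next(): Promise<IteratorResult<T>> {
      // wait until the pusher delivers something (or signals the end); single consumer assumed
      while (buffer.length === 0 && !ended) {
        await new Promise<void>((resolve) => { wakeUp = resolve; });
      }
      if (buffer.length > 0) return { done: false, value: buffer.shift() as T };
      return { done: true, value: undefined };
    },
  };
}
```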
Another post trying to explain transducers: https://medium.com/@shaadydawood/transducers-functional-transducers-async-transducers-e0ec65964fc2
Also check out: https://www.npmjs.com/package/functional-pipelines
It looks like itr8 has chosen a different path, by composing iterators instead of operators.
Could we think of another protocol that allows us to compose 'operators' that looks more like the iterator protocol? I mean: if the signature would be
(inValue: [{ done: boolean; value?: any }, state]) => [
{ done: boolean, value: any },
state,
];
they could very easily be composed, but unfortunately our output format is more complex, as in: we also allow 'iterable'. Of course we could change that to disallow iterable, and put that responsibility in the hands of the developer: he would have to keep the iterator in the state and return all values as long as there are any. It could be done, and if we do that, we'd have operators that are as easy to compose as the ones from the transducer protocol, and some people might find that easier to understand than the way transducers are implemented, because both done (or 'reduced' in transducer terms) and the value are returned. There's still a problem with 'state' being inside the inValue and outValue, because it belongs to a specific operator, so state should be kept 'locally' somehow in that case (this.state).
If we compose the transformations rather than the iterators, we might be able to gain some performance, but I find writing transducers cumbersome, because you have to think about 'writing a function that gets another function as input' and how to combine them. When writing the nextFn for the itr8OperatorFactory, we don't care about how they will be composed as that will be done for us, so we only have to think about what an input element produces on the output side, which is quite easy most of the time. Adding that extra complexity of having to call "the other function" somewhere adds a mental burden that I find too high, which might be part of the reason transducers haven't really been embraced by the masses.
Why would I try composing the transformers then instead of the iterators?
For performance reasons: once one element in the chain is async, every iterator that comes after it will necessarily become async, causing a lot of functions to be put onto the event loop (it also means that each 'transIterator' runs a lot of code to check whether the input iterator is sync or async). I have actually proven (in the 'transduce' operator tests) that a transducer-based version of the same operations (filter, map, repeat, ...) was quite a lot faster (probably because of the single intermediate iterator, probably also because all the transducers are synchronous so there are far fewer isPromise checks, and maybe, because each transducer calls the next one directly, there are fewer intermediate allocations of new data structures). If we combine the transformations into one single method, we'd end up with a single 'intermediate' iterator that executes a single function - in case everything is synchronous - for every element in the stream.
So I have been trying to figure out a way to also make our 'transformers' composable, without changing how they work. Instead of relying on a theoretical model, I would provide helper functions to make composing possible, while maintaining the easy-to-use interface. So while (or maybe just because) it is theoretically less advanced than transducers, we might end up with something that is easier to use, which in my view is really important. (If we find a way to compose them, we might also find a way to turn them into actual transformers for a transducer, in which case we might also have helped the transducer-loving world forward.)
// Transformer spec defines these methods on an object
// init:
// step:
// result:
// reduced:
// value:
// processing fn:
step(prevOut, curIn) => newOut
// map:
const nextFnMap = (nextIn, state) => {
  return { done: false, value: state.mapFn(nextIn.value) };
};
// filter:
const nextFnFilter = (nextIn, state) => {
  return state.filterFn(nextIn.value) ? nextIn : { done: false };
};
// nextFn returns a nextIn and to compose we'd need a function
// that takes a nextFn and produces another nextFn
// composing a map, then filter would be written manually as
const result1 = /* await in some cases */ nextFnMap(nextIn, stateOfMap);
return nextFnFilter(result1, stateOfFilter);
itr8OperDefFactory(nextFn, initFn) => // we could produce a 'stateful' nextFn()?
// or a nextFn, that already has state applied? and thus only takes 1 argument
// which is an input next?
// and if input and output would be compatible,
// this would create functions that can be composed/piped
// unfortunately they are not 100% compatible right now because the output
// 1. can contain iterable instead of value and 2. can be a promise
// which means that we'd need tooling to link them together
// 1. could be replaced by always returning an iterator or by adding a boolean
// to the state (which we 'internalized') informing the 'engine' whether
// a new next is needed already
// so if we want to 'compose' the nextFns (turning the 'pull' into a 'push' to the next nextFn)
// so that we can create a single transIterator from multiple 'transformers' combined,
// we'd need a composer function to do that for us so it can interpret for example the iterable field
// and as a result call the next one multiple times?
// all this should produce another nextFn that is the combination of all the others
// so then we could have a method called transIt(nextFn, nextFn, nextFn) that turns that list into
// a single transIterator, instead of a chain of many
// OH AND I GUESS MAYBE WE CAN USE SOMETHING LIKE MONADS TO TURN NEXTFN INTO A FUNCTION THAT CAN BE COMPOSED???
// (instead of writing a manual function to do it?) state will always be a problem when it's an argument I guess
(...args) => { // each arg is a 'transform' function (nextFn)
// return another nextFn function that is the combination of all the arguments
return {
nextFn: (nextIn, state) => {
let curPrevOut = nextIn; // a 'normal' IteratorResult is compatible with a nextOut value
let result;
for (const aFn of args) {
if (curPrevOut.iterable) {
result = { done: false, iterable: [] };
let count = 0;
for (const c of curPrevOut.iterable) {
        const r = aFn({ done: false, value: c }, state[aFn.id]); // wrap each element like a 'nextIn'
if (r.done) break;
result.iterable.push(r.value); // iterable should be created with a generator function
count += 1;
}
if ( count <= 0 ) {
result = { done: true, iterable: [] };
}
} else {
result = aFn(curPrevOut, state[aFn.id]) // state thing is pseudo code to get the idea across
}
if (result.done) {
return result;
}
curPrevOut = result;
}
return result;
},
initStateFn: () => {} // combine all the init-states of all the args?
}
}
/**
* the "monad-inspired" version would have a "bind" function to turn
* ```(nextIn:IteratorResult, state) => nextFnResult```
* into an ```(nextFnResult) => nextFnResult``` version
* state from the output is kept for next time
* (the 'state' will be 'against pure functional programming', but I see no way around it
* in order to create some truly useful operations)
*
 * We'd also need a 'unit' function and a 'lift' function - lift(f, x) = unit(f(x)) -;
 * the 'lift' function should make sure the right part of the input is handed over
 * to the original function.
* So in short (ignoring the iterable property) unit should be like IteratorResult => NextFnOutputResult
*/
const unity = (nextFn) => {
  let state;
  let done = false; // would be needed to remember that an earlier call already returned done: true
  return (nextFnFormattedNextIn) => {
if (nextFnFormattedNextIn.iterable) {
// call nextIn on every element from the iterable, and return
// a response also containing an iterable with all the results
// from calling
// watch out: when it returns done: true on one, we'd need to keep some state
// so we can tell we are 'done' the next time
} else {
      return nextFn(nextFnFormattedNextIn /* without the state */, state)
}
}
}
Some of our operators (for example 'map') allow async methods to be run, so they can be used for things that are prone to failure (I am not considering the synchronous case because that can be controlled entirely by the user).
The question is: if we know that things can potentially fail, are we going to add a specific protocol to handle these failures? Right now: if something fails that means that the next call will reject its promise, and the entire processing chain will break.
We could say: it's up to the user to make sure that his function always resolves, and so it's up to him to invent a data format that can express failures, so that they can be handled further down the line.
But I recently viewed this youtube video about the Saga pattern which essentially means: on failure take compensating measures for every action that already happened (kind of like 'rollback' if possible, but in some cases - like sending an email - things cannot be undone and you have to send another message explaining that the previous email should be ignored).
It could be that we can agree on a way to enforce (or at least support) people to implement this kind of pattern, which also helps in being aware of whatever can go wrong, in order to build more robust systems.
Piping should have better typing (like RxJS does it?) to make sure you get hints if you are trying to pipe functions together whose output and input types do not match.
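Something along these lines could work: an overload per arity, similar in spirit to how RxJS types its pipe. This is just a sketch of the approach, not the current itr8 signature.

```ts
// Overloads give the compiler a chance to check that each function's output type
// matches the next function's input type (a real version needs many more arities).
function typedPipe<A, B>(input: A, fn1: (a: A) => B): B;
function typedPipe<A, B, C>(input: A, fn1: (a: A) => B, fn2: (b: B) => C): C;
function typedPipe<A, B, C, D>(
  input: A,
  fn1: (a: A) => B,
  fn2: (b: B) => C,
  fn3: (c: C) => D,
): D;
function typedPipe(input: any, ...fns: Array<(x: any) => any>): any {
  return fns.reduce((acc, fn) => fn(acc), input);
}
```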
for typical cases like file input, db page-per-page processing, ...
how about an 'itr8ToMultiIterable' to handle the case where you want to 'split' the stream into multiple streams for separate further processing?
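A rough sketch of what such an 'itr8ToMultiIterable' could do (hypothetical code, not part of itr8): give every branch its own buffer and only pull from the source when a branch runs out, so slow and fast consumers can proceed independently.

```ts
// Hedged sketch: tee one async iterator into n independent async iterables.
function teeSketch<T>(source: AsyncIterator<T>, n = 2): AsyncIterableIterator<T>[] {
  const buffers: T[][] = Array.from({ length: n }, () => []);
  let sourceDone = false;
  let pending: Promise<void> | undefined;

  // pull one item from the source (deduplicating concurrent pulls)
  // and distribute it to every branch's buffer
  const pullOne = () => {
    pending ??= source.next().then((r) => {
      pending = undefined;
      if (r.done) sourceDone = true;
      else buffers.forEach((b) => b.push(r.value));
    });
    return pending;
  };

  return buffers.map((buffer): AsyncIterableIterator<T> => ({
    [Symbol.asyncIterator]() { return this; },
    async next(): Promise<IteratorResult<T>> {
      while (buffer.length === 0 && !sourceDone) await pullOne();
      if (buffer.length > 0) return { done: false, value: buffer.shift() as T };
      return { done: true, value: undefined };
    },
  }));
}
```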
Think about how to make it easy to use operator parameters that are iterators themselves.
I created 2 helper functions called thenable and forLoop for this. thenable will make any value thenable, to make sure we can use the same code regardless of whether the input is a promise or not, while guaranteeing that the handling will be synchronous if the input is not a promise. forLoop is like a for loop that will be synchronous if the input is synchronous and asynchronous otherwise.
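To show the idea behind thenable, here is a minimal sketch. It is not the actual itr8 implementation, just an illustration of "expose .then() on anything, but stay synchronous for plain values".

```ts
// Hedged sketch of the thenable idea (not the real itr8 utility).
function thenableSketch<T>(input: T | Promise<T>): { then: (onFulfilled: (v: T) => any) => any } {
  if (input && typeof (input as any).then === "function") {
    // a real promise (or promise-like): .then() stays asynchronous
    return { then: (onFulfilled) => (input as Promise<T>).then(onFulfilled) };
  }
  return {
    // a plain value: execute the callback immediately and synchronously,
    // and wrap the result again so calls can be chained
    then: (onFulfilled) => thenableSketch(onFulfilled(input as T)),
  };
}

// the same code handles both cases, and stays synchronous for plain values
thenableSketch(5).then((x) => console.log(x)); // logs 5 synchronously
thenableSketch(Promise.resolve(5)).then((x) => console.log(x)); // logs 5 on a later microtask
```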
Currently I don't see a lot of performance benefits of the batch support, so it could be that we might as well remove the support for that, because it complicates building new operators.
One thing is important: 'itr8batch' should not be a property (literally a JS property right now) of the iterator, nor should it make the itr8OperatorFactory more complex (as it currently does). That code should be removed ASAP. If we still want to support it, it should be done as an operator that takes a transIterator as its argument (or maybe supports multiple arguments in order to avoid needing another compose).
Example:
// Instead of
myIt.pipe(
asBatch(),
someOp(),
someOtherOp(),
asNoBatch(),
)
// it would become something like below (so the asBatch operator would make sure all its
// transIt arguments would be applied to each array element separately)
myIt.pipe(
asBatch(
someOp(),
someOtherOp(),
),
)
// or if the batch operator would only support a single argument it would become a bit less
// elegant as we'd need 'itr8Pipe' to compose the transIterators.
myIt.pipe(
asBatch(
itr8Pipe(
someOp(),
someOtherOp(),
),
),
)
Other questions about how the batch things should work:
Improve batch support: the current implementation will grow and shrink the batch size depending on the operation (filter could shrink batches significantly for example, but batches with only a few elements don't have a very big performance advantage). Of course you could always do unBatch |> batch(size) to force a new batch size, but it could be more efficient if the itr8OperatorFactory handled the batch size and kept it constant throughout the chain???