pvc is a lightweight (no dependencies!) collection of utilities for Node.js streams in object mode. It wraps vanilla Readable streams, and also arrays, allowing you to manipulate them functionally.
TALK ABOUT FUNCTIONAL PARADIGM FOR STREAMS. TALK ABOUT BACK-PRESSURE. TALK ABOUT PASSING EXCEPTIONS.
pvc provides several methods for functional transformation of streams.
Install with npm install pvc.
var pvc = require('pvc');
// You can easily make an array into a stream
pvc.source([1, 2, 3, 4, 5, 6, 7, 8, 9])
.map( function (n) { return n + 1; } )
// [2, 3, 4, 5, 6, 7, 8, 9, 10]
.filter( function (n) { return n % 2 == 0; } )
// [2, 4, 6, 8, 10]
.mapAsync( function (n, callback) {
setTimeout( function () {
callback(null, n + 10);
}, 100)
})
// [12, 14, 16, 18, 20]
.filterAsync( function (n, callback) {
setTimeout( function () {
callback(null, ( n % 3 == 0 ) );
}, 100)
})
// [12, 18]
.toArray( function (exceptions, results) {
console.error('Exceptions:', exceptions);
// 'Exceptions: null'
console.log('Results:', results);
// 'Results: [12, 18]'
});Create a pvc Readable stream, from the argument if supplied.
If there is no argument given, create a PassThrough source:
s = pvc.source();
s.write(1);
s.write('a');
s.read(); // 1
s.read(); // 'a'
s.read(); // nullIf the argument is an array, create a readable stream from the array:
s = pvc.source([1, 'a', '1']);
s.read(); // 1
s.read(); // 'a'
s.read(); // '1'
s.read(); // nullIf the argument is a node Readable stream, augment it with the pvc functional methods.
// TODO: ExampleMerge takes multiple streams, and merges them into a single stream. All outputs objects of the input streams will be outputted by the merged stream, but there is no guarantee of the order.
// TODO: ExampleZip takes a map of streams, zipping them into a stream that outputs objects containing the values of the stream outputs.
// TODO: ExampleSinks pull results from the pipe as fast as they can, accumulating them in some way. They are all passed a callback which is called when the pipe is finished.
Collect the output of the pipe into an array. toArray is called when the
pipe is finished with a callback which receives an exceptions array (null if
no exceptions occured) and a results array.
pvc.source(1, 2, 3)
.map(function (x) {
if (x == 2)
throw new Error('BAD');
else
return x * 2
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [2, 4]'
});Reduce the output of the pipe according to the passed reduction function. An
optional initialValue argument is the initial value used for reduction;
if it is not passed (or passed as null or undefined), the first value in
the pipe is used as the initial value.
pvc.source([1, 2, 3, 4])
.reduce(function (x, y) {
if (x == 2)
throw new Error('BAD');
else
return x + y;
}, function (exceptions, result) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Result:', result);
// 'Result: 8'
});
pvc.source([1, 2, 3, 4])
.reduce(-4, function (x, y) {
return x + y;
}, function (exceptions, result) {
console.log('Exceptions:', exceptions);
// 'Exceptions: null'
console.log('Result:', result);
// 'Result: 6'
});Count the number of output objects (not including exceptions).
pvc.source([1, 2, 3, 4])
.count(function (exceptions, result) {
console.log('Exceptions:', exceptions);
// 'Exceptions: null'
console.log('Result:', result);
// 'Result: 4'
});Limits the stream to the first n entries.
s = pvc.source( ['a', 'b', 'c', 'd', 'e'] ).limit(2);
s.read(); // 'a'
s.read(); // 'b'
s.read(); // nullSkips the first n entries.
s = pvc.source( ['a', 'b', 'c', 'd', 'e'] ).skip(2);
s.read(); // 'c'
s.read(); // 'd'
s.read(); // 'e'
s.read(); // nullMap the incoming objects, passing the mapped objects. If the mapping function
throws an error, pass that through. It will drop null or undefined output
values.
pvc.source(1, 2, 3)
.map(function (x) {
if (x == 2)
throw new Error('BAD');
else
return x * 2
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [2, 4]'
});Map the incoming objects through an asynchronous function, passing the mapped
objects. The mapping function takes the incoming object and a node-style
callback. If the callback is passed a non-null argument in the first position,
it will pass it as an exception. Arguments in the second position will be
passed as outputs. It will drop null or undefined outputs.
pvc.source(1, 2, 3)
.mapAsync(function (x, callback) {
process.nextTick( function () {
if (x == 2)
callback(new Error('BAD'));
else
callback(null, x * 2);
})
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [2, 4]'
});Filter the incoming objects, only passing objects that pass the truth test. If the filter function throws an error, pass it through as an exception.
pvc.source(1, 2, 3)
.filter(function (x) {
if (x == 2)
throw new Error('BAD');
else
return x == 3
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [3]'
});Pass the inputs through an asynchronous filter function. The filter function is passed the input object, and a node-style callback.
pvc.source(1, 2, 3)
.mapAsync(function (x, callback) {
process.nextTick( function () {
if (x == 2)
callback(new Error('BAD'));
else
callback(null, x == 3);
})
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: [ [Error: BAD] ]'
console.log('Results:', results);
// 'Results: [3]'
});Operate on the incoming elements and pass them through. Note that currently
this is performed synchronously; pass through a function wrapped in
process.nextTick to make it perform asynchronously.
pvc.source(1, 2, 3)
.doto(function (x) {
console.log('Passing through:', x);
// 'Passing through: 1'
// 'Passing through: 2'
// 'Passing through: 3'
})
.toArray( function (exceptions, results) {
console.log('Exceptions:', exceptions);
// 'Exceptions: null'
console.log('Results:', results);
// 'Results: [1, 2, 3]'
});Separate incoming arrays, so that they are flattened on the output.
Takes two options, lax and recursive. If recursive == false (default),
it does not flatten array elements of arrays. If recursive == true, it will
recursively flatten array elements of arrays (all the way down). If
lax == false (default), it will emit an error if it receives a non-array
input. If lax == true, it will allow non-array inputs to pass through
unchanged.
s = pvc.source([ [1], [2, 3] ]).separate();
s.read(); // 1
s.read(); // [2, 3]
s.read(); // null
s = pvc.separate();
s.write( 1 );
s.on('error', function (e) {
// Will emit an error because 1 is not an array!
});
s = pvc.separate({ lax: true });
s.write( 1 );
s.read(); // 1TODO: Description
TODO: Description