New upstream version 1.0.1

parent d1b0d5de
......@@ -3,6 +3,8 @@ branches:
except: /^v\d/
language: node_js
node_js:
- iojs
- node
- '6'
- '5'
- '4'
- '0.12'
- '0.10'
var Readable = require('readable-stream/readable');
var isReadable = require('is-stream').readable;
var util = require('util');
function isReadable(stream) {
if (typeof stream.pipe !== 'function') {
return false;
}
if (!stream.readable) {
return false;
}
if (typeof stream._read !== 'function') {
return false;
}
if (!stream._readableState) {
return false;
}
return true;
}
function addStream (streams, stream) {
if (!isReadable(stream)) {
throw new Error('All input streams must be readable');
......@@ -13,14 +32,13 @@ function addStream (streams, stream) {
stream.on('readable', function () {
var chunk = stream.read();
if (chunk === null) {
return;
}
if (this === streams[0]) {
self.push(chunk);
} else {
this._buffer.push(chunk);
while (chunk) {
if (this === streams[0]) {
self.push(chunk);
} else {
this._buffer.push(chunk);
}
chunk = stream.read();
}
});
......
{
"name": "ordered-read-streams",
"version": "1.0.0",
"version": "1.0.1",
"description": "Combines array of streams into one read stream in strict order",
"files": [
"index.js"
......@@ -12,15 +12,15 @@
"author": "Artem Medeusheyev <artem.medeusheyev@gmail.com>",
"license": "MIT",
"dependencies": {
"is-stream": "^1.0.1",
"readable-stream": "^2.0.1"
},
"devDependencies": {
"expect": "^1.20.2",
"jscs": "^1.13.1",
"jshint": "^2.8.0",
"mississippi": "^1.3.0",
"mocha": "^2.2.5",
"pre-commit": "^1.0.10",
"should": "^7.0.1",
"through2": "^2.0.0"
}
}
/* global it, describe */
require('should');
var through = require('through2');
var expect = require('expect');
var miss = require('mississippi');
var OrderedStreams = require('..');
var to = miss.to;
var from = miss.from;
var pipe = miss.pipe;
var concat = miss.concat;
function fromOnce(fn) {
var called = false;
return from.obj(function(size, next) {
if (called) {
return next(null, null);
}
called = true;
fn.apply(this, arguments);
});
}
describe('ordered-read-streams', function () {
it('should end if no streams are given', function (done) {
it('ends if no streams are given', function (done) {
var streams = new OrderedStreams();
streams.on('data', function () {
done('error');
});
streams.on('end', done);
pipe([
streams,
concat()
], done);
});
it('should throw error if stream is not readable', function (done) {
var writable = {readable: false};
it('throws an error if stream is not readable', function (done) {
var writable = to();
try {
function withWritable() {
new OrderedStreams(writable);
} catch (e) {
e.message.should.equal('All input streams must be readable');
done();
}
expect(withWritable).toThrow('All input streams must be readable');
done();
});
it('should emit data from all streams', function(done) {
var s1 = through.obj();
var s2 = through.obj();
var s3 = through.obj();
it('emits data from all streams', function(done) {
var s1 = from.obj([{value: 'stream 1'}]);
var s2 = from.obj([{value: 'stream 2'}]);
var s3 = from.obj([{value: 'stream 3'}]);
var streams = new OrderedStreams([s1, s2, s3]);
var results = [];
streams.on('data', function (data) {
results.push(data);
});
streams.on('end', function () {
results.length.should.be.exactly(3);
results[0].should.equal('stream 1');
results[1].should.equal('stream 2');
results[2].should.equal('stream 3');
done();
});
s1.write('stream 1');
s1.end();
s2.write('stream 2');
s2.end();
function assert(results) {
expect(results.length).toEqual(3);
expect(results[0]).toEqual({value: 'stream 1'});
expect(results[1]).toEqual({value: 'stream 2'});
expect(results[2]).toEqual({value: 'stream 3'});
}
s3.write('stream 3');
s3.end();
pipe([
streams,
concat(assert)
], done);
});
it('should emit all data event from each stream', function (done) {
var s = through.obj();
it('emits all data event from each stream', function (done) {
var s = from.obj([
{value: 'data1'},
{value: 'data2'},
{value: 'data3'}
]);
var streams = new OrderedStreams(s);
var results = [];
streams.on('data', function (data) {
results.push(data);
});
streams.on('end', function () {
results.length.should.be.exactly(3);
done();
});
s.write('data1');
s.write('data2');
s.write('data3');
s.end();
function assert(results) {
expect(results.length).toEqual(3);
}
pipe([
streams,
concat(assert)
], done);
});
it('should preserve streams order', function(done) {
var s1 = through.obj(function (data, enc, next) {
var self = this;
it('preserves streams order', function(done) {
var s1 = fromOnce(function (size, next) {
setTimeout(function () {
self.push(data);
next();
next(null, {value: 'stream 1'});
}, 200);
});
var s2 = through.obj(function (data, enc, next) {
var self = this;
var s2 = fromOnce(function (size, next) {
setTimeout(function () {
self.push(data);
next();
next(null, {value: 'stream 2'});
}, 30);
});
var s3 = through.obj(function (data, enc, next) {
var self = this;
var s3 = fromOnce(function (size, next) {
setTimeout(function () {
self.push(data);
next();
next(null, {value: 'stream 3'});
}, 100);
});
var streams = new OrderedStreams([s1, s2, s3]);
var results = [];
streams.on('data', function (data) {
results.push(data);
});
streams.on('end', function () {
results.length.should.be.exactly(3);
results[0].should.equal('stream 1');
results[1].should.equal('stream 2');
results[2].should.equal('stream 3');
done();
});
s1.write('stream 1');
s1.end();
s2.write('stream 2');
s2.end();
function assert(results) {
expect(results.length).toEqual(3);
expect(results[0]).toEqual({value: 'stream 1'});
expect(results[1]).toEqual({value: 'stream 2'});
expect(results[2]).toEqual({value: 'stream 3'});
}
s3.write('stream 3');
s3.end();
pipe([
streams,
concat(assert)
], done);
});
it('should emit stream errors downstream', function (done) {
var s = through.obj(function (data, enc, next) {
this.emit('error', new Error('stahp!'));
next();
it('emits stream errors downstream', function (done) {
var s = fromOnce(function(size, next) {
setTimeout(function () {
next(new Error('stahp!'));
}, 500);
});
var s2 = through.obj();
var s2 = from.obj([{value: 'Im ok!'}]);
var errMsg;
var streamData;
var streams = new OrderedStreams([s, s2]);
streams.on('data', function (data) {
streamData = data;
});
streams.on('error', function (err) {
errMsg = err.message;
});
streams.on('end', function () {
errMsg.should.equal('stahp!');
streamData.should.equal('Im ok!');
function assert(err) {
expect(err.message).toEqual('stahp!');
done();
}
pipe([
streams,
concat()
], assert);
});
it('emits received data before a stream errors downstream', function (done) {
var s = fromOnce(function(size, next) {
setTimeout(function () {
next(new Error('stahp!'));
}, 500);
});
var s2 = from.obj([{value: 'Im ok!'}]);
// Invert the order to emit data first
var streams = new OrderedStreams([s2, s]);
function assertData(chunk, enc, next) {
expect(chunk).toEqual({value: 'Im ok!'});
next();
}
function assertErr(err) {
expect(err.message).toEqual('stahp!');
done();
}
s.write('go');
s.end();
s2.write('Im ok!');
s2.end();
pipe([
streams,
to.obj(assertData)
], assertErr);
});
});
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment