Commit d48f1a23 authored by Amrithaa.TJ.'s avatar Amrithaa.TJ.

Import Upstream version 0.8.13

parents
root = true
[*]
end_of_line = lf
insert_final_newline = false
indent_style = space
indent_size = 2
module.exports = {
"extends": ["eslint:recommended"],
"env": {
"es6": true,
"node": true
},
"globals": {
"setTimeout": true
},
"parserOptions": {
"sourceType": "module"
},
"rules": {
"no-console": ["error", { "allow": ["warn", "error"] }],
"no-unsafe-finally": ["off"],
"camelcase": ["error", { "properties": "always" }],
"brace-style": ["off"],
"eqeqeq": ["error", "smart"],
"indent": ["error", 2, { "SwitchCase": 1 }],
"no-throw-literal": ["error"],
"comma-spacing": ["error", { "before": false, "after": true }],
"comma-style": ["error", "last"],
"comma-dangle": ["error", "always-multiline"],
"keyword-spacing": ["error"],
"no-trailing-spaces": ["error"],
"no-multi-spaces": ["error"],
"no-spaced-func": ["error"],
"no-whitespace-before-property": ["error"],
"space-before-blocks": ["error"],
"space-before-function-paren": ["error", "never"],
"space-in-parens": ["error", "never"],
"eol-last": ["error"],
"quotes": ["error", "single", { "avoidEscape": true }],
"no-implicit-globals": ["error"],
"no-useless-concat": ["error"],
"space-infix-ops": ["error", { "int32Hint": true }],
"semi-spacing": ["error", { "before": false, "after": true }],
"semi": ["error", "always", { "omitLastInOneLineBlock": true }],
"object-curly-spacing": ["error", "always"],
"array-bracket-spacing": ["error"],
"max-len": ["error", 100]
}
};
Copyright (c) 2018 zenparsing (Kevin Smith)
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
of the Software, and to permit persons to whom the Software is furnished to do
so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
# zen-observable
An implementation of Observables for JavaScript. Requires Promises or a Promise polyfill.
## Install
```sh
npm install zen-observable
```
## Usage
```js
import Observable from 'zen-observable';
Observable.of(1, 2, 3).subscribe(x => console.log(x));
```
## API
### new Observable(subscribe)
```js
let observable = new Observable(observer => {
// Emit a single value after 1 second
let timer = setTimeout(() => {
observer.next('hello');
observer.complete();
}, 1000);
// On unsubscription, cancel the timer
return () => clearTimeout(timer);
});
```
Creates a new Observable object using the specified subscriber function. The subscriber function is called whenever the `subscribe` method of the observable object is invoked. The subscriber function is passed an *observer* object which has the following methods:
- `next(value)` Sends the next value in the sequence.
- `error(exception)` Terminates the sequence with an exception.
- `complete()` Terminates the sequence successfully.
- `closed` A boolean property whose value is `true` if the observer's subscription is closed.
The subscriber function can optionally return either a cleanup function or a subscription object. If it returns a cleanup function, that function will be called when the subscription has closed. If it returns a subscription object, then the subscription's `unsubscribe` method will be invoked when the subscription has closed.
### Observable.of(...items)
```js
// Logs 1, 2, 3
Observable.of(1, 2, 3).subscribe(x => {
console.log(x);
});
```
Returns an observable which will emit each supplied argument.
### Observable.from(value)
```js
let list = [1, 2, 3];
// Iterate over an object
Observable.from(list).subscribe(x => {
console.log(x);
});
```
```js
// Convert something 'observable' to an Observable instance
Observable.from(otherObservable).subscribe(x => {
console.log(x);
});
```
Converts `value` to an Observable.
- If `value` is an implementation of Observable, then it is converted to an instance of Observable as defined by this library.
- Otherwise, it is converted to an Observable which synchronously iterates over `value`.
### observable.subscribe([observer])
```js
let subscription = observable.subscribe({
next(x) { console.log(x) },
error(err) { console.log(`Finished with error: ${ err }`) },
complete() { console.log('Finished') }
});
```
Subscribes to the observable. Observer objects may have any of the following methods:
- `next(value)` Receives the next value of the sequence.
- `error(exception)` Receives the terminating error of the sequence.
- `complete()` Called when the stream has completed successfully.
Returns a subscription object that can be used to cancel the stream.
### observable.subscribe(nextCallback[, errorCallback, completeCallback])
```js
let subscription = observable.subscribe(
x => console.log(x),
err => console.log(`Finished with error: ${ err }`),
() => console.log('Finished')
);
```
Subscribes to the observable with callback functions. Returns a subscription object that can be used to cancel the stream.
### observable.forEach(callback)
```js
observable.forEach(x => {
console.log(`Received value: ${ x }`);
}).then(() => {
console.log('Finished successfully')
}).catch(err => {
console.log(`Finished with error: ${ err }`);
})
```
Subscribes to the observable and returns a Promise for the completion value of the stream. The `callback` argument is called once for each value in the stream.
### observable.filter(callback)
```js
Observable.of(1, 2, 3).filter(value => {
return value > 2;
}).subscribe(value => {
console.log(value);
});
// 3
```
Returns a new Observable that emits all values which pass the test implemented by the `callback` argument.
### observable.map(callback)
Returns a new Observable that emits the results of calling the `callback` argument for every value in the stream.
```js
Observable.of(1, 2, 3).map(value => {
return value * 2;
}).subscribe(value => {
console.log(value);
});
// 2
// 4
// 6
```
### observable.reduce(callback [,initialValue])
```js
Observable.of(0, 1, 2, 3, 4).reduce((previousValue, currentValue) => {
return previousValue + currentValue;
}).subscribe(result => {
console.log(result);
});
// 10
```
Returns a new Observable that applies a function against an accumulator and each value of the stream to reduce it to a single value.
### observable.concat(...sources)
```js
Observable.of(1, 2, 3).concat(
Observable.of(4, 5, 6),
Observable.of(7, 8, 9)
).subscribe(result => {
console.log(result);
});
// 1, 2, 3, 4, 5, 6, 7, 8, 9
```
Merges the current observable with additional observables.
module.exports = require('./lib/extras.js');
module.exports = require('./lib/Observable.js').Observable;
This diff is collapsed.
'use strict';
Object.defineProperty(exports, "__esModule", {
value: true
});
exports.merge = merge;
exports.combineLatest = combineLatest;
exports.zip = zip;
var _Observable = require('./Observable.js');
// Emits all values from all inputs in parallel
function merge() {
for (var _len = arguments.length, sources = Array(_len), _key = 0; _key < _len; _key++) {
sources[_key] = arguments[_key];
}
return new _Observable.Observable(function (observer) {
if (sources.length === 0) return _Observable.Observable.from([]);
var count = sources.length;
var subscriptions = sources.map(function (source) {
return _Observable.Observable.from(source).subscribe({
next: function (v) {
observer.next(v);
},
error: function (e) {
observer.error(e);
},
complete: function () {
if (--count === 0) observer.complete();
}
});
});
return function () {
return subscriptions.forEach(function (s) {
return s.unsubscribe();
});
};
});
}
// Emits arrays containing the most current values from each input
function combineLatest() {
for (var _len2 = arguments.length, sources = Array(_len2), _key2 = 0; _key2 < _len2; _key2++) {
sources[_key2] = arguments[_key2];
}
return new _Observable.Observable(function (observer) {
if (sources.length === 0) return _Observable.Observable.from([]);
var count = sources.length;
var values = new Map();
var subscriptions = sources.map(function (source, index) {
return _Observable.Observable.from(source).subscribe({
next: function (v) {
values.set(index, v);
if (values.size === sources.length) observer.next(Array.from(values.values()));
},
error: function (e) {
observer.error(e);
},
complete: function () {
if (--count === 0) observer.complete();
}
});
});
return function () {
return subscriptions.forEach(function (s) {
return s.unsubscribe();
});
};
});
}
// Emits arrays containing the matching index values from each input
function zip() {
for (var _len3 = arguments.length, sources = Array(_len3), _key3 = 0; _key3 < _len3; _key3++) {
sources[_key3] = arguments[_key3];
}
return new _Observable.Observable(function (observer) {
if (sources.length === 0) return _Observable.Observable.from([]);
var queues = sources.map(function () {
return [];
});
function done() {
return queues.some(function (q, i) {
return q.length === 0 && subscriptions[i].closed;
});
}
var subscriptions = sources.map(function (source, index) {
return _Observable.Observable.from(source).subscribe({
next: function (v) {
queues[index].push(v);
if (queues.every(function (q) {
return q.length > 0;
})) {
observer.next(queues.map(function (q) {
return q.shift();
}));
if (done()) observer.complete();
}
},
error: function (e) {
observer.error(e);
},
complete: function () {
if (done()) observer.complete();
}
});
});
return function () {
return subscriptions.forEach(function (s) {
return s.unsubscribe();
});
};
});
}
\ No newline at end of file
{
"name": "zen-observable",
"version": "0.8.13",
"repository": "zenparsing/zen-observable",
"description": "An Implementation of ES Observables",
"homepage": "https://github.com/zenparsing/zen-observable",
"license": "MIT",
"devDependencies": {
"babel-cli": "^6.26.0",
"babel-preset-es2015": "^6.24.1",
"eslint": "^4.16.0",
"mocha": "^5.0.0"
},
"dependencies": {},
"scripts": {
"test": "mocha --recursive --require ./scripts/mocha-require",
"lint": "eslint src/*",
"build": "git clean -dfX ./lib && node ./scripts/build",
"prepublishOnly": "eslint src/* && npm test && npm run build"
}
}
module.exports = [
'transform-es2015-arrow-functions',
'transform-es2015-block-scoped-functions',
'transform-es2015-block-scoping',
'transform-es2015-classes',
'transform-es2015-computed-properties',
'transform-es2015-destructuring',
'transform-es2015-duplicate-keys',
'transform-es2015-for-of',
'transform-es2015-literals',
'transform-es2015-modules-commonjs',
'transform-es2015-parameters',
'transform-es2015-shorthand-properties',
'transform-es2015-spread',
'transform-es2015-template-literals',
];
const { execSync } = require('child_process');
const plugins = require('./babel-plugins');
execSync('babel src --out-dir lib --plugins=' + plugins.join(','), {
env: process.env,
stdio: 'inherit',
});
require('babel-core/register')({
plugins: require('./babel-plugins'),
});
This diff is collapsed.
import { Observable } from './Observable.js';
// Emits all values from all inputs in parallel
export function merge(...sources) {
return new Observable(observer => {
if (sources.length === 0)
return Observable.from([]);
let count = sources.length;
let subscriptions = sources.map(source => Observable.from(source).subscribe({
next(v) {
observer.next(v);
},
error(e) {
observer.error(e);
},
complete() {
if (--count === 0)
observer.complete();
},
}));
return () => subscriptions.forEach(s => s.unsubscribe());
});
}
// Emits arrays containing the most current values from each input
export function combineLatest(...sources) {
return new Observable(observer => {
if (sources.length === 0)
return Observable.from([]);
let count = sources.length;
let values = new Map();
let subscriptions = sources.map((source, index) => Observable.from(source).subscribe({
next(v) {
values.set(index, v);
if (values.size === sources.length)
observer.next(Array.from(values.values()));
},
error(e) {
observer.error(e);
},
complete() {
if (--count === 0)
observer.complete();
},
}));
return () => subscriptions.forEach(s => s.unsubscribe());
});
}
// Emits arrays containing the matching index values from each input
export function zip(...sources) {
return new Observable(observer => {
if (sources.length === 0)
return Observable.from([]);
let queues = sources.map(() => []);
function done() {
return queues.some((q, i) => q.length === 0 && subscriptions[i].closed);
}
let subscriptions = sources.map((source, index) => Observable.from(source).subscribe({
next(v) {
queues[index].push(v);
if (queues.every(q => q.length > 0)) {
observer.next(queues.map(q => q.shift()));
if (done())
observer.complete();
}
},
error(e) {
observer.error(e);
},
complete() {
if (done())
observer.complete();
},
}));
return () => subscriptions.forEach(s => s.unsubscribe());
});
}
import assert from 'assert';
describe('concat', () => {
it('concatenates the supplied Observable arguments', async () => {
let list = [];
await Observable
.from([1, 2, 3, 4])
.concat(Observable.of(5, 6, 7))
.forEach(x => list.push(x));
assert.deepEqual(list, [1, 2, 3, 4, 5, 6, 7]);
});
it('can be used multiple times to produce the same results', async () => {
const list1 = [];
const list2 = [];
const concatenated = Observable.from([1, 2, 3, 4])
.concat(Observable.of(5, 6, 7));
await concatenated
.forEach(x => list1.push(x));
await concatenated
.forEach(x => list2.push(x));
assert.deepEqual(list1, [1, 2, 3, 4, 5, 6, 7]);
assert.deepEqual(list2, [1, 2, 3, 4, 5, 6, 7]);
});
});
import assert from 'assert';
import { testMethodProperty } from './properties.js';
describe('constructor', () => {
it('throws if called as a function', () => {
assert.throws(() => Observable(() => {}));
assert.throws(() => Observable.call({}, () => {}));
});
it('throws if the argument is not callable', () => {
assert.throws(() => new Observable({}));
assert.throws(() => new Observable());
assert.throws(() => new Observable(1));
assert.throws(() => new Observable('string'));
});
it('accepts a function argument', () => {
let result = new Observable(() => {});
assert.ok(result instanceof Observable);
});
it('is the value of Observable.prototype.constructor', () => {
testMethodProperty(Observable.prototype, 'constructor', {
configurable: true,
writable: true,
length: 1,
});
});
it('does not call the subscriber function', () => {
let called = 0;
new Observable(() => { called++ });
assert.equal(called, 0);
});
});
import assert from 'assert';
import { parse } from './parse.js';
import { combineLatest } from '../../src/extras.js';
describe('extras/combineLatest', () => {
it('should emit arrays containing the most recent values', async () => {
let output = [];
await combineLatest(
parse('a-b-c-d'),
parse('-A-B-C-D')
).forEach(
value => output.push(value.join(''))
);
assert.deepEqual(output, [
'aA',
'bA',
'bB',
'cB',
'cC',
'dC',
'dD',
]);
});
});
import assert from 'assert';
import { parse } from './parse.js';
import { merge } from '../../src/extras.js';
describe('extras/merge', () => {
it('should emit all data from each input in parallel', async () => {
let output = '';
await merge(
parse('a-b-c-d'),
parse('-A-B-C-D')
).forEach(
value => output += value
);
assert.equal(output, 'aAbBcCdD');
});
});
export function parse(string) {
return new Observable(async observer => {
await null;
for (let char of string) {
if (observer.closed) return;
else if (char !== '-') observer.next(char);
await null;
}
observer.complete();
});
}
import assert from 'assert';
import { parse } from './parse.js';
import { zip } from '../../src/extras.js';
describe('extras/zip', () => {
it('should emit pairs of corresponding index values', async () => {
let output = [];
await zip(
parse('a-b-c-d'),
parse('-A-B-C-D')
).forEach(
value => output.push(value.join(''))
);
assert.deepEqual(output, [
'aA',
'bB',
'cC',
'dD',
]);
});
});
import assert from 'assert';
describe('filter', () => {
it('filters the results using the supplied callback', async () => {
let list = [];
await Observable
.from([1, 2, 3, 4])
.filter(x => x > 2)
.forEach(x => list.push(x));
assert.deepEqual(list, [3, 4]);
});
});
import assert from 'assert';
describe('flatMap', () => {
it('maps and flattens the results using the supplied callback', async () => {
let list = [];
await Observable.of('a', 'b', 'c').flatMap(x =>
Observable.of(1, 2, 3).map(y => [x, y])
).forEach(x => list.push(x));
assert.deepEqual(list, [
['a', 1],
['a', 2],
['a', 3],
['b', 1],
['b', 2],
['b', 3],
['c', 1],
['c', 2],
['c', 3],
]);
});
});
import assert from 'assert';
describe('forEach', () => {
it('rejects if the argument is not a function', async () => {
let promise = Observable.of(1, 2, 3).forEach();
try {
await promise;
assert.ok(false);
} catch (err) {
assert.equal(err.name, 'TypeError');
}
});
it('rejects if the callback throws', async () => {