Commit a8f8eaa9 authored by Gordon Ball's avatar Gordon Ball

New upstream version 5.1.0

parent 63d11fbf
language: node_js
node_js: "6"
node_js:
- "6"
- "8"
- "10"
sudo: required
matrix:
......@@ -22,7 +25,7 @@ matrix:
- os: linux
dist: trusty
env:
- ELECTRON="1.4.10"
- ELECTRON="3.0.0"
- os: osx
env:
- DEPLOY="true"
......
# zeromq
# zeromq.js
[![codecov](https://codecov.io/gh/zeromq/zeromq.js/branch/master/graph/badge.svg)](https://codecov.io/gh/zeromq/zeromq.js)
[![Greenkeeper badge](https://badges.greenkeeper.io/zeromq/zeromq.js.svg)](https://greenkeeper.io/)
......@@ -11,7 +11,7 @@
[**Users**](#installation---users) | [**From Source**](#installation---from-source) | [**Contributors and Development**](#installation---contributors-and-development) | [**Maintainers**](#for-maintainers-creating-a-release)
**zeromq**: Your ready to use, prebuilt [ØMQ](http://www.zeromq.org/)
**`zeromq`**: Your ready to use, prebuilt [ØMQ](http://www.zeromq.org/)
bindings for [Node.js](https://nodejs.org/en/).
ØMQ provides handy functionality when working with sockets. Yet,
......@@ -38,7 +38,9 @@ Install `zeromq` with the following:
```bash
npm install zeromq
```
windows users:
do not forget to set msvs_version according to your visual studio version 2013,2015,2017
`npm config set msvs_version 2015`
Now, prepare to be amazed by the wonders of binaries.
To use your system's libzmq (if it has been installed and development headers
......@@ -64,7 +66,7 @@ For packaging your Electron application we recommend using [`electron-builder`](
## Installation - From Source
If you are working on a Linux 32-bit system or want to install a developement version, you have to build `zeromq` from source.
If you are working on a Linux 32-bit system or want to install a development version, you have to build `zeromq` from source.
### Prerequisites
......@@ -91,7 +93,8 @@ Use your distribution's package manager to install.
> :bulb: [Windows Vista / 7 only] requires [.NET Framework 4.5.1](http://www.microsoft.com/en-us/download/details.aspx?id=40773)
2. Install [Python 2.7](https://www.python.org/downloads/) or [Miniconda 2.7](http://conda.pydata.org/miniconda.html) (`v3.x.x` is not supported), and run `npm config set python python2.7`
3. Launch cmd, `npm config set msvs_version 2015`
3. Launch cmd, and set msvs_version according to your visual studio version 2013,2015,2017
`npm config set msvs_version 2015`
### Installation
......
......@@ -2,10 +2,10 @@ environment:
GITHUB_TOKEN:
secure: E1HpiZf9OJuc8XPGA57hJbCQlMWVCPVBePHiWF/BgmJ/+e/2OplyifiS/x8CJtcw
matrix:
- nodejs_version: "4"
- nodejs_version: "10"
deploy: "true"
- nodejs_version: "6"
electron: "1.4.10"
- nodejs_version: "10"
electron: "3.0.0"
platform:
- x64
......
......@@ -48,6 +48,8 @@
#define ZMQ_CAN_MONITOR (ZMQ_VERSION > 30201)
#define ZMQ_CAN_SET_CTX (ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR >= 2) || ZMQ_VERSION_MAJOR > 3
#define ZERO_COPY_MESSAGE_SEND 1
using namespace v8;
using namespace node;
......@@ -143,10 +145,9 @@ namespace zmq {
#endif
class IncomingMessage;
class OutgoingMessage;
static NAN_METHOD(Recv);
static NAN_METHOD(Readv);
class OutgoingMessage;
static NAN_METHOD(Send);
static NAN_METHOD(Sendv);
void Close();
static NAN_METHOD(Close);
......@@ -183,6 +184,12 @@ namespace zmq {
static NAN_MODULE_INIT(Initialize);
static void
on_uv_close(uv_handle_t *handle)
{
delete handle;
}
/*
* Helpers for dealing with ØMQ errors.
*/
......@@ -197,7 +204,6 @@ namespace zmq {
return Nan::Error(ErrorMessage());
}
/*
* Context methods.
*/
......@@ -318,7 +324,6 @@ namespace zmq {
Nan::SetPrototypeMethod(t, "unref", DetachFromEventLoop);
Nan::SetPrototypeMethod(t, "recv", Recv);
Nan::SetPrototypeMethod(t, "readv", Readv);
Nan::SetPrototypeMethod(t, "send", Send);
Nan::SetPrototypeMethod(t, "sendv", Sendv);
Nan::SetPrototypeMethod(t, "close", Close);
......@@ -1019,6 +1024,66 @@ namespace zmq {
MessageReference* msgref_;
};
class Socket::OutgoingMessage {
public:
inline OutgoingMessage(Local<Object> buf)
: bufref_(new BufferReference(buf)) {
if (zmq_msg_init_data(&msg_, Buffer::Data(buf), Buffer::Length(buf),
BufferReference::FreeCallback, bufref_) < 0) {
delete bufref_;
Nan::ThrowError(ErrorMessage());
}
};
inline ~OutgoingMessage() {
if (zmq_msg_close(&msg_) < 0)
Nan::ThrowError(ErrorMessage());
};
inline operator zmq_msg_t*() {
return &msg_;
}
private:
class BufferReference {
public:
inline BufferReference(Local<Object> buf)
: persistent_(buf), async_(new uv_async_t) {
if (uv_async_init(uv_default_loop(), async_, Destroy) < 0) {
delete async_;
delete this;
Nan::ThrowError("Async initialization failed");
} else {
async_->data = this;
}
}
inline ~BufferReference() {
persistent_.Reset();
}
// Called by zmq when the message has been sent.
// NOTE: May be called from a worker thread. Do not modify V8/Node.
static void FreeCallback(void*, void* bufref) {
int result = uv_async_send(static_cast<BufferReference*>(bufref)->async_);
assert(result == 0);
}
static void Destroy(uv_async_t* async) {
uv_close(reinterpret_cast<uv_handle_t*>(async), on_uv_close);
BufferReference* bufref = static_cast<BufferReference*>(async->data);
delete bufref;
}
private:
Nan::Persistent<Object> persistent_;
uv_async_t* async_;
};
zmq_msg_t msg_;
BufferReference* bufref_;
};
#if ZMQ_CAN_MONITOR
NAN_METHOD(Socket::Monitor) {
int64_t timer_interval = 10; // default to 10ms interval
......@@ -1076,6 +1141,7 @@ namespace zmq {
return;
}
uv_timer_stop(this->monitor_handle_);
uv_close(reinterpret_cast<uv_handle_t*>(this->monitor_handle_), on_uv_close);
this->monitor_handle_ = NULL;
this->monitor_socket_ = NULL;
}
......@@ -1193,65 +1259,6 @@ namespace zmq {
info.GetReturnValue().Set(msg.GetBuffer());
}
/*
* An object that creates a ØMQ message from the given Buffer Object,
* and manages the reference to it using RAII. A persistent V8 handle
* for the Buffer object will remain while its data is in use by ØMQ.
*/
class Socket::OutgoingMessage {
public:
inline OutgoingMessage(Local<Object> buf) {
bufref_ = new BufferReference(buf);
if (zmq_msg_init_data(&msg_, Buffer::Data(buf), Buffer::Length(buf),
BufferReference::FreeCallback, bufref_) < 0) {
delete bufref_;
Nan::ThrowError(ErrorMessage());
}
};
inline ~OutgoingMessage() {
if (zmq_msg_close(&msg_) < 0)
Nan::ThrowError(ErrorMessage());
};
inline operator zmq_msg_t*() {
return &msg_;
}
private:
class BufferReference {
public:
inline BufferReference(Local<Object> buf) {
loop = uv_default_loop();
uv_async_init(loop, &async, reinterpret_cast<uv_async_cb>(cleanup));
async.data = this;
persistent.Reset(buf);
}
inline ~BufferReference() {
persistent.Reset();
}
// Called by zmq when the message has been sent.
// NOTE: May be called from a worker thread. Do not modify V8/Node.
static void FreeCallback(void* data, void* message) {
uv_async_send(&static_cast<BufferReference *>(message)->async);
}
static void cleanup(uv_async_t *handle, int status) {
delete static_cast<BufferReference *>(handle->data);
}
private:
Nan::Persistent<Object> persistent;
uv_async_t async;
uv_loop_t *loop;
};
zmq_msg_t msg_;
BufferReference* bufref_;
};
NAN_METHOD(Socket::Sendv) {
Socket* socket = GetSocket(info);
if (socket->state_ != STATE_READY)
......@@ -1262,8 +1269,6 @@ namespace zmq {
bool checkPollOut = true;
bool readsReady = false;
int rc;
Local<Array> batch = info[0].As<Array>();
size_t len = batch->Length();
......@@ -1296,25 +1301,33 @@ namespace zmq {
Local<Number> flagsObj = batch->Get(i + 1).As<Number>();
int flags = Nan::To<int>(flagsObj).FromJust();
size_t len = Buffer::Length(buf);
#if ZERO_COPY_MESSAGE_SEND
/* Non-copying implementation. */
OutgoingMessage msg_p(buf);
#else
/* Copying implementation. */
zmq_msg_t msg;
int rc;
size_t len = Buffer::Length(buf);
rc = zmq_msg_init_size(&msg, len);
if (rc != 0)
return Nan::ThrowError(ErrorMessage());
char * cp = static_cast<char *>(zmq_msg_data(&msg));
const char * dat = Buffer::Data(buf);
char* cp = static_cast<char *>(zmq_msg_data(&msg));
const char* dat = Buffer::Data(buf);
std::copy(dat, dat + len, cp);
zmq_msg_t* msg_p = &msg;
#endif
while (true) {
int rc;
#if ZMQ_VERSION_MAJOR == 2
rc = zmq_send(socket->socket_, &msg, flags);
rc = zmq_send(socket->socket_, msg_p, flags);
#elif ZMQ_VERSION_MAJOR == 3
rc = zmq_sendmsg(socket->socket_, &msg, flags);
rc = zmq_sendmsg(socket->socket_, msg_p, flags);
#else
rc = zmq_msg_send(&msg, socket->socket_, flags);
rc = zmq_msg_send(msg_p, socket->socket_, flags);
checkPollOut = false;
#endif
if (rc < 0){
......@@ -1343,71 +1356,6 @@ namespace zmq {
return info.GetReturnValue().Set(true);
}
// WARNING: the buffer passed here will be kept alive
// until zmq_send completes, possibly on another thread.
// Do not modify or reuse any buffer passed to send.
// This is bad, but allows us to send without copying.
NAN_METHOD(Socket::Send) {
int argc = info.Length();
if (argc != 1 && argc != 2)
return Nan::ThrowTypeError("Must pass a Buffer and optionally flags");
if (!Buffer::HasInstance(info[0]))
return Nan::ThrowTypeError("First argument should be a Buffer");
int flags = 0;
if (argc == 2) {
if (!info[1]->IsNumber())
return Nan::ThrowTypeError("Second argument should be an integer");
flags = Nan::To<int>(info[1]).FromJust();
}
GET_SOCKET(info);
#if 0 // zero-copy version, but doesn't properly pin buffer and so has GC issues
OutgoingMessage msg(info[0].As<Object>());
if (zmq_send(socket->socket_, msg, flags) < 0)
return Nan::ThrowError(ErrorMessage());
#else // copying version that has no GC issues
zmq_msg_t msg;
Local<Object> buf = info[0].As<Object>();
size_t len = Buffer::Length(buf);
int res = zmq_msg_init_size(&msg, len);
if (res != 0)
return Nan::ThrowError(ErrorMessage());
char * cp = static_cast<char *>(zmq_msg_data(&msg));
const char * dat = Buffer::Data(buf);
std::copy(dat, dat + len, cp);
while (true) {
int rc;
#if ZMQ_VERSION_MAJOR == 2
rc = zmq_send(socket->socket_, &msg, flags);
#elif ZMQ_VERSION_MAJOR == 3
rc = zmq_sendmsg(socket->socket_, &msg, flags);
#else
rc = zmq_msg_send(&msg, socket->socket_, flags);
#endif
if (rc < 0){
if (zmq_errno()==EINTR) {
continue;
}
return Nan::ThrowError(ErrorMessage());
} else {
break;
}
}
#endif // zero copy / copying version
return;
}
static void
on_uv_close(uv_handle_t *handle)
{
delete handle;
}
void
Socket::Close() {
if (socket_) {
......
......@@ -202,7 +202,7 @@ function OutBatch() {
OutBatch.prototype.append = function (buf, flags, cb) {
if (!Buffer.isBuffer(buf)) {
buf = new Buffer(String(buf), 'utf8');
buf = Buffer.from(String(buf), 'utf8');
}
this.content.push(buf, flags);
......@@ -303,7 +303,9 @@ exports.Socket = function (type) {
this._outgoing = new BatchList();
this._zmq.onReadReady = function () {
self._flushReads();
setImmediate(function(){
self._flushReads();
});
};
this._zmq.onSendReady = function () {
......@@ -407,11 +409,18 @@ Object.keys(opts).forEach(function(name){
});
Socket.prototype.__defineSetter__(name, function(val) {
if ('string' == typeof val) val = new Buffer(val, 'utf8');
if ('string' == typeof val) val = Buffer.from(val, 'utf8');
return this._zmq.setsockopt(opts[name], val);
});
});
/**
* Return true if socket state is closed.
*/
Socket.prototype.__defineGetter__("closed", function() {
return this._zmq.state === zmq.STATE_CLOSED;
});
/**
* Async bind.
*
......@@ -815,25 +824,25 @@ function proxy (frontend, backend, capture){
//forwarding router/dealer pack signature: id, delimiter, msg
frontend.on('message',function (id,delimiter,msg){
backend.send([id,delimiter,msg]);
backend.send([].slice.call(arguments));
});
backend.on('message',function (id,delimiter,msg){
frontend.send([id,delimiter,msg]);
frontend.send([].slice.call(arguments));
//forwarding message to the capture socket
capture.send(msg);
capture.send([].slice.call(arguments, 2));
});
} else {
//forwarding router/dealer signatures without capture
frontend.on('message',function (id,delimiter,msg){
backend.send([id,delimiter,msg]);
backend.send([].slice.call(arguments));
});
backend.on('message',function (id,delimiter,msg){
frontend.send([id,delimiter,msg]);
frontend.send([].slice.call(arguments));
});
}
......
{
"name": "zeromq",
"version": "4.6.0",
"description": "Prebuilt bindings for node.js to ZeroMQ",
"version": "5.1.0",
"description": "ZeroMQ for node.js",
"main": "index",
"gypfile": true,
"repository": {
......@@ -9,31 +9,30 @@
"url": "https://github.com/zeromq/zeromq.js.git"
},
"dependencies": {
"nan": "^2.6.2",
"prebuild-install": "^2.2.2"
"nan": "^2.10.0",
"prebuild-install": "5.2.1"
},
"devDependencies": {
"electron-mocha": "^4.0.0",
"electron-mocha": "^6.0.0",
"jsdoc": "^3.5.4",
"mocha": "^3.5.0",
"nyc": "^11.1.0",
"prebuild": "^6.2.1",
"mocha": "^5.0.0",
"nyc": "^12.0.2",
"prebuild": "^8.1.0",
"semver": "^5.4.1",
"should": "^12.0.0"
"should": "^13.0.0"
},
"engines": {
"node": ">=0.10"
"node": ">=6.0"
},
"scripts": {
"build:libzmq": "node scripts/preinstall.js",
"install": "node scripts/prebuild-install.js || (node scripts/preinstall.js && node-gyp rebuild)",
"prebuild": "prebuild --all --strip",
"build:docs": "jsdoc -R README.md -d docs lib/*.js",
"postpublish": "./scripts/trigger_travis_build.sh",
"test": "mocha --expose-gc --slow 300",
"test:electron": "electron-mocha --slow 300",
"precoverage": "nyc npm run test",
"coverage": "nyc report --reporter=text-lcov > coverage/lcov.info"
"coverage": "nyc report --reporter=text-lcov > coverage.lcov"
},
"keywords": [
"zeromq",
......
......@@ -8,14 +8,20 @@ else
exit 1
fi
if [ "$2" = "ia32" ]; then
export CFLAGS="-fPIC -m32"
export CXXFLAGS="-fPIC -m32"
else
export CFLAGS=-fPIC
export CXXFLAGS=-fPIC
fi
export MACOSX_DEPLOYMENT_TARGET=10.9
export BASE=$(dirname "$0")
export ZMQ_PREFIX="${BASE}/../zmq"
export ZMQ_SRC_DIR=zeromq-$ZMQ
cd "${ZMQ_PREFIX}"
export CFLAGS=-fPIC
export CXXFLAGS=-fPIC
export PKG_CONFIG_PATH="${ZMQ_PREFIX}/lib/pkgconfig"
test -d "${ZMQ_SRC_DIR}" || tar xzf zeromq-$ZMQ.tar.gz
......
......@@ -3,6 +3,7 @@ var spawn = require("child_process").spawn;
var path = require("path");
var fs = require("fs");
var ARCH = process.arch;
var ZMQ = "4.2.2";
var ZMQ_REPO = "libzmq";
......@@ -14,7 +15,7 @@ if (process.env.npm_config_zmq_external == "true") {
function buildZMQ(scriptPath, zmqDir) {
console.log("Building libzmq for " + process.platform);
var child = spawn(scriptPath, [ZMQ]);
var child = spawn(scriptPath, [ZMQ, ARCH]);
child.stdout.pipe(process.stdout);
child.stderr.pipe(process.stderr);
......
travis_body='{
"request": {
"message": "Test prebuilt binaries",
"branch": "prebuilt-testing"
}}'
curl -s -X POST \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-H "Travis-API-Version: 3" \
-H "Authorization: token $TRAVIS_TOKEN" \
-d "$travis_body" \
https://api.travis-ci.org/repo/zeromq%2Fzeromq.js/requests
appveyor_body='{
"accountName": "zeromq",
"projectSlug": "zeromq-js",
"branch": "prebuilt-testing",
}'
curl -s -X POST \
-H "Content-Type: application/json" \
-H "Accept: application/json" \
-H "Authorization: Bearer $APPVEYOR_TOKEN" \
-d "$appveyor_body" \
https://ci.appveyor.com/api/builds
......@@ -35,7 +35,9 @@ describe('socket', function(){
});
it('should close', function(){
sock.closed.should.equal(false);
sock.close();
sock.closed.should.equal(true);
});
it('should support options', function(){
......
......@@ -36,7 +36,7 @@ describe('socket.messages', function(){
push.connect('inproc://stuff_ssm');
push.send('string');
push.send(15.99);
push.send(new Buffer('buffer'));
push.send(Buffer.from('buffer'));
});
});
......@@ -53,7 +53,7 @@ describe('socket.messages', function(){
pull.bind('inproc://stuff_ssmm', function (error) {
if (error) throw error;
push.connect('inproc://stuff_ssmm');
push.send(['string', 15.99, new Buffer('buffer')]);
push.send(['string', 15.99, Buffer.from('buffer')]);
});
});
......@@ -111,7 +111,7 @@ describe('socket.messages', function(){
if (error) throw error;
push.send('string');
push.send(15.99);
push.send(new Buffer('buffer'));
push.send(Buffer.from('buffer'));
pull.connect('tcp://127.0.0.1:12345');
});
});
......
......@@ -131,4 +131,61 @@ describe('socket.pub-sub', function(){
});
});
it('should subscribe to character arguments', function(done){
sub.subscribe('ABC012');
sub.on('message', function (msg) {
msg.should.be.an.instanceof(Buffer);
msg.toString().should.equal('ABC012');
sub.close();
return done();
});
var addr = "inproc://stuff_sspsfa";
sub.bind(addr, function (error) {
if (error) throw error;
pub.connect(addr);
setTimeout(function() {
pub.send('012');
pub.send('ABC');
pub.send('MSG');
pub.send('ABC012');
}, 100.0);
});
})
it('should subscribe to binary argument', function(done){
sub.subscribe(Buffer.from(new Uint8Array([65, 66, 67, 48, 49, 50])));
sub.on('message', function (msg) {
msg.should.be.an.instanceof(Buffer);
msg.toString().should.equal('ABC0123');
sub.close();
return done();
});
var addr = "inproc://stuff_sspsfb";
sub.bind(addr, function (error) {
if (error) throw error;
pub.connect(addr);
setTimeout(function() {
pub.send('ABC');
pub.send('MSG');
pub.send('MSG1');
pub.send(Buffer.from(new Uint8Array([65, 66, 67, 48, 49, 50, 51])));
}, 100.0);
});
})
});
......@@ -35,10 +35,10 @@ describe('socket.zap', function(){
return;
}
var serverPublicKey = new Buffer('7f188e5244b02bf497b86de417515cf4d4053ce4eb977aee91a55354655ec33a', 'hex')
, serverPrivateKey = new Buffer('1f5d3873472f95e11f4723d858aaf0919ab1fb402cb3097742c606e61dd0d7d8', 'hex')
, clientPublicKey = new Buffer('ea1cc8bd7c8af65497d43fc21dbec6560c5e7b61bcfdcbd2b0dfacf0b4c38d45', 'hex')
, clientPrivateKey = new Buffer('83f99afacfab052406e5f421612568034e85f4c8182a1c92671e83dca669d31d', 'hex');
var serverPublicKey = Buffer.from('7f188e5244b02bf497b86de417515cf4d4053ce4eb977aee91a55354655ec33a', 'hex')
, serverPrivateKey = Buffer.from('1f5d3873472f95e11f4723d858aaf0919ab1fb402cb3097742c606e61dd0d7d8', 'hex')
, clientPublicKey = Buffer.from('ea1cc8bd7c8af65497d43fc21dbec6560c5e7b61bcfdcbd2b0dfacf0b4c38d45', 'hex')
, clientPrivateKey = Buffer.from('83f99afacfab052406e5f421612568034e85f4c8182a1c92671e83dca669d31d', 'hex');
rep.on('message', function(msg){
msg.should.be.an.instanceof(Buffer);
......
......@@ -24,20 +24,20 @@ module.exports.start = function(count) {
var zapReq = {
version: data.shift(),
requestId: data.shift(),
domain: new Buffer(data.shift()).toString('utf8'),
address: new Buffer(data.shift()).toString('utf8'),
identity: new Buffer(data.shift()).toString('utf8'),
mechanism: new Buffer(data.shift()).toString('utf8'),
domain: Buffer.from(data.shift()).toString('utf8'),
address: Buffer.from(data.shift()).toString('utf8'),
identity: Buffer.from(data.shift()).toString('utf8'),
mechanism: Buffer.from(data.shift()).toString('utf8'),
credentials: data.slice(0)
};
zap.send(returnPath.concat([
zapReq.version,
zapReq.requestId,
new Buffer("200", "utf8"),
new Buffer("OK", "utf8"),
new Buffer(0),
new Buffer(0)
Buffer.from("200", "utf8"),
Buffer.from("OK", "utf8"),
Buffer.alloc(0),
Buffer.alloc(0)
]));
});
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment