Skip to content
Commits on Source (5)
......@@ -37,7 +37,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
<version>3.7.3.Final</version>
<version>3.7.6.Final</version>
</parent>
<dependencies>
......@@ -208,7 +208,7 @@
*
</Import-Package>
<Require-Capability>
osgi.service;effective:=active;filter:="(objectClass=org.xnio.Xnio)";
osgi.service;effective:=active;filter:="(objectClass=org.xnio.Xnio)"
</Require-Capability>
</instructions>
</configuration>
......
......@@ -719,7 +719,7 @@ public abstract class AbstractIoFuture<T> implements IoFuture<T> {
try {
notifier.notify(future, attachment);
} catch (Throwable t) {
futureMsg.notifierFailed(t, notifier);
futureMsg.notifierFailed(t, notifier, attachment);
}
}
}
......
......@@ -310,9 +310,9 @@ public interface Messages extends BasicLogger {
@Message(id = 1002, value = "Operation was cancelled")
CancellationException opCancelled();
@Message(id = 1003, value = "Running IoFuture notifier %s failed")
@Message(id = 1003, value = "Running IoFuture notifier %s (with attachment %s) failed")
@LogMessage(level = WARN)
void notifierFailed(@Cause Throwable cause, IoFuture.Notifier<?, ?> notifier);
void notifierFailed(@Cause Throwable cause, IoFuture.Notifier<?, ?> notifier, Object attachment);
@Message(id = 1004, value = "Operation timed out")
TimeoutException opTimedOut();
......
jboss-xnio (3.7.6-1) unstable; urgency=medium
* New upstream version 3.7.6.
* Declare compliance with Debian Policy 4.4.1.
* Drop maven-bundle-plugin-rules.patch. Fixed upstream.
-- Markus Koschany <apo@debian.org> Wed, 09 Oct 2019 21:52:25 +0200
jboss-xnio (3.7.3-1) unstable; urgency=medium
* New upstream version 3.7.3.
......
......@@ -20,7 +20,7 @@ Build-Depends:
libwildfly-client-config-java,
libwildfly-common-java,
maven-debian-helper (>= 1.5)
Standards-Version: 4.4.0
Standards-Version: 4.4.1
Vcs-Git: https://salsa.debian.org/java-team/jboss-xnio.git
Vcs-Browser: https://salsa.debian.org/java-team/jboss-xnio
Homepage: http://xnio.jboss.org/
......
From: =?utf-8?b?IkthaS1DaHVuZyBZYW4gKOaut+WVn+iBsCki?= <seamlikok@gmail.com>
Date: Mon, 19 Mar 2018 11:08:23 +0100
Subject: Fix the rules in Maven Bundle Plugin
Bug: https://issues.jboss.org/browse/XNIO-321
---
api/pom.xml | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/api/pom.xml b/api/pom.xml
index 05e49c6..43377f1 100644
--- a/api/pom.xml
+++ b/api/pom.xml
@@ -207,7 +207,7 @@
*
</Import-Package>
<Require-Capability>
- osgi.service;effective:=active;filter:="(objectClass=org.xnio.Xnio)";
+ osgi.service;effective:=active;filter:="(objectClass=org.xnio.Xnio)"
</Require-Capability>
</instructions>
</configuration>
maven-bundle-plugin-rules.patch
......@@ -31,7 +31,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
<version>3.7.3.Final</version>
<version>3.7.6.Final</version>
</parent>
<properties>
......@@ -40,6 +40,7 @@
<xnio.nio.selector.main/>
<xnio.nio.selector.temp/>
<xnio.nio.selector.provider/>
<xnio.nio.alt-queued-server>true</xnio.nio.alt-queued-server>
</properties>
<dependencies>
......@@ -155,6 +156,10 @@
<name>org.xnio.ssl.new</name>
<value>${org.xnio.ssl.new}</value>
</property>
<property>
<name>xnio.nio.alt-queued-server</name>
<value>${xnio.nio.alt-queued-server}</value>
</property>
</systemProperties>
<enableAssertions>true</enableAssertions>
<redirectTestOutputToFile>true</redirectTestOutputToFile>
......
......@@ -21,7 +21,6 @@ package org.xnio.nio;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.XnioIoThread;
import org.xnio.XnioWorker;
import org.xnio.channels.CloseableChannel;
abstract class AbstractNioChannel<C extends AbstractNioChannel<C>> implements CloseableChannel {
......@@ -37,7 +36,7 @@ abstract class AbstractNioChannel<C extends AbstractNioChannel<C>> implements Cl
this.worker = worker;
}
public final XnioWorker getWorker() {
public final NioXnioWorker getWorker() {
return worker;
}
......
......@@ -125,6 +125,10 @@ interface Log extends BasicLogger {
@Message(id = 8000, value = "Received an I/O error on selection: %s")
void selectionError(IOException e);
@LogMessage(level = WARN)
@Message(id = 8001, value = "Socket accept failed, backing off for %2$d milliseconds: %1$s")
void acceptFailed(IOException problem, int backOffTime);
// Trace
@LogMessage(level = TRACE)
......
......@@ -27,11 +27,13 @@ import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
......@@ -111,16 +113,26 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
private static final AtomicLongFieldUpdater<NioTcpServer> connectionStatusUpdater = AtomicLongFieldUpdater.newUpdater(NioTcpServer.class, "connectionStatus");
NioTcpServer(final NioXnioWorker worker, final ServerSocketChannel channel, final OptionMap optionMap) throws IOException {
NioTcpServer(final NioXnioWorker worker, final ServerSocketChannel channel, final OptionMap optionMap, final boolean useAcceptThreadOnly) throws IOException {
super(worker);
this.channel = channel;
final WorkerThread[] threads = worker.getAll();
final int threadCount = threads.length;
final WorkerThread[] threads;
final int threadCount;
final int tokens;
final int connections;
if (useAcceptThreadOnly) {
threads = new WorkerThread[] { worker.getAcceptThread() };
threadCount = 1;
tokens = 0;
connections = 0;
} else {
threads = worker.getAll();
threadCount = threads.length;
if (threadCount == 0) {
throw log.noThreads();
}
final int tokens = optionMap.get(Options.BALANCING_TOKENS, -1);
final int connections = optionMap.get(Options.BALANCING_CONNECTIONS, 16);
tokens = optionMap.get(Options.BALANCING_TOKENS, -1);
connections = optionMap.get(Options.BALANCING_CONNECTIONS, 16);
if (tokens != -1) {
if (tokens < 1 || tokens >= threadCount) {
throw log.balancingTokens();
......@@ -130,6 +142,7 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
}
tokenConnectionCount = connections;
}
}
socket = channel.socket();
if (optionMap.contains(Options.SEND_BUFFER)) {
final int sendBufferSize = optionMap.get(Options.SEND_BUFFER, DEFAULT_BUFFER_SIZE);
......@@ -377,9 +390,17 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
return (int) ((value & CONN_LOW_MASK) >> CONN_LOW_BIT);
}
public NioSocketStreamConnection accept() throws IOException {
public NioSocketStreamConnection accept() throws ClosedChannelException {
final WorkerThread current = WorkerThread.getCurrent();
final NioTcpServerHandle handle = handles[current.getNumber()];
if (current == null) {
return null;
}
final NioTcpServerHandle handle;
if (handles.length == 1) {
handle = handles[0];
} else {
handle = handles[current.getNumber()];
}
if (! handle.getConnection()) {
return null;
}
......@@ -388,25 +409,7 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
try {
accepted = channel.accept();
if (accepted != null) try {
final SocketAddress localAddress = accepted.getLocalAddress();
int hash;
if (localAddress instanceof InetSocketAddress) {
final InetSocketAddress address = (InetSocketAddress) localAddress;
hash = address.getAddress().hashCode() * 23 + address.getPort();
} else if (localAddress instanceof LocalSocketAddress) {
hash = ((LocalSocketAddress) localAddress).getName().hashCode();
} else {
hash = localAddress.hashCode();
}
final SocketAddress remoteAddress = accepted.getRemoteAddress();
if (remoteAddress instanceof InetSocketAddress) {
final InetSocketAddress address = (InetSocketAddress) remoteAddress;
hash = (address.getAddress().hashCode() * 23 + address.getPort()) * 23 + hash;
} else if (remoteAddress instanceof LocalSocketAddress) {
hash = ((LocalSocketAddress) remoteAddress).getName().hashCode() * 23 + hash;
} else {
hash = localAddress.hashCode() * 23 + hash;
}
int hash = ThreadLocalRandom.current().nextInt();
accepted.configureBlocking(false);
final Socket socket = accepted.socket();
socket.setKeepAlive(keepAlive != 0);
......@@ -420,11 +423,18 @@ final class NioTcpServer extends AbstractNioChannel<NioTcpServer> implements Acc
newConnection.setOption(Options.READ_TIMEOUT, Integer.valueOf(readTimeout));
newConnection.setOption(Options.WRITE_TIMEOUT, Integer.valueOf(writeTimeout));
ok = true;
handle.resetBackOff();
return newConnection;
} finally {
if (! ok) safeClose(accepted);
}
} catch (ClosedChannelException e) {
throw e;
} catch (IOException e) {
// something went wrong with the accept
// it could be due to running out of file descriptors, or due to closed channel, or other things
handle.startBackOff();
log.acceptFailed(e, handle.getBackOffTime());
return null;
} finally {
if (! ok) {
......
......@@ -19,8 +19,12 @@
package org.xnio.nio;
import java.nio.channels.SelectionKey;
import java.util.concurrent.TimeUnit;
import org.xnio.ChannelListeners;
import static java.lang.Math.min;
import static java.lang.Math.max;
import static java.lang.Thread.currentThread;
import static org.xnio.IoUtils.safeClose;
......@@ -36,6 +40,8 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
private int high;
private int tokenCount = -1;
private boolean stopped;
private boolean backOff;
private int backOffTime = 0;
NioTcpServerHandle(final NioTcpServer server, final SelectionKey key, final WorkerThread thread, final int low, final int high) {
super(thread, key);
......@@ -68,7 +74,7 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
void resume() {
final WorkerThread thread = getWorkerThread();
if (thread == currentThread()) {
if (! stopped && server.resumed) super.resume(SelectionKey.OP_ACCEPT);
if (! stopped && ! backOff && server.resumed) super.resume(SelectionKey.OP_ACCEPT);
} else {
thread.execute(new Runnable() {
public void run() {
......@@ -81,7 +87,7 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
void suspend() {
final WorkerThread thread = getWorkerThread();
if (thread == currentThread()) {
if (stopped || ! server.resumed) super.suspend(SelectionKey.OP_ACCEPT);
if (stopped || backOff || ! server.resumed) super.suspend(SelectionKey.OP_ACCEPT);
} else {
thread.execute(new Runnable() {
public void run() {
......@@ -105,6 +111,8 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
if (count-- <= low && tokenCount != 0 && stopped) {
stopped = false;
if (server.resumed) {
// end backoff optimistically
backOff = false;
super.resume(SelectionKey.OP_ACCEPT);
}
}
......@@ -117,7 +125,7 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
tokenCount = newCount;
if (count <= low && stopped) {
stopped = false;
if (server.resumed) {
if (server.resumed && ! backOff) {
super.resume(SelectionKey.OP_ACCEPT);
}
}
......@@ -128,6 +136,31 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
setThreadNewCount(workerThread, newCount);
}
/**
* Start back-off, when an accept produces an exception.
*/
void startBackOff() {
backOff = true;
backOffTime = max(250, min(30_000, backOffTime << 2));
suspend();
getWorkerThread().executeAfter(this::endBackOff, backOffTime, TimeUnit.MILLISECONDS);
}
/**
* End back-off, when an accept may be retried.
*/
void endBackOff() {
backOff = false;
resume();
}
/**
* Reset back-off, when an accept has succeeded.
*/
void resetBackOff() {
backOffTime = 0;
}
private void setThreadNewCount(final WorkerThread workerThread, final int newCount) {
final int number = workerThread.getNumber();
workerThread.execute(new Runnable() {
......@@ -157,7 +190,7 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
boolean getConnection() {
assert currentThread() == getWorkerThread();
if (stopped) {
if (stopped || backOff) {
return false;
}
if (tokenCount != -1 && --tokenCount == 0) {
......@@ -180,7 +213,7 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
suspend();
} else if (count <= low && stopped) {
stopped = false;
if (server.resumed) resume();
if (server.resumed && ! backOff) resume();
}
} else {
thread.execute(new Runnable() {
......@@ -195,4 +228,8 @@ final class NioTcpServerHandle extends NioHandle implements ChannelClosed {
assert currentThread() == getWorkerThread();
return count;
}
int getBackOffTime() {
return backOffTime;
}
}
......@@ -47,6 +47,7 @@ final class NioXnio extends Xnio {
static final boolean IS_HP_UX;
static final boolean HAS_BUGGY_EVENT_PORT;
static final boolean USE_ALT_QUEUED_SERVER;
interface SelectorCreator {
Selector open() throws IOException;
......@@ -64,6 +65,7 @@ final class NioXnio extends Xnio {
return Boolean.valueOf(System.getProperty("os.name", "unknown").equalsIgnoreCase("hp-ux"));
}
}).booleanValue();
USE_ALT_QUEUED_SERVER = Boolean.parseBoolean(AccessController.doPrivileged(new ReadPropertyAction("xnio.nio.alt-queued-server", "true")));
// if a JDK is released with a fix, we can try to detect it and set this to "false" for those JDKs.
HAS_BUGGY_EVENT_PORT = true;
}
......
......@@ -180,7 +180,12 @@ final class NioXnioWorker extends XnioWorker {
channel.socket().bind(bindAddress);
}
if (false) {
final NioTcpServer server = new NioTcpServer(this, channel, optionMap);
final NioTcpServer server = new NioTcpServer(this, channel, optionMap, false);
server.setAcceptListener(acceptListener);
ok = true;
return server;
} else if (NioXnio.USE_ALT_QUEUED_SERVER) {
final QueuedNioTcpServer2 server = new QueuedNioTcpServer2(new NioTcpServer(this, channel, optionMap, true));
server.setAcceptListener(acceptListener);
ok = true;
return server;
......
......@@ -38,6 +38,7 @@ import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
......@@ -492,25 +493,7 @@ final class QueuedNioTcpServer extends AbstractNioChannel<QueuedNioTcpServer> im
try {
boolean ok = false;
if (accepted != null) try {
final SocketAddress localAddress = accepted.getLocalAddress();
int hash;
if (localAddress instanceof InetSocketAddress) {
final InetSocketAddress address = (InetSocketAddress) localAddress;
hash = address.getAddress().hashCode() * 23 + address.getPort();
} else if (localAddress instanceof LocalSocketAddress) {
hash = ((LocalSocketAddress) localAddress).getName().hashCode();
} else {
hash = localAddress.hashCode();
}
final SocketAddress remoteAddress = accepted.getRemoteAddress();
if (remoteAddress instanceof InetSocketAddress) {
final InetSocketAddress address = (InetSocketAddress) remoteAddress;
hash = (address.getAddress().hashCode() * 23 + address.getPort()) * 23 + hash;
} else if (remoteAddress instanceof LocalSocketAddress) {
hash = ((LocalSocketAddress) remoteAddress).getName().hashCode() * 23 + hash;
} else {
hash = localAddress.hashCode() * 23 + hash;
}
int hash = ThreadLocalRandom.current().nextInt();
accepted.configureBlocking(false);
final Socket socket = accepted.socket();
socket.setKeepAlive(keepAlive != 0);
......
/*
* JBoss, Home of Professional Open Source.
* Copyright 2019 Red Hat, Inc., and individual contributors
* as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.xnio.nio;
import java.io.IOException;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.wildfly.common.Assert;
import org.xnio.ChannelListener;
import org.xnio.ChannelListeners;
import org.xnio.Option;
import org.xnio.StreamConnection;
import org.xnio.XnioExecutor;
import org.xnio.XnioIoThread;
import org.xnio.channels.AcceptListenerSettable;
import org.xnio.channels.AcceptingChannel;
final class QueuedNioTcpServer2 extends AbstractNioChannel<QueuedNioTcpServer2> implements AcceptingChannel<StreamConnection>, AcceptListenerSettable<QueuedNioTcpServer2> {
private final NioTcpServer realServer;
private final List<Queue<StreamConnection>> acceptQueues;
private final Runnable acceptTask = this::acceptTask;
private volatile ChannelListener<? super QueuedNioTcpServer2> acceptListener;
QueuedNioTcpServer2(final NioTcpServer realServer) {
super(realServer.getWorker());
this.realServer = realServer;
final NioXnioWorker worker = realServer.getWorker();
final int cnt = worker.getIoThreadCount();
acceptQueues = new ArrayList<>(cnt);
for (int i = 0; i < cnt; i ++) {
acceptQueues.add(new LinkedBlockingQueue<>());
}
realServer.getCloseSetter().set(ignored -> invokeCloseHandler());
realServer.getAcceptSetter().set(ignored -> handleReady());
}
public StreamConnection accept() throws IOException {
final WorkerThread current = WorkerThread.getCurrent();
if (current == null) {
return null;
}
final Queue<StreamConnection> socketChannels = acceptQueues.get(current.getNumber());
final StreamConnection connection = socketChannels.poll();
if (connection == null) {
if (! realServer.isOpen()) {
throw new ClosedChannelException();
}
}
return connection;
}
public ChannelListener<? super QueuedNioTcpServer2> getAcceptListener() {
return acceptListener;
}
public void setAcceptListener(final ChannelListener<? super QueuedNioTcpServer2> listener) {
this.acceptListener = listener;
}
public ChannelListener.Setter<QueuedNioTcpServer2> getAcceptSetter() {
return new Setter<QueuedNioTcpServer2>(this);
}
public SocketAddress getLocalAddress() {
return realServer.getLocalAddress();
}
public <A extends SocketAddress> A getLocalAddress(final Class<A> type) {
return realServer.getLocalAddress(type);
}
public void suspendAccepts() {
realServer.suspendAccepts();
}
public void resumeAccepts() {
realServer.resumeAccepts();
}
public boolean isAcceptResumed() {
return realServer.isAcceptResumed();
}
public void wakeupAccepts() {
realServer.wakeupAccepts();
}
public void awaitAcceptable() {
throw Assert.unsupported();
}
public void awaitAcceptable(final long time, final TimeUnit timeUnit) {
throw Assert.unsupported();
}
@Deprecated
public XnioExecutor getAcceptThread() {
return getIoThread();
}
public void close() throws IOException {
realServer.close();
}
public boolean isOpen() {
return realServer.isOpen();
}
public boolean supportsOption(final Option<?> option) {
return realServer.supportsOption(option);
}
public <T> T getOption(final Option<T> option) throws IOException {
return realServer.getOption(option);
}
public <T> T setOption(final Option<T> option, final T value) throws IllegalArgumentException, IOException {
return realServer.setOption(option, value);
}
void handleReady() {
final NioTcpServer realServer = this.realServer;
NioSocketStreamConnection connection;
try {
connection = realServer.accept();
} catch (ClosedChannelException e) {
return;
}
XnioIoThread thread;
if (connection != null) {
int i = 0;
final Runnable acceptTask = this.acceptTask;
do {
thread = connection.getIoThread();
acceptQueues.get(thread.getNumber()).add(connection);
thread.execute(acceptTask);
if (++i == 128) {
// prevent starvation of other acceptors
return;
}
try {
connection = realServer.accept();
} catch (ClosedChannelException e) {
return;
}
} while (connection != null);
}
}
void acceptTask() {
final WorkerThread current = WorkerThread.getCurrent();
assert current != null;
final Queue<StreamConnection> queue = acceptQueues.get(current.getNumber());
ChannelListeners.invokeChannelListener(QueuedNioTcpServer2.this, getAcceptListener());
if (! queue.isEmpty()) {
current.execute(acceptTask);
}
}
}
......@@ -25,14 +25,14 @@
<parent>
<groupId>org.jboss</groupId>
<artifactId>jboss-parent</artifactId>
<version>20</version>
<version>35</version>
</parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
<packaging>pom</packaging>
<name>XNIO Parent POM</name>
<version>3.7.3.Final</version>
<version>3.7.6.Final</version>
<description>The aggregator POM of the XNIO project</description>
<licenses>
......@@ -54,7 +54,7 @@
<version.org.jboss.logging.jboss-logging-tools>2.2.0.Final</version.org.jboss.logging.jboss-logging-tools>
<version.org.jboss.logmanager.jboss-logmanager>2.1.10.Final</version.org.jboss.logmanager.jboss-logmanager>
<version.org.jboss.threads>2.3.0.Beta2</version.org.jboss.threads>
<version.org.wildfly.common>1.3.0.Final</version.org.wildfly.common>
<version.org.wildfly.common>1.5.2.Final</version.org.wildfly.common>
<version.org.wildfly.client-config>1.0.0.Final</version.org.wildfly.client-config>
<version.bridger.plugin>1.1.Final</version.bridger.plugin>
<version.junit>4.11</version.junit>
......