Skip to content
Commits on Source (4)
......@@ -33,3 +33,7 @@ hs_err_pid*.log
dependency-reduced-pom.xml
*/.unison.*
# exclude mainframer files
mainframer
.mainframer
......@@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</parent>
<artifactId>netty-all</artifactId>
......
......@@ -25,7 +25,7 @@
<groupId>io.netty</groupId>
<artifactId>netty-bom</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
<packaging>pom</packaging>
<name>Netty/BOM</name>
......@@ -49,7 +49,7 @@
<url>https://github.com/netty/netty</url>
<connection>scm:git:git://github.com/netty/netty.git</connection>
<developerConnection>scm:git:ssh://git@github.com/netty/netty.git</developerConnection>
<tag>netty-4.1.29.Final</tag>
<tag>netty-4.1.33.Final</tag>
</scm>
<developers>
......@@ -69,165 +69,165 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-dns</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-haproxy</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-memcache</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-mqtt</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-redis</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-smtp</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-socks</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-stomp</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-xml</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-dev-tools</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-resolver-dns</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-rxtx</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-sctp</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-udt</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-example</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
<classifier>osx-x86_64</classifier>
</dependency>
</dependencies>
......
......@@ -20,7 +20,7 @@
<parent>
<groupId>io.netty</groupId>
<artifactId>netty-parent</artifactId>
<version>4.1.29.Final</version>
<version>4.1.33.Final</version>
</parent>
<artifactId>netty-buffer</artifactId>
......
......@@ -15,6 +15,7 @@
*/
package io.netty.buffer;
import io.netty.util.AsciiString;
import io.netty.util.ByteProcessor;
import io.netty.util.CharsetUtil;
import io.netty.util.IllegalReferenceCountException;
......@@ -43,13 +44,22 @@ import static io.netty.util.internal.MathUtil.isOutOfBounds;
*/
public abstract class AbstractByteBuf extends ByteBuf {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(AbstractByteBuf.class);
private static final String PROP_MODE = "io.netty.buffer.bytebuf.checkAccessible";
private static final boolean checkAccessible;
private static final String LEGACY_PROP_CHECK_ACCESSIBLE = "io.netty.buffer.bytebuf.checkAccessible";
private static final String PROP_CHECK_ACCESSIBLE = "io.netty.buffer.checkAccessible";
static final boolean checkAccessible; // accessed from CompositeByteBuf
private static final String PROP_CHECK_BOUNDS = "io.netty.buffer.checkBounds";
private static final boolean checkBounds;
static {
checkAccessible = SystemPropertyUtil.getBoolean(PROP_MODE, true);
if (SystemPropertyUtil.contains(PROP_CHECK_ACCESSIBLE)) {
checkAccessible = SystemPropertyUtil.getBoolean(PROP_CHECK_ACCESSIBLE, true);
} else {
checkAccessible = SystemPropertyUtil.getBoolean(LEGACY_PROP_CHECK_ACCESSIBLE, true);
}
checkBounds = SystemPropertyUtil.getBoolean(PROP_CHECK_BOUNDS, true);
if (logger.isDebugEnabled()) {
logger.debug("-D{}: {}", PROP_MODE, checkAccessible);
logger.debug("-D{}: {}", PROP_CHECK_ACCESSIBLE, checkAccessible);
logger.debug("-D{}: {}", PROP_CHECK_BOUNDS, checkBounds);
}
}
......@@ -97,11 +107,18 @@ public abstract class AbstractByteBuf extends ByteBuf {
return readerIndex;
}
private static void checkIndexBounds(final int readerIndex, final int writerIndex, final int capacity) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
readerIndex, writerIndex, capacity));
}
}
@Override
public ByteBuf readerIndex(int readerIndex) {
if (readerIndex < 0 || readerIndex > writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d (expected: 0 <= readerIndex <= writerIndex(%d))", readerIndex, writerIndex));
if (checkBounds) {
checkIndexBounds(readerIndex, writerIndex, capacity());
}
this.readerIndex = readerIndex;
return this;
......@@ -114,10 +131,8 @@ public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf writerIndex(int writerIndex) {
if (writerIndex < readerIndex || writerIndex > capacity()) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex: %d (expected: readerIndex(%d) <= writerIndex <= capacity(%d))",
writerIndex, readerIndex, capacity()));
if (checkBounds) {
checkIndexBounds(readerIndex, writerIndex, capacity());
}
this.writerIndex = writerIndex;
return this;
......@@ -125,10 +140,8 @@ public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf setIndex(int readerIndex, int writerIndex) {
if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity()) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex: %d, writerIndex: %d (expected: 0 <= readerIndex <= writerIndex <= capacity(%d))",
readerIndex, writerIndex, capacity()));
if (checkBounds) {
checkIndexBounds(readerIndex, writerIndex, capacity());
}
setIndex0(readerIndex, writerIndex);
return this;
......@@ -271,12 +284,13 @@ public abstract class AbstractByteBuf extends ByteBuf {
if (minWritableBytes <= writableBytes()) {
return;
}
if (checkBounds) {
if (minWritableBytes > maxCapacity - writerIndex) {
throw new IndexOutOfBoundsException(String.format(
"writerIndex(%d) + minWritableBytes(%d) exceeds maxCapacity(%d): %s",
writerIndex, minWritableBytes, maxCapacity, this));
}
}
// Normalize the current capacity to the power of 2.
int newCapacity = alloc().calculateNewCapacity(writerIndex + minWritableBytes, maxCapacity);
......@@ -318,12 +332,12 @@ public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf order(ByteOrder endianness) {
if (endianness == null) {
throw new NullPointerException("endianness");
}
if (endianness == order()) {
return this;
}
if (endianness == null) {
throw new NullPointerException("endianness");
}
return newSwappedByteBuf();
}
......@@ -490,7 +504,10 @@ public abstract class AbstractByteBuf extends ByteBuf {
@Override
public CharSequence getCharSequence(int index, int length, Charset charset) {
// TODO: We could optimize this for UTF8 and US_ASCII
if (CharsetUtil.US_ASCII.equals(charset) || CharsetUtil.ISO_8859_1.equals(charset)) {
// ByteBufUtil.getBytes(...) will return a new copy which the AsciiString uses directly
return new AsciiString(ByteBufUtil.getBytes(this, index, length, true), false);
}
return toString(index, length, charset);
}
......@@ -618,15 +635,21 @@ public abstract class AbstractByteBuf extends ByteBuf {
return this;
}
private static void checkReadableBounds(final ByteBuf src, final int length) {
if (length > src.readableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
}
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int length) {
checkIndex(index, length);
if (src == null) {
throw new NullPointerException("src");
}
if (length > src.readableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
if (checkBounds) {
checkReadableBounds(src, length);
}
setBytes(index, src, src.readerIndex(), length);
......@@ -889,10 +912,12 @@ public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf readBytes(ByteBuf dst, int length) {
if (checkBounds) {
if (length > dst.writableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds dst.writableBytes(%d) where dst is: %s", length, dst.writableBytes(), dst));
}
}
readBytes(dst, dst.writerIndex(), length);
dst.writerIndex(dst.writerIndex() + length);
return this;
......@@ -1065,9 +1090,8 @@ public abstract class AbstractByteBuf extends ByteBuf {
@Override
public ByteBuf writeBytes(ByteBuf src, int length) {
if (length > src.readableBytes()) {
throw new IndexOutOfBoundsException(String.format(
"length(%d) exceeds src.readableBytes(%d) where src is: %s", length, src.readableBytes(), src));
if (checkBounds) {
checkReadableBounds(src, length);
}
writeBytes(src, src.readerIndex(), length);
src.readerIndex(src.readerIndex() + length);
......@@ -1269,7 +1293,7 @@ public abstract class AbstractByteBuf extends ByteBuf {
}
}
private int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
int forEachByteAsc0(int start, int end, ByteProcessor processor) throws Exception {
for (; start < end; ++start) {
if (!processor.process(_getByte(start))) {
return start;
......@@ -1301,7 +1325,7 @@ public abstract class AbstractByteBuf extends ByteBuf {
}
}
private int forEachByteDesc0(int rStart, final int rEnd, ByteProcessor processor) throws Exception {
int forEachByteDesc0(int rStart, final int rEnd, ByteProcessor processor) throws Exception {
for (; rStart >= rEnd; --rStart) {
if (!processor.process(_getByte(rStart))) {
return rStart;
......@@ -1357,26 +1381,30 @@ public abstract class AbstractByteBuf extends ByteBuf {
checkIndex0(index, fieldLength);
}
final void checkIndex0(int index, int fieldLength) {
if (isOutOfBounds(index, fieldLength, capacity())) {
private static void checkRangeBounds(final int index, final int fieldLength, final int capacity) {
if (isOutOfBounds(index, fieldLength, capacity)) {
throw new IndexOutOfBoundsException(String.format(
"index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity()));
"index: %d, length: %d (expected: range(0, %d))", index, fieldLength, capacity));
}
}
final void checkIndex0(int index, int fieldLength) {
if (checkBounds) {
checkRangeBounds(index, fieldLength, capacity());
}
}
protected final void checkSrcIndex(int index, int length, int srcIndex, int srcCapacity) {
checkIndex(index, length);
if (isOutOfBounds(srcIndex, length, srcCapacity)) {
throw new IndexOutOfBoundsException(String.format(
"srcIndex: %d, length: %d (expected: range(0, %d))", srcIndex, length, srcCapacity));
if (checkBounds) {
checkRangeBounds(srcIndex, length, srcCapacity);
}
}
protected final void checkDstIndex(int index, int length, int dstIndex, int dstCapacity) {
checkIndex(index, length);
if (isOutOfBounds(dstIndex, length, dstCapacity)) {
throw new IndexOutOfBoundsException(String.format(
"dstIndex: %d, length: %d (expected: range(0, %d))", dstIndex, length, dstCapacity));
if (checkBounds) {
checkRangeBounds(dstIndex, length, dstCapacity);
}
}
......@@ -1394,30 +1422,43 @@ public abstract class AbstractByteBuf extends ByteBuf {
protected final void checkNewCapacity(int newCapacity) {
ensureAccessible();
if (checkBounds) {
if (newCapacity < 0 || newCapacity > maxCapacity()) {
throw new IllegalArgumentException("newCapacity: " + newCapacity + " (expected: 0-" + maxCapacity() + ')');
throw new IllegalArgumentException("newCapacity: " + newCapacity +
" (expected: 0-" + maxCapacity() + ')');
}
}
}
private void checkReadableBytes0(int minimumReadableBytes) {
ensureAccessible();
if (checkBounds) {
if (readerIndex > writerIndex - minimumReadableBytes) {
throw new IndexOutOfBoundsException(String.format(
"readerIndex(%d) + length(%d) exceeds writerIndex(%d): %s",
readerIndex, minimumReadableBytes, writerIndex, this));
}
}
}
/**
* Should be called by every method that tries to access the buffers content to check
* if the buffer was released before.
*/
protected final void ensureAccessible() {
if (checkAccessible && refCnt() == 0) {
if (checkAccessible && internalRefCnt() == 0) {
throw new IllegalReferenceCountException(0);
}
}
/**
* Returns the reference count that is used internally by {@link #ensureAccessible()} to try to guard
* against using the buffer after it was released (best-effort).
*/
int internalRefCnt() {
return refCnt();
}
final void setIndex0(int readerIndex, int writerIndex) {
this.readerIndex = readerIndex;
this.writerIndex = writerIndex;
......
......@@ -17,6 +17,7 @@
package io.netty.buffer;
import io.netty.util.IllegalReferenceCountException;
import io.netty.util.internal.PlatformDependent;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
......@@ -26,27 +27,59 @@ import static io.netty.util.internal.ObjectUtil.checkPositive;
* Abstract base class for {@link ByteBuf} implementations that count references.
*/
public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
private static final long REFCNT_FIELD_OFFSET;
private static final AtomicIntegerFieldUpdater<AbstractReferenceCountedByteBuf> refCntUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractReferenceCountedByteBuf.class, "refCnt");
private volatile int refCnt;
// even => "real" refcount is (refCnt >>> 1); odd => "real" refcount is 0
@SuppressWarnings("unused")
private volatile int refCnt = 2;
static {
long refCntFieldOffset = -1;
try {
if (PlatformDependent.hasUnsafe()) {
refCntFieldOffset = PlatformDependent.objectFieldOffset(
AbstractReferenceCountedByteBuf.class.getDeclaredField("refCnt"));
}
} catch (Throwable ignore) {
refCntFieldOffset = -1;
}
REFCNT_FIELD_OFFSET = refCntFieldOffset;
}
private static int realRefCnt(int rawCnt) {
return (rawCnt & 1) != 0 ? 0 : rawCnt >>> 1;
}
protected AbstractReferenceCountedByteBuf(int maxCapacity) {
super(maxCapacity);
refCntUpdater.set(this, 1);
}
private int nonVolatileRawCnt() {
// TODO: Once we compile against later versions of Java we can replace the Unsafe usage here by varhandles.
return REFCNT_FIELD_OFFSET != -1 ? PlatformDependent.getInt(this, REFCNT_FIELD_OFFSET)
: refCntUpdater.get(this);
}
@Override
int internalRefCnt() {
// Try to do non-volatile read for performance as the ensureAccessible() is racy anyway and only provide
// a best-effort guard.
return realRefCnt(nonVolatileRawCnt());
}
@Override
public int refCnt() {
return refCnt;
return realRefCnt(refCntUpdater.get(this));
}
/**
* An unsafe operation intended for use by a subclass that sets the reference count of the buffer directly
*/
protected final void setRefCnt(int refCnt) {
refCntUpdater.set(this, refCnt);
protected final void setRefCnt(int newRefCnt) {
refCntUpdater.set(this, newRefCnt << 1); // overflow OK here
}
@Override
......@@ -60,11 +93,18 @@ public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
}
private ByteBuf retain0(final int increment) {
int oldRef = refCntUpdater.getAndAdd(this, increment);
if (oldRef <= 0 || oldRef + increment < oldRef) {
// Ensure we don't resurrect (which means the refCnt was 0) and also that we encountered an overflow.
refCntUpdater.getAndAdd(this, -increment);
throw new IllegalReferenceCountException(oldRef, increment);
// all changes to the raw count are 2x the "real" change
int adjustedIncrement = increment << 1; // overflow OK here
int oldRef = refCntUpdater.getAndAdd(this, adjustedIncrement);
if ((oldRef & 1) != 0) {
throw new IllegalReferenceCountException(0, increment);
}
// don't pass 0!
if ((oldRef <= 0 && oldRef + adjustedIncrement >= 0)
|| (oldRef >= 0 && oldRef + adjustedIncrement < oldRef)) {
// overflow case
refCntUpdater.getAndAdd(this, -adjustedIncrement);
throw new IllegalReferenceCountException(realRefCnt(oldRef), increment);
}
return this;
}
......@@ -90,17 +130,57 @@ public abstract class AbstractReferenceCountedByteBuf extends AbstractByteBuf {
}
private boolean release0(int decrement) {
int oldRef = refCntUpdater.getAndAdd(this, -decrement);
if (oldRef == decrement) {
int rawCnt = nonVolatileRawCnt(), realCnt = toLiveRealCnt(rawCnt, decrement);
if (decrement == realCnt) {
if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
deallocate();
return true;
} else if (oldRef < decrement || oldRef - decrement > oldRef) {
// Ensure we don't over-release, and avoid underflow.
refCntUpdater.getAndAdd(this, decrement);
throw new IllegalReferenceCountException(oldRef, -decrement);
}
return retryRelease0(decrement);
}
return releaseNonFinal0(decrement, rawCnt, realCnt);
}
private boolean releaseNonFinal0(int decrement, int rawCnt, int realCnt) {
if (decrement < realCnt
// all changes to the raw count are 2x the "real" change
&& refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
return false;
}
return retryRelease0(decrement);
}
private boolean retryRelease0(int decrement) {
for (;;) {
int rawCnt = refCntUpdater.get(this), realCnt = toLiveRealCnt(rawCnt, decrement);
if (decrement == realCnt) {
if (refCntUpdater.compareAndSet(this, rawCnt, 1)) {
deallocate();
return true;
}
} else if (decrement < realCnt) {
// all changes to the raw count are 2x the "real" change
if (refCntUpdater.compareAndSet(this, rawCnt, rawCnt - (decrement << 1))) {
return false;
}
} else {
throw new IllegalReferenceCountException(realCnt, -decrement);
}
Thread.yield(); // this benefits throughput under high contention
}
}
/**
* Like {@link #realRefCnt(int)} but throws if refCnt == 0
*/
private static int toLiveRealCnt(int rawCnt, int decrement) {
if ((rawCnt & 1) == 0) {
return rawCnt >>> 1;
}
// odd rawCnt => already deallocated
throw new IllegalReferenceCountException(0, -decrement);
}
/**
* Called once {@link #refCnt()} is equals 0.
*/
......
......@@ -16,6 +16,7 @@
package io.netty.buffer;
import io.netty.util.ReferenceCounted;
import io.netty.util.internal.StringUtil;
import java.io.DataInput;
import java.io.DataInputStream;
......@@ -240,17 +241,18 @@ public class ByteBufInputStream extends InputStream implements DataInput {
return buffer.readInt();
}
private final StringBuilder lineBuf = new StringBuilder();
private StringBuilder lineBuf;
@Override
public String readLine() throws IOException {
lineBuf.setLength(0);
loop: while (true) {
if (!buffer.isReadable()) {
return lineBuf.length() > 0 ? lineBuf.toString() : null;
return null;
}
if (lineBuf != null) {
lineBuf.setLength(0);
}
loop: do {
int c = buffer.readUnsignedByte();
switch (c) {
case '\n':
......@@ -263,11 +265,14 @@ public class ByteBufInputStream extends InputStream implements DataInput {
break loop;
default:
lineBuf.append((char) c);
if (lineBuf == null) {
lineBuf = new StringBuilder();
}
lineBuf.append((char) c);
}
} while (buffer.isReadable());
return lineBuf.toString();
return lineBuf != null && lineBuf.length() > 0 ? lineBuf.toString() : StringUtil.EMPTY_STRING;
}
@Override
......
......@@ -53,10 +53,10 @@ import static io.netty.util.internal.StringUtil.isSurrogate;
public final class ByteBufUtil {
private static final InternalLogger logger = InternalLoggerFactory.getInstance(ByteBufUtil.class);
private static final FastThreadLocal<CharBuffer> CHAR_BUFFERS = new FastThreadLocal<CharBuffer>() {
private static final FastThreadLocal<byte[]> BYTE_ARRAYS = new FastThreadLocal<byte[]>() {
@Override
protected CharBuffer initialValue() throws Exception {
return CharBuffer.allocate(1024);
protected byte[] initialValue() throws Exception {
return PlatformDependent.allocateUninitializedArray(MAX_TL_ARRAY_LEN);
}
};
......@@ -95,6 +95,16 @@ public final class ByteBufUtil {
logger.debug("-Dio.netty.maxThreadLocalCharBufferSize: {}", MAX_CHAR_BUFFER_SIZE);
}
static final int MAX_TL_ARRAY_LEN = 1024;
/**
* Allocates a new array if minLength > {@link ByteBufUtil#MAX_TL_ARRAY_LEN}
*/
static byte[] threadLocalTempArray(int minLength) {
return minLength <= MAX_TL_ARRAY_LEN ? BYTE_ARRAYS.get()
: PlatformDependent.allocateUninitializedArray(minLength);
}
/**
* Returns a <a href="http://en.wikipedia.org/wiki/Hex_dump">hex dump</a>
* of the specified buffer's readable bytes.
......@@ -452,8 +462,9 @@ public final class ByteBufUtil {
}
private static int lastIndexOf(ByteBuf buffer, int fromIndex, int toIndex, byte value) {
fromIndex = Math.min(fromIndex, buffer.capacity());
if (fromIndex < 0 || buffer.capacity() == 0) {
int capacity = buffer.capacity();
fromIndex = Math.min(fromIndex, capacity);
if (fromIndex < 0 || capacity == 0) {
return -1;
}
......@@ -546,10 +557,22 @@ public final class ByteBufUtil {
buffer._setByte(writerIndex++, WRITE_UTF_UNKNOWN);
break;
}
// Extra method to allow inlining the rest of writeUtf8 which is the most likely code path.
writerIndex = writeUtf8Surrogate(buffer, writerIndex, c, c2);
} else {
buffer._setByte(writerIndex++, (byte) (0xe0 | (c >> 12)));
buffer._setByte(writerIndex++, (byte) (0x80 | ((c >> 6) & 0x3f)));
buffer._setByte(writerIndex++, (byte) (0x80 | (c & 0x3f)));
}
}
return writerIndex - oldWriterIndex;
}
private static int writeUtf8Surrogate(AbstractByteBuf buffer, int writerIndex, char c, char c2) {
if (!Character.isLowSurrogate(c2)) {
buffer._setByte(writerIndex++, WRITE_UTF_UNKNOWN);
buffer._setByte(writerIndex++, Character.isHighSurrogate(c2) ? WRITE_UTF_UNKNOWN : c2);
continue;
return writerIndex;
}
int codePoint = Character.toCodePoint(c, c2);
// See http://www.unicode.org/versions/Unicode7.0.0/ch03.pdf#G2630.
......@@ -557,13 +580,7 @@ public final class ByteBufUtil {
buffer._setByte(writerIndex++, (byte) (0x80 | ((codePoint >> 12) & 0x3f)));
buffer._setByte(writerIndex++, (byte) (0x80 | ((codePoint >> 6) & 0x3f)));
buffer._setByte(writerIndex++, (byte) (0x80 | (codePoint & 0x3f)));
} else {
buffer._setByte(writerIndex++, (byte) (0xe0 | (c >> 12)));
buffer._setByte(writerIndex++, (byte) (0x80 | ((c >> 6) & 0x3f)));
buffer._setByte(writerIndex++, (byte) (0x80 | (c & 0x3f)));
}
}
return writerIndex - oldWriterIndex;
return writerIndex;
}
/**
......@@ -756,52 +773,27 @@ public final class ByteBufUtil {
}
}
@SuppressWarnings("deprecation")
static String decodeString(ByteBuf src, int readerIndex, int len, Charset charset) {
if (len == 0) {
return StringUtil.EMPTY_STRING;
}
final CharsetDecoder decoder = CharsetUtil.decoder(charset);
final int maxLength = (int) ((double) len * decoder.maxCharsPerByte());
CharBuffer dst = CHAR_BUFFERS.get();
if (dst.length() < maxLength) {
dst = CharBuffer.allocate(maxLength);
if (maxLength <= MAX_CHAR_BUFFER_SIZE) {
CHAR_BUFFERS.set(dst);
}
} else {
dst.clear();
}
if (src.nioBufferCount() == 1) {
decodeString(decoder, src.nioBuffer(readerIndex, len), dst);
} else {
// We use a heap buffer as CharsetDecoder is most likely able to use a fast-path if src and dst buffers
// are both backed by a byte array.
ByteBuf buffer = src.alloc().heapBuffer(len);
try {
buffer.writeBytes(src, readerIndex, len);
// Use internalNioBuffer(...) to reduce object creation.
decodeString(decoder, buffer.internalNioBuffer(buffer.readerIndex(), len), dst);
} finally {
// Release the temporary buffer again.
buffer.release();
}
}
return dst.flip().toString();
}
final byte[] array;
final int offset;
private static void decodeString(CharsetDecoder decoder, ByteBuffer src, CharBuffer dst) {
try {
CoderResult cr = decoder.decode(src, dst, true);
if (!cr.isUnderflow()) {
cr.throwException();
if (src.hasArray()) {
array = src.array();
offset = src.arrayOffset() + readerIndex;
} else {
array = threadLocalTempArray(len);
offset = 0;
src.getBytes(readerIndex, array, 0, len);
}
cr = decoder.flush(dst);
if (!cr.isUnderflow()) {
cr.throwException();
}
} catch (CharacterCodingException x) {
throw new IllegalStateException(x);
if (CharsetUtil.US_ASCII.equals(charset)) {
// Fast-path for US-ASCII which is used frequently.
return new String(array, 0, offset, len);
}
return new String(array, offset, len, charset);
}
/**
......@@ -844,13 +836,14 @@ public final class ByteBufUtil {
* If {@code copy} is false the underlying storage will be shared, if possible.
*/
public static byte[] getBytes(ByteBuf buf, int start, int length, boolean copy) {
if (isOutOfBounds(start, length, buf.capacity())) {
int capacity = buf.capacity();
if (isOutOfBounds(start, length, capacity)) {
throw new IndexOutOfBoundsException("expected: " + "0 <= start(" + start + ") <= start + length(" + length
+ ") <= " + "buf.capacity(" + buf.capacity() + ')');
+ ") <= " + "buf.capacity(" + capacity + ')');
}
if (buf.hasArray()) {
if (copy || start != 0 || length != buf.capacity()) {
if (copy || start != 0 || length != capacity) {
int baseOffset = buf.arrayOffset() + start;
return Arrays.copyOfRange(buf.array(), baseOffset, baseOffset + length);
} else {
......@@ -858,7 +851,7 @@ public final class ByteBufUtil {
}
}
byte[] v = new byte[length];
byte[] v = PlatformDependent.allocateUninitializedArray(length);
buf.getBytes(start, v);
return v;
}
......@@ -1413,7 +1406,9 @@ public final class ByteBufUtil {
int chunkLen = Math.min(length, WRITE_CHUNK_SIZE);
buffer.clear().position(position);
if (allocator.isDirectBufferPooled()) {
if (length <= MAX_TL_ARRAY_LEN || !allocator.isDirectBufferPooled()) {
getBytes(buffer, threadLocalTempArray(chunkLen), 0, chunkLen, out, length);
} else {
// if direct buffers are pooled chances are good that heap buffers are pooled as well.
ByteBuf tmpBuf = allocator.heapBuffer(chunkLen);
try {
......@@ -1423,8 +1418,6 @@ public final class ByteBufUtil {
} finally {
tmpBuf.release();
}
} else {
getBytes(buffer, new byte[chunkLen], 0, chunkLen, out, length);
}
}
}
......
......@@ -205,7 +205,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
assert s.doNotDestroy && s.elemSize == normCapacity;
long handle = s.allocate();
assert handle >= 0;
s.chunk.initBufWithSubpage(buf, handle, reqCapacity);
s.chunk.initBufWithSubpage(buf, null, handle, reqCapacity);
incTinySmallAllocation(tiny);
return;
}
......@@ -242,9 +242,8 @@ abstract class PoolArena<T> implements PoolArenaMetric {
// Add a new chunk.
PoolChunk<T> c = newChunk(pageSize, maxOrder, pageShifts, chunkSize);
long handle = c.allocate(normCapacity);
assert handle > 0;
c.initBuf(buf, handle, reqCapacity);
boolean success = c.allocate(buf, reqCapacity, normCapacity);
assert success;
qInit.add(c);
}
......@@ -263,7 +262,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
allocationsHuge.increment();
}
void free(PoolChunk<T> chunk, long handle, int normCapacity, PoolThreadCache cache) {
void free(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, int normCapacity, PoolThreadCache cache) {
if (chunk.unpooled) {
int size = chunk.chunkSize();
destroyChunk(chunk);
......@@ -271,12 +270,12 @@ abstract class PoolArena<T> implements PoolArenaMetric {
deallocationsHuge.increment();
} else {
SizeClass sizeClass = sizeClass(normCapacity);
if (cache != null && cache.add(this, chunk, handle, normCapacity, sizeClass)) {
if (cache != null && cache.add(this, chunk, nioBuffer, handle, normCapacity, sizeClass)) {
// cached so not free it.
return;
}
freeChunk(chunk, handle, sizeClass);
freeChunk(chunk, handle, sizeClass, nioBuffer);
}
}
......@@ -287,7 +286,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
return isTiny(normCapacity) ? SizeClass.Tiny : SizeClass.Small;
}
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass) {
void freeChunk(PoolChunk<T> chunk, long handle, SizeClass sizeClass, ByteBuffer nioBuffer) {
final boolean destroyChunk;
synchronized (this) {
switch (sizeClass) {
......@@ -303,7 +302,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
default:
throw new Error();
}
destroyChunk = !chunk.parent.free(chunk, handle);
destroyChunk = !chunk.parent.free(chunk, handle, nioBuffer);
}
if (destroyChunk) {
// destroyChunk not need to be called while holding the synchronized lock.
......@@ -387,6 +386,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
}
PoolChunk<T> oldChunk = buf.chunk;
ByteBuffer oldNioBuffer = buf.tmpNioBuf;
long oldHandle = buf.handle;
T oldMemory = buf.memory;
int oldOffset = buf.offset;
......@@ -415,7 +415,7 @@ abstract class PoolArena<T> implements PoolArenaMetric {
buf.setIndex(readerIndex, writerIndex);
if (freeOldMemory) {
free(oldChunk, oldHandle, oldMaxLength, buf.cache);
free(oldChunk, oldNioBuffer, oldHandle, oldMaxLength, buf.cache);
}
}
......@@ -725,11 +725,16 @@ abstract class PoolArena<T> implements PoolArenaMetric {
return true;
}
private int offsetCacheLine(ByteBuffer memory) {
// mark as package-private, only for unit test
int offsetCacheLine(ByteBuffer memory) {
// We can only calculate the offset if Unsafe is present as otherwise directBufferAddress(...) will
// throw an NPE.
return HAS_UNSAFE ?
(int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask) : 0;
int remainder = HAS_UNSAFE
? (int) (PlatformDependent.directBufferAddress(memory) & directMemoryCacheAlignmentMask)
: 0;
// offset = alignment - address & (alignment - 1)
return directMemoryCacheAlignment - remainder;
}
@Override
......
......@@ -16,6 +16,10 @@
package io.netty.buffer;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Deque;
/**
* Description of algorithm for PageRun/PoolSubpage allocation from PoolChunk
*
......@@ -107,7 +111,6 @@ final class PoolChunk<T> implements PoolChunkMetric {
final T memory;
final boolean unpooled;
final int offset;
private final byte[] memoryMap;
private final byte[] depthMap;
private final PoolSubpage<T>[] subpages;
......@@ -122,6 +125,13 @@ final class PoolChunk<T> implements PoolChunkMetric {
/** Used to mark memory as unusable */
private final byte unusable;
// Use as cache for ByteBuffer created from the memory. These are just duplicates and so are only a container
// around the memory itself. These are often needed for operations within the Pooled*ByteBuf and so
// may produce extra GC, which can be greatly reduced by caching the duplicates.
//
// This may be null if the PoolChunk is unpooled as pooling the ByteBuffer instances does not make any sense here.
private final Deque<ByteBuffer> cachedNioBuffers;
private int freeBytes;
PoolChunkList<T> parent;
......@@ -163,6 +173,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
}
subpages = newSubpageArray(maxSubpageAllocs);
cachedNioBuffers = new ArrayDeque<ByteBuffer>(8);
}
/** Creates a special chunk that is not pooled. */
......@@ -182,6 +193,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
chunkSize = size;
log2ChunkSize = log2(chunkSize);
maxSubpageAllocs = 0;
cachedNioBuffers = null;
}
@SuppressWarnings("unchecked")
......@@ -210,12 +222,20 @@ final class PoolChunk<T> implements PoolChunkMetric {
return 100 - freePercentage;
}
long allocate(int normCapacity) {
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
final long handle;
if ((normCapacity & subpageOverflowMask) != 0) { // >= pageSize
return allocateRun(normCapacity);
handle = allocateRun(normCapacity);
} else {
return allocateSubpage(normCapacity);
handle = allocateSubpage(normCapacity);
}
if (handle < 0) {
return false;
}
ByteBuffer nioBuffer = cachedNioBuffers != null ? cachedNioBuffers.pollLast() : null;
initBuf(buf, nioBuffer, handle, reqCapacity);
return true;
}
/**
......@@ -320,8 +340,8 @@ final class PoolChunk<T> implements PoolChunkMetric {
// Obtain the head of the PoolSubPage pool that is owned by the PoolArena and synchronize on it.
// This is need as we may add it back and so alter the linked-list structure.
PoolSubpage<T> head = arena.findSubpagePoolHead(normCapacity);
synchronized (head) {
int d = maxOrder; // subpages are only be allocated from pages i.e., leaves
synchronized (head) {
int id = allocateNode(d);
if (id < 0) {
return id;
......@@ -352,7 +372,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
*
* @param handle handle to free
*/
void free(long handle) {
void free(long handle, ByteBuffer nioBuffer) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
......@@ -372,26 +392,32 @@ final class PoolChunk<T> implements PoolChunkMetric {
freeBytes += runLength(memoryMapIdx);
setValue(memoryMapIdx, depth(memoryMapIdx));
updateParentsFree(memoryMapIdx);
if (nioBuffer != null && cachedNioBuffers != null &&
cachedNioBuffers.size() < PooledByteBufAllocator.DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK) {
cachedNioBuffers.offer(nioBuffer);
}
}
void initBuf(PooledByteBuf<T> buf, long handle, int reqCapacity) {
void initBuf(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
int memoryMapIdx = memoryMapIdx(handle);
int bitmapIdx = bitmapIdx(handle);
if (bitmapIdx == 0) {
byte val = value(memoryMapIdx);
assert val == unusable : String.valueOf(val);
buf.init(this, handle, runOffset(memoryMapIdx) + offset, reqCapacity, runLength(memoryMapIdx),
arena.parent.threadCache());
buf.init(this, nioBuffer, handle, runOffset(memoryMapIdx) + offset,
reqCapacity, runLength(memoryMapIdx), arena.parent.threadCache());
} else {
initBufWithSubpage(buf, handle, bitmapIdx, reqCapacity);
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx, reqCapacity);
}
}
void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int reqCapacity) {
initBufWithSubpage(buf, handle, bitmapIdx(handle), reqCapacity);
void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer, long handle, int reqCapacity) {
initBufWithSubpage(buf, nioBuffer, handle, bitmapIdx(handle), reqCapacity);
}
private void initBufWithSubpage(PooledByteBuf<T> buf, long handle, int bitmapIdx, int reqCapacity) {
private void initBufWithSubpage(PooledByteBuf<T> buf, ByteBuffer nioBuffer,
long handle, int bitmapIdx, int reqCapacity) {
assert bitmapIdx != 0;
int memoryMapIdx = memoryMapIdx(handle);
......@@ -401,7 +427,7 @@ final class PoolChunk<T> implements PoolChunkMetric {
assert reqCapacity <= subpage.elemSize;
buf.init(
this, handle,
this, nioBuffer, handle,
runOffset(memoryMapIdx) + (bitmapIdx & 0x3FFFFFFF) * subpage.elemSize + offset,
reqCapacity, subpage.elemSize, arena.parent.threadCache());
}
......
......@@ -25,6 +25,8 @@ import java.util.List;
import static java.lang.Math.*;
import java.nio.ByteBuffer;
final class PoolChunkList<T> implements PoolChunkListMetric {
private static final Iterator<PoolChunkMetric> EMPTY_METRICS = Collections.<PoolChunkMetric>emptyList().iterator();
private final PoolArena<T> arena;
......@@ -75,21 +77,14 @@ final class PoolChunkList<T> implements PoolChunkListMetric {
}
boolean allocate(PooledByteBuf<T> buf, int reqCapacity, int normCapacity) {
if (head == null || normCapacity > maxCapacity) {
if (normCapacity > maxCapacity) {
// Either this PoolChunkList is empty or the requested capacity is larger then the capacity which can
// be handled by the PoolChunks that are contained in this PoolChunkList.
return false;
}
for (PoolChunk<T> cur = head;;) {
long handle = cur.allocate(normCapacity);
if (handle < 0) {
cur = cur.next;
if (cur == null) {
return false;
}
} else {
cur.initBuf(buf, handle, reqCapacity);
for (PoolChunk<T> cur = head; cur != null; cur = cur.next) {
if (cur.allocate(buf, reqCapacity, normCapacity)) {
if (cur.usage() >= maxUsage) {
remove(cur);
nextList.add(cur);
......@@ -97,10 +92,11 @@ final class PoolChunkList<T> implements PoolChunkListMetric {
return true;
}
}
return false;
}
boolean free(PoolChunk<T> chunk, long handle) {
chunk.free(handle);
boolean free(PoolChunk<T> chunk, long handle, ByteBuffer nioBuffer) {
chunk.free(handle, nioBuffer);
if (chunk.usage() < minUsage) {
remove(chunk);
// Move the PoolChunk down the PoolChunkList linked-list.
......
......@@ -200,12 +200,13 @@ final class PoolThreadCache {
* Returns {@code true} if it fit into the cache {@code false} otherwise.
*/
@SuppressWarnings({ "unchecked", "rawtypes" })
boolean add(PoolArena<?> area, PoolChunk chunk, long handle, int normCapacity, SizeClass sizeClass) {
boolean add(PoolArena<?> area, PoolChunk chunk, ByteBuffer nioBuffer,
long handle, int normCapacity, SizeClass sizeClass) {
MemoryRegionCache<?> cache = cache(area, normCapacity, sizeClass);
if (cache == null) {
return false;
}
return cache.add(chunk, handle);
return cache.add(chunk, nioBuffer, handle);
}
private MemoryRegionCache<?> cache(PoolArena<?> area, int normCapacity, SizeClass sizeClass) {
......@@ -346,8 +347,8 @@ final class PoolThreadCache {
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, handle, reqCapacity);
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBufWithSubpage(buf, nioBuffer, handle, reqCapacity);
}
}
......@@ -361,8 +362,8 @@ final class PoolThreadCache {
@Override
protected void initBuf(
PoolChunk<T> chunk, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBuf(buf, handle, reqCapacity);
PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle, PooledByteBuf<T> buf, int reqCapacity) {
chunk.initBuf(buf, nioBuffer, handle, reqCapacity);
}
}
......@@ -381,15 +382,15 @@ final class PoolThreadCache {
/**
* Init the {@link PooledByteBuf} using the provided chunk and handle with the capacity restrictions.
*/
protected abstract void initBuf(PoolChunk<T> chunk, long handle,
protected abstract void initBuf(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle,
PooledByteBuf<T> buf, int reqCapacity);
/**
* Add to cache if not already full.
*/
@SuppressWarnings("unchecked")
public final boolean add(PoolChunk<T> chunk, long handle) {
Entry<T> entry = newEntry(chunk, handle);
public final boolean add(PoolChunk<T> chunk, ByteBuffer nioBuffer, long handle) {
Entry<T> entry = newEntry(chunk, nioBuffer, handle);
boolean queued = queue.offer(entry);
if (!queued) {
// If it was not possible to cache the chunk, immediately recycle the entry
......@@ -407,7 +408,7 @@ final class PoolThreadCache {
if (entry == null) {
return false;
}
initBuf(entry.chunk, entry.handle, buf, reqCapacity);
initBuf(entry.chunk, entry.nioBuffer, entry.handle, buf, reqCapacity);
entry.recycle();
// allocations is not thread-safe which is fine as this is only called from the same thread all time.
......@@ -453,16 +454,18 @@ final class PoolThreadCache {
private void freeEntry(Entry entry) {
PoolChunk chunk = entry.chunk;
long handle = entry.handle;
ByteBuffer nioBuffer = entry.nioBuffer;
// recycle now so PoolChunk can be GC'ed.
entry.recycle();
chunk.arena.freeChunk(chunk, handle, sizeClass);
chunk.arena.freeChunk(chunk, handle, sizeClass, nioBuffer);
}
static final class Entry<T> {
final Handle<Entry<?>> recyclerHandle;
PoolChunk<T> chunk;
ByteBuffer nioBuffer;
long handle = -1;
Entry(Handle<Entry<?>> recyclerHandle) {
......@@ -471,15 +474,17 @@ final class PoolThreadCache {
void recycle() {
chunk = null;
nioBuffer = null;
handle = -1;
recyclerHandle.recycle(this);
}
}
@SuppressWarnings("rawtypes")
private static Entry newEntry(PoolChunk<?> chunk, long handle) {
private static Entry newEntry(PoolChunk<?> chunk, ByteBuffer nioBuffer, long handle) {
Entry entry = RECYCLER.get();
entry.chunk = chunk;
entry.nioBuffer = nioBuffer;
entry.handle = handle;
return entry;
}
......
......@@ -33,7 +33,7 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
protected int length;
int maxLength;
PoolThreadCache cache;
private ByteBuffer tmpNioBuf;
ByteBuffer tmpNioBuf;
private ByteBufAllocator allocator;
@SuppressWarnings("unchecked")
......@@ -42,27 +42,29 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
this.recyclerHandle = (Handle<PooledByteBuf<T>>) recyclerHandle;
}
void init(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, handle, offset, length, maxLength, cache);
void init(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
init0(chunk, nioBuffer, handle, offset, length, maxLength, cache);
}
void initUnpooled(PoolChunk<T> chunk, int length) {
init0(chunk, 0, chunk.offset, length, length, null);
init0(chunk, null, 0, chunk.offset, length, length, null);
}
private void init0(PoolChunk<T> chunk, long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
private void init0(PoolChunk<T> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
assert handle >= 0;
assert chunk != null;
this.chunk = chunk;
memory = chunk.memory;
tmpNioBuf = nioBuffer;
allocator = chunk.arena.parent;
this.cache = cache;
this.handle = handle;
this.offset = offset;
this.length = length;
this.maxLength = maxLength;
tmpNioBuf = null;
}
/**
......@@ -166,8 +168,8 @@ abstract class PooledByteBuf<T> extends AbstractReferenceCountedByteBuf {
final long handle = this.handle;
this.handle = -1;
memory = null;
chunk.arena.free(chunk, tmpNioBuf, handle, maxLength, cache);
tmpNioBuf = null;
chunk.arena.free(chunk, handle, maxLength, cache);
chunk = null;
recycle();
}
......
......@@ -45,6 +45,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
private static final int DEFAULT_CACHE_TRIM_INTERVAL;
private static final boolean DEFAULT_USE_CACHE_FOR_ALL_THREADS;
private static final int DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT;
static final int DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK;
private static final int MIN_PAGE_SIZE = 4096;
private static final int MAX_CHUNK_SIZE = (int) (((long) Integer.MAX_VALUE + 1) / 2);
......@@ -116,6 +117,11 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
DEFAULT_DIRECT_MEMORY_CACHE_ALIGNMENT = SystemPropertyUtil.getInt(
"io.netty.allocator.directMemoryCacheAlignment", 0);
// Use 1023 by default as we use an ArrayDeque as backing storage which will then allocate an internal array
// of 1024 elements. Otherwise we would allocate 2048 and only use 1024 which is wasteful.
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK = SystemPropertyUtil.getInt(
"io.netty.allocator.maxCachedByteBuffersPerChunk", 1023);
if (logger.isDebugEnabled()) {
logger.debug("-Dio.netty.allocator.numHeapArenas: {}", DEFAULT_NUM_HEAP_ARENA);
logger.debug("-Dio.netty.allocator.numDirectArenas: {}", DEFAULT_NUM_DIRECT_ARENA);
......@@ -136,6 +142,8 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
logger.debug("-Dio.netty.allocator.maxCachedBufferCapacity: {}", DEFAULT_MAX_CACHED_BUFFER_CAPACITY);
logger.debug("-Dio.netty.allocator.cacheTrimInterval: {}", DEFAULT_CACHE_TRIM_INTERVAL);
logger.debug("-Dio.netty.allocator.useCacheForAllThreads: {}", DEFAULT_USE_CACHE_FOR_ALL_THREADS);
logger.debug("-Dio.netty.allocator.maxCachedByteBuffersPerChunk: {}",
DEFAULT_MAX_CACHED_BYTEBUFFERS_PER_CHUNK);
}
}
......@@ -580,7 +588,7 @@ public class PooledByteBufAllocator extends AbstractByteBufAllocator implements
return usedMemory(directArenas);
}
private static long usedMemory(PoolArena<?>... arenas) {
private static long usedMemory(PoolArena<?>[] arenas) {
if (arenas == null) {
return -1;
}
......
......@@ -351,8 +351,8 @@ final class PooledDirectByteBuf extends PooledByteBuf<ByteBuffer> {
@Override
public int setBytes(int index, InputStream in, int length) throws IOException {
checkIndex(index, length);
byte[] tmp = new byte[length];
int readBytes = in.read(tmp);
byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
int readBytes = in.read(tmp, 0, length);
if (readBytes <= 0) {
return readBytes;
}
......
......@@ -49,9 +49,9 @@ final class PooledUnsafeDirectByteBuf extends PooledByteBuf<ByteBuffer> {
}
@Override
void init(PoolChunk<ByteBuffer> chunk, long handle, int offset, int length, int maxLength,
PoolThreadCache cache) {
super.init(chunk, handle, offset, length, maxLength, cache);
void init(PoolChunk<ByteBuffer> chunk, ByteBuffer nioBuffer,
long handle, int offset, int length, int maxLength, PoolThreadCache cache) {
super.init(chunk, nioBuffer, handle, offset, length, maxLength, cache);
initMemoryAddress();
}
......
......@@ -355,11 +355,11 @@ class ReadOnlyByteBufferBuf extends AbstractReferenceCountedByteBuf {
if (buffer.hasArray()) {
out.write(buffer.array(), index + buffer.arrayOffset(), length);
} else {
byte[] tmp = new byte[length];
byte[] tmp = ByteBufUtil.threadLocalTempArray(length);
ByteBuffer tmpBuf = internalNioBuffer();
tmpBuf.clear().position(index);
tmpBuf.get(tmp);
out.write(tmp);
tmpBuf.get(tmp, 0, length);
out.write(tmp, 0, length);
}
return this;
}
......
......@@ -15,15 +15,14 @@
*/
package io.netty.buffer;
import io.netty.buffer.CompositeByteBuf.ByteWrapper;
import io.netty.util.internal.PlatformDependent;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.CharBuffer;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
/**
......@@ -220,7 +219,7 @@ public final class Unpooled {
* Creates a new buffer which wraps the specified buffer's readable bytes.
* A modification on the specified buffer's content will be visible to the
* returned buffer.
* @param buffer The buffer to wrap. Reference count ownership of this variable is transfered to this method.
* @param buffer The buffer to wrap. Reference count ownership of this variable is transferred to this method.
* @return The readable portion of the {@code buffer}, or an empty buffer if there is no readable portion.
* The caller is responsible for releasing this buffer.
*/
......@@ -246,7 +245,7 @@ public final class Unpooled {
* Creates a new big-endian composite buffer which wraps the readable bytes of the
* specified buffers without copying them. A modification on the content
* of the specified buffers will be visible to the returned buffer.
* @param buffers The buffers to wrap. Reference count ownership of all variables is transfered to this method.
* @param buffers The buffers to wrap. Reference count ownership of all variables is transferred to this method.
* @return The readable portion of the {@code buffers}. The caller is responsible for releasing this buffer.
*/
public static ByteBuf wrappedBuffer(ByteBuf... buffers) {
......@@ -262,47 +261,46 @@ public final class Unpooled {
return wrappedBuffer(buffers.length, buffers);
}
/**
* Creates a new big-endian composite buffer which wraps the specified
* arrays without copying them. A modification on the specified arrays'
* content will be visible to the returned buffer.
*/
public static ByteBuf wrappedBuffer(int maxNumComponents, byte[]... arrays) {
switch (arrays.length) {
static <T> ByteBuf wrappedBuffer(int maxNumComponents, ByteWrapper<T> wrapper, T[] array) {
switch (array.length) {
case 0:
break;
case 1:
if (arrays[0].length != 0) {
return wrappedBuffer(arrays[0]);
if (!wrapper.isEmpty(array[0])) {
return wrapper.wrap(array[0]);
}
break;
default:
// Get the list of the component, while guessing the byte order.
final List<ByteBuf> components = new ArrayList<ByteBuf>(arrays.length);
for (byte[] a: arrays) {
if (a == null) {
break;
}
if (a.length > 0) {
components.add(wrappedBuffer(a));
for (int i = 0, len = array.length; i < len; i++) {
T bytes = array[i];
if (bytes == null) {
return EMPTY_BUFFER;
}
if (!wrapper.isEmpty(bytes)) {
return new CompositeByteBuf(ALLOC, false, maxNumComponents, wrapper, array, i);
}
if (!components.isEmpty()) {
return new CompositeByteBuf(ALLOC, false, maxNumComponents, components);
}
}
return EMPTY_BUFFER;
}
/**
* Creates a new big-endian composite buffer which wraps the specified
* arrays without copying them. A modification on the specified arrays'
* content will be visible to the returned buffer.
*/
public static ByteBuf wrappedBuffer(int maxNumComponents, byte[]... arrays) {
return wrappedBuffer(maxNumComponents, CompositeByteBuf.BYTE_ARRAY_WRAPPER, arrays);
}
/**
* Creates a new big-endian composite buffer which wraps the readable bytes of the
* specified buffers without copying them. A modification on the content
* of the specified buffers will be visible to the returned buffer.
* @param maxNumComponents Advisement as to how many independent buffers are allowed to exist before
* consolidation occurs.
* @param buffers The buffers to wrap. Reference count ownership of all variables is transfered to this method.
* @param buffers The buffers to wrap. Reference count ownership of all variables is transferred to this method.
* @return The readable portion of the {@code buffers}. The caller is responsible for releasing this buffer.
*/
public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuf... buffers) {
......@@ -321,7 +319,7 @@ public final class Unpooled {
for (int i = 0; i < buffers.length; i++) {
ByteBuf buf = buffers[i];
if (buf.isReadable()) {
return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i, buffers.length);
return new CompositeByteBuf(ALLOC, false, maxNumComponents, buffers, i);
}
buf.release();
}
......@@ -336,32 +334,7 @@ public final class Unpooled {
* specified buffers will be visible to the returned buffer.
*/
public static ByteBuf wrappedBuffer(int maxNumComponents, ByteBuffer... buffers) {
switch (buffers.length) {
case 0:
break;
case 1:
if (buffers[0].hasRemaining()) {
return wrappedBuffer(buffers[0].order(BIG_ENDIAN));
}
break;
default:
// Get the list of the component, while guessing the byte order.
final List<ByteBuf> components = new ArrayList<ByteBuf>(buffers.length);
for (ByteBuffer b: buffers) {
if (b == null) {
break;
}
if (b.remaining() > 0) {
components.add(wrappedBuffer(b.order(BIG_ENDIAN)));
}
}
if (!components.isEmpty()) {
return new CompositeByteBuf(ALLOC, false, maxNumComponents, components);
}
}
return EMPTY_BUFFER;
return wrappedBuffer(maxNumComponents, CompositeByteBuf.BYTE_BUFFER_WRAPPER, buffers);
}
/**
......@@ -400,7 +373,7 @@ public final class Unpooled {
if (length == 0) {
return EMPTY_BUFFER;
}
byte[] copy = new byte[length];
byte[] copy = PlatformDependent.allocateUninitializedArray(length);
System.arraycopy(array, offset, copy, 0, length);
return wrappedBuffer(copy);
}
......@@ -416,7 +389,7 @@ public final class Unpooled {
if (length == 0) {
return EMPTY_BUFFER;
}
byte[] copy = new byte[length];
byte[] copy = PlatformDependent.allocateUninitializedArray(length);
// Duplicate the buffer so we not adjust the position during our get operation.
// See https://github.com/netty/netty/issues/3896
ByteBuffer duplicate = buffer.duplicate();
......@@ -473,7 +446,7 @@ public final class Unpooled {
return EMPTY_BUFFER;
}
byte[] mergedArray = new byte[length];
byte[] mergedArray = PlatformDependent.allocateUninitializedArray(length);
for (int i = 0, j = 0; i < arrays.length; i ++) {
byte[] a = arrays[i];
System.arraycopy(a, 0, mergedArray, j, a.length);
......@@ -527,7 +500,7 @@ public final class Unpooled {
return EMPTY_BUFFER;
}
byte[] mergedArray = new byte[length];
byte[] mergedArray = PlatformDependent.allocateUninitializedArray(length);
for (int i = 0, j = 0; i < buffers.length; i ++) {
ByteBuf b = buffers[i];
int bLen = b.readableBytes();
......@@ -582,7 +555,7 @@ public final class Unpooled {
return EMPTY_BUFFER;
}
byte[] mergedArray = new byte[length];
byte[] mergedArray = PlatformDependent.allocateUninitializedArray(length);
for (int i = 0, j = 0; i < buffers.length; i ++) {
// Duplicate the buffer so we not adjust the position during our get operation.
// See https://github.com/netty/netty/issues/3896
......