Skip to content
Commits on Source (3)
......@@ -37,7 +37,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
<version>3.7.2.Final</version>
<version>3.7.3.Final</version>
</parent>
<dependencies>
......
......@@ -19,11 +19,14 @@
package org.xnio;
import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedQueue;
......@@ -35,14 +38,20 @@ import static org.xnio._private.Messages.msg;
* returned pooled buffers. When the buffer is no longer needed, it should be freed back into the pool; failure
* to do so will cause the corresponding buffer area to be unavailable until the buffer is garbage-collected.
*
* If the buffer pool is no longer used, it is advisable to invoke {@link #clean()} to make
* sure that direct allocated buffers can be reused by a future instance.
*
* @author <a href="mailto:david.lloyd@redhat.com">David M. Lloyd</a>
* @author Flavia Rainone
* @deprecated See {@link ByteBufferPool}.
*/
public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
private static final int LOCAL_LENGTH;
private static final Queue<ByteBuffer> FREE_DIRECT_BUFFERS;
static {
// read thread local size property
String value = AccessController.doPrivileged(new ReadPropertyAction("xnio.bufferpool.threadlocal.size", "12"));
int val;
try {
......@@ -51,30 +60,19 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
val = 12;
}
LOCAL_LENGTH = val;
// free direct buffers queue to keep direct buffers that are out of reach because of garbage collection of pools
FREE_DIRECT_BUFFERS = new ConcurrentLinkedQueue<>();
}
private final Set<Ref> refSet = Collections.synchronizedSet(new HashSet<Ref>());
private final Set<Ref> refSet = Collections.synchronizedSet(new HashSet<>());
private final Queue<Slice> sliceQueue;
private final BufferAllocator<ByteBuffer> allocator;
private final int bufferSize;
private final int buffersPerRegion;
private final int threadLocalQueueSize;
private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocal<ThreadLocalCache>() {
protected ThreadLocalCache initialValue() {
//noinspection serial
return new ThreadLocalCache();
}
public void remove() {
final ArrayDeque<Slice> deque = get().queue;
Slice slice = deque.poll();
while (slice != null) {
doFree(slice);
slice = deque.poll();
}
super.remove();
}
};
private final List<ByteBuffer> directBuffers;
private final ThreadLocal<ThreadLocalCache> localQueueHolder = new ThreadLocalCacheWrapper(this);
/**
* Construct a new instance.
......@@ -94,8 +92,14 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
buffersPerRegion = maxRegionSize / bufferSize;
this.bufferSize = bufferSize;
this.allocator = allocator;
sliceQueue = new ConcurrentLinkedQueue<Slice>();
sliceQueue = new ConcurrentLinkedQueue<>();
this.threadLocalQueueSize = threadLocalQueueSize;
// handle direct byte buffer allocation for reuse of direct buffers
if (allocator == BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR) {
directBuffers = Collections.synchronizedList(new ArrayList<>());
} else {
directBuffers = null;
}
}
/**
......@@ -142,31 +146,72 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
if (slice != null) {
return new PooledByteBuffer(slice, slice.slice());
}
final int bufferSize = this.bufferSize;
final int buffersPerRegion = this.buffersPerRegion;
final ByteBuffer region = allocator.allocate(buffersPerRegion * bufferSize);
int idx = bufferSize;
for (int i = 1; i < buffersPerRegion; i ++) {
sliceQueue.add(new Slice(region, idx, bufferSize));
idx += bufferSize;
}
final Slice newSlice = new Slice(region, 0, bufferSize);
final Slice newSlice = allocateSlices(buffersPerRegion, bufferSize);
return new PooledByteBuffer(newSlice, newSlice.slice());
}
}
private Slice allocateSlices(final int buffersPerRegion, final int bufferSize) {
// only true if using direct allocation
if (directBuffers != null) {
ByteBuffer region = FREE_DIRECT_BUFFERS.poll();
try {
if (region != null) {
return sliceReusedBuffer(region, buffersPerRegion, bufferSize);
}
region = allocator.allocate(buffersPerRegion * bufferSize);
return sliceAllocatedBuffer(region, buffersPerRegion, bufferSize);
} finally {
directBuffers.add(region);
}
}
return sliceAllocatedBuffer(
allocator.allocate(buffersPerRegion * bufferSize),
buffersPerRegion, bufferSize);
}
private Slice sliceReusedBuffer(final ByteBuffer region, final int buffersPerRegion, final int bufferSize) {
int maxI = Math.min(buffersPerRegion, region.capacity() / bufferSize);
// create slices
int idx = bufferSize;
for (int i = 1; i < maxI; i++) {
sliceQueue.add(new Slice(region, idx, bufferSize));
idx += bufferSize;
}
if (maxI == 0)
return allocateSlices(buffersPerRegion, bufferSize);
if (maxI < buffersPerRegion)
sliceQueue.add(allocateSlices(buffersPerRegion - maxI, bufferSize));
return new Slice(region, 0, bufferSize);
}
private Slice sliceAllocatedBuffer(final ByteBuffer region, final int buffersPerRegion, final int bufferSize) {
// create slices
int idx = bufferSize;
for (int i = 1; i < buffersPerRegion; i++) {
sliceQueue.add(new Slice(region, idx, bufferSize));
idx += bufferSize;
}
return new Slice(region, 0, bufferSize);
}
/**
* Cleans all ThreadLocal caches
* Cleans the pool, removing references to any buffers inside it.
* Should be invoked on pool disposal, when the pool will no longer be
* used.
*/
public void clean() {
ThreadLocalCache localCache = localQueueHolder.get();
if (!localCache.queue.isEmpty()) {
localCache.queue.clear();
}
if(!sliceQueue.isEmpty()) {
sliceQueue.clear();
}
// pass everything that is directly allocated to free direct buffers
FREE_DIRECT_BUFFERS.addAll(directBuffers);
}
/**
......@@ -176,6 +221,19 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
return bufferSize;
}
private ThreadLocalCache createThreadLocalCache() {
return new ThreadLocalCache(this);
}
private void freeThreadLocalCache(ThreadLocalCache cache) {
final ArrayDeque<Slice> deque = cache.queue;
Slice slice = deque.poll();
while (slice != null) {
doFree(slice);
slice = deque.poll();
}
}
private void doFree(Slice region) {
if (threadLocalQueueSize > 0) {
final ThreadLocalCache localCache = localQueueHolder.get();
......@@ -239,11 +297,15 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
}
}
private final class Slice {
// to prevent memory leaks via thread internal map for thread local, we need to
// make this class static or else the outer ByteBufferSlicePool
// is never collected while the thread is active
// Thread -> thread local map -> ThreadLocalCacheWrapper -> ThreadLocalCache -> queue -> Slices -> ByteBufferSlicePool
private static final class Slice {
private final ByteBuffer parent;
private Slice(final ByteBuffer parent, final int start, final int size) {
this.parent = (ByteBuffer)parent.duplicate().position(start).limit(start+size);
this.parent = (ByteBuffer) parent.duplicate().position(start).limit(start+size);
}
ByteBuffer slice() {
......@@ -265,26 +327,68 @@ public final class ByteBufferSlicePool implements Pool<ByteBuffer> {
}
}
private final class ThreadLocalCache {
final static class ThreadLocalCache {
// to prevent memory leaks via thread internal map for thread local, we need to
// weakly reference the outer ByteBufferSlicePool
// or else the pool is never collected while the thread is active
// Thread -> thread local map -> ThreadLocalCache -> pool
final WeakReference<ByteBufferSlicePool> pool;
final ArrayDeque<Slice> queue = new ArrayDeque<Slice>(threadLocalQueueSize) {
// internal queue of slices; used to prevent all threads synchronizing on a single queue
final ArrayDeque<Slice> queue;
// indicates how many slices should be returned to queue on free
int outstanding = 0;
/**
* This sucks but there's no other way to ensure these buffers are returned to the pool.
*/
protected void finalize() {
final ArrayDeque<Slice> deque = queue;
Slice slice = deque.poll();
while (slice != null) {
doFree(slice);
slice = deque.poll();
ThreadLocalCache(ByteBufferSlicePool pool) {
this.pool = new WeakReference<>(pool);
this.queue = new ArrayDeque<Slice>(pool.threadLocalQueueSize) {
/**
* This sucks but there's no other way to ensure these buffers are returned to the pool.
*/
protected void finalize() {
final ByteBufferSlicePool pool = ThreadLocalCache.this.pool.get();
if (pool == null)
return;
final ArrayDeque<Slice> deque = queue;
Slice slice = deque.poll();
while (slice != null) {
pool.doFree(slice);
slice = deque.poll();
}
}
}
};
};
}
}
int outstanding = 0;
private static class ThreadLocalCacheWrapper extends ThreadLocal<ThreadLocalCache> {
// to prevent memory leaks via thread internal map for thread local, we need to
// weakly reference the outer ByteBufferSlicePool
// or else the pool is never collected while the thread is active
// Thread -> thread local map -> ThreadLocalCacheWrapper -> pool
private final WeakReference<ByteBufferSlicePool> pool;
ThreadLocalCacheWrapper(ByteBufferSlicePool pool) {
this.pool = new WeakReference<>(pool);
}
ThreadLocalCache() {
protected ThreadLocalCache initialValue() {
final ByteBufferSlicePool pool = this.pool.get();
if (pool != null) {
//noinspection serial
return pool.createThreadLocalCache();
}
return null;
}
public void remove() {
final ByteBufferSlicePool pool = this.pool.get();
final ThreadLocalCache cache = get();
if (pool != null && cache != null) {
//noinspection serial
pool.freeThreadLocalCache(cache);
}
super.remove();
}
}
}
/*
* JBoss, Home of Professional Open Source.
*
* Copyright 2019 Red Hat, Inc. and/or its affiliates, and individual
* contributors as indicated by the @author tags.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.xnio;
import java.nio.ByteBuffer;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
/**
* Test for {@link ByteBufferSlicePool}.
*
* @author <a href="mailto:flavia.rainone@jboss.com">Flavia Rainone</a>
*
*/
public class ByteBufferSlicePoolTestCase {
@Test
public void basicSlicePool() {
ByteBufferSlicePool slicePool = new ByteBufferSlicePool(10, 10);
for (int i = 0; i < 10; i++){
Pooled<ByteBuffer> pooledBuffer = slicePool.allocate();
assertNotNull(pooledBuffer);
ByteBuffer buffer = pooledBuffer.getResource();
assertNotNull(buffer);
assertEquals(10, buffer.capacity());
assertEquals(0, buffer.position());
pooledBuffer.free();
boolean failed = false;
try {
pooledBuffer.getResource();
} catch (IllegalStateException expected) {
failed = true;
}
assertTrue(failed);
}
}
@Test
public void bufferAllocatorSlicePool() {
ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.BYTE_BUFFER_ALLOCATOR, 2, 5);
for (int i = 0; i < 2; i++){
Pooled<ByteBuffer> pooledBuffer = slicePool.allocate();
assertNotNull(pooledBuffer);
ByteBuffer buffer = pooledBuffer.getResource();
assertNotNull(buffer);
assertEquals(2, buffer.capacity());
assertEquals(0, buffer.position());
pooledBuffer.free();
boolean failed = false;
try {
pooledBuffer.getResource();
} catch (IllegalStateException expected) {
failed = true;
}
assertTrue(failed);
}
}
@Test
public void bufferAllocatorNoThreadLocalSlicePool() {
ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.BYTE_BUFFER_ALLOCATOR, 8, 12, 0);
Pooled<ByteBuffer> pooledBuffer = slicePool.allocate();
assertNotNull(pooledBuffer);
ByteBuffer buffer = pooledBuffer.getResource();
assertNotNull(buffer);
assertEquals(8, buffer.capacity());
assertEquals(0, buffer.position());
pooledBuffer.free();
boolean failed = false;
try {
pooledBuffer.getResource();
} catch (IllegalStateException expected) {
failed = true;
}
assertTrue(failed);
}
@Test
public void directBufferAllocator() throws InterruptedException {
for (int i = 0; i < 100; i++)
stressDirectBufferReuse();
}
private void stressDirectBufferReuse() throws InterruptedException {
ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
slicePool.allocate();
slicePool.clean();
// reusing the same size
slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
slicePool.allocate();
slicePool.clean();
// reusing for a smaller size
slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 7, 21);
slicePool.allocate();
slicePool.clean();
// reusing for a bigger size
slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 10, 20);
slicePool.allocate();
slicePool.clean();
}
@Test
public void directBufferAllocator2() throws InterruptedException {
for (int i = 0; i < 100; i++)
stressDirectBufferReuse2();
}
private void stressDirectBufferReuse2() throws InterruptedException {
ByteBufferSlicePool slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
slicePool.allocate();
slicePool.clean();
// reusing the same size
slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 8, 24);
slicePool.allocate();
slicePool.clean();
// reusing for a smaller size
slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 7, 21);
slicePool.allocate();
slicePool.clean();
// reusing for a bigger size whose first buffer is bigger than all previous allocated byte buffer regions
slicePool = new ByteBufferSlicePool(BufferAllocator.DIRECT_BYTE_BUFFER_ALLOCATOR, 250, 10000);
slicePool.allocate();
slicePool.clean();
}
}
\ No newline at end of file
jboss-xnio (3.7.3-1) unstable; urgency=medium
* New upstream version 3.7.3.
-- Markus Koschany <apo@debian.org> Thu, 15 Aug 2019 01:32:45 +0200
jboss-xnio (3.7.2-1) unstable; urgency=medium
* New upstream version 3.7.2.
......
......@@ -31,7 +31,7 @@
<parent>
<groupId>org.jboss.xnio</groupId>
<artifactId>xnio-all</artifactId>
<version>3.7.2.Final</version>
<version>3.7.3.Final</version>
</parent>
<properties>
......
......@@ -126,6 +126,16 @@ class WatchServiceFileSystemWatcher implements FileSystemWatcher, Runnable {
while (it.hasNext()) {
FileChangeEvent event = it.next();
if (event.getType() == FileChangeEvent.Type.MODIFIED) {
if (addedFiles.contains(event.getFile()) &&
deletedFiles.contains(event.getFile())) {
// XNIO-344
// All file change events (ADDED, REMOVED and MODIFIED) occurred here.
// This happens when an updated file is moved from the different
// filesystems or the directory having different project quota on Linux.
// ADDED and REMOVED events will be removed in the latter conditional branching.
// So, this MODIFIED event needs to be kept for the file change notification.
continue;
}
if (addedFiles.contains(event.getFile()) ||
deletedFiles.contains(event.getFile())) {
it.remove();
......
......@@ -32,7 +32,7 @@
<artifactId>xnio-all</artifactId>
<packaging>pom</packaging>
<name>XNIO Parent POM</name>
<version>3.7.2.Final</version>
<version>3.7.3.Final</version>
<description>The aggregator POM of the XNIO project</description>
<licenses>
......