Skip to content
Commits on Source (2)
Change Log
==========
## Version 1.15.0
_2018-07-18_
* New: Trie-based `Buffer.select()`. This improves performance when selecting
among large lists of options.
* Fix: Retain interrupted state when throwing `InterruptedIOException`.
## Version 1.14.0
_2018-02-11_
......
......@@ -118,12 +118,12 @@ Download [the latest JAR][2] or grab via Maven:
<dependency>
<groupId>com.squareup.okio</groupId>
<artifactId>okio</artifactId>
<version>1.14.0</version>
<version>1.15.0</version>
</dependency>
```
or Gradle:
```groovy
compile 'com.squareup.okio:okio:1.14.0'
compile 'com.squareup.okio:okio:1.15.0'
```
Snapshots of the development version are available in [Sonatype's `snapshots` repository][snap].
......
......@@ -2,7 +2,7 @@
<parent>
<artifactId>okio-parent</artifactId>
<groupId>com.squareup.okio</groupId>
<version>1.14.1</version>
<version>1.16.0</version>
</parent>
<modelVersion>4.0.0</modelVersion>
......
......@@ -6,7 +6,7 @@
<parent>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-parent</artifactId>
<version>1.14.1</version>
<version>1.16.0</version>
</parent>
<artifactId>okio</artifactId>
......
......@@ -246,6 +246,7 @@ public class AsyncTimeout extends Timeout {
@Override public void close() throws IOException {
boolean throwOnTimeout = false;
enter();
try {
source.close();
throwOnTimeout = true;
......
......@@ -71,6 +71,10 @@ public final class Buffer implements BufferedSource, BufferedSink, Cloneable, By
return this;
}
@Override public Buffer getBuffer() {
return this;
}
@Override public OutputStream outputStream() {
return new OutputStream() {
@Override public void write(int b) {
......@@ -113,6 +117,10 @@ public final class Buffer implements BufferedSource, BufferedSink, Cloneable, By
return size >= byteCount;
}
@Override public BufferedSource peek() {
return Okio.buffer(new PeekSource(this));
}
@Override public InputStream inputStream() {
return new InputStream() {
@Override public int read() {
......@@ -545,40 +553,122 @@ public final class Buffer implements BufferedSource, BufferedSink, Cloneable, By
}
@Override public int select(Options options) {
Segment s = head;
if (s == null) return options.indexOf(ByteString.EMPTY);
ByteString[] byteStrings = options.byteStrings;
for (int i = 0, listSize = byteStrings.length; i < listSize; i++) {
ByteString b = byteStrings[i];
if (size >= b.size() && rangeEquals(s, s.pos, b, 0, b.size())) {
try {
skip(b.size());
return i;
} catch (EOFException e) {
throw new AssertionError(e);
}
}
int index = selectPrefix(options, false);
if (index == -1) return -1;
// If the prefix match actually matched a full byte string, consume it and return it.
int selectedSize = options.byteStrings[index].size();
try {
skip(selectedSize);
} catch (EOFException e) {
throw new AssertionError();
}
return -1;
return index;
}
/**
* Returns the index of a value in {@code options} that is either the prefix of this buffer, or
* that this buffer is a prefix of. Unlike {@link #select} this never consumes the value, even
* if it is found in full.
* Returns the index of a value in options that is a prefix of this buffer. Returns -1 if no value
* is found. This method does two simultaneous iterations: it iterates the trie and it iterates
* this buffer. It returns when it reaches a result in the trie, when it mismatches in the trie,
* and when the buffer is exhausted.
*
* @param selectTruncated true to return -2 if a possible result is present but truncated. For
* example, this will return -2 if the buffer contains [ab] and the options are [abc, abd].
* Note that this is made complicated by the fact that options are listed in preference order,
* and one option may be a prefix of another. For example, this returns -2 if the buffer
* contains [ab] and the options are [abc, a].
*/
int selectPrefix(Options options) {
int selectPrefix(Options options, boolean selectTruncated) {
Segment head = this.head;
if (head == null) {
if (selectTruncated) return -2; // A result is present but truncated.
return options.indexOf(ByteString.EMPTY);
}
Segment s = head;
ByteString[] byteStrings = options.byteStrings;
for (int i = 0, listSize = byteStrings.length; i < listSize; i++) {
ByteString b = byteStrings[i];
int bytesLimit = (int) Math.min(size, b.size());
if (bytesLimit == 0 || rangeEquals(s, s.pos, b, 0, bytesLimit)) {
return i;
byte[] data = head.data;
int pos = head.pos;
int limit = head.limit;
int[] trie = options.trie;
int triePos = 0;
int prefixIndex = -1;
navigateTrie:
while (true) {
int scanOrSelect = trie[triePos++];
int possiblePrefixIndex = trie[triePos++];
if (possiblePrefixIndex != -1) {
prefixIndex = possiblePrefixIndex;
}
int nextStep;
if (s == null) {
break;
} else if (scanOrSelect < 0) {
// Scan: take multiple bytes from the buffer and the trie, looking for any mismatch.
int scanByteCount = -1 * scanOrSelect;
int trieLimit = triePos + scanByteCount;
while (true) {
int b = data[pos++] & 0xff;
if (b != trie[triePos++]) return prefixIndex; // Fail 'cause we found a mismatch.
boolean scanComplete = (triePos == trieLimit);
// Advance to the next buffer segment if this one is exhausted.
if (pos == limit) {
s = s.next;
pos = s.pos;
data = s.data;
limit = s.limit;
if (s == head) {
if (!scanComplete) break navigateTrie; // We were exhausted before the scan completed.
s = null; // We were exhausted at the end of the scan.
}
}
if (scanComplete) {
nextStep = trie[triePos];
break;
}
}
} else {
// Select: take one byte from the buffer and find a match in the trie.
int selectChoiceCount = scanOrSelect;
int b = data[pos++] & 0xff;
int selectLimit = triePos + selectChoiceCount;
while (true) {
if (triePos == selectLimit) return prefixIndex; // Fail 'cause we didn't find a match.
if (b == trie[triePos]) {
nextStep = trie[triePos + selectChoiceCount];
break;
}
triePos++;
}
// Advance to the next buffer segment if this one is exhausted.
if (pos == limit) {
s = s.next;
pos = s.pos;
data = s.data;
limit = s.limit;
if (s == head) {
s = null; // No more segments! The next trie node will be our last.
}
}
}
if (nextStep >= 0) return nextStep; // Found a matching option.
triePos = -nextStep; // Found another node to continue the search.
}
return -1;
// We break out of the loop above when we've exhausted the buffer without exhausting the trie.
if (selectTruncated) return -2; // The buffer is a prefix of at least one option.
return prefixIndex; // Return any matches we encountered while searching for a deeper match.
}
@Override public void readFully(Buffer sink, long byteCount) throws EOFException {
......
......@@ -27,9 +27,17 @@ import javax.annotation.Nullable;
* input.
*/
public interface BufferedSource extends Source, ReadableByteChannel {
/** Returns this source's internal buffer. */
/**
* Returns this source's internal buffer.
*
* @deprecated use getBuffer() instead.
*/
@Deprecated
Buffer buffer();
/** This source's internal buffer. */
Buffer getBuffer();
/**
* Returns true if there are no more bytes in this source. This will block until there are bytes
* to read or the source is definitely exhausted.
......@@ -533,6 +541,29 @@ public interface BufferedSource extends Source, ReadableByteChannel {
boolean rangeEquals(long offset, ByteString bytes, int bytesOffset, int byteCount)
throws IOException;
/**
* Returns a new {@code BufferedSource} that can read data from this {@code BufferedSource}
* without consuming it. The returned source becomes invalid once this source is next read or
* closed.
*
* For example, we can use {@code peek()} to lookahead and read the same data multiple times.
*
* <pre> {@code
*
* Buffer buffer = new Buffer();
* buffer.writeUtf8("abcdefghi");
*
* buffer.readUtf8(3) // returns "abc", buffer contains "defghi"
*
* BufferedSource peek = buffer.peek();
* peek.readUtf8(3); // returns "def", buffer contains "defghi"
* peek.readUtf8(3); // returns "ghi", buffer contains "defghi"
*
* buffer.readUtf8(3); // returns "def", buffer contains "ghi"
* }</pre>
*/
BufferedSource peek();
/** Returns an input stream that reads from this source. */
InputStream inputStream();
}
......@@ -16,18 +16,225 @@
package okio;
import java.util.AbstractList;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.RandomAccess;
/** An indexed set of values that may be read with {@link BufferedSource#select}. */
public final class Options extends AbstractList<ByteString> implements RandomAccess {
final ByteString[] byteStrings;
final int[] trie;
private Options(ByteString[] byteStrings) {
private Options(ByteString[] byteStrings, int[] trie) {
this.byteStrings = byteStrings;
this.trie = trie;
}
public static Options of(ByteString... byteStrings) {
return new Options(byteStrings.clone()); // Defensive copy.
if (byteStrings.length == 0) {
// With no choices we must always return -1. Create a trie that selects from an empty set.
return new Options(new ByteString[0], new int[] { 0, -1 });
}
// Sort the byte strings which is required when recursively building the trie. Map the sorted
// indexes to the caller's indexes.
List<ByteString> list = new ArrayList<>(Arrays.asList(byteStrings));
Collections.sort(list);
List<Integer> indexes = new ArrayList<>();
for (int i = 0; i < list.size(); i++) {
indexes.add(-1);
}
for (int i = 0; i < list.size(); i++) {
int sortedIndex = Collections.binarySearch(list, byteStrings[i]);
indexes.set(sortedIndex, i);
}
if (list.get(0).size() == 0) {
throw new IllegalArgumentException("the empty byte string is not a supported option");
}
// Strip elements that will never be returned because they follow their own prefixes. For
// example, if the caller provides ["abc", "abcde"] we will never return "abcde" because we
// return as soon as we encounter "abc".
for (int a = 0; a < list.size(); a++) {
ByteString prefix = list.get(a);
for (int b = a + 1; b < list.size(); ) {
ByteString byteString = list.get(b);
if (!byteString.startsWith(prefix)) break;
if (byteString.size() == prefix.size()) {
throw new IllegalArgumentException("duplicate option: " + byteString);
}
if (indexes.get(b) > indexes.get(a)) {
list.remove(b);
indexes.remove(b);
} else {
b++;
}
}
}
Buffer trieBytes = new Buffer();
buildTrieRecursive(0L, trieBytes, 0, list, 0, list.size(), indexes);
int[] trie = new int[intCount(trieBytes)];
for (int i = 0; i < trie.length; i++) {
trie[i] = trieBytes.readInt();
}
if (!trieBytes.exhausted()) {
throw new AssertionError();
}
return new Options(byteStrings.clone() /* Defensive copy. */, trie);
}
/**
* Builds a trie encoded as an int array. Nodes in the trie are of two types: SELECT and SCAN.
*
* SELECT nodes are encoded as:
* - selectChoiceCount: the number of bytes to choose between (a positive int)
* - prefixIndex: the result index at the current position or -1 if the current position is not
* a result on its own
* - a sorted list of selectChoiceCount bytes to match against the input string
* - a heterogeneous list of selectChoiceCount result indexes (>= 0) or offsets (< 0) of the
* next node to follow. Elements in this list correspond to elements in the preceding list.
* Offsets are negative and must be multiplied by -1 before being used.
*
* SCAN nodes are encoded as:
* - scanByteCount: the number of bytes to match in sequence. This count is negative and must
* be multiplied by -1 before being used.
* - prefixIndex: the result index at the current position or -1 if the current position is not
* a result on its own
* - a list of scanByteCount bytes to match
* - nextStep: the result index (>= 0) or offset (< 0) of the next node to follow. Offsets are
* negative and must be multiplied by -1 before being used.
*
* This structure is used to improve locality and performance when selecting from a list of
* options.
*/
private static void buildTrieRecursive(
long nodeOffset,
Buffer node,
int byteStringOffset,
List<ByteString> byteStrings,
int fromIndex,
int toIndex,
List<Integer> indexes) {
if (fromIndex >= toIndex) throw new AssertionError();
for (int i = fromIndex; i < toIndex; i++) {
if (byteStrings.get(i).size() < byteStringOffset) throw new AssertionError();
}
ByteString from = byteStrings.get(fromIndex);
ByteString to = byteStrings.get(toIndex - 1);
int prefixIndex = -1;
// If the first element is already matched, that's our prefix.
if (byteStringOffset == from.size()) {
prefixIndex = indexes.get(fromIndex);
fromIndex++;
from = byteStrings.get(fromIndex);
}
if (from.getByte(byteStringOffset) != to.getByte(byteStringOffset)) {
// If we have multiple bytes to choose from, encode a SELECT node.
int selectChoiceCount = 1;
for (int i = fromIndex + 1; i < toIndex; i++) {
if (byteStrings.get(i - 1).getByte(byteStringOffset)
!= byteStrings.get(i).getByte(byteStringOffset)) {
selectChoiceCount++;
}
}
// Compute the offset that childNodes will get when we append it to node.
long childNodesOffset = nodeOffset + intCount(node) + 2 + (selectChoiceCount * 2);
node.writeInt(selectChoiceCount);
node.writeInt(prefixIndex);
for (int i = fromIndex; i < toIndex; i++) {
byte rangeByte = byteStrings.get(i).getByte(byteStringOffset);
if (i == fromIndex || rangeByte != byteStrings.get(i - 1).getByte(byteStringOffset)) {
node.writeInt(rangeByte & 0xff);
}
}
Buffer childNodes = new Buffer();
int rangeStart = fromIndex;
while (rangeStart < toIndex) {
byte rangeByte = byteStrings.get(rangeStart).getByte(byteStringOffset);
int rangeEnd = toIndex;
for (int i = rangeStart + 1; i < toIndex; i++) {
if (rangeByte != byteStrings.get(i).getByte(byteStringOffset)) {
rangeEnd = i;
break;
}
}
if (rangeStart + 1 == rangeEnd
&& byteStringOffset + 1 == byteStrings.get(rangeStart).size()) {
// The result is a single index.
node.writeInt(indexes.get(rangeStart));
} else {
// The result is another node.
node.writeInt((int) (-1 * (childNodesOffset + intCount(childNodes))));
buildTrieRecursive(
childNodesOffset,
childNodes,
byteStringOffset + 1,
byteStrings,
rangeStart,
rangeEnd,
indexes);
}
rangeStart = rangeEnd;
}
node.write(childNodes, childNodes.size());
} else {
// If all of the bytes are the same, encode a SCAN node.
int scanByteCount = 0;
for (int i = byteStringOffset, max = Math.min(from.size(), to.size()); i < max; i++) {
if (from.getByte(i) == to.getByte(i)) {
scanByteCount++;
} else {
break;
}
}
// Compute the offset that childNodes will get when we append it to node.
long childNodesOffset = nodeOffset + intCount(node) + 2 + scanByteCount + 1;
node.writeInt(-scanByteCount);
node.writeInt(prefixIndex);
for (int i = byteStringOffset; i < byteStringOffset + scanByteCount; i++) {
node.writeInt(from.getByte(i) & 0xff);
}
if (fromIndex + 1 == toIndex) {
// The result is a single index.
if (byteStringOffset + scanByteCount != byteStrings.get(fromIndex).size()) {
throw new AssertionError();
}
node.writeInt(indexes.get(fromIndex));
} else {
// The result is another node.
Buffer childNodes = new Buffer();
node.writeInt((int) (-1 * (childNodesOffset + intCount(childNodes))));
buildTrieRecursive(
childNodesOffset,
childNodes,
byteStringOffset + scanByteCount,
byteStrings,
fromIndex,
toIndex,
indexes);
node.write(childNodes, childNodes.size());
}
}
}
@Override public ByteString get(int i) {
......@@ -37,4 +244,8 @@ public final class Options extends AbstractList<ByteString> implements RandomAcc
@Override public final int size() {
return byteStrings.length;
}
private static int intCount(Buffer trieBytes) {
return (int) (trieBytes.size() / 4);
}
}
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okio;
import java.io.IOException;
/**
* A {@link Source} which peeks into an upstream {@link BufferedSource} and allows reading and
* expanding of the buffered data without consuming it. Does this by requesting additional data from
* the upstream source if needed and copying out of the internal buffer of the upstream source if
* possible.
*
* <p>This source also maintains a snapshot of the starting location of the upstream buffer which it
* validates against on every read. If the upstream buffer is read from, this source will become
* invalid and throw {@link IllegalStateException} on any future reads.
*/
final class PeekSource implements Source {
private final BufferedSource upstream;
private final Buffer buffer;
private Segment expectedSegment;
private int expectedPos;
private boolean closed;
private long pos;
PeekSource(BufferedSource upstream) {
this.upstream = upstream;
this.buffer = upstream.buffer();
this.expectedSegment = buffer.head;
this.expectedPos = expectedSegment != null ? expectedSegment.pos : -1;
}
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (closed) throw new IllegalStateException("closed");
// Source becomes invalid if there is an expected Segment and it and the expected position
// do not match the current head and head position of the upstream buffer
if (expectedSegment != null
&& (expectedSegment != buffer.head || expectedPos != buffer.head.pos)) {
throw new IllegalStateException("Peek source is invalid because upstream source was used");
}
upstream.request(pos + byteCount);
if (expectedSegment == null && buffer.head != null) {
// Only once the buffer actually holds data should an expected Segment and position be
// recorded. This allows reads from the peek source to repeatedly return -1 and for data to be
// added later. Unit tests depend on this behavior.
expectedSegment = buffer.head;
expectedPos = buffer.head.pos;
}
long toCopy = Math.min(byteCount, buffer.size - pos);
if (toCopy <= 0L) return -1L;
buffer.copyTo(sink, pos, toCopy);
pos += toCopy;
return toCopy;
}
@Override public Timeout timeout() {
return upstream.timeout();
}
@Override public void close() throws IOException {
closed = true;
}
}
......@@ -38,6 +38,10 @@ final class RealBufferedSource implements BufferedSource {
return buffer;
}
@Override public Buffer getBuffer() {
return buffer;
}
@Override public long read(Buffer sink, long byteCount) throws IOException {
if (sink == null) throw new IllegalArgumentException("sink == null");
if (byteCount < 0) throw new IllegalArgumentException("byteCount < 0: " + byteCount);
......@@ -89,18 +93,17 @@ final class RealBufferedSource implements BufferedSource {
if (closed) throw new IllegalStateException("closed");
while (true) {
int index = buffer.selectPrefix(options);
int index = buffer.selectPrefix(options, true);
if (index == -1) return -1;
// If the prefix match actually matched a full byte string, consume it and return it.
int selectedSize = options.byteStrings[index].size();
if (selectedSize <= buffer.size) {
if (index == -2) {
// We need to grow the buffer. Do that, then try it all again.
if (source.read(buffer, Segment.SIZE) == -1L) return -1;
} else {
// We matched a full byte string: consume it and return it.
int selectedSize = options.byteStrings[index].size();
buffer.skip(selectedSize);
return index;
}
// We need to grow the buffer. Do that, then try it all again.
if (source.read(buffer, Segment.SIZE) == -1) return -1;
}
}
......@@ -421,6 +424,10 @@ final class RealBufferedSource implements BufferedSource {
return true;
}
@Override public BufferedSource peek() {
return Okio.buffer(new PeekSource(this));
}
@Override public InputStream inputStream() {
return new InputStream() {
@Override public int read() throws IOException {
......
......@@ -142,7 +142,8 @@ public class Timeout {
*/
public void throwIfReached() throws IOException {
if (Thread.interrupted()) {
throw new InterruptedIOException("thread interrupted");
Thread.currentThread().interrupt(); // Retain interrupted status.
throw new InterruptedIOException("interrupted");
}
if (hasDeadline && deadlineNanoTime - System.nanoTime() <= 0) {
......@@ -221,6 +222,7 @@ public class Timeout {
throw new InterruptedIOException("timeout");
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // Retain interrupted status.
throw new InterruptedIOException("interrupted");
}
}
......
......@@ -202,6 +202,47 @@ public final class AsyncTimeoutTest {
}
}
@Test public void wrappedSinkFlushTimesOut() throws Exception {
Sink sink = new ForwardingSink(new Buffer()) {
@Override public void flush() throws IOException {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new AssertionError();
}
}
};
AsyncTimeout timeout = new AsyncTimeout();
timeout.timeout(250, TimeUnit.MILLISECONDS);
Sink timeoutSink = timeout.sink(sink);
try {
timeoutSink.flush();
fail();
} catch (InterruptedIOException expected) {
}
}
@Test public void wrappedSinkCloseTimesOut() throws Exception {
Sink sink = new ForwardingSink(new Buffer()) {
@Override public void close() throws IOException {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new AssertionError();
}
}
};
AsyncTimeout timeout = new AsyncTimeout();
timeout.timeout(250, TimeUnit.MILLISECONDS);
Sink timeoutSink = timeout.sink(sink);
try {
timeoutSink.close();
fail();
} catch (InterruptedIOException expected) {
}
}
@Test public void wrappedSourceTimesOut() throws Exception {
Source source = new ForwardingSource(new Buffer()) {
@Override public long read(Buffer sink, long byteCount) throws IOException {
......@@ -223,6 +264,26 @@ public final class AsyncTimeoutTest {
}
}
@Test public void wrappedSourceCloseTimesOut() throws Exception {
Source source = new ForwardingSource(new Buffer()) {
@Override public void close() throws IOException {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
throw new AssertionError();
}
}
};
AsyncTimeout timeout = new AsyncTimeout();
timeout.timeout(250, TimeUnit.MILLISECONDS);
Source timeoutSource = timeout.source(source);
try {
timeoutSource.close();
fail();
} catch (InterruptedIOException expected) {
}
}
@Test public void wrappedThrowsWithTimeout() throws Exception {
Sink sink = new ForwardingSink(new Buffer()) {
@Override public void write(Buffer source, long byteCount) throws IOException {
......
......@@ -97,6 +97,34 @@ public final class BufferedSourceTest {
}
};
Factory PEEK_BUFFER = new Factory() {
@Override public Pipe pipe() {
Buffer buffer = new Buffer();
Pipe result = new Pipe();
result.sink = buffer;
result.source = buffer.peek();
return result;
}
@Override public String toString() {
return "PeekBuffer";
}
};
Factory PEEK_BUFFERED_SOURCE = new Factory() {
@Override public Pipe pipe() {
Buffer buffer = new Buffer();
Pipe result = new Pipe();
result.sink = buffer;
result.source = Okio.buffer((Source) buffer).peek();
return result;
}
@Override public String toString() {
return "PeekBufferedSource";
}
};
Pipe pipe();
}
......@@ -110,7 +138,9 @@ public final class BufferedSourceTest {
return Arrays.asList(
new Object[] { Factory.BUFFER},
new Object[] { Factory.REAL_BUFFERED_SOURCE},
new Object[] { Factory.ONE_BYTE_AT_A_TIME});
new Object[] { Factory.ONE_BYTE_AT_A_TIME},
new Object[] { Factory.PEEK_BUFFER },
new Object[] { Factory.PEEK_BUFFERED_SOURCE });
}
@Parameter public Factory factory;
......@@ -955,12 +985,6 @@ public final class BufferedSourceTest {
assertEquals("ef", source.readUtf8());
}
@Test public void selectNoByteStrings() throws IOException {
Options options = Options.of();
sink.writeUtf8("abc");
assertEquals(-1, source.select(options));
}
@Test public void selectFromEmptySource() throws IOException {
Options options = Options.of(
ByteString.encodeUtf8("abc"),
......@@ -974,15 +998,109 @@ public final class BufferedSourceTest {
}
@Test public void selectEmptyByteString() throws IOException {
Options options = Options.of(ByteString.of());
sink.writeUtf8("abc");
assertEquals(0, source.select(options));
assertEquals("abc", source.readUtf8());
try {
Options.of(ByteString.of());
fail();
} catch (IllegalArgumentException expected) {
}
}
@Test public void selectEmptyByteStringFromEmptySource() throws IOException {
Options options = Options.of(ByteString.of());
assertEquals(0, source.select(options));
@Test public void peek() throws IOException {
sink.writeUtf8("abcdefghi");
sink.emit();
assertEquals("abc", source.readUtf8(3));
BufferedSource peek = source.peek();
assertEquals("def", peek.readUtf8(3));
assertEquals("ghi", peek.readUtf8(3));
assertFalse(peek.request(1));
assertEquals("def", source.readUtf8(3));
}
@Test public void peekMultiple() throws IOException {
sink.writeUtf8("abcdefghi");
sink.emit();
assertEquals("abc", source.readUtf8(3));
BufferedSource peek1 = source.peek();
BufferedSource peek2 = source.peek();
assertEquals("def", peek1.readUtf8(3));
assertEquals("def", peek2.readUtf8(3));
assertEquals("ghi", peek2.readUtf8(3));
assertFalse(peek2.request(1));
assertEquals("ghi", peek1.readUtf8(3));
assertFalse(peek1.request(1));
assertEquals("def", source.readUtf8(3));
}
@Test public void peekLarge() throws IOException {
sink.writeUtf8("abcdef");
sink.writeUtf8(repeat('g', 2 * Segment.SIZE));
sink.writeUtf8("hij");
sink.emit();
assertEquals("abc", source.readUtf8(3));
BufferedSource peek = source.peek();
assertEquals("def", peek.readUtf8(3));
peek.skip(2 * Segment.SIZE);
assertEquals("hij", peek.readUtf8(3));
assertFalse(peek.request(1));
assertEquals("def", source.readUtf8(3));
source.skip(2 * Segment.SIZE);
assertEquals("hij", source.readUtf8(3));
}
@Test public void peekInvalid() throws IOException {
sink.writeUtf8("abcdefghi");
sink.emit();
assertEquals("abc", source.readUtf8(3));
BufferedSource peek = source.peek();
assertEquals("def", peek.readUtf8(3));
assertEquals("ghi", peek.readUtf8(3));
assertFalse(peek.request(1));
assertEquals("def", source.readUtf8(3));
try {
peek.readUtf8();
fail();
} catch (IllegalStateException e) {
assertEquals("Peek source is invalid because upstream source was used", e.getMessage());
}
}
@Test public void peekSegmentThenInvalid() throws IOException {
sink.writeUtf8("abc");
sink.writeUtf8(repeat('d', 2 * Segment.SIZE));
sink.emit();
assertEquals("abc", source.readUtf8(3));
// Peek a little data and skip the rest of the upstream source
BufferedSource peek = source.peek();
assertEquals("ddd", peek.readUtf8(3));
source.readAll(Okio.blackhole());
// Skip the rest of the buffered data
peek.skip(Segment.SIZE - 3);
try {
peek.readByte();
fail();
} catch (IllegalStateException e) {
assertEquals("Peek source is invalid because upstream source was used", e.getMessage());
}
}
@Test public void rangeEquals() throws IOException {
......
/*
* Copyright (C) 2018 Square, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package okio;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
public final class OptionsTest {
/** Confirm that options prefers the first-listed option, not the longest or shortest one. */
@Test public void optionOrderTakesPrecedence() {
assertSelect("abcdefg", 0, "abc", "abcdef");
assertSelect("abcdefg", 0, "abcdef", "abc");
}
@Test public void simpleOptionsTrie() {
assertEquals(trieString(utf8Options("hotdog", "hoth", "hot")), ""
+ "hot\n"
+ " -> 2\n"
+ " d\n"
+ " og -> 0\n"
+ " h -> 1\n");
}
@Test public void realisticOptionsTrie() {
// These are the fields of OkHttpClient in 3.10.
Options options = utf8Options(
"dispatcher",
"proxy",
"protocols",
"connectionSpecs",
"interceptors",
"networkInterceptors",
"eventListenerFactory",
"proxySelector", // No index 7 in the trie because 'proxy' is a prefix!
"cookieJar",
"cache",
"internalCache",
"socketFactory",
"sslSocketFactory",
"certificateChainCleaner",
"hostnameVerifier",
"certificatePinner",
"proxyAuthenticator", // No index 16 in the trie because 'proxy' is a prefix!
"authenticator",
"connectionPool",
"dns",
"followSslRedirects",
"followRedirects",
"retryOnConnectionFailure",
"connectTimeout",
"readTimeout",
"writeTimeout",
"pingInterval");
assertEquals(trieString(options), ""
+ "a\n"
+ " uthenticator -> 17\n"
+ "c\n"
+ " a\n"
+ " che -> 9\n"
+ " e\n"
+ " rtificate\n"
+ " C\n"
+ " hainCleaner -> 13\n"
+ " P\n"
+ " inner -> 15\n"
+ " o\n"
+ " n\n"
+ " nect\n"
+ " T\n"
+ " imeout -> 23\n"
+ " i\n"
+ " on\n"
+ " P\n"
+ " ool -> 18\n"
+ " S\n"
+ " pecs -> 3\n"
+ " o\n"
+ " kieJar -> 8\n"
+ "d\n"
+ " i\n"
+ " spatcher -> 0\n"
+ " n\n"
+ " s -> 19\n"
+ "e\n"
+ " ventListenerFactory -> 6\n"
+ "f\n"
+ " ollow\n"
+ " R\n"
+ " edirects -> 21\n"
+ " S\n"
+ " slRedirects -> 20\n"
+ "h\n"
+ " ostnameVerifier -> 14\n"
+ "i\n"
+ " nter\n"
+ " c\n"
+ " eptors -> 4\n"
+ " n\n"
+ " alCache -> 10\n"
+ "n\n"
+ " etworkInterceptors -> 5\n"
+ "p\n"
+ " i\n"
+ " ngInterval -> 26\n"
+ " r\n"
+ " o\n"
+ " t\n"
+ " ocols -> 2\n"
+ " x\n"
+ " y -> 1\n"
+ "r\n"
+ " e\n"
+ " a\n"
+ " dTimeout -> 24\n"
+ " t\n"
+ " ryOnConnectionFailure -> 22\n"
+ "s\n"
+ " o\n"
+ " cketFactory -> 11\n"
+ " s\n"
+ " lSocketFactory -> 12\n"
+ "w\n"
+ " riteTimeout -> 25\n");
assertSelect("", -1, options);
assertSelect("a", -1, options);
assertSelect("eventListenerFactor", -1, options);
assertSelect("dnst", 19, options);
assertSelect("proxyproxy", 1, options);
assertSelect("prox", -1, options);
assertSelect("dispatcher", 0, options);
assertSelect("proxy", 1, options);
assertSelect("protocols", 2, options);
assertSelect("connectionSpecs", 3, options);
assertSelect("interceptors", 4, options);
assertSelect("networkInterceptors", 5, options);
assertSelect("eventListenerFactory", 6, options);
assertSelect("proxySelector", 1, options); // 'proxy' is a prefix.
assertSelect("cookieJar", 8, options);
assertSelect("cache", 9, options);
assertSelect("internalCache", 10, options);
assertSelect("socketFactory", 11, options);
assertSelect("sslSocketFactory", 12, options);
assertSelect("certificateChainCleaner", 13, options);
assertSelect("hostnameVerifier", 14, options);
assertSelect("certificatePinner", 15, options);
assertSelect("proxyAuthenticator", 1, options); // 'proxy' is a prefix.
assertSelect("authenticator", 17, options);
assertSelect("connectionPool", 18, options);
assertSelect("dns", 19, options);
assertSelect("followSslRedirects", 20, options);
assertSelect("followRedirects", 21, options);
assertSelect("retryOnConnectionFailure", 22, options);
assertSelect("connectTimeout", 23, options);
assertSelect("readTimeout", 24, options);
assertSelect("writeTimeout", 25, options);
assertSelect("pingInterval", 26, options);
}
@Test public void emptyOptions() {
Options options = utf8Options();
assertSelect("", -1, options);
assertSelect("a", -1, options);
assertSelect("abc", -1, options);
}
@Test public void emptyStringInOptionsTrie() {
try {
utf8Options("");
fail();
} catch (IllegalArgumentException expected) {
}
try {
utf8Options("abc", "");
fail();
} catch (IllegalArgumentException expected) {
}
}
@Test public void multipleIdenticalValues() {
try {
utf8Options("abc", "abc");
fail();
} catch (IllegalArgumentException expected) {
assertEquals(expected.getMessage(), "duplicate option: [text=abc]");
}
}
@Test public void prefixesAreStripped() {
Options options = utf8Options("abcA", "abc", "abcB");
assertEquals(trieString(options), ""
+ "abc\n"
+ " -> 1\n"
+ " A -> 0\n");
assertSelect("abc", 1, options);
assertSelect("abcA", 0, options);
assertSelect("abcB", 1, options);
assertSelect("abcC", 1, options);
assertSelect("ab", -1, options);
}
@Test public void multiplePrefixesAreStripped() {
assertEquals(trieString(utf8Options("a", "ab", "abc", "abcd", "abcde")), ""
+ "a -> 0\n");
assertEquals(trieString(utf8Options("abc", "a", "ab", "abe", "abcd", "abcf")), ""
+ "a\n"
+ " -> 1\n"
+ " bc -> 0\n");
assertEquals(trieString(utf8Options("abc", "ab", "a")), ""
+ "a\n"
+ " -> 2\n"
+ " b\n"
+ " -> 1\n"
+ " c -> 0\n");
assertEquals(trieString(utf8Options("abcd", "abce", "abc", "abcf", "abcg")), ""
+ "abc\n"
+ " -> 2\n"
+ " d -> 0\n"
+ " e -> 1\n");
}
private Options utf8Options(String... options) {
ByteString[] byteStrings = new ByteString[options.length];
for (int i = 0; i < options.length; i++) {
byteStrings[i] = ByteString.encodeUtf8(options[i]);
}
return Options.of(byteStrings);
}
private void assertSelect(String data, int expected, Options options) {
Buffer buffer = new Buffer().writeUtf8(data);
long dataSize = buffer.size;
int actual = buffer.select(options);
assertEquals(actual, expected);
if (expected == -1) {
assertEquals(buffer.size, dataSize);
} else {
assertEquals(buffer.size + options.get(expected).size(), dataSize);
}
}
private void assertSelect(String data, int expected, String... options) {
assertSelect(data, expected, utf8Options(options));
}
private String trieString(Options options) {
StringBuilder result = new StringBuilder();
printTrieNode(result, options, 0, "");
return result.toString();
}
private void printTrieNode(StringBuilder out, Options options, int offset, String indent) {
if (options.trie[offset + 1] != -1) {
// Print the prefix.
out.append(indent + "-> " + options.trie[offset + 1] + "\n");
}
if (options.trie[offset] > 0) {
// Print the select.
int selectChoiceCount = options.trie[offset];
for (int i = 0; i < selectChoiceCount; i++) {
out.append(indent + (char) options.trie[offset + 2 + i]);
printTrieResult(out, options, options.trie[offset + 2 + selectChoiceCount + i], indent + " ");
}
} else {
// Print the scan.
int scanByteCount = -1 * options.trie[offset];
out.append(indent);
for (int i = 0; i < scanByteCount; i++) {
out.append((char) options.trie[offset + 2 + i]);
}
printTrieResult(out, options, options.trie[offset + 2 + scanByteCount], indent + TestUtil.repeat(' ', scanByteCount));
}
}
private void printTrieResult(StringBuilder out, Options options, int result, String indent) {
if (result >= 0) {
out.append(" -> " + result + "\n");
} else {
out.append("\n");
printTrieNode(out, options, -1 * result, indent);
}
}
}
......@@ -22,9 +22,9 @@ import java.util.concurrent.TimeUnit;
import org.junit.After;
import org.junit.Test;
import static org.junit.Assert.fail;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
public final class WaitUntilNotifiedTest {
final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0);
......@@ -126,11 +126,23 @@ public final class WaitUntilNotifiedTest {
fail();
} catch (InterruptedIOException expected) {
assertEquals("interrupted", expected.getMessage());
assertFalse(Thread.interrupted());
assertTrue(Thread.interrupted());
}
assertElapsed(0.0, start);
}
@Test public synchronized void threadInterruptedOnThrowIfReached() throws Exception {
Timeout timeout = new Timeout();
Thread.currentThread().interrupt();
try {
timeout.throwIfReached();
fail();
} catch (InterruptedIOException expected) {
assertEquals("interrupted", expected.getMessage());
assertTrue(Thread.interrupted());
}
}
/** Returns the nanotime in milliseconds as a double for measuring timeouts. */
private double now() {
return System.nanoTime() / 1000000.0d;
......
......@@ -11,7 +11,7 @@
<groupId>com.squareup.okio</groupId>
<artifactId>okio-parent</artifactId>
<version>1.14.1</version>
<version>1.16.0</version>
<packaging>pom</packaging>
<name>Okio (Parent)</name>
<description>A modern I/O API for Java</description>
......@@ -39,7 +39,7 @@
<url>https://github.com/square/okio/</url>
<connection>scm:git:https://github.com/square/okio.git</connection>
<developerConnection>scm:git:git@github.com:square/okio.git</developerConnection>
<tag>okio-parent-1.14.1</tag>
<tag>okio-parent-1.16.0</tag>
</scm>
<issueManagement>
......
......@@ -6,7 +6,7 @@
<parent>
<groupId>com.squareup.okio</groupId>
<artifactId>okio-parent</artifactId>
<version>1.14.1</version>
<version>1.16.0</version>
</parent>
<artifactId>samples</artifactId>
......