Forked from
Debian Go Packaging Team / packages / golang-github-centrifugal-centrifuge
1 commit behind the upstream repository.
-
Thorsten Alteholz authoredThorsten Alteholz authored
queue_test.go 2.61 KiB
package queue
import (
"strconv"
"testing"
"github.com/stretchr/testify/require"
)
func TestByteQueueResize(t *testing.T) {
q := New()
require.Equal(t, 0, q.Len())
require.Equal(t, false, q.Closed())
for i := 0; i < initialCapacity; i++ {
q.Add([]byte(strconv.Itoa(i)))
}
q.Add([]byte("resize here"))
require.Equal(t, initialCapacity*2, q.Cap())
q.Remove()
q.Add([]byte("new resize here"))
require.Equal(t, initialCapacity*2, q.Cap())
q.Add([]byte("one more item, no resize must happen"))
require.Equal(t, initialCapacity*2, q.Cap())
require.Equal(t, initialCapacity+2, q.Len())
}
func TestByteQueueSize(t *testing.T) {
q := New()
require.Equal(t, 0, q.Size())
q.Add([]byte("1"))
q.Add([]byte("2"))
require.Equal(t, 2, q.Size())
q.Remove()
require.Equal(t, 1, q.Size())
}
func TestByteQueueWait(t *testing.T) {
q := New()
q.Add([]byte("1"))
q.Add([]byte("2"))
ok := q.Wait()
require.Equal(t, true, ok)
s, ok := q.Remove()
require.Equal(t, true, ok)
require.Equal(t, "1", string(s))
ok = q.Wait()
require.Equal(t, true, ok)
s, ok = q.Remove()
require.Equal(t, true, ok)
require.Equal(t, "2", string(s))
go func() {
q.Add([]byte("3"))
}()
ok = q.Wait()
require.Equal(t, true, ok)
s, ok = q.Remove()
require.Equal(t, true, ok)
require.Equal(t, "3", string(s))
}
func TestByteQueueClose(t *testing.T) {
q := New()
// test removing from empty queue
_, ok := q.Remove()
require.Equal(t, false, ok)
q.Add([]byte("1"))
q.Add([]byte("2"))
q.Close()
ok = q.Add([]byte("3"))
require.Equal(t, false, ok)
ok = q.Wait()
require.Equal(t, false, ok)
_, ok = q.Remove()
require.Equal(t, false, ok)
require.Equal(t, true, q.Closed())
}
func TestByteQueueCloseRemaining(t *testing.T) {
q := New()
q.Add([]byte("1"))
q.Add([]byte("2"))
msgs := q.CloseRemaining()
require.Equal(t, 2, len(msgs))
ok := q.Add([]byte("3"))
require.Equal(t, false, ok)
require.Equal(t, true, q.Closed())
msgs = q.CloseRemaining()
require.Equal(t, 0, len(msgs))
}
func BenchmarkQueueAdd(b *testing.B) {
q := New()
b.ResetTimer()
for i := 0; i < b.N; i++ {
q.Add([]byte("test"))
}
b.StopTimer()
q.Close()
}
func addAndConsume(q Queue, n int) {
// Add to queue and consume in another goroutine.
done := make(chan struct{})
go func() {
count := 0
for {
ok := q.Wait()
if !ok {
continue
}
q.Remove()
count++
if count == n {
close(done)
break
}
}
}()
for i := 0; i < n; i++ {
q.Add([]byte("test"))
}
<-done
}
func BenchmarkQueueAddConsume(b *testing.B) {
q := New()
b.ResetTimer()
for i := 0; i < b.N; i++ {
addAndConsume(q, 10000)
}
b.StopTimer()
q.Close()
}