Concurrency safety at a glance
All types in Go Container (Queue, Stack, OrderedMap, and Set) are not safe for concurrent use unless explicitly noted. Calling methods on these types from multiple goroutines without external synchronization will cause data races. The sole exception is WaitableQueue, which is designed for concurrent producer-consumer patterns and is fully concurrency-safe.
Adding external synchronization
For Queue, Stack, OrderedMap, and Set, wrap every access in a mutex.
import "sync"
var mu sync.Mutex
q := queue.NewSliceQueue[string](64)
// Enqueue from any goroutine
mu.Lock()
q.Enqueue("item")
mu.Unlock()
// Dequeue from any goroutine
mu.Lock()
item, ok := q.Dequeue()
mu.Unlock()
Use sync.RWMutex if reads (e.g., Len() on a Countable) vastly outnumber writes and you need maximum read throughput. For the common case, a plain sync.Mutex is sufficient and easier to reason about.
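One way to make "wrap every access" hard to forget is to hide the mutex behind a small wrapper type, so callers cannot reach the collection without taking the lock. The sketch below is illustrative only: `SafeQueue` and `fill` are hypothetical names, and the plain slice merely stands in for one of the library's non-concurrency-safe collections.

```go
package main

import (
	"fmt"
	"sync"
)

// SafeQueue is a hypothetical wrapper: a FIFO guarded by a mutex.
// The slice stands in for a collection that is not concurrency-safe.
type SafeQueue[T any] struct {
	mu    sync.Mutex
	items []T
}

// Enqueue appends an item while holding the lock.
func (q *SafeQueue[T]) Enqueue(v T) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, v)
}

// Dequeue removes and returns the oldest item, if any, while holding the lock.
func (q *SafeQueue[T]) Dequeue() (T, bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	var zero T
	if len(q.items) == 0 {
		return zero, false
	}
	v := q.items[0]
	q.items = q.items[1:]
	return v, true
}

// fill enqueues n items from each of `workers` goroutines, then counts
// how many items the queue holds once all of them have finished.
func fill(workers, n int) int {
	var q SafeQueue[int]
	var wg sync.WaitGroup
	for w := 0; w < workers; w++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < n; i++ {
				q.Enqueue(i)
			}
		}()
	}
	wg.Wait()
	count := 0
	for {
		if _, ok := q.Dequeue(); !ok {
			return count
		}
		count++
	}
}

func main() {
	fmt.Println(fill(8, 100)) // prints 800: no item is lost
}
```

Because the lock lives inside the type, a forgotten `mu.Lock()` at a call site is no longer possible.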
Alternatively, confine the collection to a single goroutine and communicate through channels—the standard Go concurrency pattern.
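Confinement can be sketched as follows: one goroutine owns the collection, and every other goroutine talks to it through channels, so no lock is needed. The names `runStack`, `popRequest`, `popResult`, and `demo` are hypothetical, and the slice again stands in for a library collection.

```go
package main

import "fmt"

// popResult carries the answer to a pop request.
type popResult struct {
	value int
	ok    bool
}

// popRequest asks the owner goroutine for the top of the stack.
type popRequest struct {
	reply chan popResult
}

// runStack owns the slice; no other goroutine ever touches it.
func runStack(push <-chan int, pop <-chan popRequest, done <-chan struct{}) {
	var stack []int
	for {
		select {
		case v := <-push:
			stack = append(stack, v)
		case req := <-pop:
			if len(stack) == 0 {
				req.reply <- popResult{}
				continue
			}
			top := stack[len(stack)-1]
			stack = stack[:len(stack)-1]
			req.reply <- popResult{value: top, ok: true}
		case <-done:
			return
		}
	}
}

// demo pushes 1, 2, 3 through the owner goroutine and pops until empty.
func demo() []int {
	push := make(chan int)
	pop := make(chan popRequest)
	done := make(chan struct{})
	go runStack(push, pop, done)
	defer close(done)

	for _, v := range []int{1, 2, 3} {
		push <- v // unbuffered: returns once the owner has received v
	}
	var out []int
	for {
		req := popRequest{reply: make(chan popResult)}
		pop <- req
		res := <-req.reply
		if !res.ok {
			return out
		}
		out = append(out, res.value)
	}
}

func main() {
	fmt.Println(demo()) // prints [3 2 1]
}
```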
WaitableQueue
WaitableQueue is the purpose-built solution for concurrent producer-consumer pipelines. It is:
- Unbounded: Enqueue never blocks, unlike a buffered channel that back-pressures the producer when full.
- Signal-based: consumers wait on a channel returned by WaitChan() rather than spinning or polling.
- Flow-aware: watermark states let producers throttle themselves before the queue grows without bound.
When to use WaitableQueue vs channels
| Concern | Buffered channel | WaitableQueue |
|---|---|---|
| Producer blocking when full | Yes — producer blocks | No — always accepts new items |
| Consumer blocking when empty | Yes — consumer blocks | Consumer waits on WaitChan() |
| Flow control | Implicit (back-pressure) | Explicit watermark states |
| Graceful shutdown signaling | close(ch) | q.Close() |
| Inspection (current length) | len(ch) | q.(container.Countable).Len() (observability only) |
Use WaitableQueue when:
- You cannot afford to block the producer.
- You want explicit, graduated flow control (warn at high watermark, shed load near saturation).
- The queue must remain responsive under bursty load.
Use a plain channel when:
- Back-pressure on the producer is acceptable or desirable.
- You want the simplest possible implementation.
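The "producer blocking when full" row of the table can be observed with the standard library alone. This sketch uses a non-blocking send to count how many values a buffered channel accepts when nobody is receiving; `tryOffer` and `offerAll` are hypothetical helper names.

```go
package main

import "fmt"

// tryOffer attempts a non-blocking send and reports whether the channel took the value.
func tryOffer(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false // buffer full: a plain send would block here
	}
}

// offerAll offers n values to a channel with the given buffer size and no
// consumer, and returns how many were accepted.
func offerAll(buffer, n int) int {
	ch := make(chan int, buffer)
	accepted := 0
	for i := 0; i < n; i++ {
		if tryOffer(ch, i) {
			accepted++
		}
	}
	return accepted
}

func main() {
	fmt.Println(offerAll(2, 5)) // prints 2: the other three sends would have blocked
}
```

An unbounded queue would accept all five values, which is why it needs explicit flow control instead.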
Watermark states
WaitableQueue defines five states; every Enqueue or Dequeue call returns the one that applies after the operation:
| State | Meaning |
|---|---|
| QueueIsEmpty | Set at creation only; not returned during normal operation. |
| QueueIsBelowLowWatermark | Queue is draining; producers can safely increase throughput. |
| QueueIsNominal | Operating within normal bounds. |
| QueueIsAboveHighWatermark | Queue is filling up; consider throttling the producer. |
| QueueIsNearSaturation | Emergency signal: the queue is approaching capacity. Trigger load shedding immediately. |
Do not use QueueIsBelowLowWatermark as the sole signal to resume a fully-stopped producer. If the producer has stopped calling Enqueue, it will never observe the state change and will never restart. Check watermark state on the consumer (Dequeue) side for reliable flow control, or poll periodically.
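The consumer-side approach can be sketched as follows. The producer stops completely after each burst, so it can only be restarted by the consumer, which watches the state reported by its own Dequeue calls. Everything here is a hypothetical stand-in written for illustration (`miniQueue`, `notify`, `run`, and the `belowLow` flag); none of it is the library's API.

```go
package main

import (
	"fmt"
	"sync"
)

// miniQueue is a hypothetical stand-in for a watermark-aware queue.
type miniQueue struct {
	mu    sync.Mutex
	items []int
	low   int // low watermark; must be at least 1 in this sketch
}

func (q *miniQueue) Enqueue(v int) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, v)
}

// Dequeue returns the oldest item and reports whether the queue is now
// below the low watermark.
func (q *miniQueue) Dequeue() (v int, ok, belowLow bool) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if len(q.items) == 0 {
		return 0, false, true
	}
	v, q.items = q.items[0], q.items[1:]
	return v, true, len(q.items) < q.low
}

// notify performs a non-blocking send so that repeated signals coalesce.
func notify(ch chan<- struct{}) {
	select {
	case ch <- struct{}{}:
	default:
	}
}

// run produces `total` items in bursts and returns how many were consumed.
func run(total, burst, low int) int {
	q := &miniQueue{low: low}
	resume := make(chan struct{}, 1) // consumer -> producer
	ready := make(chan struct{}, 1)  // producer -> consumer
	done := make(chan struct{})
	consumed := 0

	go func() { // consumer
		defer close(done)
		for consumed < total {
			<-ready
			for {
				_, ok, belowLow := q.Dequeue()
				if !ok {
					break
				}
				consumed++
				if belowLow {
					notify(resume) // the consumer restarts the stopped producer
				}
			}
		}
	}()

	for sent := 0; sent < total; { // producer
		for i := 0; i < burst && sent < total; i++ {
			q.Enqueue(sent)
			sent++
		}
		notify(ready)
		if sent < total {
			<-resume // fully stopped: no Enqueue calls, so no state to observe
		}
	}
	<-done
	return consumed
}

func main() {
	fmt.Println(run(1000, 64, 8)) // prints 1000
}
```

If the producer waited for its own Enqueue to report a low state instead of waiting on `resume`, it would never wake up, which is the failure described above.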
Constructor parameters
q, err := queue.NewWaitableQueue[MyEvent](
	initialCapacity, // preallocate backing storage (use 0 if unknown)
	lowWatermark,    // items below this → QueueIsBelowLowWatermark
	highWatermark,   // items above this → QueueIsAboveHighWatermark
)
A saturation threshold above highWatermark is computed automatically from initialCapacity and highWatermark. You do not need to configure it directly.
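To make the thresholds concrete, here is a minimal sketch of how a queue length could map onto the states. It is not the library's implementation: the `State` type, its constants, and `classify` are hypothetical, and because the saturation formula is not documented here, the sketch takes the saturation threshold as an explicit parameter and assumes it uses the same "above this" comparison as the high watermark.

```go
package main

import "fmt"

// State is a hypothetical mirror of the queue states, for illustration only.
type State int

const (
	BelowLow State = iota
	Nominal
	AboveHigh
	NearSaturation
)

// classify maps a queue length onto a state.
// Assumed boundaries: strictly below low, strictly above high or saturation.
func classify(n, low, high, saturation int) State {
	switch {
	case n > saturation:
		return NearSaturation
	case n > high:
		return AboveHigh
	case n < low:
		return BelowLow
	default:
		return Nominal
	}
}

func main() {
	// With low=32 and high=192, a length of 200 is above the high watermark.
	fmt.Println(classify(200, 32, 192, 240) == AboveHigh) // prints true
}
```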
Producer-consumer example
The pattern below shows a single producer and a single consumer. Scale to multiple goroutines by sharing the same WaitableQueue reference—it is safe to call Enqueue and Dequeue (and WaitChan) concurrently from any number of goroutines.
package main

import (
	"fmt"
	"sync"

	"github.com/fgm/container"
	"github.com/fgm/container/queue"
)

func main() {
	const (
		capacity      = 256
		lowWatermark  = 32
		highWatermark = 192
	)

	q, err := queue.NewWaitableQueue[int](capacity, lowWatermark, highWatermark)
	if err != nil {
		panic(err)
	}

	var wg sync.WaitGroup

	// Producer goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer q.Close() // signal consumers that no more items will arrive
		for i := 0; i < 1000; i++ {
			state := q.Enqueue(i)
			switch state {
			case container.QueueIsNearSaturation:
				// Emergency: drop or block until the consumer catches up.
				// Here we just log and continue; a real application might sleep.
				fmt.Println("queue near saturation, consider load shedding")
			case container.QueueIsAboveHighWatermark:
				// Soft throttle: slow down production.
				fmt.Println("above high watermark, throttling producer")
			}
		}
	}()

	// Consumer goroutine
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			// Block until an item is signaled or the queue is closed.
			// A receive on the closed channel returns at once with open == false.
			_, open := <-q.WaitChan()
			// Drain all available items without waiting again.
			for {
				item, ok, _ := q.Dequeue()
				if !ok {
					break // queue is empty right now
				}
				fmt.Println("consumed:", item)
			}
			// The producer closes only after its last Enqueue, so a drain that
			// starts after observing the close sees every remaining item.
			// Checking for closure before draining could drop late items.
			if !open {
				return // queue is closed and drained
			}
		}
	}()

	wg.Wait()
}
Closing the queue
Call q.Close() from the producer when no more items will be enqueued. This closes the signal channel returned by WaitChan(), permanently unblocking any consumer waiting on it. Consumers should drain remaining items after observing a closed channel.
Calling Enqueue on a closed WaitableQueue panics. Ensure only the producer calls Close, and only after it has finished enqueueing.
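With several producers, "only the producer calls Close" becomes "exactly one coordinator closes, after every producer has finished". A minimal sketch of that coordination is shown below; it uses a plain channel as a stand-in for the queue, since sending on a closed channel also panics. `produceAll` is a hypothetical name.

```go
package main

import (
	"fmt"
	"sync"
)

// produceAll starts `producers` goroutines that each send n values, closes
// the channel exactly once after all of them have returned, and reports how
// many values the consumer received.
func produceAll(producers, n int) int {
	ch := make(chan int) // stand-in for the queue: sending after close panics too
	var wg sync.WaitGroup
	for p := 0; p < producers; p++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < n; i++ {
				ch <- i
			}
		}()
	}

	// Single closer: runs only after every producer is done sending.
	go func() {
		wg.Wait()
		close(ch)
	}()

	received := 0
	for range ch { // ends when the channel is closed and drained
		received++
	}
	return received
}

func main() {
	fmt.Println(produceAll(3, 10)) // prints 30
}
```

The same shape should carry over to a shared WaitableQueue: producers call `wg.Done()` when finished, and a single goroutine calls `q.Close()` after `wg.Wait()`.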
Using Len() safely
WaitableQueue implements container.Countable, so you can call Len() to inspect the current size. However, Len() acquires a separate mutex lock and its result is not atomic with respect to concurrent Enqueue or Dequeue calls.
Do not use Len() to make control-flow decisions (e.g., “dequeue only if Len() > 0”). By the time you act on the result, the queue may already be empty or have grown further. Use the WaitableQueueState returned by Enqueue and Dequeue for flow control, and reserve Len() for logging, metrics, and debugging.
// OK: observability
if c, ok := q.(container.Countable); ok {
	metrics.Record("queue.size", c.Len())
}

// NOT OK: control flow — race condition
if c, ok := q.(container.Countable); ok {
	if c.Len() > 0 {
		item, ok, _ := q.Dequeue() // Len() may already be stale
	}
}