Concurrency safety at a glance

The core types in Go Container (Queue, Stack, OrderedMap, and Set) are not safe for concurrent use unless explicitly noted. Calling methods on these types from multiple goroutines without external synchronization will cause data races. The sole exception is WaitableQueue, which is designed for concurrent producer-consumer patterns and is fully concurrency-safe.

Adding external synchronization

For Queue, Stack, OrderedMap, and Set, wrap every access in a mutex.
import "sync"

var mu sync.Mutex
q := queue.NewSliceQueue[string](64)

// Enqueue from any goroutine
mu.Lock()
q.Enqueue("item")
mu.Unlock()

// Dequeue from any goroutine
mu.Lock()
item, ok := q.Dequeue()
mu.Unlock()
Use sync.RWMutex if reads (e.g., Len() on a Countable) vastly outnumber writes and you need maximum read throughput. For the common case, a plain sync.Mutex is sufficient and easier to reason about. Alternatively, confine the collection to a single goroutine and communicate through channels—the standard Go concurrency pattern.
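
The goroutine-confinement alternative can be sketched as follows. This is a minimal illustration rather than part of the library: the op and result types and the owner and dequeue functions are hypothetical names, and a plain slice stands in for the non-concurrency-safe Queue so the sketch runs with the standard library alone.

```go
package main

import "fmt"

// result is the answer to a dequeue request (hypothetical type).
type result struct {
	item string
	ok   bool
}

// op is a hypothetical message: an enqueue when reply is nil,
// otherwise a dequeue request answered on reply.
type op struct {
	enqueue string
	reply   chan<- result
}

// owner is the only goroutine that touches items, so no mutex is needed.
func owner(ops <-chan op) {
	var items []string // stand-in for a non-concurrency-safe Queue
	for o := range ops {
		switch {
		case o.reply == nil:
			items = append(items, o.enqueue)
		case len(items) == 0:
			o.reply <- result{}
		default:
			o.reply <- result{item: items[0], ok: true}
			items = items[1:]
		}
	}
}

// dequeue sends a request to the owner and waits for its answer.
func dequeue(ops chan<- op) (string, bool) {
	reply := make(chan result, 1)
	ops <- op{reply: reply}
	r := <-reply
	return r.item, r.ok
}

func main() {
	ops := make(chan op)
	go owner(ops)

	ops <- op{enqueue: "a"}
	ops <- op{enqueue: "b"}
	item, ok := dequeue(ops)
	fmt.Println(item, ok) // a true
	close(ops)            // stops the owner goroutine
}
```

Every caller talks to the owner through the ops channel, so the collection itself is never shared. The cost is one channel round trip per operation, which is why a mutex is usually the simpler choice for short critical sections.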

WaitableQueue

WaitableQueue is the purpose-built solution for concurrent producer-consumer pipelines. It is:
  • Unbounded: Enqueue never blocks, unlike a buffered channel that back-pressures the producer when full.
  • Signal-based: consumers wait on a channel returned by WaitChan() rather than spinning or polling.
  • Flow-aware: watermark states let producers throttle themselves before the queue grows without bound.

When to use WaitableQueue vs channels

| Concern | Buffered channel | WaitableQueue |
| --- | --- | --- |
| Producer blocking when full | Yes: producer blocks | No: always accepts new items |
| Consumer blocking when empty | Yes: consumer blocks | Consumer waits on WaitChan() |
| Flow control | Implicit (back-pressure) | Explicit watermark states |
| Graceful shutdown signaling | close(ch) | q.Close() |
| Inspection (current length) | len(ch) | q.(container.Countable).Len() (observability only) |

Use WaitableQueue when:
  • You cannot afford to block the producer.
  • You want explicit, graduated flow control (warn at high watermark, shed load near saturation).
  • The queue must remain responsive under bursty load.
Use a plain channel when:
  • Back-pressure on the producer is acceptable or desirable.
  • You want the simplest possible implementation.
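
To make the back-pressure behaviour of a buffered channel concrete, here is a small standard-library sketch. The trySend helper is a hypothetical name used only for this illustration: it performs a non-blocking send, so a false result marks exactly the point where a plain send would have blocked the producer.

```go
package main

import "fmt"

// trySend attempts a non-blocking send and reports whether it succeeded.
// A false result means a plain `ch <- v` would have blocked the producer.
func trySend(ch chan<- int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 2) // room for two items

	fmt.Println(trySend(ch, 0)) // true
	fmt.Println(trySend(ch, 1)) // true
	fmt.Println(trySend(ch, 2)) // false: buffer full, a plain send would block

	<-ch                        // the consumer takes one item
	fmt.Println(trySend(ch, 3)) // true: there is room again
}
```

With a plain channel the producer either blocks or drops the item at that point. An unbounded queue instead accepts the item and leaves the producer to react to the reported watermark state.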

Watermark states

WaitableQueue defines five states. Every Enqueue and Dequeue call returns the current one:

| State | Meaning |
| --- | --- |
| QueueIsEmpty | Set at creation only; not returned during normal operation. |
| QueueIsBelowLowWatermark | Queue is draining; producers can safely increase throughput. |
| QueueIsNominal | Operating within normal bounds. |
| QueueIsAboveHighWatermark | Queue is filling up; consider throttling the producer. |
| QueueIsNearSaturation | Emergency signal: the queue is approaching its saturation threshold. Trigger load shedding immediately. |

Do not use QueueIsBelowLowWatermark as the sole signal to resume a fully-stopped producer. If the producer has stopped calling Enqueue, it will never observe the state change and will never restart. Check watermark state on the consumer (Dequeue) side for reliable flow control, or poll periodically.
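
One way to apply this advice is to let the consumer wake a paused producer. The sketch below assumes nothing about the library beyond the state names: State and its constants are local stand-ins for the library's state type, and resume and afterDequeue are hypothetical helpers. The consumer would call afterDequeue with the state returned by each Dequeue, and the paused producer would wait on the resume channel.

```go
package main

import "fmt"

// State is a local stand-in for the library's watermark state type.
// Only the values needed for this sketch are declared.
type State int

const (
	BelowLowWatermark State = iota
	Nominal
	AboveHighWatermark
)

// resume carries at most one pending wake-up for a paused producer.
type resume chan struct{}

// notify never blocks: if a wake-up is already pending, the new one is dropped.
func (r resume) notify() {
	select {
	case r <- struct{}{}:
	default:
	}
}

// afterDequeue is what the consumer would call with the state returned by Dequeue.
func afterDequeue(s State, r resume) {
	if s == BelowLowWatermark {
		r.notify()
	}
}

func main() {
	r := make(resume, 1)

	afterDequeue(Nominal, r)           // no wake-up
	afterDequeue(BelowLowWatermark, r) // queues one wake-up
	afterDequeue(BelowLowWatermark, r) // coalesced with the pending one
	fmt.Println("pending wake-ups:", len(r)) // pending wake-ups: 1

	<-r // a paused producer would block here until the consumer notifies it
	fmt.Println("producer resumed")
}
```

The one-slot buffer coalesces repeated notifications, so the consumer never blocks on a producer that is not currently paused.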

Constructor parameters

q, err := queue.NewWaitableQueue[MyEvent](
    initialCapacity, // preallocate backing storage (use 0 if unknown)
    lowWatermark,    // items below this → QueueIsBelowLowWatermark
    highWatermark,   // items above this → QueueIsAboveHighWatermark
)
A saturation threshold above highWatermark is computed automatically from initialCapacity and highWatermark. You do not need to configure it directly.

Producer-consumer example

The pattern below shows a single producer and a single consumer. Scale to multiple goroutines by sharing the same WaitableQueue reference—it is safe to call Enqueue and Dequeue (and WaitChan) concurrently from any number of goroutines.
package main

import (
    "fmt"
    "sync"

    "github.com/fgm/container"
    "github.com/fgm/container/queue"
)

func main() {
    const (
        capacity      = 256
        lowWatermark  = 32
        highWatermark = 192
    )

    q, err := queue.NewWaitableQueue[int](capacity, lowWatermark, highWatermark)
    if err != nil {
        panic(err)
    }

    var wg sync.WaitGroup

    // Producer goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()
        defer q.Close() // signal consumers that no more items will arrive

        for i := 0; i < 1000; i++ {
            state := q.Enqueue(i)
            switch state {
            case container.QueueIsNearSaturation:
                // Emergency: drop or block until the consumer catches up.
                // Here we just log and continue; a real application might sleep.
                fmt.Println("queue near saturation, consider load shedding")
            case container.QueueIsAboveHighWatermark:
                // Soft throttle: slow down production.
                fmt.Println("above high watermark, throttling producer")
            }
        }
    }()

    // Consumer goroutine
    wg.Add(1)
    go func() {
        defer wg.Done()

        for {
            // Block until the queue signals new items or is closed.
            // A receive from a closed channel returns immediately with open == false.
            _, open := <-q.WaitChan()

            // Drain all available items without waiting again.
            for {
                item, ok, _ := q.Dequeue()
                if !ok {
                    break // queue is empty right now
                }
                fmt.Println("consumed:", item)
            }

            // WaitChan is closed by q.Close(). Everything enqueued before the
            // close was drained just above, so it is now safe to stop.
            if !open {
                return
            }
        }
    }()

    wg.Wait()
}

Closing the queue

Call q.Close() from the producer when no more items will be enqueued. This closes the signal channel returned by WaitChan(), permanently unblocking any consumer waiting on it. Consumers should drain remaining items after observing a closed channel.
Calling Enqueue on a closed WaitableQueue panics. Ensure only the producer calls Close, and only after it has finished enqueueing.
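
With several producers, no single producer knows when the others have finished, so the close has to be coordinated. The sketch below shows one possible arrangement using sync.WaitGroup. The closer interface, runProducers, and fakeQueue are hypothetical names for this illustration; fakeQueue only counts Close calls so the sketch runs without the library.

```go
package main

import (
	"fmt"
	"sync"
)

// closer is the one method this sketch needs from the queue.
type closer interface{ Close() }

// runProducers starts n producers and closes q exactly once,
// after every producer has returned.
func runProducers(n int, q closer, produce func(id int)) {
	var wg sync.WaitGroup
	for id := 0; id < n; id++ {
		wg.Add(1)
		go func(id int) {
			defer wg.Done()
			produce(id)
		}(id)
	}
	wg.Wait()
	q.Close() // no producer can call Enqueue after this point
}

// fakeQueue counts Close calls; it is a stand-in, not the library type.
type fakeQueue struct{ closed int }

func (f *fakeQueue) Close() { f.closed++ }

func main() {
	q := &fakeQueue{}
	done := make(chan int, 4) // records which producers ran

	runProducers(4, q, func(id int) { done <- id })

	fmt.Println("producers finished:", len(done)) // producers finished: 4
	fmt.Println("close calls:", q.closed)         // close calls: 1
}
```

Because Close runs only after wg.Wait returns, no Enqueue can follow it, which avoids the panic described above.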

Using Len() safely

WaitableQueue implements container.Countable, so you can call Len() to inspect the current size. However, Len() acquires a separate mutex lock, so its result is not atomic with respect to concurrent Enqueue or Dequeue calls: it is a snapshot that may already be stale by the time you read it.
Do not use Len() to make control-flow decisions (e.g., “dequeue only if Len() > 0”). By the time you act on the result, the queue may already be empty or have grown further. Use the WaitableQueueState returned by Enqueue and Dequeue for flow control, and reserve Len() for logging, metrics, and debugging.
// OK: observability
if c, ok := q.(container.Countable); ok {
    metrics.Record("queue.size", c.Len())
}

// NOT OK: control flow — race condition
if c, ok := q.(container.Countable); ok {
    if c.Len() > 0 {
        item, ok, _ := q.Dequeue() // Len() may already be stale
    }
}
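
The race-free alternative is to skip the length check and rely on the ok value that Dequeue returns. Here is a minimal sketch under stated assumptions: the dequeuer interface mirrors the three-value Dequeue calls shown above, with a local State type standing in for the library's state type, and sliceQueue is a single-goroutine stand-in that exists only to make the sketch runnable.

```go
package main

import "fmt"

// State is a local stand-in for the library's watermark state type.
type State int

// dequeuer is the subset of the queue API this sketch relies on.
type dequeuer[T any] interface {
	Dequeue() (item T, ok bool, state State)
}

// drain consumes items until Dequeue reports the queue is empty.
// It never consults Len(), so there is no check-then-act race.
func drain[T any](q dequeuer[T], handle func(T)) int {
	n := 0
	for {
		item, ok, _ := q.Dequeue()
		if !ok {
			return n
		}
		handle(item)
		n++
	}
}

// sliceQueue is a single-goroutine stand-in, not the library type.
type sliceQueue[T any] struct{ items []T }

func (s *sliceQueue[T]) Dequeue() (T, bool, State) {
	var zero T
	if len(s.items) == 0 {
		return zero, false, 0
	}
	item := s.items[0]
	s.items = s.items[1:]
	return item, true, 0
}

func main() {
	q := &sliceQueue[string]{items: []string{"a", "b"}}
	n := drain[string](q, func(s string) { fmt.Println("consumed:", s) })
	fmt.Println("drained:", n) // drained: 2
}
```

The emptiness check and the removal happen inside the same Dequeue call, so no other goroutine can change the queue between the two.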