Skip to main content
WaitableQueue is a concurrency-safe, unbounded FIFO queue designed for producer-consumer patterns where Go channels are inconvenient — for example, when you need flow control without the hard capacity limits of a buffered channel, or when the number of producers and consumers is dynamic.

When to use WaitableQueue vs Queue

SituationUse
Single goroutine, no shared accessQueue
Multiple goroutines sharing the same queueWaitableQueue
Need to signal consumers when items arriveWaitableQueue
Need producer throttling based on queue depthWaitableQueue

Installation

import "github.com/fgm/container/queue"

Constructor

func NewWaitableQueue[E any](
    initialCapacity int,
    lowWatermark int,
    highWatermark int,
) (container.WaitableQueue[E], error)
initialCapacity
int
required
Pre-allocates internal storage for this many elements. Must be ≥ 0.
lowWatermark
int
required
The queue depth below which Dequeue reports QueueIsBelowLowWatermark. Must be ≥ 0.
highWatermark
int
required
The queue depth at or above which Enqueue and Dequeue report QueueIsAboveHighWatermark. Must be ≥ lowWatermark.

Error cases

The constructor returns an error for invalid arguments:
Error variableCondition
queue.ErrCapacityIsNegativeinitialCapacity < 0
queue.ErrLowWatermarkIsNegativelowWatermark < 0
queue.ErrHighWatermarkIsNegativehighWatermark < 0
queue.ErrHighWatermarkIsLessThanLowWatermarklowWatermark > highWatermark
wq, err := queue.NewWaitableQueue[string](256, 32, 128)
if err != nil {
    log.Fatal(err)
}

Interface

type WaitableQueue[E any] interface {
    Enqueue(E) WaitableQueueState
    Dequeue() (e E, ok bool, result WaitableQueueState)
    WaitChan() <-chan Unit
    Close()
}

Enqueue

Enqueue(e E) WaitableQueueState
Appends an element to the back of the queue and returns the current WaitableQueueState. Panics if called after Close().
Most callers can ignore the return value. The most common reason to check it is to detect QueueIsNearSaturation and throttle the producer.

Dequeue

Dequeue() (e E, ok bool, result WaitableQueueState)
Removes and returns the front element. If the queue is empty, ok is false, e is the zero value, and result is QueueIsBelowLowWatermark.

WaitChan

WaitChan() <-chan Unit
Returns a channel that receives a signal when an item may be available or when the queue has been closed. Use this in a select statement to avoid busy-polling. After receiving on this channel, always call Dequeue in a loop until ok is false — a single signal may correspond to multiple enqueued items.

Close

Close()
Marks the queue as closed and permanently unblocks every goroutine waiting on WaitChan. Once closed, calling Enqueue panics. You should drain any remaining items with Dequeue after Close returns. In most cases, Close should be called only by the producer.

Queue states

WaitableQueueState is returned by Enqueue and Dequeue to describe the current depth of the queue relative to the watermarks you configured.
ConstantMeaning
QueueIsEmptySet at creation only; not returned during normal operation.
QueueIsBelowLowWatermarkDepth is at or below lowWatermark. Returned by Dequeue when the queue is empty.
QueueIsNominalDepth is between the low and high watermarks. Normal operating range.
QueueIsAboveHighWatermarkDepth is at or above highWatermark. Consider scaling consumers or throttling producers.
QueueIsNearSaturationDepth is approaching the saturation threshold (computed from initialCapacity and highWatermark). Emergency signal; consider immediate load shedding.

Producer-consumer pattern

The typical pattern is one or more producer goroutines calling Enqueue, and one or more consumer goroutines selecting on WaitChan and then draining with Dequeue.
package main

import (
    "fmt"
    "sync"

    "github.com/fgm/container"
    "github.com/fgm/container/queue"
)

func main() {
    wq, err := queue.NewWaitableQueue[int](64, 8, 32)
    if err != nil {
        panic(err)
    }

    var wg sync.WaitGroup

    // Consumer goroutine.
    wg.Add(1)
    go func() {
        defer wg.Done()
        for {
            select {
            case _, open := <-wq.WaitChan():
                // Drain all available items after each signal.
                for {
                    item, ok, state := wq.Dequeue()
                    if !ok {
                        // Queue is empty; check if it was also closed.
                        if !open {
                            fmt.Println("queue closed, consumer done")
                            return
                        }
                        break
                    }
                    fmt.Printf("consumed: %d (state: %v)\n", item, state)
                }
            }
        }
    }()

    // Producer: send items and check for saturation.
    for i := 0; i < 10; i++ {
        state := wq.Enqueue(i)
        if state == container.QueueIsNearSaturation {
            fmt.Println("warning: queue near saturation, consider throttling")
        }
    }

    wq.Close() // Signal to the consumer that no more items will arrive.
    wg.Wait()
}

Flow control with WaitChan

Use the state returned by Dequeue to scale consumers dynamically:
_, ok, state := wq.Dequeue()
switch state {
case container.QueueIsAboveHighWatermark:
    // Spin up an additional consumer goroutine.
case container.QueueIsNearSaturation:
    // Shed load or alert.
}
Do not use QueueIsBelowLowWatermark from Enqueue as a resume signal for a throttled producer. If the producer has stopped enqueuing entirely, it will never observe that state, and it will never unthrottle. Use the Dequeue/WaitChan side for flow control instead.
WaitableQueue is concurrency-safe. Its Len() method (if called) is not atomic with respect to Enqueue/Dequeue and should only be used for observability or debugging, not for control flow decisions.