Queue

Provides a simple message queue based on SQLite

Technology

The queue module provides a database-backed message queue system for Project Forge applications. It enables reliable, persistent messaging between application components with topic-based routing and automatic retry mechanisms.

Overview

This module provides a simple yet robust message queue built on SQLite, offering:

Key Features

Reliability

Performance

Observability

Developer Experience

Package Structure

Core Queue Components

Admin Interface

Database Schema

Usage Examples

Basic Message Sending

import (
	"github.com/pkg/errors"

	"{{{ .Package }}}/app/lib/queue"
)

// Create a new queue instance
q := queue.New(db, logger)

// Send a message to a topic
params := map[string]any{
	"userId": 123,
	"action": "send_email",
	"email":  "user@example.com",
}

err := q.Send(ctx, "email_notifications", params)
if err != nil {
	return errors.Wrap(err, "failed to send message")
}

Message Processing

// Receive messages from a topic (blocking)
message, err := q.Receive(ctx, "email_notifications", 30*time.Second)
if err != nil {
	return errors.Wrap(err, "failed to receive message")
}

if message != nil {
	// Process the message
	err = processEmailNotification(message.Params)
	if err != nil {
		// Message will be retried automatically
		return errors.Wrap(err, "failed to process message")
	}
}

Non-blocking Operations

// Non-blocking receive (returns immediately)
message, err := q.ReceiveNonBlocking(ctx, "background_tasks")
if err != nil {
	return err
}

if message == nil {
	// No messages available
	return nil
}

// Process message...

Configuration

The queue module supports configuration through environment variables and database settings:

Queue Behavior

Performance Tuning

Dependencies

This module requires the following:

Required Modules

External Dependencies

Admin Interface

The module provides a web-based admin interface accessible at /admin/queue:

Source Code

See Also