Bull Queue Guide
Overview
The framework uses Bull for background job processing, replacing the previous Kue implementation.
Redis: separate connection for queue vs cache
Cache (API response cache, rate limiting) and the job queue use separate Redis connections so heavy queue traffic does not block cache reads/writes.
- Cache uses REDIS_URL (default redis://127.0.0.1:6379/1) via src/services/database/ (Redis client).
- Queue (Bull) uses REDIS_QUEUE_URL (default redis://127.0.0.1:6379/2).
Using different DB numbers on the same host (e.g. /1 for cache, /2 for queue) gives separate connections and key isolation. For full isolation under load, point REDIS_QUEUE_URL at a different Redis instance (e.g. another host or port).
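A minimal sketch of how the two URLs could be resolved, assuming only the variable names and defaults given in this section. The helper `resolveRedisTargets` and its `sameServer` flag are hypothetical and not part of the project; the flag simply shows when cache and queue still share one Redis server and differ only by DB number.

```typescript
// Hypothetical helper for illustration; not part of the project.
type Env = Record<string, string | undefined>;

interface RedisTargets {
  cacheUrl: string;
  queueUrl: string;
  sameServer: boolean; // true when the URLs differ only by DB number
}

function resolveRedisTargets(env: Env): RedisTargets {
  // Defaults are the ones documented in this guide.
  const cacheUrl = env['REDIS_URL'] ?? 'redis://127.0.0.1:6379/1';
  const queueUrl = env['REDIS_QUEUE_URL'] ?? 'redis://127.0.0.1:6379/2';

  // Drop a trailing "/<db>" so only scheme, host, and port are compared.
  const withoutDb = (url: string): string => url.replace(/\/\d+$/, '');

  return {
    cacheUrl,
    queueUrl,
    sameServer: withoutDb(cacheUrl) === withoutDb(queueUrl),
  };
}
```

In application code this would typically be called with `process.env`. A `sameServer` value of `true` means key isolation only; the two connections still compete for the same Redis process.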
Queue Types
- searchIndex: Create search tags for database records
- logRequest: Log API requests
- logResponse: Log API responses
- saveToTrash: Back up deleted data to trash
- sendWebhook: Send webhook events
- sendHTTPRequest: Make HTTP requests
Creating Jobs
From application code (e.g. controllers or services), import the queue and create jobs. In this project the queue lives under src/services/queue/; use the path that resolves from your file (e.g. ../services/queue or the project’s alias).
import queue from './services/queue'; // or your project's path to src/services/queue
// Create a job
await queue.create('logRequest', {
  RequestId: '...',
  ipAddress: '...',
  url: '...',
  method: 'GET',
  body: {}
}).save();
Await vs fire-and-forget
queue.create(...).save() returns a Promise (the job is added to Redis asynchronously). You can either await it or not:
- With await: The caller waits until the job is enqueued. You get the created job (e.g. job.id) and can handle enqueue errors with try/catch. Use this when you need to confirm the job was added or use the job object.
- Without await: Fire-and-forget. The caller continues immediately while the job is added in the background. If you skip await, attach .catch(...) to the returned Promise so enqueue failures don't become unhandled rejections.
// Wait for enqueue, handle errors, use job
const job = await queue.create('logRequest', data).save();
console.log('Job id:', job.id);
// Fire-and-forget (add .catch to handle errors)
queue.create('logRequest', data).save().catch((err) => log.error('Enqueue failed', err));
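The difference between the two styles can be tried without Redis using a stand-in queue. The sketch below assumes only the `create(...).save()` shape described in this guide; `stubQueue`, `enqueueAndConfirm`, `enqueueInBackground`, and the `unknownQueue` failure case are hypothetical names invented for the illustration.

```typescript
// Stand-in for the project's queue; only the create(...).save() shape comes from this guide.
interface SavedJob {
  id: number;
  name: string;
  data: Record<string, unknown>;
}

let nextId = 1;
const stubQueue = {
  create(name: string, data: Record<string, unknown>) {
    return {
      save(): Promise<SavedJob> {
        // Simulate an enqueue failure for one made-up queue name.
        return name === 'unknownQueue'
          ? Promise.reject(new Error(`No such queue: ${name}`))
          : Promise.resolve({ id: nextId++, name, data });
      },
    };
  },
};

// Awaited: the caller sees either the created job or the enqueue error.
async function enqueueAndConfirm(
  name: string,
  data: Record<string, unknown>,
): Promise<number | null> {
  try {
    const job = await stubQueue.create(name, data).save();
    return job.id;
  } catch {
    return null; // enqueue failed; the caller can react here
  }
}

// Fire-and-forget: returns immediately; .catch keeps a failure from
// becoming an unhandled rejection.
const enqueueErrors: string[] = [];
function enqueueInBackground(name: string, data: Record<string, unknown>): void {
  stubQueue
    .create(name, data)
    .save()
    .catch((err: Error) => {
      enqueueErrors.push(err.message);
    });
}
```

With the awaited form the failure is visible at the call site; with the background form it only shows up wherever the `.catch` handler reports it, so that handler should log or count the error.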
Job Processors
Job processors are defined in src/services/queue/workers.ts. Each queue has a processor that handles jobs. Sample job definitions live in src/services/queue/jobs.ts. The clock and workers are started with npm run clock and npm run workers (they run the compiled code in dist/ after npm run build).
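As a rough illustration of what a processor looks like, the sketch below handles a logRequest payload with the fields used in the Creating Jobs example. It is not the code in src/services/queue/workers.ts: `JobLike`, `LogRequestData`, and `processLogRequest` are hypothetical names, and the formatted string stands in for whatever the real processor writes.

```typescript
// Minimal job shape for the sketch; Bull's own Job object carries more fields.
interface JobLike<T> {
  id: number | string;
  data: T;
}

// Payload fields taken from the logRequest example in this guide.
interface LogRequestData {
  RequestId: string;
  ipAddress: string;
  url: string;
  method: string;
  body: unknown;
}

// Hypothetical processor: receives a job and resolves when the work is done.
// A rejected promise (or thrown error) marks the job as failed so it can be retried.
async function processLogRequest(job: JobLike<LogRequestData>): Promise<string> {
  const { method, url, ipAddress } = job.data;
  if (!url) {
    throw new Error(`Job ${job.id} has no url`);
  }
  return `${method} ${url} from ${ipAddress}`;
}
```

With Bull, a function of this shape is normally registered on a queue instance through its `process` method.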
Scheduled Jobs
Use the queue clock to schedule recurring jobs:
import queue from './services/queue';
// Schedule a job
await queue.addSchedule(
  '0 0 * * *', // Cron expression (daily at midnight)
  'daily-cleanup',
  'saveToTrash',
  { /* job data */ }
);
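How addSchedule is implemented is not shown in this guide. One plausible mapping, sketched below as an assumption, is onto Bull's repeatable jobs, which accept a `repeat: { cron }` job option. The helper `toRepeatOptions` and the field-count check are hypothetical; the check is only a cheap sanity test, not a full cron validator.

```typescript
// Shape of the Bull job options used here (a subset of Bull's JobOptions).
interface RepeatJobOptions {
  jobId: string;
  repeat: { cron: string };
}

// Hypothetical helper: turns a cron expression and schedule name into
// Bull-style repeat options. Not the project's addSchedule implementation.
function toRepeatOptions(cron: string, scheduleName: string): RepeatJobOptions {
  const fields = cron.trim().split(/\s+/);
  // Standard cron has 5 fields; a 6-field form with seconds is also common.
  if (fields.length < 5 || fields.length > 6) {
    throw new Error(`Expected 5 or 6 cron fields, got ${fields.length}`);
  }
  return { jobId: scheduleName, repeat: { cron: fields.join(' ') } };
}

const dailyCleanup = toRepeatOptions('0 0 * * *', 'daily-cleanup');
```

Options like `dailyCleanup` would then be passed as the options argument when adding the job to a Bull queue.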
Job Options
Jobs support:
- Retry attempts (default: 3)
- Exponential backoff
- Job removal policies
- Job priorities
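The list above corresponds to fields in Bull's job options. The sketch below shows one possible set of defaults: the option names (`attempts`, `backoff`, `removeOnComplete`, `removeOnFail`, `priority`) are Bull's, and the 3 attempts come from this guide, but the delay, removal counts, and priority values are assumptions chosen for illustration, as are the names `defaultJobOptions` and `withPriority`.

```typescript
// Local mirror of the Bull job options used in this sketch.
interface BullStyleJobOptions {
  attempts: number;
  backoff: { type: 'fixed' | 'exponential'; delay: number };
  removeOnComplete: boolean | number;
  removeOnFail: boolean | number;
  priority: number;
}

// Hypothetical defaults; only attempts = 3 is stated in this guide.
const defaultJobOptions: BullStyleJobOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 }, // base delay in ms (assumed)
  removeOnComplete: true, // drop finished jobs from Redis (assumed policy)
  removeOnFail: 100, // keep only the most recent 100 failed jobs (assumed policy)
  priority: 5, // in Bull, lower numbers run first (assumed default)
};

// Override the priority for a single job without mutating the defaults.
function withPriority(priority: number): BullStyleJobOptions {
  return { ...defaultJobOptions, priority };
}

const urgent = withPriority(1);
```

Keeping the defaults in one object and deriving per-job variants from it avoids repeating retry and cleanup settings at every call site.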
Monitoring
Use Bull Board or similar tools to monitor queue status and job processing.