Cron Worker System - Architecture and Overview
Table of Contents
- Introduction
- System Architecture
- Core Components
- Data Flow
- Queue Infrastructure
- Key Features
- Related Documentation
Introduction
The Cron Worker System is a distributed, fault-tolerant task scheduling and execution framework built on top of RabbitMQ message queuing. It automatically executes reports and workers on cron schedules or at one-time scheduled dates. Failed tasks are retried with exponential backoff, and tasks that fail permanently go to a dedicated queue.
Purpose
- Automated Task Execution: Execute reports and workers automatically based on schedules
- Reliable Delivery: Ensure tasks are executed reliably with retry mechanisms
- Scalability: Support horizontal scaling of both schedulers and workers
- Observability: Structured, timestamped logging and monitoring of permanently failed tasks
- Deduplication: Prevent duplicate executions across multiple instances
Use Cases
- Daily/hourly report generation (e.g., "Generate sales report every day at 8 AM")
- Periodic data synchronization (e.g., "Sync user tokens every 30 minutes")
- One-time scheduled tasks (e.g., "Send reminder email on 2025-12-15 at 3 PM")
- Background processing of resource-intensive operations
- Automated maintenance tasks (e.g., "Cleanup old records daily at 3 AM")
System Architecture
The system consists of two main components that work together via RabbitMQ:
┌────────────────────────────────────────────────────────────────────────┐
│ CRON WORKER SYSTEM ARCHITECTURE │
└────────────────────────────────────────────────────────────────────────┘
┌─────────────────┐ ┌──────────────────┐
│ Database │ │ RabbitMQ │
│ (MySQL) │ │ Message Broker │
└────────┬────────┘ └────────┬─────────┘
│ │
│ Reads schedulers │
│ every minute │
│ │
┌────────▼────────┐ │
│ cron-scheduler │◄────────┬──────────────────────┘
│ (Producer) │ │
│ │ │ 5. Retry after TTL
│ • Loads DB │ │ (30s → 60s → 120s)
│ • Evaluates │ │
│ cron expr. │ │
│ • Publishes │ │
│ to queue │ ┌────┴────────────┐
└────────┬────────┘ │ cron_scheduler │
│ │ _retry (Queue) │
│ │ TTL: 30s-120s │
│ 1. Publish └─────────▲───────┘
│ task 4. │
│ NACK│
▼ │
┌─────────────────┐ │
│ cron_scheduler │ │
│ (Main Queue) │ │
│ │ │
│ DLX: cron_dlx │ │
└────────┬────────┘ │
│ │
│ 2. Consume │
│ task │
▼ │
┌─────────────────┐ ┌────┴──────┐
│ cron-worker │ │ FAILURE │ SUCCESS
│ (Consumer) │ └───────────┘ │
│ │ │
│ • Executes │ 3. ACK
│ reports │ │
│ • Executes │ ▼
│ workers │ ┌───────────────┐
│ • Retries │ │ Task Complete │
│ on failure │ └───────────────┘
└────────┬────────┘
│
│ If retry >= MAX_RETRIES
▼
┌─────────────────────┐
│ cron_scheduler │
│ _failed (Queue) │
│ │
│ Permanent storage │
│ for failed tasks │
└─────────────────────┘
Architecture Principles
- Separation of Concerns: Scheduler (producer) and Worker (consumer) are independent processes
- Message-Driven: All communication happens via RabbitMQ queues
- At-Least-Once Delivery: Every published task is delivered at least once. A failing task is retried up to MAX_RETRIES times, so it may run more than once
- Idempotency: Tasks should be designed to be safely retried
- Fault Tolerance: System continues operating even if individual components fail
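The sketch below shows what "safely retried" can mean in practice. It assumes a hypothetical `CompletionStore` that records finished work units under a caller-chosen key. `runOnce`, `CompletionStore` and `InMemoryCompletionStore` are illustrative names, not part of the codebase.

```typescript
// Hypothetical record of completed work units. In practice this could be a
// database table; an in-memory Set keeps the sketch runnable.
interface CompletionStore {
  has(key: string): Promise<boolean>;
  add(key: string): Promise<void>;
}

class InMemoryCompletionStore implements CompletionStore {
  private readonly keys = new Set<string>();

  async has(key: string): Promise<boolean> {
    return this.keys.has(key);
  }

  async add(key: string): Promise<void> {
    this.keys.add(key);
  }
}

// Runs `work` only if `key` has not been recorded as completed.
// Returns true when the work actually ran on this call.
async function runOnce(
  store: CompletionStore,
  key: string,
  work: () => Promise<void>,
): Promise<boolean> {
  if (await store.has(key)) {
    return false; // an earlier delivery already finished this unit
  }
  await work(); // if this throws, nothing is recorded and a retry runs it again
  await store.add(key);
  return true;
}
```

The check and the record are separate steps. This guards against sequential redeliveries only. Concurrent duplicates need an atomic claim, such as the unique-key INSERT the scheduler uses.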
Core Components
1. cron-scheduler (Producer)
Location: /workspace/api/b3api/src/services/workers/cron/cron-scheduler.ts
Responsibilities:
- Reads active schedulers from the database every minute
- Evaluates cron expressions against current time
- Publishes matching tasks to RabbitMQ
- Prevents duplicate executions using the SchedulerExecution table
- Performs daily cleanup of old execution records (3 AM)
Key Features:
- Multi-instance support with atomic claiming mechanism
- Container ID tracking for debugging
- Cron expression validation and parsing
- Support for both recurring (cron) and one-time (date_time) schedulers
- Automatic reconnection to RabbitMQ on connection loss
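The sketch below covers one per-minute pass, following the process flow listed next. The database, cron parser and RabbitMQ publisher are injected as hypothetical functions: `loadActive`, `matches`, `claim` and `publish`. The execution-key layout is inferred from the example key rather than taken from cron-scheduler.ts. It assumes zero-padded month and day, unpadded hour and minute, and local time.

```typescript
// Subset of the Scheduler model used by this sketch.
interface SchedulerRow {
  id: number;
  schedule: string | null; // cron expression, e.g. "0 8 * * *"
  report_id: number | null;
  worker_id: number | null;
  params: object;
}

interface TaskMessage {
  type: "report" | "worker";
  id: number;
  scheduler_id: number;
  params_scheduler: string; // JSON-encoded parameters
}

// Hypothetical dependencies standing in for MySQL, a cron parser and RabbitMQ.
interface TickDeps {
  loadActive(): Promise<SchedulerRow[]>;
  matches(expr: string, now: Date): boolean;
  claim(schedulerId: number, executionKey: string): Promise<boolean>;
  publish(task: TaskMessage): Promise<void>;
}

const pad2 = (n: number): string => String(n).padStart(2, "0");

// Minute-granularity key such as "cron-2025-12-12-8-0" (local time assumed).
function buildExecutionKey(now: Date): string {
  const date = `${now.getFullYear()}-${pad2(now.getMonth() + 1)}-${pad2(now.getDate())}`;
  return `cron-${date}-${now.getHours()}-${now.getMinutes()}`;
}

// Picks what a scheduler row should run; reminders are out of scope here.
function resolveTarget(s: SchedulerRow): { type: "report" | "worker"; id: number } | null {
  if (s.report_id != null) return { type: "report", id: s.report_id };
  if (s.worker_id != null) return { type: "worker", id: s.worker_id };
  return null;
}

// One per-minute pass; returns how many tasks this instance published.
async function runTick(deps: TickDeps, now: Date): Promise<number> {
  const key = buildExecutionKey(now);
  let published = 0;
  for (const s of await deps.loadActive()) {
    if (!s.schedule || !deps.matches(s.schedule, now)) continue;
    const target = resolveTarget(s);
    if (target === null) continue;
    if (!(await deps.claim(s.id, key))) continue; // another instance won this minute
    await deps.publish({ ...target, scheduler_id: s.id, params_scheduler: JSON.stringify(s.params) });
    published++;
  }
  return published;
}
```

The target is resolved before claiming, so no claim is recorded for a scheduler that has nothing to publish. One-time (date_time) schedulers are left out of this sketch.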
Process Flow:
1. Every minute:
   ├─ Load active schedulers from database
   ├─ For each scheduler:
   │  ├─ Check if cron expression matches current time
   │  ├─ Generate execution key (e.g., "cron-2025-12-12-8-0")
   │  ├─ Try to claim execution atomically
   │  └─ If claimed: publish task to RabbitMQ
   └─ Log results
2. Daily at 3 AM:
   └─ Cleanup SchedulerExecution records older than 30 days
2. cron-worker (Consumer)
Location: /workspace/api/b3api/src/services/workers/cron/cron-worker.ts
Responsibilities:
- Consumes tasks from RabbitMQ queue
- Executes workers and reports
- Handles failures with retry logic
- Tracks retry count using RabbitMQ's x-death header
- Moves permanently failed tasks to the failed queue
- Monitors failed queue and logs failures
Key Features:
- Prefetch limit of 1 (processes one task at a time)
- Exponential backoff retry mechanism
- Comprehensive error logging with stack traces
- Forced instant mode for reports (ensures proper error propagation)
- Graceful shutdown on SIGTERM/SIGINT
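Steps 2 and 3 of the process flow listed next could look roughly like the sketch below. `Executors` is a hypothetical stand-in for workerRepo and reportRepo, and the real method signatures may differ. `parseTask` and `executeTask` are illustrative names.

```typescript
type TaskType = "worker" | "report" | "reminder";

interface TaskMessage {
  type: TaskType;
  id: number;
  scheduler_id: number;
  params_scheduler: string; // JSON-encoded parameters
}

// Hypothetical stand-ins for workerRepo.startWorker / reportRepo.generateReport.
interface Executors {
  startWorker(id: number, params: unknown): Promise<void>;
  generateReport(id: number, params: unknown): Promise<void>;
}

// Parses the raw message body and rejects payloads missing required fields.
function parseTask(body: string): TaskMessage {
  const raw = JSON.parse(body) as Partial<TaskMessage>;
  const validTypes: TaskType[] = ["worker", "report", "reminder"];
  if (!raw.type || !validTypes.includes(raw.type) || typeof raw.id !== "number" || typeof raw.scheduler_id !== "number") {
    throw new Error(`Invalid task payload: ${body}`);
  }
  return { type: raw.type, id: raw.id, scheduler_id: raw.scheduler_id, params_scheduler: raw.params_scheduler ?? "{}" };
}

// Routes a task to the matching executor; any thrown error counts as a failure
// and triggers the retry handling in step 4.
async function executeTask(task: TaskMessage, ex: Executors): Promise<void> {
  const params: unknown = JSON.parse(task.params_scheduler);
  switch (task.type) {
    case "worker":
      return ex.startWorker(task.id, params);
    case "report":
      return ex.generateReport(task.id, params);
    case "reminder":
      throw new Error("Reminder tasks are not implemented yet");
  }
}
```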
Process Flow:
1. Consume task from cron_scheduler queue
2. Parse task payload (type, id, scheduler_id, params)
3. Execute task:
   ├─ If type = "worker": Call workerRepo.startWorker()
   ├─ If type = "report": Call reportRepo.generateReport()
   └─ If type = "reminder": (Not yet implemented)
4. Handle result:
   ├─ SUCCESS: ACK message (remove from queue)
   └─ FAILURE:
      ├─ Check retry count from x-death header
      ├─ If retry < MAX_RETRIES:
      │  └─ NACK message (triggers DLX → retry queue)
      └─ If retry >= MAX_RETRIES:
         ├─ Publish to failed queue
         └─ ACK original message
3. Scheduler Database Models
Scheduler Model (/workspace/api/b3api/src/models/scheduler.model.ts):
{
id: number; // Primary key
schedule: string; // Cron expression (e.g., "0 8 * * *")
date_time: Date; // One-time execution date/time
params: object; // Task parameters (JSON)
status: boolean; // Active/inactive
stay_alive: boolean; // Standalone container flag
description: string; // Human-readable description
// Foreign keys
report_id: number; // Report to execute
worker_id: number; // Worker to execute
reminder_id: number; // Reminder to send
user_id: number; // User who created
group_id: number; // Module group
language_id: number; // Language for execution
}
SchedulerExecution Model (/workspace/api/b3api/src/models/scheduler-execution.model.ts):
{
id: number; // Primary key
scheduler_id: number; // Foreign key to Scheduler
execution_key: string; // Deduplication key (e.g., "cron-2025-12-12-8-0")
executed_at: Date; // Timestamp of execution
container_id: string; // Container that claimed execution
}
4. PM2Service - Process Management
Location: /workspace/api/b3api/src/services/pm2.service.ts
Responsibilities:
- Spawns child processes for report execution
- Captures stdout/stderr from child processes
- Manages instant vs background execution modes
- Handles process lifecycle and error propagation
Key Features:
- Child Process Stdio Capture: stdio changed from ['ignore', 'ignore', 'ignore', 'ipc'] to ['pipe', 'pipe', 'pipe', 'ipc'] to capture all logs
- Timestamped Logging: All child process logs include timestamps
- Error Propagation: Properly propagates errors from child processes in instant mode
- Process Tracking: Tracks process failures via IPC messages
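With stdio set to pipe, stdout and stderr arrive as arbitrary chunks rather than whole lines, so timestamping needs a small line buffer. The sketch below works under that assumption. `makeLinePrefixer` is hypothetical, and the `[Report:<name>:<id>]` prefix layout is inferred from the example output. The meaning of the third field (12345 in the samples) is also an assumption.

```typescript
// Prefixes complete lines from a child's stdout/stderr with a timestamp and a tag
// such as "[Report:RefreshUserToken:12345]".
function makeLinePrefixer(
  kind: string,
  name: string,
  id: number | string, // third tag field; its exact meaning (e.g. a PID) is an assumption
  now: () => Date = () => new Date(),
) {
  let buffer = "";
  const format = (line: string): string => `${now().toISOString()} [${kind}:${name}:${id}] ${line}`;

  return {
    // Accepts a raw chunk and returns the lines it completed, already prefixed.
    push(chunk: string): string[] {
      buffer += chunk;
      const parts = buffer.split("\n");
      buffer = parts.pop() ?? ""; // keep the trailing partial line for the next chunk
      return parts
        .map((line) => line.replace(/\r$/, ""))
        .filter((line) => line.length > 0)
        .map(format);
    },
    // Emits whatever partial line is left when the child process exits.
    flush(): string[] {
      const rest = buffer;
      buffer = "";
      return rest.length > 0 ? [format(rest)] : [];
    },
  };
}
```

To wire it up, call `push` from each stream's `data` handler and `flush` when the child exits. Call `setEncoding("utf8")` on the streams first so multi-byte characters are not split.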
Example Log Output:
2025-12-12T12:00:00.000Z [Report:RefreshUserToken:12345] Process spawned (instant mode)
2025-12-12T12:00:01.500Z [Report:RefreshUserToken:12345] Refreshing tokens for 150 users...
2025-12-12T12:00:05.200Z [Report:RefreshUserToken:12345] Process finished successfully
2025-12-12T12:00:05.201Z [Report:RefreshUserToken:12345] Process exited successfully (code: 0)
5. CronLogger Utility
Location: /workspace/api/b3api/src/services/workers/cron/logger.util.ts
Responsibilities:
- Provides structured, consistent logging across cron components
- Supports multiple log levels (INFO, WARN, ERROR, DEBUG, SUCCESS)
- Formats logs with timestamps, context, and colors
Log Levels:
- INFO (Cyan): General informational messages
- WARN (Yellow): Warning messages (e.g., retries, non-critical issues)
- ERROR (Red): Error messages with full stack traces
- DEBUG (Gray): Debug information (e.g., task parameters)
- SUCCESS (Green): Success confirmations
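The sketch below shows how a line in the documented layout could be assembled. `formatLine`, `formatContext` and `colorize` are hypothetical names. The ANSI codes are ordinary terminal colors chosen to match the listed level colors, not values read from logger.util.ts.

```typescript
type LogLevel = "INFO" | "WARN" | "ERROR" | "DEBUG" | "SUCCESS";

// Standard ANSI foreground colors matching the level colors listed above.
const LEVEL_COLORS: Record<LogLevel, string> = {
  INFO: "\x1b[36m", // cyan
  WARN: "\x1b[33m", // yellow
  ERROR: "\x1b[31m", // red
  DEBUG: "\x1b[90m", // gray (bright black)
  SUCCESS: "\x1b[32m", // green
};
const RESET = "\x1b[0m";

// Renders {type: 'report', id: 25} as "[type=report, id=25] "; no context renders nothing.
function formatContext(ctx: Record<string, unknown> = {}): string {
  const pairs = Object.entries(ctx).map(([k, v]) => `${k}=${String(v)}`);
  return pairs.length > 0 ? `[${pairs.join(", ")}] ` : "";
}

// Uncolored line in the documented layout: "<ISO time> [LEVEL] [context] message".
function formatLine(level: LogLevel, message: string, ctx?: Record<string, unknown>, at: Date = new Date()): string {
  return `${at.toISOString()} [${level}] ${formatContext(ctx)}${message}`;
}

// Wraps a formatted line in its level color for terminal output.
function colorize(level: LogLevel, line: string): string {
  return `${LEVEL_COLORS[level]}${line}${RESET}`;
}
```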
Example Usage:
CronLogger.info('Processing task', {type: 'report', id: 25, scheduler_id: 16});
// Output: 2025-12-12T12:00:00.000Z [INFO] [type=report, id=25, scheduler_id=16] Processing task
CronLogger.error('Task failed', error, {type: 'report', id: 25});
// Output: 2025-12-12T12:00:00.000Z [ERROR] [type=report, id=25] Task failed
// 2025-12-12T12:00:00.001Z [ERROR] Stack trace:
// Error: Something went wrong
// at ...
Data Flow
Complete Flow: From Scheduler to Execution
┌─────────────────────────────────────────────────────────────────────────┐
│ COMPLETE DATA FLOW DIAGRAM │
└─────────────────────────────────────────────────────────────────────────┘
1. USER CREATES SCHEDULER
↓
[Web Application] → POST /schedulers
↓
[MySQL Database] → Scheduler table
{
id: 16,
schedule: "0 8 * * *", // Every day at 8 AM
report_id: 25,
params: {...},
status: true
}
2. SCHEDULER PICKS UP TASK (Every Minute)
↓
[cron-scheduler] → Reads Scheduler table
↓
Evaluates: Does "0 8 * * *" match current time (8:00)?
↓
YES → Generate execution_key: "cron-2025-12-12-8-0"
↓
Try to claim execution in SchedulerExecution table
↓
INSERT INTO SchedulerExecution (scheduler_id, execution_key, container_id, executed_at)
↓
IF INSERT SUCCESS (no duplicate):
↓
Publish to RabbitMQ:
{
type: "report",
id: 25,
scheduler_id: 16,
params_scheduler: "{...}"
}
3. WORKER EXECUTES TASK
↓
[cron-worker] → Consumes from cron_scheduler queue
↓
Parse message → Extract type, id, scheduler_id, params
↓
IF type = "report":
↓
reportRepo.generateReport(params)
↓
PM2Service.startReportNative(report, params)
↓
Fork child process: RefreshUserToken.js
↓
Child process executes report logic
↓
┌─────────────┬─────────────┐
│ SUCCESS │ FAILURE │
└─────────────┴─────────────┘
│ │
↓ ↓
Send IPC msg Send IPC msg
{finish: pid} {failed: true, error: {...}}
│ │
↓ ↓
Exit code 0 Exit code 1
4. RESULT HANDLING
↓
IF SUCCESS:
↓
[cron-worker] → ACK message
↓
Message removed from queue
↓
✅ Task completed successfully
IF FAILURE:
↓
[cron-worker] → Check x-death header
↓
retry_count = x-death[0].count, or 0 if the message has no x-death header yet (first attempt)
↓
IF retry_count < MAX_RETRIES:
↓
NACK message (requeue: false)
↓
Message dead-lettered to cron_dlx exchange
↓
Routed to cron_scheduler_retry queue
↓
Wait for TTL (30s, 60s, 120s based on retry count)
↓
After TTL: Dead-letter back to cron_scheduler
↓
🔄 Retry execution (go back to step 3)
IF retry_count >= MAX_RETRIES:
↓
Publish to cron_scheduler_failed queue
↓
ACK original message
↓
⚠️ Task permanently failed
↓
[cron-worker] → Logs failure details
Execution Deduplication
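The system prevents duplicate executions using the SchedulerExecution table. This relies on a unique key covering scheduler_id and execution_key, so a second INSERT for the same scheduler and minute fails: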
Time: 08:00:00
↓
[Container A] Evaluates scheduler #16
↓
Generate execution_key: "cron-2025-12-12-8-0"
↓
TRY: INSERT INTO SchedulerExecution (scheduler_id=16, execution_key="cron-2025-12-12-8-0", ...)
↓
SUCCESS → Claim execution, publish to queue
[Container B] Evaluates scheduler #16 (same minute)
↓
Generate execution_key: "cron-2025-12-12-8-0"
↓
TRY: INSERT INTO SchedulerExecution (scheduler_id=16, execution_key="cron-2025-12-12-8-0", ...)
↓
FAILURE (Duplicate key error) → Skip execution
Result: Only Container A publishes the task; Container B skips it.
Queue Infrastructure
RabbitMQ Queues
1. cron_scheduler (Main Queue)
{
durable: true,
arguments: {
'x-message-deduplication': true,
'x-dead-letter-exchange': 'cron_dlx',
'x-dead-letter-routing-key': 'retry'
}
}
- Purpose: Main task queue where the scheduler publishes and the worker consumes
- Durability: Survives RabbitMQ restarts
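- Deduplication: Prevents duplicate messages. The x-message-deduplication argument is provided by the rabbitmq-message-deduplication plugin, which must be installed, and it relies on messages carrying an x-deduplication-header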
- Dead Letter Exchange: Rejected (NACKed) messages are dead-lettered to cron_dlx with routing key retry
2. cron_scheduler_retry (Retry Queue)
{
durable: true,
arguments: {
'x-dead-letter-exchange': '', // Default exchange
'x-dead-letter-routing-key': 'cron_scheduler',
'x-message-ttl': 30000 // 30 seconds (configurable)
}
}
- Purpose: Temporary storage for messages awaiting retry
- TTL: Messages expire after the configured delay (x-message-ttl, in milliseconds)
- Dead Letter: After TTL, messages return to main queue
3. cron_scheduler_failed (Failed Queue)
{
durable: true
}
- Purpose: Permanent storage for tasks exceeding max retries
- Monitoring: Worker monitors this queue and logs failures
- Manual Recovery: Failed tasks can be manually moved back to main queue
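The sketch below shows the two payload transformations implied here. It uses the field names of the task and failed-task payloads documented under Message Format. `toFailedMessage` and `toRetryableTask` are hypothetical helpers.

```typescript
// Shapes taken from the documented payloads.
interface TaskMessage {
  type: "report" | "worker" | "reminder";
  id: number;
  scheduler_id: number;
  params_scheduler: string;
}

interface FailedTaskMessage extends TaskMessage {
  failed_at: string; // ISO timestamp
  failed_reason: string;
  retry_count: number;
}

// Builds the failed-queue payload from the original task and the final error.
function toFailedMessage(task: TaskMessage, error: unknown, retryCount: number, now: Date = new Date()): FailedTaskMessage {
  const reason = error instanceof Error ? error.message : String(error);
  return { ...task, failed_at: now.toISOString(), failed_reason: reason, retry_count: retryCount };
}

// Keeps only the original task fields so the task can be republished to
// cron_scheduler for a manual retry.
function toRetryableTask(failed: FailedTaskMessage): TaskMessage {
  return {
    type: failed.type,
    id: failed.id,
    scheduler_id: failed.scheduler_id,
    params_scheduler: failed.params_scheduler,
  };
}
```

Republishing the stripped task creates a new message without an x-death header, so its retry count starts again from zero.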
RabbitMQ Exchange
cron_dlx (Dead Letter Exchange)
{
type: 'direct',
durable: true
}
Bindings:
- cron_scheduler_retry bound with routing key retry
- cron_scheduler_failed bound with routing key failed
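Purpose: Routes messages to the retry or failed queue by routing key. The worker makes the retry-count decision. It either NACKs the message, which is dead-lettered with key retry, or publishes it to the failed queue.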
Message Format
Task Message Payload:
{
"type": "report", // or "worker" or "reminder"
"id": 25, // Report/Worker/Reminder ID
"scheduler_id": 16, // Scheduler that triggered this task
"params_scheduler": "{...}" // JSON-encoded parameters
}
Failed Message Payload (in cron_scheduler_failed):
{
"type": "report",
"id": 25,
"scheduler_id": 16,
"params_scheduler": "{...}",
"failed_at": "2025-12-12T12:03:39.002Z",
"failed_reason": "Connection timeout",
"retry_count": 3
}
x-death Header
RabbitMQ automatically adds the x-death header when a message is dead-lettered:
{
"x-death": [
{
"count": 2, // Times dead-lettered from this queue for this reason
"reason": "rejected", // Why it was dead-lettered
"queue": "cron_scheduler", // Queue it was dead-lettered from
"time": "2025-12-12T12:01:33.000Z", // When it was dead-lettered
"exchange": "cron_dlx", // Exchange the message was published to
"routing-keys": ["retry"] // Routing keys the message was published with
}
]
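}
Usage: The count field tells how many times the message has been dead-lettered. The worker uses it as the number of retry attempts so far. If the header is absent, the message is on its first attempt.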
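The sketch below derives the retry count and the worker's failure decision. It does not rely on x-death[0]. Instead it looks up the entry for cron_scheduler with reason rejected. A message that has cycled through the retry queue also carries an expired entry for cron_scheduler_retry, and RabbitMQ sorts the array most recent first. `retryCountFromHeaders` and `onFailure` are hypothetical names.

```typescript
// Fields of an x-death entry used here (RabbitMQ adds more).
interface XDeathEntry {
  count: number;
  reason: string; // e.g. "rejected", "expired"
  queue: string;
}

// Number of times the message has been rejected from the main queue so far.
function retryCountFromHeaders(
  headers: Record<string, unknown> | undefined,
  mainQueue = "cron_scheduler",
): number {
  const deaths = headers?.["x-death"];
  if (!Array.isArray(deaths)) return 0; // first delivery: no x-death header yet
  const entry = (deaths as XDeathEntry[]).find(
    (d) => d.queue === mainQueue && d.reason === "rejected",
  );
  return entry ? Number(entry.count) : 0;
}

type FailureAction = "nack-for-retry" | "move-to-failed";

// Retry while below the limit; otherwise publish to the failed queue and ACK.
function onFailure(retryCount: number, maxRetries: number): FailureAction {
  return retryCount < maxRetries ? "nack-for-retry" : "move-to-failed";
}
```

The `ch.consume`/`ch.ack` calls elsewhere in this document suggest amqplib, which exposes the headers on `msg.properties.headers`. With MAX_RETRIES = 3, a task runs once plus three retries before it moves to the failed queue. This matches `retry_count: 3` in the failed-payload example.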
Key Features
1. Exponential Backoff Retry
Retry delays grow exponentially (delay for retry n = base × multiplier^(n-1)) to avoid overwhelming failing services:
Retry #1: 30s × 2^0 = 30 seconds
Retry #2: 30s × 2^1 = 60 seconds
Retry #3: 30s × 2^2 = 120 seconds
Configurable via environment variables:
- CRON_RETRY_DELAY_MS: Base delay (default: 30000ms)
- CRON_RETRY_DELAY_MULTIPLIER: Multiplier (default: 2.0)
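The sketch below covers the delay formula and its environment overrides. `computeRetryDelay` and `readNumber` are hypothetical helpers. The environment is passed in as a plain map (for example `process.env`) to keep the sketch free of Node-specific globals. Invalid or non-positive values fall back to the documented defaults.

```typescript
type Env = Record<string, string | undefined>;

// Reads a positive number from an environment-like map, else returns the fallback.
function readNumber(env: Env, name: string, fallback: number): number {
  const value = Number(env[name]);
  return Number.isFinite(value) && value > 0 ? value : fallback;
}

// Delay in ms before retry `attempt` (1-based): base × multiplier^(attempt - 1).
function computeRetryDelay(attempt: number, env: Env = {}): number {
  const base = readNumber(env, "CRON_RETRY_DELAY_MS", 30000);
  const multiplier = readNumber(env, "CRON_RETRY_DELAY_MULTIPLIER", 2.0);
  return Math.round(base * Math.pow(multiplier, attempt - 1));
}
```

The retry queue as declared uses a single queue-level x-message-ttl, which gives every message the same delay. Producing the 30s/60s/120s sequence requires either a per-message TTL (the AMQP expiration property) or one retry queue per delay. RabbitMQ only expires per-message-TTL messages when they reach the head of the queue. Mixing delays in one queue can therefore hold a short delay behind a longer one.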
2. Multi-Instance Coordination
The system supports running multiple scheduler and worker instances:
Scheduler Coordination:
- Each scheduler instance generates a unique container_id (hostname + PID)
- Atomic claiming via database INSERT prevents race conditions
- Only one instance successfully claims each execution
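The sketch below shows the claim. It assumes a hypothetical `ExecutionClaims` store whose insert fails on a duplicate (scheduler_id, execution_key) pair, the way a unique index makes a real INSERT fail. The in-memory store, `tryClaim`, and the hostname-PID separator are illustrative only.

```typescript
// Thrown by the in-memory store when the unique key already exists.
class DuplicateKeyError extends Error {
  constructor(message: string) {
    super(message);
    this.name = "DuplicateKeyError";
    Object.setPrototypeOf(this, DuplicateKeyError.prototype); // keeps instanceof working on ES5 targets
  }
}

interface ExecutionClaims {
  insert(schedulerId: number, executionKey: string, containerId: string): Promise<void>;
}

// In-memory stand-in for the SchedulerExecution table with a unique key.
class InMemoryClaims implements ExecutionClaims {
  private readonly rows = new Map<string, string>();

  async insert(schedulerId: number, executionKey: string, containerId: string): Promise<void> {
    const key = `${schedulerId}|${executionKey}`;
    if (this.rows.has(key)) throw new DuplicateKeyError(key);
    this.rows.set(key, containerId);
  }

  ownerOf(schedulerId: number, executionKey: string): string | undefined {
    return this.rows.get(`${schedulerId}|${executionKey}`);
  }
}

// "hostname + PID"; both are passed in so the sketch needs no Node-specific APIs.
function makeContainerId(hostname: string, pid: number): string {
  return `${hostname}-${pid}`;
}

// Returns true only for the instance whose INSERT succeeded.
async function tryClaim(store: ExecutionClaims, schedulerId: number, key: string, containerId: string): Promise<boolean> {
  try {
    await store.insert(schedulerId, key, containerId);
    return true;
  } catch (err) {
    if (err instanceof DuplicateKeyError) return false; // another instance won the race
    throw err; // real failures (e.g. a lost DB connection) are not swallowed
  }
}
```

Against MySQL, the duplicate surfaces as error ER_DUP_ENTRY (1062). The catch would test for that in place of `DuplicateKeyError`.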
Worker Coordination:
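- RabbitMQ distributes tasks across workers. With prefetch=1, each worker holds at most one unacknowledged task, so a busy worker receives nothing new until it ACKs or NACKs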
- No coordination needed - RabbitMQ handles distribution
3. Comprehensive Logging
All components use structured logging with timestamps and context:
2025-12-12T12:00:00.000Z [INFO] [type=report, id=25, scheduler_id=16] Processing task
2025-12-12T12:00:00.001Z [DEBUG] Task parameters: {"controller": "RefreshUserToken", ...}
2025-12-12T12:00:03.500Z [ERROR] [type=report, id=25, scheduler_id=16, duration=3500ms] Task execution failed
2025-12-12T12:00:03.501Z [ERROR] Stack trace:
Error: Connection timeout
at RefreshUserToken.mainFunction (/workspace/api/b3api/dist/services/reports/RefreshUserToken/RefreshUserToken.js:45:15)
...
2025-12-12T12:00:03.502Z [WARN] [type=report, id=25, scheduler_id=16] Scheduling retry 1/3 in 30000ms
4. Failed Task Monitoring
The worker actively monitors the failed queue and logs all permanently failed tasks:
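ch.consume('cron_scheduler_failed', (msg) => {
  // amqplib delivers null if the broker cancels the consumer
  if (msg === null) return;
  const failedTask = JSON.parse(msg.content.toString());
  CronLogger.error('⚠️ PERMANENTLY FAILED TASK', undefined, {
    type: failedTask.type,
    id: failedTask.id,
    scheduler_id: failedTask.scheduler_id,
    retry_count: failedTask.retry_count,
    failed_at: failedTask.failed_at
  });
  // Acking removes the message from cron_scheduler_failed, so it is no
  // longer available there for manual recovery
  ch.ack(msg);
}, {noAck: false});
5. Graceful Shutdown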
Both components handle SIGTERM and SIGINT signals gracefully:
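// One handler for both signals: close the channel, then the connection
const shutdown = async (signal: string) => {
  CronLogger.warn(`${signal} received, shutting down gracefully...`);
  await ch.close();
  await conn.close();
  CronLogger.info('Shutdown complete');
  process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));
6. Automatic Cleanup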
The scheduler runs a daily cleanup job at 3 AM to remove old execution records:
cron.schedule('0 3 * * *', async () => {
  const deletedCount = await schedulerExecutionRepo.cleanupOldExecutions(30);
  console.log(`[CLEANUP] ✓ Deleted ${deletedCount} execution records older than 30 days`);
});
7. Forced Instant Mode for Reports
Critical feature to ensure errors are properly caught:
// CRITICAL: Force instant mode for cron tasks to properly catch errors
// Without this, the report runs in background and errors are not propagated back
if (params.options_interface && params.options_interface.delivery) {
  params.options_interface.delivery.instant = true;
}
This ensures that:
- Reports run in foreground (not background)
- Errors are propagated back to the worker
- Retry mechanism can properly trigger on failures
Related Documentation
- Report Scheduling Guide - How to create and manage schedulers
- Retry Mechanism - Detailed retry logic and configuration