Cron Worker System - Architecture and Overview

Introduction

The Cron Worker System is a distributed, fault-tolerant task scheduling and execution framework built on RabbitMQ. It runs reports and workers automatically from cron schedules or one-time scheduled dates, retries failed tasks with exponential backoff, and moves tasks that exhaust their retries to a dedicated failed queue.

Purpose

  • Automated Task Execution: Execute reports and workers automatically based on schedules
  • Reliable Delivery: Ensure tasks are executed reliably with retry mechanisms
  • Scalability: Support horizontal scaling of both schedulers and workers
  • Observability: Comprehensive logging and monitoring capabilities
  • Deduplication: Prevent duplicate executions across multiple instances

Use Cases

  • Daily/hourly report generation (e.g., "Generate sales report every day at 8 AM")
  • Periodic data synchronization (e.g., "Sync user tokens every 30 minutes")
  • One-time scheduled tasks (e.g., "Send reminder email on 2025-12-15 at 3 PM")
  • Background processing of resource-intensive operations
  • Automated maintenance tasks (e.g., "Cleanup old records daily at 3 AM")

System Architecture

The system consists of two main components that work together via RabbitMQ:

┌────────────────────────────────────────────────────────────────────────┐
│                     CRON WORKER SYSTEM ARCHITECTURE                    │
└────────────────────────────────────────────────────────────────────────┘

  ┌─────────────────┐                        ┌──────────────────┐
  │   Database      │                        │   RabbitMQ       │
  │   (MySQL)       │                        │   Message Broker │
  └────────┬────────┘                        └────────┬─────────┘
           │                                          │
           │ Reads schedulers                        │
           │ every minute                            │
           │                                          │
  ┌────────▼────────┐                                │
  │ cron-scheduler  │◄────────┬──────────────────────┘
  │   (Producer)    │         │
  │                 │         │ 5. Retry after TTL
  │ • Loads DB      │         │    (30s → 60s → 120s)
  │ • Evaluates     │         │
  │   cron expr.    │         │
  │ • Publishes     │         │
  │   to queue      │    ┌────┴────────────┐
  └────────┬────────┘    │  cron_scheduler │
           │             │  _retry (Queue) │
           │             │  TTL: 30s-120s  │
           │ 1. Publish  └─────────▲───────┘
           │    task            4. │
           │                    NACK│
           ▼                       │
  ┌─────────────────┐              │
  │ cron_scheduler  │              │
  │  (Main Queue)   │              │
  │                 │              │
  │ DLX: cron_dlx   │              │
  └────────┬────────┘              │
           │                       │
           │ 2. Consume            │
           │    task               │
           ▼                       │
  ┌─────────────────┐         ┌────┴──────┐
  │  cron-worker    │         │  FAILURE  │  SUCCESS
  │   (Consumer)    │         └───────────┘     │
  │                 │                            │
  │ • Executes      │                         3. ACK
  │   reports       │                            │
  │ • Executes      │                            ▼
  │   workers       │                    ┌───────────────┐
  │ • Retries       │                    │ Task Complete │
  │   on failure    │                    └───────────────┘
  └────────┬────────┘
           │
           │ If retry >= MAX_RETRIES
           ▼
  ┌─────────────────────┐
  │ cron_scheduler      │
  │ _failed (Queue)     │
  │                     │
  │ Permanent storage   │
  │ for failed tasks    │
  └─────────────────────┘

Architecture Principles

  1. Separation of Concerns: Scheduler (producer) and Worker (consumer) are independent processes
  2. Message-Driven: All communication happens via RabbitMQ queues
  3. At-Least-Once Delivery: A published task is delivered to a worker at least once; because failed tasks are retried, a task may run more than once
  4. Idempotency: Tasks should be designed to be safely retried
  5. Fault Tolerance: System continues operating even if individual components fail

Core Components

1. cron-scheduler (Producer)

Location: /workspace/api/b3api/src/services/workers/cron/cron-scheduler.ts

Responsibilities:

  • Reads active schedulers from the database every minute
  • Evaluates cron expressions against current time
  • Publishes matching tasks to RabbitMQ
  • Prevents duplicate executions using SchedulerExecution table
  • Performs daily cleanup of old execution records (3 AM)

Key Features:

  • Multi-instance support with atomic claiming mechanism
  • Container ID tracking for debugging
  • Cron expression validation and parsing
  • Support for both recurring (cron) and one-time (date_time) schedulers
  • Automatic reconnection to RabbitMQ on connection loss

Process Flow:

1. Every minute:
   ├─ Load active schedulers from database
   ├─ For each scheduler:
   │  ├─ Check if cron expression matches current time
   │  ├─ Generate execution key (e.g., "cron-2025-12-12-8-0")
   │  ├─ Try to claim execution atomically
   │  └─ If claimed: publish task to RabbitMQ
   └─ Log results

2. Daily at 3 AM:
   └─ Cleanup SchedulerExecution records older than 30 days
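
The execution key in the flow above can be sketched as follows. This is a minimal illustration, not the scheduler's actual code: the helper name `buildExecutionKey` is hypothetical, and the format (local time, no zero padding) is inferred from the sample key "cron-2025-12-12-8-0".

```typescript
// Hypothetical helper: builds the deduplication key for one scheduler tick.
// Assumption: local-time fields without zero padding, in the order
// year-month-day-hour-minute, matching the sample key in this document.
function buildExecutionKey(tick: Date): string {
  return [
    'cron',
    tick.getFullYear(),
    tick.getMonth() + 1, // getMonth() is zero-based
    tick.getDate(),
    tick.getHours(),
    tick.getMinutes(),
  ].join('-');
}

// Two instances that evaluate the same minute build the same key,
// whatever second each of them happens to run at.
const firstKey = buildExecutionKey(new Date(2025, 11, 12, 8, 0, 2));
const secondKey = buildExecutionKey(new Date(2025, 11, 12, 8, 0, 41));
console.log(firstKey, firstKey === secondKey); // cron-2025-12-12-8-0 true
```

Because seconds are not part of the key, every instance that fires within the same minute competes for the same claim.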

2. cron-worker (Consumer)

Location: /workspace/api/b3api/src/services/workers/cron/cron-worker.ts

Responsibilities:

  • Consumes tasks from RabbitMQ queue
  • Executes workers and reports
  • Handles failures with retry logic
  • Tracks retry count using RabbitMQ's x-death header
  • Moves permanently failed tasks to failed queue
  • Monitors failed queue and logs failures

Key Features:

  • Prefetch limit of 1 (processes one task at a time)
  • Exponential backoff retry mechanism
  • Comprehensive error logging with stack traces
  • Forced instant mode for reports (ensures proper error propagation)
  • Graceful shutdown on SIGTERM/SIGINT

Process Flow:

1. Consume task from cron_scheduler queue
2. Parse task payload (type, id, scheduler_id, params_scheduler)
3. Execute task:
   ├─ If type = "worker": Call workerRepo.startWorker()
   ├─ If type = "report": Call reportRepo.generateReport()
   └─ If type = "reminder": (Not yet implemented)
4. Handle result:
   ├─ SUCCESS: ACK message (remove from queue)
   └─ FAILURE:
      ├─ Check retry count from x-death header
      ├─ If retry < MAX_RETRIES:
      │  └─ NACK message (triggers DLX → retry queue)
      └─ If retry >= MAX_RETRIES:
         ├─ Publish to failed queue
         └─ ACK original message
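
Step 4 of the flow above reduces to a small decision function. The sketch below is illustrative only: `decideOutcome` and the `Outcome` labels are hypothetical names, and `MAX_RETRIES = 3` is an assumed value.

```typescript
type Outcome = 'ack' | 'nack-to-retry' | 'publish-failed-then-ack';

// Hypothetical pure function mirroring the result-handling branch:
// success is acknowledged, failures are retried until the limit is hit.
function decideOutcome(succeeded: boolean, retryCount: number, maxRetries: number): Outcome {
  if (succeeded) return 'ack';
  return retryCount < maxRetries ? 'nack-to-retry' : 'publish-failed-then-ack';
}

const MAX_RETRIES = 3; // assumed value for illustration
for (const retryCount of [0, 1, 2, 3]) {
  console.log(retryCount, decideOutcome(false, retryCount, MAX_RETRIES));
}
// 0 nack-to-retry
// 1 nack-to-retry
// 2 nack-to-retry
// 3 publish-failed-then-ack
```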

3. Scheduler Database Models

Scheduler Model (/workspace/api/b3api/src/models/scheduler.model.ts):

typescript
{
  id: number;                  // Primary key
  schedule: string;            // Cron expression (e.g., "0 8 * * *")
  date_time: Date;             // One-time execution date/time
  params: object;              // Task parameters (JSON)
  status: boolean;             // Active/inactive
  stay_alive: boolean;         // Standalone container flag
  description: string;         // Human-readable description

  // Foreign keys
  report_id: number;           // Report to execute
  worker_id: number;           // Worker to execute
  reminder_id: number;         // Reminder to send
  user_id: number;             // User who created
  group_id: number;            // Module group
  language_id: number;         // Language for execution
}

SchedulerExecution Model (/workspace/api/b3api/src/models/scheduler-execution.model.ts):

typescript
{
  id: number;                  // Primary key
  scheduler_id: number;        // Foreign key to Scheduler
  execution_key: string;       // Deduplication key (e.g., "cron-2025-12-12-8-0")
  executed_at: Date;           // Timestamp of execution
  container_id: string;        // Container that claimed execution
}

4. PM2Service - Process Management

Location: /workspace/api/b3api/src/services/pm2.service.ts

Responsibilities:

  • Spawns child processes for report execution
  • Captures stdout/stderr from child processes
  • Manages instant vs background execution modes
  • Handles process lifecycle and error propagation

Key Features:

  • Child Process Stdio Capture: Child processes are spawned with stdio set to ['pipe', 'pipe', 'pipe', 'ipc'] instead of ['ignore', 'ignore', 'ignore', 'ipc'], so stdout and stderr are captured rather than discarded
  • Timestamped Logging: All child process logs include timestamps
  • Error Propagation: Properly propagates errors from child processes in instant mode
  • Process Tracking: Tracks process failures via IPC messages

Example Log Output:

2025-12-12T12:00:00.000Z [Report:RefreshUserToken:12345] Process spawned (instant mode)
2025-12-12T12:00:01.500Z [Report:RefreshUserToken:12345] Refreshing tokens for 150 users...
2025-12-12T12:00:05.200Z [Report:RefreshUserToken:12345] Process finished successfully
2025-12-12T12:00:05.201Z [Report:RefreshUserToken:12345] Process exited successfully (code: 0)
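
Log lines like those above can be produced from piped child output by buffering partial lines and prefixing each complete one. The following is a rough sketch under assumptions, not PM2Service's implementation; `createLinePrefixer` and its parameters are hypothetical.

```typescript
// Hypothetical helper: turns raw stdout/stderr chunks into complete,
// timestamped lines. A chunk read from a pipe can end mid-line, so the
// unfinished remainder is kept until the next chunk arrives.
function createLinePrefixer(
  tag: string,
  emit: (line: string) => void,
  now: () => Date = () => new Date(),
): (chunk: string) => void {
  let pending = '';
  return (chunk: string): void => {
    const parts = (pending + chunk).split('\n');
    pending = parts.pop() ?? ''; // last element is an incomplete line (or '')
    for (const line of parts) {
      if (line.length > 0) emit(`${now().toISOString()} [${tag}] ${line}`);
    }
  };
}

const captured: string[] = [];
const onData = createLinePrefixer(
  'Report:RefreshUserToken:12345',
  (line) => captured.push(line),
  () => new Date('2025-12-12T12:00:01.500Z'), // fixed clock for the example
);
onData('Refreshing tokens for ');
onData('150 users...\n');
console.log(captured);
```

This sketch does not flush a trailing line that lacks a newline; a real handler would likely do so when the stream closes.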

5. CronLogger Utility

Location: /workspace/api/b3api/src/services/workers/cron/logger.util.ts

Responsibilities:

  • Provides structured, consistent logging across cron components
  • Supports multiple log levels (INFO, WARN, ERROR, DEBUG, SUCCESS)
  • Formats logs with timestamps, context, and colors

Log Levels:

  • INFO (Cyan): General informational messages
  • WARN (Yellow): Warning messages (e.g., retries, non-critical issues)
  • ERROR (Red): Error messages with full stack traces
  • DEBUG (Gray): Debug information (e.g., task parameters)
  • SUCCESS (Green): Success confirmations

Example Usage:

typescript
CronLogger.info('Processing task', {type: 'report', id: 25, scheduler_id: 16});
// Output: 2025-12-12T12:00:00.000Z [INFO] [type=report, id=25, scheduler_id=16] Processing task

CronLogger.error('Task failed', error, {type: 'report', id: 25});
// Output: 2025-12-12T12:00:00.000Z [ERROR] [type=report, id=25] Task failed
//         2025-12-12T12:00:00.001Z [ERROR] Stack trace:
//         Error: Something went wrong
//             at ...
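
The output format shown in the usage example can be approximated with a small formatter. This is a sketch of the format only, under the assumption that context is rendered as comma-separated key=value pairs; `formatLogLine` is a hypothetical name, and colors and stack traces are omitted.

```typescript
type LogLevel = 'INFO' | 'WARN' | 'ERROR' | 'DEBUG' | 'SUCCESS';

// Hypothetical formatter: "<ISO timestamp> [LEVEL] [k=v, ...] message".
// The context block is left out entirely when no context is given.
function formatLogLine(
  level: LogLevel,
  message: string,
  context: Record<string, string | number> = {},
  now: Date = new Date(),
): string {
  const pairs = Object.keys(context)
    .map((key) => `${key}=${context[key]}`)
    .join(', ');
  const contextBlock = pairs ? ` [${pairs}]` : '';
  return `${now.toISOString()} [${level}]${contextBlock} ${message}`;
}

console.log(
  formatLogLine(
    'INFO',
    'Processing task',
    { type: 'report', id: 25, scheduler_id: 16 },
    new Date('2025-12-12T12:00:00.000Z'),
  ),
);
// 2025-12-12T12:00:00.000Z [INFO] [type=report, id=25, scheduler_id=16] Processing task
```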

Data Flow

Complete Flow: From Scheduler to Execution

┌─────────────────────────────────────────────────────────────────────────┐
│                    COMPLETE DATA FLOW DIAGRAM                           │
└─────────────────────────────────────────────────────────────────────────┘

1. USER CREATES SCHEDULER

[Web Application] → POST /schedulers

[MySQL Database] → Scheduler table
   {
     id: 16,
     schedule: "0 8 * * *",  // Every day at 8 AM
     report_id: 25,
     params: {...},
     status: true
   }

2. SCHEDULER PICKS UP TASK (Every Minute)

[cron-scheduler] → Reads Scheduler table

   Evaluates: Does "0 8 * * *" match current time (8:00)?

   YES → Generate execution_key: "cron-2025-12-12-8-0"

   Try to claim execution in SchedulerExecution table

   INSERT INTO SchedulerExecution (scheduler_id, execution_key, container_id, executed_at)

   IF INSERT SUCCESS (no duplicate):

      Publish to RabbitMQ:
      {
        type: "report",
        id: 25,
        scheduler_id: 16,
        params_scheduler: "{...}"
      }

3. WORKER EXECUTES TASK

[cron-worker] → Consumes from cron_scheduler queue

   Parse message → Extract type, id, scheduler_id, params_scheduler

   IF type = "report":

      reportRepo.generateReport(params)

         PM2Service.startReportNative(report, params)

            Fork child process: RefreshUserToken.js

               Child process executes report logic

               ┌─────────────┬─────────────┐
               │   SUCCESS   │   FAILURE   │
               └─────────────┴─────────────┘
                     │               │
                     ↓               ↓
               Send IPC msg    Send IPC msg
               {finish: pid}   {failed: true, error: {...}}
                     │               │
                     ↓               ↓
               Exit code 0     Exit code 1

4. RESULT HANDLING

   IF SUCCESS:

      [cron-worker] → ACK message

      Message removed from queue

      ✅ Task completed successfully

   IF FAILURE (child process reports failure or exits with a non-zero code):

      [cron-worker] → Check x-death header

      retry_count = x-death[0].count (e.g., 0, 1, 2, ...)

      IF retry_count < MAX_RETRIES:

         NACK message (requeue: false)

         Message dead-lettered to cron_dlx exchange

         Routed to cron_scheduler_retry queue

         Wait for TTL (30s, 60s, 120s based on retry count)

         After TTL: Dead-letter back to cron_scheduler

         🔄 Retry execution (go back to step 3)

      IF retry_count >= MAX_RETRIES:

         Publish to cron_scheduler_failed queue

         ACK original message

         ⚠️ Task permanently failed

         [cron-worker] → Logs failure details

Execution Deduplication

The system prevents duplicate executions using the SchedulerExecution table:

Time: 08:00:00

[Container A] Evaluates scheduler #16

   Generate execution_key: "cron-2025-12-12-8-0"

   TRY: INSERT INTO SchedulerExecution (scheduler_id=16, execution_key="cron-2025-12-12-8-0", ...)

   SUCCESS → Claim execution, publish to queue

[Container B] Evaluates scheduler #16 (same minute)

   Generate execution_key: "cron-2025-12-12-8-0"

   TRY: INSERT INTO SchedulerExecution (scheduler_id=16, execution_key="cron-2025-12-12-8-0", ...)

   FAILURE (Duplicate key error) → Skip execution

Result: Only Container A publishes the task, Container B skips it.
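
The claim race above can be modelled in memory. In this sketch a plain object stands in for the SchedulerExecution table, under the assumption that the table has a unique constraint covering scheduler_id and execution_key; the class and method names are hypothetical, and in the real system the database rejects the duplicate INSERT.

```typescript
// In-memory stand-in for the SchedulerExecution table. The keyed record
// plays the role of an assumed unique index on (scheduler_id, execution_key).
class ExecutionClaims {
  private readonly claimedBy: Record<string, string> = {};

  // Returns true if this container won the claim, false if another
  // container already recorded the same (schedulerId, executionKey) pair.
  tryClaim(schedulerId: number, executionKey: string, containerId: string): boolean {
    const uniqueKey = `${schedulerId}:${executionKey}`;
    if (Object.prototype.hasOwnProperty.call(this.claimedBy, uniqueKey)) {
      return false; // corresponds to the duplicate key error
    }
    this.claimedBy[uniqueKey] = containerId;
    return true;
  }
}

const claims = new ExecutionClaims();
const wonByA = claims.tryClaim(16, 'cron-2025-12-12-8-0', 'container-a');
const wonByB = claims.tryClaim(16, 'cron-2025-12-12-8-0', 'container-b');
console.log(wonByA, wonByB); // true false
```

The in-memory check-then-set is only safe here because the example is single-threaded; across processes the atomicity has to come from the database constraint itself.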

Queue Infrastructure

RabbitMQ Queues

1. cron_scheduler (Main Queue)

javascript
{
  durable: true,
  arguments: {
    'x-message-deduplication': true,
    'x-dead-letter-exchange': 'cron_dlx',
    'x-dead-letter-routing-key': 'retry'
  }
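}

  • Purpose: Main task queue where scheduler publishes and worker consumes
  • Durability: Survives RabbitMQ restarts
  • Deduplication: Prevents duplicate messages (x-message-deduplication is not a core RabbitMQ argument; it takes effect only if the message deduplication plugin is enabled on the broker)
  • Dead Letter Exchange: Rejected messages are dead-lettered to cron_dlx with routing key retry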

2. cron_scheduler_retry (Retry Queue)

javascript
{
  durable: true,
  arguments: {
    'x-dead-letter-exchange': '',  // Default exchange
    'x-dead-letter-routing-key': 'cron_scheduler',
    'x-message-ttl': 30000  // 30 seconds (configurable)
  }
}

  • Purpose: Temporary storage for messages awaiting retry
  • TTL: Messages expire after configured delay
  • Dead Letter: After TTL, messages return to main queue

3. cron_scheduler_failed (Failed Queue)

javascript
{
  durable: true
}

  • Purpose: Permanent storage for tasks exceeding max retries
  • Monitoring: Worker monitors this queue and logs failures
  • Manual Recovery: Failed tasks can be manually moved back to main queue

RabbitMQ Exchange

cron_dlx (Dead Letter Exchange)

javascript
{
  type: 'direct',
  durable: true
}

Bindings:

  • cron_scheduler_retry bound with routing key retry
  • cron_scheduler_failed bound with routing key failed

Purpose: Routes dead-lettered messages to the retry or failed queue according to their routing key (retry or failed)
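
How a direct exchange resolves these bindings can be modelled in a few lines. This is a simplified model for illustration, not broker code; the `Binding` type and `routeDirect` function are hypothetical.

```typescript
interface Binding {
  exchange: string;
  routingKey: string;
  queue: string;
}

// The two bindings listed above.
const cronBindings: Binding[] = [
  { exchange: 'cron_dlx', routingKey: 'retry', queue: 'cron_scheduler_retry' },
  { exchange: 'cron_dlx', routingKey: 'failed', queue: 'cron_scheduler_failed' },
];

// A direct exchange delivers a message to every queue whose binding key
// equals the message's routing key exactly.
function routeDirect(bindings: Binding[], exchange: string, routingKey: string): string[] {
  return bindings
    .filter((b) => b.exchange === exchange && b.routingKey === routingKey)
    .map((b) => b.queue);
}

console.log(routeDirect(cronBindings, 'cron_dlx', 'retry'));   // [ 'cron_scheduler_retry' ]
console.log(routeDirect(cronBindings, 'cron_dlx', 'failed'));  // [ 'cron_scheduler_failed' ]
console.log(routeDirect(cronBindings, 'cron_dlx', 'unknown')); // []
```

A routing key with no matching binding resolves to no queue, which is why the main queue's x-dead-letter-routing-key must match a binding key exactly.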

Message Format

Task Message Payload:

json
{
  "type": "report",           // or "worker" or "reminder"
  "id": 25,                   // Report/Worker/Reminder ID
  "scheduler_id": 16,         // Scheduler that triggered this task
  "params_scheduler": "{...}" // JSON-encoded parameters
}
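
A consumer would typically validate this payload before acting on it. The sketch below is one possible way to do that, assuming the four fields shown above are all required; `TaskMessage` and `parseTaskMessage` are hypothetical names, not the worker's actual types.

```typescript
type TaskType = 'report' | 'worker' | 'reminder';

interface TaskMessage {
  type: TaskType;
  id: number;
  scheduler_id: number;
  params_scheduler: string; // JSON-encoded parameters, decoded separately
}

// Hypothetical parser: throws on malformed JSON or on a payload that
// does not match the documented shape.
function parseTaskMessage(raw: string): TaskMessage {
  const data = (JSON.parse(raw) ?? {}) as Partial<TaskMessage>;
  const knownTypes: TaskType[] = ['report', 'worker', 'reminder'];
  if (!data.type || knownTypes.indexOf(data.type) === -1) {
    throw new Error(`Unknown task type: ${String(data.type)}`);
  }
  if (typeof data.id !== 'number' || typeof data.scheduler_id !== 'number') {
    throw new Error('id and scheduler_id must be numbers');
  }
  if (typeof data.params_scheduler !== 'string') {
    throw new Error('params_scheduler must be a JSON-encoded string');
  }
  return {
    type: data.type,
    id: data.id,
    scheduler_id: data.scheduler_id,
    params_scheduler: data.params_scheduler,
  };
}

const parsedTask = parseTaskMessage(
  '{"type":"report","id":25,"scheduler_id":16,"params_scheduler":"{}"}',
);
console.log(parsedTask.type, parsedTask.id, parsedTask.scheduler_id); // report 25 16
```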

Failed Message Payload (in cron_scheduler_failed):

json
{
  "type": "report",
  "id": 25,
  "scheduler_id": 16,
  "params_scheduler": "{...}",
  "failed_at": "2025-12-12T12:03:39.002Z",
  "failed_reason": "Connection timeout",
  "retry_count": 3
}

x-death Header

RabbitMQ automatically adds the x-death header when a message is dead-lettered:

json
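{
  "x-death": [
    {
      "count": 2,                          // Times dead-lettered from this queue for this reason
      "reason": "rejected",                // Why it was dead-lettered (e.g. rejected, expired)
      "queue": "cron_scheduler",           // Queue the message was in when it was dead-lettered
      "time": "2025-12-12T12:01:33.000Z",  // When it was dead-lettered
      "exchange": "cron_dlx",              // Exchange the message had been published to
      "routing-keys": ["retry"]            // Routing keys the message had been published with
    }
  ]
}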

Usage: The count field is used to determine the number of retry attempts.
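
Reading the retry count from the header can be sketched as follows. The function name `getRetryCount` and the header type are hypothetical; the sketch assumes the headers object is what the client library exposes for a consumed message (with amqplib this would be msg.properties.headers) and that a message without x-death has never been retried.

```typescript
interface XDeathEntry {
  count?: number;
  reason?: string;
  queue?: string;
}

interface MessageHeaders {
  'x-death'?: XDeathEntry[];
}

// Hypothetical helper: returns the count of the first x-death entry,
// or 0 when the message has never been dead-lettered.
function getRetryCount(headers: MessageHeaders | undefined): number {
  const entries = headers ? headers['x-death'] : undefined;
  if (!entries || entries.length === 0) return 0;
  const count = entries[0].count;
  return typeof count === 'number' ? count : 0;
}

console.log(getRetryCount(undefined)); // 0
console.log(getRetryCount({ 'x-death': [{ count: 2, reason: 'rejected', queue: 'cron_scheduler' }] })); // 2
```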

Key Features

1. Exponential Backoff Retry

Retry delays increase exponentially to avoid overwhelming failing services:

Retry #1: 30s  × 2^0 = 30 seconds
Retry #2: 30s  × 2^1 = 60 seconds
Retry #3: 30s  × 2^2 = 120 seconds

Configurable via environment variables:

  • CRON_RETRY_DELAY_MS: Base delay (default: 30000ms)
  • CRON_RETRY_DELAY_MULTIPLIER: Multiplier (default: 2.0)
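
The delay schedule above follows from base × multiplier^(retry − 1). The sketch below reproduces that arithmetic; `readRetryConfig` and `retryDelayMs` are hypothetical names, and how the computed delay is applied to the retry queue is not shown in this document.

```typescript
interface RetryConfig {
  baseDelayMs: number;
  multiplier: number;
}

// Hypothetical helper: reads the two documented variables from an
// environment-like object, falling back to the documented defaults.
function readRetryConfig(env: Record<string, string | undefined>): RetryConfig {
  return {
    baseDelayMs: Number(env['CRON_RETRY_DELAY_MS'] ?? 30000),
    multiplier: Number(env['CRON_RETRY_DELAY_MULTIPLIER'] ?? 2),
  };
}

// retryNumber is 1-based: retry #1 waits the base delay.
function retryDelayMs(retryNumber: number, config: RetryConfig): number {
  return config.baseDelayMs * Math.pow(config.multiplier, retryNumber - 1);
}

const defaults = readRetryConfig({});
console.log([1, 2, 3].map((n) => retryDelayMs(n, defaults))); // [ 30000, 60000, 120000 ]
```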

2. Multi-Instance Coordination

The system supports running multiple scheduler and worker instances:

Scheduler Coordination:

  • Each scheduler instance generates a unique container_id (hostname + PID)
  • Atomic claiming via database INSERT prevents race conditions
  • Only one instance successfully claims each execution

Worker Coordination:

  • RabbitMQ distributes tasks across workers (round-robin with prefetch=1)
  • No coordination needed - RabbitMQ handles distribution

3. Comprehensive Logging

All components use structured logging with timestamps and context:

2025-12-12T12:00:00.000Z [INFO] [type=report, id=25, scheduler_id=16] Processing task
2025-12-12T12:00:00.001Z [DEBUG] Task parameters: {"controller": "RefreshUserToken", ...}
2025-12-12T12:00:03.500Z [ERROR] [type=report, id=25, scheduler_id=16, duration=3500ms] Task execution failed
2025-12-12T12:00:03.501Z [ERROR] Stack trace:
Error: Connection timeout
    at RefreshUserToken.mainFunction (/workspace/api/b3api/dist/services/reports/RefreshUserToken/RefreshUserToken.js:45:15)
    ...
2025-12-12T12:00:03.502Z [WARN] [type=report, id=25, scheduler_id=16] Scheduling retry 1/3 in 30000ms

4. Failed Task Monitoring

The worker actively monitors the failed queue and logs all permanently failed tasks:

typescript
ch.consume('cron_scheduler_failed', (msg) => {
  // msg can be null if the consumer is cancelled, so guard before use
  if (!msg) return;
  const failedTask = JSON.parse(msg.content.toString());
  CronLogger.error('⚠️  PERMANENTLY FAILED TASK', undefined, {
    type: failedTask.type,
    id: failedTask.id,
    scheduler_id: failedTask.scheduler_id,
    retry_count: failedTask.retry_count,
    failed_at: failedTask.failed_at
  });
  // Acknowledge after logging so the message leaves the failed queue
  ch.ack(msg);
}, {noAck: false});

5. Graceful Shutdown

Both components handle SIGTERM and SIGINT signals gracefully:

typescript
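// Shared handler so SIGTERM and SIGINT follow the same shutdown path
const shutdown = async (signal: string) => {
  CronLogger.warn(`${signal} received, shutting down gracefully...`);
  await ch.close();    // stop consuming/publishing first
  await conn.close();  // then close the RabbitMQ connection
  CronLogger.info('Shutdown complete');
  process.exit(0);
};
process.on('SIGTERM', () => shutdown('SIGTERM'));
process.on('SIGINT', () => shutdown('SIGINT'));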

6. Automatic Cleanup

The scheduler runs a daily cleanup job at 3 AM to remove old execution records:

typescript
cron.schedule('0 3 * * *', async () => {
  const deletedCount = await schedulerExecutionRepo.cleanupOldExecutions(30);
  console.log(`[CLEANUP] ✓ Deleted ${deletedCount} execution records older than 30 days`);
});

7. Forced Instant Mode for Reports

Cron-triggered reports are forced into instant mode so that their errors reach the worker:

typescript
// CRITICAL: Force instant mode for cron tasks to properly catch errors
// Without this, the report runs in background and errors are not propagated back
if (params.options_interface && params.options_interface.delivery) {
  params.options_interface.delivery.instant = true;
}

This ensures that:

  • Reports run in foreground (not background)
  • Errors are propagated back to the worker
  • Retry mechanism can properly trigger on failures

Syneo/Barcoding Documentation