Cron Worker Retry Mechanism

Overview

The cron worker implements a sophisticated retry mechanism using RabbitMQ's Dead Letter Exchange (DLX) pattern. This ensures that transient failures don't result in lost tasks while preventing infinite retry loops.

Key Features

  • Automatic Retry: Failed tasks automatically retry without manual intervention
  • Exponential Backoff: Increasing delays between retries to avoid overwhelming failing services
  • Retry Limit: Maximum retry attempts prevent infinite loops
  • Failed Queue: Permanently failed tasks stored for investigation
  • x-death Tracking: RabbitMQ's built-in header tracks retry count
  • No Code Changes: Retry logic handled entirely by RabbitMQ and worker configuration

How It Works

Architecture

┌────────────────────────────────────────────────────────────────────────┐
│                      RETRY MECHANISM ARCHITECTURE                       │
└────────────────────────────────────────────────────────────────────────┘

1. TASK EXECUTION ATTEMPT

   [cron_scheduler] ← Main queue

         │ consume

   [cron-worker] → Execute task

    ┌────┴─────┐
    │          │
 SUCCESS    FAILURE
    │          │
    ↓          ↓
  [ACK]     [NACK]
    │          │
    │          │ (requeue: false)
    │          │ Triggers dead-letter
    │          ↓
    │    [cron_dlx] ← Dead Letter Exchange
    │          │
    │          │ Routes based on retry count
    │          │
    │     ┌────┴────┐
    │     │         │
    │  retry < MAX  retry >= MAX
    │     │         │
    │     ↓         ↓
    │ [cron_scheduler_retry]   [cron_scheduler_failed]
    │     │                             │
    │     │ TTL: 30s-120s               │
    │     │ (waits...)                  │
    │     │                             │
    │     │ After TTL expires:          │
    │     │ Dead-letter back            │
    │     │ to main queue               │
    │     ↓                             ↓
    │ [cron_scheduler] ← retry     [Failed storage]
    │     │
    └─────┘ (loop)

2. RETRY COUNT TRACKING

   RabbitMQ x-death header:
   {
     "x-death": [{
       "count": 2,           ← Number of times dead-lettered
       "reason": "rejected",
       "queue": "cron_scheduler",
       "time": "2025-12-12T...",
       "exchange": "cron_dlx",
       "routing-keys": ["retry"]
     }]
   }

3. RETRY DECISION

   Worker reads x-death[0].count

   ├─ count = 0: First execution
   ├─ count = 1: First retry
   ├─ count = 2: Second retry
   └─ count = 3: Third retry (max reached)

         └→ Publish to failed queue

Step-by-Step Flow

Step 1: Initial Execution

Task arrives → Execute → Failure

Worker checks x-death header: count = 0 (first try)

Worker NACKs message (requeue: false)

RabbitMQ dead-letters message to cron_dlx exchange

DLX routes to cron_scheduler_retry queue (routing key: "retry")

Message waits in retry queue for TTL (30 seconds)

Step 2: First Retry

TTL expires → Message dead-lettered back to cron_scheduler

x-death header updated: count = 1

Worker consumes → Execute → Failure (again)

Worker checks x-death: count = 1 (first retry)

Worker NACKs message

RabbitMQ dead-letters to retry queue

Wait 60 seconds (exponential backoff: 30s × 2^1)

Step 3: Second Retry

TTL expires → Back to main queue

x-death header: count = 2

Execute → Failure

NACK → Retry queue

Wait 120 seconds (30s × 2^2)

Step 4: Max Retries Exceeded

TTL expires → Back to main queue

x-death header: count = 3

Execute → Failure

Worker checks: count >= MAX_RETRIES (3)

Worker publishes to cron_scheduler_failed queue

Worker ACKs original message (removes from main queue)

Task permanently failed (logged and stored)
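
Putting the four steps together, the worker's consume handler reduces to a few branches. The following is a minimal sketch rather than the actual worker code: executeTask stands in for the real task execution, while getRetryCount and publishToFailed are the helpers shown later in this document.

typescript
// Minimal sketch of the main consumer tying Steps 1-4 together.
ch.consume(
  'cron_scheduler',
  async (msg: any) => {
    if (!msg) return;

    const retryCount = getRetryCount(msg);  // 0 on the first execution
    try {
      const payload = JSON.parse(msg.content.toString());
      await executeTask(payload);           // Step 1: execute the task
      ch.ack(msg);                          // Success: remove from main queue
    } catch (err: any) {
      if (retryCount >= RETRY_CONFIG.MAX_RETRIES) {
        // Step 4: max retries reached → park in failed queue, then ACK original
        await publishToFailed(msg, err.message);
        ch.ack(msg);
      } else {
        // Steps 1-3: NACK without requeue → dead-letter to cron_dlx → retry queue
        ch.nack(msg, false, false);
      }
    }
  },
  {noAck: false}
);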

Configuration

Environment Variables

bash
# Maximum number of retry attempts (default: 3)
# Total executions = MAX_RETRIES + 1 (initial try + retries)
CRON_MAX_RETRIES=3

# Initial retry delay in milliseconds (default: 30000 = 30 seconds)
CRON_RETRY_DELAY_MS=30000

# Exponential backoff multiplier (default: 2.0)
# Delay formula: RETRY_DELAY_MS × MULTIPLIER^retry_count
CRON_RETRY_DELAY_MULTIPLIER=2.0

Configuration in Code

cron-worker.ts:

typescript
const RETRY_CONFIG = {
  MAX_RETRIES: parseInt(process.env.CRON_MAX_RETRIES || '3', 10),
  RETRY_DELAY_MS: parseInt(process.env.CRON_RETRY_DELAY_MS || '30000', 10),
  RETRY_DELAY_MULTIPLIER: parseFloat(process.env.CRON_RETRY_DELAY_MULTIPLIER || '2.0'),
};
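
If one of these variables is set to a non-numeric value, parseInt and parseFloat return NaN. A small guard (illustrative; not necessarily present in the worker) can fall back to the defaults:

typescript
// Illustrative guard: fall back to the default when an env value does not parse.
const intFromEnv = (value: string | undefined, fallback: number): number => {
  const parsed = parseInt(value ?? '', 10);
  return Number.isNaN(parsed) ? fallback : parsed;
};

const MAX_RETRIES = intFromEnv(process.env.CRON_MAX_RETRIES, 3);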

Queue Configuration:

typescript
// Main queue with DLX
await ch.assertQueue('cron_scheduler', {
  durable: true,
  arguments: {
    'x-message-deduplication': true,
    'x-dead-letter-exchange': 'cron_dlx',
    'x-dead-letter-routing-key': 'retry',
  },
});

// Retry queue with TTL
await ch.assertQueue('cron_scheduler_retry', {
  durable: true,
  arguments: {
    'x-dead-letter-exchange': '',  // Default exchange
    'x-dead-letter-routing-key': 'cron_scheduler',
    'x-message-ttl': RETRY_CONFIG.RETRY_DELAY_MS,
  },
});
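
The queue arguments above refer to the cron_dlx exchange and the retry / failed routing keys. For completeness, here is a sketch of how that exchange and its bindings might be declared, assuming a direct exchange (the actual declaration in the worker may differ):

typescript
// Sketch: DLX exchange and bindings implied by the queue arguments above.
await ch.assertExchange('cron_dlx', 'direct', {durable: true});

// NACKed messages from cron_scheduler arrive with routing key 'retry'
await ch.bindQueue('cron_scheduler_retry', 'cron_dlx', 'retry');

// Permanently failed tasks are published with routing key 'failed'
// (the cron_scheduler_failed queue itself is declared further below)
await ch.bindQueue('cron_scheduler_failed', 'cron_dlx', 'failed');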

Customizing Retry Behavior

Example: More aggressive retries

bash
export CRON_MAX_RETRIES=5          # 5 retries instead of 3
export CRON_RETRY_DELAY_MS=10000   # Start with 10 seconds
export CRON_RETRY_DELAY_MULTIPLIER=1.5  # Slower growth

Retry Schedule:

  • Retry 1: 10s × 1.5^0 = 10 seconds
  • Retry 2: 10s × 1.5^1 = 15 seconds
  • Retry 3: 10s × 1.5^2 = 22.5 seconds
  • Retry 4: 10s × 1.5^3 = 33.75 seconds
  • Retry 5: 10s × 1.5^4 = 50.625 seconds

Example: Conservative retries

bash
export CRON_MAX_RETRIES=2          # Only 2 retries
export CRON_RETRY_DELAY_MS=60000   # Start with 1 minute
export CRON_RETRY_DELAY_MULTIPLIER=3.0  # Aggressive growth

Retry Schedule:

  • Retry 1: 60s × 3^0 = 60 seconds (1 minute)
  • Retry 2: 60s × 3^1 = 180 seconds (3 minutes)

Message Flow

Normal Success Flow

┌──────────────┐
│ Task arrives │
└──────┬───────┘
       ↓
┌──────────────┐
│   Execute    │
└──────┬───────┘
       ↓
┌──────────────┐
│   SUCCESS    │
└──────┬───────┘
       ↓
┌──────────────┐
│  ACK message │
└──────┬───────┘
       ↓
┌──────────────────┐
│ Remove from queue│
└──────────────────┘

Retry Flow with Recovery

┌──────────────┐
│ Task arrives │
│ (x-death: 0) │
└──────┬───────┘
       ↓
┌──────────────┐
│  Execute     │
└──────┬───────┘
       ↓
┌──────────────┐
│   FAILURE    │
└──────┬───────┘
       ↓
┌──────────────┐
│ NACK message │
└──────┬───────┘
       ↓
┌────────────────────┐
│ Dead-letter to DLX │
└──────┬─────────────┘
       ↓
┌────────────────────┐
│  Route to retry    │
│  queue (TTL: 30s)  │
└──────┬─────────────┘
       │ Wait 30 seconds...
       ↓
┌────────────────────┐
│ TTL expired        │
│ Dead-letter back   │
│ (x-death: 1)       │
└──────┬─────────────┘
       ↓
┌──────────────┐
│  Execute     │
└──────┬───────┘
       ↓
┌──────────────┐
│   SUCCESS    │ (Network recovered!)
└──────┬───────┘
       ↓
┌──────────────┐
│  ACK message │
└──────┬───────┘
       ↓
┌──────────────────┐
│ Remove from queue│
└──────────────────┘

Permanent Failure Flow

┌──────────────┐
│ Task arrives │
│ (x-death: 3) │
└──────┬───────┘
       ↓
┌──────────────┐
│  Execute     │
└──────┬───────┘
       ↓
┌──────────────┐
│   FAILURE    │
└──────┬───────┘
       ↓
┌────────────────────────┐
│ Check x-death count    │
│ count >= MAX_RETRIES   │
└──────┬─────────────────┘
       ↓
┌────────────────────────┐
│ Publish to             │
│ cron_scheduler_failed  │
└──────┬─────────────────┘
       ↓
┌────────────────────────┐
│ ACK original message   │
└──────┬─────────────────┘
       ↓
┌────────────────────────┐
│ Log failure details    │
└──────┬─────────────────┘
       ↓
┌────────────────────────┐
│ Alert/notification     │
└────────────────────────┘

x-death Header

Structure

RabbitMQ automatically manages the x-death header:

json
{
  "x-death": [
    {
      "count": 2,
      "reason": "rejected",
      "queue": "cron_scheduler",
      "time": "2025-12-12T12:01:33.000Z",
      "exchange": "cron_dlx",
      "routing-keys": ["retry"],
      "original-expiration": null
    }
  ]
}

Fields

  • count: Number of times the message has been dead-lettered from this queue
  • reason: Why it was dead-lettered ("rejected", "expired", "maxlen")
  • queue: Original queue name
  • time: ISO 8601 timestamp of last dead-letter event
  • exchange: Dead letter exchange used
  • routing-keys: Routing keys used for dead-lettering

Reading x-death in Worker

typescript
// Helper function to get retry count
const getRetryCount = (msg: any): number => {
  const xDeath = msg.properties.headers?.['x-death'];
  if (!xDeath || !Array.isArray(xDeath) || xDeath.length === 0) {
    return 0;  // First execution
  }
  return xDeath[0].count || 0;
};

// Usage
const retryCount = getRetryCount(msg);
console.log(`Retry attempt: ${retryCount}/${RETRY_CONFIG.MAX_RETRIES}`);
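
Strictly speaking, x-death holds one entry per queue/reason combination, so in topologies with more than one dead-lettering queue the first entry is not necessarily the one you want. An optional, slightly more defensive variant looks up the main queue's entry explicitly:

typescript
// Defensive variant: find the x-death entry for a specific queue rather than
// assuming it is always the first element of the array.
const getRetryCountForQueue = (msg: any, queue = 'cron_scheduler'): number => {
  const xDeath = msg.properties.headers?.['x-death'];
  if (!Array.isArray(xDeath)) return 0;
  const entry = xDeath.find((d: any) => d.queue === queue && d.reason === 'rejected');
  return entry?.count || 0;
};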

x-death Evolution

Initial Message (first execution):

json
{
  "properties": {
    "headers": {}  // No x-death header
  }
}

After First NACK:

json
{
  "properties": {
    "headers": {
      "x-death": [
        {
          "count": 1,
          "reason": "rejected",
          "queue": "cron_scheduler",
          "time": "2025-12-12T12:00:00.000Z"
        }
      ]
    }
  }
}

After Second NACK:

json
{
  "properties": {
    "headers": {
      "x-death": [
        {
          "count": 2,  // Incremented
          "reason": "rejected",
          "queue": "cron_scheduler",
          "time": "2025-12-12T12:00:30.000Z"  // Updated
        }
      ]
    }
  }
}

Exponential Backoff

Formula

delay = RETRY_DELAY_MS × MULTIPLIER^retry_count

Default Configuration

RETRY_DELAY_MS = 30000 (30 seconds)
MULTIPLIER = 2.0

Retry Schedule:

Attempt   Retry Count   Calculation   Delay   Cumulative Time
Initial   0             N/A           N/A     0s
Retry 1   1             30s × 2^0     30s     30s
Retry 2   2             30s × 2^1     60s     90s
Retry 3   3             30s × 2^2     120s    210s (3.5 min)
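
The schedule in the table follows directly from the formula; a small helper (illustrative, not part of the worker) prints it for any configuration:

typescript
// Print the retry schedule for a given configuration (illustrative helper).
const printRetrySchedule = (maxRetries: number, delayMs: number, multiplier: number) => {
  let cumulativeMs = 0;
  for (let retry = 1; retry <= maxRetries; retry++) {
    const waitMs = Math.round(delayMs * Math.pow(multiplier, retry - 1));
    cumulativeMs += waitMs;
    console.log(`Retry ${retry}: wait ${waitMs / 1000}s (cumulative ${cumulativeMs / 1000}s)`);
  }
};

printRetrySchedule(3, 30000, 2.0);
// Retry 1: wait 30s (cumulative 30s)
// Retry 2: wait 60s (cumulative 90s)
// Retry 3: wait 120s (cumulative 210s)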

Why Exponential Backoff?

Without Backoff (constant 30s delay):

Attempt 1 → Wait 30s → Attempt 2 → Wait 30s → Attempt 3 → Wait 30s → ...
  • Overwhelms failing service with constant load
  • Doesn't give service time to recover
  • All retries cluster together

With Exponential Backoff:

Attempt 1 → Wait 30s → Attempt 2 → Wait 60s → Attempt 3 → Wait 120s → ...
  • Gives failing service progressively more time to recover
  • Reduces load on failing service
  • Spreads retries over time

Implementation in Worker

typescript
// Calculate next retry delay
const nextRetry = retryCount + 1;
const retryDelay = Math.round(
  RETRY_CONFIG.RETRY_DELAY_MS *
    Math.pow(RETRY_CONFIG.RETRY_DELAY_MULTIPLIER, retryCount)
);

CronLogger.warn(
  `Scheduling retry ${nextRetry}/${RETRY_CONFIG.MAX_RETRIES} in ${retryDelay}ms`,
  taskContext
);

// NACK to trigger dead-letter
ch.nack(msg, false, false);

Note: The delay is configured in the retry queue's TTL, so the worker just NACKs and RabbitMQ handles the timing.
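
Because that TTL is a single queue-level value, every retry waits the same base delay unless something varies it per message. One way to get the escalating delays computed above (an alternative sketch, not what the worker currently does) is to republish to the retry route with a per-message expiration:

typescript
// Alternative sketch (not the current worker behavior): set a per-message
// expiration equal to the computed delay and ACK the original message.
// Caveats: RabbitMQ only expires the message at the head of the retry queue,
// and the retry count would then have to be read from the retry queue's
// x-death entry (reason "expired") instead of the main queue's.
ch.publish('cron_dlx', 'retry', msg.content, {
  persistent: true,
  expiration: String(retryDelay),    // milliseconds, as a string
  headers: msg.properties.headers,   // preserve existing x-death history
});
ch.ack(msg);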

Adjusting Backoff Strategy

Constant Backoff (MULTIPLIER = 1.0):

Retry 1: 30s × 1.0^0 = 30s
Retry 2: 30s × 1.0^1 = 30s
Retry 3: 30s × 1.0^2 = 30s

Aggressive Backoff (MULTIPLIER = 3.0):

Retry 1: 30s × 3.0^0 = 30s
Retry 2: 30s × 3.0^1 = 90s (1.5 min)
Retry 3: 30s × 3.0^2 = 270s (4.5 min)

Custom Backoff (code modification):

typescript
// Custom backoff: Fibonacci sequence
const fibonacciBackoff = (retry: number): number => {
  const fib = [30000, 30000, 60000, 90000, 150000];  // milliseconds
  return fib[Math.min(retry, fib.length - 1)];
};

const retryDelay = fibonacciBackoff(retryCount);

Failed Task Handling

Failed Queue Structure

cron_scheduler_failed Queue Configuration:

typescript
await ch.assertQueue('cron_scheduler_failed', {
  durable: true,  // Persists across RabbitMQ restarts
});

No special arguments - just durable storage.

Publishing to Failed Queue

typescript
const publishToFailed = async (msg: any, reason: string) => {
  const failedPayload = {
    ...JSON.parse(msg.content.toString()),
    failed_at: new Date().toISOString(),
    failed_reason: reason,
    retry_count: getRetryCount(msg),
  };

  ch.publish(
    'cron_dlx',
    'failed',  // Routing key
    Buffer.from(JSON.stringify(failedPayload)),
    {
      persistent: true,
      headers: msg.properties.headers,  // Preserve x-death header
    }
  );
};

Failed Message Format

json
{
  "type": "report",
  "id": 25,
  "scheduler_id": 16,
  "params_scheduler": "{...}",
  "failed_at": "2025-12-12T12:03:39.002Z",
  "failed_reason": "Connection timeout after 30s",
  "retry_count": 3
}
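
On the consumer side, the failed payload can be described with a small interface derived from the format above (field names follow the example; the leading fields depend on the original task type):

typescript
// Shape of a message in cron_scheduler_failed, derived from the format above.
interface FailedTaskMessage {
  type: string;             // e.g. "report"
  id: number;
  scheduler_id: number;
  params_scheduler: string; // JSON string with the original scheduler params
  failed_at: string;        // ISO 8601 timestamp added by publishToFailed
  failed_reason: string;
  retry_count: number;
}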

Monitoring Failed Queue

Worker Monitoring (automatic):

typescript
ch.consume(
  'cron_scheduler_failed',
  (msg: any) => {
    if (!msg) return;

    try {
      const failedTask = JSON.parse(msg.content.toString());
      CronLogger.error(
        '⚠️  PERMANENTLY FAILED TASK',
        undefined,
        {
          type: failedTask.type,
          id: failedTask.id,
          scheduler_id: failedTask.scheduler_id,
          retry_count: failedTask.retry_count,
          failed_at: failedTask.failed_at,
        }
      );
      CronLogger.debug(`Failed task details: ${JSON.stringify(failedTask, null, 2)}`);

      ch.ack(msg);  // Remove from failed queue after logging
    } catch (err) {
      CronLogger.error('Error processing failed queue message', err);
      ch.ack(msg);
    }
  },
  {noAck: false}
);

Example Log Output:

2025-12-12T12:03:39.003Z [ERROR] [type=report, id=25, scheduler_id=16, retry_count=3, failed_at=2025-12-12T12:03:39.002Z] ⚠️  PERMANENTLY FAILED TASK
2025-12-12T12:03:39.004Z [DEBUG] Failed task details: {
  "type": "report",
  "id": 25,
  "scheduler_id": 16,
  "params_scheduler": "...",
  "failed_at": "2025-12-12T12:03:39.002Z",
  "failed_reason": "Connection timeout",
  "retry_count": 3
}

Monitoring and Recovery

Checking Failed Tasks

Via RabbitMQ Admin:

bash
# List queue statistics
docker exec rabbitmq rabbitmqadmin list queues name messages

# Output:
# name                       messages
# cron_scheduler            0
# cron_scheduler_retry      0
# cron_scheduler_failed     5   ← 5 failed tasks!

Get failed messages:

bash
docker exec rabbitmq rabbitmqadmin get queue=cron_scheduler_failed count=10

# Output shows:
# - payload: Full message
# - routing_key: "failed"
# - properties: Headers including x-death

Manual Recovery

Option 1: Republish to Main Queue

bash
# Get failed message
docker exec rabbitmq rabbitmqadmin get queue=cron_scheduler_failed requeue=false > failed_msg.json

# Extract payload
PAYLOAD=$(cat failed_msg.json | jq -r '.payload')

# Republish to main queue
docker exec rabbitmq rabbitmqadmin publish \
  exchange=amq.default \
  routing_key=cron_scheduler \
  payload="$PAYLOAD"

Option 2: Fix Issue and Re-enable Scheduler

sql
-- Failed due to bad configuration
-- Fix the issue first, then re-enable scheduler
UPDATE Scheduler
SET status = 1,
    params = '{"fixed": "configuration"}'
WHERE id = 16;

The scheduler will execute on next cron evaluation.

Option 3: Manually Execute Report

http
POST /reports/generate
Content-Type: application/json

{
  "controller": "RefreshUserToken",
  "options_interface": {
    "delivery": {
      "instant": true
    }
  }
}

Alerting on Failures

Script: Monitor Failed Queue

bash
#!/bin/bash
# monitor-failed-queue.sh

THRESHOLD=5

failed_count=$(docker exec rabbitmq rabbitmqadmin list queues name messages -f tsv | \
  grep cron_scheduler_failed | \
  awk '{print $2}')

if [ "$failed_count" -gt "$THRESHOLD" ]; then
  echo "ALERT: $failed_count failed tasks (threshold: $THRESHOLD)"

  # Send notification
  curl -X POST https://your-alert-endpoint.com/alert \
    -H "Content-Type: application/json" \
    -d "{\"message\": \"$failed_count failed cron tasks\"}"
fi

Cron Job: Run every 5 minutes

bash
*/5 * * * * /path/to/monitor-failed-queue.sh
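
If the check should live in the worker instead of a shell script, the same threshold test can be done against the RabbitMQ management HTTP API. This sketch assumes the management plugin is reachable on localhost:15672 with default credentials, the default vhost "/", and Node 18+ for the global fetch:

typescript
// Sketch: read the failed-queue depth via the RabbitMQ management HTTP API.
const THRESHOLD = 5;

const checkFailedQueue = async () => {
  const res = await fetch('http://localhost:15672/api/queues/%2F/cron_scheduler_failed', {
    headers: {Authorization: 'Basic ' + Buffer.from('guest:guest').toString('base64')},
  });
  const queue = await res.json();

  if (queue.messages > THRESHOLD) {
    CronLogger.error(`ALERT: ${queue.messages} failed tasks (threshold: ${THRESHOLD})`);
    // ... forward to the alerting endpoint
  }
};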

Database Correlation

Query database to find failed scheduler details:

sql
-- Failed message contains scheduler_id: 16
-- Get scheduler details
SELECT
  s.id,
  s.description,
  s.schedule,
  s.params,
  r.controller as report_name,
  u.email as created_by
FROM Scheduler s
LEFT JOIN Report r ON r.id = s.report_id
LEFT JOIN User u ON u.id = s.user_id_created
WHERE s.id = 16;

Best Practices

1. Design Idempotent Operations

Ensure tasks can be safely retried:

typescript
// ❌ BAD: Not idempotent
async refreshTokens() {
  await this.db.query('UPDATE users SET token_refresh_count = token_refresh_count + 1');
}

// ✅ GOOD: Idempotent
async refreshTokens() {
  const today = new Date().toISOString().split('T')[0];
  await this.db.query(
    'INSERT INTO token_refreshes (date, count) VALUES (?, 1) ' +
    'ON DUPLICATE KEY UPDATE count = 1',
    [today]
  );
}

2. Classify Error Types

Handle transient vs permanent errors differently:

typescript
async mainFunction() {
  try {
    await this.externalAPI.fetch();
  } catch (err) {
    // Transient errors: Throw to trigger retry
    if (err.code === 'ECONNREFUSED' ||
        err.code === 'ETIMEDOUT' ||
        err.code === 'ENOTFOUND') {
      await this.log(LogType.Warn, `Transient error: ${err.message} - will retry`);
      throw err;  // Triggers retry
    }

    // Permanent errors: Don't throw (avoid useless retries)
    if (err.code === 'AUTH_INVALID' ||
        err.code === 'RESOURCE_NOT_FOUND') {
      await this.log(LogType.Error, `Permanent error: ${err.message} - manual intervention required`);
      return;  // Don't throw: treated as handled so no retry is triggered
    }

    // Unknown errors: Throw to retry (better safe than sorry)
    throw err;
  }
}

3. Log Retry Information

typescript
async mainFunction() {
  const retryInfo = process.env.RETRY_COUNT || '0';
  await this.log(LogType.Info, `Starting execution (retry: ${retryInfo})`);

  try {
    // ... execution logic
  } catch (err) {
    await this.log(LogType.Error, `Execution failed (retry: ${retryInfo}): ${err.message}`);
    throw err;
  }
}
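
RETRY_COUNT is only visible inside the report if the worker passes it along when launching the task. Assuming tasks are started as separate Node processes (as the standalone test command in practice 9 suggests), one hypothetical way to do that:

typescript
import {fork} from 'child_process';

// Hypothetical sketch: expose the current retry count to the report process.
// retryCount comes from getRetryCount(msg) in the worker.
const child = fork('./dist/services/reports/TestReport/TestReport.js', [], {
  env: {...process.env, RETRY_COUNT: String(retryCount)},
});

child.on('exit', (code) => {
  if (code !== 0) {
    // One way the worker might detect failure and enter the retry flow.
  }
});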

4. Use Appropriate Retry Configuration

Fast-failing tasks (API calls):

bash
CRON_MAX_RETRIES=3
CRON_RETRY_DELAY_MS=10000  # 10 seconds
CRON_RETRY_DELAY_MULTIPLIER=2.0

Slow-recovering tasks (database connection):

bash
CRON_MAX_RETRIES=5
CRON_RETRY_DELAY_MS=60000  # 1 minute
CRON_RETRY_DELAY_MULTIPLIER=1.5

5. Monitor Retry Patterns

Track retry rates to identify systemic issues:

sql
-- Analyze retry patterns from logs
SELECT
  DATE(failed_at) as date,
  type,
  COUNT(*) as failed_count,
  AVG(retry_count) as avg_retries
FROM (
  -- Parse failed messages from RabbitMQ or logs
  ...
)
GROUP BY DATE(failed_at), type
ORDER BY date DESC;

6. Set Up Alerting

Threshold-based alerts:

  • Alert if failed queue > 10 messages
  • Alert if any single task retries > 2 times
  • Alert if retry rate > 20% of total executions

Alerting service integration:

typescript
// In cron-worker.ts
if (retryCount >= RETRY_CONFIG.MAX_RETRIES) {
  // Send alert
  await alertingService.sendAlert({
    severity: 'high',
    message: `Task permanently failed after ${retryCount} retries`,
    task: payload,
  });
}

7. Implement Circuit Breaker

Stop retrying if error rate is too high:

typescript
// Pseudocode
const errorRate = await getRecentErrorRate();  // e.g., 0.8 (80%)
const threshold = 0.5;  // 50%

if (errorRate > threshold) {
  CronLogger.error('Circuit breaker: Error rate too high, stopping retries');
  await publishToFailed(msg, 'Circuit breaker activated');
  ch.ack(msg);
  return;
}
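
getRecentErrorRate above is pseudocode; a minimal in-memory version (illustrative only: per worker process and reset on restart) can track outcomes over a sliding window:

typescript
// Minimal in-memory error-rate tracker (illustrative; per process, not persisted).
const WINDOW_MS = 5 * 60 * 1000;  // look at the last 5 minutes
const outcomes: {at: number; failed: boolean}[] = [];

const recordOutcome = (failed: boolean) => {
  outcomes.push({at: Date.now(), failed});
};

const getRecentErrorRate = (): number => {
  const cutoff = Date.now() - WINDOW_MS;
  const recent = outcomes.filter((o) => o.at >= cutoff);
  if (recent.length === 0) return 0;
  return recent.filter((o) => o.failed).length / recent.length;
};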

8. Document Retry Behavior

Add retry information to scheduler descriptions:

json
{
  "description": "Daily token refresh (retries: 3, delay: 30s-120s)",
  "schedule": "0 8 * * *",
  "params": {
    "controller": "RefreshUserToken"
  }
}

9. Test Retry Logic

Simulate failures:

typescript
// In report class
async mainFunction() {
  const simulateFailure = process.env.SIMULATE_FAILURE === 'true';

  if (simulateFailure) {
    throw new Error('Simulated failure for testing');
  }

  // Normal execution
  await this.doWork();
}

Run test:

bash
SIMULATE_FAILURE=true node ./dist/services/reports/TestReport/TestReport.js

10. Regular Failed Queue Review

Schedule regular reviews of failed tasks:

bash
#!/bin/bash
# Weekly review script
echo "=== Failed Tasks Report (Last 7 Days) ==="
docker exec rabbitmq rabbitmqadmin -f pretty_json get queue=cron_scheduler_failed count=100 > failed_tasks.json

# Parse the message payloads and group by failure reason
jq '.[].payload | fromjson | {type, id, scheduler_id, failed_reason}' failed_tasks.json | \
  jq -s 'group_by(.failed_reason) | map({reason: .[0].failed_reason, count: length})'
