Cron Worker Retry Mechanism
Table of Contents
- Overview
- How It Works
- Configuration
- Message Flow
- x-death Header
- Exponential Backoff
- Failed Task Handling
- Monitoring and Recovery
- Best Practices
Overview
The cron worker implements a sophisticated retry mechanism using RabbitMQ's Dead Letter Exchange (DLX) pattern. This ensures that transient failures don't result in lost tasks while preventing infinite retry loops.
Key Features
- Automatic Retry: Failed tasks automatically retry without manual intervention
- Exponential Backoff: Increasing delays between retries to avoid overwhelming failing services
- Retry Limit: Maximum retry attempts prevent infinite loops
- Failed Queue: Permanently failed tasks stored for investigation
- x-death Tracking: RabbitMQ's built-in header tracks retry count
- No Code Changes: Retry logic handled entirely by RabbitMQ and worker configuration
How It Works
Architecture
┌────────────────────────────────────────────────────────────────────────┐
│ RETRY MECHANISM ARCHITECTURE │
└────────────────────────────────────────────────────────────────────────┘
1. TASK EXECUTION ATTEMPT
[cron_scheduler] ← Main queue
│
│ consume
↓
[cron-worker] → Execute task
│
┌────┴─────┐
│ │
SUCCESS│ │FAILURE
│ │
↓ ↓
[ACK] [NACK]
│ │
│ │ (requeue: false)
│ │ Triggers dead-letter
│ ↓
│ [cron_dlx] ← Dead Letter Exchange
│ │
│ │ Routes based on retry count
│ │
│ ┌────┴────┐
│ │ │
│ retry < MAX retry >= MAX
│ │ │
│ ↓ ↓
│ [cron_scheduler_retry] [cron_scheduler_failed]
│ │ │
│ │ TTL: 30s-120s │
│ │ (waits...) │
│ │ │
│ │ After TTL expires: │
│ │ Dead-letter back │
│ │ to main queue │
│ ↓ ↓
│ [cron_scheduler] ← retry [Failed storage]
│ │
└─────┘ (loop)
2. RETRY COUNT TRACKING
RabbitMQ x-death header:
{
"x-death": [{
"count": 2, ← Number of times dead-lettered
"reason": "rejected",
"queue": "cron_scheduler",
"time": "2025-12-12T...",
"exchange": "cron_dlx",
"routing-keys": ["retry"]
}]
}
3. RETRY DECISION
Worker reads x-death[0].count
│
├─ count = 0: First execution
├─ count = 1: First retry
├─ count = 2: Second retry
└─ count = 3: Third retry (max reached)
│
└→ Publish to failed queue
Step-by-Step Flow
Step 1: Initial Execution
Task arrives → Execute → Failure
↓
Worker checks x-death header: count = 0 (first try)
↓
Worker NACKs message (requeue: false)
↓
RabbitMQ dead-letters message to cron_dlx exchange
↓
DLX routes to cron_scheduler_retry queue (routing key: "retry")
↓
Message waits in retry queue for TTL (30 seconds)
Step 2: First Retry
TTL expires → Message dead-lettered back to cron_scheduler
↓
x-death header updated: count = 1
↓
Worker consumes → Execute → Failure (again)
↓
Worker checks x-death: count = 1 (first retry)
↓
Worker NACKs message
↓
RabbitMQ dead-letters to retry queue
↓
Wait 60 seconds (exponential backoff: 30s × 2^1)
Step 3: Second Retry
TTL expires → Back to main queue
↓
x-death header: count = 2
↓
Execute → Failure
↓
NACK → Retry queue
↓
Wait 120 seconds (30s × 2^2)
Step 4: Max Retries Exceeded
TTL expires → Back to main queue
↓
x-death header: count = 3
↓
Execute → Failure
↓
Worker checks: count >= MAX_RETRIES (3)
↓
Worker publishes to cron_scheduler_failed queue
↓
Worker ACKs original message (removes from main queue)
↓
Task permanently failed (logged and stored)
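Putting the flow above into code, a minimal sketch of the worker's consume loop could look like this. It assumes an amqplib channel ch, a hypothetical executeTask function, and the getRetryCount/publishToFailed helpers shown later in this document:
// Minimal sketch of the consume handler; not the literal cron-worker.ts code.
ch.consume(
  'cron_scheduler',
  async (msg: any) => {
    if (!msg) return;
    const retryCount = getRetryCount(msg); // x-death[0].count, 0 on first execution
    try {
      await executeTask(JSON.parse(msg.content.toString())); // hypothetical task runner
      ch.ack(msg); // success: remove from main queue
    } catch (err) {
      if (retryCount >= RETRY_CONFIG.MAX_RETRIES) {
        await publishToFailed(msg, (err as Error).message); // store for investigation
        ch.ack(msg); // drop from main queue; task is permanently failed
      } else {
        ch.nack(msg, false, false); // dead-letter to cron_dlx → retry queue → back after TTL
      }
    }
  },
  {noAck: false}
);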
Configuration
Environment Variables
# Maximum number of retry attempts (default: 3)
# Total executions = MAX_RETRIES + 1 (initial try + retries)
CRON_MAX_RETRIES=3
# Initial retry delay in milliseconds (default: 30000 = 30 seconds)
CRON_RETRY_DELAY_MS=30000
# Exponential backoff multiplier (default: 2.0)
# Delay formula: RETRY_DELAY_MS × MULTIPLIER^retry_count
CRON_RETRY_DELAY_MULTIPLIER=2.0
Configuration in Code
cron-worker.ts:
const RETRY_CONFIG = {
MAX_RETRIES: parseInt(process.env.CRON_MAX_RETRIES || '3', 10),
RETRY_DELAY_MS: parseInt(process.env.CRON_RETRY_DELAY_MS || '30000', 10),
RETRY_DELAY_MULTIPLIER: parseFloat(process.env.CRON_RETRY_DELAY_MULTIPLIER || '2.0'),
};
Queue Configuration:
// Main queue with DLX
await ch.assertQueue('cron_scheduler', {
durable: true,
arguments: {
'x-message-deduplication': true,
'x-dead-letter-exchange': 'cron_dlx',
'x-dead-letter-routing-key': 'retry',
},
});
// Retry queue with TTL
await ch.assertQueue('cron_scheduler_retry', {
durable: true,
arguments: {
'x-dead-letter-exchange': '', // Default exchange
'x-dead-letter-routing-key': 'cron_scheduler',
'x-message-ttl': RETRY_CONFIG.RETRY_DELAY_MS,
},
});
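For this routing to work, the cron_dlx exchange itself must be declared and bound to the retry and failed queues. That wiring is not shown above; a plausible sketch, assuming a durable direct exchange:
// Assumed DLX wiring (not shown in the snippets above): a direct exchange that
// routes "retry" to the retry queue and "failed" to the failed queue.
await ch.assertExchange('cron_dlx', 'direct', { durable: true });
await ch.bindQueue('cron_scheduler_retry', 'cron_dlx', 'retry');
await ch.bindQueue('cron_scheduler_failed', 'cron_dlx', 'failed');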
Customizing Retry Behavior
Example: More aggressive retries
export CRON_MAX_RETRIES=5 # Try 5 times instead of 3
export CRON_RETRY_DELAY_MS=10000 # Start with 10 seconds
export CRON_RETRY_DELAY_MULTIPLIER=1.5 # Slower growth
Retry Schedule:
- Retry 1: 10s × 1.5^0 = 10 seconds
- Retry 2: 10s × 1.5^1 = 15 seconds
- Retry 3: 10s × 1.5^2 = 22.5 seconds
- Retry 4: 10s × 1.5^3 = 33.75 seconds
- Retry 5: 10s × 1.5^4 = 50.625 seconds
Example: Conservative retries
export CRON_MAX_RETRIES=2 # Only 2 retries
export CRON_RETRY_DELAY_MS=60000 # Start with 1 minute
export CRON_RETRY_DELAY_MULTIPLIER=3.0 # Aggressive growth
Retry Schedule:
- Retry 1: 60s × 3^0 = 60 seconds (1 minute)
- Retry 2: 60s × 3^1 = 180 seconds (3 minutes)
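When tuning these values, it helps to preview the resulting schedule before deploying. A small stand-alone sketch (not part of the worker) that applies the same formula:
// Prints the retry schedule for a given configuration using
// delay = RETRY_DELAY_MS × MULTIPLIER^(retry - 1), matching the tables in this document.
const printRetrySchedule = (maxRetries: number, delayMs: number, multiplier: number): void => {
  let cumulativeMs = 0;
  for (let retry = 1; retry <= maxRetries; retry++) {
    const delay = Math.round(delayMs * Math.pow(multiplier, retry - 1));
    cumulativeMs += delay;
    console.log(`Retry ${retry}: wait ${delay / 1000}s (cumulative ${cumulativeMs / 1000}s)`);
  }
};

printRetrySchedule(3, 30000, 2.0); // Default: 30s, 60s, 120s
printRetrySchedule(2, 60000, 3.0); // Conservative: 60s, 180s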
Message Flow
Normal Success Flow
┌──────────────┐
│ Task arrives │
└──────┬───────┘
│
↓
┌──────────────┐
│ Execute │
└──────┬───────┘
│
↓
┌──────────────┐
│ SUCCESS │
└──────┬───────┘
│
↓
┌──────────────┐
│ ACK message │
└──────┬───────┘
│
↓
┌──────────────────┐
│ Remove from queue│
│ Remove from queue│
└──────────────────┘
Retry Flow with Recovery
┌──────────────┐
│ Task arrives │
│ (x-death: 0) │
└──────┬───────┘
│
↓
┌──────────────┐
│ Execute │
└──────┬───────┘
│
↓
┌──────────────┐
│ FAILURE │
└──────┬───────┘
│
↓
┌──────────────┐
│ NACK message │
└──────┬───────┘
│
↓
┌────────────────────┐
│ Dead-letter to DLX │
└──────┬─────────────┘
│
↓
┌────────────────────┐
│ Route to retry │
│ queue (TTL: 30s) │
└──────┬─────────────┘
│
│ Wait 30 seconds...
↓
┌────────────────────┐
│ TTL expired │
│ Dead-letter back │
│ (x-death: 1) │
└──────┬─────────────┘
│
↓
┌──────────────┐
│ Execute │
└──────┬───────┘
│
↓
┌──────────────┐
│ SUCCESS │ (Network recovered!)
└──────┬───────┘
│
↓
┌──────────────┐
│ ACK message │
└──────┬───────┘
│
↓
┌──────────────────┐
│ Remove from queue│
│ Remove from queue│
└──────────────────┘
Permanent Failure Flow
┌──────────────┐
│ Task arrives │
│ (x-death: 3) │
└──────┬───────┘
│
↓
┌──────────────┐
│ Execute │
└──────┬───────┘
│
↓
┌──────────────┐
│ FAILURE │
└──────┬───────┘
│
↓
┌────────────────────────┐
│ Check x-death count │
│ count >= MAX_RETRIES │
└──────┬─────────────────┘
│
↓
┌────────────────────────┐
│ Publish to │
│ cron_scheduler_failed │
└──────┬─────────────────┘
│
↓
┌────────────────────────┐
│ ACK original message │
└──────┬─────────────────┘
│
↓
┌────────────────────────┐
│ Log failure details │
└──────┬─────────────────┘
│
↓
┌────────────────────────┐
│ Alert/notification │
└────────────────────────┘
x-death Header
Structure
RabbitMQ automatically manages the x-death header:
{
"x-death": [
{
"count": 2,
"reason": "rejected",
"queue": "cron_scheduler",
"time": "2025-12-12T12:01:33.000Z",
"exchange": "cron_dlx",
"routing-keys": ["retry"],
"original-expiration": null
}
]
}
Fields
- count: Number of times the message has been dead-lettered from this queue
- reason: Why it was dead-lettered ("rejected", "expired", "maxlen")
- queue: Original queue name
- time: ISO 8601 timestamp of last dead-letter event
- exchange: Dead letter exchange used
- routing-keys: Routing keys used for dead-lettering
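For reference, a typed view of one entry (the interface name XDeathEntry is ours; field names follow the header itself, and the exact representation of time depends on the client library):
// Shape of a single x-death entry as delivered by RabbitMQ.
interface XDeathEntry {
  count: number;                              // times dead-lettered from this queue
  reason: 'rejected' | 'expired' | 'maxlen';  // why it was dead-lettered
  queue: string;                              // original queue name
  time: string | number | Date;               // last dead-letter event (representation varies by client)
  exchange: string;                           // dead letter exchange used
  'routing-keys': string[];                   // routing keys used for dead-lettering
  'original-expiration'?: string | null;      // only set if the message had a per-message TTL
}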
Reading x-death in Worker
// Helper function to get retry count
const getRetryCount = (msg: any): number => {
const xDeath = msg.properties.headers?.['x-death'];
if (!xDeath || !Array.isArray(xDeath) || xDeath.length === 0) {
return 0; // First execution
}
return xDeath[0].count || 0;
};
// Usage
const retryCount = getRetryCount(msg);
console.log(`Retry attempt: ${retryCount}/${RETRY_CONFIG.MAX_RETRIES}`);
x-death Evolution
Initial Message (first execution):
{
"properties": {
"headers": {} // No x-death header
}
}
After First NACK:
{
"properties": {
"headers": {
"x-death": [
{
"count": 1,
"reason": "rejected",
"queue": "cron_scheduler",
"time": "2025-12-12T12:00:00.000Z"
}
]
}
}
}
After Second NACK:
{
"properties": {
"headers": {
"x-death": [
{
"count": 2, // Incremented
"reason": "rejected",
"queue": "cron_scheduler",
"time": "2025-12-12T12:00:30.000Z" // Updated
}
]
}
}
}
Exponential Backoff
Formula
delay = RETRY_DELAY_MS × MULTIPLIER^retry_count
Default Configuration
RETRY_DELAY_MS = 30000 (30 seconds)
MULTIPLIER = 2.0
Retry Schedule:
| Attempt | Retry Count | Calculation | Delay | Cumulative Time |
|---|---|---|---|---|
| Initial | 0 | N/A | N/A | 0s |
| Retry 1 | 1 | 30s × 2^0 | 30s | 30s |
| Retry 2 | 2 | 30s × 2^1 | 60s | 90s |
| Retry 3 | 3 | 30s × 2^2 | 120s | 210s (3.5 min) |
Why Exponential Backoff?
Without Backoff (constant 30s delay):
Attempt 1 → Wait 30s → Attempt 2 → Wait 30s → Attempt 3 → Wait 30s → ...
- Overwhelms failing service with constant load
- Doesn't give service time to recover
- All retries cluster together
With Exponential Backoff:
Attempt 1 → Wait 30s → Attempt 2 → Wait 60s → Attempt 3 → Wait 120s → ...
- Gives failing service progressively more time to recover
- Reduces load on failing service
- Spreads retries over time
Implementation in Worker
// Calculate next retry delay
const nextRetry = retryCount + 1;
const retryDelay = Math.round(
RETRY_CONFIG.RETRY_DELAY_MS *
Math.pow(RETRY_CONFIG.RETRY_DELAY_MULTIPLIER, retryCount)
);
CronLogger.warn(
`Scheduling retry ${nextRetry}/${RETRY_CONFIG.MAX_RETRIES} in ${retryDelay}ms`,
taskContext
);
// NACK to trigger dead-letter
ch.nack(msg, false, false);
Note: The delay is configured in the retry queue's TTL, so the worker just NACKs and RabbitMQ handles the timing.
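Because the queue-level x-message-ttl shown earlier is a single fixed value, strictly exponential per-retry delays need either one retry queue per delay or a per-message expiration. A sketch of the per-message variant (an assumption, not necessarily what cron-worker.ts does):
// Alternative (assumed): instead of NACKing, publish a copy to the retry queue
// with a per-message expiration equal to the computed retryDelay, then ACK the original.
ch.publish('cron_dlx', 'retry', msg.content, {
  persistent: true,
  expiration: String(retryDelay),   // per-message TTL in milliseconds
  headers: msg.properties.headers,  // preserve existing x-death entries
});
ch.ack(msg);
With this variant the retry count would come from the retry queue's "expired" x-death entry rather than a "rejected" entry for the main queue, and RabbitMQ only discards expired messages once they reach the head of the queue, so a short delay queued behind a longer one waits out the longer one first.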
Adjusting Backoff Strategy
Linear Backoff (MULTIPLIER = 1.0):
Retry 1: 30s × 1.0^0 = 30s
Retry 2: 30s × 1.0^1 = 30s
Retry 3: 30s × 1.0^2 = 30s
Aggressive Backoff (MULTIPLIER = 3.0):
Retry 1: 30s × 3.0^0 = 30s
Retry 2: 30s × 3.0^1 = 90s (1.5 min)
Retry 3: 30s × 3.0^2 = 270s (4.5 min)
Custom Backoff (code modification):
// Custom backoff: Fibonacci sequence
const fibonacciBackoff = (retry: number): number => {
const fib = [30000, 30000, 60000, 90000, 150000]; // milliseconds
return fib[Math.min(retry, fib.length - 1)];
};
const retryDelay = fibonacciBackoff(retryCount);
Failed Task Handling
Failed Queue Structure
cron_scheduler_failed Queue Configuration:
await ch.assertQueue('cron_scheduler_failed', {
durable: true, // Persists across RabbitMQ restarts
});
No special arguments - just durable storage.
Publishing to Failed Queue
const publishToFailed = async (msg: any, reason: string) => {
const failedPayload = {
...JSON.parse(msg.content.toString()),
failed_at: new Date().toISOString(),
failed_reason: reason,
retry_count: getRetryCount(msg),
};
ch.publish(
'cron_dlx',
'failed', // Routing key
Buffer.from(JSON.stringify(failedPayload)),
{
persistent: true,
headers: msg.properties.headers, // Preserve x-death header
}
);
};
Failed Message Format
{
"type": "report",
"id": 25,
"scheduler_id": 16,
"params_scheduler": "{...}",
"failed_at": "2025-12-12T12:03:39.002Z",
"failed_reason": "Connection timeout after 30s",
"retry_count": 3
}
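For reference, the same payload as a TypeScript interface (the name FailedTaskMessage is ours; the fields mirror the example above, with the original task fields carried over from the scheduler message):
// Shape of a message stored in cron_scheduler_failed.
interface FailedTaskMessage {
  type: string;              // original task type, e.g. "report"
  id: number;                // original task id
  scheduler_id: number;      // scheduler that produced the task
  params_scheduler: string;  // original scheduler params (JSON string)
  failed_at: string;         // ISO 8601 timestamp added by publishToFailed
  failed_reason: string;     // last error message
  retry_count: number;       // retries consumed before giving up
}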
Monitoring Failed Queue
Worker Monitoring (automatic):
ch.consume(
'cron_scheduler_failed',
(msg: any) => {
if (!msg) return;
try {
const failedTask = JSON.parse(msg.content.toString());
CronLogger.error(
'⚠️ PERMANENTLY FAILED TASK',
undefined,
{
type: failedTask.type,
id: failedTask.id,
scheduler_id: failedTask.scheduler_id,
retry_count: failedTask.retry_count,
failed_at: failedTask.failed_at,
}
);
CronLogger.debug(`Failed task details: ${JSON.stringify(failedTask, null, 2)}`);
ch.ack(msg); // Remove from failed queue after logging
} catch (err) {
CronLogger.error('Error processing failed queue message', err);
ch.ack(msg);
}
},
{noAck: false}
);
Example Log Output:
2025-12-12T12:03:39.003Z [ERROR] [type=report, id=25, scheduler_id=16, retry_count=3, failed_at=2025-12-12T12:03:39.002Z] ⚠️ PERMANENTLY FAILED TASK
2025-12-12T12:03:39.004Z [DEBUG] Failed task details: {
"type": "report",
"id": 25,
"scheduler_id": 16,
"params_scheduler": "...",
"failed_at": "2025-12-12T12:03:39.002Z",
"failed_reason": "Connection timeout",
"retry_count": 3
}
Monitoring and Recovery
Checking Failed Tasks
Via RabbitMQ Admin:
# List queue statistics
docker exec rabbitmq rabbitmqadmin list queues name messages
# Output:
# name messages
# cron_scheduler 0
# cron_scheduler_retry 0
# cron_scheduler_failed 5 ← 5 failed tasks!
Get failed messages:
docker exec rabbitmq rabbitmqadmin get queue=cron_scheduler_failed count=10
# Output shows:
# - payload: Full message
# - routing_key: "failed"
# - properties: Headers including x-death
Manual Recovery
Option 1: Republish to Main Queue
# Get failed message
docker exec rabbitmq rabbitmqadmin get queue=cron_scheduler_failed requeue=false > failed_msg.json
# Extract payload
PAYLOAD=$(cat failed_msg.json | jq -r '.payload')
# Republish to main queue
docker exec rabbitmq rabbitmqadmin publish \
exchange=amq.default \
routing_key=cron_scheduler \
payload="$PAYLOAD"Option 2: Fix Issue and Re-enable Scheduler
-- Failed due to bad configuration
-- Fix the issue first, then re-enable scheduler
UPDATE Scheduler
SET status = 1,
params = '{"fixed": "configuration"}'
WHERE id = 16;
The scheduler will execute on the next cron evaluation.
Option 3: Manually Execute Report
POST /reports/generate
Content-Type: application/json
{
"controller": "RefreshUserToken",
"options_interface": {
"delivery": {
"instant": true
}
}
}
Alerting on Failures
Script: Monitor Failed Queue
#!/bin/bash
# monitor-failed-queue.sh
THRESHOLD=5
failed_count=$(docker exec rabbitmq rabbitmqadmin list queues -f tsv | \
grep cron_scheduler_failed | \
awk '{print $2}')
if [ "$failed_count" -gt "$THRESHOLD" ]; then
echo "ALERT: $failed_count failed tasks (threshold: $THRESHOLD)"
# Send notification
curl -X POST https://your-alert-endpoint.com/alert \
-H "Content-Type: application/json" \
-d "{\"message\": \"$failed_count failed cron tasks\"}"
fi
Cron Job: Run every 5 minutes
*/5 * * * * /path/to/monitor-failed-queue.sh
Database Correlation
Query the database for details about the failed scheduler:
-- Failed message contains scheduler_id: 16
-- Get scheduler details
SELECT
s.id,
s.description,
s.schedule,
s.params,
r.controller as report_name,
u.email as created_by
FROM Scheduler s
LEFT JOIN Report r ON r.id = s.report_id
LEFT JOIN User u ON u.id = s.user_id_created
WHERE s.id = 16;
Best Practices
1. Design Idempotent Operations
Ensure tasks can be safely retried:
// ❌ BAD: Not idempotent
async refreshTokens() {
await this.db.query('UPDATE users SET token_refresh_count = token_refresh_count + 1');
}
// ✅ GOOD: Idempotent
async refreshTokens() {
const today = new Date().toISOString().split('T')[0];
await this.db.query(
'INSERT INTO token_refreshes (date, count) VALUES (?, 1) ' +
'ON DUPLICATE KEY UPDATE count = 1',
[today]
);
}
2. Classify Error Types
Handle transient vs permanent errors differently:
async mainFunction() {
try {
await this.externalAPI.fetch();
} catch (err) {
// Transient errors: Throw to trigger retry
if (err.code === 'ECONNREFUSED' ||
err.code === 'ETIMEDOUT' ||
err.code === 'ENOTFOUND') {
await this.log(LogType.Warn, `Transient error: ${err.message} - will retry`);
throw err; // Triggers retry
}
// Permanent errors: Don't throw (avoid useless retries)
if (err.code === 'AUTH_INVALID' ||
err.code === 'RESOURCE_NOT_FOUND') {
await this.log(LogType.Error, `Permanent error: ${err.message} - manual intervention required`);
return; // Don't throw: treated as handled so the task isn't retried
}
// Unknown errors: Throw to retry (better safe than sorry)
throw err;
}
}
3. Log Retry Information
async mainFunction() {
const retryInfo = process.env.RETRY_COUNT || '0';
await this.log(LogType.Info, `Starting execution (retry: ${retryInfo})`);
try {
// ... execution logic
} catch (err) {
await this.log(LogType.Error, `Execution failed (retry: ${retryInfo}): ${err.message}`);
throw err;
}
}
4. Use Appropriate Retry Configuration
Fast-failing tasks (API calls):
CRON_MAX_RETRIES=3
CRON_RETRY_DELAY_MS=10000 # 10 seconds
CRON_RETRY_DELAY_MULTIPLIER=2.0
Slow-recovering tasks (database connection):
CRON_MAX_RETRIES=5
CRON_RETRY_DELAY_MS=60000 # 1 minute
CRON_RETRY_DELAY_MULTIPLIER=1.5
5. Monitor Retry Patterns
Track retry rates to identify systemic issues:
-- Analyze retry patterns from logs
SELECT
DATE(failed_at) as date,
type,
COUNT(*) as failed_count,
AVG(retry_count) as avg_retries
FROM (
-- Parse failed messages from RabbitMQ or logs
...
)
GROUP BY DATE(failed_at), type
ORDER BY date DESC;
6. Set Up Alerting
Threshold-based alerts:
- Alert if failed queue > 10 messages
- Alert if any single task retries > 2 times
- Alert if retry rate > 20% of total executions
Alerting service integration:
// In cron-worker.ts
if (retryCount >= RETRY_CONFIG.MAX_RETRIES) {
// Send alert
await alertingService.sendAlert({
severity: 'high',
message: `Task permanently failed after ${retryCount} retries`,
task: payload,
});
}
7. Implement Circuit Breaker
Stop retrying if the error rate is too high:
// Pseudocode
const errorRate = await getRecentErrorRate(); // e.g., 0.8 (80%)
const threshold = 0.5; // 50%
if (errorRate > threshold) {
CronLogger.error('Circuit breaker: Error rate too high, stopping retries');
await publishToFailed(msg, 'Circuit breaker activated');
ch.ack(msg);
return;
}
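getRecentErrorRate is not defined in the worker; a minimal in-memory sliding-window sketch (an assumption, only meaningful within a single worker process):
// Assumed helper: tracks task outcomes in memory and reports the failure ratio
// over the last five minutes. State is lost on worker restart.
const WINDOW_MS = 5 * 60 * 1000;
const outcomes: { at: number; failed: boolean }[] = [];

const recordOutcome = (failed: boolean): void => {
  outcomes.push({ at: Date.now(), failed });
};

const getRecentErrorRate = async (): Promise<number> => {
  const cutoff = Date.now() - WINDOW_MS;
  while (outcomes.length > 0 && outcomes[0].at < cutoff) {
    outcomes.shift(); // drop outcomes that fell out of the window
  }
  if (outcomes.length === 0) return 0;
  return outcomes.filter((o) => o.failed).length / outcomes.length;
};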
8. Document Retry Behavior
Add retry information to scheduler descriptions:
{
"description": "Daily token refresh (retries: 3, delay: 30s-120s)",
"schedule": "0 8 * * *",
"params": {
"controller": "RefreshUserToken"
}
}
9. Test Retry Logic
Simulate failures:
// In report class
async mainFunction() {
const simulateFailure = process.env.SIMULATE_FAILURE === 'true';
if (simulateFailure) {
throw new Error('Simulated failure for testing');
}
// Normal execution
await this.doWork();
}
Run test:
SIMULATE_FAILURE=true node ./dist/services/reports/TestReport/TestReport.js
10. Regular Failed Queue Review
Schedule regular reviews of failed tasks:
# Weekly review script
#!/bin/bash
echo "=== Failed Tasks Report (Last 7 Days) ==="
docker exec rabbitmq rabbitmqadmin get queue=cron_scheduler_failed count=100 > failed_tasks.json
# Parse and analyze
jq '.[] | {type, id, scheduler_id, failed_reason}' failed_tasks.json | \
jq -s 'group_by(.failed_reason) | map({reason: .[0].failed_reason, count: length})'
Related Documentation
- Cron Worker System - System architecture
- Report Scheduling - Creating schedulers