$29
Batch Processor
Configurable batch-job processing framework: reads work items, runs a transform pipeline in batches across a concurrent worker pool, with retry, backoff, and a dead-letter queue.
JSON, Markdown, Python
📁 File Structure (10 files)
batch-processor/
├── LICENSE
├── README.md
├── examples/
│   └── batch_processor_config.json
├── free-sample.zip
├── guide/
│   ├── 01_features.md
│   ├── 02_quick-start.md
│   ├── 03_output.md
│   └── 04_license.md
├── index.html
└── src/
    └── batch_processor.py
📖 Documentation Preview (README excerpt)
Batch Processor
A configurable batch-job processing framework built on Python stdlib. Reads work items from an input source, runs them through a transform pipeline in configurable-size batches across a concurrent worker pool, with retry + exponential backoff and a dead-letter queue for permanently failed items.
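As a rough illustration of the batching idea described above, the sketch below splits a list into fixed-size batches and runs each batch on a `ThreadPoolExecutor`. The helper names `chunked` and `run_in_batches` are hypothetical and are not taken from `batch_processor.py`; the shipped implementation may organize this differently.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator


def chunked(items: list[Any], batch_size: int) -> Iterator[list[Any]]:
    """Yield consecutive fixed-size batches; the last one may be shorter."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def run_in_batches(
    items: list[Any],
    transform: Callable[[Any], Any],
    max_workers: int = 4,
    batch_size: int = 10,
) -> list[Any]:
    """Apply `transform` to every item, one batch at a time, on a thread pool."""
    results: list[Any] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for batch in chunked(items, batch_size):
            futures = [pool.submit(transform, item) for item in batch]
            # as_completed yields futures in completion order,
            # so results are not guaranteed to match input order.
            for future in as_completed(futures):
                results.append(future.result())
    return results


if __name__ == "__main__":
    doubled = run_in_batches(list(range(25)), lambda n: n * 2)
    print(len(doubled), "items processed")
```

Finishing one batch before submitting the next keeps at most `batch_size` items in flight, which is one simple way to get the controlled throughput the README mentions.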
Features
- Concurrent worker pool — ThreadPoolExecutor with a configurable max_workers limit
- Configurable batch sizes — Split large item sets into fixed-size batches for controlled throughput
- Retry with exponential backoff — Failed items are retried up to N times with increasing delays
- Dead-letter queue — Items that exhaust all retries are captured with full error context for inspection
- Transform pipeline — Chain cast, validate, map, rename, and enrich transforms in any order
- Multiple input sources — Inline items in config, JSON files on disk, or generated demo data
- Execution stats — Tracks submitted, succeeded, failed, dead-lettered, retries, and duration
- Progress callbacks — Real-time per-item completion logging
- JSON config support — Define complete jobs in a single config file for repeatable automation
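The retry and dead-letter features in the list above can be sketched roughly as follows. This is a minimal illustration, not the code shipped in `batch_processor.py`: the `RetryRunner` and `DeadLetter` names are hypothetical, and the delay formula `base_delay * backoff_factor ** (retry - 1)` is an assumed, common form of exponential backoff. The default values mirror the constants visible in the source preview (3 retries, 0.5 s base delay, factor 2.0).

```python
import time
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass
class DeadLetter:
    """Hypothetical record for an item that exhausted its retries."""
    item: Any
    error: str
    attempts: int


@dataclass
class RetryRunner:
    max_retries: int = 3
    base_delay: float = 0.5
    backoff_factor: float = 2.0
    sleep: Callable[[float], None] = time.sleep  # injectable so tests need not wait
    dead_letters: list[DeadLetter] = field(default_factory=list)

    def delay_for(self, retry_number: int) -> float:
        """Delay before the given retry (1-based): base * factor ** (n - 1)."""
        return self.base_delay * self.backoff_factor ** (retry_number - 1)

    def run(self, item: Any, transform: Callable[[Any], Any]) -> Any:
        """Return the transform result, or None once the item is dead-lettered."""
        attempts = 0
        while True:
            attempts += 1
            try:
                return transform(item)
            except Exception as exc:
                # One initial attempt plus up to max_retries retries.
                if attempts > self.max_retries:
                    self.dead_letters.append(DeadLetter(item, repr(exc), attempts))
                    return None
                self.sleep(self.delay_for(attempts))
```

Under these assumptions an item that always fails is attempted four times, waits 0.5 s, 1.0 s, and 2.0 s between attempts, and then lands in `dead_letters` with the last error attached.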
Requirements
- Python 3.10+
- No external dependencies (stdlib only)
Quick Start
# Show all available options
python src/batch_processor.py --help
# Run the example job (processes 8 inline orders, 2 will fail validation)
python src/batch_processor.py --config examples/batch_processor_config.json
# Override workers and batch size from the CLI
python src/batch_processor.py --config examples/batch_processor_config.json --workers 4 --batch-size 5
# Save stats to a file
python src/batch_processor.py --config examples/batch_processor_config.json --stats-file run_stats.json
# Verbose debug logging (shows every retry attempt)
python src/batch_processor.py --config examples/batch_processor_config.json --log-level DEBUG
Configuration Reference
CLI Options
| Flag | Default | Description |
|---|---|---|
| --config, -c | (required) | Batch job config file (JSON) |
| --workers, -w | 4 | Max concurrent workers (overrides config) |
| --batch-size, -b | 10 | Items per batch (overrides config) |
| --stats-file | — | Write execution stats JSON to this file |
| --log-level | INFO | Logging level (DEBUG, INFO, WARNING, ERROR) |
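For readers who want to see how flags like these are typically declared, here is a minimal `argparse` sketch built from the table above. It is an assumption about structure, not the product's actual parser: `build_parser` and `resolve` are hypothetical helpers, and the precedence shown (CLI flag, then config value, then built-in default) is one plausible reading of "overrides config".

```python
import argparse
from typing import Any

DEFAULT_MAX_WORKERS = 4
DEFAULT_BATCH_SIZE = 10


def build_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(prog="batch_processor.py")
    parser.add_argument("--config", "-c", required=True,
                        help="Batch job config file (JSON)")
    # None means "not given on the command line", so the config can still apply.
    parser.add_argument("--workers", "-w", type=int, default=None,
                        help="Max concurrent workers (overrides config)")
    parser.add_argument("--batch-size", "-b", type=int, default=None,
                        help="Items per batch (overrides config)")
    parser.add_argument("--stats-file", default=None,
                        help="Write execution stats JSON to this file")
    parser.add_argument("--log-level", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
                        help="Logging level")
    return parser


def resolve(cli_value: Any, config_value: Any, fallback: Any) -> Any:
    """Assumed precedence: CLI flag, then config file, then built-in default."""
    if cli_value is not None:
        return cli_value
    if config_value is not None:
        return config_value
    return fallback


if __name__ == "__main__":
    args = build_parser().parse_args(["--config", "job.json", "--workers", "8"])
    print(resolve(args.workers, None, DEFAULT_MAX_WORKERS))
    print(resolve(args.batch_size, None, DEFAULT_BATCH_SIZE))
```

Using `None` as the parser default is what lets the code distinguish an explicit `--workers 4` from no flag at all.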
Config File Schema
{
"name": "job_name",
"source": {
"type": "inline | json_file | generated",
"id_field": "order_id",
*... continues with setup instructions, usage examples, and more.*
📄 Code Sample (.py preview)
src/batch_processor.py
#!/usr/bin/env python3
"""
Batch Processor — Automation Hub (DataNest)
A configurable batch-job processing framework built on Python stdlib.
Reads work items from an input source, runs them through a transform pipeline
in configurable-size batches across a concurrent worker pool (ThreadPoolExecutor),
with retry + exponential backoff and a dead-letter queue for permanently failed
items. Tracks execution stats and supports completion callbacks.
Usage:
python batch_processor.py --config batch_processor_config.json
python batch_processor.py --config job.json --workers 8 --batch-size 50
python batch_processor.py --config job.json --log-level DEBUG
Dependencies: Python 3.10+ stdlib only (no pip packages)
License: MIT
"""
from __future__ import annotations
import argparse
import hashlib
import json
import logging
import random
import time
from abc import ABC, abstractmethod
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
DEFAULT_MAX_WORKERS = 4
DEFAULT_BATCH_SIZE = 10
DEFAULT_MAX_RETRIES = 3
DEFAULT_BASE_DELAY = 0.5
DEFAULT_BACKOFF_FACTOR = 2.0
logger = logging.getLogger("batch_processor")
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
# ... 659 more lines ...