
Batch Processor

$29

Configurable batch-job processing framework: reads work items, runs a transform pipeline in batches across a concurrent worker pool, with retry, backoff, and a dead-letter queue.

📁 10 files
JSON, Markdown, Python

📄 Product Preview


📁 File Structure 10 files

batch-processor/
├── LICENSE
├── README.md
├── examples/
│   └── batch_processor_config.json
├── free-sample.zip
├── guide/
│   ├── 01_features.md
│   ├── 02_quick-start.md
│   ├── 03_output.md
│   └── 04_license.md
├── index.html
└── src/
    └── batch_processor.py

📖 Documentation Preview README excerpt

Batch Processor

A configurable batch-job processing framework built on Python stdlib. Reads work items from an input source, runs them through a transform pipeline in configurable-size batches across a concurrent worker pool, with retry + exponential backoff and a dead-letter queue for permanently failed items.
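The batching and worker-pool behaviour described above can be sketched roughly as follows. This is a minimal illustration, not the product's code: `chunked` and `run_in_batches` are hypothetical names, and the defaults of 4 workers and 10 items per batch simply mirror the documented CLI defaults.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Iterator


def chunked(items: list[Any], batch_size: int) -> Iterator[list[Any]]:
    """Yield consecutive fixed-size batches; the last one may be shorter."""
    for start in range(0, len(items), batch_size):
        yield items[start:start + batch_size]


def run_in_batches(
    items: list[Any],
    transform: Callable[[Any], Any],
    max_workers: int = 4,   # assumed default, taken from the CLI table
    batch_size: int = 10,   # assumed default, taken from the CLI table
) -> list[Any]:
    """Process items one batch at a time, fanning each batch out to a thread pool."""
    results: list[Any] = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        for batch in chunked(items, batch_size):
            futures = [pool.submit(transform, item) for item in batch]
            # Results arrive in completion order, not submission order.
            for future in as_completed(futures):
                results.append(future.result())
    return results


if __name__ == "__main__":
    doubled = run_in_batches(list(range(25)), lambda n: n * 2)
    print(sorted(doubled)[:5])  # [0, 2, 4, 6, 8]
```

Waiting for each batch to finish before submitting the next keeps at most `batch_size` items in flight, which is one plausible reading of "controlled throughput"; the actual framework may schedule batches differently.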

Features

  • Concurrent worker pool — ThreadPoolExecutor with a configurable max_workers limit
  • Configurable batch sizes — Split large item sets into fixed-size batches for controlled throughput
  • Retry with exponential backoff — Failed items are retried up to N times with increasing delays
  • Dead-letter queue — Items that exhaust all retries are captured with full error context for inspection
  • Transform pipeline — Chain cast, validate, map, rename, and enrich transforms in any order
  • Multiple input sources — Inline items in config, JSON files on disk, or generated demo data
  • Execution stats — Tracks submitted, succeeded, failed, dead-lettered, retries, and duration
  • Progress callbacks — Real-time per-item completion logging
  • JSON config support — Define complete jobs in a single config file for repeatable automation
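As an illustration of how a chainable transform pipeline like the one listed above might be structured, here is a small sketch. The factory names (`cast`, `validate`, `rename`) echo the feature list, but their signatures and the `run_pipeline` helper are assumptions made for this example, not the product's API; map and enrich steps would follow the same shape.

```python
from typing import Any, Callable

Item = dict[str, Any]
Transform = Callable[[Item], Item]


def cast(field: str, to_type: type) -> Transform:
    """Build a step that converts one field to the given type."""
    def step(item: Item) -> Item:
        return {**item, field: to_type(item[field])}
    return step


def validate(field: str, predicate: Callable[[Any], bool]) -> Transform:
    """Build a step that raises ValueError when the predicate rejects the field."""
    def step(item: Item) -> Item:
        if not predicate(item[field]):
            raise ValueError(f"validation failed for {field!r}: {item[field]!r}")
        return item
    return step


def rename(old: str, new: str) -> Transform:
    """Build a step that moves a value from one key to another."""
    def step(item: Item) -> Item:
        renamed = {key: value for key, value in item.items() if key != old}
        renamed[new] = item[old]
        return renamed
    return step


def run_pipeline(item: Item, steps: list[Transform]) -> Item:
    """Apply each step in order; any exception propagates to the caller."""
    for step in steps:
        item = step(item)
    return item


if __name__ == "__main__":
    steps = [
        cast("amount", float),
        validate("amount", lambda value: value > 0),
        rename("amount", "total"),
    ]
    print(run_pipeline({"order_id": "A1", "amount": "19.5"}, steps))
    # {'order_id': 'A1', 'total': 19.5}
```

Because every step takes an item and returns an item, the steps can be reordered freely, and a failed validation surfaces as an exception that a retry or dead-letter layer can catch.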

Requirements

  • Python 3.10+
  • No external dependencies (stdlib only)

Quick Start


# Show all available options
python src/batch_processor.py --help

# Run the example job (processes 8 inline orders; 2 fail validation)
python src/batch_processor.py --config examples/batch_processor_config.json

# Override workers and batch size from the CLI
python src/batch_processor.py --config examples/batch_processor_config.json --workers 4 --batch-size 5

# Save stats to a file
python src/batch_processor.py --config examples/batch_processor_config.json --stats-file run_stats.json

# Verbose debug logging (shows every retry attempt)
python src/batch_processor.py --config examples/batch_processor_config.json --log-level DEBUG

Configuration Reference

CLI Options

| Flag | Default | Description |
| --- | --- | --- |
| --config, -c | (required) | Batch job config file (JSON) |
| --workers, -w | 4 | Max concurrent workers (overrides config) |
| --batch-size, -b | 10 | Items per batch (overrides config) |
| --stats-file | | Write execution stats JSON to this file |
| --log-level | INFO | Logging level (DEBUG, INFO, WARNING, ERROR) |
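For readers who want to see how flags like these are typically declared, the following is a sketch using `argparse`, which the source file imports. The flag names and defaults come from the options listed above; the `build_parser` function, the help strings, and the way defaults are wired are assumptions, and how the real script reconciles CLI values with the config file is not shown in the preview.

```python
import argparse


def build_parser() -> argparse.ArgumentParser:
    """Declare the documented CLI flags (hypothetical reconstruction)."""
    parser = argparse.ArgumentParser(description="Run a batch job from a JSON config file.")
    parser.add_argument("--config", "-c", required=True,
                        help="Batch job config file (JSON)")
    parser.add_argument("--workers", "-w", type=int, default=4,
                        help="Max concurrent workers (overrides config)")
    parser.add_argument("--batch-size", "-b", type=int, default=10,
                        help="Items per batch (overrides config)")
    parser.add_argument("--stats-file", default=None,
                        help="Write execution stats JSON to this file")
    parser.add_argument("--log-level", default="INFO",
                        choices=["DEBUG", "INFO", "WARNING", "ERROR"],
                        help="Logging level")
    return parser


if __name__ == "__main__":
    # Parse an explicit argument list instead of sys.argv for demonstration.
    args = build_parser().parse_args(["--config", "job.json", "-w", "8"])
    print(args.workers, args.batch_size, args.log_level)  # 8 10 INFO
```

Note that `argparse` converts `--batch-size` to the attribute `args.batch_size` and `--stats-file` to `args.stats_file`.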

Config File Schema


{
    "name": "job_name",
    "source": {
        "type": "inline | json_file | generated",
        "id_field": "order_id",

*... continues with setup instructions, usage examples, and more.*

📄 Code Sample .py preview

src/batch_processor.py

#!/usr/bin/env python3
"""
Batch Processor — Automation Hub (DataNest)

A configurable batch-job processing framework built on Python stdlib.
Reads work items from an input source, runs them through a transform
pipeline in configurable-size batches across a concurrent worker pool
(ThreadPoolExecutor), with retry + exponential backoff and a dead-letter
queue for permanently failed items. Tracks execution stats and supports
completion callbacks.

Usage:
    python batch_processor.py --config batch_processor_config.json
    python batch_processor.py --config job.json --workers 8 --batch-size 50
    python batch_processor.py --config job.json --log-level DEBUG

Dependencies: Python 3.10+ stdlib only (no pip packages)
License: MIT
"""

from __future__ import annotations

import argparse
import hashlib
import json
import logging
import random
import time
from abc import ABC, abstractmethod
from concurrent.futures import Future, ThreadPoolExecutor, as_completed
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Any, Callable

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s"
DEFAULT_MAX_WORKERS = 4
DEFAULT_BATCH_SIZE = 10
DEFAULT_MAX_RETRIES = 3
DEFAULT_BASE_DELAY = 0.5
DEFAULT_BACKOFF_FACTOR = 2.0

logger = logging.getLogger("batch_processor")

# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------

# ... 659 more lines ...
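The retry constants in the preview suggest a delay schedule, though the code that uses them is not shown. The sketch below assumes the common formula `base_delay * factor ** (retry - 1)` with no jitter, and a dead-letter queue modelled as a plain list; `backoff_delay`, `process_with_retry`, and the dead-letter record fields are hypothetical names chosen for this example.

```python
import time
from typing import Any, Callable

# Values copied from the constants in the preview.
DEFAULT_MAX_RETRIES = 3
DEFAULT_BASE_DELAY = 0.5
DEFAULT_BACKOFF_FACTOR = 2.0


def backoff_delay(retry: int, base_delay: float = DEFAULT_BASE_DELAY,
                  factor: float = DEFAULT_BACKOFF_FACTOR) -> float:
    """Delay before the given retry (1-based); assumed formula, no jitter."""
    return base_delay * factor ** (retry - 1)


def process_with_retry(
    item: Any,
    transform: Callable[[Any], Any],
    dead_letters: list[dict[str, Any]],
    max_retries: int = DEFAULT_MAX_RETRIES,
    sleep: Callable[[float], None] = time.sleep,
) -> Any:
    """Try once, then retry up to max_retries times; dead-letter on exhaustion."""
    for attempt in range(max_retries + 1):
        try:
            return transform(item)
        except Exception as exc:
            if attempt == max_retries:
                # Out of retries: record the item with its error context.
                dead_letters.append(
                    {"item": item, "error": repr(exc), "attempts": attempt + 1}
                )
                return None
            sleep(backoff_delay(attempt + 1))
    return None


if __name__ == "__main__":
    delays: list[float] = []
    dead: list[dict[str, Any]] = []
    # Record delays instead of sleeping so the demo finishes instantly.
    process_with_retry({"id": 1}, lambda item: 1 / 0, dead, sleep=delays.append)
    print(delays)               # [0.5, 1.0, 2.0]
    print(dead[0]["attempts"])  # 4
```

Under these assumptions, an item that always fails waits 0.5 s, 1.0 s, and 2.0 s between attempts before landing in the dead-letter list. Passing `sleep` as a parameter is a testing convenience only.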