Workflow Engine

$29

DAG-based task execution engine with conditional branching, parallel tasks, state persistence, and retries.

📁 9 files
JSON, Markdown, Python

📁 File Structure 9 files

workflow-engine/
├── LICENSE
├── README.md
├── examples/
│   └── workflow_config.json
├── free-sample.zip
├── guide/
│   ├── 01_features.md
│   ├── 02_quick-start.md
│   └── 03_license.md
├── index.html
└── src/
    └── workflow_engine.py

📖 Documentation Preview README excerpt

Workflow Engine

DAG-based task execution engine with conditional branching, parallel tasks, state persistence, and retry policies. Define workflows in JSON, execute them with full observability.

Features

  • DAG execution — Tasks execute in dependency order with automatic parallelism
  • Conditional branching — Skip tasks based on Python expressions
  • Parallel execution — Tasks with no unmet dependencies run concurrently via a thread pool
  • State persistence — Save and resume workflows after failures
  • Retry policies — Per-task retry limits with configurable backoff
  • DAG validation — Detects cycles and missing dependencies before execution
  • Completion hooks — Run commands on workflow success or failure
  • Dry-run mode — Preview execution plan without running anything
  • Context passing — Upstream task output is available to downstream tasks via environment variables
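
To make the DAG execution and DAG validation features concrete, here is a minimal sketch of how dependency ordering, missing-dependency checks, and cycle detection could work together. It is not taken from the product source; the function name `plan_waves` and the plain dict input are assumptions chosen for illustration.

```python
from __future__ import annotations


def plan_waves(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into waves; each wave depends only on earlier waves.

    `deps` maps a task name to the names it depends on (hypothetical input
    shape, mirroring the `depends_on` field of the JSON config).
    """
    # Missing-dependency check: every dependency must name a known task.
    for task, parents in deps.items():
        for parent in parents:
            if parent not in deps:
                raise ValueError(f"task {task!r} depends on unknown task {parent!r}")

    remaining = {task: set(parents) for task, parents in deps.items()}
    waves: list[list[str]] = []
    while remaining:
        # Tasks with no unmet dependencies are ready to run together.
        ready = sorted(task for task, parents in remaining.items() if not parents)
        if not ready:
            # Nothing is ready but tasks remain, so they must form a cycle.
            raise ValueError(f"cycle detected among: {sorted(remaining)}")
        waves.append(ready)
        for task in ready:
            del remaining[task]
        for parents in remaining.values():
            parents.difference_update(ready)
    return waves


if __name__ == "__main__":
    example = {"lint": [], "test": [], "build": ["lint", "test"], "deploy": ["build"]}
    print(plan_waves(example))  # [['lint', 'test'], ['build'], ['deploy']]
```

Each wave could then be handed to a `concurrent.futures.ThreadPoolExecutor`, which is one plausible way to get the "automatic parallelism" described above. The same plan, printed instead of executed, would serve as a dry run.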

Requirements

  • Python 3.10+
  • No external dependencies (stdlib only)

Quick Start


# Run a workflow
python src/workflow_engine.py --config examples/workflow_config.json

# Preview execution plan
python src/workflow_engine.py --config examples/workflow_config.json --dry-run

# Resume a failed workflow
python src/workflow_engine.py --config examples/workflow_config.json --resume workflow_state.json
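
The format of `workflow_state.json` is not shown in this preview, so the following is only a sketch of how a resume step might decide what to re-run. The `{"tasks": {name: status}}` layout, the `"success"` status string, and both function names are assumptions, not the engine's actual schema.

```python
from __future__ import annotations

import json
import tempfile
from pathlib import Path


def save_state(path: Path, statuses: dict[str, str]) -> None:
    """Persist task statuses; write to a temp file first, then swap it in."""
    tmp = path.with_suffix(".tmp")
    tmp.write_text(json.dumps({"tasks": statuses}, indent=2))
    tmp.replace(path)


def tasks_to_run(all_tasks: list[str], state_path: Path | None) -> list[str]:
    """Return the tasks that still need to run, skipping recorded successes."""
    if state_path is None or not state_path.exists():
        return list(all_tasks)  # fresh run: nothing to skip
    saved = json.loads(state_path.read_text()).get("tasks", {})
    return [task for task in all_tasks if saved.get(task) != "success"]


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp_dir:
        state_file = Path(tmp_dir) / "workflow_state.json"
        save_state(state_file, {"lint": "success", "test": "failed"})
        print(tasks_to_run(["lint", "test", "build"], state_file))  # ['test', 'build']
```

Writing to a temporary file and replacing the target keeps a crash in the middle of a save from leaving a half-written state file behind.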

Configuration Reference


{
    "name": "my-pipeline",
    "max_workers": 4,
    "tasks": [
        {
            "name": "build",
            "command": "make build",
            "depends_on": ["lint", "test"],
            "condition": "",
            "retry_limit": 2,
            "timeout": 300
        }
    ],
    "on_complete": "echo 'Done!'",
    "on_failure": "curl https://hooks.example.com/alert"
}

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| name | string | "unnamed" | Workflow identifier |
| max_workers | int | 4 | Max parallel task threads |
| tasks[].name | string | required | Unique task name |
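
As an illustration of how the fields above might be loaded, here is a small sketch that parses a config and fills in defaults. The `name` and `max_workers` defaults come from the reference; the `retry_limit` default of 2 mirrors `DEFAULT_RETRY_LIMIT` in the code preview; the `timeout` default, the class names, and `parse_workflow` are assumptions for illustration only.

```python
from __future__ import annotations

import json
from dataclasses import dataclass, field


@dataclass
class TaskSpec:
    name: str
    command: str = ""
    depends_on: list[str] = field(default_factory=list)
    condition: str = ""
    retry_limit: int = 2          # mirrors DEFAULT_RETRY_LIMIT in the preview
    timeout: float | None = None  # assumption: no timeout unless configured


@dataclass
class WorkflowSpec:
    name: str = "unnamed"
    max_workers: int = 4
    tasks: list[TaskSpec] = field(default_factory=list)


def parse_workflow(raw: str) -> WorkflowSpec:
    """Parse a JSON workflow config, applying defaults and basic checks."""
    data = json.loads(raw)
    tasks: list[TaskSpec] = []
    seen: set[str] = set()
    for entry in data.get("tasks", []):
        if "name" not in entry:
            raise ValueError("every task needs a 'name'")
        if entry["name"] in seen:
            raise ValueError(f"duplicate task name: {entry['name']!r}")
        seen.add(entry["name"])
        tasks.append(TaskSpec(
            name=entry["name"],
            command=entry.get("command", ""),
            depends_on=list(entry.get("depends_on", [])),
            condition=entry.get("condition", ""),
            retry_limit=entry.get("retry_limit", 2),
            timeout=entry.get("timeout"),
        ))
    return WorkflowSpec(
        name=data.get("name", "unnamed"),
        max_workers=data.get("max_workers", 4),
        tasks=tasks,
    )


if __name__ == "__main__":
    spec = parse_workflow('{"tasks": [{"name": "build", "command": "make build"}]}')
    print(spec.name, spec.max_workers, spec.tasks[0].retry_limit)  # unnamed 4 2
```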

... continues with setup instructions, usage examples, and more.

📄 Code Sample .py preview

src/workflow_engine.py

#!/usr/bin/env python3
"""
Workflow Engine — Automation Hub (DataNest)

DAG-based task execution engine with conditional branching, parallel tasks,
state persistence, and retry policies. Define workflows in JSON, execute
them with full observability.

Usage:
    python workflow_engine.py --config workflow.json
    python workflow_engine.py --config workflow.json --dry-run
    python workflow_engine.py --config workflow.json --resume state.json

Dependencies: Python 3.10+ stdlib only (no pip packages)
License: MIT
"""

from __future__ import annotations

import argparse
import concurrent.futures
import json
import logging
import subprocess
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any

# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------

DEFAULT_MAX_WORKERS = 4
DEFAULT_RETRY_LIMIT = 2
DEFAULT_RETRY_DELAY = 5.0

logger = logging.getLogger("workflow_engine")

# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"

# ... 436 more lines ...
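
The `DEFAULT_RETRY_LIMIT` and `DEFAULT_RETRY_DELAY` constants in the preview hint at a per-task retry loop, but the rest of the file is not shown. The sketch below is one way such a loop could look. The function name `run_with_retries`, the doubling backoff, and the injectable `sleep` parameter are assumptions; the real engine may use a different backoff policy and may run commands through a shell.

```python
from __future__ import annotations

import subprocess
import sys
import time
from typing import Callable

DEFAULT_RETRY_LIMIT = 2    # same value as in the preview
DEFAULT_RETRY_DELAY = 5.0  # same value as in the preview


def run_with_retries(
    command: list[str],
    retry_limit: int = DEFAULT_RETRY_LIMIT,
    retry_delay: float = DEFAULT_RETRY_DELAY,
    timeout: float | None = None,
    sleep: Callable[[float], None] = time.sleep,
) -> tuple[bool, int]:
    """Run `command` up to retry_limit + 1 times; return (succeeded, attempts)."""
    attempts = 0
    for attempt in range(retry_limit + 1):
        attempts += 1
        try:
            result = subprocess.run(command, timeout=timeout, capture_output=True)
            if result.returncode == 0:
                return True, attempts
        except subprocess.TimeoutExpired:
            pass  # a timeout counts as a failed attempt
        if attempt < retry_limit:
            # Assumed policy: double the delay after each failed attempt.
            sleep(retry_delay * (2 ** attempt))
    return False, attempts


if __name__ == "__main__":
    failing = [sys.executable, "-c", "raise SystemExit(1)"]
    # Use a tiny delay so the demo finishes quickly.
    print(run_with_retries(failing, retry_limit=2, retry_delay=0.01))  # (False, 3)
```

Passing `sleep` as a parameter keeps the loop easy to test without real waiting.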