$29
Workflow Engine
DAG-based task execution engine with conditional branching, parallel tasks, state persistence, and retries.
JSON · Markdown · Python
📁 File Structure 9 files
workflow-engine/
├── LICENSE
├── README.md
├── examples/
│   └── workflow_config.json
├── free-sample.zip
├── guide/
│   ├── 01_features.md
│   ├── 02_quick-start.md
│   └── 03_license.md
├── index.html
└── src/
    └── workflow_engine.py
📖 Documentation Preview README excerpt
Workflow Engine
DAG-based task execution engine with conditional branching, parallel tasks, state persistence, and retry policies. Define workflows in JSON, execute them with full observability.
Features
- DAG execution — Tasks execute in dependency order with automatic parallelism
- Conditional branching — Skip tasks based on Python expressions
- Parallel execution — Tasks without dependencies run concurrently via thread pool
- State persistence — Save and resume workflows after failures
- Retry policies — Per-task retry limits with configurable backoff
- DAG validation — Detects cycles and missing dependencies before execution
- Completion hooks — Run commands on workflow success or failure
- Dry-run mode — Preview execution plan without running anything
- Context passing — Upstream task output available to downstream tasks via env vars
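The DAG execution and validation features can be illustrated with the standard library's `graphlib` module. The following is a minimal sketch, not the engine's actual code: the function name `plan_waves` and the idea of grouping tasks into "waves" are assumptions made for illustration. It rejects missing dependencies and cycles, then groups tasks so that everything in one wave could run in parallel.

```python
from graphlib import CycleError, TopologicalSorter


def plan_waves(deps: dict[str, list[str]]) -> list[list[str]]:
    """Group tasks into waves; tasks in one wave have no unmet dependencies.

    `deps` maps each task name to the names it depends on (hypothetical shape).
    """
    # Missing-dependency check: every referenced name must be a defined task.
    for task, upstream in deps.items():
        missing = [name for name in upstream if name not in deps]
        if missing:
            raise ValueError(f"task {task!r} depends on undefined task(s): {missing}")

    sorter = TopologicalSorter(deps)
    try:
        sorter.prepare()  # raises CycleError if the graph contains a cycle
    except CycleError as exc:
        raise ValueError(f"dependency cycle detected: {exc.args[1]}") from exc

    waves: list[list[str]] = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # sorted only for stable output
        waves.append(ready)
        sorter.done(*ready)  # mark the whole wave finished, unlocking the next
    return waves


if __name__ == "__main__":
    print(plan_waves({"lint": [], "test": [], "build": ["lint", "test"]}))
    # → [['lint', 'test'], ['build']]
```

A real scheduler would submit each ready task to a thread pool and call `done()` as individual tasks finish, rather than waiting for a whole wave.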
Requirements
- Python 3.10+
- No external dependencies (stdlib only)
Quick Start
# Run a workflow
python src/workflow_engine.py --config examples/workflow_config.json
# Preview execution plan
python src/workflow_engine.py --config examples/workflow_config.json --dry-run
# Resume a failed workflow
python src/workflow_engine.py --config examples/workflow_config.json --resume workflow_state.json
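The `--resume` flag implies that task statuses are written to a state file and read back on the next run. The preview does not show the state file's format, so the sketch below uses a hypothetical layout (`{"tasks": {name: status}}`) and a hypothetical `"success"` status value, purely to show the save-and-skip idea.

```python
import json
import os
import tempfile
from pathlib import Path


def save_state(path: Path, statuses: dict[str, str]) -> None:
    """Write task statuses atomically so a crash cannot leave a half-written file."""
    fd, tmp_name = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    with os.fdopen(fd, "w", encoding="utf-8") as fh:
        json.dump({"tasks": statuses}, fh, indent=2)  # hypothetical layout
    os.replace(tmp_name, path)  # atomic rename over the old state file


def tasks_to_run(path: Path, all_tasks: list[str]) -> list[str]:
    """Return tasks that still need to run, skipping ones recorded as successful."""
    if not path.exists():
        return list(all_tasks)
    saved = json.loads(path.read_text(encoding="utf-8")).get("tasks", {})
    # "success" is an assumed status name; the real engine may use another.
    return [name for name in all_tasks if saved.get(name) != "success"]
```

Writing to a temporary file and renaming it matters here because the state file is most valuable exactly when the process dies unexpectedly.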
Configuration Reference
{
  "name": "my-pipeline",
  "max_workers": 4,
  "tasks": [
    {
      "name": "build",
      "command": "make build",
      "depends_on": ["lint", "test"],
      "condition": "",
      "retry_limit": 2,
      "timeout": 300
    }
  ],
  "on_complete": "echo 'Done!'",
  "on_failure": "curl https://hooks.example.com/alert"
}
| Field | Type | Default | Description |
|---|---|---|---|
| name | string | "unnamed" | Workflow identifier |
| max_workers | int | 4 | Max parallel task threads |
| tasks[].name | string | required | Unique task name |
... continues with setup instructions, usage examples, and more.
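The defaults in the visible part of the table (`name`, `max_workers`, and the required, unique `tasks[].name`) could be applied at load time roughly as follows. This is a sketch under assumptions: the class names `TaskSpec` and `WorkflowSpec`, the function `parse_workflow`, and the empty-list default for `depends_on` are illustrative and not taken from the product. Fields whose defaults are not shown in the preview are left out.

```python
import json
from dataclasses import dataclass, field


@dataclass
class TaskSpec:  # hypothetical name
    name: str
    command: str = ""
    depends_on: list[str] = field(default_factory=list)  # assumed default


@dataclass
class WorkflowSpec:  # hypothetical name
    name: str = "unnamed"  # default from the table
    max_workers: int = 4   # default from the table
    tasks: list[TaskSpec] = field(default_factory=list)


def parse_workflow(text: str) -> WorkflowSpec:
    """Parse workflow JSON, applying defaults and checking task names."""
    raw = json.loads(text)
    tasks: list[TaskSpec] = []
    seen: set[str] = set()
    for entry in raw.get("tasks", []):
        if "name" not in entry:
            raise ValueError("every task needs a 'name'")
        if entry["name"] in seen:
            raise ValueError(f"duplicate task name: {entry['name']!r}")
        seen.add(entry["name"])
        tasks.append(
            TaskSpec(
                name=entry["name"],
                command=entry.get("command", ""),
                depends_on=list(entry.get("depends_on", [])),
            )
        )
    return WorkflowSpec(
        name=raw.get("name", "unnamed"),
        max_workers=int(raw.get("max_workers", 4)),
        tasks=tasks,
    )
```

Failing early on a missing or duplicate name keeps configuration mistakes out of the execution phase, where they would be harder to diagnose.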
📄 Code Sample .py preview
src/workflow_engine.py
#!/usr/bin/env python3
"""
Workflow Engine — Automation Hub (DataNest)
DAG-based task execution engine with conditional branching, parallel tasks,
state persistence, and retry policies. Define workflows in JSON, execute
them with full observability.
Usage:
    python workflow_engine.py --config workflow.json
    python workflow_engine.py --config workflow.json --dry-run
    python workflow_engine.py --config workflow.json --resume state.json
Dependencies: Python 3.10+ stdlib only (no pip packages)
License: MIT
"""
from __future__ import annotations
import argparse
import concurrent.futures
import json
import logging
import subprocess
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Any
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
DEFAULT_MAX_WORKERS = 4
DEFAULT_RETRY_LIMIT = 2
DEFAULT_RETRY_DELAY = 5.0
logger = logging.getLogger("workflow_engine")
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
# ... 436 more lines ...
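The remaining lines are not shown in the preview, but the `DEFAULT_RETRY_LIMIT` and `DEFAULT_RETRY_DELAY` constants hint at how per-task retries might work. The sketch below is one possible reading, not the product's implementation: `run_with_retries`, its `sleep` parameter, and the doubling delay between attempts are all assumptions made for illustration.

```python
import subprocess
import time
from typing import Callable, Optional

DEFAULT_RETRY_LIMIT = 2    # value shown in the code preview
DEFAULT_RETRY_DELAY = 5.0  # value shown in the code preview


def run_with_retries(
    command: str,
    retry_limit: int = DEFAULT_RETRY_LIMIT,
    retry_delay: float = DEFAULT_RETRY_DELAY,
    timeout: Optional[float] = None,
    sleep: Callable[[float], None] = time.sleep,  # injectable for testing
) -> tuple[bool, int]:
    """Run a shell command; return (succeeded, attempts_made)."""
    attempts = 0
    for attempt in range(retry_limit + 1):  # first try plus retry_limit retries
        attempts += 1
        try:
            result = subprocess.run(command, shell=True, timeout=timeout)
            if result.returncode == 0:
                return True, attempts
        except subprocess.TimeoutExpired:
            pass  # a timeout counts as a failed attempt
        if attempt < retry_limit:
            # Assumed exponential backoff: delay doubles after each failure.
            sleep(retry_delay * 2**attempt)
    return False, attempts
```

With the preview's defaults, a task that keeps failing would be attempted three times, waiting 5 and then 10 seconds between attempts under this assumed backoff scheme.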