← Back to all products

Task Scheduler

$19

In-process job scheduler with interval, cron, and one-shot triggers, priority run queue, concurrency limits, retry with backoff, missed-run policies, and state persistence.

📁 11 files
JSONMarkdownPython

📄 Product Preview

Try the interactive reader and demo tools below, or get the full product with all content unlocked.

📖 Interactive Reader (Free Preview) ⚙ Try Demo Tools 📦 Download Free Sample

📁 File Structure 11 files

task-scheduler/ ├── LICENSE ├── README.md ├── examples/ │ ├── scheduler_state.json │ └── task_scheduler_config.json ├── free-sample.zip ├── guide/ │ ├── 01_features.md │ ├── 02_quick-start.md │ ├── 03_output.md │ └── 04_license.md ├── index.html └── src/ └── task_scheduler.py

📖 Documentation Preview README excerpt

Task Scheduler

An in-process job scheduler built on Python stdlib. Schedule tasks by fixed interval, cron expression (5-field, with full wildcard/step/range/list syntax), or as a one-shot at a specific datetime. Features a priority-based run queue, configurable concurrency limits, retry with exponential backoff, missed-run policies, and automatic state persistence to JSON.

Features

  • Cron expressions — Full 5-field parser supporting , /n, a-b, a,b,c, and combinations
  • Interval scheduling — Run tasks every N seconds with drift-free tracking
  • One-shot tasks — Fire once at a specified datetime, then never again
  • Priority queue — Higher-priority tasks run first when multiple are due simultaneously
  • Concurrency control — Cap the number of tasks running in parallel via thread semaphore
  • Retry with backoff — Failed tasks retry up to N times with configurable exponential delay
  • Missed-run policy — Choose catch_up (fire immediately on overdue) or skip (wait for next window)
  • State persistence — Schedule state and last-run timestamps survive restarts via JSON file
  • Built-in demo tasksheartbeat, disk_usage, and cleanup_tmp for testing without shell commands
  • Single-tick mode--once runs one scheduler pass and exits (ideal for testing and cron-driven invocation)

Requirements

  • Python 3.10+
  • No external dependencies (stdlib only)

Quick Start


# Show all CLI options
python src/task_scheduler.py --help

# List tasks with their next scheduled fire times
python src/task_scheduler.py --config examples/task_scheduler_config.json --list

# Run a single scheduler tick (fires due tasks, then exits)
python src/task_scheduler.py --config examples/task_scheduler_config.json --once

# Start the daemon loop (Ctrl-C to stop)
python src/task_scheduler.py --config examples/task_scheduler_config.json

# Override state file location and log level
python src/task_scheduler.py --config examples/task_scheduler_config.json --state-file /tmp/state.json --log-level DEBUG

Configuration Reference

CLI Options

FlagDefaultDescription
--config, -c(required)Path to the JSON schedule config file
--oncefalseExecute a single scheduler tick then exit
--listfalsePrint every task with its next fire time and exit
--state-file<config_dir>/scheduler_state.jsonOverride the state-file path
--log-levelINFOLogging verbosity (DEBUG, INFO, WARNING, ERROR)

Cron Syntax

Standard 5-field format: minute hour day-of-month month day-of-week

FieldRangeExamples
Minute0–590, */5, 0,15,30,45

... continues with setup instructions, usage examples, and more.

📄 Code Sample .py preview

src/task_scheduler.py #!/usr/bin/env python3 """ Task Scheduler — Automation Hub (DataNest) An in-process job scheduler supporting fixed-interval, cron-expression, and one-shot scheduling. Features a priority-based run queue, configurable concurrency limits, retry with exponential backoff, missed-run policies (catch-up or skip), and JSON state persistence. Usage: python task_scheduler.py --config schedule.json --once python task_scheduler.py --config schedule.json --list python task_scheduler.py --config schedule.json python task_scheduler.py --help Dependencies: Python 3.10+ stdlib only (no pip packages) License: MIT """ from __future__ import annotations import argparse import heapq import json import logging import os import subprocess import sys import threading import time from dataclasses import dataclass, field from datetime import datetime, timedelta, timezone from pathlib import Path from typing import Any, Callable # --------------------------------------------------------------------------- # Constants # --------------------------------------------------------------------------- LOG_FORMAT = "%(asctime)s [%(levelname)s] %(name)s: %(message)s" DEFAULT_TICK_INTERVAL = 1.0 # seconds between scheduler ticks DEFAULT_MAX_CONCURRENCY = 4 DEFAULT_RETRY_LIMIT = 3 DEFAULT_RETRY_BACKOFF = 2.0 # exponential backoff multiplier TASK_TIMEOUT = 300 # seconds before a shell command is killed logger = logging.getLogger("task_scheduler") # --------------------------------------------------------------------------- # ... 641 more lines ...
Buy Now — $19 Back to Products