PySpark Utils Library
€29
Battle-tested utility functions for PySpark data engineering — transformations, data quality, SCD, schema evolution, logging, dedup, and DataFrame diffing. Stop rewriting the same boilerplate on every project.
Python · PySpark · Delta Lake · PyPI
📁 File Structure 20 files
pyspark-utils-library/
├── README.md
├── LICENSE
├── setup.py
├── pyproject.toml
│
├── pyspark_utils/
│ ├── __init__.py
│ ├── transformations.py
│ ├── data_quality.py
│ ├── scd.py
│ ├── schema_utils.py
│ ├── logging_utils.py
│ ├── dedup.py
│ └── diff.py
│
├── tests/
│ ├── __init__.py
│ ├── conftest.py
│ ├── test_transformations.py
│ ├── test_data_quality.py
│ ├── test_scd.py
│ ├── test_schema_utils.py
│ ├── test_dedup.py
│ └── test_diff.py
│
└── examples/
└── usage_examples.py
📖 Documentation Preview README excerpt
What's Inside
- transformations — 15 reusable DataFrame transforms: column cleaning, casting, flattening, pivoting, hashing
- data_quality — Chainable DQ validation framework with structured reports and severity levels
- scd — SCD Type 1 (overwrite) and Type 2 (full history) merge utilities for Delta Lake
- schema_utils — Schema comparison, evolution, DDL conversion, and compatibility checking
- logging_utils — Structured pipeline logging with correlation IDs, metrics, and Delta table sink
- dedup — Window-based, hash-based, and fuzzy deduplication strategies
- diff — DataFrame comparison with row-level, column-level, and schema diffs
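To make the SCD Type 2 behavior concrete: a Type 2 merge expires the current version of any changed row and appends a new version, so the full history is preserved. The core logic can be pictured in plain Python (a simplified, Delta-free sketch over lists of dicts; the bookkeeping fields `is_current`, `valid_from`, and `valid_to` are illustrative, not necessarily the library's column names):

```python
def scd2_merge(current, incoming, key, today):
    """Simplified SCD Type 2: expire changed rows, append new versions.

    `current` and `incoming` are lists of dicts; `key` is the business key.
    Hypothetical bookkeeping fields: is_current, valid_from, valid_to.
    """
    by_key = {r[key]: r for r in current if r.get("is_current")}
    result = list(current)  # existing history is kept as-is
    for new in incoming:
        attrs = {k: v for k, v in new.items() if k != key}
        old = by_key.get(new[key])
        if old is not None:
            if {k: old[k] for k in attrs} == attrs:
                continue  # unchanged row: nothing to do
            old["is_current"] = False  # expire the previous version
            old["valid_to"] = today
        result.append({key: new[key], **attrs,
                       "is_current": True,
                       "valid_from": today, "valid_to": None})
    return result
```

The real module performs the same expire-and-insert step as a Delta Lake MERGE, but the decision logic is the same shape.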
Quick Start
# Install the library
pip install pyspark-utils-library

# Import and use
from pyspark_utils.transformations import clean_column_names
from pyspark_utils.data_quality import DQValidator

df = clean_column_names(spark.read.table("bronze.raw_orders"))

report = (
    DQValidator(df)
    .check_nulls(["order_id", "customer_id"])
    .check_unique(["order_id"])
    .validate()
)
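The chainable pattern above is easy to picture: each `check_*` call records a rule and returns `self`, and `validate()` runs all recorded rules and aggregates a report. A minimal, Spark-free sketch of that pattern over a list of dicts (the method names mirror the Quick Start, but the internals and report shape here are hypothetical):

```python
class MiniValidator:
    """Sketch of a chainable validator over a list-of-dicts 'DataFrame'."""

    def __init__(self, rows):
        self.rows = rows
        self.checks = []  # rules are recorded lazily, run on validate()

    def check_nulls(self, cols):
        def run(rows):
            bad = sum(1 for r in rows for c in cols if r.get(c) is None)
            return ("nulls", cols, bad)
        self.checks.append(run)
        return self  # returning self enables chaining

    def check_unique(self, cols):
        def run(rows):
            keys = [tuple(r.get(c) for c in cols) for r in rows]
            return ("unique", cols, len(keys) - len(set(keys)))
        self.checks.append(run)
        return self

    def validate(self):
        results = [check(self.rows) for check in self.checks]
        return {"passed": all(count == 0 for _, _, count in results),
                "failures": [r for r in results if r[2] > 0]}
```

In the library itself the rules run as Spark aggregations rather than Python loops, but the record-then-execute structure is the same.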
Requirements
- Python 3.9+
- PySpark 3.3.0+
- Delta Spark 2.3.0+
- Databricks Runtime 13.x+ (if using on Databricks)
📄 Code Sample .py preview
pyspark_utils/transformations.py
"""
Common PySpark DataFrame transformations.
This module provides 15+ reusable, composable transformation
functions that follow a consistent pattern: each takes a
DataFrame (plus parameters) and returns a new DataFrame.
Example::

    from pyspark_utils.transformations import (
        clean_column_names,
        trim_strings,
        add_metadata_columns,
    )

    df = (
        spark.read.table("bronze.raw_orders")
        .transform(clean_column_names)
        .transform(trim_strings)
        .transform(add_metadata_columns, source="erp")
    )
"""
from __future__ import annotations
import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Sequence, Union
from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import (
    ArrayType, DataType, DateType, DoubleType,
    IntegerType, LongType, StringType,
... remaining implementation in full product
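To give a feel for the `clean_column_names` style, the per-name normalization step might look like the following (a pure-Python sketch of one plausible renaming rule; the library's exact rules may differ):

```python
import re


def normalize_name(name: str) -> str:
    """Normalize a raw column name to snake_case.

    Replaces runs of non-alphanumeric characters with underscores,
    splits camelCase boundaries, lowercases, and strips edge underscores.
    (Illustrative helper, not the library's actual implementation.)
    """
    name = re.sub(r"[^0-9a-zA-Z]+", "_", name.strip())
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)  # camelCase split
    return name.lower().strip("_")
```

Applied to a DataFrame, a helper like this would typically be wired up with `df.toDF(*[normalize_name(c) for c in df.columns])`.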