← Back to all products

PySpark Utils Library

€29

Battle-tested utility functions for PySpark data engineering — transformations, data quality, SCD, schema evolution, logging, dedup, and DataFrame diffing. Stop rewriting the same boilerplate on every project.

📁 20 files · 🏷 v1.0.0
Python · PySpark · Delta Lake · PyPI

📁 File Structure (20 files)

pyspark-utils-library/
├── README.md
├── LICENSE
├── setup.py
├── pyproject.toml
│
├── pyspark_utils/
│   ├── __init__.py
│   ├── transformations.py
│   ├── data_quality.py
│   ├── scd.py
│   ├── schema_utils.py
│   ├── logging_utils.py
│   ├── dedup.py
│   └── diff.py
│
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   ├── test_transformations.py
│   ├── test_data_quality.py
│   ├── test_scd.py
│   ├── test_schema_utils.py
│   ├── test_dedup.py
│   └── test_diff.py
│
└── examples/
    └── usage_examples.py

📖 Documentation Preview (README excerpt)

What's Inside

  • transformations — 15 reusable DataFrame transforms: column cleaning, casting, flattening, pivoting, hashing
  • data_quality — Chainable DQ validation framework with structured reports and severity levels
  • scd — SCD Type 1 (overwrite) and Type 2 (full history) merge utilities for Delta Lake
  • schema_utils — Schema comparison, evolution, DDL conversion, and compatibility checking
  • logging_utils — Structured pipeline logging with correlation IDs, metrics, and Delta table sink
  • dedup — Window-based, hash-based, and fuzzy deduplication strategies
  • diff — DataFrame comparison with row-level, column-level, and schema diffs
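To give a flavor of the hash-based dedup strategy, here is a minimal pure-Python sketch of the idea (illustration only, not the library's shipped code): rows are keyed by a stable hash of selected columns and only the first occurrence of each hash is kept. In PySpark the same idea typically maps to `F.sha2(F.concat_ws(...), 256)` plus `dropDuplicates` or a window rank.

```python
import hashlib
import json


def row_hash(row: dict, cols: list) -> str:
    """Stable hash over the selected columns (sorted keys for determinism)."""
    payload = json.dumps({c: row.get(c) for c in cols}, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def dedup_by_hash(rows: list, cols: list) -> list:
    """Keep the first row seen for each distinct hash of `cols`."""
    seen = set()
    out = []
    for row in rows:
        h = row_hash(row, cols)
        if h not in seen:
            seen.add(h)
            out.append(row)
    return out


rows = [
    {"order_id": 1, "customer_id": "a", "amount": 10.0},
    {"order_id": 1, "customer_id": "a", "amount": 10.0},  # exact duplicate
    {"order_id": 2, "customer_id": "b", "amount": 5.0},
]
print(len(dedup_by_hash(rows, ["order_id", "customer_id", "amount"])))  # 2
```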

Quick Start

# Install the library
pip install pyspark-utils-library

# Import and use
from pyspark_utils.transformations import clean_column_names
from pyspark_utils.data_quality import DQValidator

df = clean_column_names(spark.read.table("bronze.raw_orders"))

report = (
    DQValidator(df)
    .check_nulls(["order_id", "customer_id"])
    .check_unique(["order_id"])
    .validate()
)
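The exact behavior of `clean_column_names` is in the full source; a typical normalization of this kind (hypothetical sketch, shown in plain Python on the names alone) lower-snake-cases column names and replaces characters that Spark and Delta reject:

```python
import re


def to_snake(name: str) -> str:
    """Normalize one column name: camelCase, spaces, punctuation -> snake_case."""
    name = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name)  # split camelCase boundaries
    name = re.sub(r"[^0-9a-zA-Z]+", "_", name)           # punctuation/spaces -> _
    return name.strip("_").lower()


print(to_snake("Order ID"))        # order_id
print(to_snake("customerName"))    # customer_name
print(to_snake("Net-Amount (€)"))  # net_amount
```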

Requirements

  • Python 3.9+
  • PySpark 3.3.0+
  • Delta Spark 2.3.0+
  • Databricks Runtime 13.x+ (if using on Databricks)

📄 Code Sample (.py preview)

pyspark_utils/transformations.py

"""
Common PySpark DataFrame transformations.

This module provides 15+ reusable, composable transformation functions
that follow a consistent pattern: each takes a DataFrame (plus
parameters) and returns a new DataFrame.

Example::

    from pyspark_utils.transformations import (
        clean_column_names,
        trim_strings,
        add_metadata_columns,
    )

    df = (
        spark.read.table("bronze.raw_orders")
        .transform(clean_column_names)
        .transform(trim_strings)
        .transform(add_metadata_columns, source="erp")
    )
"""
from __future__ import annotations

import re
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional, Sequence, Union

from pyspark.sql import DataFrame
from pyspark.sql import functions as F
from pyspark.sql.types import (
    ArrayType,
    DataType,
    DateType,
    DoubleType,
    IntegerType,
    LongType,
    StringType,

... remaining implementation in full product
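The comparison idea behind schema_utils can be sketched in plain Python over simple {column: type} mappings (hypothetical helper, not the shipped API, which works on real StructTypes): report columns added, removed, and changed between two schema versions.

```python
def diff_schemas(old: dict, new: dict) -> dict:
    """Report added/removed columns and type changes between two schemas."""
    return {
        "added": sorted(set(new) - set(old)),
        "removed": sorted(set(old) - set(new)),
        "changed": sorted(c for c in set(old) & set(new) if old[c] != new[c]),
    }


old = {"order_id": "bigint", "amount": "double"}
new = {"order_id": "bigint", "amount": "decimal(18,2)", "currency": "string"}
print(diff_schemas(old, new))
# {'added': ['currency'], 'removed': [], 'changed': ['amount']}
```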
Buy Now — €29
Back to Products