# YETL Framework

Yet another ETL framework for Spark.
## How does it work?

A declarative Spark dataframe pipelining framework:
- Define your datastores, e.g. in `./project/datastores`
- Define your datasets, e.g. in `./project/datasets` (see the sketch after this list)
- Write Spark extract, transform, and load logic in SQL, Python, or Scala, referencing datasets through the yetl API; the API can also be used by orchestration tools to land source data for Spark
- Generate datastore, dataset, and workflow documentation automatically
- Execute workflows compiled from the dataset references in your code, either in a basic built-in execution engine or in other tools, e.g. Databricks multi-step jobs
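The metadata format is still at the prototype stage, so the following is purely a hypothetical sketch of what a deserialized dataset definition might look like; the field names and values (`customer`, `landing`, the templated path) are illustrative assumptions, not part of the current API:

```python
# Hypothetical sketch - the real yetl metadata schema is not finalised.
# A dataset definition kept under ./project/datasets might deserialize
# into a plain structure along these lines:
customer_dataset = {
    "name": "customer",
    "datastore": "landing",  # refers to a datastore defined in ./project/datastores
    "format": "csv",
    "path": "customer/{{ run_date }}/",  # Jinja2-templated, rendered at run time
    "schema": "schemas/customer.json",
}
```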
## Progress log

- 2021-11-21: Done: metadata design, first draft prototype
- WIP: Jinja2 templating and deserialization
- WIP: prototyping the decorator model
## Philosophical Goals
- Have fun learning and building something that has no affiliation to anything commercial
- Easy and fun to engineer pipelines
- Can be used with Spark everywhere (Spark PaaS, IaaS, bare metal, locally)
- Aside from landing the data into HDFS (on-prem or cloud), Spark is all we need
- Excellent metadata configuration and tools - fun, simple, full-featured, and minimal to use
- No data-loading anti-patterns resulting from poorly designed re-use and metadata
- Can be called seamlessly from any workflow tool if desired, integrating transparent lineage
- Support batch and streaming
- Support data lakehouse add-ons: Hudi, Delta Lake
- Extremely flexible
- Support behavior-driven development
- Support test-driven development - locally, on CI/CD agents, and in remote environments
- Integrate and support data expectation frameworks
- Workflow engine
- Auto-document pipelines
## Example Usage

Once the metadata is declared, we can write something like the following for a single dataset, many datasets, or all datasets:
```python
from pyspark.sql import Row, DataFrame, SparkSession
from pyspark.sql.functions import concat_ws, col
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

from yetl.yetl import Yetl


@Yetl.spark
def get_test_customer_df(spark: SparkSession):
    # create a small test dataset
    schema = StructType([
        StructField("id", IntegerType(), True),
        StructField("firstname", StringType(), True),
        StructField("lastname", StringType(), True)
    ])
    rows = [
        Row(1, "Terry", "Merry"),
        Row(2, "Berry", "Gederry"),
        Row(3, "Larry", "Tarry")
    ]
    df = spark.createDataFrame(rows, schema)
    return df


def transform_customer_assert(df: DataFrame):
    # do assertions on the transformed test dataframe
    assert True


@Yetl.transform(
    test_df=get_test_customer_df,
    test_assert=transform_customer_assert
)
def transform_customer(df: DataFrame = None):
    # do transformations
    transformed_df = df.withColumn(
        "Full Name",
        concat_ws(" ", col("firstname"), col("lastname"))
    )
    return transformed_df
```
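The decorator model is still being prototyped (see the progress log above), so here is only a rough sketch of how a `Yetl.transform`-style decorator could wire the `test_df` builder and `test_assert` check to a transform. None of these internals (the `test_mode` flag, the wrapper logic) are confirmed by the current codebase; the sketch assumes `@Yetl.spark` injects the SparkSession so the test builder can be called with no arguments:

```python
# Hypothetical sketch - not the actual yetl implementation.
# Shows how a transform decorator could run the decorated function
# against an injected test dataframe and assert the result.
from functools import wraps


class Yetl:
    @staticmethod
    def transform(test_df=None, test_assert=None):
        def decorator(func):
            @wraps(func)
            def wrapper(df=None, test_mode=False):
                if test_mode and test_df is not None:
                    df = test_df()  # build the test input dataframe
                result = func(df)  # run the transform itself
                if test_mode and test_assert is not None:
                    test_assert(result)  # check the transformed output
                return result
            return wrapper
        return decorator
```

Under those assumptions, `transform_customer(test_mode=True)` would build the test customers, run the transform, and apply the assertions, while a production workflow would call `transform_customer(df)` with a real dataframe.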