Exploring: Data-Aware Orchestration with Apache Airflow Datasets

Introduction

Welcome to “Exploring: Data-Aware Orchestration”, where we delve into the power of “Airflow Datasets”. In this post, I invite you to follow my journey through a development need, and its broader scope, which led to this article.

Scope

At one of the largest companies in Belgium, a need arose to integrate data from various sources, including databases, XML files, GEO data, and more. These datasets were dispersed across different systems, so several teams had to collaborate, following a data mesh approach, to consolidate the data into a PostgreSQL database. As a member of one of these teams, I was tasked with extracting data from two ERP systems stored on separate SAP HANA databases.

Recognizing the necessity for an orchestration tool, we chose “Apache Airflow” for its versatility. After initial brainstorming, we identified the need for a clear flow of different jobs, leading us to adopt “Data-aware Orchestration” with “Airflow Datasets.”

Development process

The development process is still in its initial phase, focusing on extracting data from various source systems into a source-aligned data product. This article primarily concentrates on one of the ERP systems involved in the project. The four tables I extracted had the following record counts at the time of writing:

– Table 1: 3,099,596
– Table 2: 2,765,956
– Table 3: 6,319,547
– Table 4: 805,159

Given the volume of records, we faced challenges during data loading. To address this, we decided to load the data in chunks, based on a configured chunk size and the total record count. However, this approach introduced a new problem: the contents of the chunks could shift while the data was being loaded, risking duplicated or incomplete datasets.
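The chunking idea can be sketched in plain Python. This is a minimal illustration, not the project's code: it assumes the source can be paged with `LIMIT`/`OFFSET` over a stable `ORDER BY` key, and the table and key names are hypothetical. A stable sort order only keeps chunk boundaries fixed if the source does not change between chunk reads, which is exactly the risk described above.

```python
"""Sketch: split one table load into fixed-size chunks.

Assumptions (hypothetical): the source query can be paged with ORDER BY on a
stable key plus LIMIT/OFFSET, and the total record count is known up front.
"""
from math import ceil


def plan_chunks(total_records: int, chunk_size: int) -> list[tuple[int, int]]:
    """Return (offset, limit) pairs that together cover every record once."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    n_chunks = ceil(total_records / chunk_size)
    return [
        (i * chunk_size, min(chunk_size, total_records - i * chunk_size))
        for i in range(n_chunks)
    ]


def chunk_query(table: str, order_key: str, offset: int, limit: int) -> str:
    """Build one paged SELECT; the ORDER BY keeps chunk boundaries stable."""
    return (
        f"SELECT * FROM {table} ORDER BY {order_key} "
        f"LIMIT {limit} OFFSET {offset}"
    )


chunks = plan_chunks(total_records=805_159, chunk_size=250_000)
print(chunks)
print(chunk_query("SA_PROJECT.ART_STGORDER", "ID", *chunks[-1]))  # hypothetical names
```

With a chunk size of 250,000, Table 4's 805,159 records yield four chunks, the last one holding the remaining 55,159 rows.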

To reduce complexity in the DAG definitions and make troubleshooting easier, we opted to split the processes, perhaps excessively, by creating dynamic DAGs. This approach allowed us to reuse configurations and Jinja query definitions, keeping the workflow manageable.
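The reuse of one Jinja query definition across tables might look like the following sketch. It assumes the `jinja2` package, which Airflow itself uses to render templated fields. The config keys, schema names, and table names are hypothetical. In an Airflow task, the same template string would be passed as the operator's templated `sql` argument together with `params=cfg`, and Airflow would render it at run time. Here it is rendered directly so the result is visible.

```python
from jinja2 import Template

# One shared query definition; each configured table fills in its own names.
STAGING_SQL = Template(
    "INSERT INTO {{ params.staging_schema }}.{{ params.target_table }} "
    "SELECT * FROM {{ params.source_schema }}.{{ params.source_table }}"
)

# Hypothetical configuration entries; the real project read these from files.
TABLE_CONFIGS = [
    {"source_schema": "ERP", "source_table": "STGORDER",
     "staging_schema": "STAGING", "target_table": "SA_PROJECT_ART_STGORDER"},
    {"source_schema": "ERP", "source_table": "OTHER_TABLE",
     "staging_schema": "STAGING", "target_table": "SA_PROJECT_OTHER_TABLE"},
]


def render_staging_sql(cfg: dict) -> str:
    """Render the shared template with one table's config exposed as `params`."""
    return STAGING_SQL.render(params=cfg)


for cfg in TABLE_CONFIGS:
    print(render_staging_sql(cfg))
```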

These considerations led to the following approach in my development, as depicted in the Dataset graph visualization of one table.

Note: The creation of loads is dynamic, and no hard-coded names were used; all were parameterized by configuration files.

The visualization illustrates a DAG named “load_to_HANA_SA_PROJECT_ART_STGORDER” and a dataset labeled “HANA.SA_PROJECT_ART_STGORDER.” The DAG first loads the data into a staging table on the SAP HANA database, so that all subsequent loads work from a fixed dataset. When this load completes, the dataset triggers the subsequent DAG “load_to_Postgres_SA_PROJECT_ART_STGORDER.”
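Because no names were hard-coded, every identifier for a table can be derived from a single configured name. The helper below is a hypothetical sketch. Its patterns reproduce the DAG and dataset names shown in the graph. The `Postgres.` dataset URI for the second stage is an assumption, since the article does not name it.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LoadNames:
    """All identifiers belonging to one configured table."""
    hana_dag_id: str
    hana_dataset_uri: str
    postgres_dag_id: str
    postgres_dataset_uri: str


def names_for(target_name: str) -> LoadNames:
    """Build every identifier from the configured target name only."""
    return LoadNames(
        hana_dag_id=f"load_to_HANA_{target_name}",
        hana_dataset_uri=f"HANA.{target_name}",
        postgres_dag_id=f"load_to_Postgres_{target_name}",
        postgres_dataset_uri=f"Postgres.{target_name}",  # hypothetical pattern
    )


print(names_for("SA_PROJECT_ART_STGORDER"))
```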

DAG “load_to_Postgres_SA_PROJECT_ART_STGORDER” also creates a staging table, this time in the PostgreSQL database, to mitigate risks to the final table used by consumers. A dataset triggers the subsequent load from the staging table to the final table, ready for consumption.
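The three-stage chain for one table can be made concrete with a toy model in plain Python. This is not how Airflow implements scheduling. It only mimics the rule that a DAG scheduled on a single dataset starts after a task producing that dataset has finished successfully. The third DAG id and the `Postgres.` dataset URI are hypothetical, since the article does not name them.

```python
from typing import Optional

# DAG id -> (dataset it is scheduled on, dataset it updates on success)
STAGES: dict[str, tuple[Optional[str], Optional[str]]] = {
    "load_to_HANA_SA_PROJECT_ART_STGORDER": (None, "HANA.SA_PROJECT_ART_STGORDER"),
    "load_to_Postgres_SA_PROJECT_ART_STGORDER": (
        "HANA.SA_PROJECT_ART_STGORDER",
        "Postgres.SA_PROJECT_ART_STGORDER",  # hypothetical URI
    ),
    "load_final_SA_PROJECT_ART_STGORDER": (  # hypothetical DAG id
        "Postgres.SA_PROJECT_ART_STGORDER",
        None,
    ),
}


def triggered_runs(start: str, failing: frozenset[str] = frozenset()) -> list[str]:
    """Return the DAGs that run, in order, after `start` is launched."""
    runs: list[str] = []
    current: Optional[str] = start
    while current is not None:
        runs.append(current)
        outlet = STAGES[current][1]
        if current in failing or outlet is None:
            break  # a failed run updates no dataset, so nothing downstream starts
        # The next DAG is the one scheduled on the dataset just updated.
        current = next(
            (dag_id for dag_id, (scheduled_on, _) in STAGES.items()
             if scheduled_on == outlet),
            None,
        )
    return runs


print(triggered_runs("load_to_HANA_SA_PROJECT_ART_STGORDER"))
```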

Details of the processes within these loads are beyond the scope of this article. However, understanding how datasets work and how they are set up is crucial. If you’re intrigued by the potential of leveraging datasets for orchestrating your data workflows effectively, read on to discover the intricacies of setting up datasets in your Airflow environment.

Within the DAG design of “load_to_HANA_SA_PROJECT_ART_STGORDER,” the SQLExecuteQueryOperator shown below does not update the dataset by itself; it does so because it is given an outlet. When a task with an outlet completes successfully, Airflow marks that dataset as updated. Future iterations may include additional operators for data quality checks before the outlet is marked as ready.

This operator will mark the dataset as ready

To work with datasets, we need to import the “Dataset” class from airflow.datasets. If you generate all the dynamic DAGs for your flow from a single file, this import is needed only once.

Adding the needed import to work with Dataset

In the parameterization of the “transfer_to_table” task, an outlet is added to mark the dataset as ready.

Adding an outlet to the operator to mark the Dataset as ready

Upon successful completion of the load, the outlet is marked as ready, initiating the data transfer from the SAP HANA staging table to the PostgreSQL staging table.

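The DAG parameters include scheduling options, of which the “schedule” parameter is the essential one for working with datasets. In versions prior to 2.9, it takes a list of datasets, and the DAG is triggered once all of them have been updated.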

Adding a schedule to start the flow when a Dataset is marked as ready

Note: I have yet to explore the exciting new features introduced in Apache Airflow version 2.9. However, once I delve into them, I’ll be sure to share my experience. These enhancements promise greater flexibility in scheduling, offering a wider range of options to tailor orchestration according to specific needs.

As previously stated, I worked with four tables, each undergoing the same process. These specifications were not hardcoded but derived from the configuration of the loaded tables. The outcome of dynamically creating DAGs for these four tables is depicted in the dataset visualization within the Airflow UI.

Four dynamic DAGs based on a configuration file

Personally, I prefer not to start the four tables separately, for example when rerunning after an error. To achieve this, I applied the same dataset approach, so that a single DAG starts my entire workflow.

Four dynamic DAGs with one starting DAG

Final conclusion

Datasets offer a functional approach to pipelining in Airflow. The successful completion of a preceding job triggers the subsequent jobs and their tasks. A failed job updates no dataset and therefore starts nothing downstream, which keeps the workflow consistent.

Kenny Peeters

consultant @Cubis