Published on 04/20/2022
Last updated on 03/26/2024

Optimize Your Data Pipeline with Apache Airflow and Great Expectations

If you build or maintain applications that rely on data, there’s a decent chance you’re familiar with both Apache Airflow and Great Expectations. Apache Airflow is an open source tool that helps DevOps teams build and manage workflows programmatically. It can drive data pipelines by using standard Python to define complex workflow requirements as Directed Acyclic Graphs (DAGs), then execute those workflows automatically. Great Expectations, which is also open source, is a data validation tool. Engineers use it to define the testing, documentation, and profiling requirements that data must pass before it moves down the workflow pipeline. It also flags data quality errors so that teams can remedy them before progressing to the next step.
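To make the Great Expectations side concrete, here is a minimal sketch of a single expectation using the library’s dataset-style convenience API (the sample data and column name are hypothetical, purely for illustration):

import great_expectations as ge
import pandas as pd

# Wrap a plain DataFrame so it gains expect_* methods (hypothetical sample data)
df = ge.from_pandas(pd.DataFrame({"city": ["Kabul", "Qandahar", None]}))

# Declare what "good" data looks like; the returned result reports any violations
result = df.expect_column_values_to_not_be_null("city")
print(result.success)  # False here, because one city name is missing

In the rest of this post, expectations like this are generated by a profiler and run through checkpoints rather than hand-coded.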

Combining Apache Airflow with Great Expectations

You can use Airflow and Great Expectations separately. But wouldn’t it be great if you could integrate them in such a way that they work seamlessly in tandem, with Airflow defining data pipeline flows and Great Expectations automatically validating data as it moves according to those flows? Well, you can, and this blog shows you how. By integrating Airflow and Great Expectations, you not only save time, but you can also validate data quality automatically within a database. You can find data errors faster, reduce the occurrence of data quality issues that may slow down your data pipeline, and minimize the manual effort that your engineers spend managing data quality and movement. Keep reading for a step-by-step guide to using Airflow and Great Expectations in tandem.

Step 1: Set up Great Expectations

We’ll start by installing and configuring Great Expectations. We’ll use the simple MySQL database “world_x”, which contains four tables (country, city, countryinfo, and countrylanguage) and is documented here: https://dev.mysql.com/doc/world-x-setup/en/. Follow those instructions to set it up on localhost, port 3306. With our MySQL database in place, let’s install Great Expectations. Here’s a Bash script (ge.sh) that will do everything you require.
#!/bin/bash

## Database Credentials
world_db_host="localhost"
port="3306"
username="demo_user"
password="demo_pass"
world_db="world_x"

printf "Setting up Great Expectations for UMP...\n\n"

printf "Using following Database config:\n"
printf "HOST: %s\n" "$world_db_host"
printf "PORT: %s\n" "$port"
printf "USERNAME: %s\n" "$username"
printf "DATABASE: %s\n" "$world_db"

printf "Remove existing GE dirs, if exists...\n\n"
rm -rf great_expectations

printf "Setting up Python venv...\n\n"
python3 -m venv venv
source ./venv/bin/activate

printf "Installing required GE pip packages...\n\n"
python3 -m pip install --upgrade pip
pip3 install great_expectations
pip3 install pymysql
pip3 install sqlalchemy
pip3 install jupyterlab
pip3 install notebook

printf "\nGE is up with version:\n"
great_expectations --version

printf "Initializing GE and datasource...\n\n"
/usr/bin/expect -c '
spawn great_expectations --v3-api init
expect -re {OK to proceed?} {send "Y\r"}
send "great_expectations --v3-api datasource new\n"
expect -re ":"
send "2\r"
expect -re ":"
send "1\r"
'
ls -l

printf "Setting up MySQL Data Sources...\n\n"
python3 setup_datasources.py world_db_datasource $world_db_host $port $username $password $world_db

printf "Setting up Expectations Suites for world_x DB...\n\n"
python3 setup_expectations.py world_db_city_expectation world_db_datasource city ID,Name,CountryCode,District,Info
python3 setup_expectations.py world_db_country_expectation world_db_datasource country Code,Name,Capital,Code2,Covid_Hotspot
python3 setup_expectations.py world_db_countryinfo_expectation world_db_datasource countryinfo doc,_id,_json_schema
python3 setup_expectations.py world_db_countrylanguage_expectation world_db_datasource countrylanguage CountryCode,Language,IsOfficial,Percentage

printf "Setting up checkpoints for world_x DB...\n\n"
python3 setup_checkpoints.py world_db_city_checkpoint world_db_city_expectation world_db_datasource city
python3 setup_checkpoints.py world_db_country_checkpoint world_db_country_expectation world_db_datasource country
python3 setup_checkpoints.py world_db_countryinfo_checkpoint world_db_countryinfo_expectation world_db_datasource countryinfo
python3 setup_checkpoints.py world_db_countrylanguage_checkpoint world_db_countrylanguage_expectation world_db_datasource countrylanguage

printf "Setting up s3_site locations for GE...\n\n"
sed -i "" "/class_name: DefaultSiteIndexBuilder/r s3_site.txt" great_expectations/great_expectations.yml
sed -i "" "s/site_names: \[\]/site_names: \[\"local_site\"\, \"s3_site\"\]/g" great_expectations/checkpoints/*.yml

printf "Listing all checkpoints...\n\n"
great_expectations checkpoint list
Specifically, this script does the following:
  • Installs the required pip packages.
  • Initializes Great Expectations.
  • Creates a new MySQL datasource for the locally hosted “world_x” DB using the setup_datasources.py script.
  • Creates expectation suites for each table in the DB using the setup_expectations.py script.
  • Creates checkpoints for each table in the DB using the setup_checkpoints.py script.
You’ll notice that the script references a configuration file called s3_site.txt. This file defines the s3_site Data Docs entry, including the S3 bucket where you’ll store Great Expectations data:
s3_site:
  class_name: SiteBuilder
  store_backend:
    class_name: TupleS3StoreBackend
    bucket: greatexpectations-s3
  site_index_builder:
    class_name: DefaultSiteIndexBuilder
Once setup is complete, you should see the following data_docs sites (local and s3) added to your great_expectations.yml file:
data_docs_sites:
  local_site:
    class_name: SiteBuilder
    show_how_to_buttons: true
    store_backend:
      class_name: TupleFilesystemStoreBackend
      base_directory: uncommitted/data_docs/local_site/
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
  s3_site:
    class_name: SiteBuilder
    store_backend:
      class_name: TupleS3StoreBackend
      bucket: ump-greatexpectations-s3
    site_index_builder:
      class_name: DefaultSiteIndexBuilder
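To verify that both Data Docs sites are wired up correctly (the s3_site additionally needs AWS credentials that boto3 can pick up from your environment), you can rebuild the docs at any time with the Great Expectations CLI:

great_expectations --v3-api docs build

This should rebuild every site listed under data_docs_sites, so failures here usually point at the S3 bucket name or credentials.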
Note that this ge.sh script calls additional Python scripts to complete installation. One is setup_datasources.py, which looks like this:
import sys
import great_expectations as ge
from great_expectations.cli.datasource import sanitize_yaml_and_save_datasource, check_if_datasource_name_exists
context = ge.get_context()

print(sys.argv)
datasource_name = sys.argv[1]
host = sys.argv[2]
port = sys.argv[3]
username = sys.argv[4]
password = sys.argv[5]
database = sys.argv[6]

example_yaml = f"""
name: {datasource_name}
class_name: Datasource
execution_engine:
  class_name: SqlAlchemyExecutionEngine
  credentials:
    host: {host}
    port: '{port}'
    username: {username}
    password: {password}
    database: {database}
    drivername: mysql+pymysql
data_connectors:
  default_runtime_data_connector_name:
    class_name: RuntimeDataConnector
    batch_identifiers:
      - default_identifier_name
  default_inferred_data_connector_name:
    class_name: InferredAssetSqlDataConnector
    name: whole_table"""
print(example_yaml)

context.test_yaml_config(yaml_config=example_yaml)

sanitize_yaml_and_save_datasource(context, example_yaml, overwrite_existing=False)
context.list_datasources()
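If context.test_yaml_config fails at this step, the cause is usually connectivity or credentials rather than Great Expectations itself. A quick way to isolate that is to query the database directly with SQLAlchemy, using the same credentials the script receives (a sketch, assuming the demo credentials from ge.sh):

from sqlalchemy import create_engine, text

# The same connection string Great Expectations builds from the credentials above
engine = create_engine("mysql+pymysql://demo_user:demo_pass@localhost:3306/world_x")
with engine.connect() as conn:
    print(conn.execute(text("SELECT COUNT(*) FROM city")).scalar())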
Another is setup_expectations.py:
import sys

from great_expectations.checkpoint import SimpleCheckpoint
from great_expectations.profile.user_configurable_profiler import UserConfigurableProfiler

import great_expectations as ge
from great_expectations.core.batch import BatchRequest
from great_expectations.data_context.types.resource_identifiers import ExpectationSuiteIdentifier
from great_expectations.exceptions import DataContextError

print(sys.argv)
expectation_suite_name = sys.argv[1]
datasource_name = sys.argv[2]
data_asset_name = sys.argv[3]
ignored_columns = sys.argv[4].split(',')
open_data_docs = sys.argv[5] if len(sys.argv) >= 6 else 'false'

context = ge.data_context.DataContext()

try:
    suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(
        f'Loaded ExpectationSuite "{suite.expectation_suite_name}" containing {len(suite.expectations)} expectations.')
except DataContextError:
    suite = context.create_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(f'Created ExpectationSuite "{suite.expectation_suite_name}".')

print(context.get_expectation_suite(expectation_suite_name=expectation_suite_name))
context.save_expectation_suite(expectation_suite=suite, expectation_suite_name=expectation_suite_name)

suite_identifier = ExpectationSuiteIdentifier(expectation_suite_name=expectation_suite_name)
context.build_data_docs(resource_identifiers=[suite_identifier])

batch_request = {'datasource_name': datasource_name, 'data_connector_name': 'default_inferred_data_connector_name',
                 'data_asset_name': data_asset_name, 'limit': 1000}

validator = context.get_validator(
    batch_request=BatchRequest(**batch_request),
    expectation_suite_name=expectation_suite_name
)
column_names = [f'"{column_name}"' for column_name in validator.columns()]
print(f"Columns: {', '.join(column_names)}.")
validator.head(n_rows=5, fetch_all=False)

profiler = UserConfigurableProfiler(
    profile_dataset=validator,
    excluded_expectations=None,
    ignored_columns=ignored_columns,
    not_null_only=False,
    primary_or_compound_key=False,
    semantic_types_dict=None,
    table_expectations_only=False,
    value_set_threshold="MANY",
)
suite = profiler.build_suite()

# Additional Expectations
for x in range(len(ignored_columns)):
    validator.expect_column_values_to_not_be_null(column=ignored_columns[x])

print(validator.get_expectation_suite(discard_failed_expectations=False))
validator.save_expectation_suite(discard_failed_expectations=False)

checkpoint_config = {
    "class_name": "SimpleCheckpoint",
    "run_name_template": "%Y%m%d-%H%M%S-" + expectation_suite_name,
    "validations": [
        {
            "batch_request": batch_request,
            "expectation_suite_name": expectation_suite_name
        }
    ]
}
checkpoint = SimpleCheckpoint(
    f"_tmp_checkpoint_{expectation_suite_name}",
    context,
    **checkpoint_config
)
checkpoint_result = checkpoint.run()

context.build_data_docs()

validation_result_identifier = checkpoint_result.list_validation_result_identifiers()  # [0]
if open_data_docs == 'true':
    context.open_data_docs()
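The UserConfigurableProfiler produces a baseline suite from the data it profiles. If you also want to encode domain rules explicitly, you can append hand-written expectations to the same validator before saving the suite; for example, a sketch against the world_x country table, where Code holds three-letter ISO codes:

# Hand-written expectations layered on top of the profiled suite (illustrative only)
validator.expect_column_value_lengths_to_equal(column="Code", value=3)
validator.expect_column_values_to_match_regex(column="Code", regex="^[A-Z]{3}$")
validator.save_expectation_suite(discard_failed_expectations=False)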
Finally, we have setup_checkpoints.py:
from ruamel.yaml import YAML
import great_expectations as ge
from pprint import pprint
import sys

yaml = YAML()
context = ge.get_context()

print(sys.argv)
my_checkpoint_name = sys.argv[1]
expectation_suite_name = sys.argv[2]
datasource_name = sys.argv[3]
data_asset_name = sys.argv[4]
open_data_docs = sys.argv[5] if len(sys.argv) >= 6 else 'false'

yaml_config = f"""
name: {my_checkpoint_name}
config_version: 1.0
class_name: SimpleCheckpoint
run_name_template: "%Y%m%d-%H%M%S-{my_checkpoint_name}"
validations:
  - batch_request:
      datasource_name: {datasource_name}
      data_connector_name: default_inferred_data_connector_name
      data_asset_name: {data_asset_name}
      data_connector_query:
        index: -1
    expectation_suite_name: {expectation_suite_name}
"""
print(yaml_config)

# Run this cell to print out the names of your Data Sources, Data Connectors and Data Assets
pprint(context.get_available_data_asset_names())

context.list_expectation_suite_names()

my_checkpoint = context.test_yaml_config(yaml_config=yaml_config)

print(my_checkpoint.get_substituted_config().to_yaml_str())
context.add_checkpoint(**yaml.load(yaml_config))

context.run_checkpoint(checkpoint_name=my_checkpoint_name)

if open_data_docs == 'true':
    context.open_data_docs()
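Because context.run_checkpoint returns a CheckpointResult, a calling script could also fail fast when validation does not pass; a minimal sketch:

# Re-run the checkpoint and abort if any validation failed
result = context.run_checkpoint(checkpoint_name=my_checkpoint_name)
if not result.success:
    raise SystemExit(f"Checkpoint {my_checkpoint_name} failed validation")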
How to Execute
  • Copy all files (ge.sh, s3_site.txt, setup_datasources.py, setup_expectations.py, setup_checkpoints.py) into the same directory.
  • Execute the ge.sh script, which performs all of the required setup and creates a new Great Expectations Data Context to hold your project configuration.
Great Expectations will create a new directory with the following structure:
great_expectations
|-- great_expectations.yml
|-- expectations
|-- checkpoints
|-- plugins
|-- .gitignore
|-- uncommitted
    |-- config_variables.yml
    |-- data_docs
    |-- validations
Note: Ensure the path to the great_expectations root directory above is set in the “GE_ROOT_DIR” environment variable before proceeding with the Airflow setup.
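For example, if you ran ge.sh from your current working directory, that could look like this (adjust the path to wherever your great_expectations directory actually lives):

export GE_ROOT_DIR="$(pwd)/great_expectations"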

Step 2: Set up Apache Airflow

Now that Great Expectations is installed, you can set up Apache Airflow and configure DAGs to integrate Airflow with Great Expectations. Copy the following two files, airflowinstall.sh and airflow_greatexpectations.py, into a directory. The first script (airflowinstall.sh) performs the basic Airflow setup on your machine:
#! /bin/bash

# set airflow home as
export AIRFLOW_HOME=$(pwd)/air
export AIRFLOW__CORE__ENABLE_XCOM_PICKLING=True

printf $AIRFLOW_HOME

printf "Installing required pip packages...\n\n"
pip3 install apache-airflow
pip3 install great_expectations "airflow-provider-great-expectations>=0.1.0"
pip3 install pymysql

printf "Moving DAG file to airflow dir...\n\n"
pwd
mkdir -p air/dags
cp airflow_greatexpectations.py air/dags/
ls -l air/dags/

printf "Initialize and Start Airflow...\n\n"
airflow dags list

# initialize the database
airflow db init

# provide your user login credentials and save
airflow users create \
    --username demo_user \
    --firstname airflowr \
    --password demo_pass \
    --lastname airflowr \
    --role Admin \
    --email demo123@gmail.com

# start the web server and scheduler
airflow webserver --port 8080 -D
airflow scheduler
This script (airflow_greatexpectations.py) executes the Great Expectations checkpoints from an Airflow DAG using the GreatExpectationsOperator:
import datetime
import logging
import os

from airflow import DAG
from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

# Set the GE_ROOT_DIR environment variable to the absolute path of the great_expectations directory on your machine
ge_root_dir = os.environ.get('GE_ROOT_DIR')
print("GE_ROOT_DIR:", ge_root_dir)


def on_failure_func(context):
    """Define custom failure notification behavior"""
    dag_run = context.get('dag_run')
    task_instances = dag_run.get_task_instances()
    print("WARNING: These task instances failed which can cause data consistencies", task_instances)
    logging.error('WARNING: These task instances failed which can cause data consistencies: {}'.format(task_instances))


with DAG(
        dag_id="great_expectations_dag",
        start_date=datetime.datetime(2022, 1, 19),
        on_failure_callback=on_failure_func,
        catchup=False,
        schedule_interval=None
) as dag:
    ge_world_db_city_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_city_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_city_checkpoint"
    )
    ge_world_db_country_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_country_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_country_checkpoint",
        trigger_rule="all_done"
    )
    ge_world_db_countryinfo_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_countryinfo_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_countryinfo_checkpoint",
        trigger_rule="all_done"
    )
    ge_world_db_countrylanguage_checkpoint_pass = GreatExpectationsOperator(
        task_id="task_world_db_countrylanguage_checkpoint",
        data_context_root_dir=ge_root_dir,
        checkpoint_name="world_db_countrylanguage_checkpoint",
        trigger_rule="all_done"
    )

    ge_world_db_city_checkpoint_pass >> ge_world_db_country_checkpoint_pass >> ge_world_db_countryinfo_checkpoint_pass \
        >> ge_world_db_countrylanguage_checkpoint_pass
Now, run the airflowinstall.sh script to complete Airflow setup.

Step 3: View Apache Airflow & Great Expectations in action

With all the pieces in place, you can see your creation by opening a browser to http://localhost:8080/. Notice the Airflow dashboard and the great_expectations_dag. You can trigger the DAG from the UI (or from the Airflow CLI; see the sketch after this paragraph) and validate that all tasks have run successfully. Upon completion of the run, you’ll see the Great Expectations results in the S3 bucket you defined earlier and/or on localhost, depending on the data_docs_sites entries in great_expectations.yml. You can also drill into detailed results for individual expectation suites by clicking the rows for specific tables. Congratulations! Your data pipeline is now built, and you can automatically validate and monitor scheduled workflows.
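For reference, the same run can be kicked off and monitored entirely from the command line, using the dag_id defined in airflow_greatexpectations.py:

# Trigger the DAG and then list its runs and their states
airflow dags trigger great_expectations_dag
airflow dags list-runs -d great_expectations_dag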

Conclusion

Integrating Apache Airflow and Great Expectations helps engineers double down on the value of both tools. The ability to automate data validation within an automated data workflow saves time and adds efficiency. Learn more by checking out the Great Expectations and Apache Airflow documentation.