For years, I relied on a messy collection of bash scripts and cron jobs to move data. It worked—until it didn’t. When a script failed at 3 AM, I had no visibility into why, no easy way to retry just the failed step, and zero dependency management. This is exactly why I switched to orchestration. If you’re wondering how to build a data pipeline with Python and Airflow, you’re looking for a way to turn those fragile scripts into a resilient, observable system.
Apache Airflow allows you to define your workflows as code (DAGs), making them version-controllable and scalable. While there are newer alternatives—and if you’re undecided, you might want to check out my Prefect vs Airflow comparison 2026—Airflow remains the industry standard for a reason.
Prerequisites
Before we dive into the code, ensure you have the following installed in your environment:
- Python 3.9+: Airflow is Python-native.
- Docker & Docker Compose: While you can install Airflow locally, using Docker is the only way to maintain sanity regarding dependencies.
- Basic SQL Knowledge: You’ll need this to interact with your data sources.
- A Cloud API or Local Database: For this tutorial, we’ll assume you’re pulling from a JSON API and loading into a local SQLite or Postgres instance.
Step 1: Setting Up Airflow with Docker
I always recommend the official docker-compose.yaml provided by the Airflow community. It sets up the scheduler, webserver, and a Postgres metadata database automatically.
# Download the official docker-compose file
curl -LfO 'https://airflow.apache.org/docs/apache-airflow/stable/docker-compose.yaml'
# Create necessary directories
mkdir -p ./dags ./logs ./plugins
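# (Linux only, per the official quick-start) set the host user ID so files created
# in ./dags and ./logs are owned by your user rather than root
echo -e "AIRFLOW_UID=$(id -u)" > .env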
# Initialize the database
docker compose up airflow-init
# Start all services
docker compose up -d
Once the containers are up, you can access the Airflow UI at http://localhost:8080. Use the default credentials (airflow/airflow) to log in. You’ll see a blank dashboard awaiting your first DAG.
Step 2: Designing the DAG (Directed Acyclic Graph)
In Airflow, a pipeline is called a DAG. It defines the order of execution. For this project, we are building a simple ETL: Extract from API → Transform with Pandas → Load to Database.
Create a file named my_first_pipeline.py in your /dags folder:
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import pandas as pd
import requests

# 1. Define the default arguments
default_args = {
    'owner': 'ajmani',
    'depends_on_past': False,
    'start_date': datetime(2026, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# 2. Define the logic functions
def extract_data():
    response = requests.get('https://api.example.com/data')
    data = response.json()
    # Save to temporary CSV
    df = pd.DataFrame(data)
    df.to_csv('/tmp/raw_data.csv', index=False)

def transform_data():
    df = pd.read_csv('/tmp/raw_data.csv')
    # Perform a simple cleanup
    df['processed_at'] = datetime.now()
    df.to_csv('/tmp/transformed_data.csv', index=False)

def load_data():
    df = pd.read_csv('/tmp/transformed_data.csv')
    # In a real scenario, use a PostgresHook here (see the sketch after this block)
    print("Loading data into the warehouse...")

# 3. Instantiate the DAG
with DAG(
    'api_to_db_pipeline',
    default_args=default_args,
    description='A simple ETL pipeline',
    schedule_interval='@daily',
    catchup=False
) as dag:
    extract_task = PythonOperator(task_id='extract', python_callable=extract_data)
    transform_task = PythonOperator(task_id='transform', python_callable=transform_data)
    load_task = PythonOperator(task_id='load', python_callable=load_data)

    # Define the dependency flow
    extract_task >> transform_task >> load_task
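The load_data function above is just a stub. When you’re ready to write to a real warehouse, a hook keeps connection details out of your code. Here is a minimal sketch of what that load step could look like with PostgresHook; it assumes the apache-airflow-providers-postgres package is installed, a connection named postgres_default exists in the UI, and a target table called events already exists (all three are placeholders, not part of the pipeline above):

def load_data():
    # Hedged sketch: load the transformed CSV into Postgres via a Hook.
    # 'postgres_default' and the 'events' table are placeholder names.
    from airflow.providers.postgres.hooks.postgres import PostgresHook
    import pandas as pd

    df = pd.read_csv('/tmp/transformed_data.csv')
    hook = PostgresHook(postgres_conn_id='postgres_default')
    # get_sqlalchemy_engine() gives pandas a SQLAlchemy engine to write through
    df.to_sql('events', con=hook.get_sqlalchemy_engine(), if_exists='append', index=False)
    print(f"Loaded {len(df)} rows into the warehouse")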
Step 3: Managing Dependencies and State
One common mistake I see is passing large dataframes between tasks using XComs. XComs are meant for small metadata (like a file path or a status code), not actual data. In the example above, I used local CSVs in /tmp. In a production environment, you should use S3 or GCS as an intermediary landing zone.
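If you do need to hand small metadata between tasks, returning a value from the callable pushes it to XCom automatically, and downstream tasks can pull it by task ID. Here is a minimal sketch of that pattern, reusing the extract and transform functions from the DAG file above (in production the path would be an S3 or GCS URI rather than /tmp):

def extract_data():
    response = requests.get('https://api.example.com/data')
    path = '/tmp/raw_data.csv'
    pd.DataFrame(response.json()).to_csv(path, index=False)
    return path  # the return value is stored as an XCom (small metadata only)

def transform_data(ti):
    # Pull the file path pushed by the 'extract' task, not the dataframe itself
    path = ti.xcom_pull(task_ids='extract')
    df = pd.read_csv(path)
    df['processed_at'] = datetime.now()
    df.to_csv('/tmp/transformed_data.csv', index=False)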
If you need more lightweight automation for simpler tasks, you might consider ETL automation with Python and GitHub Actions, but for complex scheduling, Airflow is the way to go.
Pro Tips for Production Pipelines
- Use the TaskFlow API: If you’re on Airflow 2.0+, use the @dag and @task decorators instead of PythonOperator for cleaner code (see the sketch after this list).
- Implement Idempotency: Ensure that running the same pipeline twice for the same date doesn’t duplicate data. Use upserts (INSERT ... ON CONFLICT) in your SQL.
- Variable Management: Never hardcode API keys. Use Airflow’s Admin → Variables or Connections UI to store secrets securely.
- Monitoring: Set up SLA alerts. Airflow can send emails or Slack notifications the moment a task fails.
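To make the first tip concrete, here is a minimal sketch of the same pipeline rewritten with the TaskFlow API. Dependencies come from the function calls themselves, and each return value travels as a small XCom (here, file paths). The DAG id and paths simply mirror the earlier example and are illustrative only:

from airflow.decorators import dag, task
from datetime import datetime
import pandas as pd
import requests

@dag(schedule_interval='@daily', start_date=datetime(2026, 1, 1), catchup=False)
def api_to_db_pipeline_taskflow():

    @task
    def extract() -> str:
        response = requests.get('https://api.example.com/data')
        path = '/tmp/raw_data.csv'
        pd.DataFrame(response.json()).to_csv(path, index=False)
        return path  # only the path travels through XCom

    @task
    def transform(raw_path: str) -> str:
        df = pd.read_csv(raw_path)
        df['processed_at'] = datetime.now()
        out_path = '/tmp/transformed_data.csv'
        df.to_csv(out_path, index=False)
        return out_path

    @task
    def load(path: str) -> None:
        print(f"Loading {path} into the warehouse...")

    # Calling the tasks wires up extract >> transform >> load automatically
    load(transform(extract()))

api_to_db_pipeline_taskflow()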
Troubleshooting Common Issues
| Issue | Likely Cause | Solution |
|---|---|---|
| DAG not appearing in UI | Syntax error in Python file | Check the “DAG Import Errors” banner at the top of the UI. |
| Task stuck in ‘queued’ state | Scheduler is down | Run docker compose ps to ensure the scheduler container is healthy. |
| ModuleNotFoundError | Missing package in Docker image | Add the package to a requirements.txt and rebuild your image. |
What’s Next?
Now that you know how to build a data pipeline with Python and Airflow, you can start exploring more advanced patterns. I recommend looking into Dynamic DAGs (where one Python loop generates 50 similar pipelines) or integrating dbt (data build tool) for the transformation layer.
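For a taste of the dynamic pattern, here is a minimal sketch that generates one DAG per source from a single loop. The source names and the lambda body are placeholders; the key detail is registering each DAG object in the module’s globals so the scheduler discovers it:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime

SOURCES = ['orders', 'customers', 'invoices']  # placeholder source names

for source in SOURCES:
    with DAG(
        dag_id=f'{source}_pipeline',
        start_date=datetime(2026, 1, 1),
        schedule_interval='@daily',
        catchup=False,
    ) as dag:
        PythonOperator(
            task_id='extract',
            # default-arg trick captures the current loop value of `source`
            python_callable=lambda src=source: print(f'Extracting {src}'),
        )
    # Expose each generated DAG at module level so Airflow's DagBag picks it up
    globals()[dag.dag_id] = dag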
If you’re building this for a small team and find Airflow too heavy, remember to look into the comparison between Airflow and Prefect to see which fits your scale better. Ready to automate more? Check out how to combine this with GitHub Actions for CI/CD of your pipelines.