bedda.tech logobedda.tech
← Back to blog

Debug Airflow DAG Failures: 8 Common Errors & Fixes

Matthew J. Whitney
13 min read
devopsdebuggingbest practicescloud computing

Debugging Apache Airflow DAG Failures: 8 Common Errors and Fixes

If you've been running Apache Airflow in production for more than a week, you've probably stared at a failed DAG wondering what went wrong. I've spent countless hours debugging everything from mysterious zombie tasks to scheduler deadlocks that brought entire data pipelines to a halt.

After architecting data platforms that process millions of records daily, I've encountered virtually every Airflow failure mode imaginable. This guide covers the 8 most common DAG failures I see in production environments, complete with actual error messages, debugging steps, and proven fixes.

The Reality of Airflow DAG Failures in Production

Airflow is powerful, but it's also complex. Between task dependencies, resource allocation, database connections, and distributed execution, there are numerous points of failure. The key is knowing how to quickly identify and resolve issues before they cascade.

Most DAG failures fall into these categories:

  • Dependency issues (timeouts, upstream failures)
  • Resource constraints (memory, CPU, connections)
  • Concurrency problems (locks, zombie processes)
  • Data serialization errors (XCom, large payloads)
  • Infrastructure failures (database, Kubernetes, networking)

Let's dive into the specific errors and their solutions.

Error #1: Task Dependency Timeout - 'upstream_failed' Status

Error Message:

Task instance marked as upstream_failed due to dependency timeout

This happens when a task waits too long for its upstream dependencies to complete. The task never runs because Airflow gives up waiting.

Root Causes:

  • Upstream task stuck in running state
  • Database lock preventing status updates
  • Sensor tasks waiting indefinitely

Debugging Steps:

# Check task instance status
airflow tasks state my_dag my_task 2025-06-16

# View task logs
airflow tasks log my_dag my_task 2025-06-16

# Check dependency status
airflow tasks depends-on my_dag my_task 2025-06-16

Fix #1: Increase Dependency Timeout

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta

dag = DAG(
    'my_dag',
    default_args={
        'depends_on_past': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
        # Increase timeout for slow upstream tasks
        'execution_timeout': timedelta(hours=2),
    },
    schedule_interval='@daily',
    start_date=datetime(2025, 1, 1),
    catchup=False
)

Fix #2: Add Proper Task Dependencies

# Instead of relying on implicit dependencies
task_a = PythonOperator(
    task_id='extract_data',
    python_callable=extract_function,
    dag=dag
)

task_b = PythonOperator(
    task_id='transform_data',
    python_callable=transform_function,
    dag=dag,
    # Explicit dependency with timeout
    task_concurrency=1,
)

# Clear dependency chain
task_a >> task_b

Error #2: Memory Allocation Failures in PythonOperator Tasks

Error Message:

Process finished with exit code 137 (SIGKILL)
MemoryError: Unable to allocate array with shape and data type

This occurs when your Python tasks consume more memory than allocated, especially common with pandas operations on large datasets.

Debugging Steps:

# Check worker resource limits
kubectl describe pod airflow-worker-xxx

# Monitor memory usage during task execution
airflow tasks run my_dag memory_intensive_task 2025-06-16 --local

Fix #1: Optimize Memory Usage in Tasks

import pandas as pd
from airflow.operators.python import PythonOperator

def process_large_dataset(**context):
    # Bad: Load entire dataset into memory
    # df = pd.read_csv('large_file.csv')
    
    # Good: Process in chunks
    chunk_size = 10000
    processed_data = []
    
    for chunk in pd.read_csv('large_file.csv', chunksize=chunk_size):
        # Process each chunk
        processed_chunk = chunk.groupby('category').sum()
        processed_data.append(processed_chunk)
        
        # Clear memory
        del chunk
    
    # Combine results
    final_result = pd.concat(processed_data)
    return final_result.to_dict()

memory_task = PythonOperator(
    task_id='process_data',
    python_callable=process_large_dataset,
    # Increase memory limit for this specific task
    executor_config={
        "KubernetesExecutor": {
            "request_memory": "4Gi",
            "limit_memory": "8Gi"
        }
    },
    dag=dag
)

Fix #2: Use Streaming Operations

def streaming_transform(**context):
    import dask.dataframe as dd
    
    # Use Dask for out-of-core processing
    df = dd.read_csv('large_file.csv')
    result = df.groupby('category').sum().compute()
    
    # Write directly to destination without storing in memory
    result.to_parquet('s3://bucket/processed_data.parquet')
    
    return "Processing complete"

Error #3: Database Connection Pool Exhaustion

Error Message:

sqlalchemy.exc.TimeoutError: QueuePool limit of size 5 overflow 10 reached
airflow.exceptions.AirflowException: Cannot create more connections

This happens when too many tasks try to access the Airflow metadata database simultaneously, exhausting the connection pool.

Debugging Steps:

-- Check active connections
SELECT count(*) FROM pg_stat_activity WHERE datname = 'airflow';

-- View connection details
SELECT pid, usename, application_name, state 
FROM pg_stat_activity 
WHERE datname = 'airflow';

Fix #1: Increase Connection Pool Size

# airflow.cfg
[database]
sql_alchemy_pool_size = 20
sql_alchemy_max_overflow = 40
sql_alchemy_pool_recycle = 3600
sql_alchemy_pool_pre_ping = True

Fix #2: Optimize Database Usage in Tasks

from airflow.hooks.postgres_hook import PostgresHook
from contextlib import contextmanager

@contextmanager
def get_db_connection():
    """Context manager for database connections"""
    hook = PostgresHook(postgres_conn_id='my_db')
    conn = hook.get_conn()
    try:
        yield conn
    finally:
        conn.close()

def database_task(**context):
    # Bad: Multiple long-lived connections
    # hook = PostgresHook(postgres_conn_id='my_db')
    # records = hook.get_records("SELECT * FROM large_table")
    
    # Good: Use connection pooling and close quickly
    with get_db_connection() as conn:
        cursor = conn.cursor()
        cursor.execute("SELECT * FROM large_table LIMIT 1000")
        records = cursor.fetchall()
        
    return len(records)

Error #4: Zombie Tasks and Stale Lock Files

Error Message:

Task is in RUNNING state but the corresponding process is dead
Detected zombie job with PID 12345

Zombie tasks occur when the worker process dies but Airflow still thinks the task is running, preventing new instances from starting.

Debugging Steps:

# Find zombie processes
airflow celery flower  # Check worker status

# Check for stale PIDs
ps aux | grep airflow

# View task state
airflow tasks state my_dag zombie_task 2025-06-16

Fix #1: Clear Zombie Tasks

# Clear specific task instance
airflow tasks clear my_dag -t zombie_task -s 2025-06-16 -e 2025-06-16

# Clear all zombie tasks
airflow db clean --clean-before-timestamp 2025-06-16T00:00:00

# Reset task state
airflow tasks state my_dag zombie_task 2025-06-16 --upstream --downstream

Fix #2: Implement Health Checks

from airflow.operators.python import PythonOperator
import os
import signal

def resilient_task(**context):
    def timeout_handler(signum, frame):
        raise TimeoutError("Task exceeded maximum runtime")
    
    # Set task timeout
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(3600)  # 1 hour timeout
    
    try:
        # Your task logic here
        result = perform_work()
        signal.alarm(0)  # Cancel timeout
        return result
    except TimeoutError:
        # Clean up and exit gracefully
        cleanup_resources()
        raise

resilient_task_op = PythonOperator(
    task_id='resilient_task',
    python_callable=resilient_task,
    # Add execution timeout
    execution_timeout=timedelta(minutes=90),
    dag=dag
)

Error #5: XCom Serialization Errors with Large Datasets

Error Message:

airflow.exceptions.AirflowException: XCom value too large for serialization
ValueError: Cannot serialize object of type DataFrame

XCom has size limits and can't serialize all Python objects, causing failures when tasks try to pass large or complex data.

Debugging Steps:

# Check XCom size
from airflow.models import XCom
xcoms = XCom.get_many(dag_id='my_dag', task_id='my_task')
for xcom in xcoms:
    print(f"XCom size: {len(str(xcom.value))} characters")

Fix #1: Use External Storage for Large Data

import boto3
import pickle
from airflow.operators.python import PythonOperator

def store_large_data(**context):
    # Process your large dataset
    large_df = process_data()
    
    # Store in S3 instead of XCom
    s3_client = boto3.client('s3')
    data_key = f"temp_data/{context['run_id']}/processed_data.pkl"
    
    # Serialize and upload
    pickled_data = pickle.dumps(large_df)
    s3_client.put_object(
        Bucket='my-airflow-bucket',
        Key=data_key,
        Body=pickled_data
    )
    
    # Return only the S3 key via XCom
    return {'s3_key': data_key, 'record_count': len(large_df)}

def consume_large_data(**context):
    # Get S3 key from XCom
    upstream_data = context['task_instance'].xcom_pull(task_ids='store_data')
    s3_key = upstream_data['s3_key']
    
    # Download and deserialize
    s3_client = boto3.client('s3')
    obj = s3_client.get_object(Bucket='my-airflow-bucket', Key=s3_key)
    large_df = pickle.loads(obj['Body'].read())
    
    # Process the data
    return process_further(large_df)

Fix #2: Use Custom XCom Backend

# airflow.cfg
[core]
xcom_backend = airflow.models.xcom.S3XComBackend

# Custom XCom backend for large objects
from airflow.models.xcom import BaseXCom
import boto3

class S3XComBackend(BaseXCom):
    PREFIX = "xcom_s3://"
    BUCKET_NAME = "my-airflow-xcom-bucket"
    
    @staticmethod
    def serialize_value(value):
        if isinstance(value, (pd.DataFrame, dict)) and len(str(value)) > 1000000:
            # Store large objects in S3
            s3_client = boto3.client('s3')
            key = f"xcom/{uuid.uuid4()}.pkl"
            
            s3_client.put_object(
                Bucket=S3XComBackend.BUCKET_NAME,
                Key=key,
                Body=pickle.dumps(value)
            )
            
            return f"{S3XComBackend.PREFIX}{key}"
        
        return BaseXCom.serialize_value(value)

Error #6: Dynamic DAG Generation Import Failures

Error Message:

Failed to import DAG file /opt/airflow/dags/dynamic_dag.py
ModuleNotFoundError: No module named 'custom_module'
ImportError: cannot import name 'generate_tasks' from 'utils'

Dynamic DAGs that generate tasks programmatically often fail due to import issues or circular dependencies.

Fix #1: Proper Import Structure

# dynamic_dag.py
import sys
import os
from datetime import datetime, timedelta

# Add custom modules to path
sys.path.append(os.path.join(os.path.dirname(__file__), 'utils'))

try:
    from airflow import DAG
    from airflow.operators.python import PythonOperator
    from custom_module import get_data_sources  # Your custom logic
except ImportError as e:
    # Graceful degradation for import issues
    print(f"Import error in dynamic DAG: {e}")
    # Create empty DAG to prevent parsing failures
    dag = DAG(
        'dynamic_dag_fallback',
        start_date=datetime(2025, 1, 1),
        schedule_interval=None,
        is_paused_upon_creation=True
    )

def create_dynamic_dag():
    dag = DAG(
        'dynamic_processing_dag',
        default_args={
            'owner': 'data_team',
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
        },
        start_date=datetime(2025, 1, 1),
        schedule_interval='@daily',
        catchup=False
    )
    
    # Generate tasks dynamically
    data_sources = get_data_sources()
    
    for source in data_sources:
        task = PythonOperator(
            task_id=f'process_{source["name"]}',
            python_callable=process_source,
            op_kwargs={'source_config': source},
            dag=dag
        )
    
    return dag

# Create the DAG
if 'get_data_sources' in globals():
    dag = create_dynamic_dag()
else:
    # Fallback DAG
    dag = DAG('dynamic_dag_error', start_date=datetime(2025, 1, 1))

Fix #2: Validate DAG Before Registration

def validate_dag_structure(dag_dict):
    """Validate dynamic DAG configuration"""
    required_fields = ['dag_id', 'schedule', 'tasks']
    
    for field in required_fields:
        if field not in dag_dict:
            raise ValueError(f"Missing required field: {field}")
    
    for task in dag_dict['tasks']:
        if 'task_id' not in task:
            raise ValueError(f"Task missing task_id: {task}")
    
    return True

def create_validated_dag(config):
    try:
        validate_dag_structure(config)
        
        dag = DAG(
            config['dag_id'],
            schedule_interval=config['schedule'],
            start_date=datetime(2025, 1, 1),
            catchup=False
        )
        
        # Create tasks from config
        for task_config in config['tasks']:
            task = PythonOperator(
                task_id=task_config['task_id'],
                python_callable=globals()[task_config['function']],
                dag=dag
            )
        
        return dag
        
    except Exception as e:
        print(f"DAG validation failed: {e}")
        return None

Error #7: Resource Contention in KubernetesExecutor

Error Message:

Pod my-dag-task-12345 failed: Insufficient cpu/memory resources
CreateContainerError: container failed to start
ImagePullBackOff: Failed to pull image

When using KubernetesExecutor, tasks compete for cluster resources, and pods can fail to start or get evicted.

Debugging Steps:

# Check pod status
kubectl get pods -n airflow | grep my-dag

# Describe failed pod
kubectl describe pod my-dag-task-12345 -n airflow

# Check cluster resources
kubectl top nodes
kubectl describe node worker-node-1

Fix #1: Proper Resource Requests and Limits

from kubernetes.client import models as k8s

def resource_intensive_task(**context):
    # Your processing logic
    return "Task complete"

task_with_resources = PythonOperator(
    task_id='resource_intensive_task',
    python_callable=resource_intensive_task,
    executor_config={
        "pod_template_file": "/opt/airflow/pod_templates/resource_template.yaml",
        "pod_override": k8s.V1Pod(
            spec=k8s.V1PodSpec(
                containers=[
                    k8s.V1Container(
                        name="base",
                        resources=k8s.V1ResourceRequirements(
                            requests={
                                "memory": "2Gi",
                                "cpu": "1000m"
                            },
                            limits={
                                "memory": "4Gi",
                                "cpu": "2000m"
                            }
                        )
                    )
                ]
            )
        )
    },
    dag=dag
)

Fix #2: Pod Template for Consistent Configuration

# pod_template.yaml
apiVersion: v1
kind: Pod
metadata:
  name: airflow-worker-template
spec:
  containers:
    - name: base
      image: my-airflow:latest
      resources:
        requests:
          memory: "1Gi"
          cpu: "500m"
        limits:
          memory: "2Gi"
          cpu: "1000m"
      env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: "KubernetesExecutor"
      volumeMounts:
        - name: airflow-logs
          mountPath: /opt/airflow/logs
  volumes:
    - name: airflow-logs
      persistentVolumeClaim:
        claimName: airflow-logs-pvc
  nodeSelector:
    workload: "airflow"
  tolerations:
    - key: "airflow"
      operator: "Equal"
      value: "true"
      effect: "NoSchedule"

Error #8: Scheduler Deadlocks with High DAG Volume

Error Message:

Scheduler appears to have stopped processing new DAG runs
DagFileProcessor timed out after 30 seconds
Too many DAG files to process, scheduler falling behind

With hundreds of DAGs, the scheduler can become overwhelmed, leading to delays and apparent deadlocks.

Debugging Steps:

# Check scheduler performance
airflow db check

# Monitor DAG processing
grep "DagFileProcessor" /opt/airflow/logs/scheduler/*.log

# Check database performance
SELECT count(*) FROM dag_run WHERE state = 'running';

Fix #1: Optimize Scheduler Configuration

# airflow.cfg
[scheduler]
# Increase processing capacity
max_dagruns_per_loop_to_schedule = 20
max_dagruns_to_create_per_loop = 10
max_active_runs_per_dag = 3

# Reduce parsing frequency for stable DAGs
min_file_process_interval = 300
dag_dir_list_interval = 300

# Increase worker capacity
parsing_processes = 4
scheduler_heartbeat_sec = 5

[core]
# Reduce database load
max_active_tasks_per_dag = 16
max_active_runs_per_dag = 16
parallelism = 32

Fix #2: DAG Optimization Strategies

# Optimize DAG structure
dag = DAG(
    'optimized_dag',
    # Reduce scheduler overhead
    schedule_interval='@daily',
    max_active_runs=1,
    max_active_tasks=4,
    
    # Prevent unnecessary parsing
    is_paused_upon_creation=False,
    catchup=False,
    
    # Optimize task execution
    default_args={
        'retries': 1,
        'retry_delay': timedelta(minutes=1),
        'depends_on_past': False,
        'email_on_failure': False,  # Reduce notification overhead
        'email_on_retry': False,
    }
)

# Use task groups for better organization
from airflow.utils.task_group import TaskGroup

with TaskGroup("data_processing", dag=dag) as processing_group:
    extract_task = PythonOperator(
        task_id='extract',
        python_callable=extract_data,
    )
    
    transform_task = PythonOperator(
        task_id='transform',
        python_callable=transform_data,
    )
    
    extract_task >> transform_task

Essential Airflow Debugging Commands and Tools

Here are the commands I use most frequently when debugging DAG failures:

Task State Investigation:

# Check task state and history
airflow tasks state my_dag my_task 2025-06-16
airflow tasks state my_dag my_task 2025-06-16 --subdir /path/to/dags

# View task logs
airflow tasks log my_dag my_task 2025-06-16 -l DEBUG

# Test task locally
airflow tasks test my_dag my_task 2025-06-16

DAG Management:

# List all DAGs and their status
airflow dags list
airflow dags state my_dag 2025-06-16

# Pause/unpause DAGs
airflow dags pause my_dag
airflow dags unpause my_dag

# Clear task instances
airflow tasks clear my_dag -s 2025-06-16 -e 2025-06-16

Database Debugging:

# Check database connections
airflow db check-migrations
airflow db check

# Clean up metadata
airflow db clean --clean-before-timestamp 2025-06-01T00:00:00

Performance Monitoring:

# Monitor Celery workers (if using CeleryExecutor)
airflow celery flower

# Check scheduler health
airflow scheduler --daemon
tail -f /opt/airflow/logs/scheduler/*.log

Prevention Strategies: Monitoring and Alerting Setup

1. Implement Comprehensive Logging:

import logging
from airflow.operators.python import PythonOperator

def monitored_task(**context):
    logger = logging.getLogger(__name__)
    
    try:
        logger.info(f"Starting task {context['task_instance'].task_id}")
        
        # Your task logic
        result = perform_work()
        
        logger.info(f"Task completed successfully. Processed {len(result)} records")
        return result
        
    except Exception as e:
        logger.error(f"Task failed with error: {str(e)}", exc_info=True)
        raise

2. Set Up Failure Callbacks:

def task_failure_callback(context):
    """Send alerts on task failure"""
    task_instance = context['task_instance']
    
    # Send to monitoring system
    send_alert({
        'dag_id': task_instance.dag_id,
        'task_id': task_instance.task_id,
        'execution_date': str(task_instance.execution_date),
        'error': str(context['exception']),
        'log_url': task_instance.log_url
    })

dag = DAG(
    'monitored_dag',
    default_args={
        'on_failure_callback': task_failure_callback,
        'retries': 2,
    }
)

3. Health Check DAG:

def check_system_health(**context):
    """Monitor Airflow system health"""
    from airflow.models import DagRun, TaskInstance
    
    # Check for stuck DAG runs
    stuck_runs = session.query(DagRun).filter(
        DagRun.state == 'running',
        DagRun.start_date < datetime.now() - timedelta(hours=24)
    ).count()
    
    if stuck_runs > 0:
        raise AirflowException(f"Found {stuck_runs} stuck DAG runs")
    
    return "System healthy"

health_check = PythonOperator(
    task_id='health_check',
    python_callable=check_system_health,
    schedule_interval='@hourly',
    dag=health_dag
)

When to Restart vs Retry vs Skip Failed Tasks

Restart When:

  • Scheduler appears hung or unresponsive
  • Database connection issues persist
  • Worker nodes are unresponsive
  • Memory leaks in long-running processes

Retry When:

  • Transient network errors
  • Temporary resource unavailability
  • External service timeouts
  • Database deadlocks

Skip When:

  • Data source is permanently unavailable
  • Task logic has fundamental errors
  • Downstream tasks don't depend on the output
  • Manual intervention is required
# Configure retry behavior per task type
network_task = PythonOperator(
    task_id='api_call',
    python_callable=call_external_api,
    retries=3,
    retry_delay=timedelta(minutes=5),
    retry_exponential_backoff=True,
    dag=dag
)

critical_task = PythonOperator(
    task_id='critical_processing',
    python_callable=process_critical_data,
    retries=0,  # Don't retry, investigate immediately
    on_failure_callback=alert_on_critical_failure,
    dag=dag
)

Conclusion

Debugging Airflow DAG failures doesn't have to be a mystery. By understanding these common failure patterns and having the right debugging tools ready, you can quickly identify and resolve issues before they impact your data pipelines.

The key is proactive monitoring combined with proper error handling and resource management. Start with the basics—ensure your tasks have appropriate timeouts, resource limits, and retry logic. Then implement comprehensive logging and alerting to catch issues early.

Need help implementing robust data pipelines with Airflow? At Bedda.tech, we specialize in building resilient data infrastructure that scales. Our team has debugged everything from simple DAG failures to complex distributed processing issues across enterprise environments.

Contact us to discuss how we can help optimize your Airflow deployment and prevent these common failures from impacting your business.

Have Questions or Need Help?

Our team is ready to assist you with your project needs.

Contact Us