Thursday, October 6, 2022

[SOLVED] Airflow Docker AWS EC2 DAG file log error

Issue

ISSUE

I have run other dag files previously they all give this message even if they pass or fail.

Goal

Get this error fixed

Log that contains the error

I got this after running simple dag

241adsgf1108
*** Log file does not exist: /opt/airflow/logs/dag_id=numpy_pandas/run_id=manual__2022-09-27T16:31:22.968544+00:00/task_id=print_the_context/attempt=1.log
*** Fetching from: http://241adsgf1108:8793/dag_id=numpy_pandas/run_id=manual__2022-09-27T16:31:22.968544+00:00/task_id=print_the_context/attempt=1.log
*** !!!! Please make sure that all your Airflow components (e.g. schedulers, webservers and workers) have the same 'secret_key' configured in 'webserver' section and time is synchronized on all your machines (for example with ntpd) !!!!!
****** See more at https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#secret-key
****** Failed to fetch log file from worker. Client error '403 FORBIDDEN' for url 'http://241adsgf1108:8793/dag_id=numpy_pandas/run_id=manual__2022-09-27T16:31:22.968544+00:00/task_id=print_the_context/attempt=1.log'
For more information check: https://httpstatuses.com/403

Commands

  • docker build -t my-image-apache/airflow:latest-python3.8 .
  • docker-compose up

Environment

  • AWS EC2
  • Ubuntu 20.04

Folder Structure

  • airflow /
    • docker-compose.yml
    • Dockerfile
    • dags [FOLDER]/
      • all_python_dag_files.py

Files

my dag file

import pendulum

from airflow import DAG
#from airflow.decorators import task
from airflow.operators.python import PythonOperator

with DAG(
    dag_id="numpy_pandas",
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example"],) as dag:

    def numpy_something():
        """Print Numpy array."""
        import numpy as np  # <- THIS IS HOW NUMPY SHOULD BE IMPORTED IN THIS CASE
        import pandas as pd

        d = {'col1': [1, 2], 'col2': [3, 4]}
        df = pd.DataFrame(data=d)
        print(df)
        a = np.array([[[1, 2], [3, 4]], [[5, 6], [7, 8]]])
        print(a)
        #a= 10
        return a

    run_this = PythonOperator(
        task_id="print_the_context",
        python_callable=numpy_something,
    )

dockerfile

FROM apache/airflow:latest-python3.8
COPY requirements.txt .
COPY personal_python_file.py /usr/local/airflow/dags/personal_python_file.py
RUN pip install -r requirements.txt

docker-compose.yml

---
version: '3'
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-my-image-apache/airflow:latest-python3.8} 
  ### SAME AS IN MY DOCKERFILE "FROM apache/airflow:latest-python3.8" BUT I CAN MODIFY IT AS THE IAMGE THAT I GNERATE IS THE IMPORTANT ONE
  # my-image-apache/airflow:latest-python3.8}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: CeleryExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    # For backward compatibility, with Airflow <2.3
    AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:airflow@postgres/airflow
    AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth'
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}

    # I have added these
    AIRFLOW__CORE__ENABLE_XCOM_PICKLING: 'true'
    #AIRFLOW__CORE__DAGS_FOLDER: /opt/airflow/dags
    #connect local and docker conatiner DAGS folers
    #AIRFLOW__CORE__DAGS_FOLDER: ./dags
  

    # AIRFLOW__CORE__PLUGINS_FOLDER: /opt/airflow/plugins
    # AIRFLOW__CORE__LOGGING_CONFIG_CLASS: airflow.utils.log.logging_config.DEFAULT_LOGGING_CONFIG
    # AIRFLOW__CORE__LOGGING_LEVEL: INFO

  volumes:
    - ./dags/:/opt/airflow/dags
    - ./logs/:/opt/airflow/logs
    - ./plugins/:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    redis:
      condition: service_healthy
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 5s
      retries: 5
    restart: always

  redis:
    image: redis:latest
    expose:
      - 6379
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - 8080:8080
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    stdin_open: true
    tty: true
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully
    volumes:
      - ./airflow:/usr/local/airflow

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type SchedulerJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-worker:
    <<: *airflow-common
    command: celery worker
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    environment:
      <<: *airflow-common-env
      # Required to handle warm shutdown of the celery workers properly
      # See https://airflow.apache.org/docs/docker-stack/entrypoint.html#signal-propagation
      DUMB_INIT_SETSID: "0"
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        function ver() {
          printf "%04d%04d%04d%04d" $${1//./ }
        }
        airflow_version=$$(AIRFLOW__LOGGING__LOGGING_LEVEL=INFO && gosu airflow airflow version)
        airflow_version_comparable=$$(ver $${airflow_version})
        min_airflow_version=2.2.0
        min_airflow_version_comparable=$$(ver $${min_airflow_version})
        if (( airflow_version_comparable < min_airflow_version_comparable )); then
          echo
          echo -e "\033[1;31mERROR!!!: Too old Airflow version $${airflow_version}!\e[0m"
          echo "The minimum Airflow version supported: $${min_airflow_version}. Only use this or higher!"
          echo
          exit 1
        fi
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/start/docker.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_UPGRADE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - .:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

  # You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
  # or by explicitly targeted on the command line e.g. docker-compose up flower.
  # See: https://docs.docker.com/compose/profiles/
  flower:
    <<: *airflow-common
    command: celery flower
    profiles:
      - flower
    ports:
      - 5555:5555
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:5555/"]
      interval: 10s
      timeout: 10s
      retries: 5
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

volumes:
  postgres-db-volume:

Tried Already:


Solution

I have solved it airflow 2.4.0 and 2.3.4 has this bug but they will resolve it at the 2.4.1 update. You can also install 2.3.3 that is totally fine now I am using that.

SOLUTION [official] i. don’t use latest ii. use 2.3.3 - https://airflow.apache.org/announcements/ 1) Sorry if not clear, but this already happened in 2.4.0, which I installed today (It was fine on 2.3.3 and before). Running on Python 3.10.5. - https://github.com/apache/airflow/issues/26492#issuecomment-1251078527 2) ACTUAL FIX - https://github.com/astronomer/airflow/commit/af01ddf5b1c528c3de2c959c499e7289decc7a26



Answered By - sogu
Answer Checked By - Clifford M. (WPSolving Volunteer)