Issue
I am using AWS's MWAA service (2.2.2) to run a variety of DAGs, most of which are implemented with standard PythonOperator types. I bundle the DAGs into an S3 bucket alongside any shared requirements, then point MWAA to the relevant objects & versions. Everything runs smoothly so far.
I would now like to implement a DAG using the PythonVirtualenvOperator type, which AWS acknowledge is not supported out of the box. I am following their guide on how to patch the behaviour using a custom plugin, but continue to receive an error from Airflow, shown at the top of the dashboard in big red writing:
DAG Import Errors (1) ... ... AirflowException: PythonVirtualenvOperator requires virtualenv, please install it.
I've confirmed that the plugin is indeed being picked up by Airflow (I see it referenced in the admin screen), and for the avoidance of doubt I am using the exact code provided by AWS in their examples for the DAG. AWS's documentation on this is pretty light and I've yet to stumble across any community discussion for the same.
From AWS's docs, we'd expect the plugin to run at startup, before any DAGs are processed. The plugin itself appears to rewrite the venv command to use the pip-installed version of virtualenv rather than the one installed on the machine. However, I've struggled to verify that things are happening in the order I expect. Any pointers on debugging the instance's behavior would be very much appreciated.
Has anyone faced a similar issue? Is there a gap in the MWAA documentation that needs addressing? Am I missing something incredibly obvious?
Possibly related: I do see this warning in the scheduler's logs, which may explain why MWAA is struggling to resolve the dependency.
WARNING: The script virtualenv is installed in '/usr/local/airflow/.local/bin' which is not on PATH.
Solution
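Airflow uses shutil.which to look for the virtualenv executable. The virtualenv installed via requirements.txt ends up in /usr/local/airflow/.local/bin, which isn't on the PATH, so the lookup fails and the operator raises the "requires virtualenv" error. Prepending that directory to PATH from inside the plugin solves this. The AWS sample here is wrong, as it is missing this step: https://docs.aws.amazon.com/mwaa/latest/userguide/samples-virtualenv.html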
import os
from typing import List

import airflow.utils.python_virtualenv
from airflow.plugins_manager import AirflowPlugin


def _generate_virtualenv_cmd(tmp_dir: str, python_bin: str, system_site_packages: bool) -> List[str]:
    cmd = ['python3', '/usr/local/airflow/.local/lib/python3.7/site-packages/virtualenv', tmp_dir]
    if system_site_packages:
        cmd.append('--system-site-packages')
    if python_bin is not None:
        cmd.append(f'--python={python_bin}')
    return cmd


airflow.utils.python_virtualenv._generate_virtualenv_cmd = _generate_virtualenv_cmd

# This is the added path code
os.environ["PATH"] = f"/usr/local/airflow/.local/bin:{os.environ['PATH']}"


class VirtualPythonPlugin(AirflowPlugin):
    name = 'virtual_python_plugin'
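To check whether the PATH change is what makes the difference, a small stand-alone sketch like the one below may help. It is not part of the AWS sample or the plugin: the helper name which_with_prefix is made up for illustration, and it only reproduces the shutil.which lookup described above, with and without an extra directory prepended to PATH.

```python
import os
import shutil
from typing import Optional


def which_with_prefix(name: str, extra_dir: Optional[str] = None) -> Optional[str]:
    """Look up `name` via shutil.which, optionally with `extra_dir` prepended to PATH.

    Hypothetical helper for debugging only; it does not modify os.environ.
    """
    path = os.environ.get("PATH", os.defpath)
    if extra_dir:
        path = f"{extra_dir}{os.pathsep}{path}"
    return shutil.which(name, path=path)


if __name__ == "__main__":
    # Directory taken from the pip warning in the scheduler logs.
    local_bin = "/usr/local/airflow/.local/bin"
    print("default PATH:", which_with_prefix("virtualenv"))
    print("with prefix :", which_with_prefix("virtualenv", local_bin))
```

If the first lookup returns None and the second returns a path, the missing PATH entry is the likely cause. On MWAA the output would presumably have to be sent to the task or scheduler logs, for example by calling the helper from a PythonOperator task.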
Answered By - PeterRing Answer Checked By - Mildred Charles (WPSolving Admin)