Issue
from airflow.operators.python import get_current_context
context = get_current_context()
ti = context['ti']
ti.xcom_push(key="file", value=doc)
I have the above code in a task, and doc is the data (a bytes object) that I want to pass to XCom. It's throwing the following stack trace:
Traceback (most recent call last):
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/decorators/base.py", line 217, in execute
    return_value = super().execute(context)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 175, in execute
    return_value = self.execute_callable()
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/operators/python.py", line 192, in execute_callable
    return self.python_callable(*self.op_args, **self.op_kwargs)
  File "/opt/bitnami/airflow/dags/rover_ocr_pipeline.py", line 65, in retrieve
    ti.xcom_push(key="file", value = doc )
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 75, in wrapper
    return func(*args, session=session, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/taskinstance.py", line 2294, in xcom_push
    XCom.set(
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/session.py", line 72, in wrapper
    return func(*args, **kwargs)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 234, in set
    value = cls.serialize_value(
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/models/xcom.py", line 627, in serialize_value
    return json.dumps(value, cls=XComEncoder).encode("UTF-8")
  File "/opt/bitnami/python/lib/python3.9/json/__init__.py", line 234, in dumps
    return cls(
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 176, in encode
    return super().encode(o)
  File "/opt/bitnami/python/lib/python3.9/json/encoder.py", line 199, in encode
    chunks = self.iterencode(o, _one_shot=True)
  File "/opt/bitnami/python/lib/python3.9/json/encoder.py", line 257, in iterencode
    return _iterencode(o, 0)
  File "/opt/bitnami/airflow/venv/lib/python3.9/site-packages/airflow/utils/json.py", line 153, in default
    CLASSNAME: o.__module__ + "." + o.__class__.__qualname__,
AttributeError: 'bytes' object has no attribute '__module__'
This was working until now, so I'm guessing it's an issue with the Airflow version: previously I was using 2.3.4, and now I'm using 2.5.0.
Airflow is running on a Kubernetes cluster using the airflow:2.5.0-debian-11-r11 image.
Solution
Moving this from the comments to an actual answer; see the comments above for the full conversation.
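XCom serializes every value to JSON before storing it in the XCom table (that is the json.dumps(value, cls=XComEncoder) call in the traceback). JSON has no bytes type, so the value falls through to Airflow's XComEncoder.default(), which tries to build a class name from o.__module__. Instances of bytes have no __module__ attribute, hence the AttributeError. Converting the bytes to a normal string by base64-encoding them made the value JSON-serializable, so it could be stored in XCom.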
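A minimal sketch of that conversion, using only the standard library. The helper names bytes_to_xcom and bytes_from_xcom are hypothetical, not part of Airflow; the point is that base64 output is plain ASCII text, which JSON (and therefore XCom) can store, and that the original bytes can be recovered exactly.

```python
import base64
import json


def bytes_to_xcom(doc: bytes) -> str:
    """Base64-encode raw bytes into an ASCII str that json.dumps accepts."""
    return base64.b64encode(doc).decode("ascii")


def bytes_from_xcom(value: str) -> bytes:
    """Reverse bytes_to_xcom() after pulling the value in a downstream task."""
    return base64.b64decode(value)


if __name__ == "__main__":
    doc = b"%PDF-1.7\n\x00\xff"  # stand-in for the real document bytes
    try:
        json.dumps(doc)  # raw bytes are rejected: JSON has no bytes type
    except TypeError as exc:
        print("raw bytes fail:", exc)
    encoded = bytes_to_xcom(doc)
    json.dumps(encoded)  # the base64 str serializes without error
    assert bytes_from_xcom(encoded) == doc  # lossless round trip
    print("round trip ok")
```

In the task this would look like ti.xcom_push(key="file", value=bytes_to_xcom(doc)), and downstream bytes_from_xcom(ti.xcom_pull(task_ids="retrieve", key="file")); the task id "retrieve" is an assumption based on the function name in the traceback. Base64 inflates the payload by roughly a third, which matters because XCom is stored in the metadata database.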
While it's probably not worth the effort for just this case, this could be handled automatically by a custom XCom backend that detects byte strings and performs the conversion behind the scenes.
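The core of such a backend could be a pair of recursive helpers like the sketch below (stdlib only, untested against a live Airflow install). They replace every bytes value, even inside nested dicts and lists, with a tagged dict holding its base64 text, and reverse that after decoding. The marker key "__bytes_b64__" and all function names are hypothetical; a real dict whose only key happens to be the marker would be misread as bytes, and tuples come back as lists, as they do with plain JSON.

```python
import base64
import json
from typing import Any

# Hypothetical marker key; pick something that cannot collide with real data.
BYTES_TAG = "__bytes_b64__"


def encode_bytes(value: Any) -> Any:
    """Recursively replace bytes/bytearray values with a JSON-safe tagged dict."""
    if isinstance(value, (bytes, bytearray)):
        return {BYTES_TAG: base64.b64encode(value).decode("ascii")}
    if isinstance(value, dict):
        return {k: encode_bytes(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [encode_bytes(v) for v in value]
    return value


def decode_bytes(value: Any) -> Any:
    """Recursively turn tagged dicts produced by encode_bytes() back into bytes."""
    if isinstance(value, dict):
        if set(value) == {BYTES_TAG}:
            return base64.b64decode(value[BYTES_TAG])
        return {k: decode_bytes(v) for k, v in value.items()}
    if isinstance(value, list):
        return [decode_bytes(v) for v in value]
    return value


def serialize(value: Any) -> bytes:
    """JSON-encode a value that may contain bytes anywhere inside it."""
    return json.dumps(encode_bytes(value)).encode("utf-8")


def deserialize(raw: bytes) -> Any:
    """Inverse of serialize()."""
    return decode_bytes(json.loads(raw.decode("utf-8")))
```

To wire this into Airflow, a subclass of airflow.models.xcom.BaseXCom could apply encode_bytes inside an overridden serialize_value and decode_bytes inside an overridden deserialize_value, delegating to the base implementations for the actual JSON handling, and be enabled through the xcom_backend option in the [core] config section. That wiring is an assumption to check against the XCom backend documentation for your Airflow version, since the hook signatures have changed between releases.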
Answered By - Tevett Goad Answer Checked By - Senaida (WPSolving Volunteer)