Connection not available in triggerer #57145

@vatsrahul1001

Description

Apache Airflow version

3.1.1rc1

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

In 3.1.1rc1, I’m getting a “connection not found” error in the triggerer when a deferrable (async) DAG uses a connection created in the UI. The same DAG works fine with deferrable=False, or when the connection is set via environment variables.
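
Since the report says the DAG works when the connection comes from an environment variable, a stopgap could be to define it that way until the triggerer can resolve UI-created connections. Below is a minimal sketch, assuming Airflow's `AIRFLOW_CONN_<CONN_ID>` naming convention and its JSON connection format. The host and token are placeholders, storing the token in `password` is an assumption about how the connection is configured, and `conn_env_var` and `databricks_conn_json` are hypothetical helper names, not Airflow APIs.

```python
import json


def conn_env_var(conn_id: str) -> str:
    """Return the environment variable name Airflow checks for a connection ID."""
    return f"AIRFLOW_CONN_{conn_id.upper()}"


def databricks_conn_json(host: str, token: str) -> str:
    """Serialize a connection in Airflow's JSON format (placeholder fields only)."""
    # Assumption: the access token is stored in the connection's password field.
    return json.dumps({"conn_type": "databricks", "host": host, "password": token})


if __name__ == "__main__":
    name = conn_env_var("databricks_default")
    value = databricks_conn_json("https://example.cloud.databricks.com", "<token>")
    # Print an export line for the environment of the triggerer and the workers.
    print(f"export {name}='{value}'")
```

The variable would need to be present in the triggerer's environment as well as the workers', because the trigger in the traceback runs inside the triggerer process.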

Error logs

[2025-10-23 14:49:13] ERROR - Trigger example_async_databricks/manual__2025-10-23T09:19:01+00:00/submit_run/-1/1 (ID 1) exited with error The conn_id `databricks_default` isn't defined loc=triggerer_job_runner.py:1001
AirflowNotFoundException: The conn_id `databricks_default` isn't defined
File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 992 in cleanup_finished_triggers
File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 116 in greenback_shim
File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 201 in _greenback_shim
File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 81 in trampoline
File "/usr/python/lib/python3.10/site-packages/outcome/_impl.py", line 185 in send
File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1106 in run_trigger
File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/triggers/databricks.py", line 90 in run
File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py", line 524 in a_get_run_state
File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 713 in _a_do_api_call
File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 623 in _endpoint_url
File "/usr/python/lib/python3.10/functools.py", line 981 in __get__
File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 142 in databricks_conn
File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61 in get_connection
File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 226 in get
File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 172 in _get_connection
[2025-10-23 14:49:13] ERROR - Trigger exited without sending an event. Dependent tasks will be failed. name=example_async_databricks/manual__2025-10-23T09:19:01+00:00/submit_run/-1/1 (ID 1) loc=triggerer_job_runner.py:1016
[2025-10-23 14:49:14] INFO - DAG bundles loaded: dags-folder source=airflow.dag_processing.bundles.manager.DagBundlesManager loc=manager.py:179
[2025-10-23 14:49:14] INFO - Filling up the DagBag from /files/dags/example_databricks.py source=airflow.models.dagbag.DagBag loc=dagbag.py:593
[2025-10-23 14:49:14] WARNING - The `airflow.utils.timezone.datetime` attribute is deprecated. Please use `'airflow.sdk.timezone.datetime'`. category=DeprecatedImportWarning source=py.warnings loc=/files/dags/example_databricks.py:7
[2025-10-23 14:49:14] ERROR - Trigger failed:
Traceback (most recent call last):
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 992, in cleanup_finished_triggers
    result = details["task"].result()
  File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 116, in greenback_shim
    return await _greenback_shim(orig_coro, next_send)  # type: ignore
  File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 201, in _greenback_shim
    next_yield, resume_greenlet = resume_greenlet.switch(next_send)
  File "/usr/python/lib/python3.10/site-packages/greenback/_impl.py", line 81, in trampoline
    next_yield: Any = next_send.send(orig_coro)  # type: ignore
  File "/usr/python/lib/python3.10/site-packages/outcome/_impl.py", line 185, in send
    return gen.send(self.value)
  File "/opt/airflow/airflow-core/src/airflow/jobs/triggerer_job_runner.py", line 1106, in run_trigger
    async for event in trigger.run():
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/triggers/databricks.py", line 90, in run
    run_state = await self.hook.a_get_run_state(self.run_id)
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks.py", line 524, in a_get_run_state
    response = await self._a_do_api_call(GET_RUN_ENDPOINT, json)
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 713, in _a_do_api_call
    url = self._endpoint_url(full_endpoint)
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 623, in _endpoint_url
    port = f":{self.databricks_conn.port}" if self.databricks_conn.port else ""
  File "/usr/python/lib/python3.10/functools.py", line 981, in __get__
    val = self.func(instance)
  File "/opt/airflow/providers/databricks/src/airflow/providers/databricks/hooks/databricks_base.py", line 142, in databricks_conn
    return self.get_connection(self.databricks_conn_id)  # type: ignore[return-value]
  File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61, in get_connection
    conn = Connection.get(conn_id)
  File "/opt/airflow/task-sdk/src/airflow/sdk/definitions/connection.py", line 226, in get
    return _get_connection(conn_id)
  File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 172, in _get_connection
    raise AirflowNotFoundException(f"The conn_id `{conn_id}` isn't defined")
airflow.exceptions.AirflowNotFoundException: The conn_id `databricks_default` isn't defined
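
To see at a glance where the lookup fails, the frames in the log above can be pulled out programmatically. This is a rough sketch using only the standard library. `parse_frames` is a hypothetical helper, and the regex is an assumption fitted to the two frame formats seen in this log (with and without a comma after the line number).

```python
import re

# Matches both 'line 61 in func' (structured log) and 'line 61, in func' (traceback).
FRAME_RE = re.compile(r'File "(?P<path>[^"]+)", line (?P<line>\d+),? in (?P<func>\w+)')


def parse_frames(log_text: str) -> list[tuple[str, int, str]]:
    """Return (path, line number, function) for each frame found in the log text."""
    return [(m["path"], int(m["line"]), m["func"]) for m in FRAME_RE.finditer(log_text)]


if __name__ == "__main__":
    sample = (
        'File "/opt/airflow/task-sdk/src/airflow/sdk/bases/hook.py", line 61 in get_connection '
        'File "/opt/airflow/task-sdk/src/airflow/sdk/execution_time/context.py", line 172 in _get_connection'
    )
    # The last frame is the one that raised the exception.
    print(parse_frames(sample)[-1])
```

In the log above, the last frame is `_get_connection` in the task SDK's `execution_time/context.py`, so the exception is raised during the SDK's connection lookup from inside the trigger.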

What you think should happen instead?

No response

How to reproduce

  1. Create a databricks_default connection from the UI.
  2. Run the DAG below.

import json
import os
from datetime import timedelta
from typing import Dict, Optional

from airflow.models.dag import DAG
from airflow.utils.timezone import datetime
from airflow.providers.databricks.operators.databricks import (
    DatabricksRunNowOperator,
    DatabricksSubmitRunOperator,
)

DATABRICKS_CONN_ID = os.getenv("ASTRO_DATABRICKS_CONN_ID", "databricks_default")

# Notebook path as a JSON object
notebook_task = '{"notebook_path": "/Shared/Notebook_1"}'
NOTEBOOK_TASK = json.loads(os.getenv("DATABRICKS_NOTEBOOK_TASK", notebook_task))
notebook_params: Optional[Dict[str, str]] = {"Variable": "5"}

EXECUTION_TIMEOUT = int(os.getenv("EXECUTION_TIMEOUT", 6))

default_args = {
    "execution_timeout": timedelta(hours=EXECUTION_TIMEOUT),
    "retries": int(os.getenv("DEFAULT_TASK_RETRIES", 2)),
    "retry_delay": timedelta(seconds=int(os.getenv("DEFAULT_RETRY_DELAY_SECONDS", 60))),
}

new_cluster = {
    "num_workers": 1,
    "spark_version": "10.4.x-scala2.12",
    "spark_conf": {},
    "azure_attributes": {
        "availability": "ON_DEMAND_AZURE",
        "spot_bid_max_price": -1,
    },
    "node_type_id": "Standard_D3_v2",
    "ssh_public_keys": [],
    "custom_tags": {},
    "spark_env_vars": {"PYSPARK_PYTHON": "/databricks/python3/bin/python3"},
    "cluster_source": "JOB",
    "init_scripts": [],
}

with DAG(
    dag_id="example_async_databricks",
    start_date=datetime(2022, 1, 1),
    schedule=None,
    catchup=False,
    default_args=default_args,
    tags=["example", "async", "databricks"],
) as dag:
    # [START howto_operator_databricks_submit_run_async]
    opr_submit_run = DatabricksSubmitRunOperator(
        task_id="submit_run",
        databricks_conn_id=DATABRICKS_CONN_ID,
        new_cluster=new_cluster,
        notebook_task=NOTEBOOK_TASK,
        do_xcom_push=True,
        deferrable=True,
    )
    # [END howto_operator_databricks_submit_run_async]

    # [START howto_operator_databricks_run_now_async]
    opr_run_now = DatabricksRunNowOperator(
        task_id="run_now",
        databricks_conn_id=DATABRICKS_CONN_ID,
        job_id="{{ task_instance.xcom_pull(task_ids='submit_run', dag_id='example_async_databricks', key='job_id') }}",
        notebook_params=notebook_params,
        deferrable=True,
    )
    # [END howto_operator_databricks_run_now_async]

    opr_submit_run >> opr_run_now

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • Yes I am willing to submit a PR!
