Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions airflow-core/src/airflow/config_templates/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2779,6 +2779,15 @@ dag_processor:
example: "/tmp/some-place"
default: ~

dag_bundle_config_file:
description: |
String path to json file with Dag bundle configuration.
For more details see ``[dag_processor] dag_bundle_config_list``.
version_added: ~
type: string
example: "/opt/airflow/config/dag_bundles.json"
default: ~

dag_bundle_config_list:
description: |
List of backend configs. Must supply name, classpath, and kwargs for each backend.
Expand Down
10 changes: 9 additions & 1 deletion airflow-core/src/airflow/dag_processing/bundles/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
# under the License.
from __future__ import annotations

import json
import os
import warnings
from typing import TYPE_CHECKING

Expand Down Expand Up @@ -180,7 +182,13 @@ def parse_config(self) -> None:
if self._bundle_config:
return

config_list = conf.getjson("dag_processor", "dag_bundle_config_list")
bundle_config_file = conf.get("dag_processor", "dag_bundle_config_file", fallback=None)
if bundle_config_file and os.path.exists(bundle_config_file):
with open(bundle_config_file) as f:
config_list = json.load(f)
self.log.info("Loading bundle config from %s", bundle_config_file)
else:
config_list = conf.getjson("dag_processor", "dag_bundle_config_list")
if not config_list:
return
if not isinstance(config_list, list):
Expand Down
26 changes: 26 additions & 0 deletions airflow-core/tests/unit/dag_processing/test_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from sqlalchemy import func, select
from uuid6 import uuid7

from airflow._shared.module_loading import import_string
from airflow._shared.timezones import timezone
from airflow.callbacks.callback_requests import DagCallbackRequest
from airflow.dag_processing.bundles.base import BaseDagBundle
Expand Down Expand Up @@ -1572,6 +1573,31 @@ def test_dag_with_assets(self, session, configure_testing_dag_bundle):
TaskOutletAssetReference(asset_id=mock.ANY, dag_id="dag_with_skip_task", task_id="skip_task")
]

def test_loading_bundle_config_from_file(self, tmp_path):
config = [
{
"name": "bundleone",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 0},
},
{
"name": "bundletwo",
"classpath": "airflow.dag_processing.bundles.local.LocalDagBundle",
"kwargs": {"path": "/dev/null", "refresh_interval": 300},
},
]
config_file = tmp_path / "bundle_config.json"

with open(config_file, "w") as f:
json.dump(config, f)

with conf_vars({("dag_processor", "dag_bundle_config_file"): str(config_file)}):
bm = DagBundlesManager()
for bundle in config:
assert bundle["name"] in bm._bundle_config
assert import_string(bundle["classpath"]) == bm._bundle_config[bundle["name"]].bundle_class
assert bundle["kwargs"] == bm._bundle_config[bundle["name"]].kwargs

def test_bundles_are_refreshed(self):
"""
Ensure bundles are refreshed by the manager, when necessary.
Expand Down
Loading