공기 흐름에서 동적 작업 종속성의 힘을 잠금 해제합니다
Apache Airflow는 강력한 워크 플로우 자동화 도구이지만 동적 종속성을 처리하면 때때로 퍼즐을 해결하는 것처럼 느낄 수 있습니다. DAG (Directed Acyclic Graph)를 설계 할 때 하드 코딩 작업 시퀀스는 간단한 사용 사례에서 작동 할 수 있지만 런타임에 구조를 결정 해야하는 경우 어떻게해야합니까? 🤔
실행될 작업이 들어오는 데이터에 의존하는 데이터 파이프 라인을 연구하고 있다고 상상해보십시오. 예를 들어, 일일 구성을 기반으로 다른 파일 세트를 처리하거나 비즈니스 규칙에 따라 변수 변환을 실행합니다. 이러한 경우 정적 DAG는이를 잘라 내지 않습니다. 종속성을 동적으로 정의하는 방법이 필요합니다.
이것은 정확히 공기 흐름이있는 곳입니다 dag_run.conf 게임 체인저가 될 수 있습니다. DAG를 트리거 할 때 구성 사전을 전달하면 작업 시퀀스를 동적으로 생성 할 수 있습니다. 그러나이를 구조화 된 방식으로 구현하려면 공기 흐름의 실행 모델에 대한 깊은 이해가 필요합니다.
이 기사에서는 런타임을 사용하여 작업 종속성이 결정되는 동적 DAG를 구축하는 방법을 살펴 보겠습니다. dag_run.conf. 이를 달성하기 위해 고군분투하고 명확한 해결책을 찾지 못했다면 걱정하지 마십시오. 혼자가 아닙니다! 실제 사례와 함께 단계별로 분류합시다. 🚀
명령 | 사용의 예 |
---|---|
dag_run.conf | DAG 실행을 트리거 할 때 동적 구성 값을 검색 할 수 있습니다. 런타임 매개 변수를 전달하는 데 필수적입니다. |
PythonOperator | 공기 흐름에서 파이썬 함수를 실행하는 작업을 정의하여 DAG 내부에서 유연한 실행 로직을 허용합니다. |
set_upstream() | 작업 간의 종속성을 명시 적으로 정의하여 한 작업이 완료된 후에 만 실행되도록합니다. |
@dag | 작업 플로우 API가 제공하는 데코레이터는 DAGS를보다 시력적이고 구조화 된 방식으로 정의합니다. |
@task | 작업 플로우 API를 사용하여 공기 흐름에서 작업을 정의하여 작업 생성 및 데이터 전달을 단순화합니다. |
override(task_id=...) | 단일 함수에서 여러 작업을 인스턴스화 할 때 작업의 ID를 동적으로 수정하는 데 사용됩니다. |
extract_elements(dag_run=None) | DAG_RUN.CONF DICTIONARY에서 값을 추출하여 작업 실행을 동적으로 구성하는 함수. |
schedule_interval=None | 고정 일정에서 실행하는 대신 DAG가 수동으로 트리거 될 때만 실행되도록합니다. |
op_args=[element] | 동적 인수를 Pythonoperator 작업에 전달하여 작업 인스턴스마다 다른 실행을 가능하게합니다. |
catchup=False | 일시 정지 후 시작될 때 공기 흐름이 모두 누락 된 DAG 실행을 실행하는 것을 방지하여 실시간 구성에 유용합니다. |
공기 흐름에서 런타임 구성으로 동적 DAG를 구축합니다
Apache Airflow는 복잡한 워크 플로우를 조정하기위한 강력한 도구이지만 진정한 강점은 유연성에 있습니다. 앞서 제시된 스크립트는 a를 만드는 방법을 보여줍니다 동적 DAG 작업 종속성이 런타임을 사용하여 결정되는 경우 dag_run.conf. 처리 할 요소 목록을 하드 코딩하는 대신 DAG는 트리거시 동적으로 검색하여 더 적응할 수있는 워크 플로를 허용합니다. 이는 변수 데이터 세트 처리 또는 외부 조건에 따라 특정 작업을 실행하는 것과 같은 실제 시나리오에서 특히 유용합니다. 처리 할 파일이 매일 변경되는 ETL 파이프 라인을 상상해보십시오.이 접근 방식으로 자동화가 훨씬 쉬워집니다. 🚀
첫 번째 스크립트는 Pythonoperator 작업을 실행하고 종속성을 동적으로 설정합니다. 요소 목록을 추출합니다 dag_run.conf, 작업이 필요할 때만 생성되도록합니다. 목록의 각 요소는 고유 한 작업이되고 종속성은 순차적으로 설정됩니다. 두 번째 접근법은 작업 플로우 API, 이것은 장식 자와 같은 Dag Creation을 단순화합니다 @가리비 그리고 @일. 이 방법은 DAG를 더 읽기 쉽게 만들고 더 깨끗한 실행 로직을 유지합니다. 이러한 접근 방식은 워크 플로가 코드 변경없이 다른 구성에 적응할 수 있도록합니다.
예를 들어, 전자 상거래 회사가 배치로 주문을 처리하는 시나리오를 고려하십시오. 언젠가는 다른 작업 시퀀스가 필요한 다른 날보다 시급한 주문이있을 수 있습니다. 정적 DAG를 사용하면 우선 순위가 변경 될 때마다 코드를 수정하는 것을 의미합니다. 동적 DAG 접근 방식을 사용하면 외부 시스템이 특정 작업 시퀀스로 DAG를 트리거하여 프로세스를보다 효율적으로 만듭니다. 또 다른 사용 사례는 데이터 과학에 있으며, 여기서 모델은 들어오는 데이터 배포에 따라 재교육이 필요할 수 있습니다. 필요한 모델 구성을 동적으로 전달하면 필요한 계산 만 실행하여 시간과 리소스를 절약합니다. 🎯
요약하면,이 스크립트는 런타임 입력을 기반으로 동적으로 DAG를 생성하기위한 토대를 제공합니다. 활용하여 공기 흐름의 작업 흐름 API 또는 기존의 Pythonoperator 접근 방식으로 개발자는 유연하고 모듈 식이며 효율적인 워크 플로를 만들 수 있습니다. 이를 통해 수동 개입의 필요성을 제거하고 다른 자동화 시스템과 완벽하게 통합 할 수 있습니다. 고객 주문 처리, 데이터 파이프 라인 관리 또는 클라우드 워크 플로우 오케스트레이션에 관계없이 Dynamic DAG는 특정 비즈니스 요구에 맞는 더 현명한 자동화를 가능하게합니다.
런타임 구성으로 공기 흐름에서 동적 작업 시퀀싱 구현
Apache Airflow를 사용한 Python 기반 백엔드 자동화
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.models import DagRun
import json
# Define default args
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': days_ago(1),
}
# Function to process each element
def process_element(element, kwargs):
print(f"Processing element: {element}")
# Define DAG
dag = DAG(
'dynamic_task_dag',
default_args=default_args,
schedule_interval=None,
)
# Extract elements from dag_run.conf
def generate_tasks(kwargs):
conf = kwargs.get('dag_run').conf or {}
elements = conf.get('elements', [])
task_list = []
for i, group in enumerate(elements):
for j, element in enumerate(group):
task_id = f"process_element_{i}_{j}"
task = PythonOperator(
task_id=task_id,
python_callable=process_element,
op_args=[element],
dag=dag,
)
task_list.append(task)
return task_list
# Generate dynamic tasks
tasks = generate_tasks()
# Define dependencies dynamically
for i in range(len(tasks) - 1):
tasks[i + 1].set_upstream(tasks[i])
대체 접근 방식 : 더 나은 가독성을 위해 작업 플로우 API를 사용합니다
공기 흐름의 작업 흐름 API를 사용한 최신 파이썬 접근
from airflow.decorators import dag, task
from datetime import datetime
# Define DAG
@dag(schedule_interval=None, start_date=datetime(2025, 1, 28), catchup=False)
def dynamic_taskflow_dag():
@task
def process_element(element: str):
print(f"Processing {element}")
@task
def extract_elements(dag_run=None):
conf = dag_run.conf or {}
return conf.get('elements', [])
elements = extract_elements()
task_groups = [[process_element(element) for element in group] for group in elements]
# Define dependencies dynamically
for i in range(len(task_groups) - 1):
for upstream_task in task_groups[i]:
for downstream_task in task_groups[i + 1]:
downstream_task.set_upstream(upstream_task)
dynamic_taskflow_dag()
공기 흐름에서 조건부 실행으로 동적 작업 시퀀싱 향상
강력하지만 종종 간과되는 기능 중 하나입니다 아파치 공기 흐름 동적 작업 시퀀싱의 유연성을 더욱 향상시킬 수있는 조건부 실행입니다. 작업 종속성을 검색하는 동안 dag_run.conf 유용합니다. 실제 시나리오는 종종 특정 조건에 따라 특정 작업 만 실행해야합니다. 예를 들어, 일부 데이터 세트는 분석 전에 전처리가 필요할 수 있지만 다른 데이터 세트는 직접 처리 할 수 있습니다.
공기 흐름에서의 조건부 실행을 사용하여 구현할 수 있습니다 BranchPythonOperator사전 정의 된 논리를 기반으로 실행할 다음 작업을 결정합니다. 파일을 처리하는 동적 DAG가 있지만 특정 크기 이상의 파일 만 유효성 검사가 필요하다고 가정합니다. 모든 작업을 순차적으로 실행하는 대신 실행할 작업을 동적으로 결정하고 실행 시간을 최적화하며 리소스 사용량을 줄일 수 있습니다. 이 접근법은 관련 워크 플로 만 트리거되어 데이터 파이프 라인을보다 효율적으로 만들 수 있도록합니다. 🚀
동적 DAG를 향상시키는 또 다른 방법은 통합하는 것입니다 XComs (교차 통신 메시지). XCOM을 사용하면 작업이 데이터를 교환 할 수 있습니다. 즉, 동적으로 생성 된 작업 시퀀스는 단계간에 정보를 전달할 수 있습니다. 예를 들어, ETL 파이프 라인에서 전처리 작업은 필요한 변환을 결정하고 해당 세부 정보를 후속 작업에 전달할 수 있습니다. 이 방법을 사용하면 실시간 입력을 기반으로 실행 흐름이 적응하여 자동화 기능이 크게 증가하는 진정한 데이터 중심 워크 플로우를 가능하게합니다.
공기 흐름에서 동적 작업 시퀀싱에 대한 일반적인 질문
- 무엇인가요 dag_run.conf 사용?
- DAG를 트리거 할 때 런타임에 구성 매개 변수를 전달하여 워크 플로를보다 유연하게 만듭니다.
- 공기 흐름에서 작업을 동적으로 생성하려면 어떻게해야합니까?
- 루프를 사용하여 여러 인스턴스를 인스턴스화 할 수 있습니다. PythonOperator 또는 사용하십시오 @task 작업 플로우 API의 데코레이터.
- 사용의 장점은 무엇입니까? BranchPythonOperator?
- 조건부 실행을 가능하게하여 DAG가 사전 정의 된 논리에 따라 다른 경로를 따라 효율성을 향상시킬 수 있습니다.
- 어떻게합니까 XComs 동적 DAG를 향상시겠습니까?
- XCOM은 작업이 데이터를 공유 할 수 있도록하여 후속 작업이 이전 단계에서 관련 정보를 받도록합니다.
- 종속성을 동적으로 설정할 수 있습니까?
- 예, 사용할 수 있습니다 set_upstream() 그리고 set_downstream() DAG 내에서 의존성을 동적으로 정의하는 방법.
런타임 구성으로 동적 워크 플로우 최적화
구현 동적 작업 시퀀싱 공기 흐름에서 워크 플로 자동화를 크게 향상시켜 변화하는 요구 사항에 적응할 수 있습니다. 런타임 구성을 활용하여 개발자는 정적 DAG 정의를 피하고 유연한 데이터 중심 파이프 라인을 만들 수 있습니다. 이 접근법은 특히 재무보고 또는 기계 학습 모델 교육과 같은 실시간 입력을 기반으로 작업을 정의 해야하는 환경에서 특히 가치가 있습니다. 🎯
통합하여 dag_run.conf, 조건부 실행 및 종속성 관리, 팀은 확장 가능하고 효율적인 워크 플로를 구축 할 수 있습니다. 전자 상거래 트랜잭션 처리, 클라우드 기반 데이터 변환 관리 또는 복잡한 배치 작업을 조정하든 Airflow의 동적 DAG 기능은 최적화되고 자동화 된 솔루션을 제공합니다. 이러한 기술에 투자하면 비즈니스는 운영을 간소화하면서 수동 개입을 줄일 수 있습니다.
공기 흐름에서 동적 작업 시퀀싱에 대한 소스 및 참조
- Apache Airflow Documentation- DAG 구성 및 런타임 매개 변수에 대한 자세한 통찰력 : Apache Airflow 공식 문서
- 역동적 인 Dag Creation에 관한 중간 기사 - 사용에 대한 가이드 dag_run.conf 동적 작업 시퀀싱의 경우 : 중간 : 공기 흐름의 동적 DAG
- 스택 오버 플로우 토론 - 입력 구성을 기반으로 동적으로 DAG를 생성하기위한 커뮤니티 솔루션 : 오버플로 스레드 스택
- 데이터 엔지니어링 블로그 - 확장 가능한 공기 흐름 워크 플로 설계를위한 모범 사례 : 데이터 엔지니어링 블로그