-
airflow-DAG하나씩 쌓아보자 2022. 5. 26. 11:27
DAG는 airflow 작업들 사이에 관계와 순서 표현 구조입니다.
각 노드는 작업을 방향성을 갖는 전후/병렬 관계의 링크순서대로 실행합니다.
방향성 비순환 그래프의 요점은 작업 시퀀스를 두어 무한히 반복되는걸 허용하지 않는 것입니다.
예를 들어 아래와 같은 DAG를 구성하면 ingest부터 마지막 report까지 순차적으로 실행합니다.
출처 곰탱푸닷컴 bearpooh.com/123 DAG 3가지 방식으로 선언합니다.
1. context안에서 생성과 동시에 실행을 하는 방식
with DAG( "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False ) as dag: op = EmptyOperator(task_id="task")
2. 생성자로 선언하여 정의(실행)하는 방식(표준 생성자)
my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False) op = EmptyOperator(task_id="task", dag=my_dag)
3. @dag 데코레이터를 활용하여 실행하는 방식
@dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"), schedule_interval="@daily", catchup=False) def generate_dag(): op = EmptyOperator(task_id="task") dag = generate_dag()
DAG
- Upstream DAG: A DAG that must reach a specified state before a downstream DAG can run
- Downstream DAG: A DAG that cannot run until an upstream DAG reaches a specified state
upstream dag 는 downstream dag가 실행되기전 완료된 앞선의 작업입니다.
두개의 개념을 설명하는 이유는 두가지로 설정되 두개의 dag가 존재함다면 서로 연관있지만 다른 스케줄이라는점을 설명하고 날짜도 다르게 지정할 수 있습니다.
DAG 구성
해당코드는 기본적인 bashoperator를 작성한 코드입니다.
from datetime import datetime, timedelta from textwrap import dedent from airflow import DAG from airflow.operators.bash import BashOperator with DAG( dag_id='log_test_tg', default_args={ 'depends_on_past':False, 'email':['woodywork1505@gmail.com'], 'email_on_failure': False, 'retries':1, 'retry_delay':timedelta(minutes=5), }, description='A simple DAG', schedule_interval="@once", start_date=datetime(2022, 5, 23), catchup=False, tags=['gt'], ) as dag: t1 = BashOperator( task_id='test1_print_date', bash_command='date', ) t2 = BashOperator( task_id='test2_sleep', depends_on_past=False, bash_command='sleep 5', retries=3, ) t1.doc_md = dedent( ''' sample test ''' ) dag.doc_md = __doc__ dag.doc_md = """ this is a doc placed """ templated_command = 'echo "hello"' t3 = BashOperator( task_id='templated', depends_on_past=False, bash_command=templated_command, ) t4 = BashOperator( task_id='test4_hello', bash_command='echo "bye"' ) t1 >> [t2, t3] t3 >> t4
DAG를 실행하기
DAG를 실행하는 방법은 두가지입니다.
- api에 의해 트리거 될 때 실행됩니다.
- 정해진 일정이 되면 schedule_interval 인자에 crontab스케줄값 처럼 셋팅하면 실행됩니다.
DAG구성요소
DAG의 구성요소는 몇가지 없습니다. args값을 다양하게 넣어 다양한 케이스를 만들 수 있습니다.
dag_id (str)
- 화면에서 출력될 DAG id(이름)
default_args (dict)
- 모든 작업 생성자에 arguments를 명시적으로 전달 할 수 있음
description (str)
- DAG에 대한 설명
schedule_interval (datetime.timedelta or str that acts as a cron expression)
- 스케줄링 횟수 날짜 시간을 지정하는 항목
- timedelta 객체가 execution_date에 추가
catchup
- 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행하는 것을 catchup이라고 합니다.
- 과거의 작업은 중요하지 않고, 현재 DAG만 실행되어야 하는 경우에 설정됩니다.
1) DAG 생성시 지정 DAG('tutorial', catchup=False, default_args=default_args) 2) airflow.cfg 설정 변경 catchup_by_default = False
catchup을 true로 설정을 한다면, 시작날짜부터 하루 단위로 DAG가 순서대로 실행 될 것이다.
(.....)
start_date(datetime.datetime)
- 실제 실행 날짜
- backfill을 위해 실행 날짜 자체를 queue에 기록(저장)
max_active_run = (int)
- DAG level에서 최대 실행 개수
tags (option: list[str])
- 화면상에 보여지는 DAG를 나누는 그룹
실제 속성값
자세하게 속성값들을 설명하겠습니다. (여기부터는 위의 코드를 세분화 설명의 주석을 달아 두었습니다)
with DAG( dag_id='log_test_tg', #dag_id 실제 시각화 된 웹에 보여지는 이름 description='A simple DAG', #설명 schedule_interval="@once", #시간을 줄때 사용 # schedule_interval='0 10, 16 * * *', #10시와 16시 하루 두번 실행 start_date=datetime(2022, 5, 23), #언제부터 dag 시작되었는가 catchup=False, tags=['gt'], #dag 카테고리 분류 기능
schedule_interval=" * * * *" 의 형식은 crontab schedule의 속성값과 같은 스케줄링으로 같은 포멧으로 입력해주면 됩니다.
ex)
45 22 * * * -> 22시 45분 실행
0,10 17 * * 0,2,3 -> 매주 일(0), 화(2), 수(3) 17시 00분과 17시 10분에 실행
10 17 1,15 * * -> 매달 1일과 15일 17시 10분에 실행
args
DAG안에 모든 오퍼레이터가 공통적으로 가져야할 인자들을 default_args에 담겨서 전달됩니다.
물론 operator를 선언할 때 변경 가능합니다.
default_args={ 'depends_on_past': False, 'email': ['airflow@example.com'], 'email_on_failure': False, 'email_on_retry': False, 'retries': 1, 'retry_delay': timedelta(minutes=5), # 'queue': 'bash_queue', # 'pool': 'backfill', # 'priority_weight': 10, # 'end_date': datetime(2016, 1, 1), # 'wait_for_downstream': False, # 'sla': timedelta(hours=2), # 'execution_timeout': timedelta(seconds=300), # 'on_failure_callback': some_function, # 'on_success_callback': some_other_function, # 'on_retry_callback': another_function, # 'sla_miss_callback': yet_another_function, # 'trigger_rule': 'all_success' }
owner (str)
- 해당 task의 소유자
deoenbds_on_past(bool)
- true 경우 task가 순차적으로 실행(이전 task 성공이거나 건너뛴 경우에만 실행)
email(str or list[str])
- 알림 받는 메일 주소, 여러 메일을 등록 할 수 있고 ,(comma) 나 ;(semi-colon)으로 분리
email_on_failure(bool)
- task 실패시 이메일 수신 여부
email_on_retry(bool)
- task 재시도 메일 수신 여부
retries(int)
- task가 실패할 경우 재시도 횟수
retry_delay(datetime.timedelta)
- 재시도 사이 딜레이 시간
첫번째 보이는 항목은 제어 흐름(control flow) 입니다.
테스크는 의존하는 모든 테스크들이 성공했을 때만 실행됩니다.
그러나 기본 동작을 변경 할 수 있는 방법들이 존재합니다.
- Branching
- Latest Only
- Depends On Past
- Trigger Rules
Branching
선행 작업의 결과에 따라 이어나갈 작업이 달라야 하는 겨우 Branch 분기로 나누어 줄 수 있습니다.
상황
1) 데이터 입력 후 검증
- 데이터에 이상 징후가 포착될 경우, 추가 전처리 작업 실행
- 아닐 경우 전처리 스킵
2) 모델 예측 후 적용
- 모델 예측 결과 기준치 이하 , 초과 결과에 따라 다른 작업 실행
첫번째 task가 완료되면 적절한 분기를 나누어 실행합니다.
jinja template 설명
https://velog.io/@jaytiger/Airflow-02.-DAG-%EA%B5%AC%EC%84%B1%ED%95%98%EA%B8%B0-2
잠깐! task의 종류와 사용법을 체크하고 가겠습니다.
DAG 테스크는 operator, sensors, taskflow같은 형태가 있습니다.
1. sensors
시간, 파일, 외부 이벤트를 기다리며 조건 이후의 작업을 진행 할 수 있게 해주는 airflow기능 입니다.
operator 처럼 하나의 task로 만들 수 있으며 filesystem, hdfs, hive... 의 형식으로 제공합니다.
1) FileSensor
2) decorator
버킷플레이스 Airflow 도입기 - 오늘의집 블로그
탁월한 데이터플랫폼을 위한 Airflow 도입기
www.bucketplace.co.kr
오늘의집 airflow 도입기.
'하나씩 쌓아보자' 카테고리의 다른 글
Redis 레디스 (0) 2022.05.24 Terraform -1 기본 설명 및 설치/ 단일 서버 배포 (0) 2022.05.23