ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • airflow-DAG
    하나씩 쌓아보자 2022. 5. 26. 11:27

    DAG는 airflow 작업들 사이에 관계와 순서 표현 구조입니다.

    각 노드는 작업을 방향성을 갖는 전후/병렬 관계의 링크순서대로 실행합니다.

    방향성 비순환 그래프의 요점은 작업 시퀀스를 두어 무한히 반복되는걸 허용하지 않는 것입니다.

     

    예를 들어 아래와 같은 DAG를 구성하면 ingest부터 마지막 report까지 순차적으로 실행합니다.

    출처 곰탱푸닷컴 bearpooh.com/123

    DAG 3가지 방식으로 선언합니다.

    1. context안에서 생성과 동시에 실행을 하는 방식

    with DAG(
        "my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        schedule_interval="@daily", catchup=False
    ) as dag:
        op = EmptyOperator(task_id="task")

     

     

    2. 생성자로 선언하여 정의(실행)하는 방식(표준 생성자)

    my_dag = DAG("my_dag_name", start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
                 schedule_interval="@daily", catchup=False)
    op = EmptyOperator(task_id="task", dag=my_dag)

     

     

    3. @dag 데코레이터를 활용하여 실행하는 방식

    @dag(start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
         schedule_interval="@daily", catchup=False)
    def generate_dag():
        op = EmptyOperator(task_id="task")
    
    dag = generate_dag()

     

    DAG

    • Upstream DAG: A DAG that must reach a specified state before a downstream DAG can run
    • Downstream DAG: A DAG that cannot run until an upstream DAG reaches a specified state

    upstream dag 는 downstream dag가 실행되기전 완료된 앞선의 작업입니다.

    두개의 개념을 설명하는 이유는 두가지로 설정되 두개의 dag가 존재함다면 서로 연관있지만 다른 스케줄이라는점을 설명하고 날짜도 다르게 지정할 수 있습니다.

     

     

    DAG 구성

    해당코드는 기본적인 bashoperator를 작성한 코드입니다.

    from datetime import datetime, timedelta
    from textwrap import dedent
    from airflow import DAG
    from airflow.operators.bash import BashOperator
    
    
    with DAG(
        dag_id='log_test_tg',
        
        default_args={
            'depends_on_past':False,
            'email':['woodywork1505@gmail.com'],
            'email_on_failure': False,
            'retries':1,
            'retry_delay':timedelta(minutes=5),
        },
        
        description='A simple DAG',
        schedule_interval="@once",
        start_date=datetime(2022, 5, 23),
        catchup=False,
        tags=['gt'],
    
    ) as dag:
    
        t1 = BashOperator(
            task_id='test1_print_date',
            bash_command='date',
        )
    
        t2 = BashOperator(
            task_id='test2_sleep',
            depends_on_past=False,
            bash_command='sleep 5',
            retries=3,
    
        )
        t1.doc_md = dedent(
            '''
            sample test
            '''
        )
    
        dag.doc_md = __doc__
        dag.doc_md = """
        this is a doc placed
        """
    
        templated_command = 'echo "hello"'    
    
        t3 = BashOperator(
            task_id='templated',
            depends_on_past=False,
            bash_command=templated_command,
        )
    
        t4 = BashOperator(
            task_id='test4_hello',
            bash_command='echo "bye"'
        )
    
        t1 >> [t2, t3]
        t3 >> t4

     

    DAG를 실행하기

    DAG를 실행하는 방법은 두가지입니다.

    - api에 의해 트리거 될 때 실행됩니다.

    - 정해진 일정이 되면 schedule_interval 인자에 crontab스케줄값 처럼 셋팅하면 실행됩니다.

     

    DAG구성요소

    DAG의 구성요소는 몇가지 없습니다. args값을 다양하게 넣어 다양한 케이스를 만들 수 있습니다.

     

    dag_id (str)

    - 화면에서 출력될 DAG id(이름)

     

    default_args (dict)

    - 모든 작업 생성자에 arguments를 명시적으로 전달 할 수 있음

     

    description (str)

    - DAG에 대한 설명

     

    schedule_interval (datetime.timedelta or str that acts as a cron expression)

    - 스케줄링 횟수 날짜 시간을 지정하는 항목

    - timedelta 객체가 execution_date에 추가

     

    catchup

    - 정해진 시간에 실행되지 못한 DAG를 늦게라도 실행하는 것을 catchup이라고 합니다.

    - 과거의 작업은 중요하지 않고, 현재 DAG만 실행되어야 하는 경우에 설정됩니다.

    1) DAG 생성시 지정
    DAG('tutorial', catchup=False, default_args=default_args)
    
    2) airflow.cfg 설정 변경
    catchup_by_default = False

    catchup을 true로 설정을 한다면, 시작날짜부터 하루 단위로 DAG가 순서대로 실행 될 것이다.

    (.....)

     

    start_date(datetime.datetime)

    - 실제 실행 날짜

    - backfill을 위해 실행 날짜 자체를 queue에 기록(저장)

     

    max_active_run = (int)

    - DAG level에서 최대 실행 개수

     

    tags (option: list[str])

    - 화면상에 보여지는 DAG를 나누는 그룹

     


    실제 속성값

    자세하게 속성값들을 설명하겠습니다. (여기부터는 위의 코드를 세분화 설명의 주석을 달아 두었습니다) 

    with DAG(
        dag_id='log_test_tg', #dag_id 실제 시각화 된 웹에 보여지는 이름
        description='A simple DAG', #설명
        schedule_interval="@once", #시간을 줄때 사용
        # schedule_interval='0 10, 16 * * *', #10시와 16시 하루 두번 실행
        start_date=datetime(2022, 5, 23), #언제부터 dag 시작되었는가
        catchup=False,
        tags=['gt'], #dag 카테고리 분류 기능

    schedule_interval=" * * * *" 의 형식은 crontab schedule의 속성값과 같은 스케줄링으로 같은 포멧으로 입력해주면 됩니다.

    ex) 

    45 22 * * *           -> 22시 45분 실행

    0,10 17 * * 0,2,3  -> 매주 일(0), 화(2), 수(3) 17시 00분과 17시 10분에 실행

    10 17 1,15 * *       -> 매달 1일과 15일 17시 10분에 실행


     

    args

    DAG안에 모든 오퍼레이터가 공통적으로 가져야할 인자들을 default_args에 담겨서 전달됩니다.

    물론 operator를 선언할 때 변경 가능합니다.

        default_args={
            'depends_on_past': False,
            'email': ['airflow@example.com'],
            'email_on_failure': False,
            'email_on_retry': False,
            'retries': 1,
            'retry_delay': timedelta(minutes=5),
            # 'queue': 'bash_queue',
            # 'pool': 'backfill',
            # 'priority_weight': 10,
            # 'end_date': datetime(2016, 1, 1),
            # 'wait_for_downstream': False,
            # 'sla': timedelta(hours=2),
            # 'execution_timeout': timedelta(seconds=300),
            # 'on_failure_callback': some_function,
            # 'on_success_callback': some_other_function,
            # 'on_retry_callback': another_function,
            # 'sla_miss_callback': yet_another_function,
            # 'trigger_rule': 'all_success'
        }

    owner (str)

    - 해당 task의 소유자

     

    deoenbds_on_past(bool)

    - true 경우 task가 순차적으로 실행(이전 task 성공이거나 건너뛴 경우에만 실행)

     

    email(str or list[str])

    - 알림 받는 메일 주소, 여러 메일을 등록 할 수 있고 ,(comma) 나 ;(semi-colon)으로 분리

     

    email_on_failure(bool)

    - task 실패시 이메일 수신 여부

     

    email_on_retry(bool)

    - task 재시도 메일 수신 여부

     

    retries(int)

    - task가 실패할 경우 재시도 횟수

     

    retry_delay(datetime.timedelta)

    - 재시도 사이 딜레이 시간

     

     

     

     


     

    첫번째 보이는 항목은 제어 흐름(control flow) 입니다.

    테스크는 의존하는 모든 테스크들이 성공했을 때만 실행됩니다.

    그러나 기본 동작을 변경 할 수 있는 방법들이 존재합니다.

    - Branching

    - Latest Only

    - Depends On Past

    - Trigger Rules

     

    Branching

    선행 작업의 결과에 따라 이어나갈 작업이 달라야 하는 겨우 Branch 분기로 나누어 줄 수 있습니다.

     

    상황

    1) 데이터 입력 후 검증

     - 데이터에 이상 징후가 포착될 경우, 추가 전처리 작업 실행

     - 아닐 경우 전처리 스킵 

     

    2) 모델 예측 후 적용

     - 모델 예측 결과 기준치 이하 , 초과 결과에 따라 다른 작업 실행

     

     

    첫번째 task가 완료되면 적절한 분기를 나누어 실행합니다.

     

     


    jinja template 설명

     

     

     

     

     

     

    https://velog.io/@jaytiger/Airflow-02.-DAG-%EA%B5%AC%EC%84%B1%ED%95%98%EA%B8%B0-2


    잠깐! task의 종류와 사용법을 체크하고 가겠습니다.

    DAG 테스크는 operator, sensors, taskflow같은 형태가 있습니다.

     

    1. sensors

    시간, 파일, 외부 이벤트를 기다리며 조건 이후의 작업을 진행 할 수 있게 해주는 airflow기능 입니다.

    operator 처럼 하나의 task로 만들 수 있으며 filesystem, hdfs, hive... 의 형식으로 제공합니다.

    1) FileSensor 

     

     

    2) decorator

     

     


    https://www.bucketplace.co.kr/post/2021-04-13-%EB%B2%84%ED%82%B7%ED%94%8C%EB%A0%88%EC%9D%B4%EC%8A%A4-airflow-%EB%8F%84%EC%9E%85%EA%B8%B0/

     

    버킷플레이스 Airflow 도입기 - 오늘의집 블로그

    탁월한 데이터플랫폼을 위한 Airflow 도입기

    www.bucketplace.co.kr

    오늘의집 airflow 도입기.

     

     

     

     

     

    '하나씩 쌓아보자' 카테고리의 다른 글

    Redis 레디스  (0) 2022.05.24
    Terraform -1 기본 설명 및 설치/ 단일 서버 배포  (0) 2022.05.23

    댓글

Designed by Tistory.