db 연동

1. PostgresOperator 사용

https://airflow.apache.org/docs/apache-airflow-providers-postgres/stable/operators/postgres_operator_howto_guide.html

import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="postgres_operator_dag",
    start_date=datetime.datetime(2021, 8, 23),
    schedule_interval="@once",
    catchup=False,
) as dag:
    create_table_test = PostgresOperator(
        task_id="create_table_test",
        postgres_conn_id="postgres_conn_main",
        sql="""
            CREATE TABLE IF NOT EXISTS kbae_airflow_test (
            test_id SERIAL PRIMARY KEY,
            test_name VARCHAR NOT NULL,
            test_type VARCHAR NOT NULL,
            test_date DATE NOT NULL,
            test_OWNER VARCHAR NOT NULL);
          """,
    )
    Insert_table_test = PostgresOperator(
        task_id="airflow_test_table",
        postgres_conn_id="postgres_conn_main",
        sql="""
            INSERT INTO kbae_airflow_test(test_name, test_type, test_date, test_OWNER) VALUES ( 'Max', 'Dog', '2018-07-05', 'Jane');
            INSERT INTO kbae_airflow_test(test_name, test_type, test_date, test_OWNER) VALUES ( 'Susie', 'Cat', '2019-05-01', 'Phil');
            INSERT INTO kbae_airflow_test(test_name, test_type, test_date, test_OWNER) VALUES ( 'Lester', 'Hamster', '2020-06-23', 'Lily');
            INSERT INTO kbae_airflow_test(test_name, test_type, test_date, test_OWNER) VALUES ( 'Quincy', 'Parrot', '2013-08-11', 'Anne');
            """,
    )
    get_test = PostgresOperator(
        task_id="get_test", 
        postgres_conn_id="postgres_conn_main", 
        sql="SELECT * FROM kbae_airflow_test;"
    )
    get_test2 = PostgresOperator(
        task_id="get_date_test",
        postgres_conn_id="postgres_conn_main",
        sql="""
            SELECT * FROM kbae_airflow_test
            WHERE test_date
            BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
            """,
        params={'begin_date': '2010-01-01', 'end_date': '2021-08-31'},
    )
    get_date_test = PostgresOperator(
        task_id="get_date_test",
        postgres_conn_id="postgres_conn_main",
        sql="""
            SELECT * FROM kbae_airflow_test
            WHERE test_date
            BETWEEN SYMMETRIC {{ params.begin_date }} AND {{ params.end_date }};
            """,
        params={'begin_date': '2010-01-01', 'end_date': '2021-08-31'},
    )

    create_table_test >> Insert_table_test >> get_test >> get_test2 >>  get_date_test

https://berrrrr.github.io/programming/2020/01/12/what-is-apache-airflow/

https://berrrrr.github.io/programming/2020/01/12/what-is-apache-airflow/

https://isthecj.tistory.com/78

https://isthecj.tistory.com/78

전체적 설명

https://berrrrr.github.io/programming/2020/01/12/what-is-apache-airflow/

장기간 airflow 운영 경험

https://monkeydev.tistory.com/2

공식문서

https://airflow.apache.org/

airflow 기본 설명

https://yahwang.github.io/posts/airflow