2 분 소요

Array

  • 빅쿼리로 Array 만들기
     SELECT
     -- {project_id}
          -- {dataset1}
          ['{project_id}.{dataset1}.{datatable_names}'
          , '{project_id}.{dataset1}.{datatable_names}'
          , '{project_id}.{dataset1}.{datatable_names}'
          , '{project_id}.{dataset1}.{datatable_names}'
          -- {dataset2}
          , '{project_id}.{dataset2}.{datatable_names}'
          , '{project_id}.{dataset2}.{datatable_names}'
          , '{project_id}.{dataset2}.{datatable_names}'
          , '{project_id}.{dataset2}.{datatable_names}'
          , '{project_id}.{dataset2}.{datatable_names}'
          , '{project_id}.{dataset2}.{datatable_names}'
          , '{project_id}.{dataset2}.{datatable_names}'
          -- {dataset3} 
          , '{project_id}.{dataset3}.{datatable_names}'
          , '{project_id}.{dataset3}.{datatable_names}'
          , '{project_id}.{dataset3}.{datatable_names}'
          , '{project_id}.{dataset3}.{datatable_names}'
          -- {dataset4}
          , '{project_id}.{dataset4}.{datatable_names}' ] as table_list

Struct

  • struct() 함수 사용해서 하나의 Array 만들기
     SELECT
     -- {project_id}
          -- {dataset1}
          [struct('{project_id}.{dataset1}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset1}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset1}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset1}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          -- {dataset2}
          , struct('{project_id}.{dataset2}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset2}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset2}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset2}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset2}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset2}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset2}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          -- {dataset3} 
          , struct('{project_id}.{dataset3}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset3}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset3}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          , struct('{project_id}.{dataset3}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link)
          -- {dataset4}
          , struct('{project_id}.{dataset4}.{datatable_names}' as table_address, 'DAG' as gen_type, 'a' as gen_link, 'b' as regen_link) ] as meta

참고링크

Use dag_run.conf

  • Airflow 변수 dag_run.conf 사용해보기
    • BashOperator 버전

        # file name : rho_test_dag_config.py
      
        #############################  LIBRARIES
        import sys
        import os
        from datetime import datetime, timedelta, timezone
        from google.cloud import bigquery
        import json
      
        from airflow import DAG
        from airflow.operators.python_operator import PythonOperator
        from airflow.operators.bash import BashOperator
        from airflow.models import Variable
        from airflow.utils.dates import days_ago
      
        from dag_utils.slack_cloudfn import SlackAlert
        #############################  MODULE PATH
        def get_path(_path, step, _dir=None):
            up_path = os.sep.join(_path.split(os.sep)[:-step])
            if _dir is None:
                return up_path
            return os.path.join(up_path, _dir)
      
        module_path = get_path(os.path.dirname(os.path.abspath(__file__)), 1)
        sys.path.append(module_path)
      
      
        # ############################################################################################################################### common #####
        default_args= {
            'start_date': days_ago(1),
            'retries': 0,
            'catchup': False,
            'retry_delay': timedelta(minutes=5),
        }
      
        templated_command = """
            echo "ds : "
            """
      
        dag = DAG(
            'templated_test',
            default_args=default_args,
            schedule_interval="@daily",
        )
      
        t1 = BashOperator(
            task_id='bash_templated',
            bash_command=templated_command,
            dag=dag,
        )
      
        t1
      
    • pythonOperator

       # file name : rho_test_dag_config.py
      
       #############################  LIBRARIES
       import sys
       import os
       from datetime import datetime, timedelta, timezone
       from google.cloud import bigquery
       import json
      
       from airflow import DAG
       from airflow.operators.python_operator import PythonOperator
       from airflow.operators.bash import BashOperator
       from airflow.models import Variable
       from airflow.utils.dates import days_ago
      
       from dag_utils.slack_cloudfn import SlackAlert
       #############################  MODULE PATH
       def get_path(_path, step, _dir=None):
           up_path = os.sep.join(_path.split(os.sep)[:-step])
           if _dir is None:
               return up_path
           return os.path.join(up_path, _dir)
      
       module_path = get_path(os.path.dirname(os.path.abspath(__file__)), 1)
       sys.path.append(module_path)
      
      
       # ############################################################################################################################### common #####
       default_args = {
           'start_date': datetime.now() - timedelta(days=1),
           'retries': 0,
           'catchup': False,
           'retry_delay': timedelta(minutes=5),
       }
      
       def print_ds_conf(**kwargs):
           conf = kwargs['dag_run'].conf
           if conf == {} :
               print("conf is empty")
           else :
               print("ds : ", conf)
      
       dag = DAG(
           'templated_test_python',
           default_args=default_args,
           schedule_interval="@daily",
       )
      
       t1 = PythonOperator(
           task_id='python_templated_python',
           python_callable=print_ds_conf,
           provide_context=True,
           dag=dag,
       )
      
       t1
      

댓글남기기