엔지니어 블로그

데이터 파이프라인 프로젝트 #4 Airflow & GCP PubSub 본문

개인 프로젝트/Data Pipeline 프로젝트

데이터 파이프라인 프로젝트 #4 Airflow & GCP PubSub

안기용 2025. 3. 7. 13:41

이전에 생성해둔 api 서버를 호출해서 데이터를 PubSub에 Publish 하는 작업을 할 차례다. 필요 작업은 다음과 같다.

1.Airflow Connection 등록

-> Airflwo가 PubSub에 접근할 수 있도록 Connection을 등록

2.Publish Task 작성

-> DAG에 데이터를 Publish 할 수 있도록 task 작성

 

1.Airflow Connection 등록

먼저 connection 등록이다. Airflow 상단 메뉴에 Admin-Connection을 클릭하면 Connection 등록이 가능하다. + 버튼을 눌러 등록해보자

 

+ 버튼을 눌러보면 다음과 같은 화면이 나온다. 

여기서 필수로 작성해야 하는 것은 * 처리 되어있다. 추가로 Keyfile 내용을 작성하면 된다. Keyfile은 경로로 작성해도 되고 JSON 내용을 직접 넣어도 된다. 나는 JSON을 넣기로했다.

 

Connection id / Connection Type / Keyfile JSON

 

등록을 완료하면 위와같은 목록을 볼 수 있게 된다.(kubernetes 관련 connection은 지금은 필요 없는 내용이다.)

 

2.Publish Task 작성

이제 DAG을 작성해볼 것이다. DAG은 2부분으로 나뉜다. 1.api에서 데이터 호출 2.데이터 publish

@dag(schedule_interval=None,start_date= datetime.now(),catchup=False)
def publish_to_pubsub():
    @task
    def get_order_data_after_last_value():
        last_value_path = "/opt/airflow/logs/last_value.txt"
        if not os.path.exists(last_value_path):
            with open(last_value_path, "w", encoding="utf-8") as file:
                file.write("0")
                last_value = 0
        else:
            with open(last_value_path,encoding="utf-8") as file:
                last_value = file.read()
        interval = random.randrange(3,30)
        url = f"http://{{api-server-host}}:8000/orders/id/{int(last_value)}/{int(last_value)+interval}"
        with open(last_value_path, "w", encoding="utf-8") as file:
            file.write(f"{int(last_value)+interval}")
        response = requests.get(url)
        if response.status_code == 200:
            print("Connect Success")
            return response.json()
    def publish_data(ti):
        data = ti.xcom_pull(task_ids="get_order_data_after_last_value")
        if not data:
            print("데이터가 존재하지 않습니다.")
            return None

        for d in data:
            json_data = json.dumps(d).encode("utf-8")
            publish_task = PubSubPublishMessageOperator(
                task_id='publish_message',
                project_id={{GCP_PROJECT_ID}},
                topic={{PUBSUB_TOPIC_NAME}},
                messages=[{'data': json_data}]
            )
            publish_task.execute(context={})

    publish_task = PythonOperator(
        task_id = "publish_message",
        python_callable=publish_data,
        provide_context=True,
    )

    data = get_order_data_after_last_value()
    data >> publish_task
publish_to_pubsub_dag = publish_to_pubsub()

 

task1 - api에서 데이터를 받아와 xcom에 저장한다.

task2 - xcom의 데이터를 json 형태로 변환 후 publish한다.

 

하나 걸리는 것이 Airflow는 실행 주기를 최소 1분까지 잡을 수 있다. 하지만 나는 사용자 구매 로그가 실시간으로 쌓이는 것을 재현하고 싶다. 따라서 TriggerDagRunOperator를 사용했다. DAG이 종료되면서 해당 DAG을 다시 Trigger하는 기능이다. 

추가 된 내용은 다음과 같다.

    trigger_next_run = TriggerDagRunOperator(
        task_id="trigger_next_run",
        trigger_dag_id="publish_to_pubsub",
        wait_for_completion=False,
    )

 

이제 데이터를 지속적으로 호출해서 publish할 수 있게 되었다.

 


2025.02.12 - [개인 프로젝트/Data Pipeline 프로젝트] - 데이터 파이프라인 프로젝트 #1 아키텍처 구성

2025.02.17 - [개인 프로젝트/Data Pipeline 프로젝트] - 데이터 파이프라인 프로젝트 #2 인프라 구성

2025.02.28 - [개인 프로젝트/Data Pipeline 프로젝트] - 데이터 파이프라인 프로젝트 #3 API 서버 구축

2025.03.06 - [개인 프로젝트/Data Pipeline 프로젝트] - 데이터 파이프라인 프로젝트 #4 아키텍처 변경