엔지니어 블로그
데이터 파이프라인 프로젝트 #4 Airflow & GCP PubSub 본문
이전에 생성해둔 api 서버를 호출해서 데이터를 PubSub에 Publish 하는 작업을 할 차례다. 필요 작업은 다음과 같다.
1.Airflow Connection 등록
-> Airflwo가 PubSub에 접근할 수 있도록 Connection을 등록
2.Publish Task 작성
-> DAG에 데이터를 Publish 할 수 있도록 task 작성
1.Airflow Connection 등록
먼저 connection 등록이다. Airflow 상단 메뉴에 Admin-Connection을 클릭하면 Connection 등록이 가능하다. + 버튼을 눌러 등록해보자


+ 버튼을 눌러보면 다음과 같은 화면이 나온다.

여기서 필수로 작성해야 하는 것은 * 처리 되어있다. 추가로 Keyfile 내용을 작성하면 된다. Keyfile은 경로로 작성해도 되고 JSON 내용을 직접 넣어도 된다. 나는 JSON을 넣기로했다.
Connection id / Connection Type / Keyfile JSON

등록을 완료하면 위와같은 목록을 볼 수 있게 된다.(kubernetes 관련 connection은 지금은 필요 없는 내용이다.)
2.Publish Task 작성
이제 DAG을 작성해볼 것이다. DAG은 2부분으로 나뉜다. 1.api에서 데이터 호출 2.데이터 publish
@dag(schedule_interval=None,start_date= datetime.now(),catchup=False)
def publish_to_pubsub():
@task
def get_order_data_after_last_value():
last_value_path = "/opt/airflow/logs/last_value.txt"
if not os.path.exists(last_value_path):
with open(last_value_path, "w", encoding="utf-8") as file:
file.write("0")
last_value = 0
else:
with open(last_value_path,encoding="utf-8") as file:
last_value = file.read()
interval = random.randrange(3,30)
url = f"http://{{api-server-host}}:8000/orders/id/{int(last_value)}/{int(last_value)+interval}"
with open(last_value_path, "w", encoding="utf-8") as file:
file.write(f"{int(last_value)+interval}")
response = requests.get(url)
if response.status_code == 200:
print("Connect Success")
return response.json()
def publish_data(ti):
data = ti.xcom_pull(task_ids="get_order_data_after_last_value")
if not data:
print("데이터가 존재하지 않습니다.")
return None
for d in data:
json_data = json.dumps(d).encode("utf-8")
publish_task = PubSubPublishMessageOperator(
task_id='publish_message',
project_id={{GCP_PROJECT_ID}},
topic={{PUBSUB_TOPIC_NAME}},
messages=[{'data': json_data}]
)
publish_task.execute(context={})
publish_task = PythonOperator(
task_id = "publish_message",
python_callable=publish_data,
provide_context=True,
)
data = get_order_data_after_last_value()
data >> publish_task
publish_to_pubsub_dag = publish_to_pubsub()
task1 - api에서 데이터를 받아와 xcom에 저장한다.
task2 - xcom의 데이터를 json 형태로 변환 후 publish한다.
하나 걸리는 것이 Airflow는 실행 주기를 최소 1분까지 잡을 수 있다. 하지만 나는 사용자 구매 로그가 실시간으로 쌓이는 것을 재현하고 싶다. 따라서 TriggerDagRunOperator를 사용했다. DAG이 종료되면서 해당 DAG을 다시 Trigger하는 기능이다.
추가 된 내용은 다음과 같다.
trigger_next_run = TriggerDagRunOperator(
task_id="trigger_next_run",
trigger_dag_id="publish_to_pubsub",
wait_for_completion=False,
)
이제 데이터를 지속적으로 호출해서 publish할 수 있게 되었다.
2025.02.12 - [개인 프로젝트/Data Pipeline 프로젝트] - 데이터 파이프라인 프로젝트 #1 아키텍처 구성
2025.02.17 - [개인 프로젝트/Data Pipeline 프로젝트] - 데이터 파이프라인 프로젝트 #2 인프라 구성
2025.02.28 - [개인 프로젝트/Data Pipeline 프로젝트] - 데이터 파이프라인 프로젝트 #3 API 서버 구축
2025.03.06 - [개인 프로젝트/Data Pipeline 프로젝트] - 데이터 파이프라인 프로젝트 #4 아키텍처 변경
'개인 프로젝트 > Data Pipeline 프로젝트' 카테고리의 다른 글
데이터 파이프라인 프로젝트 #4 아키텍처 변경 (0) | 2025.03.06 |
---|---|
데이터 파이프라인 프로젝트 #3 API 서버 구축 (0) | 2025.02.28 |
데이터 파이프라인 프로젝트 #2 인프라 구성 (0) | 2025.02.17 |
데이터 파이프라인 프로젝트 #1 아키텍처 구성 (0) | 2025.02.12 |