엔지니어 블로그
[DataStreaming] 1.프로젝트 소개 본문
Kafka를 통한 데이터 스트리밍 프로젝트를 진행하려고합니다.이번 프로젝트에서는 원유 및 금의 시세 변화가 S&P 500 지수 내 종목들의 섹터별 주식 가격에 미치는 영향을 분석합니다.
프로젝트 배경
금융 시장에서는 원유와 금 가격의 변화가 경제 전반에 영향을 미치며, 특히 섹터별로 그 영향의 정도가 다를 수 있습니다. 본 프로젝트의 목적은 원유와 금 시세가 특정 섹터에 속한 기업 주가에 얼마나 영향을 주는지 분석하는 것입니다.
아키텍처
Postgresql의 데이터를 Python에서 Producer를 개발하여 수집합니다. raw 데이터를 CloudStorage에 저장 후 Spark Streaming으로 데이터를 처리, 완료 된 데이터를 BigQuery와 CloudStorage에 저장합니다. 마지막으로 Looker Studio로 시각화 하는 것으로 마무리합니다.
소스 데이터
데이터는 kaggle에서 가져왔으며 2013년부터 2018년까지의 주식,금,원유 가격 데이터를 사용합니다. PostgreSQL DB의 테이블 형태로 저장 되어있으며, 테이블 구조는 아래와 같습니다.
주식(stock_prices) 데이터
- date: 거래일자
- open: 시가
- high: 고가
- low: 저가
- close: 종가
- volume: 거래량
- Name: 종목 티커
금(gold_prices) 데이터
- date: 거래일자
- open: 시가
- high: 고가
- low: 저가
- close: 종가
원유(oil_prices) 데이터
- date: 거래일자
- open: 시가
- high: 고가
- low: 저가
- close: 종가
추가적으로, 주식 데이터에는 Sector 정보가 없으므로 yfinance 라이브러리를 이용하여 각 종목의 Sector를 찾아서 별도의 테이블로 만들어 저장해야 합니다.
import yfinance as yf
from dotenv import load_dotenv
from database import psql_crud
load_dotenv()
def main(ticker):
try:
company = yf.Ticker(ticker)
sector = company.info.get("sector", "Unknows")
return sector
except Exception as e:
return "Unknowns"
if __name__ == "__main__":
db = psql_crud.CRUD()
sql = 'select DISTINCT as2."Name" from all_stocks as2'
create_sql = """
CREATE TABLE SECTORS
ticker char(20) NOT NULL,
sector char(30)
"""
db.create_db(create_sql)
names = [name[0] for name in db.read_db(sql=sql)]
sector_info = []
for ticker in names:
sector = main(ticker)
db.insert_data(ticker, sector)
다음 단계
- Python Producer 개발
- Kafka로부터 데이터를 받아 Spark Structured Streaming으로 실시간 분석
- 분석 결과는 Google Cloud Storage(GCS)에 저장하고 BigQuery로 전송하여 상세 분석 수행
- 최종적으로 Looker Studio를 통해 분석 결과 시각화
'개인 프로젝트 > Data Streaming 프로젝트' 카테고리의 다른 글
[DataStreaming] 2.환경 구축(kafka) (0) | 2025.04.07 |
---|