본문 바로가기
공부 및 정리/데이터 엔지니어링

[Airflow] 비트코인 일일 분 봉 데이터 파이프라인 프로젝트 (작업편)

by 스파이펭귄 2024. 3. 6.
728x90

개요

이전 환경 설정 포스트는 아래 링크를 이용하면 된다.

[Airflow] 비트코인 일일 분 봉 데이터 파이프라인 프로젝트 (환경 설정편)

 

[Airflow] 비트코인 일일 분 봉 데이터 파이프라인 프로젝트 (환경 설정편)

개요 최근 Airflow에 대해서 공부해보며 데이터에 대해 간단한 파이프라인을 만들어 데이터베이스에 저장하는 프로젝트라도 직접 해보면 좋겠다 싶었다. 요즘 가격이 미친듯이 오르는 비트코인

bestech49.tistory.com

이 포스트에서는 실제 코딩을 통한 Upbit API 호출 및 데이터베이스 저장 과정을 다룬다.

Upbit API 확인

분(Minute) 캔들

 

업비트 개발자 센터

 

docs.upbit.com

먼저 내가 사용할 API는 Upbit의 분봉 호출 API이다.

아주 친절하게 API 사용법이 다 나와있다. 이 데이터는 실시간으로 생성되며 매일 0시에 어제 하루 동안 생성된 분 봉 데이터를 받아오면 된다.

또한 다행스럽게도 이 API는 인증 없이 사용할 수 있는 API이기 때문에 나와 같은 간단한 프로젝트를 위해 매우 접근성이 좋다는 장점을 가진다.

위 API에 따르면 KRW-BTC의 최근 200개의 1분 봉 데이터를 받기 위해서는 아래 URL로 get 요청을 보내면 된다.

https://api.upbit.com/v1/candles/minutes/1?market=KRW-BTC&count=200

실제로 주소창에 쳐보면 위와 같이 오는 것을 확인할 수 있다.

1개만 받아 결과를 한번 자세히 보도록 하자.

[
    {
        "market":"KRW-BTC",
        "candle_date_time_utc":"2024-03-06T05:28:00",
        "candle_date_time_kst":"2024-03-06T14:28:00",
        "opening_price":92427000.00000000,
        "high_price":92487000.00000000,
        "low_price":92381000.00000000,
        "trade_price":92452000.00000000,
        "timestamp":1709702925730,
        "candle_acc_trade_price":718217026.11837000,
        "candle_acc_trade_volume":7.76896112,
        "unit":1
    }
]

먼저 리스트 형태로 오며, market에 symbol, candle_date_time_utc에 utc 시간대가 오는 것을 확인할 수 있다.

또한 OHLC가 각각 opening_price, high_price, low_price, trade_price로 오는 것을 확인할 수 있다.

마지막으로 해당 분봉 동안의 거래량은 candle_acc_trade_price와 candle_acc_trade_volume으로 오는 것을 확인할 수 있었다.

이를 통해 저장할 table의 스키마를 짜보도록 하자.

table의 이름은 ‘upbit_btc_1m_candle’로 정하고, 아래와 같이 정의할 수 있다.

CREATE TABLE upbit_btc_1m_candle (
    market VARCHAR(15),
    candle_date_time_utc DATETIME,
    candle_date_time_kst DATETIME,
    opening_price BIGINT,
    high_price BIGINT,
    low_price BIGINT,
    trade_price BIGINT,
    timestamp BIGINT,
    candle_acc_trade_price DECIMAL(30, 12),
    candle_acc_trade_volume DECIMAL(20, 12),
    unit INT,
    made_date DATETIME NOT NULL
) default character set utf8 collate utf8_general_ci;

MYSQL WorkBench에 아래 SQL 을 입력하자. (없다면 그냥 MYSQL을 켜서 테이블을 만들어주자.

use 이전에 만든 데이터 베이스;
CREATE TABLE upbit_btc_1m_candle (
    market VARCHAR(15),
    candle_date_time_utc DATETIME,
    candle_date_time_kst DATETIME,
    opening_price BIGINT,
    high_price BIGINT,
    low_price BIGINT,
    trade_price BIGINT,
    timestamp BIGINT,
    candle_acc_trade_price DECIMAL(30, 12),
    candle_acc_trade_volume DECIMAL(20, 12),
    unit INT,
    made_date DATETIME NOT NULL
) default character set utf8 collate utf8_general_ci;
show tables;

이제 dag 작성을 시작해보자.

먼저 파이썬 코드로 get 요청을 보내고 이를 Pandas로 받는 코드를 작성해보자.

import requests
import json 
import pandas as pd

# 최대 200건
url = 'https://api.upbit.com/v1/candles/minutes/1?market=KRW-BTC&count=200'
response = requests.get(url)
data_sample = pd.json_normalize(response.json())
data_sample

위와 같이 결과를 얻을 수 있는 것을 확인하였다.

이제 먼저 api를 보내고 결과를 mysql에 저장하는 오퍼레이터를 만들어보자.

일반적으로 재활용이 가능한 파일들은 /plugins에 저장하게 된다. 아래 명령어를 통해 plugins에 폴더와 파일을 만들자.

mkdir ./plugins/operators
vi ./plugins/operators/upbit_candle_api_save_to_mysql.py

주피터로 가서 해당 파일을 열고 구현해준다.

from airflow.models.baseoperator import BaseOperator
from airflow.hooks.base import BaseHook
import pandas as pd
import sqlalchemy
from airflow import macros

class UpbitCandleApiSaveToMysql(BaseOperator):
    template_fields = ("exec_date")
    def __init__(self, end_point, db_conn_id, table_nm, symbol, exec_date, **kwargs):
        super().__init__(**kwargs)
        self.api_base_url = "https://api.upbit.com/v1/"
        self.end_point = end_point
        self.db_conn_id = db_conn_id
        self.table_nm = table_nm
        self.symbol = symbol
        self.exec_date = exec_date


    def execute(self, context):
        candle_df = self._call_api()

        connection = BaseHook.get_connection(self.db_conn_id)
        db_connection_url = f"mysql+mysqlconnector://{connection.login}:{connection.password}@{connection.host}/{connection.schema}"
        connection = sqlalchemy.create_engine(db_connection_url)
        candle_df.to_sql(name = self.table_nm,
                         con = connection,
                         index = False,
                         if_exists = 'append')

    def _call_api(self):
        import requests
        from datetime import datetime, timedelta
        import time

        start_time = (datetime.strptime(self.exec_date, "%Y-%m-%d") + timedelta(days = 1)).strftime("%Y-%m-%d") + ' 00:00:00'
        result_lst = []
        self.log.info(start_time)
        for i in range(60 * 24 // 200 + 1):
            url = self.api_base_url + self.end_point + '?market=' + self.symbol + '&count=200&to=' + start_time
            self.log.info(url)
            response = requests.get(url).json()
            is_break = False
            print(response)
            for elem in response:
                if self.exec_date not in elem['candle_date_time_utc']:
                    is_break = True
                    break
                result_lst.append(elem)

            if is_break or len(response) < 200:
                break
            start_time = response[-1]['candle_date_time_utc']
            time.sleep(1)

        data_sample = pd.json_normalize(result_lst)
        data_sample['made_date'] = self.exec_date
        return data_sample

이후 dags로 가서 dags_get_upbit_BTC_1m_candle.py 파일을 만들고 아래처럼 구현한다.

from airflow import DAG, macros
import pendulum
from airflow.providers.http.operators.http import SimpleHttpOperator
from airflow.providers.mysql.operators.mysql import MySqlOperator
from operators.upbit_candle_api_save_to_mysql import UpbitCandleApiSaveToMysql

default_args = {
    'owner': 'airflow',
    'start_date': pendulum.datetime(2024, 3, 4, 0, 0),
    'schedule_interval': '0 0 * * *',
}

with DAG(
    dag_id = 'dags_get_upbit_BTC_1m_candle',
    default_args = default_args
) as dag:
    sql_already_data_delete = MySqlOperator(
        task_id = 'sql_already_data_delete',
        mysql_conn_id = 'conn-db-mysql',
        sql= "DELETE FROM airflow_test.upbit_btc_1m_candle WHERE made_date='{{ execution_date.strftime('%Y-%m-%d') }}'"
    )
    print("sql" + "DELETE FROM airflow_test.upbit_btc_1m_candle WHERE made_date='{{ execution_date.strftime('%Y-%m-%d') }}'")

    upbit_BTC_1m_candle_api_save_to_mysql = UpbitCandleApiSaveToMysql(
        task_id = "upbit_BTC_1m_candle_api_save_to_mysql",
        end_point = "candles/minutes/1", 
        db_conn_id = "conn-db-mysql", 
        table_nm = "upbit_btc_1m_candle",
        symbol = "KRW-BTC",
        exec_date = '{{ execution_date.strftime("%Y-%m-%d") }}'
        )

    sql_already_data_delete >> upbit_BTC_1m_candle_api_save_to_mysql

매일 UTC+0 0시 0분에 전날의 BTC 분봉 데이터를 모으는 DAG이다.

이제 airflow UI로 이동한다. 반영될때까지 5분 정도까지 걸릴 수 있다.

일단은 오류 없이 올라간 것을 확인할 수 있다.

이를 unpause 시키면…

성공했다!!!

이제 mysql을 확인해보면!!

데이터베이스에 잘 들어간 것을 확인할 수 있다.

 

지속적으로 잘 되는 것도 확인할 수 있었다.

728x90