공부 및 정리/기타 정리
Private RDS에 Local Airflow에서 접근하기
스파이펭귄
2024. 7. 24. 00:14
728x90
프로젝트를 하다보니 Private Subnet에 생성한 RDS에 Local Airflow에서 테스트를 하고 싶은 일이 경우가 자주 발생했습니다.
이를 위해 먼저 SSH 터널링을 사용해 Local 포트와 RDS 포트(3306)를 연결합니다.
ssh -i ./BastionHostKey.pem -L 3307:[RDS 엔드포인트]:3306 ubuntu@[BastionHost IP]
이를 통하여 로컬 3307 포트가 RDS의 3306 포트로 포트포워딩 되었습니다.
이를 확인하기 위해 lsof -i :3307
명령을 통해 포트가 열려있는지 확인할 수 있습니다.
이후 Airflow를 Docker를 통하여 올린 후 Connections에 아래와 같이 추가합니다.
- Connection ID: 마음대로 만드셔도 상관 없습니다.
- Connection Type: MySQL
- Host: host.docker.internal
- 이 값은 docker container를 생성한 호스트를 향하게 합니다.
- Schema: lecture
- Login: private slack에 적힌 MySQL 마스터 이름
- Password: private slack에 적힌 MySQL 마스터 비밀번호
- Port: 3307
- 앞서 연결한 local 포트 값을 넣어주신다면 어떤 값이든 상관 없습니다.
- Extra: {"charset": "utf-8"}
- mac의 경우: {"charset": "utf8mb4"}
접근 가능한지 테스트!
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.mysql_hook import MySqlHook
from datetime import datetime
def fetch_data_from_mysql():
mysql_hook = MySqlHook(mysql_conn_id='conn-rds-mysql')
connection = mysql_hook.get_conn()
cursor = connection.cursor()
cursor.execute("SELECT * FROM Category")
result = cursor.fetchall()
for row in result:
print(row)
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 7, 23),
'retries': 1,
}
dag = DAG(
'mysql_rds_test_dag',
default_args=default_args,
description='Get AWS RDS MySQL Table',
schedule_interval='@daily',
)
fetch_task = PythonOperator(
task_id='fetch_categories',
python_callable=fetch_data_from_mysql,
dag=dag,
)
이후 접근 가능한지 위 코드를 실행해보겠습니다. (단순히 RDS에 접근하여 Category 테이블을 읽는 DAG입니다.)
다행이게도 잘 되는 것을 확인할 수 있었습니다.
이를 활용하여 그냥 로컬에서 RDS에 접근할 수도 있습니다! (이건 위 작업을 다 했으면 3307 포트에 이미 연결되어 있으므로 생략)
728x90