728x90
프로젝트를 하다보니 Private Subnet에 생성한 RDS에 Local Airflow에서 테스트를 하고 싶은 일이 경우가 자주 발생했습니다.
이를 위해 먼저 SSH 터널링을 사용해 Local 포트와 RDS 포트(3306)를 연결합니다.
ssh -i ./BastionHostKey.pem -L 3307:[RDS 엔드포인트]:3306 ubuntu@[BastionHost IP]
이를 통하여 로컬 3307 포트가 RDS의 3306 포트로 포트포워딩 되었습니다.
이를 확인하기 위해 lsof -i :3307
명령을 통해 포트가 열려있는지 확인할 수 있습니다.
이후 Airflow를 Docker를 통하여 올린 후 Connections에 아래와 같이 추가합니다.
- Connection ID: 마음대로 만드셔도 상관 없습니다.
- Connection Type: MySQL
- Host: host.docker.internal
- 이 값은 docker container를 생성한 호스트를 향하게 합니다.
- Schema: lecture
- Login: private slack에 적힌 MySQL 마스터 이름
- Password: private slack에 적힌 MySQL 마스터 비밀번호
- Port: 3307
- 앞서 연결한 local 포트 값을 넣어주신다면 어떤 값이든 상관 없습니다.
- Extra: {"charset": "utf-8"}
- mac의 경우: {"charset": "utf8mb4"}
접근 가능한지 테스트!
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.mysql_hook import MySqlHook
from datetime import datetime
def fetch_data_from_mysql():
mysql_hook = MySqlHook(mysql_conn_id='conn-rds-mysql')
connection = mysql_hook.get_conn()
cursor = connection.cursor()
cursor.execute("SELECT * FROM Category")
result = cursor.fetchall()
for row in result:
print(row)
default_args = {
'owner': 'airflow',
'start_date': datetime(2024, 7, 23),
'retries': 1,
}
dag = DAG(
'mysql_rds_test_dag',
default_args=default_args,
description='Get AWS RDS MySQL Table',
schedule_interval='@daily',
)
fetch_task = PythonOperator(
task_id='fetch_categories',
python_callable=fetch_data_from_mysql,
dag=dag,
)
이후 접근 가능한지 위 코드를 실행해보겠습니다. (단순히 RDS에 접근하여 Category 테이블을 읽는 DAG입니다.)
다행이게도 잘 되는 것을 확인할 수 있었습니다.
이를 활용하여 그냥 로컬에서 RDS에 접근할 수도 있습니다! (이건 위 작업을 다 했으면 3307 포트에 이미 연결되어 있으므로 생략)
728x90
'공부 및 정리 > 기타 정리' 카테고리의 다른 글
ChatGPT API 비용 감축하기 (0) | 2024.07.29 |
---|---|
EC2 실행시 RDS & Airflow 자동 실행 (0) | 2024.07.29 |
Github Actions를 사용한 꺼진 AWS Instance에도 CI/CD하기 (1) | 2024.07.23 |
AWS EMR을 활용한 Spark 클러스터 서버 구축 (0) | 2024.06.24 |
Github Action을 통해 좋아하는 블로그의 새 포스팅 자동 알림 만들기 (8) | 2024.05.29 |