본문 바로가기
공부 및 정리/기타 정리

Private RDS에 Local Airflow에서 접근하기

by 스파이펭귄 2024. 7. 24.
728x90

프로젝트를 하다보니 Private Subnet에 생성한 RDS에 Local Airflow에서 테스트를 하고 싶은 일이 경우가 자주 발생했습니다.

이를 위해 먼저 SSH 터널링을 사용해 Local 포트와 RDS 포트(3306)를 연결합니다.

ssh -i ./BastionHostKey.pem -L 3307:[RDS 엔드포인트]:3306 ubuntu@[BastionHost IP]

이를 통하여 로컬 3307 포트가 RDS의 3306 포트로 포트포워딩 되었습니다.

이를 확인하기 위해 lsof -i :3307 명령을 통해 포트가 열려있는지 확인할 수 있습니다.

 

이후 Airflow를 Docker를 통하여 올린 후 Connections에 아래와 같이 추가합니다.

  • Connection ID: 마음대로 만드셔도 상관 없습니다.
  • Connection Type: MySQL
  • Host: host.docker.internal
    • 이 값은 docker container를 생성한 호스트를 향하게 합니다.
  • Schema: lecture
  • Login: private slack에 적힌 MySQL 마스터 이름
  • Password: private slack에 적힌 MySQL 마스터 비밀번호
  • Port: 3307
    • 앞서 연결한 local 포트 값을 넣어주신다면 어떤 값이든 상관 없습니다.
  • Extra: {"charset": "utf-8"}
    • mac의 경우: {"charset": "utf8mb4"}

 

 

접근 가능한지 테스트!

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.mysql_hook import MySqlHook
from datetime import datetime

def fetch_data_from_mysql():
    mysql_hook = MySqlHook(mysql_conn_id='conn-rds-mysql')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM Category")
    result = cursor.fetchall()
    for row in result:
        print(row)

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 7, 23),
    'retries': 1,
}

dag = DAG(
    'mysql_rds_test_dag',
    default_args=default_args,
    description='Get AWS RDS MySQL Table',
    schedule_interval='@daily',
)

fetch_task = PythonOperator(
    task_id='fetch_categories',
    python_callable=fetch_data_from_mysql,
    dag=dag,
)

이후 접근 가능한지 위 코드를 실행해보겠습니다. (단순히 RDS에 접근하여 Category 테이블을 읽는 DAG입니다.)

다행이게도 잘 되는 것을 확인할 수 있었습니다.

이를 활용하여 그냥 로컬에서 RDS에 접근할 수도 있습니다! (이건 위 작업을 다 했으면 3307 포트에 이미 연결되어 있으므로 생략)

728x90