공부 및 정리/기타 정리

Private RDS에 Local Airflow에서 접근하기

스파이펭귄 2024. 7. 24. 00:14
728x90

프로젝트를 하다보니 Private Subnet에 생성한 RDS에 Local Airflow에서 테스트를 하고 싶은 일이 경우가 자주 발생했습니다.

이를 위해 먼저 SSH 터널링을 사용해 Local 포트와 RDS 포트(3306)를 연결합니다.

ssh -i ./BastionHostKey.pem -L 3307:[RDS 엔드포인트]:3306 ubuntu@[BastionHost IP]

이를 통하여 로컬 3307 포트가 RDS의 3306 포트로 포트포워딩 되었습니다.

이를 확인하기 위해 lsof -i :3307 명령을 통해 포트가 열려있는지 확인할 수 있습니다.

 

이후 Airflow를 Docker를 통하여 올린 후 Connections에 아래와 같이 추가합니다.

  • Connection ID: 마음대로 만드셔도 상관 없습니다.
  • Connection Type: MySQL
  • Host: host.docker.internal
    • 이 값은 docker container를 생성한 호스트를 향하게 합니다.
  • Schema: lecture
  • Login: private slack에 적힌 MySQL 마스터 이름
  • Password: private slack에 적힌 MySQL 마스터 비밀번호
  • Port: 3307
    • 앞서 연결한 local 포트 값을 넣어주신다면 어떤 값이든 상관 없습니다.
  • Extra: {"charset": "utf-8"}
    • mac의 경우: {"charset": "utf8mb4"}

 

 

접근 가능한지 테스트!

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.hooks.mysql_hook import MySqlHook
from datetime import datetime

def fetch_data_from_mysql():
    mysql_hook = MySqlHook(mysql_conn_id='conn-rds-mysql')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM Category")
    result = cursor.fetchall()
    for row in result:
        print(row)

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 7, 23),
    'retries': 1,
}

dag = DAG(
    'mysql_rds_test_dag',
    default_args=default_args,
    description='Get AWS RDS MySQL Table',
    schedule_interval='@daily',
)

fetch_task = PythonOperator(
    task_id='fetch_categories',
    python_callable=fetch_data_from_mysql,
    dag=dag,
)

이후 접근 가능한지 위 코드를 실행해보겠습니다. (단순히 RDS에 접근하여 Category 테이블을 읽는 DAG입니다.)

다행이게도 잘 되는 것을 확인할 수 있었습니다.

이를 활용하여 그냥 로컬에서 RDS에 접근할 수도 있습니다! (이건 위 작업을 다 했으면 3307 포트에 이미 연결되어 있으므로 생략)

728x90