에어플로우 설치
아파치 에어플로우는 데이터 처리 파이프라인을 조율하기 위해 만들어진 오픈 소스 소프트웨어이다. 구글 클라우드 플랫폼은 이를 클라우드 컴포저라는 이름으로 쉽게 사용할 수 있도록 서비스 형태로 제공하고 있다. 하지만 클라우드 컴포저는 쿠버네티스로최소 3개의 워커노드를 사용을 요구하며 그외에 추가적인 부가 사용료를 따져보면 월에 700달러 이상의 요금이 부과된다. 따라서 우리는 컴퓨트 엔진에서 서버 인스턴스를 생성한 후 에어플로우를 설치하는 것으로 한다.
VM 인스턴트 준비하기
에어플로우를 설치할 서버는 구글 컴퓨트엔진을 사용하도록 한다.1. COMPUTE > Compute Engine > VM instances 메뉴를 선택한다.

2. CREATE INSTANCES 메뉴를 선택한다.

3. 생성할 VM 인스턴트 옵션의 각 항목을 다음과 같이 설정한 후 Create 버튼을 클릭하여 VM 인스턴스를 생성한다.

에어플로우 설치하기
구글 클라우드 플랫폼은 따로 SSH 접속 프로그램 없이도 리눅스 쉘을 실행할 수 있다. 다음과 같인 VM 인스턴스의 SSH 버튼을 눌러서 에어플로우를 설치한 VM 인스턴스에 접속한다.

그리고 쉘에서 다음 명령어를 입력하여 에어플로우를 설치한다. 현재 패키지로 설치되는 에어플로우 버전은 1.10.0 이다. -E를 빠뜨리지 않도록 주의한다. 이 옵션은 사용자가 기존 환경 변수를 보존하기를 원하는 보안 정책을 설정하는 것을 의미한다.
sudo apt-get install -y python3-pip python3-dev build-essential libssl-dev libffi-dev defult-libmysqlclient-dev export AIRFLOW_GPL_UNIDECODE=y # Install Airflow with the extra package gcp_api containing the hooks and operators for the GCP services. sudo -E pip3 install apache-airflow[gcp_api]
AIRFLOW_HOME 위치를 쉘로 접속한 계정의 상위 폴더로 설정한후 에어플로우 데이터베이스를 생성하고 웹서버를 시작한다. -E 옵션을 줘야 AIRFLOW_HOME 위치를 읽을 수 있으므로 빼먹지 않도록 주의한다.
export $AIRFLOW_HOME=~/airflow sudo -E airflow initdb sudo -E nohup airflow webserver -p 80 >/dev/null 2>&1 &

에어플로우 웹서버가 시작되었으면 VM 인스턴스의 External IP를 통해 서버에 접속할 수 있다.
브라우저에서 다음과 같은 화면이 나오면 성공적으로 웹서버가 시작된 것이다.
에어플로우를 이용해 스케쥴링 처리를 위해서는 스케쥴러 데몬도 실행해야 한다. 이 책에서 데이터 처리를 위해 사용할 서비스는 클라우드 데이터플로우로 자바를 이용해서 개발하므로 쉘에서 다음 명령어를 입력하여 jre 부터 설치를 한다.
sudo apt-get install default-jre
다음 명령어를 통해 스케줄러를 시작한다. 이제 스케쥴러 룰을 정의한 DAG 파일을 처리할 준비가 모두 끝났다.
sudo -E nohup airflow scheduler >/dev/null 2>&1 &
마지막으로 ps ax 명령어로 웹과 스케쥴러 데몬이 모두 정상적으로 떠 있는지 확인한다.

Hello DAG 작성하기
에어플로우 작업 간의 관계와 흐름을 DAG(Directed Acylic Graph, 방향성 비순환 그래프)로 표현한다. DAG는 파이썬 스크립트를 이용하여 만들어야 한다. 각 작업은 오퍼레이터 클래스를 이용하여 인스턴스화 된다. BashOperator 객체를 이용하여 쉘 명령어를 실행하는 간단한 DAG를 만들어 보자. 먼저 hello_dag.py라는 파이썬 파일을 만들고 다음 소스를 입력한다.
import airflow from airflow import DAG from airflow import models from airflow.operators.bash_operator import BashOperator from datetime import datetime from datetime import date, timedelta # DAG를 언제부터 시작할지 지정한다. 에어플로우에 등록한 이틀 전부터 실행하도록 설정하였다. default_dag_args = { 'start_date': airflow.utils.dates.days_ago(2) } # 크론탭 방식의 시간 설정 방법으로 DAG의 반복 단위를 설정할 수 있다. 0 0 * * * 은 매일 0시에 실행하라는 의미이다. dag = models.DAG( 'hello-dag-v1', schedule_interval='0 0 * * *', default_args=default_dag_args ) # BashOperator를 이용하여 echo 커맨드를 실행 run_this = BashOperator( task_id='run_bash_operator', bash_command='echo hello dag!', dag=dag )
DAG를 등록하려면 VM 인스턴스에 소스파일을 업로드해야 한다. 쉘로 들어간후 다음 명령어로 airflow 폴더 하위에 dags 폴더를 먼저 생성한다. 그리고 스케쥴러와 웹서버를 재실행한다.
cd ~/ sudo mkdir airflow/dags sudo kill -9 `ps -ef | grep 'scheduler' | grep -v grep | awk '{print $2}'` sudo -E nohup airflow scheduler >/dev/null 2>&1 & sudo kill -9 `ps -ef | grep 'webserver' | grep -v grep | awk '{print $2}'` sudo -E nohup airflow webserver -p 80 >/dev/null 2>&1 &
그리고 cd 명령어로 dags 폴더로 이동후 웹 기반 쉘의 Upload file 메뉴를 통해 hello_dag.py파일을 업로드 한다.Upload files 메뉴는 계정의 홈 폴더로 파일을 업로드하는 기능이다.

다음 명령어로 홈폴더에 위치한 DAG 파일을 dags 폴더로 이동시킨다.
sudo mv ~/hello-dag.py ~/airflow/dags/
잠시 기다리면 스케쥴러가 DAG를 인식하고 다음 화면과 같이 우리가 작성한 DAG의 이름이 출력된다. OFF라고 적혀있는 토글 버튼을 눌러서 DAG를 활성화 한다.

몇분이 지나면 DAG가 우리가 지정한 start_date 부터 시작하고 결과를 보여준다. start_date를 DAG를 활성화한 시점에 이틀전부터 지정했으므로 처리가 완료된 DAG Runs의 갯수가 2개가 뜨는 것을 볼 수 있다.
