Neste artigo iremos falar sobre Airflow. Para começar, você já se perguntou como fazer a orquestração dos seus job's ? Pois bem, com o Apache Airflow você será capaz de fazer a orquestração dos seus pipelines de dados e muito mais! Com ele você poderá criar fluxos de dados, agendar job's e ainda ter a possibilidade de analisar cada passo que seu fluxo de dados está executando.
De início vamos falar sobre como é feito para a criação desses workflows. O airflow foi desenvolvido em Python, então seus scripts serão todos em Python. Dentro dele existem alguns componentes importantes, como por exemplo:
Airflow-scheduler: Responsável por monitorar as tarefas e Dag's.
Airflow-worker: Responsável por executar as tarefas.
Broker: Responsável por levar mensagens entre os diferentes componentes no sistema, como scheduler, os workers e os processos de execução de tarefas.
Neste artigo veremos como construir uma DAG que ficará responsável por buscar dados de um arquivo ".parquet" em um bucket e jogar para dentro de uma tabela no bigquery. Precisaremos utilizar o Composer, que é um serviço gerenciado de orquestração baseado no Apache Airflow.
Para iniciarmos vamos criar o nosso arquivo "main.py" e importar algumas libs que precisamos para criar nossa primeira DAG:
Podemos notar que a primeira linha nós estamos importando a partir da lib airflow a classe DAG, que será responsável por instanciar nossa DAG.
Na próxima linha está a lib datetime, que nos possibilita trabalhar com datas e horas (isso será útil mais tarde).
E por fim a classe Variable que vem a partir do modulo models do airflow, que irá nos possibilitar resgatar valores que definimos dentro das nossas variáveis (iremos ver mais a seguir como criar essas variáveis).
Após importarmos todas as libs necessárias, vamos utilizar a classe Variable para ter acesso as nossas variáveis utilizando o seguinte código:
Podemos notar que utilizamos o método "get" que requer o nome da variável e ainda passamos outro parâmetro "deserialize_json" que irá transformar o valor da variável em um valor do formato dicionário do Python.
Como próximo passo na crição da nossa DAG, vamos agora criar um dicionário que irá conter algumas chaves:
Essas chaves irão definir algumas coisas, como por exemplo: qual a data início da DAG, quantas vezes irá tentar novamente caso dê algum erro e por fim o delay das tentativas.
Nesse próximo código iremos utilizar a classe DAG do airflow para instanciarmos e criarmos nossa DAG em si:
Veja que ele pede alguns parâmetros:
dag_id: é o identificador único de uma DAG. Ele deve ser único dentro do ambiente de execução do Airflow e é usado para identificar a DAG nos logs, na interface do usuário e em outras partes do sistema.
default_args: dicionário Python que define os argumentos padrão para todas as tarefas dentro da DAG.
Description: é uma descrição opcional da DAG. É usada para descrever o objetivo da DAG, sua funcionalidade e outras informações relevantes.
schedule_interval: define a frequência com que a DAG deve ser executada. Esse parâmetro pode ser especificado de diversas maneiras, como um intervalo de tempo cron ou utilizando o objeto timedelta do python. Caso seja "None" ela será executada apenas naquele momento.
dagrun_timeout: tempo máximo permitido para a execução de uma instância da DAG.
Agora já podemos criar o nosso fluxo de dados dentro da nossa DAG. Como queremos realizar a ingestão de dados a partir de um arquivo que está em um bucket para o bigquery, precisaremos importar um operator do airflow que seria a GCSToBigQueryOperator. Ele nada mais é que um operador que nos permite realizar alguma certa ação, que no nosso caso é exportar um arquivo do Cloud storage para o Bigquery. Existem diversos operators e cada operator tem sua respectiva funcionalidade. Para fazer o import deste operator basta adicionar a linha a seguir:
E para utilizar esse operator teremos alguns parâmetros que devemos passar para ele:
Para utilizar esse operador ele requer alguns parâmetros:
task_id: id único na DAG que servirá como representante da task.
Bucket: nome do bucket no Cloud Storage.
source_objects: URIs do/s arquivos que deseja fazer a exportação.
source_format: formato dos arquivos.
schema_fields: schema dos dados dentro dos arquivos.
write_disposition: especifica o que fazer se uma tabela de destino já existir durante a gravação de dados em uma tabela.
create_disposition: especifica o que fazer na criação da tabela caso ela não exista.
destination_project_dataset_table: tabela destino para onde os dados irão.
Veja que definimos o caminho do nosso arquivo dentro do source_objects e informamos que o write_disposition será para adicionar os dados a tabela. Coloquei os valores setado já nos parâmetros, mas o que iremos fazer é setar esses valores dentro das nossas variáveis que iremos criar dentro do airflow. Para isso criaremos um json que conterá esses nossos valores:
Depois basta substituir os valores setados nos parâmetros do operador utilizando os valores que estão dentro da variável que utilizamos para pegar as variáveis do airflow a "dag_variables":
Feito todo esse passo a passo agora podemos jogar nosso arquivo main.py dentro da pasta DAGs no composer, como o exemplo a seguir:
Passo 1: Abrir a pasta DAGs
Passo 2: Subir o arquivo main.py para a pasta
Passo 3: Abrir a interface do Airflow
Seguindo esses passos nós entraremos na interface gráfica do Airflow e iremos nos deparar com o seguinte erro:
Esse erro nos informa que não existe uma variável chamada "artigo_airflowo" nas variáveis do airflow, então seguiremos esses próximos passos para adicioná-la:
Passo 1: Abrir a página de variáveis
Passo 2: Clicar em adicionar uma nova variável
Passo 3: Definir o nome da variável no campo KEY e o no campo VAL colocaremos o nosso json que contem as informações que precisamos
Feito isso o erro desaparecerá e conseguiremos executar a nossa DAG:
E agora ao entrar na nossa DAG e indo na aba GRAPH poderemos observar que tem somente um pipeline que no caso é o responsável por fazer a extração do arquivo do Cloud Storage para o BigQuery:
Ao ser executado ele ficará com uma cor verde mais clara que indica que a task está sendo executada, caso dê algum erro ela irá esperar um tempo para tentar novamente e ficará com a coloração amarelada e caso dê tudo certo ficará com a cor que está na imagem acima. E agora se formos observar se foram inseridos na nossa tabela no bigquery veremos o seguinte resultado:
E assim termina este artigo com um pequeno aprendizado sobre como podemos fazer a orquestração dos nossos job's. Até a próxima!