Alguma vez você já se perguntou, o que seria o Dataflow? Se sim, venho aqui te responder essa pergunta e lhe dar dicas de como utilizar esse serviço incrível. Bom, o Dataflow nada mais é que um serviço gerenciado que tem a capacidade de executar uma grande gama de variações de padrões para o processamento dados.
Mas agora você deve estar se perguntando: Certo, mas como faço para começar a processar meus dados? A resposta é simples, utilizando o SDK do Apache Beam. O Apache Beam nada mais é um modelo de código aberto para a criação de pipelines de processamento de dados em paralelo em lote ou em streaming. Com ele é possível criar um pipeline e executá-lo utilizando o motor de processamento, o Dataflow. Com essa junção você tem em mãos tudo o que precisa para processar seus dados, pois com o Apache Beam você se preocupa apenas em construir o job e o Dataflow é quem irá comandar e saber como o seu job deverá ser executado.
Agora vamos dar uma olhada em como funciona para construir o seu primeiro pipeline.
Primeiramente devemos ter instalado na nossa máquina a versão do Python compatível com Apache Beam, que são as versões 3.6 até 3.8. Feito isso, podemos já instalar o pacote do Apache Beam, utilizando o seguinte comando no terminal:
pip install apache-beam['gcp']
Veja que ao final do nome do pacote, coloquei "['gcp']" para que junto ao pacote venham dependências extras como alguns conectores para conseguirmos realizar algumas ações na GCP, como por exemplo o conectore de GSC IO que irão auxiliar a escrever ou ler arquivos que estão nos nossos buckets na Google cloud Storage.
Com os pacotes instalados, já podemos dar início a criação do nosso pipeline. Veja um exemplo a seguir:
Este é um exemplo simples de como começamos a construir o nosso pipeline, mas agora iremos colocar mais algumas coisas, como por exemplo as "options" do nosso pipeline. Veja um exemplo a seguir:
As "options" do pipeline servem basicamente para que você possa especificar alguns aspectos, como por exemplo o Runner(que é quem irá executar o seu pipeline), o nome do projeto, etc. Você também pode criar "options customizadas e que quando você for executar seu pipeline consiga pegar os valores dentro do seu código. Exemplo:
É criada uma classe onde ela receberá o construtor "PipelineOptions"e dentro dessa classe terá uma função que irá informar quais os argumentos extras que você deseja que tenha. Aqui por exemplo eu informei que deverá ter o argumento "--bucket_name". Esse argumento irá ser informado no momento em que eu for executar o nosso pipeline, e com isso, eu irei utiliza-lo para informar o caminho do nosso bucket no Google Cloud Storage. Criada a nossa classe, basta agora informamos para o nosso pipeline, como no exemplo a seguir:
Para que as nossas "options" sejam enxergadas temos que utilizar o "view_as" para que possa ser adicionado essas novas "options" e nos retorna um objeto tal como o "PipelineOptions" nos retorna. E passando para uma variável podemos resgatar o campo que informamos na classe PipeCustom.
Após ajustar as configurações iniciais, vamos criar o nosso pipeline de fato. O nosso pipeline será organizado entre PCollections e Transformações. As PColletions são uma representação de um conjunto de dados que serão usados como entradas e saídas do nosso pipeline e esses conjuntos de dados servem como entrada para as nossas transformações.
Começamos criando uma PCollection utilizando o método Create(), nele você irá passar como parâmetro um um array com elementos, seja ele números, strings ou dicionários. No próximo passo passamos a saída do passo anterior e utilizamos no método Filter(), que irá nos possibilitar fazer uma filtragem específica, como por exemplo filtrar somente os que moram em Natal e em Recife. O método Filter recebe uma função como parâmetro, que no caso passamos uma função anônima. E no nosso último passo eu pedi para escreve no nosso diretório que informamos no parâmetro --bucket_name(iremos informar o diretório do nosso bucket na GCP) e informei que seria do tipo ".json".
Feito o nosso pipeline, iremos agora executá-lo:
OBS.: Para que o pipeline consiga ser executado no seu ambiente na GCP, necessário que você já tenha configurado na sua máquina o gCloud CLI.
Pode-se notar que há alguns parâmetros padrões que temos que informar quando vamos executar o nosso pipeline.
runner: Parâmetro que serve para informar quem será o executor do nosso pipeline, que no nosso caso será o DataflowRunner.
project: Nome do nosso projeto na GCP.
temp_location e staging_location: Será onde alguns arquivos criados pelo dataflow serão inseridos.
job_name: Nome do nosso job no dataflow.
region: Região onde será executado o job.
bucket_name: Nossa custom option que criamos para informar o diretório do nosso bucket.
Após executar veremos que o nosso job do dataflow já está sendo executado.
Entrando no job poderemos observar cada passo do nosso pipeline:
Após a execução todos os steps ficarão verdes com um símbolo de verificado, assim como na imagem abaixo:
Agora que o nosso pipeline executou, podemos olhar no nosso bucket que criamos e observar que foi criado o arquivo com os dados que foram criados a partir do que queríamos:
Assim podemos concluir como o Dataflow funciona e quais todas as possibilidades que temos para executar os nossos processamentos de dados.