Arquitetura

Arquitetura

O diagrama a seguir demonstra a visão macro da plataforma.

Visão Macro

Ela Ć© dividida nas seguintes partes:

API

A aplicação golang sindicator-manager possui a API responsÔvel pelo CRUD das configurações com os pipelines.

Entrada

Conecta-se com diferentes tipos de entrada de dados.

Adaptação

Aplica transformações no dado trafegado. Essas transformações podem ser realizadas através de inversão de controle (chamando-se uma API externa, por exemplo) ou por meio de processamento em streaming (usando o https://flink.apache.org/ ).

Indexação

Permite indexar dados para serem filtrados e entregues posteriormente.

Filtro

Permite filtrar dados previamente indexados para serem entregues.

SaĆ­da

Conecta-se com diferentes tipos de saĆ­da para entrega de dados.

Funcionamento

Descreve em detalhes cada etapa do fluxo desde a criação do pipeline até o fluxo das mensagens nesse pipeline.

Novo pipeline

O diagrama a seguir ilustra as conexões realizadas durante a criação de um novo pipeline:

A primeira etapa consiste em verificar a autenticação do usuÔrio. O Keycloak é utilizado como autenticador nessa etapa. Ele também é utilizado como gerenciador de identidade.

Uma vez autenticado ocorre a validação do payload da configuração. Utilizamos a lib de validação para esse fim.

Depois são realizadas requisições em APIs externas dependendo do que estÔ sendo configurado no pipeline.

  • Caso seja uma entrada do tipo N8N a API Ć© utilizada para registrar o workflow e credenciais para recuperar os dados;

  • Caso possua um processamento em streaming com Flink configurado as APIs de Schema Registry, Kafka e Flink SQL da Confluent sĆ£o utilizadas para configurar os pipelines de processamento em streaming;

  • Caso seja um input do tipo streaming (Ex: KAFKA, GOOGLE_CLOUD_PUBSUB), Ć© adicionada uma mensagem na fila dynamic_input_streamming do Deckard que serĆ” utilizada posteriormente pelo fluxo do pipeline para realizar a conexĆ£o com a entrada e recuperar as mensagens;

  • Caso seja um input do tipo batching (Ex: SQL, DELTA_SHARING), Ć© adicionada uma mensagem na fila dynamic_input_batching do Deckard que serĆ” utilizada posteriormente pelo fluxo do pipeline para realizar a conexĆ£o com a entrada e recuperar as mensagens.

Por fim a configuração é salva nos repositórios.

  • O Mongo Ć© utilizado para armazenar a configuração do pipeline de forma persistente;

  • O Redis utilizado para armazenar a configuração do pipeline para acesso rĆ”pido. As configuraƧƵes sĆ£o salvas sem TTL e em qualquer alteração toda a configuração Ć© substituĆ­da nele;

  • O Hashicorp Vault Ć© utilizado para guardar os segredos (tokens, senhas, credenciais, ...) da configuração. Esses dados sĆ£o salvos encriptados em aes no esquema ctr, a chave e vetor de criptografia só sĆ£o conhecidos pela aplicação do sindicator-manager.

O payload de resposta para o usuƔrio em caso de sucesso aplica uma mƔscara nos campos com segredos.

Fluxo do pipeline

O diagrama a seguir ilustra como Ć© o fluxo das mensagens em um pipeline:

Input HTTP

Nesse caso as mensagens sĆ£o enviadas para o fluxo de um pipeline a partir da API HTTP do Firehose. Ɖ necessĆ”rio que o usuĆ”rio tenha criado uma configuração de pipeline previamente pois o id do pipeline, alĆ©m da autenticação Ć© obrigatório para enviar as mensagens.

A aplicação que recebe essas mensagens é o webhooks-server. O fluxo que ele executa é o seguinte:

Em um primeiro momento a aplicação valida se requisição contém o content-type do payload enviado.

Depois verifica se o tamanho do payload Ć© inferior a 1MB.

Ɖ realizada entĆ£o uma requisição gRPC no sindicator-manager para recuperar os dados da configuração do pipeline para a qual a mensagem foi enviada.

A partir da configuração define-se para qual etapa (tópico Kafka) a mensagem serÔ enviada para seguir no fluxo. A verificação ocorre nessa ordem:

  1. Se a configuração possui algum estÔgio de transformação ou processamento em streaming a mensagem segue para o tópico: firehose_transformer;

  2. Caso contrÔrio a mensagem segue para o tópico: firehose_input.

Por fim ocorre o retorno do status da requisição para o cliente.

Input Batching

Nesse caso o Firehose se conecta a entrada cadastrada na configuração do pipeline e recupera as mensagens em lotes.

A aplicação responsÔvel por essa entrada é o input-batching. O fluxo que ela executa é o seguinte:

Inicialmente a aplicação recupera uma mensagem com o id da configuração de pipeline cadastrada a partir da fila do deckard: dynamic_input_batching.

A mesma mensagem é retornada pelo deckard a cada 30s mas um batch só é iniciado quando o batch anterior for finalizado, a mensagem vai sendo ignorada enquanto isso. O timeout de processamento do batch é de 30 minutos. Nesse caso a mensagem é liberada para que o mesmo batching seja processado novamente.

A partir do id da configuração é realizada uma requisição gRPC no sindicator-manager para recuperar a configuração dinâmica do pipeline. A resposta dessa requisição contém todos os dados de autenticação necessÔrios para se conectar com a entrada em questão (Ex: SQL, Delta Sharing, ...).

Ɖ realizado entĆ£o o registro da entrada de forma dinĆ¢mica utilizando a requisição HTTP do Benthos no localhost jĆ” que a aplicação possui um http server pois possui uma entrada dinĆ¢mica.

Uma vez que a entrada Ʃ registrada o consumo das mensagens jƔ comeƧa a acontecer.

Ɖ realizada entĆ£o uma requisição gRPC no sindicator-manager para recuperar os dados da configuração do pipeline para a qual a mensagem estĆ” trafegando.

A partir da configuração define-se para qual etapa (tópico Kafka) a mensagem serÔ enviada para seguir no fluxo. A verificação ocorre nessa ordem:

  1. Se a configuração possui algum estÔgio de transformação ou processamento em streaming a mensagem segue para o tópico: firehose_transformer;

  2. Caso contrÔrio a mensagem segue para o tópico: firehose_input.

Quando todas as mensagens são consumidas ocorre a atualização do breakpoint da mensagem no deckard para que o próximo batch seja consumido da entrada a partir do ponto atual desconsiderando o que jÔ foi consumido.

Input Streaming

Nesse caso o Firehose se conecta a entrada cadastrada na configuração do pipeline e recupera as mensagens de forma contínua a partir do streaming de entrada.

A aplicação responsÔvel por essa entrada é o input-streamming. O fluxo que ela executa é o seguinte:

Inicialmente a aplicação recupera uma mensagem com o id da configuração de pipeline cadastrada a partir da fila do deckard: dynamic_input_streamming.

A partir do id da configuração é realizada uma requisição gRPC no sindicator-manager para recuperar a configuração dinâmica do pipeline. A resposta dessa requisição contém todos os dados de autenticação necessÔrios para se conectar com a entrada em questão (Ex: Kafka, GCP PubSub, ...). Esse processo se repete a cada 15 minutos para garantir que a configuração estÔ atualizada e que a entrada serÔ removida caso o pipeline em questão seja excluído ou mesmo alterado para um outro tipo de entrada.

Ɖ realizado entĆ£o o registro da entrada de forma dinĆ¢mica utilizando a requisição HTTP do Benthos no localhost jĆ” que a aplicação possui um http server pois possui uma entrada dinĆ¢mica.

Uma vez que a entrada Ʃ registrada o consumo das mensagens jƔ comeƧa a acontecer.

Ɖ realizada entĆ£o uma requisição gRPC no sindicator-manager para recuperar os dados da configuração do pipeline para a qual a mensagem estĆ” trafegando.

A partir da configuração define-se para qual etapa (tópico Kafka) a mensagem serÔ enviada para seguir no fluxo. A verificação ocorre nessa ordem:

  1. Se a configuração possui algum estÔgio de transformação ou processamento em streaming a mensagem segue para o tópico: firehose_transformer;

  2. Caso contrÔrio a mensagem segue para o tópico: firehose_input.

Adapting

Realiza transformaƧƵes configuradas no pipeline em cada mensagem trafegada.

A aplicação responsÔvel é o adapting-transformer-pipeline. Ela se conecta ao tópico firehose_transformer populado pelos inputs caso a configuração possua etapa de transformação ou processamento em streaming.

Também existe a aplicação adapting-pipeline que executa o mesmo pipeline, porém não aplica transformações nas mensagens e segue com elas no fluxo. Ela se conecta ao tópico firehose_input. Isso é necessÔrio devido a infraestrutura de tópico Kafka único. Dessa forma, configurações de pipeline que não possuem etapas de transformação seguem por um fluxo apartado (tópico kafka diferente) e não são prejudicadas por latência de configurações que executam transformações.

O fluxo que ela executa Ć© o seguinte:

Em um primeiro momento é realizada então uma requisição gRPC no sindicator-manager para recuperar os dados da configuração do pipeline para a qual a mensagem estÔ trafegando.

A partir da configuração define-se para qual etapa (tópico Kafka) a mensagem serÔ enviada para seguir no fluxo. A verificação ocorre nessa ordem:

  1. Se a configuração possui um estÔgio processamento em streaming a mensagem segue para o tópico: firehoseprocessing_source{CONFIG-ID};

  2. Se a configuração possui um estÔgio de indexação a mensagem segue para o tópico: firehose_filter. Nesse caso é adicionado um metadado nas mensagens (firehose_index) com um json contendo os campos que poderão ser filtrados posteriormente por uma configuração de filtro.

  3. Caso contrÔrio a mensagem segue para o tópico: firehose_adapted.

Ɖ realizada entĆ£o uma requisição gRPC no sindicator-manager para recuperar a configuração dinĆ¢mica do pipeline. A resposta dessa requisição contĆ©m todos os dados de autenticação necessĆ”rios para se conectar com as APIs externas caso exista um estĆ”gio de transformação com requisiƧƵes Ć  APIs externas.

A partir da configuração dinâmica são executadas as etapas de transformação na mensagem seguindo o workflow configurado no pipeline. Nesse momento são realizadas as requisições em APIs externas, se necessÔrio.

Se uma configuração possui estÔgio de processamento em streaming é realizado a conversão das mensagens para o formato Avro que serÔ enviado para o tópico de processamento em streaming com Flink. A API de Schema Registry da Confluent é chamada nesse momento para recuperar o schema Avro que serÔ usado na conversão das mensagens.

Syndicating

Envia os dados de forma transacional para a sindicância de destino (saída) configurada no pipeline.

A aplicação responsÔvel é o syndicating-pipeline. O fluxo que ela executa é o seguinte:

Em um primeiro momento é realizada então uma requisição gRPC no sindicator-manager para recuperar os dados da configuração do pipeline para a qual a mensagem estÔ trafegando.

Ɖ realizado entĆ£o um controle de circuit breaker utilizando o Redis que verifica se mensagens de uma configuração estĆ£o resultando em erros na entrega. Caso uma configuração apresente pelo menos 5 erros em 1 minuto ela Ć© penalizada por 15 minutos e suas mensagens sĆ£o enviadas diretamente para o dead letter pra que sejam entregues posteriormente a medida que o dead letter vai sendo consumido.

Também existe um controle de rate limit utilizando o Redis, nesse caso o cliente configura no output uma taxa de entrega com quantidade de mensagens por tempo e a aplicação respeita esse throughput enviando as mensagens para o dead letter para que sejam entregues posteriormente a medida que o dead letter vai sendo consumido e respeitando o rate limit configurado.

Ɖ realizada entĆ£o uma requisição gRPC no sindicator-manager para recuperar a configuração dinĆ¢mica do pipeline. A resposta dessa requisição contĆ©m todos os dados de autenticação necessĆ”rios para se conectar com a saĆ­da em questĆ£o (Ex: Kafka, GCP PubSub, ...), caso exista. Esse processo se repeta a cada 1 minuto para garantir que a configuração da saĆ­da estĆ” atualizada.

Caso exista uma configuração dinâmica de saída é realizado o registro da saída de forma dinâmica utilizando a requisição HTTP do Benthos no localhost jÔ que a aplicação possui um http server pois possui um output dinâmico.

Uma vez que a saída é registrada as mensagens jÔ podem ser enviadas. A aplicação também envia um evento para o tópico de billing que serÔ utilizado para contabilizar as mensagens entregues para uma configuração.

Exporter

Envia os dados agregados em arquivos (Ex: csv, json) para a sindicância de destino (saída) configurada no pipeline.

A aplicação responsÔvel é o exporter-pipeline. O fluxo que ela executa é o seguinte:

Inicialmente a aplicação recupera uma mensagem com o id da configuração de pipeline cadastrada a partir de uma das filas do deckard: export_mongo_messages ou export_parquet_messages.

Então é realizada então uma requisição gRPC no sindicator-manager para recuperar os dados da configuração do pipeline para a qual os dados serão enviados.

O próximo passo é agregar os dados a serem entregues em arquivos em um formato específico (Ex: csv, json) e escrever esses arquivos em disco no diretório temporÔrio.

Ɖ realizada entĆ£o uma requisição gRPC no sindicator-manager para recuperar a configuração dinĆ¢mica do pipeline. A resposta dessa requisição contĆ©m todos os dados de autenticação necessĆ”rios para se conectar com a saĆ­da em questĆ£o (Ex: AWS S3, GCP Storage, SFTP, ...). Esse processo se repete a cada 1 minuto para garantir que a configuração da saĆ­da estĆ” atualizada.

Caso exista uma configuração dinâmica de saída é realizado o registro da saída de forma dinâmica utilizando a requisição HTTP do Benthos no localhost jÔ que a aplicação possui um http server pois possui um output dinâmico.

Uma vez que a saída é registrada as mensagens os arquivos são enviados.

Filtering

Filtra mensagens indexadas.

A aplicação responsÔvel é o filtering-pipeline. O fluxo que ela executa é o seguinte:

A cada 1 minuto a aplicação realiza uma requisição gRPC no sindicator-manager para recuperar todas as configurações de pipeline que possuem filtro. De posse dessas configurações é executado um filtro em cada mensagem existente no tópico de mensagens indexadas (firehose_filter) verificando quais configurações casam com a mensagem.

Caso a mensagem seja filtrada para pelo menos uma configuração de pipeline é realizada uma requisição gRPC no sindicator-manager para recuperar os dados de cada uma dessas configurações.

A partir da configuração define-se para qual etapa (tópico Kafka) a mensagem serÔ enviada para seguir no fluxo. A verificação ocorre nessa ordem:

  1. Se a configuração possui algum estÔgio de transformação ou processamento em streaming a mensagem segue para o tópico: firehose_transformer;

  2. Caso contrÔrio a mensagem segue para o tópico: firehose_adapted.

Last updated