Blip Firehose
  • Detalhes
    • Blip Firehose
    • UI (Alpha)
    • Casos de Uso
    • Arquitetura
    • Hosts
    • Autenticação
    • Postman
    • API
  • Configurações
    • Configuração
    • Configuração de Entrada
      • Azure Blob
      • Delta Sharing
      • Google Cloud PubSub
      • HTTP Client
      • HTTP
      • Kafka
      • N8N
      • SQL
    • Configuração de Saída
      • AWS Kinesis
      • AWS S3
      • Azure Blob
      • Delta Lake
      • Elastic Search
      • Email Transacional
      • Email
      • Firehose
      • Google Cloud PubSub
      • Google Cloud Storage
      • HTTP
      • Kafka
      • MongoDB
      • SFTP
      • SQL
    • Processamento em Streaming
    • Configuração de Indexação
    • Configuração de Filtro
    • Transformação
    • Verificar Configuração
  • Informações
    • Vídeo explicativo
    • Observabilidade
Powered by GitBook
On this page
  • Tipos
  • Flink SQL
  • Parâmetros
  • Requisição
  • Limitações
  1. Configurações

Processamento em Streaming

PreviousSQLNextConfiguração de Indexação

Last updated 8 months ago

É possível realizar processamento no streaming de dados do fluxo. O processamento ocorre no início do fluxo (após o input ou filtro), portanto, é possível aplicar uma transformação e/ou indexar os dados dentro de uma mesma configuração.

O diagrama a seguir ilustra em que parte do fluxo ocorre o processamento em streaming:

Tipos

Os seguintes tipos de processamento em streaming são aceitos:

Flink SQL

Permite que um processamento utilizando o seja realizado no streaming de dados.

Nesse tipo de processamento os streamings, representados por tópicos Kafka com esquema definido, são considerados tabelas nas quais as declarações SQL são aplicadas.

O fluxo ocorre de acordo com o diagrama:

Parâmetros

Parâmetros aceitos na configuração de processamento em streaming:

Campo
Descrição
Valor

processor.type

Tipo do processamento em streaming

Tipo: enum[FLINK_SQL] Obrigatório

processor.description

Descrição do estágio de processamento

Tipo: string

processor.flink_sql

Configuração do processamento com Flink SQL

Tipo: object Obrigatório se processor.type=FLINK_SQL

processor.flink_sql.timezone

Fuso horário a ser utilizado no processamento

Tipo: string Valor padrão: America/Sao_Paulo

processor.flink_sql.sources

Lista com os streamings de entrada do processamento

Tipo: list Obrigatório

processor.flink_sql.sources[].name

Nome do streaming de origem

Tipo: string Obrigatório - Único entre os streamings

processor.flink_sql.sources[].type

Tipo do streaming de origem

Tipo: enum[CREATE_TABLE] Obrigatório - Único entre as etapas

processor.flink_sql.sources[].create_table

Configuração de streaming de origem

Tipo: object Obrigatório se processor.flink_sql.sources.type=CREATE_TABLE

processor.flink_sql.sources[].create_table.columns

Colunas do streaming de origem

Tipo: list Obrigatório

processor.flink_sql.sources[].create_table.columns[].name

Nome da coluna

Tipo: list Obrigatório

processor.flink_sql.sources[].create_table.columns[].type

Tipo da coluna

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].nullable

Coluna aceita valores nulos

Tipo: boolean Valor padrão: true

processor.flink_sql.sources[].create_table.columns[].char

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].char.length

Tamanho máximo do campo

Tipo: int Valor padrão: 1

processor.flink_sql.sources[].create_table.columns[].varchar

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].varchar.length

Tamanho máximo do campo

Tipo: int Valor padrão: 1

processor.flink_sql.sources[].create_table.columns[].binary

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].binary.length

Tamanho máximo do campo

Tipo: int Valor padrão: 1

processor.flink_sql.sources[].create_table.columns[].varbinary

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].varbinary.length

Tamanho máximo do campo

Tipo: int Valor padrão: 1

processor.flink_sql.sources[].create_table.columns[].decimal

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].decimal.precision

Precisão do campo

Tipo: int Valor padrão: 10

processor.flink_sql.sources[].create_table.columns[].decimal.scale

Escala do campo

Tipo: int Valor padrão: 0

processor.flink_sql.sources[].create_table.columns[].interval_day_to_second

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.precision

Precisão do campo

Tipo: int Valor padrão: 2

processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.fractional_precision

Precisão fracionária do campo

Tipo: int Valor padrão: 6

processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.resolution

Resolução do campo

Tipo: enum[DAY DAY_TO_SECOND DAY_TO_MINUTE DAY_TO_HOUR HOUR HOUR_TO_MINUTE HOUR_TO_SECOND MINUTE MINUTE_TO_SECOND SECOND] Obrigatório

processor.flink_sql.sources[].create_table.columns[].interval_year_to_month

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].interval_year_to_month.precision

Precisão do campo

Tipo: int Valor padrão: 2

processor.flink_sql.sources[].create_table.columns[].interval_year_to_month.resolution

Resolução do campo

Tipo: enum[YEAR YEAR_TO_MONTH MONTH] Obrigatório

processor.flink_sql.sources[].create_table.columns[].time

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].time.precision

Precisão do campo

Tipo: int Valor padrão: 0

processor.flink_sql.sources[].create_table.columns[].timestamp

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].timestamp.precision

Precisão do campo

Tipo: int Valor padrão: 6

processor.flink_sql.sources[].create_table.columns[].timestamp_ltz

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].timestamp_ltz.precision

Precisão do campo

Tipo: int Valor padrão: 6

processor.flink_sql.sources[].create_table.columns[].array

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].array.element.type

Tipo de campo

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].map

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].map.key.type

Tipo da chave

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].map.value.type

Tipo do valor

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].multiset

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].multiset.element.type

Tipo do valor do campo

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].row

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].row.columns

Configuração das colunas aceitas em processor.flink_sql.sources[].create_table.columns

Tipo: list Obrigatório

processor.flink_sql.sources[].create_table.watermark

Tipo: object

processor.flink_sql.sources[].create_table.watermark.column

Nome da coluna a ser considerada

Tipo: string Obrigatório

processor.flink_sql.sources[].create_table.watermark.period

Período a ser considerado

Tipo: float

processor.flink_sql.sources[].create_table.watermark.time_unit

Unidade de tempo

Tipo: enum[SECOND MINUTE HOUR]

processor.flink_sql.select

Tipo: object Obrigatório

processor.flink_sql.select.statement

Declaração SQL do tipo SELECT

Tipo: string Obrigatório

processor.flink_sql.sink

Configuração do streaming de destino

Tipo: object Obrigatório

processor.flink_sql.sink.create_table

Configuração de criação do streaming de destino. Mesma configuração aceita em processor.flink_sql.sources[].create_table

Tipo: object

Requisição

Para habilitar o processamento em streaming a seguinte requisição pode ser realizada:

curl --location '<HOST>/v1/configs' \
--header 'Content-Type: application/json' \
--header 'Authorization: <ACCESS-TOKEN>' \
--data '{
{
  "processor": {
    "type": "FLINK_SQL",
    "description": "PROCESSOR-DESCRIPTION",
    "flink_sql": {
      "sources": [
        {
          "name": "MY-TABLE-ALIAS",
          "type": "CREATE_TABLE",
          "create_table": {
            "columns": [
              {
                "name": "Content",
                "type": "STRING"
              },
              {
                "name": "DateUnixMillis",
                "type": "TIMESTAMP",
                "timestamp": {
                  "precision": 3
                }
              }
            ],
            "watermark": {
              "column": "DateUnixMillis",
              "period": 1,
              "time_unit": "MINUTE"
            }
          }
        }
      ],
      "select": {
        "statement": "SELECT `Content` AS `content`, `window_start`, `window_end`, COUNT(`Content`) AS `count` FROM TABLE( TUMBLE(TABLE MY-TABLE-ALIAS, DESCRIPTOR(`DateUnixMillis`), INTERVAL '10' MINUTES)) GROUP BY `window_start`, `window_end`, `Content`"
      },
      "sink": {
        "create_table": {
          "columns": [
            {
              "name": "Content",
              "type": "STRING"
            },
            {
              "name": "window_start",
              "type": "TIMESTAMP",
              "timestamp": {
                "precision": 3
              }
            },
            {
              "name": "window_end",
              "type": "TIMESTAMP",
              "timestamp": {
                "precision": 3
              }
            },
            {
              "name": "count",
              "type": "BIGINT"
            }
          ]
        }
      }
    }
  }
}
}'

É uma boa prática colocar o nome das colunas a serem selecionadas da tabela entre ` para diferenciá-las de palavras reservadas da sintaxe SQL.

No exemplo acima foi gerada uma configuração do Firehose que aplica um processamento em streaming nos dados trafegados no fluxo que respeitam o seguinte esquema:

[
  {
    "name": "Content",
    "type": "STRING"
  },
  {
    "name": "DateUnixMillis",
    "type": "TIMESTAMP",
    "timestamp": {
      "precision": 3
    }
  }
]

Caso os campos da mensagem trafegada não respeitem o esquema a mensagem é ignorada.

O processamento em questão realiza uma contagem de quantas vezes o conteúdo do campo Content se repete dentro de uma janela fixa de 10 minutos. O campo DateUnixMillis do tipo timestamp(3) é utilizado como marca d'água para definir as janelas de tempo no streaming de origem.

Somente um campo do tipo timestamp(3) pode ser utilizado como marca d'água para processamento em janela de tempo.

Após o término da janela o resultado possui os tempos de início e término da janela de tempo, o conteúdo e sua respectiva contagem. O resultado é enviado para um streaming de saída com o seguinte esquema:

[
  {
    "name": "Content",
    "type": "STRING"
  },
  {
    "name": "window_start",
    "type": "TIMESTAMP",
    "timestamp": {
      "precision": 3
    }
  },
  {
    "name": "window_end",
    "type": "TIMESTAMP",
    "timestamp": {
      "precision": 3
    }
  },
  {
    "name": "count",
    "type": "BIGINT"
  }
]

Limitações

Não é possível utilizar mais de uma fonte (source) para realizar uma operação de JOIN entre streams, por exemplo. Somente os dados de entrada do fluxo da configuração poderão ser processados em streaming.

Qualquer alteração no esquema da source ou sync resultará na deleção do tópico kafka correspondente que será recriado com o novo esquema, caso haja mensagens não consumidas nesses tópicos elas serão perdidas.

Mensagens que possuam campos não mapeados no esquema da tabela (source) resultarão em erro e não serão enviadas para o processamento em streaming.

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração do tipo

Configuração da estratégias de

Configuração da

char
varchar
binary
varbinary
decimal
interval_day_to_second
interval_year_to_month
time
timestamp
timestamp_ltz
array
map
multiset
row
marca d'água
declaração SQL do tipo SELECT
Confluent Flink SQL