Processamento em Streaming

É possível realizar processamento no streaming de dados do fluxo. O processamento ocorre no início do fluxo (após o input ou filtro), portanto, é possível aplicar uma transformação e/ou indexar os dados dentro de uma mesma configuração.

O diagrama a seguir ilustra em que parte do fluxo ocorre o processamento em streaming:

Tipos

Os seguintes tipos de processamento em streaming são aceitos:

Permite que um processamento utilizando o Confluent Flink SQL seja realizado no streaming de dados.

Nesse tipo de processamento os streamings, representados por tópicos Kafka com esquema definido, são considerados tabelas nas quais as declarações SQL são aplicadas.

O fluxo ocorre de acordo com o diagrama:

Parâmetros

Parâmetros aceitos na configuração de processamento em streaming:

CampoDescriçãoValor

processor.type

Tipo do processamento em streaming

Tipo: enum[FLINK_SQL] Obrigatório

processor.description

Descrição do estágio de processamento

Tipo: string

processor.flink_sql

Configuração do processamento com Flink SQL

Tipo: object Obrigatório se processor.type=FLINK_SQL

processor.flink_sql.timezone

Fuso horário a ser utilizado no processamento

Tipo: string Valor padrão: America/Sao_Paulo

processor.flink_sql.sources

Lista com os streamings de entrada do processamento

Tipo: list Obrigatório

processor.flink_sql.sources[].name

Nome do streaming de origem

Tipo: string Obrigatório - Único entre os streamings

processor.flink_sql.sources[].type

Tipo do streaming de origem

Tipo: enum[CREATE_TABLE] Obrigatório - Único entre as etapas

processor.flink_sql.sources[].create_table

Configuração de streaming de origem

Tipo: object Obrigatório se processor.flink_sql.sources.type=CREATE_TABLE

processor.flink_sql.sources[].create_table.columns

Colunas do streaming de origem

Tipo: list Obrigatório

processor.flink_sql.sources[].create_table.columns[].name

Nome da coluna

Tipo: list Obrigatório

processor.flink_sql.sources[].create_table.columns[].type

Tipo da coluna

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].nullable

Coluna aceita valores nulos

Tipo: boolean Valor padrão: true

processor.flink_sql.sources[].create_table.columns[].char

Configuração do tipo char

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].char.length

Tamanho máximo do campo

Tipo: int Valor padrão: 1

processor.flink_sql.sources[].create_table.columns[].varchar

Configuração do tipo varchar

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].varchar.length

Tamanho máximo do campo

Tipo: int Valor padrão: 1

processor.flink_sql.sources[].create_table.columns[].binary

Configuração do tipo binary

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].binary.length

Tamanho máximo do campo

Tipo: int Valor padrão: 1

processor.flink_sql.sources[].create_table.columns[].varbinary

Configuração do tipo varbinary

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].varbinary.length

Tamanho máximo do campo

Tipo: int Valor padrão: 1

processor.flink_sql.sources[].create_table.columns[].decimal

Configuração do tipo decimal

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].decimal.precision

Precisão do campo

Tipo: int Valor padrão: 10

processor.flink_sql.sources[].create_table.columns[].decimal.scale

Escala do campo

Tipo: int Valor padrão: 0

processor.flink_sql.sources[].create_table.columns[].interval_day_to_second

Configuração do tipo interval_day_to_second

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.precision

Precisão do campo

Tipo: int Valor padrão: 2

processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.fractional_precision

Precisão fracionária do campo

Tipo: int Valor padrão: 6

processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.resolution

Resolução do campo

Tipo: enum[DAY DAY_TO_SECOND DAY_TO_MINUTE DAY_TO_HOUR HOUR HOUR_TO_MINUTE HOUR_TO_SECOND MINUTE MINUTE_TO_SECOND SECOND] Obrigatório

processor.flink_sql.sources[].create_table.columns[].interval_year_to_month

Configuração do tipo interval_year_to_month

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].interval_year_to_month.precision

Precisão do campo

Tipo: int Valor padrão: 2

processor.flink_sql.sources[].create_table.columns[].interval_year_to_month.resolution

Resolução do campo

Tipo: enum[YEAR YEAR_TO_MONTH MONTH] Obrigatório

processor.flink_sql.sources[].create_table.columns[].time

Configuração do tipo time

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].time.precision

Precisão do campo

Tipo: int Valor padrão: 0

processor.flink_sql.sources[].create_table.columns[].timestamp

Configuração do tipo timestamp

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].timestamp.precision

Precisão do campo

Tipo: int Valor padrão: 6

processor.flink_sql.sources[].create_table.columns[].timestamp_ltz

Configuração do tipo timestamp_ltz

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].timestamp_ltz.precision

Precisão do campo

Tipo: int Valor padrão: 6

processor.flink_sql.sources[].create_table.columns[].array

Configuração do tipo array

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].array.element.type

Tipo de campo

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].map

Configuração do tipo map

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].map.key.type

Tipo da chave

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].map.value.type

Tipo do valor

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].multiset

Configuração do tipo multiset

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].multiset.element.type

Tipo do valor do campo

Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório

processor.flink_sql.sources[].create_table.columns[].row

Configuração do tipo row

Tipo: object Valor padrão: {}

processor.flink_sql.sources[].create_table.columns[].row.columns

Configuração das colunas aceitas em processor.flink_sql.sources[].create_table.columns

Tipo: list Obrigatório

processor.flink_sql.sources[].create_table.watermark

Configuração da estratégias de marca d'água

Tipo: object

processor.flink_sql.sources[].create_table.watermark.column

Nome da coluna a ser considerada

Tipo: string Obrigatório

processor.flink_sql.sources[].create_table.watermark.period

Período a ser considerado

Tipo: float

processor.flink_sql.sources[].create_table.watermark.time_unit

Unidade de tempo

Tipo: enum[SECOND MINUTE HOUR]

processor.flink_sql.select

Tipo: object Obrigatório

processor.flink_sql.select.statement

Declaração SQL do tipo SELECT

Tipo: string Obrigatório

processor.flink_sql.sink

Configuração do streaming de destino

Tipo: object Obrigatório

processor.flink_sql.sink.create_table

Configuração de criação do streaming de destino. Mesma configuração aceita em processor.flink_sql.sources[].create_table

Tipo: object

Requisição

Para habilitar o processamento em streaming a seguinte requisição pode ser realizada:

curl --location '<HOST>/v1/configs' \
--header 'Content-Type: application/json' \
--header 'Authorization: <ACCESS-TOKEN>' \
--data '{
{
  "processor": {
    "type": "FLINK_SQL",
    "description": "PROCESSOR-DESCRIPTION",
    "flink_sql": {
      "sources": [
        {
          "name": "MY-TABLE-ALIAS",
          "type": "CREATE_TABLE",
          "create_table": {
            "columns": [
              {
                "name": "Content",
                "type": "STRING"
              },
              {
                "name": "DateUnixMillis",
                "type": "TIMESTAMP",
                "timestamp": {
                  "precision": 3
                }
              }
            ],
            "watermark": {
              "column": "DateUnixMillis",
              "period": 1,
              "time_unit": "MINUTE"
            }
          }
        }
      ],
      "select": {
        "statement": "SELECT `Content` AS `content`, `window_start`, `window_end`, COUNT(`Content`) AS `count` FROM TABLE( TUMBLE(TABLE MY-TABLE-ALIAS, DESCRIPTOR(`DateUnixMillis`), INTERVAL '10' MINUTES)) GROUP BY `window_start`, `window_end`, `Content`"
      },
      "sink": {
        "create_table": {
          "columns": [
            {
              "name": "Content",
              "type": "STRING"
            },
            {
              "name": "window_start",
              "type": "TIMESTAMP",
              "timestamp": {
                "precision": 3
              }
            },
            {
              "name": "window_end",
              "type": "TIMESTAMP",
              "timestamp": {
                "precision": 3
              }
            },
            {
              "name": "count",
              "type": "BIGINT"
            }
          ]
        }
      }
    }
  }
}
}'

É uma boa prática colocar o nome das colunas a serem selecionadas da tabela entre ` para diferenciá-las de palavras reservadas da sintaxe SQL.

No exemplo acima foi gerada uma configuração do Firehose que aplica um processamento em streaming nos dados trafegados no fluxo que respeitam o seguinte esquema:

[
  {
    "name": "Content",
    "type": "STRING"
  },
  {
    "name": "DateUnixMillis",
    "type": "TIMESTAMP",
    "timestamp": {
      "precision": 3
    }
  }
]

Caso os campos da mensagem trafegada não respeitem o esquema a mensagem é ignorada.

O processamento em questão realiza uma contagem de quantas vezes o conteúdo do campo Content se repete dentro de uma janela fixa de 10 minutos. O campo DateUnixMillis do tipo timestamp(3) é utilizado como marca d'água para definir as janelas de tempo no streaming de origem.

Somente um campo do tipo timestamp(3) pode ser utilizado como marca d'água para processamento em janela de tempo.

Após o término da janela o resultado possui os tempos de início e término da janela de tempo, o conteúdo e sua respectiva contagem. O resultado é enviado para um streaming de saída com o seguinte esquema:

[
  {
    "name": "Content",
    "type": "STRING"
  },
  {
    "name": "window_start",
    "type": "TIMESTAMP",
    "timestamp": {
      "precision": 3
    }
  },
  {
    "name": "window_end",
    "type": "TIMESTAMP",
    "timestamp": {
      "precision": 3
    }
  },
  {
    "name": "count",
    "type": "BIGINT"
  }
]

Limitações

Não é possível utilizar mais de uma fonte (source) para realizar uma operação de JOIN entre streams, por exemplo. Somente os dados de entrada do fluxo da configuração poderão ser processados em streaming.

Qualquer alteração no esquema da source ou sync resultará na deleção do tópico kafka correspondente que será recriado com o novo esquema, caso haja mensagens não consumidas nesses tópicos elas serão perdidas.

Mensagens que possuam campos não mapeados no esquema da tabela (source) resultarão em erro e não serão enviadas para o processamento em streaming.

Last updated