Processamento em Streaming
É possível realizar processamento no streaming de dados do fluxo. O processamento ocorre no início do fluxo (após o input ou filtro), portanto, é possível aplicar uma transformação e/ou indexar os dados dentro de uma mesma configuração.
O diagrama a seguir ilustra em que parte do fluxo ocorre o processamento em streaming:
Tipos
Os seguintes tipos de processamento em streaming são aceitos:
Flink SQL
Permite que um processamento utilizando o Confluent Flink SQL seja realizado no streaming de dados.
Nesse tipo de processamento os streamings, representados por tópicos Kafka com esquema definido, são considerados tabelas nas quais as declarações SQL são aplicadas.
O fluxo ocorre de acordo com o diagrama:
Parâmetros
Parâmetros aceitos na configuração de processamento em streaming:
Campo | Descrição | Valor |
---|---|---|
processor.type | Tipo do processamento em streaming | Tipo: enum[FLINK_SQL] Obrigatório |
processor.description | Descrição do estágio de processamento | Tipo: string |
processor.flink_sql | Configuração do processamento com Flink SQL | Tipo: object
Obrigatório se |
processor.flink_sql.timezone | Fuso horário a ser utilizado no processamento | Tipo: string
Valor padrão: |
processor.flink_sql.sources | Lista com os streamings de entrada do processamento | Tipo: list Obrigatório |
processor.flink_sql.sources[].name | Nome do streaming de origem | Tipo: string Obrigatório - Único entre os streamings |
processor.flink_sql.sources[].type | Tipo do streaming de origem | Tipo: enum[CREATE_TABLE] Obrigatório - Único entre as etapas |
processor.flink_sql.sources[].create_table | Configuração de streaming de origem | Tipo: object
Obrigatório se |
processor.flink_sql.sources[].create_table.columns | Colunas do streaming de origem | Tipo: list Obrigatório |
processor.flink_sql.sources[].create_table.columns[].name | Nome da coluna | Tipo: list Obrigatório |
processor.flink_sql.sources[].create_table.columns[].type | Tipo da coluna | Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório |
processor.flink_sql.sources[].create_table.columns[].nullable | Coluna aceita valores nulos | Tipo: boolean
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].char | Configuração do tipo char | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].char.length | Tamanho máximo do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].varchar | Configuração do tipo varchar | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].varchar.length | Tamanho máximo do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].binary | Configuração do tipo binary | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].binary.length | Tamanho máximo do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].varbinary | Configuração do tipo varbinary | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].varbinary.length | Tamanho máximo do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].decimal | Configuração do tipo decimal | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].decimal.precision | Precisão do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].decimal.scale | Escala do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].interval_day_to_second | Configuração do tipo interval_day_to_second | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.precision | Precisão do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.fractional_precision | Precisão fracionária do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.resolution | Resolução do campo | Tipo: enum[DAY DAY_TO_SECOND DAY_TO_MINUTE DAY_TO_HOUR HOUR HOUR_TO_MINUTE HOUR_TO_SECOND MINUTE MINUTE_TO_SECOND SECOND] Obrigatório |
processor.flink_sql.sources[].create_table.columns[].interval_year_to_month | Configuração do tipo interval_year_to_month | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].interval_year_to_month.precision | Precisão do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].interval_year_to_month.resolution | Resolução do campo | Tipo: enum[YEAR YEAR_TO_MONTH MONTH] Obrigatório |
processor.flink_sql.sources[].create_table.columns[].time | Configuração do tipo time | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].time.precision | Precisão do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].timestamp | Configuração do tipo timestamp | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].timestamp.precision | Precisão do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].timestamp_ltz | Configuração do tipo timestamp_ltz | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].timestamp_ltz.precision | Precisão do campo | Tipo: int
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].array | Configuração do tipo array | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].array.element.type | Tipo de campo | Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório |
processor.flink_sql.sources[].create_table.columns[].map | Configuração do tipo map | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].map.key.type | Tipo da chave | Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório |
processor.flink_sql.sources[].create_table.columns[].map.value.type | Tipo do valor | Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório |
processor.flink_sql.sources[].create_table.columns[].multiset | Configuração do tipo multiset | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].multiset.element.type | Tipo do valor do campo | Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório |
processor.flink_sql.sources[].create_table.columns[].row | Configuração do tipo row | Tipo: object
Valor padrão: |
processor.flink_sql.sources[].create_table.columns[].row.columns | Configuração das colunas aceitas em | Tipo: list Obrigatório |
processor.flink_sql.sources[].create_table.watermark | Configuração da estratégias de marca d'água | Tipo: object |
processor.flink_sql.sources[].create_table.watermark.column | Nome da coluna a ser considerada | Tipo: string Obrigatório |
processor.flink_sql.sources[].create_table.watermark.period | Período a ser considerado | Tipo: float |
processor.flink_sql.sources[].create_table.watermark.time_unit | Unidade de tempo | Tipo: enum[SECOND MINUTE HOUR] |
processor.flink_sql.select | Configuração da declaração SQL do tipo SELECT | Tipo: object Obrigatório |
processor.flink_sql.select.statement | Declaração SQL do tipo SELECT | Tipo: string Obrigatório |
processor.flink_sql.sink | Configuração do streaming de destino | Tipo: object Obrigatório |
processor.flink_sql.sink.create_table | Configuração de criação do streaming de destino. Mesma configuração aceita em | Tipo: object |
Requisição
Para habilitar o processamento em streaming a seguinte requisição pode ser realizada:
É uma boa prática colocar o nome das colunas a serem selecionadas da tabela entre ` para diferenciá-las de palavras reservadas da sintaxe SQL.
No exemplo acima foi gerada uma configuração do Firehose que aplica um processamento em streaming nos dados trafegados no fluxo que respeitam o seguinte esquema:
Caso os campos da mensagem trafegada não respeitem o esquema a mensagem é ignorada.
O processamento em questão realiza uma contagem de quantas vezes o conteúdo do campo Content
se repete dentro de uma janela fixa de 10 minutos. O campo DateUnixMillis
do tipo timestamp(3)
é utilizado como marca d'água para definir as janelas de tempo no streaming de origem.
Somente um campo do tipo timestamp(3)
pode ser utilizado como marca d'água para processamento em janela de tempo.
Após o término da janela o resultado possui os tempos de início e término da janela de tempo, o conteúdo e sua respectiva contagem. O resultado é enviado para um streaming de saída com o seguinte esquema:
Limitações
Não é possível utilizar mais de uma fonte (source) para realizar uma operação de JOIN entre streams, por exemplo. Somente os dados de entrada do fluxo da configuração poderão ser processados em streaming.
Qualquer alteração no esquema da source ou sync resultará na deleção do tópico kafka correspondente que será recriado com o novo esquema, caso haja mensagens não consumidas nesses tópicos elas serão perdidas.
Mensagens que possuam campos não mapeados no esquema da tabela (source) resultarão em erro e não serão enviadas para o processamento em streaming.
Last updated