Processamento em Streaming
Last updated
Last updated
É possível realizar processamento no streaming de dados do fluxo. O processamento ocorre no início do fluxo (após o input ou filtro), portanto, é possível aplicar uma transformação e/ou indexar os dados dentro de uma mesma configuração.
O diagrama a seguir ilustra em que parte do fluxo ocorre o processamento em streaming:
Os seguintes tipos de processamento em streaming são aceitos:
Permite que um processamento utilizando o Confluent Flink SQL seja realizado no streaming de dados.
Nesse tipo de processamento os streamings, representados por tópicos Kafka com esquema definido, são considerados tabelas nas quais as declarações SQL são aplicadas.
O fluxo ocorre de acordo com o diagrama:
Parâmetros aceitos na configuração de processamento em streaming:
processor.type
Tipo do processamento em streaming
Tipo: enum[FLINK_SQL] Obrigatório
processor.description
Descrição do estágio de processamento
Tipo: string
processor.flink_sql
Configuração do processamento com Flink SQL
Tipo: object
Obrigatório se processor.type=FLINK_SQL
processor.flink_sql.timezone
Fuso horário a ser utilizado no processamento
Tipo: string
Valor padrão: America/Sao_Paulo
processor.flink_sql.sources
Lista com os streamings de entrada do processamento
Tipo: list Obrigatório
processor.flink_sql.sources[].name
Nome do streaming de origem
Tipo: string Obrigatório - Único entre os streamings
processor.flink_sql.sources[].type
Tipo do streaming de origem
Tipo: enum[CREATE_TABLE] Obrigatório - Único entre as etapas
processor.flink_sql.sources[].create_table
Configuração de streaming de origem
Tipo: object
Obrigatório se processor.flink_sql.sources.type=CREATE_TABLE
processor.flink_sql.sources[].create_table.columns
Colunas do streaming de origem
Tipo: list Obrigatório
processor.flink_sql.sources[].create_table.columns[].name
Nome da coluna
Tipo: list Obrigatório
processor.flink_sql.sources[].create_table.columns[].type
Tipo da coluna
Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório
processor.flink_sql.sources[].create_table.columns[].nullable
Coluna aceita valores nulos
Tipo: boolean
Valor padrão: true
processor.flink_sql.sources[].create_table.columns[].char
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].char.length
Tamanho máximo do campo
Tipo: int
Valor padrão: 1
processor.flink_sql.sources[].create_table.columns[].varchar
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].varchar.length
Tamanho máximo do campo
Tipo: int
Valor padrão: 1
processor.flink_sql.sources[].create_table.columns[].binary
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].binary.length
Tamanho máximo do campo
Tipo: int
Valor padrão: 1
processor.flink_sql.sources[].create_table.columns[].varbinary
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].varbinary.length
Tamanho máximo do campo
Tipo: int
Valor padrão: 1
processor.flink_sql.sources[].create_table.columns[].decimal
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].decimal.precision
Precisão do campo
Tipo: int
Valor padrão: 10
processor.flink_sql.sources[].create_table.columns[].decimal.scale
Escala do campo
Tipo: int
Valor padrão: 0
processor.flink_sql.sources[].create_table.columns[].interval_day_to_second
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.precision
Precisão do campo
Tipo: int
Valor padrão: 2
processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.fractional_precision
Precisão fracionária do campo
Tipo: int
Valor padrão: 6
processor.flink_sql.sources[].create_table.columns[].interval_day_to_second.resolution
Resolução do campo
Tipo: enum[DAY DAY_TO_SECOND DAY_TO_MINUTE DAY_TO_HOUR HOUR HOUR_TO_MINUTE HOUR_TO_SECOND MINUTE MINUTE_TO_SECOND SECOND] Obrigatório
processor.flink_sql.sources[].create_table.columns[].interval_year_to_month
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].interval_year_to_month.precision
Precisão do campo
Tipo: int
Valor padrão: 2
processor.flink_sql.sources[].create_table.columns[].interval_year_to_month.resolution
Resolução do campo
Tipo: enum[YEAR YEAR_TO_MONTH MONTH] Obrigatório
processor.flink_sql.sources[].create_table.columns[].time
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].time.precision
Precisão do campo
Tipo: int
Valor padrão: 0
processor.flink_sql.sources[].create_table.columns[].timestamp
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].timestamp.precision
Precisão do campo
Tipo: int
Valor padrão: 6
processor.flink_sql.sources[].create_table.columns[].timestamp_ltz
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].timestamp_ltz.precision
Precisão do campo
Tipo: int
Valor padrão: 6
processor.flink_sql.sources[].create_table.columns[].array
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].array.element.type
Tipo de campo
Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório
processor.flink_sql.sources[].create_table.columns[].map
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].map.key.type
Tipo da chave
Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório
processor.flink_sql.sources[].create_table.columns[].map.value.type
Tipo do valor
Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório
processor.flink_sql.sources[].create_table.columns[].multiset
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].multiset.element.type
Tipo do valor do campo
Tipo: enum[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL_DAY_TO_SECOND INTERVAL_YEAR_TO_MONTH TIME TIMESTAMP TIMESTAMP_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL] Obrigatório
processor.flink_sql.sources[].create_table.columns[].row
Tipo: object
Valor padrão: {}
processor.flink_sql.sources[].create_table.columns[].row.columns
Configuração das colunas aceitas em processor.flink_sql.sources[].create_table.columns
Tipo: list Obrigatório
processor.flink_sql.sources[].create_table.watermark
Tipo: object
processor.flink_sql.sources[].create_table.watermark.column
Nome da coluna a ser considerada
Tipo: string Obrigatório
processor.flink_sql.sources[].create_table.watermark.period
Período a ser considerado
Tipo: float
processor.flink_sql.sources[].create_table.watermark.time_unit
Unidade de tempo
Tipo: enum[SECOND MINUTE HOUR]
processor.flink_sql.select
Tipo: object Obrigatório
processor.flink_sql.select.statement
Declaração SQL do tipo SELECT
Tipo: string Obrigatório
processor.flink_sql.sink
Configuração do streaming de destino
Tipo: object Obrigatório
processor.flink_sql.sink.create_table
Configuração de criação do streaming de destino. Mesma configuração aceita em processor.flink_sql.sources[].create_table
Tipo: object
Para habilitar o processamento em streaming a seguinte requisição pode ser realizada:
É uma boa prática colocar o nome das colunas a serem selecionadas da tabela entre ` para diferenciá-las de palavras reservadas da sintaxe SQL.
No exemplo acima foi gerada uma configuração do Firehose que aplica um processamento em streaming nos dados trafegados no fluxo que respeitam o seguinte esquema:
Caso os campos da mensagem trafegada não respeitem o esquema a mensagem é ignorada.
O processamento em questão realiza uma contagem de quantas vezes o conteúdo do campo Content
se repete dentro de uma janela fixa de 10 minutos. O campo DateUnixMillis
do tipo timestamp(3)
é utilizado como marca d'água para definir as janelas de tempo no streaming de origem.
Somente um campo do tipo timestamp(3)
pode ser utilizado como marca d'água para processamento em janela de tempo.
Após o término da janela o resultado possui os tempos de início e término da janela de tempo, o conteúdo e sua respectiva contagem. O resultado é enviado para um streaming de saída com o seguinte esquema:
Não é possível utilizar mais de uma fonte (source) para realizar uma operação de JOIN entre streams, por exemplo. Somente os dados de entrada do fluxo da configuração poderão ser processados em streaming.
Qualquer alteração no esquema da source ou sync resultará na deleção do tópico kafka correspondente que será recriado com o novo esquema, caso haja mensagens não consumidas nesses tópicos elas serão perdidas.
Mensagens que possuam campos não mapeados no esquema da tabela (source) resultarão em erro e não serão enviadas para o processamento em streaming.
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração do tipo
Configuração da estratégias de
Configuração da