# Processamento em Streaming

É possível realizar processamento no streaming de dados do fluxo. O processamento ocorre no início do fluxo (após o input ou filtro), portanto, é possível aplicar uma transformação e/ou indexar os dados dentro de uma mesma configuração.

O diagrama a seguir ilustra em que parte do fluxo ocorre o processamento em streaming:

{% @mermaid/diagram content="flowchart LR
subgraph Inputs
Input1("Input")
Input2("Filter")
end
Processor("Processor")
Transformer("Transformer")
subgraph Outputs
Output1("Output")
Output2("Index")
end

```
Inputs --> Processor
Processor --> Transformer
Transformer --> Outputs" %}
```

## Tipos

Os seguintes tipos de processamento em streaming são aceitos:

### Flink SQL

Permite que um processamento utilizando o [Confluent Flink SQL](https://docs.confluent.io/cloud/current/flink/reference/overview.html) seja realizado no streaming de dados.

Nesse tipo de processamento os streamings, representados por tópicos Kafka com esquema definido, são considerados tabelas nas quais as declarações SQL são aplicadas.

O fluxo ocorre de acordo com o diagrama:

{% @mermaid/diagram content="flowchart LR
subgraph Inputs
Input1("Input")
Input2("Filter")
end
Processor("Processor")
Transformer("Transformer")
subgraph Outputs
Output1("Output")
Output2("Index")
end
ConfluentFlinkSQL("Confluent Flink SQL")
ConfluentSchemaRegistry("Confluent Schema Registry")

```
Inputs --> Processor
Processor --> Transformer
Transformer --> Outputs
Processor <--> ConfluentFlinkSQL
ConfluentFlinkSQL <--> ConfluentSchemaRegistry
ConfluentSchemaRegistry --> Inputs" %}
```

## Parâmetros

Parâmetros aceitos na configuração de processamento em streaming:

| Campo                                                                                                    | Descrição                                                                                                                                            | Valor                                                                                                                                                                                                                                                     |
| -------------------------------------------------------------------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------------------------- | --------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| processor.type                                                                                           | Tipo do processamento em streaming                                                                                                                   | <p>Tipo: enum\[FLINK\_SQL]<br>Obrigatório</p>                                                                                                                                                                                                             |
| processor.description                                                                                    | Descrição do estágio de processamento                                                                                                                | Tipo: string                                                                                                                                                                                                                                              |
| processor.flink\_sql                                                                                     | Configuração do processamento com Flink SQL                                                                                                          | <p>Tipo: object<br>Obrigatório se <code>processor.type=FLINK\_SQL</code></p>                                                                                                                                                                              |
| processor.flink\_sql.timezone                                                                            | Fuso horário a ser utilizado no processamento                                                                                                        | <p>Tipo: string<br>Valor padrão: <code>America/Sao\_Paulo</code></p>                                                                                                                                                                                      |
| processor.flink\_sql.sources                                                                             | Lista com os streamings de entrada do processamento                                                                                                  | <p>Tipo: list<br>Obrigatório</p>                                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].name                                                                     | Nome do streaming de origem                                                                                                                          | <p>Tipo: string<br>Obrigatório - Único entre os streamings</p>                                                                                                                                                                                            |
| processor.flink\_sql.sources\[].type                                                                     | Tipo do streaming de origem                                                                                                                          | <p>Tipo: enum\[CREATE\_TABLE]<br>Obrigatório - Único entre as etapas</p>                                                                                                                                                                                  |
| processor.flink\_sql.sources\[].create\_table                                                            | Configuração de streaming de origem                                                                                                                  | <p>Tipo: object<br>Obrigatório se <code>processor.flink\_sql.sources.type=CREATE\_TABLE</code></p>                                                                                                                                                        |
| processor.flink\_sql.sources\[].create\_table.columns                                                    | Colunas do streaming de origem                                                                                                                       | <p>Tipo: list<br>Obrigatório</p>                                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].name                                            | Nome da coluna                                                                                                                                       | <p>Tipo: list<br>Obrigatório</p>                                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].type                                            | Tipo da coluna                                                                                                                                       | <p>Tipo: enum\[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL\_DAY\_TO\_SECOND INTERVAL\_YEAR\_TO\_MONTH TIME TIMESTAMP TIMESTAMP\_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL]<br>Obrigatório</p> |
| processor.flink\_sql.sources\[].create\_table.columns\[].nullable                                        | Coluna aceita valores nulos                                                                                                                          | <p>Tipo: boolean<br>Valor padrão: <code>true</code></p>                                                                                                                                                                                                   |
| processor.flink\_sql.sources\[].create\_table.columns\[].char                                            | Configuração do tipo [char](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#char)                                             | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].char.length                                     | Tamanho máximo do campo                                                                                                                              | <p>Tipo: int<br>Valor padrão: <code>1</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].varchar                                         | Configuração do tipo [varchar](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#varchar-string)                                | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].varchar.length                                  | Tamanho máximo do campo                                                                                                                              | <p>Tipo: int<br>Valor padrão: <code>1</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].binary                                          | Configuração do tipo [binary](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#binary)                                         | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].binary.length                                   | Tamanho máximo do campo                                                                                                                              | <p>Tipo: int<br>Valor padrão: <code>1</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].varbinary                                       | Configuração do tipo [varbinary](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#varbinary-bytes)                             | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].varbinary.length                                | Tamanho máximo do campo                                                                                                                              | <p>Tipo: int<br>Valor padrão: <code>1</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].decimal                                         | Configuração do tipo [decimal](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#decimal)                                       | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].decimal.precision                               | Precisão do campo                                                                                                                                    | <p>Tipo: int<br>Valor padrão: <code>10</code></p>                                                                                                                                                                                                         |
| processor.flink\_sql.sources\[].create\_table.columns\[].decimal.scale                                   | Escala do campo                                                                                                                                      | <p>Tipo: int<br>Valor padrão: <code>0</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].interval\_day\_to\_second                       | Configuração do tipo [interval\_day\_to\_second](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#interval-day-to-second)      | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].interval\_day\_to\_second.precision             | Precisão do campo                                                                                                                                    | <p>Tipo: int<br>Valor padrão: <code>2</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].interval\_day\_to\_second.fractional\_precision | Precisão fracionária do campo                                                                                                                        | <p>Tipo: int<br>Valor padrão: <code>6</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].interval\_day\_to\_second.resolution            | Resolução do campo                                                                                                                                   | <p>Tipo: enum\[DAY DAY\_TO\_SECOND DAY\_TO\_MINUTE DAY\_TO\_HOUR HOUR HOUR\_TO\_MINUTE HOUR\_TO\_SECOND MINUTE MINUTE\_TO\_SECOND SECOND]<br>Obrigatório</p>                                                                                              |
| processor.flink\_sql.sources\[].create\_table.columns\[].interval\_year\_to\_month                       | Configuração do tipo [interval\_year\_to\_month](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#interval-year-to-month)      | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].interval\_year\_to\_month.precision             | Precisão do campo                                                                                                                                    | <p>Tipo: int<br>Valor padrão: <code>2</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].interval\_year\_to\_month.resolution            | Resolução do campo                                                                                                                                   | <p>Tipo: enum\[YEAR YEAR\_TO\_MONTH MONTH]<br>Obrigatório</p>                                                                                                                                                                                             |
| processor.flink\_sql.sources\[].create\_table.columns\[].time                                            | Configuração do tipo [time](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#time)                                             | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].time.precision                                  | Precisão do campo                                                                                                                                    | <p>Tipo: int<br>Valor padrão: <code>0</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].timestamp                                       | Configuração do tipo [timestamp](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#timestamp)                                   | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].timestamp.precision                             | Precisão do campo                                                                                                                                    | <p>Tipo: int<br>Valor padrão: <code>6</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].timestamp\_ltz                                  | Configuração do tipo [timestamp\_ltz](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#timestamp-ltz)                          | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].timestamp\_ltz.precision                        | Precisão do campo                                                                                                                                    | <p>Tipo: int<br>Valor padrão: <code>6</code></p>                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.columns\[].array                                           | Configuração do tipo [array](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#array)                                           | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].array.element.type                              | Tipo de campo                                                                                                                                        | <p>Tipo: enum\[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL\_DAY\_TO\_SECOND INTERVAL\_YEAR\_TO\_MONTH TIME TIMESTAMP TIMESTAMP\_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL]<br>Obrigatório</p> |
| processor.flink\_sql.sources\[].create\_table.columns\[].map                                             | Configuração do tipo [map](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#map)                                               | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].map.key.type                                    | Tipo da chave                                                                                                                                        | <p>Tipo: enum\[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL\_DAY\_TO\_SECOND INTERVAL\_YEAR\_TO\_MONTH TIME TIMESTAMP TIMESTAMP\_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL]<br>Obrigatório</p> |
| processor.flink\_sql.sources\[].create\_table.columns\[].map.value.type                                  | Tipo do valor                                                                                                                                        | <p>Tipo: enum\[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL\_DAY\_TO\_SECOND INTERVAL\_YEAR\_TO\_MONTH TIME TIMESTAMP TIMESTAMP\_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL]<br>Obrigatório</p> |
| processor.flink\_sql.sources\[].create\_table.columns\[].multiset                                        | Configuração do tipo [multiset](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#multiset)                                     | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].multiset.element.type                           | Tipo do valor do campo                                                                                                                               | <p>Tipo: enum\[CHAR VARCHAR STRING BINARY VARBINARY BYTES BIGINT DECIMAL INT SMALLINT TINYINT DOUBLE FLOAT DATE INTERVAL\_DAY\_TO\_SECOND INTERVAL\_YEAR\_TO\_MONTH TIME TIMESTAMP TIMESTAMP\_LTZ ARRAY MAP MULTISET ROW BOOLEAN NULL]<br>Obrigatório</p> |
| processor.flink\_sql.sources\[].create\_table.columns\[].row                                             | Configuração do tipo [row](https://docs.confluent.io/cloud/current/flink/reference/datatypes.html#row)                                               | <p>Tipo: object<br>Valor padrão: <code>{}</code></p>                                                                                                                                                                                                      |
| processor.flink\_sql.sources\[].create\_table.columns\[].row\.columns                                    | Configuração das colunas aceitas em `processor.flink_sql.sources[].create_table.columns`                                                             | <p>Tipo: list<br>Obrigatório</p>                                                                                                                                                                                                                          |
| processor.flink\_sql.sources\[].create\_table.watermark                                                  | Configuração da estratégias de [marca d'água](https://docs.confluent.io/cloud/current/flink/reference/statements/create-table.html#watermark-clause) | Tipo: object                                                                                                                                                                                                                                              |
| processor.flink\_sql.sources\[].create\_table.watermark.column                                           | Nome da coluna a ser considerada                                                                                                                     | <p>Tipo: string<br>Obrigatório</p>                                                                                                                                                                                                                        |
| processor.flink\_sql.sources\[].create\_table.watermark.period                                           | Período a ser considerado                                                                                                                            | Tipo: float                                                                                                                                                                                                                                               |
| processor.flink\_sql.sources\[].create\_table.watermark.time\_unit                                       | Unidade de tempo                                                                                                                                     | Tipo: enum\[SECOND MINUTE HOUR]                                                                                                                                                                                                                           |
| processor.flink\_sql.select                                                                              | Configuração da [declaração SQL do tipo SELECT](https://docs.confluent.io/cloud/current/flink/reference/queries/select.html)                         | <p>Tipo: object<br>Obrigatório</p>                                                                                                                                                                                                                        |
| processor.flink\_sql.select.statement                                                                    | Declaração SQL do tipo SELECT                                                                                                                        | <p>Tipo: string<br>Obrigatório</p>                                                                                                                                                                                                                        |
| processor.flink\_sql.sink                                                                                | Configuração do streaming de destino                                                                                                                 | <p>Tipo: object<br>Obrigatório</p>                                                                                                                                                                                                                        |
| processor.flink\_sql.sink.create\_table                                                                  | Configuração de criação do streaming de destino. Mesma configuração aceita em `processor.flink_sql.sources[].create_table`                           | Tipo: object                                                                                                                                                                                                                                              |

## Requisição

Para habilitar o processamento em streaming a seguinte requisição pode ser realizada:

```bash
curl --location '<HOST>/v1/configs' \
--header 'Content-Type: application/json' \
--header 'Authorization: <ACCESS-TOKEN>' \
--data '{
{
  "processor": {
    "type": "FLINK_SQL",
    "description": "PROCESSOR-DESCRIPTION",
    "flink_sql": {
      "sources": [
        {
          "name": "MY-TABLE-ALIAS",
          "type": "CREATE_TABLE",
          "create_table": {
            "columns": [
              {
                "name": "Content",
                "type": "STRING"
              },
              {
                "name": "DateUnixMillis",
                "type": "TIMESTAMP",
                "timestamp": {
                  "precision": 3
                }
              }
            ],
            "watermark": {
              "column": "DateUnixMillis",
              "period": 1,
              "time_unit": "MINUTE"
            }
          }
        }
      ],
      "select": {
        "statement": "SELECT `Content` AS `content`, `window_start`, `window_end`, COUNT(`Content`) AS `count` FROM TABLE( TUMBLE(TABLE MY-TABLE-ALIAS, DESCRIPTOR(`DateUnixMillis`), INTERVAL '10' MINUTES)) GROUP BY `window_start`, `window_end`, `Content`"
      },
      "sink": {
        "create_table": {
          "columns": [
            {
              "name": "Content",
              "type": "STRING"
            },
            {
              "name": "window_start",
              "type": "TIMESTAMP",
              "timestamp": {
                "precision": 3
              }
            },
            {
              "name": "window_end",
              "type": "TIMESTAMP",
              "timestamp": {
                "precision": 3
              }
            },
            {
              "name": "count",
              "type": "BIGINT"
            }
          ]
        }
      }
    }
  }
}
}'
```

{% hint style="warning" %}
É uma boa prática colocar o nome das colunas a serem selecionadas da tabela entre \` para diferenciá-las de palavras reservadas da sintaxe SQL.
{% endhint %}

No exemplo acima foi gerada uma configuração do Firehose que aplica um processamento em streaming nos dados trafegados no fluxo que respeitam o seguinte esquema:

```json
[
  {
    "name": "Content",
    "type": "STRING"
  },
  {
    "name": "DateUnixMillis",
    "type": "TIMESTAMP",
    "timestamp": {
      "precision": 3
    }
  }
]
```

{% hint style="warning" %}
Caso os campos da mensagem trafegada não respeitem o esquema a mensagem é ignorada.
{% endhint %}

O processamento em questão realiza uma contagem de quantas vezes o conteúdo do campo `Content` se repete dentro de uma janela fixa de 10 minutos. O campo `DateUnixMillis` do tipo `timestamp(3)` é utilizado como marca d'água para definir as janelas de tempo no streaming de origem.

{% hint style="info" %}
Somente um campo do tipo `timestamp(3)` pode ser utilizado como marca d'água para processamento em janela de tempo.
{% endhint %}

Após o término da janela o resultado possui os tempos de início e término da janela de tempo, o conteúdo e sua respectiva contagem. O resultado é enviado para um streaming de saída com o seguinte esquema:

```json
[
  {
    "name": "Content",
    "type": "STRING"
  },
  {
    "name": "window_start",
    "type": "TIMESTAMP",
    "timestamp": {
      "precision": 3
    }
  },
  {
    "name": "window_end",
    "type": "TIMESTAMP",
    "timestamp": {
      "precision": 3
    }
  },
  {
    "name": "count",
    "type": "BIGINT"
  }
]
```

## Limitações

Não é possível utilizar mais de uma fonte (source) para realizar uma operação de JOIN entre streams, por exemplo. Somente os dados de entrada do fluxo da configuração poderão ser processados em streaming.

Qualquer alteração no esquema da source ou sync resultará na deleção do tópico kafka correspondente que será recriado com o novo esquema, caso haja mensagens não consumidas nesses tópicos elas serão perdidas.

Mensagens que possuam campos não mapeados no esquema da tabela (source) resultarão em erro e não serão enviadas para o processamento em streaming.


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://docs-firehose.blip.ai/configuracoes/processamento-em-streaming.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
