Casos de Uso

Acesso a Dados Comuns em Streaming

Para facilitar o acesso a dados de interesse comum, disponibilizamos dados em streaming por meio de índices, sem a necessidade de realizar consultas no Lake ou em outros bancos de dados. Confira alguns dos índices que você pode consumir:

  • messages

  • eventtracks

  • changed tickets

  • contacts

Compartilhamento de Dados

Utilizando índices e filtros, é possível compartilhar dados de uma entrada com outras configurações.

Índice A
{
    "description": "Indexar mensagens do Kafka por bot_id",
    "input": {
        "type": "KAFKA",
        "kafka": {
            "addresses": ["<ADDRESS>"],
            "topics": ["<TOPIC>"],
            "tls": {
                "enabled": "true",
                "skip_cert_verify": "true"
            },
            "sasl": {
                "mechanism": "PLAIN",
                "user": "<USER>",
                "password": "<PASSWORD>"
            }
        }
    },
    "index": [
        {
            "key": "group",
            "values": ["<USER_GROUP>"],
            "required": true
        },
        {
            "key": "bot_id",
            "value": "$..from",
            "required": true
        }
    ]
}
Filtro B
{
    "description": "Filtrar mensagens do bot 'blip' e enviar para um servidor HTTP",
    "filter": {
        "groups": ["<USER_GROUP>"],
        "indexes": ["<INDEX_ID>"],
        "filter": {
            "op": "EQ",
            "field_name": "bot_id",
            "value": "blip",
            "value_type": "string"
        }
    },
    "output": {
        "target": "https://example.com",
        "type": "HTTP",
        "http": {
            "verify_token": "<VERIFICATION_TOKEN>",
            "verb": "POST"
        }
    }
}
Filtro C
{
    "description": "Filtrar mensagens do bot 'firehose' e enviar para o MongoDB",
    "filter": {
        "groups": ["<USER_GROUP>"],
        "indexes": ["<INDEX_ID>"],
        "filter": {
            "op": "EQ",
            "field_name": "bot_id",
            "value": "firehose",
            "value_type": "string"
        }
    },
    "output": {
        "mongodb": {
            "database": "<DATABASE>",
            "collection": "<COLLECTION>",
            "uri": "mongodb+srv://<USERNAME>:<PASSWORD>@<HOST>"
        }
    }
}

Multiplas Escritas

Caso não seja necessário realizar nenhum tipo de filtro, é possível redirecionar mensagens para outras configurações utilizando o output Firehose.

Configuração
{
    "description": "Redirecionar mensagens para outras configurações",
    "input": {},
    "output": {
        "type": "FIREHOSE",
        "firehose": {
            "config_ids": [
                "<ID_CONFIG_B>",
                "<ID_CONFIG_C>"
            ]
        }
    }
}

Escrever em um Delta Lake [beta]

Para escrever diretamente em uma tabela Delta Lake, é possível utilizar o output correspondente.

Configuração
{
    "description": "Escrever dados em uma tabela Delta Lake",
    "input": {},
    "output": {
        "type": "DELTALAKE",
        "deltalake": {
            "azure_blob_storage": {
                "storage_account": "<STORAGE_ACCOUNT>",
                "storage_access_key": "<STORAGE_ACCESS_KEY>",
                "root_path": "<ROOT_PATH_IN_AZURE>"
            },
            "table_name": "<TABLE_NAME>"
        }
    }
}

Enriquecimento de Dados

Para enriquecer ou manipular os dados trafegados, é possível realizar tranformações.

Configuração
{
    "description": "Enriquecer dados com uma requisição HTTP e multiplicar um campo por 3",
    "input": {},
    "transformer": {
        "workflow": [
            [
                "http",
            ],
            [
                "bloblang"
            ]
        ],
        "steps": [
            {
                "id": "http",
                "type": "HTTP",
                "override": false,
                "ignore_error": false,
                "http": {
                    "url": "https://api.example.com",
                    "verb": "POST",
                    "headers": {
                        "key": "value"
                    },
                    "body": {
                        "mapping": "root = this"
                    }
                }
            },
            {
                "id": "bloblang",
                "type": "BLOBLANG",
                "override": false,
                "ignore_error": false,
                "bloblang": {
                    "mapping": "root.year = this.year * 3"
                }
            }
        ]
    },
    "output": {}
}

Controle de Vazão

Para trafegar um grande volume de dados sem sobrecarregar a saída de destino, é possível configurar um controle de vazão, que limita quantas mensagens podem ser enviadas em um intervalo de tempo pré-definido.

Escrita de Arquivos .csv

Também é possível escrever arquivos .csv com os dados trafegados ao utilizar algumas saídas, como Azure Blob ou e-mail. O arquivo resultante contém os dados trafegados que foram acumulados em uma janela de tempo.

Mais detalhes sobre os parâmetros configuráveis podem ser encontrados em suas respectivas páginas de documentação.

Processamento em Streaming

[MongoDB] Conversão de String para Tipo Date

Essas operações de conversão utilizando as Transformações são necessárias para garantir que o tipo correto seja escrito no documento do MongoDB, permitindo que os campos de data sejam armazenados e manipulados adequadamente.

RFC 3339 ou Unix Timestamp

Se o campo de data já estiver no formato RFC 3339 ou Unix Timestamp, você deve utilizar o método ts_unix_milli().

Playground

Exemplo de Entrada (Payload):

{
    "myTimestampField": "1985-04-12T23:20:50.52Z"
}

Transformação:

root = this
root.myTimestampField = {
 "$date": {
   "$numberLong": this.myTimestampField.ts_unix_milli().string()
 }
}

Formato Personalizado

Se a string de data estiver em um formato diferente, além do ts_unix_milli(), será necessário utilizar o método ts_strptime(). Este método permite especificar o formato da data conforme as instruções do manual do strptime.

Playground

Exemplo de Entrada (Payload):

{
    "myDateField": "23/07/2025",
    "myDateTimeField": "23/07/2025 16:00:00"
}

Transformação:

root = this
root.myDateField = {
 "$date": {
   "$numberLong": this.myDateField.ts_strptime("%d/%m/%Y").ts_unix_milli().string()
 }
}
root.myDateTimeField = {
 "$date": {
   "$numberLong": this.myDateTimeField.ts_strptime("%d/%m/%Y %T").ts_unix_milli().string()
 }
}

Last updated