Visão geral

Este guia apresenta as melhores práticas para integrar o DataSnap em suas aplicações, incluindo autenticação, tratamento de erros e padrões de integração recomendados.
Pré-requisitos:
  • Token de autenticação válido
  • Conhecimento básico de APIs RESTful
  • Familiaridade com formato JSONL

Autenticação

Token Bearer

Todas as requisições à API do DataSnap requerem autenticação via Bearer Token:
Authorization: Bearer SEU_TOKEN_AQUI

Obtendo seu token

Para obter seu token de acesso:
  1. Acesse o painel de controle da DataSnap
  2. Navegue até a seção “API” ou “Tokens de Acesso”
  3. Clique em “Gerar Novo Token” ou “Criar Token”
  4. Copie o token gerado e mantenha-o seguro
Cada token é associado a um tenant específico e possui permissões definidas.
Mantenha seu token seguro e nunca o compartilhe em repositórios públicos ou logs. Use variáveis de ambiente para armazená-lo.

Padrões de integração

Fluxo completo recomendado

Siga este padrão para uma integração robusta:
1

Validação de dados

Valide seus dados localmente antes do upload para evitar erros desnecessários.
import json

def validar_jsonl(arquivo):
    with open(arquivo, 'r') as f:
        for linha_num, linha in enumerate(f, 1):
            try:
                json.loads(linha.strip())
            except json.JSONDecodeError as e:
                print(f"Erro na linha {linha_num}: {e}")
                return False
    return True
2

Upload com retry

Implemente retry automático para uploads que podem falhar temporariamente.
import time
import requests
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.retry import Retry

def criar_sessao_com_retry():
    session = requests.Session()
    retry = Retry(
        total=3,
        status_forcelist=[429, 500, 502, 503, 504],
        method_whitelist=["HEAD", "GET", "PUT", "POST", "DELETE", "OPTIONS", "TRACE"],
        backoff_factor=1
    )
    adapter = HTTPAdapter(max_retries=retry)
    session.mount("http://", adapter)
    session.mount("https://", adapter)
    return session
3

Monitoramento de processamento

Verifique o status do processamento antes de realizar consultas.
def aguardar_processamento(schema_slug, headers):
    while True:
        response = requests.get(
            f"https://api.datasnap.com.br/api/v1/schemas/{schema_slug}/files",
            headers=headers,
            params={"processing_status": "pending"}
        )
        
        if response.json()["meta"]["total"] == 0:
            print("Processamento concluído!")
            break
            
        print("Aguardando processamento...")
        time.sleep(30)

Métodos de processamento

O DataSnap oferece 4 formas diferentes de processar seus dados, sendo uma manual e três automáticas:

1. Processamento Manual

Você pode iniciar o processamento manualmente usando o endpoint /process-files sempre que necessário. Esta opção oferece controle total sobre quando seus dados são processados.

2. Processamento Automático a Cada Upload

O DataSnap pode processar automaticamente a cada novo arquivo recebido, independente do tamanho. Esta opção garante que seus dados estejam sempre atualizados e prontos para consulta imediatamente após o upload.
Esta opção pode resultar em custos mais altos de processamento, pois cada arquivo é processado individualmente, mesmo os menores.

3. Processamento por Volume Acumulado

O sistema monitora o tamanho total dos arquivos pendentes de processamento. Quando o volume acumulado atingir o limite configurado (por exemplo, 50MB), o processamento será iniciado automaticamente.
Esta opção é eficiente para otimizar recursos, pois processa lotes de dados em vez de arquivos individuais. Ideal para uploads frequentes de arquivos pequenos.

4. Processamento Programado

Com esta configuração, o sistema processará automaticamente todos os arquivos pendentes em intervalos regulares (por exemplo, a cada 30 minutos), dando tempo para acumular quantos arquivos forem necessários até chegar o momento do processamento.
Perfeito para estabelecer uma rotina previsível de processamento, como a cada hora ou a cada 30 minutos, independentemente do volume de dados. Ideal para relatórios periódicos ou quando você precisa de atualizações consistentes.

Notificações via Webhook

O DataSnap pode enviar notificações via webhook para um ou mais endpoints que você definir, permitindo que suas aplicações acompanhem o status em tempo real de forma automatizada.
Para informações completas sobre configuração, implementação e melhores práticas de webhooks, consulte nossa documentação dedicada de Webhooks.

Exemplo Rápido

{
  "topic": "schemas.run.status",
  "metadata": {
    "run_id": 24,
    "tenant": "sua_empresa",
    "status": "completed",
    "started_at": "2025-08-12T23:14:28-03:00"
  }
}
A documentação de Webhooks inclui exemplos completos de implementação em Python, Node.js, PHP e muito mais.

Tratamento de erros

Implemente tratamento robusto de erros para diferentes cenários:
def fazer_requisicao_api(url, headers, data=None, files=None):
    try:
        if files:
            response = requests.post(url, headers=headers, files=files)
        elif data:
            response = requests.post(url, headers=headers, json=data)
        else:
            response = requests.get(url, headers=headers)
            
        response.raise_for_status()
        return response.json()
        
    except requests.exceptions.HTTPError as e:
        status_code = e.response.status_code
        
        if status_code == 401:
            print("Erro de autenticação. Verifique seu token.")
        elif status_code == 404:
            print("Schema não encontrado.")
        elif status_code == 422:
            print("Erro de validação:", e.response.json())
        elif status_code == 429:
            print("Limite de taxa atingido. Aguarde antes de tentar novamente.")
        else:
            print(f"Erro HTTP {status_code}: {e.response.text}")
            
    except requests.exceptions.ConnectionError:
        print("Erro de conexão. Verifique sua internet.")
    except requests.exceptions.Timeout:
        print("Timeout na requisição. Tente novamente.")
    except requests.exceptions.RequestException as e:
        print(f"Erro na requisição: {e}")

Limites e quotas

A API DataSnap implementa limites para garantir performance estável e uso justo dos recursos para todos os usuários.
Para informações completas sobre limites, códigos de erro, boas práticas e estratégias de otimização, consulte nossa documentação dedicada de Limites e Quotas.

Resumo dos Principais Limites

EndpointLimiteJanela
Upload de arquivos100 MB por arquivo-
Listagem de arquivos100 requests1 minuto
Consultas50 requests1 minuto
Processamento10 requests5 minutos
A documentação de Limites e Quotas inclui exemplos de implementação de rate limiting, tratamento de erros, monitoramento e muito mais.

Consultas otimizadas

Estrutura de consulta eficiente

Para melhor performance, siga estas práticas:
# ✅ Bom - consulta específica e limitada
consulta_otimizada = {
    "select": ["id", "nome", "idade"],
    "where": [
        {"field": "idade", "op": ">=", "value": 18},
        {"field": "ativo", "op": "=", "value": True}
    ],
    "order_by": [{"field": "id", "direction": "asc"}],
    "limit": 100
}

# ❌ Evitar - consulta muito ampla
consulta_ineficiente = {
    "select": ["*"],  # Evite select *
    "limit": 10000    # Limite muito alto
}

Paginação eficiente

Use cursor-based pagination para melhor performance:
def buscar_todos_registros(schema_slug, headers, consulta_base):
    todos_registros = []
    page_token = None
    
    while True:
        consulta = consulta_base.copy()
        if page_token:
            consulta["page_token"] = page_token
            
        response = requests.post(
            f"https://api.datasnap.com.br/api/v1/schemas/{schema_slug}/query",
            headers=headers,
            json=consulta
        )
        
        dados = response.json()
        todos_registros.extend(dados["data"])
        
        # Verificar se há mais páginas
        if "next_page_token" not in dados.get("meta", {}):
            break
            
        page_token = dados["meta"]["next_page_token"]
    
    return todos_registros

Monitoramento e logs

Implementando logs estruturados

import logging
import json

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_api_call(endpoint, method, status_code, response_time):
    log_data = {
        "endpoint": endpoint,
        "method": method,
        "status_code": status_code,
        "response_time_ms": response_time,
        "timestamp": time.time()
    }
    logger.info(json.dumps(log_data))

Métricas importantes

Monitore estas métricas em suas integrações:
  • Taxa de sucesso de uploads
  • Tempo de processamento médio
  • Taxa de erro por endpoint
  • Latência de consultas
  • Volume de dados processados

Ambientes

URLs por ambiente

AMBIENTES = {
    "producao": "https://api.datasnap.com.br"
}

def obter_url_base(ambiente="producao"):
    return AMBIENTES.get(ambiente, AMBIENTES["producao"])

Configuração por ambiente

Use variáveis de ambiente para configuração:
import os

class Config:
    DATASNAP_TOKEN = os.getenv("DATASNAP_TOKEN")
    DATASNAP_ENVIRONMENT = os.getenv("DATASNAP_ENV", "producao")
    DATASNAP_BASE_URL = obter_url_base(DATASNAP_ENVIRONMENT)
    
    @classmethod
    def validate(cls):
        if not cls.DATASNAP_TOKEN:
            raise ValueError("DATASNAP_TOKEN não configurado")

Próximos passos