Visão Geral

A API DataSnap implementa limites para garantir performance estável e uso justo dos recursos. Este guia detalha todos os limites aplicados e como trabalhar eficientemente dentro deles.
Os limites são aplicados por tenant (conta) e são resetados automaticamente dentro das janelas de tempo especificadas.

Limites por Endpoint

Upload de Arquivos

LimiteValorObservações
Tamanho máximo por arquivo100 MBAplica-se a cada arquivo individual
Formato suportadoJSONLCada linha deve ser um JSON válido
Detecção de duplicatasAutomáticaVia hash MD5
Taxa de requisiçõesSem limite fixoSujeito a throttling automático

Listagem de Arquivos

LimiteValorJanela de Tempo
Requisições100 requests1 minuto
Itens por página1-100Por requisição

Consultas (Query)

LimiteValorJanela de Tempo
Requisições50 requests1 minuto
Registros por consulta1-10.000Por requisição
Timeout de consulta30 segundosExecução máxima

Processamento DataFlow

LimiteValorJanela de Tempo
Requisições10 requests5 minutos
Executores máximo10Por processamento
Tempo de execução10-30 minutosVaria conforme volume

Códigos de Resposta de Limite

429 - Too Many Requests

Retornado quando você excede os limites de taxa:
{
  "error": "Muitas requisições. Tente novamente em 60 segundos.",
  "retry_after": 60,
  "limit_type": "rate_limit",
  "current_usage": 51,
  "limit": 50
}

413 - Payload Too Large

Para arquivos que excedem o tamanho máximo:
{
  "error": "Arquivo muito grande. Limite máximo: 100MB",
  "file_size_mb": 150,
  "max_size_mb": 100
}

Boas Práticas

Gerenciamento de Taxa de Requisições

Implemente throttling em suas aplicações para evitar atingir os limites:
import time
from functools import wraps

def rate_limit(calls_per_minute=50):
    def decorator(func):
        last_called = [0.0]
        
        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = 60.0 / calls_per_minute - elapsed
            
            if left_to_wait > 0:
                time.sleep(left_to_wait)
                
            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            return ret
            
        return wrapper
    return decorator

# Uso
@rate_limit(calls_per_minute=45)  # Margem de segurança
def fazer_consulta(query_data):
    return requests.post(url, headers=headers, json=query_data)

Upload Eficiente

Para múltiplos arquivos, otimize o processo:
def upload_multiplos_arquivos(schema_slug, lista_arquivos, headers):
    """Upload otimizado para múltiplos arquivos"""
    files = []
    try:
        # Preparar todos os arquivos
        for arquivo in lista_arquivos:
            files.append(('files', open(arquivo, 'rb')))
        
        # Upload em lote
        response = requests.post(
            f"https://api.datasnap.com.br/api/v1/schemas/{schema_slug}/files",
            headers=headers,
            files=files,
            timeout=300  # 5 minutos para uploads grandes
        )
        
        return response.json()
        
    finally:
        # Sempre fechar os arquivos
        for _, file_obj in files:
            file_obj.close()

# Dividir arquivos grandes em chunks menores
def dividir_arquivo_grande(arquivo_path, chunk_size_mb=80):
    """Divide arquivos grandes em chunks menores"""
    chunk_size = chunk_size_mb * 1024 * 1024
    chunks = []
    
    with open(arquivo_path, 'r') as f:
        chunk_lines = []
        current_size = 0
        
        for line in f:
            line_size = len(line.encode('utf-8'))
            
            if current_size + line_size > chunk_size and chunk_lines:
                # Salvar chunk atual
                chunk_file = f"{arquivo_path}.chunk_{len(chunks)}.jsonl"
                with open(chunk_file, 'w') as chunk_f:
                    chunk_f.writelines(chunk_lines)
                chunks.append(chunk_file)
                
                # Resetar para próximo chunk
                chunk_lines = [line]
                current_size = line_size
            else:
                chunk_lines.append(line)
                current_size += line_size
        
        # Último chunk
        if chunk_lines:
            chunk_file = f"{arquivo_path}.chunk_{len(chunks)}.jsonl"
            with open(chunk_file, 'w') as chunk_f:
                chunk_f.writelines(chunk_lines)
            chunks.append(chunk_file)
    
    return chunks

Consultas Otimizadas

Estruture suas consultas para melhor performance:
# ✅ Consulta otimizada
consulta_eficiente = {
    "select": ["id", "nome", "categoria"],  # Campos específicos
    "where": [
        {"field": "ativo", "op": "=", "value": True},
        {"field": "criado_em", "op": ">=", "value": "2024-01-01"}
    ],
    "order_by": [{"field": "id", "direction": "asc"}],
    "limit": 100,  # Limite razoável
    "format": "json_compact"  # Formato compacto para menos dados
}

# ❌ Consulta ineficiente
consulta_problematica = {
    "select": ["*"],  # Evite select * 
    "limit": 10000,   # Limite muito alto
    "format": "json"  # Formato verboso desnecessário
}

Paginação Eficiente

Use paginação baseada em cursor para melhor performance:
def buscar_todos_registros(schema_slug, headers, consulta_base):
    """Busca todos os registros usando paginação eficiente"""
    todos_registros = []
    page_token = None
    
    while True:
        consulta = consulta_base.copy()
        consulta["limit"] = 100  # Chunks menores
        
        if page_token:
            consulta["page_token"] = page_token
            
        try:
            response = requests.post(
                f"https://api.datasnap.com.br/api/v1/schemas/{schema_slug}/query",
                headers=headers,
                json=consulta,
                timeout=30
            )
            response.raise_for_status()
            
            dados = response.json()
            todos_registros.extend(dados.get("data", []))
            
            # Verificar se há mais páginas
            meta = dados.get("meta", {})
            if "next_page_token" not in meta:
                break
                
            page_token = meta["next_page_token"]
            
        except requests.exceptions.RequestException as e:
            print(f"Erro na paginação: {e}")
            break
    
    return todos_registros

Monitoramento de Uso

Implementando Logs Estruturados

import logging
import json
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_api_usage(endpoint, method, status_code, response_time, data_size=None):
    """Log estruturado para monitorar uso da API"""
    log_data = {
        "timestamp": time.time(),
        "endpoint": endpoint,
        "method": method,
        "status_code": status_code,
        "response_time_ms": response_time * 1000,
        "success": 200 <= status_code < 300,
    }
    
    if data_size:
        log_data["data_size_mb"] = data_size / (1024 * 1024)
    
    # Adicionar alertas para limites próximos
    if status_code == 429:
        log_data["alert"] = "RATE_LIMIT_EXCEEDED"
    elif response_time > 25:  # Próximo do timeout
        log_data["alert"] = "SLOW_RESPONSE"
    
    logger.info(json.dumps(log_data))

# Wrapper para requests com monitoramento
def fazer_requisicao_monitorada(method, url, **kwargs):
    """Wrapper que adiciona monitoramento automático"""
    start_time = time.time()
    
    try:
        response = requests.request(method, url, **kwargs)
        response_time = time.time() - start_time
        
        # Calcular tamanho dos dados se possível
        data_size = len(response.content) if response.content else 0
        
        log_api_usage(
            endpoint=url.split('/')[-1],
            method=method,
            status_code=response.status_code,
            response_time=response_time,
            data_size=data_size
        )
        
        return response
        
    except Exception as e:
        response_time = time.time() - start_time
        logger.error(f"Erro na requisição: {e}")
        log_api_usage(
            endpoint=url.split('/')[-1],
            method=method,
            status_code=0,
            response_time=response_time
        )
        raise

Estratégias de Otimização

Para Alta Frequência de Dados

Se você processa muitos dados regularmente:

Alertas e Monitoramento

Métricas Importantes

Monitore estas métricas em suas integrações:
  • Taxa de sucesso: % de requisições bem-sucedidas
  • Tempo de resposta médio: Performance da API
  • Taxa de erro por endpoint: Identifica problemas específicos
  • Uso de cota: Proximidade dos limites
  • Volume de dados processados: Crescimento ao longo do tempo

Configuração de Alertas

def verificar_saude_api():
    """Verificação periódica da saúde da API"""
    metrics = {
        "success_rate": 0.0,
        "avg_response_time": 0.0,
        "error_count": 0,
        "quota_usage": 0.0
    }
    
    # Calcular métricas dos últimos logs
    # ... lógica de cálculo ...
    
    # Alertas baseados em thresholds
    if metrics["success_rate"] < 0.95:
        enviar_alerta("Taxa de sucesso baixa", metrics)
    
    if metrics["avg_response_time"] > 20:
        enviar_alerta("API lenta", metrics)
    
    if metrics["quota_usage"] > 0.8:
        enviar_alerta("Quota próxima do limite", metrics)

def enviar_alerta(tipo, metrics):
    """Enviar alerta via webhook, email, etc."""
    alert_data = {
        "alert_type": tipo,
        "timestamp": time.time(),
        "metrics": metrics
    }
    # Implementar envio do alerta

Troubleshooting

Problemas Comuns

Planos e Limites Customizados

Para necessidades especiais de volume ou performance, entre em contato com nossa equipe:
Os limites e quotas estão sujeitos a alterações. Sempre consulte a documentação mais recente e monitore seu uso de forma programática.