Skip to main content

Visão Geral

A API DataSnap implementa limites para garantir performance estável e uso justo dos recursos. Este guia detalha todos os limites aplicados e como trabalhar eficientemente dentro deles.
Os limites são aplicados por tenant (conta) e são resetados automaticamente dentro das janelas de tempo especificadas.

Limites por Endpoint

Upload de Arquivos

LimiteValorObservações
Tamanho máximo por arquivo10 MBAplica-se a cada arquivo individual (recomendado para melhor experiência)
Formato suportadoJSONLCada linha deve ser um JSON válido

Listagem de Arquivos

LimiteValorJanela de Tempo
Requisições100 requests1 minuto
Itens por página1-100Por requisição

Consultas (Query)

LimiteValorJanela de Tempo
Requisições50 requests1 minuto
Registros por consulta1-10.000Por requisição
Timeout de consulta30 segundosExecução máxima

Boas Práticas

Implemente Rate Limiting

Para evitar atingir os limites, implemente controle de taxa em suas aplicações:
import time
from functools import wraps

def rate_limit(calls_per_minute=45):
    def decorator(func):
        last_called = [0.0]

        @wraps(func)
        def wrapper(*args, **kwargs):
            elapsed = time.time() - last_called[0]
            left_to_wait = 60.0 / calls_per_minute - elapsed

            if left_to_wait > 0:
                time.sleep(left_to_wait)

            ret = func(*args, **kwargs)
            last_called[0] = time.time()
            return ret

        return wrapper
    return decorator

# Uso
@rate_limit(calls_per_minute=40)  # Margem de segurança
def fazer_consulta(query_data):
    return requests.post(url, headers=headers, json=query_data)

Upload Eficiente

Para múltiplos arquivos, otimize o processo:
def upload_multiplos_arquivos(schema_slug, lista_arquivos, headers):
    """Upload otimizado para múltiplos arquivos"""
    files = []
    try:
        # Preparar todos os arquivos
        for arquivo in lista_arquivos:
            files.append(('files', open(arquivo, 'rb')))
        
        # Upload direto para nuvem usando URL do token
        response = requests.put(
            "{pre_signed_url}/dados.jsonl",
            headers={"Content-Type": "application/octet-stream"},
            data=arquivo,
            timeout=300  # 5 minutos para uploads
        )
        
        return response.json()
        
    finally:
        # Sempre fechar os arquivos
        for _, file_obj in files:
            file_obj.close()

# Dividir arquivos grandes em chunks menores
def dividir_arquivo_grande(arquivo_path, chunk_size_mb=8):
    """Divide arquivos grandes em chunks menores"""
    chunk_size = chunk_size_mb * 1024 * 1024
    chunks = []
    
    with open(arquivo_path, 'r') as f:
        chunk_lines = []
        current_size = 0
        
        for line in f:
            line_size = len(line.encode('utf-8'))
            
            if current_size + line_size > chunk_size and chunk_lines:
                # Salvar chunk atual
                chunk_file = f"{arquivo_path}.chunk_{len(chunks)}.jsonl"
                with open(chunk_file, 'w') as chunk_f:
                    chunk_f.writelines(chunk_lines)
                chunks.append(chunk_file)
                
                # Resetar para próximo chunk
                chunk_lines = [line]
                current_size = line_size
            else:
                chunk_lines.append(line)
                current_size += line_size
        
        # Último chunk
        if chunk_lines:
            chunk_file = f"{arquivo_path}.chunk_{len(chunks)}.jsonl"
            with open(chunk_file, 'w') as chunk_f:
                chunk_f.writelines(chunk_lines)
            chunks.append(chunk_file)
    
    return chunks

Consultas Otimizadas

Estruture suas consultas para melhor performance:
# ✅ Consulta otimizada
consulta_eficiente = {
    "select": ["id", "nome", "categoria"],  # Campos específicos
    "where": [
        {"field": "ativo", "op": "=", "value": True},
        {"field": "criado_em", "op": ">=", "value": "2024-01-01"}
    ],
    "order_by": [{"field": "id", "direction": "asc"}],
    "limit": 100,  # Limite razoável
    "format": "json_compact"  # Formato compacto para menos dados
}

# ❌ Consulta ineficiente
consulta_problematica = {
    "select": ["*"],  # Evite select * 
    "limit": 10000,   # Limite muito alto
    "format": "json"  # Formato verboso desnecessário
}

Paginação Eficiente

Use paginação baseada em cursor para melhor performance:
def buscar_todos_registros(schema_slug, headers, consulta_base):
    """Busca todos os registros usando paginação eficiente"""
    todos_registros = []
    page_token = None
    
    while True:
        consulta = consulta_base.copy()
        consulta["limit"] = 100  # Chunks menores
        
        if page_token:
            consulta["page_token"] = page_token
            
        try:
            response = requests.post(
                f"https://api.datasnap.com.br/api/v1/schemas/{schema_slug}/query",
                headers=headers,
                json=consulta,
                timeout=30
            )
            response.raise_for_status()
            
            dados = response.json()
            todos_registros.extend(dados.get("data", []))
            
            # Verificar se há mais páginas
            meta = dados.get("meta", {})
            if "next_page_token" not in meta:
                break
                
            page_token = meta["next_page_token"]
            
        except requests.exceptions.RequestException as e:
            print(f"Erro na paginação: {e}")
            break
    
    return todos_registros

Monitoramento de Uso

Implementando Logs Estruturados

import logging
import json
import time

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def log_api_usage(endpoint, method, status_code, response_time, data_size=None):
    """Log estruturado para monitorar uso da API"""
    log_data = {
        "timestamp": time.time(),
        "endpoint": endpoint,
        "method": method,
        "status_code": status_code,
        "response_time_ms": response_time * 1000,
        "success": 200 <= status_code < 300,
    }
    
    if data_size:
        log_data["data_size_mb"] = data_size / (1024 * 1024)
    
    # Adicionar alertas para limites próximos
    if status_code >= 400:
        log_data["alert"] = f"HTTP_ERROR_{status_code}"
    elif response_time > 25:  # Próximo do timeout
        log_data["alert"] = "SLOW_RESPONSE"
    
    logger.info(json.dumps(log_data))

# Wrapper para requests com monitoramento
def fazer_requisicao_monitorada(method, url, **kwargs):
    """Wrapper que adiciona monitoramento automático"""
    start_time = time.time()
    
    try:
        response = requests.request(method, url, **kwargs)
        response_time = time.time() - start_time
        
        # Calcular tamanho dos dados se possível
        data_size = len(response.content) if response.content else 0
        
        log_api_usage(
            endpoint=url.split('/')[-1],
            method=method,
            status_code=response.status_code,
            response_time=response_time,
            data_size=data_size
        )
        
        return response
        
    except Exception as e:
        response_time = time.time() - start_time
        logger.error(f"Erro na requisição: {e}")
        log_api_usage(
            endpoint=url.split('/')[-1],
            method=method,
            status_code=0,
            response_time=response_time
        )
        raise

Estratégias de Otimização

Para Alta Frequência de Dados

Se você processa muitos dados regularmente:
Implemente cache para consultas frequentes:
import time
from functools import lru_cache
import hashlib

class QueryCache:
    def __init__(self, ttl_seconds=300):  # 5 minutos
        self.cache = {}
        self.ttl = ttl_seconds
    
    def get_cache_key(self, query_data):
        """Gera chave única para a consulta"""
        query_str = json.dumps(query_data, sort_keys=True)
        return hashlib.md5(query_str.encode()).hexdigest()
    
    def get(self, query_data):
        key = self.get_cache_key(query_data)
        if key in self.cache:
            cached_data, timestamp = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return cached_data
        return None
    
    def set(self, query_data, result):
        key = self.get_cache_key(query_data)
        self.cache[key] = (result, time.time())

# Uso do cache
cache = QueryCache(ttl_seconds=300)

def consulta_com_cache(query_data):
    # Tentar cache primeiro
    cached = cache.get(query_data)
    if cached:
        return cached
    
    # Fazer requisição se não estiver em cache
    result = fazer_consulta(query_data)
    cache.set(query_data, result)
    return result

Alertas e Monitoramento

Métricas Importantes

Monitore estas métricas em suas integrações:
  • Taxa de sucesso: % de requisições bem-sucedidas
  • Tempo de resposta médio: Performance da API
  • Taxa de erro por endpoint: Identifica problemas específicos
  • Uso de cota: Proximidade dos limites
  • Volume de dados consultados: Crescimento ao longo do tempo

Configuração de Alertas

def verificar_saude_api():
    """Verificação periódica da saúde da API"""
    metrics = {
        "success_rate": 0.0,
        "avg_response_time": 0.0,
        "error_count": 0,
        "quota_usage": 0.0
    }
    
    # Calcular métricas dos últimos logs
    # ... lógica de cálculo ...
    
    # Alertas baseados em thresholds
    if metrics["success_rate"] < 0.95:
        enviar_alerta("Taxa de sucesso baixa", metrics)
    
    if metrics["avg_response_time"] > 20:
        enviar_alerta("API lenta", metrics)
    
    if metrics["quota_usage"] > 0.8:
        enviar_alerta("Quota próxima do limite", metrics)

def enviar_alerta(tipo, metrics):
    """Enviar alerta via email, etc."""
    alert_data = {
        "alert_type": tipo,
        "timestamp": time.time(),
        "metrics": metrics
    }
    # Implementar envio do alerta

Troubleshooting

Problemas Comuns

Sintoma: Timeout em consultas ou respostas muito demoradasSoluções:
  1. Usar índices adequados nos filtros
  2. Limitar o número de registros retornados
  3. Usar formato json_compact
  4. Otimizar cláusulas WHERE
# ✅ Consulta otimizada
query_rapida = {
    "select": ["id", "campo_indexado"],
    "where": [
        {"field": "campo_indexado", "op": "=", "value": "valor_especifico"}
    ],
    "limit": 100,
    "format": "json_compact"
}
Sintoma: Falhas frequentes no upload de arquivosSoluções:
  1. Verificar tamanho dos arquivos (máx 10MB recomendado)
  2. Validar formato JSONL
  3. Implementar retry para falhas temporárias
  4. Dividir arquivos grandes
def validar_jsonl(arquivo_path):
    """Valida formato JSONL antes do upload"""
    try:
        with open(arquivo_path, 'r') as f:
            for i, linha in enumerate(f, 1):
                try:
                    json.loads(linha.strip())
                except json.JSONDecodeError as e:
                    return False, f"Linha {i}: {e}"
        return True, "Válido"
    except Exception as e:
        return False, str(e)

Planos e Limites Customizados

Para necessidades especiais de volume ou performance, entre em contato com nossa equipe:
Os limites e quotas estão sujeitos a alterações. Sempre consulte a documentação mais recente e monitore seu uso de forma programática.