Visão Geral
A API DataSnap implementa limites para garantir performance estável e uso justo dos recursos. Este guia detalha todos os limites aplicados e como trabalhar eficientemente dentro deles.
Os limites são aplicados por tenant (conta) e são resetados automaticamente dentro das janelas de tempo especificadas.
Limites por Endpoint
Upload de Arquivos
Limite Valor Observações Tamanho máximo por arquivo 10 MB Aplica-se a cada arquivo individual (recomendado para melhor experiência) Formato suportado JSONL Cada linha deve ser um JSON válido
Listagem de Arquivos
Limite Valor Janela de Tempo Requisições 100 requests 1 minuto Itens por página 1-100 Por requisição
Consultas (Query)
Limite Valor Janela de Tempo Requisições 50 requests 1 minuto Registros por consulta 1-10.000 Por requisição Timeout de consulta 30 segundos Execução máxima
Boas Práticas
Implemente Rate Limiting
Para evitar atingir os limites, implemente controle de taxa em suas aplicações:
Python - Rate Limiting
JavaScript - Rate Limiting
import time
from functools import wraps
def rate_limit ( calls_per_minute = 45 ):
def decorator ( func ):
last_called = [ 0.0 ]
@wraps (func)
def wrapper ( * args , ** kwargs ):
elapsed = time.time() - last_called[ 0 ]
left_to_wait = 60.0 / calls_per_minute - elapsed
if left_to_wait > 0 :
time.sleep(left_to_wait)
ret = func( * args, ** kwargs)
last_called[ 0 ] = time.time()
return ret
return wrapper
return decorator
# Uso
@rate_limit ( calls_per_minute = 40 ) # Margem de segurança
def fazer_consulta ( query_data ):
return requests.post(url, headers = headers, json = query_data)
Upload Eficiente
Para múltiplos arquivos, otimize o processo:
def upload_multiplos_arquivos ( schema_slug , lista_arquivos , headers ):
"""Upload otimizado para múltiplos arquivos"""
files = []
try :
# Preparar todos os arquivos
for arquivo in lista_arquivos:
files.append(( 'files' , open (arquivo, 'rb' )))
# Upload direto para nuvem usando URL do token
response = requests.put(
" {pre_signed_url} /dados.jsonl" ,
headers = { "Content-Type" : "application/octet-stream" },
data = arquivo,
timeout = 300 # 5 minutos para uploads
)
return response.json()
finally :
# Sempre fechar os arquivos
for _, file_obj in files:
file_obj.close()
# Dividir arquivos grandes em chunks menores
def dividir_arquivo_grande ( arquivo_path , chunk_size_mb = 8 ):
"""Divide arquivos grandes em chunks menores"""
chunk_size = chunk_size_mb * 1024 * 1024
chunks = []
with open (arquivo_path, 'r' ) as f:
chunk_lines = []
current_size = 0
for line in f:
line_size = len (line.encode( 'utf-8' ))
if current_size + line_size > chunk_size and chunk_lines:
# Salvar chunk atual
chunk_file = f " { arquivo_path } .chunk_ { len (chunks) } .jsonl"
with open (chunk_file, 'w' ) as chunk_f:
chunk_f.writelines(chunk_lines)
chunks.append(chunk_file)
# Resetar para próximo chunk
chunk_lines = [line]
current_size = line_size
else :
chunk_lines.append(line)
current_size += line_size
# Último chunk
if chunk_lines:
chunk_file = f " { arquivo_path } .chunk_ { len (chunks) } .jsonl"
with open (chunk_file, 'w' ) as chunk_f:
chunk_f.writelines(chunk_lines)
chunks.append(chunk_file)
return chunks
Consultas Otimizadas
Estruture suas consultas para melhor performance:
# ✅ Consulta otimizada
consulta_eficiente = {
"select" : [ "id" , "nome" , "categoria" ], # Campos específicos
"where" : [
{ "field" : "ativo" , "op" : "=" , "value" : True },
{ "field" : "criado_em" , "op" : ">=" , "value" : "2024-01-01" }
],
"order_by" : [{ "field" : "id" , "direction" : "asc" }],
"limit" : 100 , # Limite razoável
"format" : "json_compact" # Formato compacto para menos dados
}
# ❌ Consulta ineficiente
consulta_problematica = {
"select" : [ "*" ], # Evite select *
"limit" : 10000 , # Limite muito alto
"format" : "json" # Formato verboso desnecessário
}
Paginação Eficiente
Use paginação baseada em cursor para melhor performance:
def buscar_todos_registros ( schema_slug , headers , consulta_base ):
"""Busca todos os registros usando paginação eficiente"""
todos_registros = []
page_token = None
while True :
consulta = consulta_base.copy()
consulta[ "limit" ] = 100 # Chunks menores
if page_token:
consulta[ "page_token" ] = page_token
try :
response = requests.post(
f "https://api.datasnap.cloud/api/v1/schemas/ { schema_slug } /query" ,
headers = headers,
json = consulta,
timeout = 30
)
response.raise_for_status()
dados = response.json()
todos_registros.extend(dados.get( "data" , []))
# Verificar se há mais páginas
meta = dados.get( "meta" , {})
if "next_page_token" not in meta:
break
page_token = meta[ "next_page_token" ]
except requests.exceptions.RequestException as e:
print ( f "Erro na paginação: { e } " )
break
return todos_registros
Monitoramento de Uso
Implementando Logs Estruturados
import logging
import json
import time
logging.basicConfig( level = logging. INFO )
logger = logging.getLogger( __name__ )
def log_api_usage ( endpoint , method , status_code , response_time , data_size = None ):
"""Log estruturado para monitorar uso da API"""
log_data = {
"timestamp" : time.time(),
"endpoint" : endpoint,
"method" : method,
"status_code" : status_code,
"response_time_ms" : response_time * 1000 ,
"success" : 200 <= status_code < 300 ,
}
if data_size:
log_data[ "data_size_mb" ] = data_size / ( 1024 * 1024 )
# Adicionar alertas para limites próximos
if status_code >= 400 :
log_data[ "alert" ] = f "HTTP_ERROR_ { status_code } "
elif response_time > 25 : # Próximo do timeout
log_data[ "alert" ] = "SLOW_RESPONSE"
logger.info(json.dumps(log_data))
# Wrapper para requests com monitoramento
def fazer_requisicao_monitorada ( method , url , ** kwargs ):
"""Wrapper que adiciona monitoramento automático"""
start_time = time.time()
try :
response = requests.request(method, url, ** kwargs)
response_time = time.time() - start_time
# Calcular tamanho dos dados se possível
data_size = len (response.content) if response.content else 0
log_api_usage(
endpoint = url.split( '/' )[ - 1 ],
method = method,
status_code = response.status_code,
response_time = response_time,
data_size = data_size
)
return response
except Exception as e:
response_time = time.time() - start_time
logger.error( f "Erro na requisição: { e } " )
log_api_usage(
endpoint = url.split( '/' )[ - 1 ],
method = method,
status_code = 0 ,
response_time = response_time
)
raise
Estratégias de Otimização
Para Alta Frequência de Dados
Se você processa muitos dados regularmente:
Implemente cache para consultas frequentes: import time
from functools import lru_cache
import hashlib
class QueryCache :
def __init__ ( self , ttl_seconds = 300 ): # 5 minutos
self .cache = {}
self .ttl = ttl_seconds
def get_cache_key ( self , query_data ):
"""Gera chave única para a consulta"""
query_str = json.dumps(query_data, sort_keys = True )
return hashlib.md5(query_str.encode()).hexdigest()
def get ( self , query_data ):
key = self .get_cache_key(query_data)
if key in self .cache:
cached_data, timestamp = self .cache[key]
if time.time() - timestamp < self .ttl:
return cached_data
return None
def set ( self , query_data , result ):
key = self .get_cache_key(query_data)
self .cache[key] = (result, time.time())
# Uso do cache
cache = QueryCache( ttl_seconds = 300 )
def consulta_com_cache ( query_data ):
# Tentar cache primeiro
cached = cache.get(query_data)
if cached:
return cached
# Fazer requisição se não estiver em cache
result = fazer_consulta(query_data)
cache.set(query_data, result)
return result
Alertas e Monitoramento
Métricas Importantes
Monitore estas métricas em suas integrações:
Taxa de sucesso : % de requisições bem-sucedidas
Tempo de resposta médio : Performance da API
Taxa de erro por endpoint : Identifica problemas específicos
Uso de cota : Proximidade dos limites
Volume de dados consultados : Crescimento ao longo do tempo
Configuração de Alertas
def verificar_saude_api ():
"""Verificação periódica da saúde da API"""
metrics = {
"success_rate" : 0.0 ,
"avg_response_time" : 0.0 ,
"error_count" : 0 ,
"quota_usage" : 0.0
}
# Calcular métricas dos últimos logs
# ... lógica de cálculo ...
# Alertas baseados em thresholds
if metrics[ "success_rate" ] < 0.95 :
enviar_alerta( "Taxa de sucesso baixa" , metrics)
if metrics[ "avg_response_time" ] > 20 :
enviar_alerta( "API lenta" , metrics)
if metrics[ "quota_usage" ] > 0.8 :
enviar_alerta( "Quota próxima do limite" , metrics)
def enviar_alerta ( tipo , metrics ):
"""Enviar alerta via email, etc."""
alert_data = {
"alert_type" : tipo,
"timestamp" : time.time(),
"metrics" : metrics
}
# Implementar envio do alerta
Troubleshooting
Problemas Comuns
Sintoma : Timeout em consultas ou respostas muito demoradasSoluções :
Usar índices adequados nos filtros
Limitar o número de registros retornados
Usar formato json_compact
Otimizar cláusulas WHERE
# ✅ Consulta otimizada
query_rapida = {
"select" : [ "id" , "campo_indexado" ],
"where" : [
{ "field" : "campo_indexado" , "op" : "=" , "value" : "valor_especifico" }
],
"limit" : 100 ,
"format" : "json_compact"
}
Sintoma : Falhas frequentes no upload de arquivosSoluções :
Verificar tamanho dos arquivos (máx 10MB recomendado)
Validar formato JSONL
Implementar retry para falhas temporárias
Dividir arquivos grandes
def validar_jsonl ( arquivo_path ):
"""Valida formato JSONL antes do upload"""
try :
with open (arquivo_path, 'r' ) as f:
for i, linha in enumerate (f, 1 ):
try :
json.loads(linha.strip())
except json.JSONDecodeError as e:
return False , f "Linha { i } : { e } "
return True , "Válido"
except Exception as e:
return False , str (e)
Planos e Limites Customizados
Para necessidades especiais de volume ou performance, entre em contato com nossa equipe:
Os limites e quotas estão sujeitos a alterações. Sempre consulte a documentação mais recente e monitore seu uso de forma programática.