Visão Geral A API DataSnap implementa limites para garantir performance estável e uso justo dos recursos. Este guia detalha todos os limites aplicados e como trabalhar eficientemente dentro deles. 
Os limites são aplicados por tenant (conta) e são resetados automaticamente dentro das janelas de tempo especificadas. 
Limites por Endpoint Upload de Arquivos Limite Valor Observações Tamanho máximo por arquivo 10 MB Aplica-se a cada arquivo individual (recomendado para melhor experiência) Formato suportado JSONL Cada linha deve ser um JSON válido 
Listagem de Arquivos Limite Valor Janela de Tempo Requisições 100 requests 1 minuto Itens por página 1-100 Por requisição 
Consultas (Query) Limite Valor Janela de Tempo Requisições 50 requests 1 minuto Registros por consulta 1-10.000 Por requisição Timeout de consulta 30 segundos Execução máxima 
Boas Práticas Implemente Rate Limiting Para evitar atingir os limites, implemente controle de taxa em suas aplicações: 
Python - Rate Limiting
JavaScript - Rate Limiting
import  time from  functools  import  wraps def  rate_limit ( calls_per_minute = 45 ):     def  decorator ( func ):         last_called  =  [ 0.0 ]         @wraps (func)         def  wrapper ( * args ,  ** kwargs ):             elapsed  =  time.time()  -  last_called[ 0 ]             left_to_wait  =  60.0  /  calls_per_minute  -  elapsed             if  left_to_wait  >  0 :                 time.sleep(left_to_wait)             ret  =  func( * args,  ** kwargs)             last_called[ 0 ]  =  time.time()             return  ret         return  wrapper     return  decorator # Uso @rate_limit ( calls_per_minute = 40 )   # Margem de segurança def  fazer_consulta ( query_data ):     return  requests.post(url,  headers = headers,  json = query_data) 
Upload Eficiente Para múltiplos arquivos, otimize o processo: 
def  upload_multiplos_arquivos ( schema_slug ,  lista_arquivos ,  headers ):     """Upload otimizado para múltiplos arquivos"""     files  =  []     try :         # Preparar todos os arquivos         for  arquivo  in  lista_arquivos:             files.append(( 'files' ,  open (arquivo,  'rb' )))                  # Upload direto para nuvem usando URL do token         response  =  requests.put(             " {pre_signed_url} /dados.jsonl" ,             headers = { "Content-Type" :  "application/octet-stream" },             data = arquivo,             timeout = 300   # 5 minutos para uploads         )                  return  response.json()              finally :         # Sempre fechar os arquivos         for  _, file_obj  in  files:             file_obj.close() # Dividir arquivos grandes em chunks menores def  dividir_arquivo_grande ( arquivo_path ,  chunk_size_mb = 8 ):     """Divide arquivos grandes em chunks menores"""     chunk_size  =  chunk_size_mb  *  1024  *  1024     chunks  =  []          with  open (arquivo_path,  'r' )  as  f:         chunk_lines  =  []         current_size  =  0                  for  line  in  f:             line_size  =  len (line.encode( 'utf-8' ))                          if  current_size  +  line_size  >  chunk_size  and  chunk_lines:                 # Salvar chunk atual                 chunk_file  =  f " { arquivo_path } .chunk_ { len (chunks) } .jsonl"                 with  open (chunk_file,  'w' )  as  chunk_f:                     chunk_f.writelines(chunk_lines)                 chunks.append(chunk_file)                                  # Resetar para próximo chunk                 chunk_lines  =  [line]                 current_size  =  line_size             else :                 chunk_lines.append(line)                 current_size  +=  line_size                  # Último chunk         if  chunk_lines:             chunk_file  =  f " { arquivo_path } .chunk_ { len (chunks) } .jsonl"             with  open (chunk_file,  'w' )  as  chunk_f:                 chunk_f.writelines(chunk_lines)             chunks.append(chunk_file)          return  chunks Consultas Otimizadas Estruture suas consultas para melhor performance: 
# ✅ Consulta otimizada consulta_eficiente  =  {     "select" : [ "id" ,  "nome" ,  "categoria" ],   # Campos específicos     "where" : [         { "field" :  "ativo" ,  "op" :  "=" ,  "value" :  True },         { "field" :  "criado_em" ,  "op" :  ">=" ,  "value" :  "2024-01-01" }     ],     "order_by" : [{ "field" :  "id" ,  "direction" :  "asc" }],     "limit" :  100 ,   # Limite razoável     "format" :  "json_compact"   # Formato compacto para menos dados } # ❌ Consulta ineficiente consulta_problematica  =  {     "select" : [ "*" ],   # Evite select *      "limit" :  10000 ,    # Limite muito alto     "format" :  "json"   # Formato verboso desnecessário } Paginação Eficiente Use paginação baseada em cursor para melhor performance: 
def  buscar_todos_registros ( schema_slug ,  headers ,  consulta_base ):     """Busca todos os registros usando paginação eficiente"""     todos_registros  =  []     page_token  =  None          while  True :         consulta  =  consulta_base.copy()         consulta[ "limit" ]  =  100   # Chunks menores                  if  page_token:             consulta[ "page_token" ]  =  page_token                      try :             response  =  requests.post(                 f "https://api.datasnap.com.br/api/v1/schemas/ { schema_slug } /query" ,                 headers = headers,                 json = consulta,                 timeout = 30             )             response.raise_for_status()                          dados  =  response.json()             todos_registros.extend(dados.get( "data" , []))                          # Verificar se há mais páginas             meta  =  dados.get( "meta" , {})             if  "next_page_token"  not  in  meta:                 break                              page_token  =  meta[ "next_page_token" ]                      except  requests.exceptions.RequestException  as  e:             print ( f "Erro na paginação:  { e } " )             break          return  todos_registros Monitoramento de Uso Implementando Logs Estruturados import  logging import  json import  time logging.basicConfig( level = logging. INFO ) logger  =  logging.getLogger( __name__ ) def  log_api_usage ( endpoint ,  method ,  status_code ,  response_time ,  data_size = None ):     """Log estruturado para monitorar uso da API"""     log_data  =  {         "timestamp" : time.time(),         "endpoint" : endpoint,         "method" : method,         "status_code" : status_code,         "response_time_ms" : response_time  *  1000 ,         "success" :  200  <=  status_code  <  300 ,     }          if  data_size:         log_data[ "data_size_mb" ]  =  data_size  /  ( 1024  *  1024 )          # Adicionar alertas para limites próximos     if  status_code  >=  400 :         log_data[ "alert" ]  =  f "HTTP_ERROR_ { status_code } "     elif  response_time  >  25 :   # Próximo do timeout         log_data[ "alert" ]  =  "SLOW_RESPONSE"          logger.info(json.dumps(log_data)) # Wrapper para requests com monitoramento def  fazer_requisicao_monitorada ( method ,  url ,  ** kwargs ):     """Wrapper que adiciona monitoramento automático"""     start_time  =  time.time()          try :         response  =  requests.request(method, url,  ** kwargs)         response_time  =  time.time()  -  start_time                  # Calcular tamanho dos dados se possível         data_size  =  len (response.content)  if  response.content  else  0                  log_api_usage(             endpoint = url.split( '/' )[ - 1 ],             method = method,             status_code = response.status_code,             response_time = response_time,             data_size = data_size         )                  return  response              except  Exception  as  e:         response_time  =  time.time()  -  start_time         logger.error( f "Erro na requisição:  { e } " )         log_api_usage(             endpoint = url.split( '/' )[ - 1 ],             method = method,             status_code = 0 ,             response_time = response_time         )         raise Estratégias de Otimização Para Alta Frequência de Dados Se você processa muitos dados regularmente: 
Implemente cache para consultas frequentes: import  time from  functools  import  lru_cache import  hashlib class  QueryCache :     def  __init__ ( self ,  ttl_seconds = 300 ):   # 5 minutos         self .cache  =  {}         self .ttl  =  ttl_seconds          def  get_cache_key ( self ,  query_data ):         """Gera chave única para a consulta"""         query_str  =  json.dumps(query_data,  sort_keys = True )         return  hashlib.md5(query_str.encode()).hexdigest()          def  get ( self ,  query_data ):         key  =  self .get_cache_key(query_data)         if  key  in  self .cache:             cached_data, timestamp  =  self .cache[key]             if  time.time()  -  timestamp  <  self .ttl:                 return  cached_data         return  None          def  set ( self ,  query_data ,  result ):         key  =  self .get_cache_key(query_data)         self .cache[key]  =  (result, time.time()) # Uso do cache cache  =  QueryCache( ttl_seconds = 300 ) def  consulta_com_cache ( query_data ):     # Tentar cache primeiro     cached  =  cache.get(query_data)     if  cached:         return  cached          # Fazer requisição se não estiver em cache     result  =  fazer_consulta(query_data)     cache.set(query_data, result)     return  result Alertas e Monitoramento Métricas Importantes Monitore estas métricas em suas integrações: 
Taxa de sucesso : % de requisições bem-sucedidasTempo de resposta médio : Performance da APITaxa de erro por endpoint : Identifica problemas específicosUso de cota : Proximidade dos limitesVolume de dados consultados : Crescimento ao longo do tempo 
Configuração de Alertas def  verificar_saude_api ():     """Verificação periódica da saúde da API"""     metrics  =  {         "success_rate" :  0.0 ,         "avg_response_time" :  0.0 ,         "error_count" :  0 ,         "quota_usage" :  0.0     }          # Calcular métricas dos últimos logs     # ... lógica de cálculo ...          # Alertas baseados em thresholds     if  metrics[ "success_rate" ]  <  0.95 :         enviar_alerta( "Taxa de sucesso baixa" , metrics)          if  metrics[ "avg_response_time" ]  >  20 :         enviar_alerta( "API lenta" , metrics)          if  metrics[ "quota_usage" ]  >  0.8 :         enviar_alerta( "Quota próxima do limite" , metrics) def  enviar_alerta ( tipo ,  metrics ):     """Enviar alerta via email, etc."""     alert_data  =  {         "alert_type" : tipo,         "timestamp" : time.time(),         "metrics" : metrics     }     # Implementar envio do alerta Troubleshooting Problemas Comuns 
Sintoma : Timeout em consultas ou respostas muito demoradasSoluções :
Usar índices adequados nos filtros 
Limitar o número de registros retornados 
Usar formato json_compact 
Otimizar cláusulas WHERE 
 # ✅ Consulta otimizada query_rapida  =  {     "select" : [ "id" ,  "campo_indexado" ],     "where" : [         { "field" :  "campo_indexado" ,  "op" :  "=" ,  "value" :  "valor_especifico" }     ],     "limit" :  100 ,     "format" :  "json_compact" } 
Sintoma : Falhas frequentes no upload de arquivosSoluções :
Verificar tamanho dos arquivos (máx 10MB recomendado) 
Validar formato JSONL 
Implementar retry para falhas temporárias 
Dividir arquivos grandes 
 def  validar_jsonl ( arquivo_path ):     """Valida formato JSONL antes do upload"""     try :         with  open (arquivo_path,  'r' )  as  f:             for  i, linha  in  enumerate (f,  1 ):                 try :                     json.loads(linha.strip())                 except  json.JSONDecodeError  as  e:                     return  False ,  f "Linha  { i } :  { e } "         return  True ,  "Válido"     except  Exception  as  e:         return  False ,  str (e) Planos e Limites Customizados Para necessidades especiais de volume ou performance, entre em contato com nossa equipe: 
Os limites e quotas estão sujeitos a alterações. Sempre consulte a documentação mais recente e monitore seu uso de forma programática.