Ready-to-use templates
Speed up your development with these tested, optimized templates for DataSnap integration. All templates include error handling, validation, and security best practices. Adapt them to your needs.
Reusable base client
Python template
import requests
import json
import time
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass
@dataclass
class DataSnapConfig:
token: str
base_url: str = "https://api.datasnap.cloud"
timeout: int = 30
retry_attempts: int = 3
class DataSnapClient:
"""Cliente base reutilizável para integração com DataSnap"""
def __init__(self, config: DataSnapConfig):
self.config = config
self.session = requests.Session()
self.session.headers.update({
"Authorization": f"Bearer {config.token}",
"User-Agent": "DataSnap-Client/1.0"
})
        # Configure logging
logging.basicConfig(level=logging.INFO)
self.logger = logging.getLogger(__name__)
def _make_request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
"""Método base para requisições com retry automático"""
for attempt in range(self.config.retry_attempts):
try:
response = self.session.request(
method,
f"{self.config.base_url}{url}",
timeout=self.config.timeout,
**kwargs
)
response.raise_for_status()
return response.json()
except requests.exceptions.HTTPError as e:
if e.response.status_code >= 400: # Erro HTTP
wait_time = 2 ** attempt
self.logger.warning(f"Rate limit hit, waiting {wait_time}s")
time.sleep(wait_time)
continue
raise
except requests.exceptions.RequestException as e:
if attempt == self.config.retry_attempts - 1:
raise
time.sleep(2 ** attempt)
def upload_file(self, schema_slug: str, file_path: str) -> Dict[str, Any]:
"""Upload de arquivo com validação"""
with open(file_path, 'rb') as f:
files = {'files': f}
            # Pass only Authorization so requests can set the multipart Content-Type boundary
headers = {"Authorization": f"Bearer {self.config.token}"}
response = self.session.post(
f"{self.config.base_url}/api/v1/schemas/{schema_slug}/files",
files=files,
headers=headers,
timeout=self.config.timeout
)
response.raise_for_status()
return response.json()
def query(self, schema_slug: str, query_data: Dict[str, Any]) -> Dict[str, Any]:
"""Execute consulta com validação de parâmetros"""
required_fields = ['select']
for field in required_fields:
if field not in query_data:
raise ValueError(f"Campo obrigatório ausente: {field}")
return self._make_request(
'POST',
f'/api/v1/schemas/{schema_slug}/query',
json=query_data
)
# Example usage
config = DataSnapConfig(token="your_token_here")
client = DataSnapClient(config)
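Once configured, the client handles uploads and queries directly. A minimal usage sketch follows; the schema slug "vendas" and the file "dados.jsonl" are placeholders, not values defined by the API.
# Hypothetical example: upload a file, then query the schema
# "vendas" and "dados.jsonl" are placeholders for your own schema and data
upload_result = client.upload_file("vendas", "dados.jsonl")
print(f"Upload response: {upload_result}")
query_result = client.query("vendas", {
    "select": ["categoria", "count(*) as total"],
    "group_by": ["categoria"],
    "limit": 10
})
print(query_result)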
JavaScript/Node.js template
const axios = require('axios');
const FormData = require('form-data');
const fs = require('fs');
class DataSnapClient {
constructor(config = {}) {
this.config = {
baseUrl: 'https://api.datasnap.cloud',
timeout: 30000,
retryAttempts: 3,
...config
};
this.axios = axios.create({
baseURL: this.config.baseUrl,
timeout: this.config.timeout,
headers: {
'Authorization': `Bearer ${this.config.token}`,
'User-Agent': 'DataSnap-Client-JS/1.0'
}
});
    // Interceptor for automatic retry
this.setupRetryInterceptor();
}
setupRetryInterceptor() {
this.axios.interceptors.response.use(
response => response,
async error => {
const { config } = error;
if (!config.retryCount) {
config.retryCount = 0;
}
        if (
          config.retryCount < this.config.retryAttempts &&
          (error.response?.status === 429 ||
            error.response?.status >= 500 ||
            error.code === 'ECONNRESET')
        ) {
config.retryCount++;
const delay = Math.pow(2, config.retryCount) * 1000;
await new Promise(resolve => setTimeout(resolve, delay));
return this.axios(config);
}
return Promise.reject(error);
}
);
}
async uploadFile(schemaSlug, filePath) {
const formData = new FormData();
formData.append('files', fs.createReadStream(filePath));
const response = await this.axios.post(
`/api/v1/schemas/${schemaSlug}/files`,
formData,
{ headers: formData.getHeaders() }
);
return response.data;
}
async query(schemaSlug, queryData) {
if (!queryData.select) {
      throw new Error('The "select" field is required');
}
const response = await this.axios.post(
`/api/v1/schemas/${schemaSlug}/query`,
queryData
);
return response.data;
}
}
module.exports = DataSnapClient;
Validation snippets
JSONL validator
import json
from typing import Dict, Any

def validate_jsonl_file(file_path: str, max_errors: int = 10) -> Dict[str, Any]:
    """
    Validate a JSONL file and return a detailed report
    """
errors = []
valid_lines = 0
total_lines = 0
with open(file_path, 'r', encoding='utf-8') as f:
for line_num, line in enumerate(f, 1):
total_lines += 1
line = line.strip()
            if not line:  # Skip empty lines
continue
try:
json.loads(line)
valid_lines += 1
except json.JSONDecodeError as e:
errors.append({
'line': line_num,
'error': str(e),
'content': line[:100] + '...' if len(line) > 100 else line
})
if len(errors) >= max_errors:
break
return {
'valid': len(errors) == 0,
'total_lines': total_lines,
'valid_lines': valid_lines,
'error_count': len(errors),
'errors': errors,
'success_rate': (valid_lines / total_lines) * 100 if total_lines > 0 else 0
}
# Usage
result = validate_jsonl_file('dados.jsonl')
if result['valid']:
    print(f"✅ Valid file: {result['valid_lines']} lines")
else:
    print(f"❌ {result['error_count']} errors found")
    for error in result['errors'][:5]:  # Show the first 5 errors
        print(f"  Line {error['line']}: {error['error']}")
Schema generator
import json
from typing import Dict, Any

def infer_schema_from_jsonl(file_path: str, sample_size: int = 1000) -> Dict[str, Any]:
    """
    Infer a schema from a JSONL file
    """
fields = {}
samples_processed = 0
with open(file_path, 'r', encoding='utf-8') as f:
for line in f:
if samples_processed >= sample_size:
break
try:
data = json.loads(line.strip())
for key, value in data.items():
if key not in fields:
fields[key] = {
'type': None,
'nullable': False,
'samples': []
}
                    # Infer the value type
if value is None:
fields[key]['nullable'] = True
else:
value_type = type(value).__name__
if fields[key]['type'] is None:
fields[key]['type'] = value_type
elif fields[key]['type'] != value_type:
fields[key]['type'] = 'mixed'
                    # Collect sample values
if len(fields[key]['samples']) < 5:
fields[key]['samples'].append(value)
samples_processed += 1
except json.JSONDecodeError:
continue
return {
'inferred_schema': fields,
'samples_processed': samples_processed
}
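A short usage sketch for the generator; the file name 'dados.jsonl' is a placeholder.
# Hypothetical usage: inspect the inferred fields
schema_report = infer_schema_from_jsonl('dados.jsonl', sample_size=500)
for name, info in schema_report['inferred_schema'].items():
    print(f"{name}: {info['type']} (nullable={info['nullable']})")
print(f"Lines sampled: {schema_report['samples_processed']}")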
Query templates
Common queries
from typing import Dict, Any

class QueryTemplates:
    """Frequently used query templates"""
@staticmethod
def top_values(field: str, limit: int = 10) -> Dict[str, Any]:
"""Top N valores mais frequentes"""
return {
"select": [field, "count(*) as frequency"],
"group_by": [field],
"order_by": [{"field": "frequency", "direction": "desc"}],
"limit": limit
}
@staticmethod
def daily_aggregation(date_field: str, value_field: str,
start_date: str, end_date: str) -> Dict[str, Any]:
"""Agregação diária de valores"""
return {
"select": [
f"date({date_field}) as date",
f"count(*) as count",
f"sum({value_field}) as total",
f"avg({value_field}) as average"
],
"where": [
{"field": date_field, "op": ">=", "value": start_date},
{"field": date_field, "op": "<=", "value": end_date}
],
"group_by": [f"date({date_field})"],
"order_by": [{"field": "date", "direction": "asc"}]
}
@staticmethod
def percentile_analysis(field: str) -> Dict[str, Any]:
"""Análise de percentis"""
return {
"select": [
f"min({field}) as min_value",
f"max({field}) as max_value",
f"avg({field}) as avg_value",
f"quantile(0.25)({field}) as p25",
f"quantile(0.5)({field}) as p50",
f"quantile(0.75)({field}) as p75",
f"quantile(0.95)({field}) as p95"
]
}
# Usage examples
templates = QueryTemplates()
# Top 10 most frequent categories
top_categories = templates.top_values("categoria", 10)
# Daily sales for the month
daily_sales = templates.daily_aggregation(
    "data_venda", "valor", "2024-08-01", "2024-08-31"
)
# Statistical analysis of prices
price_stats = templates.percentile_analysis("preco")
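These dictionaries can be passed directly to the base client's query method. A minimal sketch, assuming the placeholder schema slug "vendas":
# Hypothetical: run a generated template against a schema
response = client.query("vendas", top_categories)
print(response)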
Batch uploader
from pathlib import Path
from typing import Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
class BatchUploader:
def __init__(self, client: DataSnapClient, max_workers: int = 5):
self.client = client
self.max_workers = max_workers
def upload_directory(self, directory_path: str, schema_slug: str,
pattern: str = "*.jsonl") -> Dict[str, Any]:
"""Upload todos os arquivos JSONL de um diretório"""
directory = Path(directory_path)
files = list(directory.glob(pattern))
results = {
'successful': [],
'failed': [],
'total': len(files)
}
with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
future_to_file = {
executor.submit(self._upload_single_file, schema_slug, str(file)): file
for file in files
}
for future in as_completed(future_to_file):
file_path = future_to_file[future]
try:
result = future.result()
results['successful'].append({
'file': str(file_path),
'result': result
})
print(f"✅ {file_path.name} enviado com sucesso")
except Exception as e:
results['failed'].append({
'file': str(file_path),
'error': str(e)
})
print(f"❌ Erro ao enviar {file_path.name}: {e}")
return results
def _upload_single_file(self, schema_slug: str, file_path: str):
"""Upload de um único arquivo com validação"""
        # Validate before uploading
validation = validate_jsonl_file(file_path, max_errors=1)
if not validation['valid']:
raise ValueError(f"Arquivo inválido: {validation['errors'][0]['error']}")
return self.client.upload_file(schema_slug, file_path)
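A usage sketch for the batch uploader, reusing the client defined earlier; the directory "./data" and the schema slug "vendas" are placeholders.
# Hypothetical usage: upload every JSONL file in a directory
uploader = BatchUploader(client, max_workers=5)
report = uploader.upload_directory("./data", "vendas")
print(f"{len(report['successful'])}/{report['total']} files uploaded")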
Configuration templates
Environment configuration
import os
from dataclasses import dataclass
from typing import Optional
@dataclass
class EnvironmentConfig:
"""Configuração por ambiente"""
    # Required
token: str
    # Optional with defaults
environment: str = "production"
timeout: int = 30
retry_attempts: int = 3
max_file_size_mb: int = 100
@classmethod
def from_env(cls) -> 'EnvironmentConfig':
"""Cria configuração a partir de variáveis de ambiente"""
token = os.getenv('DATASNAP_TOKEN')
if not token:
raise ValueError("DATASNAP_TOKEN é obrigatório")
return cls(
token=token,
environment=os.getenv('DATASNAP_ENV', 'production'),
timeout=int(os.getenv('DATASNAP_TIMEOUT', '30')),
retry_attempts=int(os.getenv('DATASNAP_RETRIES', '3')),
max_file_size_mb=int(os.getenv('DATASNAP_MAX_SIZE', '100'))
)
@property
def base_url(self) -> str:
"""URL base por ambiente"""
urls = {
'production': 'https://api.datasnap.cloud'
}
return urls.get(self.environment, urls['production'])
# Usage
config = EnvironmentConfig.from_env()
client = DataSnapClient(DataSnapConfig(
token=config.token,
base_url=config.base_url,
timeout=config.timeout,
retry_attempts=config.retry_attempts
))
Docker Compose template
version: '3.8'
services:
datasnap-processor:
build: .
environment:
- DATASNAP_TOKEN=${DATASNAP_TOKEN}
- DATASNAP_ENV=${DATASNAP_ENV:-production}
- DATASNAP_TIMEOUT=30
- DATASNAP_RETRIES=3
volumes:
- ./data:/app/data
- ./logs:/app/logs
restart: unless-stopped
datasnap-scheduler:
build: .
command: python scheduler.py
environment:
- DATASNAP_TOKEN=${DATASNAP_TOKEN}
- SCHEDULE_INTERVAL=${SCHEDULE_INTERVAL:-3600}
depends_on:
- datasnap-processor
restart: unless-stopped
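The compose file builds the application image from the project root. A minimal Dockerfile sketch that would satisfy it; the file names requirements.txt and main.py are assumptions about your project layout, not part of the template above.
# Hypothetical Dockerfile; adjust file names to your project
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["python", "main.py"]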
Logging and monitoring
Structured logger
import logging
import json
from datetime import datetime, timezone
class DataSnapLogger:
def __init__(self, name: str = "datasnap", level: str = "INFO"):
self.logger = logging.getLogger(name)
self.logger.setLevel(getattr(logging, level))
        # Handler for structured output
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
def log_request(self, method: str, url: str, status_code: int,
response_time: float, **kwargs):
"""Log estruturado para requisições"""
log_data = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
'event': 'api_request',
'method': method,
'url': url,
'status_code': status_code,
'response_time_ms': round(response_time * 1000, 2),
**kwargs
}
self.logger.info(json.dumps(log_data))
def log_upload(self, schema_slug: str, file_path: str,
file_size: int, success: bool, **kwargs):
"""Log estruturado para uploads"""
log_data = {
            'timestamp': datetime.now(timezone.utc).isoformat(),
'event': 'file_upload',
'schema': schema_slug,
'file_path': file_path,
'file_size_mb': round(file_size / (1024*1024), 2),
'success': success,
**kwargs
}
self.logger.info(json.dumps(log_data))
# Usage
logger = DataSnapLogger()
logger.log_upload("vendas", "data.jsonl", 1048576, True, file_id=123)
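log_request pairs naturally with the base client. A minimal sketch of timing a query and recording it; the schema slug "vendas" is a placeholder, and the client and logger defined above are assumed to be in scope.
import time
# Hypothetical: time a query and log it as a structured event
start = time.monotonic()
result = client.query("vendas", {"select": ["count(*) as total"]})
elapsed = time.monotonic() - start
# 200 is assumed because query() raises on HTTP errors
logger.log_request("POST", "/api/v1/schemas/vendas/query", 200, elapsed)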

