Speed up your development with ready-made templates and reusable snippets for DataSnap integration.
import requests
import json
import time
import logging
from typing import Dict, Any
from dataclasses import dataclass


@dataclass
class DataSnapConfig:
    token: str
    base_url: str = "https://api.datasnap.com.br"
    timeout: int = 30
    retry_attempts: int = 3

class DataSnapClient:
    """Reusable base client for DataSnap integration"""

    def __init__(self, config: DataSnapConfig):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {config.token}",
            "User-Agent": "DataSnap-Client/1.0"
        })
        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def _make_request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
        """Base request method with automatic retries"""
        for attempt in range(self.config.retry_attempts):
            try:
                response = self.session.request(
                    method,
                    f"{self.config.base_url}{url}",
                    timeout=self.config.timeout,
                    **kwargs
                )
                response.raise_for_status()
                return response.json()
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 429:  # Rate limit
                    wait_time = 2 ** attempt
                    self.logger.warning(f"Rate limit hit, waiting {wait_time}s")
                    time.sleep(wait_time)
                    continue
                raise
            except requests.exceptions.RequestException:
                if attempt == self.config.retry_attempts - 1:
                    raise
                time.sleep(2 ** attempt)
        # All attempts were rate limited: fail explicitly instead of returning None
        raise RuntimeError("Retry attempts exhausted")
    def upload_file(self, schema_slug: str, file_path: str) -> Dict[str, Any]:
        """Upload a file with validation"""
        with open(file_path, 'rb') as f:
            files = {'files': f}
            # The session already carries the Authorization header, and requests
            # sets the multipart Content-Type (with boundary) automatically
            response = self.session.post(
                f"{self.config.base_url}/api/v1/schemas/{schema_slug}/files",
                files=files,
                timeout=self.config.timeout
            )
            response.raise_for_status()
            return response.json()
    def query(self, schema_slug: str, query_data: Dict[str, Any]) -> Dict[str, Any]:
        """Run a query with parameter validation"""
        required_fields = ['select']
        for field in required_fields:
            if field not in query_data:
                raise ValueError(f"Missing required field: {field}")
        return self._make_request(
            'POST',
            f'/api/v1/schemas/{schema_slug}/query',
            json=query_data
        )


# Usage example
config = DataSnapConfig(token="your_token_here")
client = DataSnapClient(config)
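For reference, a hedged end-to-end sketch using the Python client defined above; the schema slug "vendas" and the file "dados.jsonl" are placeholder values, and the query body mirrors the query templates shown later on this page:

# Illustrative only: placeholder schema slug and file name
upload_result = client.upload_file("vendas", "dados.jsonl")
print(f"Upload response: {upload_result}")

top_by_category = client.query("vendas", {
    "select": ["categoria", "count(*) as total"],
    "group_by": ["categoria"],
    "limit": 10
})
print(top_by_category)
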
const axios = require('axios');
const FormData = require('form-data');
const fs = require('fs');

class DataSnapClient {
  constructor(config = {}) {
    this.config = {
      baseUrl: 'https://api.datasnap.com.br',
      timeout: 30000,
      retryAttempts: 3,
      ...config
    };

    this.axios = axios.create({
      baseURL: this.config.baseUrl,
      timeout: this.config.timeout,
      headers: {
        'Authorization': `Bearer ${this.config.token}`,
        'User-Agent': 'DataSnap-Client-JS/1.0'
      }
    });

    // Interceptor for automatic retries
    this.setupRetryInterceptor();
  }

  setupRetryInterceptor() {
    this.axios.interceptors.response.use(
      response => response,
      async error => {
        const { config } = error;
        if (!config.retryCount) {
          config.retryCount = 0;
        }
        if (
          config.retryCount < this.config.retryAttempts &&
          (error.response?.status === 429 || error.code === 'ECONNRESET')
        ) {
          config.retryCount++;
          const delay = Math.pow(2, config.retryCount) * 1000;
          await new Promise(resolve => setTimeout(resolve, delay));
          return this.axios(config);
        }
        return Promise.reject(error);
      }
    );
  }

  async uploadFile(schemaSlug, filePath) {
    const formData = new FormData();
    formData.append('files', fs.createReadStream(filePath));
    const response = await this.axios.post(
      `/api/v1/schemas/${schemaSlug}/files`,
      formData,
      { headers: formData.getHeaders() }
    );
    return response.data;
  }

  async query(schemaSlug, queryData) {
    if (!queryData.select) {
      throw new Error('The "select" field is required');
    }
    const response = await this.axios.post(
      `/api/v1/schemas/${schemaSlug}/query`,
      queryData
    );
    return response.data;
  }
}

module.exports = DataSnapClient;
def validate_jsonl_file(file_path: str, max_errors: int = 10) -> Dict[str, Any]:
    """
    Validate a JSONL file and return a detailed report
    """
    errors = []
    valid_lines = 0
    total_lines = 0

    with open(file_path, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            total_lines += 1
            line = line.strip()
            if not line:  # Skip empty lines
                continue
            try:
                json.loads(line)
                valid_lines += 1
            except json.JSONDecodeError as e:
                errors.append({
                    'line': line_num,
                    'error': str(e),
                    'content': line[:100] + '...' if len(line) > 100 else line
                })
                if len(errors) >= max_errors:
                    break

    return {
        'valid': len(errors) == 0,
        'total_lines': total_lines,
        'valid_lines': valid_lines,
        'error_count': len(errors),
        'errors': errors,
        'success_rate': (valid_lines / total_lines) * 100 if total_lines > 0 else 0
    }


# Usage
result = validate_jsonl_file('dados.jsonl')
if result['valid']:
    print(f"✅ Valid file: {result['valid_lines']} lines")
else:
    print(f"❌ {result['error_count']} errors found")
    for error in result['errors'][:5]:  # Show the first 5 errors
        print(f"  Line {error['line']}: {error['error']}")
def infer_schema_from_jsonl(file_path: str, sample_size: int = 1000) -> Dict[str, Any]:
    """
    Infer a schema from a JSONL file
    """
    fields = {}
    samples_processed = 0

    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if samples_processed >= sample_size:
                break
            try:
                data = json.loads(line.strip())
                for key, value in data.items():
                    if key not in fields:
                        fields[key] = {
                            'type': None,
                            'nullable': False,
                            'samples': []
                        }
                    # Infer the type
                    if value is None:
                        fields[key]['nullable'] = True
                    else:
                        value_type = type(value).__name__
                        if fields[key]['type'] is None:
                            fields[key]['type'] = value_type
                        elif fields[key]['type'] != value_type:
                            fields[key]['type'] = 'mixed'
                    # Collect samples
                    if len(fields[key]['samples']) < 5:
                        fields[key]['samples'].append(value)
                samples_processed += 1
            except json.JSONDecodeError:
                continue

    return {
        'inferred_schema': fields,
        'samples_processed': samples_processed
    }
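A short usage sketch for the inference helper above; "dados.jsonl" is a placeholder file name:

# Inspect the inferred schema of a local JSONL file
schema_info = infer_schema_from_jsonl('dados.jsonl')
print(f"Samples processed: {schema_info['samples_processed']}")
for name, info in schema_info['inferred_schema'].items():
    nullable = " (nullable)" if info['nullable'] else ""
    print(f"  {name}: {info['type']}{nullable} - samples: {info['samples'][:3]}")
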
class QueryTemplates:
    """Templates for frequently used queries"""

    @staticmethod
    def top_values(field: str, limit: int = 10) -> Dict[str, Any]:
        """Top N most frequent values"""
        return {
            "select": [field, "count(*) as frequency"],
            "group_by": [field],
            "order_by": [{"field": "frequency", "direction": "desc"}],
            "limit": limit
        }

    @staticmethod
    def daily_aggregation(date_field: str, value_field: str,
                          start_date: str, end_date: str) -> Dict[str, Any]:
        """Daily aggregation of values"""
        return {
            "select": [
                f"date({date_field}) as date",
                "count(*) as count",
                f"sum({value_field}) as total",
                f"avg({value_field}) as average"
            ],
            "where": [
                {"field": date_field, "op": ">=", "value": start_date},
                {"field": date_field, "op": "<=", "value": end_date}
            ],
            "group_by": [f"date({date_field})"],
            "order_by": [{"field": "date", "direction": "asc"}]
        }

    @staticmethod
    def percentile_analysis(field: str) -> Dict[str, Any]:
        """Percentile analysis"""
        return {
            "select": [
                f"min({field}) as min_value",
                f"max({field}) as max_value",
                f"avg({field}) as avg_value",
                f"quantile(0.25)({field}) as p25",
                f"quantile(0.5)({field}) as p50",
                f"quantile(0.75)({field}) as p75",
                f"quantile(0.95)({field}) as p95"
            ]
        }


# Usage examples
templates = QueryTemplates()

# Top 10 most frequent categories
top_categories = templates.top_values("categoria", 10)

# Daily sales for the month
daily_sales = templates.daily_aggregation(
    "data_venda", "valor", "2024-08-01", "2024-08-31"
)

# Price distribution statistics
price_stats = templates.percentile_analysis("preco")
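The templates above return plain dictionaries, so they can be passed straight to the client's query method. A hedged sketch, assuming the client instance from the first template is available and "vendas" is a placeholder schema slug:

# Run generated templates against a schema
top_result = client.query("vendas", top_categories)
daily_result = client.query("vendas", daily_sales)
print(top_result)
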
import asyncio
from datetime import datetime, timedelta


class ProcessingMonitor:
    def __init__(self, client: DataSnapClient):
        self.client = client

    async def wait_for_completion(self, schema_slug: str,
                                  timeout_minutes: int = 30) -> bool:
        """Wait for processing to finish, with a timeout"""
        start_time = datetime.now()
        timeout = timedelta(minutes=timeout_minutes)

        while datetime.now() - start_time < timeout:
            try:
                files = self.client._make_request(
                    'GET',
                    f'/api/v1/schemas/{schema_slug}/files',
                    params={'processing_status': 'pending'}
                )
                pending_count = files['meta']['total']
                if pending_count == 0:
                    print("✅ Processing complete!")
                    return True
                print(f"⏳ {pending_count} files still pending...")
                await asyncio.sleep(30)
            except Exception as e:
                print(f"❌ Error checking status: {e}")
                await asyncio.sleep(60)

        print("⏰ Timeout reached")
        return False

    def get_processing_summary(self, schema_slug: str) -> Dict[str, Any]:
        """Get a summary of the processing status"""
        files = self.client._make_request(
            'GET',
            f'/api/v1/schemas/{schema_slug}/files'
        )

        status_counts = {}
        for file_info in files['data']:
            status = file_info['processing_status']
            status_counts[status] = status_counts.get(status, 0) + 1

        return {
            'total_files': files['meta']['total'],
            'status_breakdown': status_counts,
            'completion_rate': (
                status_counts.get('completed', 0) / files['meta']['total'] * 100
                if files['meta']['total'] > 0 else 0
            )
        }
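One possible way to drive the monitor, assuming the client instance from the first template is in scope ("vendas" is a placeholder schema slug):

# Wait for pending files, then print a status breakdown
monitor = ProcessingMonitor(client)
completed = asyncio.run(monitor.wait_for_completion("vendas", timeout_minutes=15))
if completed:
    summary = monitor.get_processing_summary("vendas")
    print(f"Completion rate: {summary['completion_rate']:.1f}%")
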
from pathlib import Path
from concurrent.futures import ThreadPoolExecutor, as_completed


class BatchUploader:
    def __init__(self, client: DataSnapClient, max_workers: int = 5):
        self.client = client
        self.max_workers = max_workers

    def upload_directory(self, directory_path: str, schema_slug: str,
                         pattern: str = "*.jsonl") -> Dict[str, Any]:
        """Upload every JSONL file in a directory"""
        directory = Path(directory_path)
        files = list(directory.glob(pattern))

        results = {
            'successful': [],
            'failed': [],
            'total': len(files)
        }

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(self._upload_single_file, schema_slug, str(file)): file
                for file in files
            }
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    results['successful'].append({
                        'file': str(file_path),
                        'result': result
                    })
                    print(f"✅ {file_path.name} uploaded successfully")
                except Exception as e:
                    results['failed'].append({
                        'file': str(file_path),
                        'error': str(e)
                    })
                    print(f"❌ Failed to upload {file_path.name}: {e}")

        return results

    def _upload_single_file(self, schema_slug: str, file_path: str):
        """Upload a single file after validating it"""
        # Validate before uploading
        validation = validate_jsonl_file(file_path, max_errors=1)
        if not validation['valid']:
            raise ValueError(f"Invalid file: {validation['errors'][0]['error']}")
        return self.client.upload_file(schema_slug, file_path)
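A usage sketch for the batch uploader; the directory "./data" and the schema slug "vendas" are placeholders:

# Upload every *.jsonl file in ./data to the "vendas" schema
uploader = BatchUploader(client, max_workers=3)
results = uploader.upload_directory("./data", "vendas")
print(f"{len(results['successful'])}/{results['total']} files uploaded")
for failure in results['failed']:
    print(f"  failed: {failure['file']} -> {failure['error']}")
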
import os
from dataclasses import dataclass


@dataclass
class EnvironmentConfig:
    """Per-environment configuration"""
    # Required
    token: str
    # Optional, with defaults
    environment: str = "production"
    timeout: int = 30
    retry_attempts: int = 3
    max_file_size_mb: int = 100

    @classmethod
    def from_env(cls) -> 'EnvironmentConfig':
        """Build the configuration from environment variables"""
        token = os.getenv('DATASNAP_TOKEN')
        if not token:
            raise ValueError("DATASNAP_TOKEN is required")
        return cls(
            token=token,
            environment=os.getenv('DATASNAP_ENV', 'production'),
            timeout=int(os.getenv('DATASNAP_TIMEOUT', '30')),
            retry_attempts=int(os.getenv('DATASNAP_RETRIES', '3')),
            max_file_size_mb=int(os.getenv('DATASNAP_MAX_SIZE', '100'))
        )

    @property
    def base_url(self) -> str:
        """Base URL per environment"""
        urls = {
            'production': 'https://api.datasnap.com.br'
        }
        return urls.get(self.environment, urls['production'])


# Usage
config = EnvironmentConfig.from_env()
client = DataSnapClient(DataSnapConfig(
    token=config.token,
    base_url=config.base_url,
    timeout=config.timeout,
    retry_attempts=config.retry_attempts
))
version: '3.8'

services:
  datasnap-processor:
    build: .
    environment:
      - DATASNAP_TOKEN=${DATASNAP_TOKEN}
      - DATASNAP_ENV=${DATASNAP_ENV:-production}
      - DATASNAP_TIMEOUT=30
      - DATASNAP_RETRIES=3
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped

  datasnap-scheduler:
    build: .
    command: python scheduler.py
    environment:
      - DATASNAP_TOKEN=${DATASNAP_TOKEN}
      - SCHEDULE_INTERVAL=${SCHEDULE_INTERVAL:-3600}
    depends_on:
      - datasnap-processor
    restart: unless-stopped
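The compose file refers to a scheduler.py that is not shown here. Purely as an illustration, a minimal version could reuse EnvironmentConfig and BatchUploader from the templates above and the SCHEDULE_INTERVAL variable; the module name datasnap_templates is a placeholder for wherever you keep those snippets:

# scheduler.py - illustrative sketch only; reuses the templates above
import os
import time

# Assumes the earlier snippets live in a local module named datasnap_templates;
# adjust the import to your project layout.
from datasnap_templates import BatchUploader, DataSnapClient, DataSnapConfig, EnvironmentConfig


def main():
    env = EnvironmentConfig.from_env()
    client = DataSnapClient(DataSnapConfig(token=env.token, base_url=env.base_url))
    uploader = BatchUploader(client)
    interval = int(os.getenv('SCHEDULE_INTERVAL', '3600'))

    while True:
        # "/app/data" matches the compose volume; "vendas" is a placeholder schema slug
        uploader.upload_directory('/app/data', 'vendas')
        time.sleep(interval)


if __name__ == '__main__':
    main()
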
import logging
import json
from datetime import datetime
class DataSnapLogger:
def __init__(self, name: str = "datasnap", level: str = "INFO"):
self.logger = logging.getLogger(name)
self.logger.setLevel(getattr(logging, level))
# Handler para saída estruturada
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(message)s'))
self.logger.addHandler(handler)
def log_request(self, method: str, url: str, status_code: int,
response_time: float, **kwargs):
"""Log estruturado para requisições"""
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'event': 'api_request',
'method': method,
'url': url,
'status_code': status_code,
'response_time_ms': round(response_time * 1000, 2),
**kwargs
}
self.logger.info(json.dumps(log_data))
def log_upload(self, schema_slug: str, file_path: str,
file_size: int, success: bool, **kwargs):
"""Log estruturado para uploads"""
log_data = {
'timestamp': datetime.utcnow().isoformat(),
'event': 'file_upload',
'schema': schema_slug,
'file_path': file_path,
'file_size_mb': round(file_size / (1024*1024), 2),
'success': success,
**kwargs
}
self.logger.info(json.dumps(log_data))
# Uso
logger = DataSnapLogger()
logger.log_upload("vendas", "data.jsonl", 1048576, True, file_id=123)
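To tie the structured logger into the HTTP flow, request timing can be measured around a call; a small hedged sketch using the logger and the client defined earlier ("vendas" is a placeholder schema slug, and the 200 status is assumed for brevity):

import time

# Time a query and emit a structured api_request entry
start = time.monotonic()
client.query("vendas", {"select": ["count(*) as total"]})
elapsed = time.monotonic() - start
logger.log_request("POST", "/api/v1/schemas/vendas/query", 200, elapsed)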