Ready-made templates

Speed up your development with these tested templates, optimized for DataSnap integration.
All templates include error handling, validation, and security best practices. Adapt them to your needs.

Reusable base client

Python template

import requests
import json
import time
import logging
from typing import Dict, List, Optional, Any
from dataclasses import dataclass

@dataclass
class DataSnapConfig:
    token: str
    base_url: str = "https://api.datasnap.com.br"
    timeout: int = 30
    retry_attempts: int = 3

class DataSnapClient:
    """Cliente base reutilizável para integração com DataSnap"""
    
    def __init__(self, config: DataSnapConfig):
        self.config = config
        self.session = requests.Session()
        self.session.headers.update({
            "Authorization": f"Bearer {config.token}",
            "User-Agent": "DataSnap-Client/1.0"
        })
        
        # Configure logging
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)
    
    def _make_request(self, method: str, url: str, **kwargs) -> Dict[str, Any]:
        """Método base para requisições com retry automático"""
        for attempt in range(self.config.retry_attempts):
            try:
                response = self.session.request(
                    method, 
                    f"{self.config.base_url}{url}",
                    timeout=self.config.timeout,
                    **kwargs
                )
                response.raise_for_status()
                return response.json()
                
            except requests.exceptions.HTTPError as e:
                if e.response.status_code == 429:  # Rate limit
                    wait_time = 2 ** attempt
                    self.logger.warning(f"Rate limit hit, waiting {wait_time}s")
                    time.sleep(wait_time)
                    continue
                raise
            except requests.exceptions.RequestException as e:
                if attempt == self.config.retry_attempts - 1:
                    raise
                time.sleep(2 ** attempt)
        
        # All retries exhausted (e.g. repeated 429s) without a successful response
        raise RuntimeError("Request failed after all retry attempts")
                
    def upload_file(self, schema_slug: str, file_path: str) -> Dict[str, Any]:
        """Upload de arquivo com validação"""
        with open(file_path, 'rb') as f:
            files = {'files': f}
            # Pass only Authorization; requests sets the multipart Content-Type (with boundary) itself
            headers = {"Authorization": f"Bearer {self.config.token}"}
            
            response = self.session.post(
                f"{self.config.base_url}/api/v1/schemas/{schema_slug}/files",
                files=files,
                headers=headers,
                timeout=self.config.timeout
            )
            response.raise_for_status()
            return response.json()
    
    def query(self, schema_slug: str, query_data: Dict[str, Any]) -> Dict[str, Any]:
        """Execute consulta com validação de parâmetros"""
        required_fields = ['select']
        for field in required_fields:
            if field not in query_data:
                raise ValueError(f"Campo obrigatório ausente: {field}")
                
        return self._make_request(
            'POST',
            f'/api/v1/schemas/{schema_slug}/query',
            json=query_data
        )

# Usage example
config = DataSnapConfig(token="your_token_here")
client = DataSnapClient(config)
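
A quick usage sketch for the client; the schema slug "vendas", the file name, and the field names below are placeholders, not part of the template:

# Illustrative usage: schema slug, file name and fields are placeholders
upload_result = client.upload_file("vendas", "dados.jsonl")
print(f"Upload accepted: {upload_result}")

result = client.query("vendas", {
    "select": ["categoria", "count(*) as total"],
    "group_by": ["categoria"],
    "limit": 10
})
print(result)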

JavaScript/Node.js template

const axios = require('axios');
const FormData = require('form-data');
const fs = require('fs');

class DataSnapClient {
    constructor(config = {}) {
        this.config = {
            baseUrl: 'https://api.datasnap.com.br',
            timeout: 30000,
            retryAttempts: 3,
            ...config
        };
        
        this.axios = axios.create({
            baseURL: this.config.baseUrl,
            timeout: this.config.timeout,
            headers: {
                'Authorization': `Bearer ${this.config.token}`,
                'User-Agent': 'DataSnap-Client-JS/1.0'
            }
        });
        
        // Interceptor for automatic retries
        this.setupRetryInterceptor();
    }
    
    setupRetryInterceptor() {
        this.axios.interceptors.response.use(
            response => response,
            async error => {
                const { config } = error;
                
                // Errors raised before a request was sent have no config to retry
                if (!config) {
                    return Promise.reject(error);
                }
                
                if (!config.retryCount) {
                    config.retryCount = 0;
                }
                
                if (
                    config.retryCount < this.config.retryAttempts &&
                    (error.response?.status === 429 || error.code === 'ECONNRESET')
                ) {
                    config.retryCount++;
                    const delay = Math.pow(2, config.retryCount) * 1000;
                    
                    await new Promise(resolve => setTimeout(resolve, delay));
                    return this.axios(config);
                }
                
                return Promise.reject(error);
            }
        );
    }
    
    async uploadFile(schemaSlug, filePath) {
        const formData = new FormData();
        formData.append('files', fs.createReadStream(filePath));
        
        const response = await this.axios.post(
            `/api/v1/schemas/${schemaSlug}/files`,
            formData,
            { headers: formData.getHeaders() }
        );
        
        return response.data;
    }
    
    async query(schemaSlug, queryData) {
        if (!queryData.select) {
            throw new Error('The "select" field is required');
        }
        
        const response = await this.axios.post(
            `/api/v1/schemas/${schemaSlug}/query`,
            queryData
        );
        
        return response.data;
    }
}

module.exports = DataSnapClient;

Validation snippets

JSONL validator

import json
from typing import Dict, Any

def validate_jsonl_file(file_path: str, max_errors: int = 10) -> Dict[str, Any]:
    """
    Validate a JSONL file and return a detailed report
    """
    errors = []
    valid_lines = 0
    total_lines = 0
    
    with open(file_path, 'r', encoding='utf-8') as f:
        for line_num, line in enumerate(f, 1):
            total_lines += 1
            line = line.strip()
            
            if not line:  # Skip empty lines
                continue
                
            try:
                json.loads(line)
                valid_lines += 1
            except json.JSONDecodeError as e:
                errors.append({
                    'line': line_num,
                    'error': str(e),
                    'content': line[:100] + '...' if len(line) > 100 else line
                })
                
                if len(errors) >= max_errors:
                    break
    
    return {
        'valid': len(errors) == 0,
        'total_lines': total_lines,
        'valid_lines': valid_lines,
        'error_count': len(errors),
        'errors': errors,
        'success_rate': (valid_lines / total_lines) * 100 if total_lines > 0 else 0
    }

# Usage
result = validate_jsonl_file('dados.jsonl')
if result['valid']:
    print(f"✅ Valid file: {result['valid_lines']} lines")
else:
    print(f"❌ {result['error_count']} errors found")
    for error in result['errors'][:5]:  # Show the first 5 errors
        print(f"  Line {error['line']}: {error['error']}")

Schema generator

import json
from typing import Dict, Any

def infer_schema_from_jsonl(file_path: str, sample_size: int = 1000) -> Dict[str, Any]:
    """
    Infer a schema from a JSONL file
    """
    fields = {}
    samples_processed = 0
    
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            if samples_processed >= sample_size:
                break
                
            try:
                data = json.loads(line.strip())
                for key, value in data.items():
                    if key not in fields:
                        fields[key] = {
                            'type': None,
                            'nullable': False,
                            'samples': []
                        }
                    
                    # Infer the type
                    if value is None:
                        fields[key]['nullable'] = True
                    else:
                        value_type = type(value).__name__
                        if fields[key]['type'] is None:
                            fields[key]['type'] = value_type
                        elif fields[key]['type'] != value_type:
                            fields[key]['type'] = 'mixed'
                    
                    # Collect sample values
                    if len(fields[key]['samples']) < 5:
                        fields[key]['samples'].append(value)
                
                samples_processed += 1
                
            except json.JSONDecodeError:
                continue
    
    return {
        'inferred_schema': fields,
        'samples_processed': samples_processed
    }
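
A usage sketch for the inference helper; the file name is illustrative:

# Usage (file name is illustrative)
schema_report = infer_schema_from_jsonl('dados.jsonl')
print(f"Lines sampled: {schema_report['samples_processed']}")
for name, info in schema_report['inferred_schema'].items():
    nullable = ' (nullable)' if info['nullable'] else ''
    print(f"  {name}: {info['type']}{nullable}")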

Query templates

Common queries

class QueryTemplates:
    """Templates de consultas frequentemente usadas"""
    
    @staticmethod
    def top_values(field: str, limit: int = 10) -> Dict[str, Any]:
        """Top N valores mais frequentes"""
        return {
            "select": [field, "count(*) as frequency"],
            "group_by": [field],
            "order_by": [{"field": "frequency", "direction": "desc"}],
            "limit": limit
        }
    
    @staticmethod
    def daily_aggregation(date_field: str, value_field: str, 
                         start_date: str, end_date: str) -> Dict[str, Any]:
        """Agregação diária de valores"""
        return {
            "select": [
                f"date({date_field}) as date",
                f"count(*) as count",
                f"sum({value_field}) as total",
                f"avg({value_field}) as average"
            ],
            "where": [
                {"field": date_field, "op": ">=", "value": start_date},
                {"field": date_field, "op": "<=", "value": end_date}
            ],
            "group_by": [f"date({date_field})"],
            "order_by": [{"field": "date", "direction": "asc"}]
        }
    
    @staticmethod
    def percentile_analysis(field: str) -> Dict[str, Any]:
        """Análise de percentis"""
        return {
            "select": [
                f"min({field}) as min_value",
                f"max({field}) as max_value",
                f"avg({field}) as avg_value",
                f"quantile(0.25)({field}) as p25",
                f"quantile(0.5)({field}) as p50",
                f"quantile(0.75)({field}) as p75",
                f"quantile(0.95)({field}) as p95"
            ]
        }

# Usage examples
templates = QueryTemplates()

# Top 10 most frequent categories
top_categories = templates.top_values("categoria", 10)

# Daily sales for the month
daily_sales = templates.daily_aggregation(
    "data_venda", "valor", "2024-08-01", "2024-08-31"
)

# Statistical analysis of prices
price_stats = templates.percentile_analysis("preco")
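
These dictionaries plug straight into the query method of the base client; a sketch, assuming the client and the "vendas" schema from the earlier examples, and that results come back under a "data" key:

# Illustrative: run a query template through the client ("vendas" is a placeholder slug)
response = client.query("vendas", top_categories)
for row in response.get('data', []):  # assumes results are returned under a "data" key
    print(row)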

Processing utilities

Processing monitor

import asyncio
from datetime import datetime, timedelta
from typing import Dict, Any

class ProcessingMonitor:
    def __init__(self, client: DataSnapClient):
        self.client = client
        
    async def wait_for_completion(self, schema_slug: str, 
                                 timeout_minutes: int = 30) -> bool:
        """Aguarda conclusão do processamento com timeout"""
        start_time = datetime.now()
        timeout = timedelta(minutes=timeout_minutes)
        
        while datetime.now() - start_time < timeout:
            try:
                files = self.client._make_request(
                    'GET', 
                    f'/api/v1/schemas/{schema_slug}/files',
                    params={'processing_status': 'pending'}
                )
                
                pending_count = files['meta']['total']
                if pending_count == 0:
                    print("✅ Processamento concluído!")
                    return True
                    
                print(f"⏳ {pending_count} arquivos pendentes...")
                await asyncio.sleep(30)
                
            except Exception as e:
                print(f"❌ Erro ao verificar status: {e}")
                await asyncio.sleep(60)
        
        print("⏰ Timeout atingido")
        return False
    
    def get_processing_summary(self, schema_slug: str) -> Dict[str, Any]:
        """Obtém resumo do status de processamento"""
        files = self.client._make_request(
            'GET',
            f'/api/v1/schemas/{schema_slug}/files'
        )
        
        status_counts = {}
        for file_info in files['data']:
            status = file_info['processing_status']
            status_counts[status] = status_counts.get(status, 0) + 1
        
        return {
            'total_files': files['meta']['total'],
            'status_breakdown': status_counts,
            'completion_rate': (
                status_counts.get('completed', 0) / files['meta']['total'] * 100
                if files['meta']['total'] > 0 else 0
            )
        }
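
A usage sketch for the monitor, assuming the client from the base template and a placeholder schema slug:

# Illustrative usage (schema slug is a placeholder)
monitor = ProcessingMonitor(client)
finished = asyncio.run(monitor.wait_for_completion("vendas", timeout_minutes=30))
if finished:
    print(monitor.get_processing_summary("vendas"))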

Batch uploader

from pathlib import Path
from typing import Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed

class BatchUploader:
    def __init__(self, client: DataSnapClient, max_workers: int = 5):
        self.client = client
        self.max_workers = max_workers
    
    def upload_directory(self, directory_path: str, schema_slug: str, 
                        pattern: str = "*.jsonl") -> Dict[str, Any]:
        """Upload todos os arquivos JSONL de um diretório"""
        directory = Path(directory_path)
        files = list(directory.glob(pattern))
        
        results = {
            'successful': [],
            'failed': [],
            'total': len(files)
        }
        
        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            future_to_file = {
                executor.submit(self._upload_single_file, schema_slug, str(file)): file
                for file in files
            }
            
            for future in as_completed(future_to_file):
                file_path = future_to_file[future]
                try:
                    result = future.result()
                    results['successful'].append({
                        'file': str(file_path),
                        'result': result
                    })
                    print(f"✅ {file_path.name} enviado com sucesso")
                except Exception as e:
                    results['failed'].append({
                        'file': str(file_path),
                        'error': str(e)
                    })
                    print(f"❌ Erro ao enviar {file_path.name}: {e}")
        
        return results
    
    def _upload_single_file(self, schema_slug: str, file_path: str):
        """Upload de um único arquivo com validação"""
        # Validar antes do upload
        validation = validate_jsonl_file(file_path, max_errors=1)
        if not validation['valid']:
            raise ValueError(f"Arquivo inválido: {validation['errors'][0]['error']}")
        
        return self.client.upload_file(schema_slug, file_path)
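
A usage sketch, assuming a local ./data directory of JSONL files and a placeholder schema slug:

# Illustrative usage: directory and schema slug are placeholders
uploader = BatchUploader(client, max_workers=5)
summary = uploader.upload_directory("./data", "vendas")
print(f"{len(summary['successful'])}/{summary['total']} files uploaded")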

Configuration templates

Environment configuration

import os
from dataclasses import dataclass

@dataclass
class EnvironmentConfig:
    """Configuração por ambiente"""
    
    # Obrigatórios
    token: str
    
    # Opcionais com defaults
    environment: str = "production"
    timeout: int = 30
    retry_attempts: int = 3
    max_file_size_mb: int = 100
    
    @classmethod
    def from_env(cls) -> 'EnvironmentConfig':
        """Cria configuração a partir de variáveis de ambiente"""
        token = os.getenv('DATASNAP_TOKEN')
        if not token:
            raise ValueError("DATASNAP_TOKEN é obrigatório")
        
        return cls(
            token=token,
            environment=os.getenv('DATASNAP_ENV', 'production'),
            timeout=int(os.getenv('DATASNAP_TIMEOUT', '30')),
            retry_attempts=int(os.getenv('DATASNAP_RETRIES', '3')),
            max_file_size_mb=int(os.getenv('DATASNAP_MAX_SIZE', '100'))
        )
    
    @property
    def base_url(self) -> str:
        """URL base por ambiente"""
        urls = {
            'production': 'https://api.datasnap.com.br'
        }
        return urls.get(self.environment, urls['production'])

# Usage
config = EnvironmentConfig.from_env()
client = DataSnapClient(DataSnapConfig(
    token=config.token,
    base_url=config.base_url,
    timeout=config.timeout,
    retry_attempts=config.retry_attempts
))

Docker Compose template

version: '3.8'

services:
  datasnap-processor:
    build: .
    environment:
      - DATASNAP_TOKEN=${DATASNAP_TOKEN}
      - DATASNAP_ENV=${DATASNAP_ENV:-production}
      - DATASNAP_TIMEOUT=30
      - DATASNAP_RETRIES=3
    volumes:
      - ./data:/app/data
      - ./logs:/app/logs
    restart: unless-stopped
    
  datasnap-scheduler:
    build: .
    command: python scheduler.py
    environment:
      - DATASNAP_TOKEN=${DATASNAP_TOKEN}
      - SCHEDULE_INTERVAL=${SCHEDULE_INTERVAL:-3600}
    depends_on:
      - datasnap-processor
    restart: unless-stopped
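
The datasnap-scheduler service runs scheduler.py, which is not shown in this guide. A minimal sketch of what such a script could look like, assuming it reuses the EnvironmentConfig, DataSnapClient and BatchUploader templates above and re-uploads the mounted /app/data directory at the SCHEDULE_INTERVAL interval; the schema slug is a placeholder:

# Hypothetical scheduler.py sketch: periodically re-runs the batch upload.
import os
import time

def main():
    interval = int(os.getenv('SCHEDULE_INTERVAL', '3600'))
    config = EnvironmentConfig.from_env()
    client = DataSnapClient(DataSnapConfig(token=config.token, base_url=config.base_url))
    uploader = BatchUploader(client)

    while True:
        summary = uploader.upload_directory('/app/data', 'vendas')  # placeholder schema slug
        print(f"{len(summary['successful'])}/{summary['total']} files uploaded")
        time.sleep(interval)

if __name__ == '__main__':
    main()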

Logging and monitoring

Structured logger

import logging
import json
from datetime import datetime

class DataSnapLogger:
    def __init__(self, name: str = "datasnap", level: str = "INFO"):
        self.logger = logging.getLogger(name)
        self.logger.setLevel(getattr(logging, level))
        
        # Handler for structured output
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter('%(message)s'))
        self.logger.addHandler(handler)
    
    def log_request(self, method: str, url: str, status_code: int, 
                   response_time: float, **kwargs):
        """Log estruturado para requisições"""
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'event': 'api_request',
            'method': method,
            'url': url,
            'status_code': status_code,
            'response_time_ms': round(response_time * 1000, 2),
            **kwargs
        }
        self.logger.info(json.dumps(log_data))
    
    def log_upload(self, schema_slug: str, file_path: str, 
                  file_size: int, success: bool, **kwargs):
        """Log estruturado para uploads"""
        log_data = {
            'timestamp': datetime.utcnow().isoformat(),
            'event': 'file_upload',
            'schema': schema_slug,
            'file_path': file_path,
            'file_size_mb': round(file_size / (1024*1024), 2),
            'success': success,
            **kwargs
        }
        self.logger.info(json.dumps(log_data))

# Usage
logger = DataSnapLogger()
logger.log_upload("vendas", "data.jsonl", 1048576, True, file_id=123)
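
log_request expects you to time the call yourself. A sketch, assuming the client from the base template, a placeholder schema slug, and that a successful query implies HTTP 200:

import time
import requests

# Illustrative: wrap a query with manual timing and a structured log entry
start = time.time()
try:
    client.query("vendas", {"select": ["count(*) as total"]})
    logger.log_request("POST", "/api/v1/schemas/vendas/query", 200, time.time() - start)
except requests.exceptions.HTTPError as e:
    logger.log_request("POST", "/api/v1/schemas/vendas/query",
                       e.response.status_code, time.time() - start)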

Next steps