Começando rapidamente

Explore exemplos de código para integrar sua aplicação com o DataSnap de forma eficiente e robusta.
Todos os exemplos incluem tratamento de erro e boas práticas de integração. Adapte conforme suas necessidades específicas.

Python

Cliente básico

import requests
import json
import time
from typing import Dict, List, Optional

class DataSnapClient:
    """Minimal HTTP client for the DataSnap REST API.

    Wraps the schema endpoints used in this guide: file upload, file
    listing, processing and querying. Every request method calls
    ``raise_for_status``, so a non-2xx response raises
    ``requests.exceptions.HTTPError``.
    """

    def __init__(self, token: str, base_url: str = "https://api.datasnap.com.br", timeout: float = 30):
        """Store credentials and per-request defaults.

        Args:
            token: API token, sent as a Bearer credential.
            base_url: API root. A trailing slash is stripped so that URL
                building never produces a double slash.
            timeout: Seconds passed as ``timeout`` to every ``requests``
                call; without it a stalled connection blocks forever.
        """
        self.token = token
        self.base_url = base_url.rstrip("/")
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

    def _schema_url(self, schema_slug: str, resource: str) -> str:
        """Build the URL of ``resource`` under the given schema."""
        return f"{self.base_url}/api/v1/schemas/{schema_slug}/{resource}"

    def upload_file(self, schema_slug: str, file_path: str) -> Dict:
        """Upload a JSONL file to a schema and return the decoded JSON response.

        Raises:
            OSError: If ``file_path`` cannot be opened.
            requests.exceptions.RequestException: On transport or HTTP errors.
        """
        url = self._schema_url(schema_slug, "files")
        # No Content-Type here: requests must generate the multipart boundary.
        headers = {"Authorization": f"Bearer {self.token}"}

        with open(file_path, 'rb') as f:
            response = requests.post(url, headers=headers, files={'files': f}, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def list_files(self, schema_slug: str, **filters) -> Dict:
        """List the files of a schema; ``filters`` are sent as query parameters."""
        url = self._schema_url(schema_slug, "files")
        response = requests.get(url, headers=self.headers, params=filters, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def process_files(self, schema_slug: str, shape_id: Optional[int] = None, executors: Optional[int] = None) -> Dict:
        """Start processing of pending files.

        ``shape_id`` and ``executors`` are included in the request body only
        when they are not ``None``, so an explicit ``0`` is sent rather than
        silently dropped.
        """
        url = self._schema_url(schema_slug, "process-files")
        data = {}
        if shape_id is not None:
            data['shape_id'] = shape_id
        if executors is not None:
            data['executors'] = executors

        response = requests.post(url, headers=self.headers, json=data, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def query(self, schema_slug: str, select: List[str], **kwargs) -> Dict:
        """Run a query; ``kwargs`` are merged into the JSON body next to ``select``."""
        url = self._schema_url(schema_slug, "query")
        data = {"select": select, **kwargs}

        response = requests.post(url, headers=self.headers, json=data, timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def wait_for_processing(self, schema_slug: str, timeout: int = 300, poll_interval: float = 30) -> bool:
        """Poll until no file is listed with ``processing_status="pending"``.

        Args:
            schema_slug: Schema to watch.
            timeout: Maximum number of seconds to wait.
            poll_interval: Seconds between polls; the last sleep is shortened
                so the call never runs past ``timeout``.

        Returns:
            True once ``meta.total`` of the pending listing is 0, False if
            the timeout expires first (or is not positive).
        """
        # monotonic() is immune to wall-clock adjustments during the wait.
        deadline = time.monotonic() + timeout

        while True:
            if deadline - time.monotonic() <= 0:
                return False

            files = self.list_files(schema_slug, processing_status="pending")
            pending = files["meta"]["total"]
            if pending == 0:
                return True

            print(f"Aguardando... {pending} arquivos pendentes")
            time.sleep(min(poll_interval, max(0.0, deadline - time.monotonic())))

# Usage example
client = DataSnapClient("seu_token_aqui")

# Upload
result = client.upload_file("vendas", "dados_vendas.jsonl")
print("Upload realizado:", result)

# Start processing
process_result = client.process_files("vendas")
print("Processamento iniciado:", process_result)

# Wait for processing to finish, then query
if client.wait_for_processing("vendas"):
    query_result = client.query(
        "vendas",
        select=["categoria", "sum(valor) as total"],
        group_by=["categoria"],
        order_by=[{"field": "total", "direction": "desc"}]
    )
    print("Resultado da consulta:", query_result)
else:
    # wait_for_processing returns False on timeout; report it instead of ending silently
    print("Tempo limite excedido: ainda há arquivos pendentes")

Upload com retry e validação

import os
import hashlib
from pathlib import Path

def validate_jsonl_file(file_path: str) -> bool:
    """Check that every non-blank line of a UTF-8 file parses as JSON.

    Args:
        file_path: Path of the JSONL file to check before upload.

    Returns:
        True when all non-blank lines are valid JSON (an empty file is
        valid); False on the first invalid line or on a decoding error.
        The failure is printed, including the offending line number when
        a line fails to parse.

    Raises:
        OSError: If the file cannot be opened.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if not line:  # blank lines are skipped
                    continue
                try:
                    json.loads(line)
                except json.JSONDecodeError as e:
                    print(f"Erro na validação (linha {line_num}): {e}")
                    return False
    except UnicodeDecodeError as e:
        # Raised while reading, so no reliable line number is available.
        print(f"Erro na validação: {e}")
        return False
    return True

def upload_with_retry(client: DataSnapClient, schema_slug: str, file_path: str, max_retries: int = 3) -> Dict:
    """Validate a JSONL file and upload it, retrying transient failures.

    Args:
        client: Client whose ``upload_file`` performs the upload.
        schema_slug: Target schema.
        file_path: Path of the JSONL file.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The result of the first successful ``upload_file`` call.

    Raises:
        ValueError: If ``max_retries`` is below 1 or the file fails validation.
        requests.exceptions.RequestException: The last error once attempts
            are exhausted, or immediately for a 4xx response other than
            408/429, since repeating the same request cannot succeed.
    """
    if max_retries < 1:
        # Otherwise the loop never runs and the function would return None.
        raise ValueError("max_retries deve ser >= 1")
    if not validate_jsonl_file(file_path):
        raise ValueError("Arquivo JSONL inválido")

    for attempt in range(max_retries):
        try:
            result = client.upload_file(schema_slug, file_path)
        except requests.exceptions.RequestException as e:
            print(f"Tentativa {attempt + 1} falhou: {e}")
            # Compare with None explicitly: a Response is falsy for 4xx/5xx.
            response = getattr(e, "response", None)
            status = response.status_code if response is not None else None
            client_error = status is not None and 400 <= status < 500 and status not in (408, 429)
            if client_error or attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff: 1s, 2s, 4s, ...
        else:
            print(f"Upload bem-sucedido na tentativa {attempt + 1}")
            return result

# Usage example: report the outcome instead of letting the error propagate
try:
    result = upload_with_retry(client, "vendas", "dados.jsonl")
    print("Sucesso:", result)
except Exception as upload_error:
    print("Erro no upload:", upload_error)

JavaScript/Node.js

Cliente com async/await

const axios = require('axios');
const FormData = require('form-data');
const fs = require('fs');

/**
 * Minimal client for the DataSnap REST API (upload, list, process, query).
 * Every request method rejects with an Error whose message names the failed
 * operation; the original axios error is kept on `cause` and the HTTP status,
 * when there was a response, on `status`.
 */
class DataSnapClient {
    /**
     * @param {string} token API token, sent as a Bearer credential.
     * @param {string} [baseUrl] API root.
     */
    constructor(token, baseUrl = 'https://api.datasnap.com.br') {
        this.token = token;
        this.baseUrl = baseUrl;
        this.headers = {
            'Authorization': `Bearer ${token}`,
            'Content-Type': 'application/json'
        };
    }

    /** Build the URL of `resource` under the given schema. */
    _schemaUrl(schemaSlug, resource) {
        return `${this.baseUrl}/api/v1/schemas/${schemaSlug}/${resource}`;
    }

    /**
     * Wrap a request error without losing the information callers need to
     * react to it (status code, original error).
     */
    _wrapError(label, error) {
        const wrapped = new Error(`${label} failed: ${error.response?.data?.error || error.message}`);
        wrapped.cause = error;
        wrapped.status = error.response?.status;
        return wrapped;
    }

    /** Upload a file to a schema as multipart form data. */
    async uploadFile(schemaSlug, filePath) {
        const formData = new FormData();
        formData.append('files', fs.createReadStream(filePath));

        try {
            const response = await axios.post(this._schemaUrl(schemaSlug, 'files'), formData, {
                headers: {
                    'Authorization': `Bearer ${this.token}`,
                    // form-data supplies the multipart Content-Type with its boundary
                    ...formData.getHeaders()
                }
            });
            return response.data;
        } catch (error) {
            throw this._wrapError('Upload', error);
        }
    }

    /** List the files of a schema; `filters` are sent as query parameters. */
    async listFiles(schemaSlug, filters = {}) {
        try {
            const response = await axios.get(this._schemaUrl(schemaSlug, 'files'), {
                headers: this.headers,
                params: filters
            });
            return response.data;
        } catch (error) {
            throw this._wrapError('List files', error);
        }
    }

    /** Start processing of pending files; `options` is sent as the JSON body. */
    async processFiles(schemaSlug, options = {}) {
        try {
            const response = await axios.post(
                this._schemaUrl(schemaSlug, 'process-files'), options, { headers: this.headers }
            );
            return response.data;
        } catch (error) {
            throw this._wrapError('Process', error);
        }
    }

    /** Run a query; `queryData` is sent as the JSON body. */
    async query(schemaSlug, queryData) {
        try {
            const response = await axios.post(
                this._schemaUrl(schemaSlug, 'query'), queryData, { headers: this.headers }
            );
            return response.data;
        } catch (error) {
            throw this._wrapError('Query', error);
        }
    }

    /**
     * Poll until no file is listed with processing_status 'pending'.
     *
     * @param {string} schemaSlug Schema to watch.
     * @param {number} [timeoutMs] Maximum time to wait.
     * @param {number} [pollIntervalMs] Delay between polls; the last delay is
     *     shortened so the call never runs past the timeout.
     * @returns {Promise<boolean>} true when meta.total reaches 0, false on timeout.
     */
    async waitForProcessing(schemaSlug, timeoutMs = 300000, pollIntervalMs = 30000) {
        const deadline = Date.now() + timeoutMs;

        while (Date.now() < deadline) {
            const files = await this.listFiles(schemaSlug, { processing_status: 'pending' });

            if (files.meta.total === 0) {
                return true;
            }

            console.log(`Aguardando... ${files.meta.total} arquivos pendentes`);
            const waitMs = Math.min(pollIntervalMs, Math.max(0, deadline - Date.now()));
            await new Promise(resolve => setTimeout(resolve, waitMs));
        }

        return false;
    }
}

// Usage example: upload, process, wait, then query.
// Sets a non-zero exit code when any step fails or processing times out.
async function main() {
    const client = new DataSnapClient('seu_token_aqui');

    try {
        // Upload
        const uploadResult = await client.uploadFile('vendas', 'dados_vendas.jsonl');
        console.log('Upload realizado:', uploadResult);

        // Start processing
        const processResult = await client.processFiles('vendas');
        console.log('Processamento iniciado:', processResult);

        // Wait for processing; false means the timeout expired
        const processed = await client.waitForProcessing('vendas');
        if (!processed) {
            console.error('Tempo limite excedido: ainda há arquivos pendentes');
            process.exitCode = 1;
            return;
        }

        // Query
        const queryResult = await client.query('vendas', {
            select: ['categoria', 'sum(valor) as total'],
            group_by: ['categoria'],
            order_by: [{ field: 'total', direction: 'desc' }]
        });
        console.log('Resultado da consulta:', queryResult);
    } catch (error) {
        console.error('Erro:', error.message);
        process.exitCode = 1;
    }
}

main();

PHP

Cliente com cURL

<?php

/**
 * Minimal cURL-based client for the DataSnap REST API.
 *
 * Every request method returns the decoded JSON response as an associative
 * array and throws Exception on a transport failure or a non-2xx status.
 */
class DataSnapClient {
    private $token;
    private $baseUrl;

    /**
     * @param string $token   API token, sent as a Bearer credential.
     * @param string $baseUrl API root.
     */
    public function __construct($token, $baseUrl = 'https://api.datasnap.com.br') {
        $this->token = $token;
        $this->baseUrl = $baseUrl;
    }

    /** Upload a file to a schema as multipart form data. */
    public function uploadFile($schemaSlug, $filePath) {
        $url = $this->baseUrl . "/api/v1/schemas/{$schemaSlug}/files";

        return $this->execute([
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            // An array body makes cURL send multipart/form-data with its own boundary
            CURLOPT_POSTFIELDS => [
                'files' => new CURLFile($filePath)
            ],
            CURLOPT_HTTPHEADER => [
                "Authorization: Bearer {$this->token}"
            ]
        ], 'Upload');
    }

    /** List the files of a schema; $filters become the query string. */
    public function listFiles($schemaSlug, $filters = []) {
        $url = $this->baseUrl . "/api/v1/schemas/{$schemaSlug}/files";
        if (!empty($filters)) {
            $url .= '?' . http_build_query($filters);
        }

        return $this->makeRequest('GET', $url);
    }

    /** Start processing of pending files; $options is sent as the JSON body. */
    public function processFiles($schemaSlug, $options = []) {
        $url = $this->baseUrl . "/api/v1/schemas/{$schemaSlug}/process-files";
        return $this->makeRequest('POST', $url, $options);
    }

    /** Run a query; $queryData is sent as the JSON body. */
    public function query($schemaSlug, $queryData) {
        $url = $this->baseUrl . "/api/v1/schemas/{$schemaSlug}/query";
        return $this->makeRequest('POST', $url, $queryData);
    }

    /** Send a JSON request with the given method and optional body. */
    private function makeRequest($method, $url, $data = null) {
        $options = [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_HTTPHEADER => [
                "Authorization: Bearer {$this->token}",
                "Content-Type: application/json"
            ]
        ];

        if ($data !== null) {
            // json_encode([]) yields "[]"; an empty body must be the object "{}"
            $isEmptyArray = is_array($data) && count($data) === 0;
            $options[CURLOPT_POSTFIELDS] = json_encode($isEmptyArray ? new stdClass() : $data);
        }

        return $this->execute($options, 'Request');
    }

    /**
     * Run a cURL transfer and decode its JSON response.
     *
     * @param array  $options Options for curl_setopt_array.
     * @param string $label   Operation name used in error messages.
     * @throws Exception On transport failure or a status outside 200-299.
     */
    private function execute(array $options, $label) {
        $curl = curl_init();
        curl_setopt_array($curl, $options);

        $response = curl_exec($curl);
        $httpCode = curl_getinfo($curl, CURLINFO_HTTP_CODE);
        $curlError = curl_error($curl); // must be read before the handle is closed
        curl_close($curl);

        if ($response === false) {
            // No HTTP exchange happened (DNS, connection, unreadable file, ...)
            throw new Exception("{$label} failed: {$curlError}");
        }

        if ($httpCode < 200 || $httpCode >= 300) {
            throw new Exception("{$label} failed with HTTP {$httpCode}: {$response}");
        }

        return json_decode($response, true);
    }
}

// Usage example: upload, process, wait for completion, then query
try {
    $client = new DataSnapClient('seu_token_aqui');

    // Upload
    $uploadResult = $client->uploadFile('vendas', 'dados_vendas.jsonl');
    echo "Upload realizado: " . json_encode($uploadResult) . "\n";

    // Start processing
    $processResult = $client->processFiles('vendas');
    echo "Processamento iniciado: " . json_encode($processResult) . "\n";

    // Poll until no file is pending (up to 5 minutes); querying right after
    // starting the processing would run against data that is not ready yet
    $deadline = time() + 300;
    $processed = false;
    while (time() < $deadline) {
        $pending = $client->listFiles('vendas', ['processing_status' => 'pending']);
        $total = (int) $pending['meta']['total'];
        if ($total === 0) {
            $processed = true;
            break;
        }
        echo "Aguardando... {$total} arquivos pendentes\n";
        sleep(30);
    }
    if (!$processed) {
        throw new Exception('Tempo limite excedido: ainda há arquivos pendentes');
    }

    // Query
    $queryResult = $client->query('vendas', [
        'select' => ['categoria', 'sum(valor) as total'],
        'group_by' => ['categoria'],
        'order_by' => [['field' => 'total', 'direction' => 'desc']]
    ]);
    echo "Resultado da consulta: " . json_encode($queryResult) . "\n";

} catch (Exception $e) {
    echo "Erro: " . $e->getMessage() . "\n";
}
?>

Go

Cliente com contexto

package main

import (
    "bytes"
    "context"
    "encoding/json"
    "fmt"
    "io"
    "mime/multipart"
    "net/http"
    "os"
    "time"
)

// DataSnapClient holds the credentials and HTTP client used to call the
// DataSnap API.
type DataSnapClient struct {
    // token is sent as the Bearer credential in the Authorization header.
    token   string
    // baseURL is the API root that endpoint paths are appended to.
    baseURL string
    // client executes the HTTP requests.
    client  *http.Client
}

// NewDataSnapClient builds a client for the given token. An empty baseURL
// selects the production endpoint; requests use a 30-second HTTP timeout.
func NewDataSnapClient(token, baseURL string) *DataSnapClient {
    endpoint := baseURL
    if endpoint == "" {
        endpoint = "https://api.datasnap.com.br"
    }

    httpClient := &http.Client{Timeout: 30 * time.Second}

    return &DataSnapClient{token: token, baseURL: endpoint, client: httpClient}
}

// UploadFile sends the file at filePath to the schema as multipart form data
// and returns the decoded JSON response.
//
// The whole file is buffered in memory before it is sent. A response with a
// status outside 200-299 is returned as an error that carries the status code
// and the first 4 KiB of the response body, instead of being decoded as a
// successful result.
func (c *DataSnapClient) UploadFile(ctx context.Context, schemaSlug, filePath string) (map[string]interface{}, error) {
    file, err := os.Open(filePath)
    if err != nil {
        return nil, err
    }
    defer file.Close()

    // FileInfo.Name is the base name, so the local directory layout is not
    // sent to the server as part of the multipart filename.
    info, err := file.Stat()
    if err != nil {
        return nil, err
    }

    var body bytes.Buffer
    writer := multipart.NewWriter(&body)

    part, err := writer.CreateFormFile("files", info.Name())
    if err != nil {
        return nil, err
    }

    _, err = io.Copy(part, file)
    if err != nil {
        return nil, err
    }

    // Close writes the trailing boundary; the body is incomplete without it.
    err = writer.Close()
    if err != nil {
        return nil, err
    }

    url := fmt.Sprintf("%s/api/v1/schemas/%s/files", c.baseURL, schemaSlug)
    req, err := http.NewRequestWithContext(ctx, "POST", url, &body)
    if err != nil {
        return nil, err
    }

    req.Header.Set("Authorization", "Bearer "+c.token)
    req.Header.Set("Content-Type", writer.FormDataContentType())

    resp, err := c.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode < 200 || resp.StatusCode >= 300 {
        var errBody bytes.Buffer
        _, _ = errBody.ReadFrom(io.LimitReader(resp.Body, 4096))
        return nil, fmt.Errorf("upload failed with HTTP %d: %s", resp.StatusCode, errBody.String())
    }

    var result map[string]interface{}
    err = json.NewDecoder(resp.Body).Decode(&result)
    return result, err
}

// Query posts queryData as JSON to the schema's query endpoint and returns
// the decoded JSON response.
//
// A response with a status outside 200-299 is returned as an error that
// carries the status code and the first 4 KiB of the response body, instead
// of being decoded as a successful result.
func (c *DataSnapClient) Query(ctx context.Context, schemaSlug string, queryData map[string]interface{}) (map[string]interface{}, error) {
    jsonData, err := json.Marshal(queryData)
    if err != nil {
        return nil, err
    }

    url := fmt.Sprintf("%s/api/v1/schemas/%s/query", c.baseURL, schemaSlug)
    req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(jsonData))
    if err != nil {
        return nil, err
    }

    req.Header.Set("Authorization", "Bearer "+c.token)
    req.Header.Set("Content-Type", "application/json")

    resp, err := c.client.Do(req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode < 200 || resp.StatusCode >= 300 {
        var errBody bytes.Buffer
        _, _ = errBody.ReadFrom(io.LimitReader(resp.Body, 4096))
        return nil, fmt.Errorf("query failed with HTTP %d: %s", resp.StatusCode, errBody.String())
    }

    var result map[string]interface{}
    err = json.NewDecoder(resp.Body).Decode(&result)
    return result, err
}

// main uploads a sample file and then runs an aggregate query, printing each
// result and stopping at the first error.
func main() {
    ctx := context.Background()
    ds := NewDataSnapClient("seu_token_aqui", "")

    // Upload
    uploaded, uploadErr := ds.UploadFile(ctx, "vendas", "dados_vendas.jsonl")
    if uploadErr != nil {
        fmt.Printf("Erro no upload: %v\n", uploadErr)
        return
    }
    fmt.Printf("Upload realizado: %+v\n", uploaded)

    // Query: total value per category, largest first
    ordering := []map[string]string{{"field": "total", "direction": "desc"}}
    request := map[string]interface{}{
        "select":   []string{"categoria", "sum(valor) as total"},
        "group_by": []string{"categoria"},
        "order_by": ordering,
    }

    rows, queryErr := ds.Query(ctx, "vendas", request)
    if queryErr != nil {
        fmt.Printf("Erro na consulta: %v\n", queryErr)
        return
    }
    fmt.Printf("Resultado da consulta: %+v\n", rows)
}

Boas práticas

Tratamento de erros

Sempre implemente tratamento robusto de erros:
def safe_upload(client, schema_slug, file_path, rate_limit_retries=1):
    """Upload a file, reporting failures instead of raising.

    Args:
        client: Object whose ``upload_file(schema_slug, file_path)`` does the upload.
        schema_slug: Target schema.
        file_path: Path of the file to upload.
        rate_limit_retries: How many times to wait 60 seconds and try again
            after an HTTP 429 before giving up.

    Returns:
        The upload result, or None when the upload failed.
    """
    attempts = max(0, rate_limit_retries) + 1

    for attempt in range(attempts):
        try:
            result = client.upload_file(schema_slug, file_path)
        except requests.exceptions.HTTPError as e:
            # Compare with None explicitly: a Response is falsy for 4xx/5xx.
            status = e.response.status_code if e.response is not None else None
            if status == 422:
                error_data = e.response.json()
                print(f"❌ Erro de validação: {error_data}")
            elif status == 429:
                print("⏸️ Limite de taxa atingido. Aguarde.")
                if attempt < attempts - 1:
                    # Waiting is only useful if the upload is attempted again.
                    time.sleep(60)
                    continue
            elif status is None:
                print(f"❌ Erro inesperado: {e}")
            else:
                print(f"❌ Erro HTTP {status}: {e.response.text}")
            return None
        except Exception as e:
            print(f"❌ Erro inesperado: {e}")
            return None

        # The upload succeeded: an unexpected response shape must not turn
        # it into a reported failure, so only the log line degrades.
        try:
            file_id = result['uploaded'][0]['id']
        except (KeyError, IndexError, TypeError):
            print(f"✅ Upload realizado: {result}")
        else:
            print(f"✅ Upload realizado: {file_id}")
        return result

    return None

Configuração por ambiente

import os

class Config:
    """Environment-driven settings for the DataSnap client."""

    # Both values are read once, when the class body is executed.
    DATASNAP_TOKEN = os.environ.get("DATASNAP_TOKEN")
    DATASNAP_ENV = os.environ.get("DATASNAP_ENV", "production")

    @property
    def base_url(self):
        """Return the API root for ``DATASNAP_ENV``, falling back to production."""
        endpoints = {"production": "https://api.datasnap.com.br"}
        fallback = endpoints["production"]
        return endpoints.get(self.DATASNAP_ENV, fallback)

# Usage: fail fast when the token is missing instead of sending "Bearer None"
config = Config()
if not config.DATASNAP_TOKEN:
    raise RuntimeError("Defina a variável de ambiente DATASNAP_TOKEN")
client = DataSnapClient(config.DATASNAP_TOKEN, config.base_url)

Próximos passos