Exemplos práticos de integração com a API DataSnap em diferentes linguagens de programação
import requests
import json
import time
from typing import Dict, List, Optional
class DataSnapClient:
    """Thin client for the DataSnap HTTP API.

    Each request method issues a single HTTP call, raises
    ``requests.HTTPError`` (through ``raise_for_status``) on a 4xx/5xx
    response, and otherwise returns the decoded JSON body.
    """

    def __init__(self, token: str, base_url: str = "https://api.datasnap.com.br",
                 timeout: Optional[float] = 30.0):
        """Store credentials and connection settings.

        Args:
            token: Bearer token sent in the ``Authorization`` header.
            base_url: Root URL of the API.
            timeout: Seconds passed to every ``requests`` call. Without a
                timeout ``requests`` waits indefinitely on an unresponsive
                server; pass ``None`` to restore that behavior.
        """
        self.token = token
        self.base_url = base_url
        self.timeout = timeout
        self.headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json"
        }

    def upload_file(self, schema_slug: str, file_path: str) -> Dict:
        """Upload a JSONL file to a schema as multipart form data.

        Raises:
            OSError: If ``file_path`` cannot be opened.
            requests.HTTPError: If the API answers with an error status.
        """
        url = f"{self.base_url}/api/v1/schemas/{schema_slug}/files"
        # No Content-Type here: requests must generate the multipart boundary.
        headers = {"Authorization": f"Bearer {self.token}"}
        with open(file_path, 'rb') as f:
            response = requests.post(url, headers=headers, files={'files': f},
                                     timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def list_files(self, schema_slug: str, **filters) -> Dict:
        """List the files of a schema; ``filters`` become query parameters."""
        url = f"{self.base_url}/api/v1/schemas/{schema_slug}/files"
        response = requests.get(url, headers=self.headers, params=filters,
                                timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def process_files(self, schema_slug: str, shape_id: Optional[int] = None, executors: Optional[int] = None) -> Dict:
        """Start processing of the pending files of a schema.

        ``shape_id`` and ``executors`` are included in the request body only
        when they are not ``None``.
        """
        url = f"{self.base_url}/api/v1/schemas/{schema_slug}/process-files"
        data = {}
        # Compare against None so an explicit 0 is not silently dropped.
        if shape_id is not None:
            data['shape_id'] = shape_id
        if executors is not None:
            data['executors'] = executors
        response = requests.post(url, headers=self.headers, json=data,
                                 timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def query(self, schema_slug: str, select: List[str], **kwargs) -> Dict:
        """Run a query; ``kwargs`` are merged into the JSON body next to ``select``."""
        url = f"{self.base_url}/api/v1/schemas/{schema_slug}/query"
        data = {"select": select, **kwargs}
        response = requests.post(url, headers=self.headers, json=data,
                                 timeout=self.timeout)
        response.raise_for_status()
        return response.json()

    def wait_for_processing(self, schema_slug: str, timeout: int = 300,
                            poll_interval: float = 30.0) -> bool:
        """Poll until no file is listed with ``processing_status="pending"``.

        Args:
            schema_slug: Schema whose files are polled.
            timeout: Maximum number of seconds to wait.
            poll_interval: Seconds between two polls.

        Returns:
            ``True`` as soon as a poll reports zero pending files, ``False``
            if ``timeout`` elapses first.
        """
        # Monotonic clock: unaffected by system clock adjustments.
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            files = self.list_files(schema_slug, processing_status="pending")
            pending = files["meta"]["total"]
            if pending == 0:
                return True
            print(f"Aguardando... {pending} arquivos pendentes")
            # Never sleep past the deadline.
            remaining = deadline - time.monotonic()
            if remaining > 0:
                time.sleep(min(poll_interval, remaining))
        return False
# Usage example: upload -> process -> wait -> query
client = DataSnapClient("seu_token_aqui")

# Upload the JSONL file to the "vendas" schema
result = client.upload_file("vendas", "dados_vendas.jsonl")
print("Upload realizado:", result)

# Start processing of the uploaded file
process_result = client.process_files("vendas")
print("Processamento iniciado:", process_result)

# Query only if the wait reported that nothing is pending anymore
processing_finished = client.wait_for_processing("vendas")
if processing_finished:
    query_params = {
        "select": ["categoria", "sum(valor) as total"],
        "group_by": ["categoria"],
        "order_by": [{"field": "total", "direction": "desc"}],
    }
    query_result = client.query("vendas", **query_params)
    print("Resultado da consulta:", query_result)
import os
import hashlib
from pathlib import Path
def validate_jsonl_file(file_path: str) -> bool:
    """Check that every non-blank line of a UTF-8 file is valid JSON.

    Args:
        file_path: Path of the JSONL file to check.

    Returns:
        ``True`` if all non-blank lines parse as JSON (an empty file is
        valid), ``False`` if a line is not valid JSON or the file is not
        valid UTF-8. The problem is printed together with its line number.

    Raises:
        OSError: If the file cannot be opened.
    """
    line_num = 0
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            for line_num, line in enumerate(f, 1):
                line = line.strip()
                if line:  # blank lines are skipped
                    json.loads(line)
    except json.JSONDecodeError as e:
        # line_num is the line that json.loads just rejected.
        print(f"Erro na validação (linha {line_num}): {e}")
        return False
    except UnicodeDecodeError as e:
        # Decoding fails while reading ahead, so only the last line that was
        # read successfully is known.
        print(f"Erro na validação (após a linha {line_num}): {e}")
        return False
    return True
def upload_with_retry(client: "DataSnapClient", schema_slug: str, file_path: str, max_retries: int = 3) -> Dict:
    """Validate a JSONL file and upload it, retrying on request failures.

    Args:
        client: Client whose ``upload_file`` method performs the upload.
        schema_slug: Target schema.
        file_path: Path of the JSONL file.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The result of the first successful ``client.upload_file`` call.

    Raises:
        ValueError: If ``max_retries`` is below 1 or the file fails validation.
        requests.exceptions.RequestException: The error of the last attempt
            when every attempt failed.
    """
    # With zero attempts the loop would never run and the function would
    # fall through returning None, so reject that up front.
    if max_retries < 1:
        raise ValueError("max_retries deve ser >= 1")
    if not validate_jsonl_file(file_path):
        raise ValueError("Arquivo JSONL inválido")
    for attempt in range(1, max_retries + 1):
        try:
            result = client.upload_file(schema_slug, file_path)
        except requests.exceptions.RequestException as e:
            print(f"Tentativa {attempt} falhou: {e}")
            if attempt == max_retries:
                raise
            time.sleep(2 ** (attempt - 1))  # exponential backoff: 1s, 2s, 4s, ...
        else:
            print(f"Upload bem-sucedido na tentativa {attempt}")
            return result
# Usage example: any failure (validation or upload) is reported, not raised
try:
    result = upload_with_retry(
        client,
        "vendas",
        "dados.jsonl",
    )
    print("Sucesso:", result)
except Exception as upload_error:
    print("Erro no upload:", upload_error)
const axios = require('axios');
const FormData = require('form-data');
const fs = require('fs');
/**
 * Thin client for the DataSnap HTTP API.
 *
 * Every request method resolves with the response body and rejects with an
 * Error whose message is "<operation> failed: <detail>". The rejected error
 * also carries `status` (HTTP status, when a response was received) and
 * `cause` (the original axios error) so callers can still inspect them.
 */
class DataSnapClient {
  /**
   * @param {string} token Bearer token sent in the Authorization header.
   * @param {string} [baseUrl] Root URL of the API.
   */
  constructor(token, baseUrl = 'https://api.datasnap.com.br') {
    this.token = token;
    this.baseUrl = baseUrl;
    this.headers = {
      'Authorization': `Bearer ${token}`,
      'Content-Type': 'application/json'
    };
  }

  /**
   * Build the Error thrown by the request methods.
   * @param {string} prefix Operation label, e.g. "Upload failed".
   * @param {*} error Error caught from axios.
   * @returns {Error}
   */
  _wrapError(prefix, error) {
    const apiError = error.response?.data?.error;
    let detail = error.message;
    if (typeof apiError === 'string') {
      detail = apiError;
    } else if (apiError) {
      // A structured error would otherwise be rendered as "[object Object]".
      detail = JSON.stringify(apiError);
    }
    const wrapped = new Error(`${prefix}: ${detail}`);
    wrapped.status = error.response?.status;
    wrapped.cause = error;
    return wrapped;
  }

  /** Upload a file to a schema as multipart form data. */
  async uploadFile(schemaSlug, filePath) {
    const url = `${this.baseUrl}/api/v1/schemas/${schemaSlug}/files`;
    const formData = new FormData();
    formData.append('files', fs.createReadStream(filePath));
    try {
      const response = await axios.post(url, formData, {
        headers: {
          'Authorization': `Bearer ${this.token}`,
          // form-data supplies the multipart Content-Type with its boundary
          ...formData.getHeaders()
        }
      });
      return response.data;
    } catch (error) {
      throw this._wrapError('Upload failed', error);
    }
  }

  /** List the files of a schema; `filters` become query parameters. */
  async listFiles(schemaSlug, filters = {}) {
    const url = `${this.baseUrl}/api/v1/schemas/${schemaSlug}/files`;
    try {
      const response = await axios.get(url, {
        headers: this.headers,
        params: filters
      });
      return response.data;
    } catch (error) {
      throw this._wrapError('List files failed', error);
    }
  }

  /** Start processing of pending files; `options` is sent as the JSON body. */
  async processFiles(schemaSlug, options = {}) {
    const url = `${this.baseUrl}/api/v1/schemas/${schemaSlug}/process-files`;
    try {
      const response = await axios.post(url, options, { headers: this.headers });
      return response.data;
    } catch (error) {
      throw this._wrapError('Process failed', error);
    }
  }

  /** Run a query; `queryData` is sent as the JSON body. */
  async query(schemaSlug, queryData) {
    const url = `${this.baseUrl}/api/v1/schemas/${schemaSlug}/query`;
    try {
      const response = await axios.post(url, queryData, { headers: this.headers });
      return response.data;
    } catch (error) {
      throw this._wrapError('Query failed', error);
    }
  }

  /**
   * Poll until no file is listed with processing_status "pending".
   * @param {string} schemaSlug Schema whose files are polled.
   * @param {number} [timeoutMs] Maximum time to wait, in milliseconds.
   * @param {number} [pollIntervalMs] Delay between two polls, in milliseconds.
   * @returns {Promise<boolean>} true once a poll reports zero pending files,
   *   false if the timeout elapses first.
   */
  async waitForProcessing(schemaSlug, timeoutMs = 300000, pollIntervalMs = 30000) {
    const deadline = Date.now() + timeoutMs;
    while (Date.now() < deadline) {
      const files = await this.listFiles(schemaSlug, { processing_status: 'pending' });
      if (files.meta.total === 0) {
        return true;
      }
      console.log(`Aguardando... ${files.meta.total} arquivos pendentes`);
      // Never sleep past the deadline.
      const remaining = deadline - Date.now();
      if (remaining <= 0) {
        break;
      }
      await new Promise(resolve => setTimeout(resolve, Math.min(pollIntervalMs, remaining)));
    }
    return false;
  }
}
// Usage example: upload, process, wait for completion, then query
async function main() {
  const client = new DataSnapClient('seu_token_aqui');

  try {
    // Upload
    const uploadResult = await client.uploadFile('vendas', 'dados_vendas.jsonl');
    console.log('Upload realizado:', uploadResult);

    // Start processing
    const processResult = await client.processFiles('vendas');
    console.log('Processamento iniciado:', processResult);

    // Stop here when the wait timed out; the query only runs on processed data
    const processed = await client.waitForProcessing('vendas');
    if (!processed) {
      return;
    }

    // Query
    const queryData = {
      select: ['categoria', 'sum(valor) as total'],
      group_by: ['categoria'],
      order_by: [{ field: 'total', direction: 'desc' }]
    };
    const queryResult = await client.query('vendas', queryData);
    console.log('Resultado da consulta:', queryResult);
  } catch (error) {
    console.error('Erro:', error.message);
  }
}

main();
<?php
/**
 * Thin cURL-based client for the DataSnap HTTP API.
 *
 * Every public method returns the JSON response decoded to an associative
 * array and throws Exception when the transfer fails or the server answers
 * with a non-2xx status.
 */
class DataSnapClient {
    private $token;
    private $baseUrl;

    /**
     * @param string $token   Bearer token sent in the Authorization header.
     * @param string $baseUrl Root URL of the API.
     */
    public function __construct($token, $baseUrl = 'https://api.datasnap.com.br') {
        $this->token = $token;
        $this->baseUrl = $baseUrl;
    }

    /**
     * Upload a file to a schema as multipart form data.
     *
     * @throws Exception On transport failure or non-2xx response.
     */
    public function uploadFile($schemaSlug, $filePath) {
        $url = $this->baseUrl . "/api/v1/schemas/{$schemaSlug}/files";

        $curl = curl_init();
        curl_setopt_array($curl, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            // An array payload makes cURL send multipart/form-data itself
            CURLOPT_POSTFIELDS => [
                'files' => new CURLFile($filePath)
            ],
            CURLOPT_HTTPHEADER => [
                "Authorization: Bearer {$this->token}"
            ]
        ]);

        return $this->execute($curl, 'Upload failed');
    }

    /** List the files of a schema; $filters become the query string. */
    public function listFiles($schemaSlug, $filters = []) {
        $url = $this->baseUrl . "/api/v1/schemas/{$schemaSlug}/files";
        if (!empty($filters)) {
            $url .= '?' . http_build_query($filters);
        }
        return $this->makeRequest('GET', $url);
    }

    /** Start processing of pending files; $options is sent as the JSON body. */
    public function processFiles($schemaSlug, $options = []) {
        $url = $this->baseUrl . "/api/v1/schemas/{$schemaSlug}/process-files";
        return $this->makeRequest('POST', $url, $options);
    }

    /** Run a query; $queryData is sent as the JSON body. */
    public function query($schemaSlug, $queryData) {
        $url = $this->baseUrl . "/api/v1/schemas/{$schemaSlug}/query";
        return $this->makeRequest('POST', $url, $queryData);
    }

    /**
     * Send a JSON request and return the decoded response.
     *
     * @throws Exception On transport failure or non-2xx response.
     */
    private function makeRequest($method, $url, $data = null) {
        $curl = curl_init();
        curl_setopt_array($curl, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_CUSTOMREQUEST => $method,
            CURLOPT_HTTPHEADER => [
                "Authorization: Bearer {$this->token}",
                "Content-Type: application/json"
            ]
        ]);

        if ($data !== null) {
            // json_encode([]) yields the JSON array "[]"; send an empty
            // object "{}" instead so an empty options set is still an object.
            $payload = (is_array($data) && empty($data)) ? new stdClass() : $data;
            curl_setopt($curl, CURLOPT_POSTFIELDS, json_encode($payload));
        }

        return $this->execute($curl, 'Request failed');
    }

    /**
     * Run a prepared cURL handle, close it, and decode the JSON response.
     *
     * @param resource|CurlHandle $curl        Configured handle.
     * @param string              $errorPrefix Start of the exception message.
     * @throws Exception On transport failure or non-2xx response.
     */
    private function execute($curl, $errorPrefix) {
        $response = curl_exec($curl);

        // curl_exec returns false on transport errors (DNS, refused
        // connection, ...); the HTTP code is then 0 and must not be
        // mistaken for success.
        if ($response === false) {
            $transportError = curl_error($curl);
            curl_close($curl);
            throw new Exception("{$errorPrefix}: {$transportError}");
        }

        $httpCode = curl_getinfo($curl, CURLINFO_HTTP_CODE);
        curl_close($curl);

        // Same success rule for uploads and JSON requests: any 2xx status.
        if ($httpCode < 200 || $httpCode >= 300) {
            throw new Exception("{$errorPrefix} with HTTP {$httpCode}: {$response}");
        }

        return json_decode($response, true);
    }
}
// Usage example: upload, start processing, then query
try {
    $client = new DataSnapClient('seu_token_aqui');

    // Upload
    $uploadResult = $client->uploadFile('vendas', 'dados_vendas.jsonl');
    echo "Upload realizado: ", json_encode($uploadResult), "\n";

    // Start processing
    $processResult = $client->processFiles('vendas');
    echo "Processamento iniciado: ", json_encode($processResult), "\n";

    // Query (meant to run after processing)
    $queryData = [
        'select' => ['categoria', 'sum(valor) as total'],
        'group_by' => ['categoria'],
        'order_by' => [['field' => 'total', 'direction' => 'desc']]
    ];
    $queryResult = $client->query('vendas', $queryData);
    echo "Resultado da consulta: ", json_encode($queryResult), "\n";
} catch (Exception $e) {
    echo "Erro: ", $e->getMessage(), "\n";
}
?>
package main
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"time"
)
// DataSnapClient bundles the settings needed to call the DataSnap HTTP API.
type DataSnapClient struct {
	// token is sent as "Bearer <token>" in the Authorization header.
	token string
	// baseURL is prepended to every endpoint path.
	baseURL string
	// client performs the HTTP requests.
	client *http.Client
}
// NewDataSnapClient returns a client for the given token. An empty baseURL
// selects the default API address. The embedded HTTP client uses a
// 30-second timeout.
func NewDataSnapClient(token, baseURL string) *DataSnapClient {
	c := &DataSnapClient{
		token:   token,
		baseURL: "https://api.datasnap.com.br",
		client:  &http.Client{Timeout: 30 * time.Second},
	}
	// Override the default only when the caller supplied an address.
	if baseURL != "" {
		c.baseURL = baseURL
	}
	return c
}
// UploadFile sends the file at filePath to the schema as multipart form data
// (field name "files") and returns the decoded JSON response.
//
// It returns an error when the file cannot be read, the request fails, the
// server answers with a non-2xx status, or the response is not valid JSON.
func (c *DataSnapClient) UploadFile(ctx context.Context, schemaSlug, filePath string) (map[string]interface{}, error) {
	file, err := os.Open(filePath)
	if err != nil {
		return nil, err
	}
	defer file.Close()

	// Send only the file's base name, not the caller's local directory path.
	info, err := file.Stat()
	if err != nil {
		return nil, err
	}

	var body bytes.Buffer
	writer := multipart.NewWriter(&body)

	part, err := writer.CreateFormFile("files", info.Name())
	if err != nil {
		return nil, err
	}
	if _, err = io.Copy(part, file); err != nil {
		return nil, err
	}
	// Close writes the terminating boundary; the body is incomplete without it.
	if err = writer.Close(); err != nil {
		return nil, err
	}

	url := fmt.Sprintf("%s/api/v1/schemas/%s/files", c.baseURL, schemaSlug)
	req, err := http.NewRequestWithContext(ctx, "POST", url, &body)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+c.token)
	req.Header.Set("Content-Type", writer.FormDataContentType())

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Do does not treat HTTP error statuses as errors; check explicitly so an
	// error payload is not returned as a successful result.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		var errBody bytes.Buffer
		// Best effort: keep a bounded excerpt of the body for the message.
		_, _ = io.Copy(&errBody, io.LimitReader(resp.Body, 4096))
		return nil, fmt.Errorf("upload failed with HTTP %d: %s", resp.StatusCode, errBody.String())
	}

	var result map[string]interface{}
	if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, err
	}
	return result, nil
}
// Query posts queryData as JSON to the schema's query endpoint and returns
// the decoded JSON response.
//
// It returns an error when queryData cannot be marshalled, the request fails,
// the server answers with a non-2xx status, or the response is not valid JSON.
func (c *DataSnapClient) Query(ctx context.Context, schemaSlug string, queryData map[string]interface{}) (map[string]interface{}, error) {
	jsonData, err := json.Marshal(queryData)
	if err != nil {
		return nil, err
	}

	url := fmt.Sprintf("%s/api/v1/schemas/%s/query", c.baseURL, schemaSlug)
	req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData))
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+c.token)
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Do does not treat HTTP error statuses as errors; check explicitly so an
	// error payload is not returned as a successful result.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		var errBody bytes.Buffer
		// Best effort: keep a bounded excerpt of the body for the message.
		_, _ = io.Copy(&errBody, io.LimitReader(resp.Body, 4096))
		return nil, fmt.Errorf("query failed with HTTP %d: %s", resp.StatusCode, errBody.String())
	}

	var result map[string]interface{}
	if err = json.NewDecoder(resp.Body).Decode(&result); err != nil {
		return nil, err
	}
	return result, nil
}
// main demonstrates an upload followed by an aggregate query.
func main() {
	ctx := context.Background()
	// An empty base URL selects the default API address.
	api := NewDataSnapClient("seu_token_aqui", "")

	// Query definition; building it has no side effects, so it is prepared
	// up front.
	aggregate := map[string]interface{}{
		"select":   []string{"categoria", "sum(valor) as total"},
		"group_by": []string{"categoria"},
		"order_by": []map[string]string{{"field": "total", "direction": "desc"}},
	}

	// Upload
	uploaded, err := api.UploadFile(ctx, "vendas", "dados_vendas.jsonl")
	if err != nil {
		fmt.Printf("Erro no upload: %v\n", err)
		return
	}
	fmt.Printf("Upload realizado: %+v\n", uploaded)

	// Query
	rows, err := api.Query(ctx, "vendas", aggregate)
	if err != nil {
		fmt.Printf("Erro na consulta: %v\n", err)
		return
	}
	fmt.Printf("Resultado da consulta: %+v\n", rows)
}
def safe_upload(client, schema_slug, file_path):
    """Upload a file, reporting failures on stdout instead of raising.

    Args:
        client: Object whose ``upload_file(schema_slug, file_path)`` performs
            the upload.
        schema_slug: Target schema.
        file_path: Path of the file to upload.

    Returns:
        The value returned by ``client.upload_file`` on success, ``None`` when
        the upload raised. After an HTTP 429 the function sleeps 60 seconds
        before returning ``None``; it does not retry.
    """
    # Only the upload itself is guarded: a successful upload must not be
    # reported as a failure because of what happens while printing the result.
    try:
        result = client.upload_file(schema_slug, file_path)
    except requests.exceptions.HTTPError as e:
        status = e.response.status_code
        if status == 422:
            # The body of a validation error is not guaranteed to be JSON.
            try:
                error_data = e.response.json()
            except ValueError:
                error_data = e.response.text
            print(f"❌ Erro de validação: {error_data}")
        elif status == 429:
            print("⏸️ Limite de taxa atingido. Aguarde.")
            time.sleep(60)
        else:
            print(f"❌ Erro HTTP {status}: {e.response.text}")
        return None
    except Exception as e:
        print(f"❌ Erro inesperado: {e}")
        return None

    # The id is shown when the response has the expected shape; otherwise the
    # result is still returned to the caller.
    try:
        first_id = result['uploaded'][0]['id']
    except (KeyError, IndexError, TypeError):
        print("✅ Upload realizado")
    else:
        print(f"✅ Upload realizado: {first_id}")
    return result
import os
class Config:
    """Environment-driven settings for the DataSnap client."""

    # Both values are read from the environment once, when the class body runs.
    DATASNAP_TOKEN = os.environ.get("DATASNAP_TOKEN")
    DATASNAP_ENV = os.environ.get("DATASNAP_ENV", "production")

    @property
    def base_url(self):
        """Return the API URL for ``DATASNAP_ENV``.

        Any environment name without an entry falls back to the production URL.
        """
        production_url = "https://api.datasnap.com.br"
        known_urls = {"production": production_url}
        env_name = self.DATASNAP_ENV
        if env_name in known_urls:
            return known_urls[env_name]
        return production_url
# Usage: build the client from the environment-driven settings
config = Config()
client = DataSnapClient(
    token=config.DATASNAP_TOKEN,
    base_url=config.base_url,
)