Optimized development environment

Set up your development environment to work more efficiently with the DataSnap API, using industry-standard tools and practices.
A well-configured environment can significantly boost your productivity when integrating with DataSnap.

Recommended IDEs

Visual Studio Code

Configure extensions and snippets to speed up development:

REST Client

Test endpoints directly in the editor

JSON Tools

Format and validate JSONL data

Python/Node.js

Language-specific extensions

Git Integration

Built-in version control

VS Code configuration

// settings.json
{
    "python.defaultInterpreterPath": "./venv/bin/python",
    "python.formatting.provider": "black",
    "python.linting.enabled": true,
    "python.linting.pylintEnabled": true,
    "json.schemaDownload.enable": true,
    "files.associations": {
        "*.jsonl": "jsonl"
    },
    "rest-client.environmentVariables": {
        "local": {
            "baseUrl": "https://api.datasnap.com.br",
            "token": "${DATASNAP_TOKEN}"
        }
    }
}

Custom snippets

// datasnap.code-snippets (global user snippets)
{
    "DataSnap Client": {
        "prefix": "ds-client",
        "body": [
            "import requests",
            "from typing import Dict, Any",
            "",
            "class DataSnapClient:",
            "    def __init__(self, token: str, base_url: str = \"https://api.datasnap.com.br\"):",
            "        self.token = token",
            "        self.base_url = base_url",
            "        self.headers = {\"Authorization\": f\"Bearer {token}\"}",
            "",
            "    def query(self, schema_slug: str, query_data: Dict[str, Any]) -> Dict[str, Any]:",
            "        response = requests.post(",
            "            f\"{self.base_url}/api/v1/schemas/{schema_slug}/query\",",
            "            headers=self.headers,",
            "            json=query_data",
            "        )",
            "        response.raise_for_status()",
            "        return response.json()",
            "",
            "# Usage",
            "client = DataSnapClient(\"${1:your_token}\")",
            "$0"
        ],
        "description": "DataSnap client template"
    },
    "DataSnap Query": {
        "prefix": "ds-query",
        "body": [
            "query_data = {",
            "    \"select\": [\"${1:field1}\", \"${2:field2}\"],",
            "    \"where\": [",
            "        {\"field\": \"${3:field}\", \"op\": \"${4:=}\", \"value\": \"${5:value}\"}",
            "    ],",
            "    \"order_by\": [{\"field\": \"${6:field}\", \"direction\": \"${7:asc}\"}],",
            "    \"limit\": ${8:100}",
            "}",
            "",
            "result = client.query(\"${9:schema_slug}\", query_data)",
            "$0"
        ],
        "description": "DataSnap query template"
    }
}

PyCharm / IntelliJ

Configure code templates and debugging tools:
# Live Templates for PyCharm
# Configuration: File → Settings → Editor → Live Templates

# Template: dsquery
query_data = {
    "select": ["$FIELD1$", "$FIELD2$"],
    "where": [
        {"field": "$WHERE_FIELD$", "op": "$OPERATOR$", "value": "$VALUE$"}
    ],
    "order_by": [{"field": "$ORDER_FIELD$", "direction": "$DIRECTION$"}],
    "limit": $LIMIT$
}

result = client.query("$SCHEMA$", query_data)
$END$

Command-line tools

DataSnap CLI (conceptual)

# Install the custom CLI tool
pip install datasnap-cli

# Configure credentials
datasnap config set-token YOUR_TOKEN
datasnap config set-url https://api.datasnap.com.br

# Useful commands
datasnap schemas list
datasnap files upload schema-name file.jsonl
datasnap files list schema-name --status pending
datasnap process start schema-name
datasnap query schema-name "SELECT * FROM data LIMIT 10"
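
Since this CLI is conceptual, here is a minimal sketch of an equivalent wrapper built directly on the documented upload and query endpoints. The file name, subcommand layout, and argument names are assumptions, not a published package.

# datasnap_cli.py (sketch; file name and subcommand layout are assumptions)
import argparse
import json
import os
import sys

import requests

BASE_URL = os.getenv("DATASNAP_URL", "https://api.datasnap.com.br")
TOKEN = os.getenv("DATASNAP_TOKEN")

def upload(schema_slug, file_path):
    """POST a JSONL file to the documented files endpoint."""
    with open(file_path, "rb") as f:
        response = requests.post(
            f"{BASE_URL}/api/v1/schemas/{schema_slug}/files",
            headers={"Authorization": f"Bearer {TOKEN}"},
            files={"files": f},
        )
    response.raise_for_status()
    return response.json()

def query(schema_slug, query_json):
    """POST a structured query to the documented query endpoint."""
    response = requests.post(
        f"{BASE_URL}/api/v1/schemas/{schema_slug}/query",
        headers={"Authorization": f"Bearer {TOKEN}"},
        json=json.loads(query_json),
    )
    response.raise_for_status()
    return response.json()

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Minimal DataSnap CLI sketch")
    sub = parser.add_subparsers(dest="command", required=True)
    up = sub.add_parser("upload", help="Upload a JSONL file")
    up.add_argument("schema")
    up.add_argument("file")
    qr = sub.add_parser("query", help="Run a query given as a JSON string")
    qr.add_argument("schema")
    qr.add_argument("query_json")
    args = parser.parse_args()

    if not TOKEN:
        sys.exit("❌ DATASNAP_TOKEN is not set")
    if args.command == "upload":
        print(json.dumps(upload(args.schema, args.file), indent=2))
    else:
        print(json.dumps(query(args.schema, args.query_json), indent=2))

# Example: python datasnap_cli.py query my-schema '{"select": ["id"], "limit": 10}'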

Utility scripts

File validator

#!/bin/bash
# validate-jsonl.sh

if [ $# -eq 0 ]; then
    echo "Usage: $0 <file.jsonl>"
    exit 1
fi

FILE=$1
ERRORS=0
LINE_NUM=0

echo "Validating file: $FILE"
echo "----------------------------------------"

while IFS= read -r line || [ -n "$line" ]; do
    LINE_NUM=$((LINE_NUM + 1))
    
    if [ -z "$line" ]; then
        continue  # Skip empty lines
    fi
    
    if ! echo "$line" | jq empty 2>/dev/null; then
        echo "❌ Line $LINE_NUM: invalid JSON"
        echo "   $line"
        ERRORS=$((ERRORS + 1))
    fi
done < "$FILE"

echo "----------------------------------------"
if [ $ERRORS -eq 0 ]; then
    echo "✅ File is valid! ($LINE_NUM lines processed)"
else
    echo "❌ $ERRORS error(s) found across $LINE_NUM lines"
    exit 1
fi
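
If jq is not available, here is a Python equivalent of the same check; the script name is illustrative.

#!/usr/bin/env python3
# validate_jsonl.py: Python alternative to the shell validator above
import json
import sys

def validate(path):
    """Return (lines_processed, error_count) for a JSONL file."""
    errors = 0
    total = 0
    with open(path, "r", encoding="utf-8") as f:
        for line_num, line in enumerate(f, start=1):
            total = line_num
            if not line.strip():
                continue  # skip empty lines, like the shell version
            try:
                json.loads(line)
            except json.JSONDecodeError as exc:
                errors += 1
                print(f"❌ Line {line_num}: invalid JSON ({exc})")
    return total, errors

if __name__ == "__main__":
    if len(sys.argv) != 2:
        sys.exit("Usage: validate_jsonl.py <file.jsonl>")
    total, errors = validate(sys.argv[1])
    if errors:
        sys.exit(f"❌ {errors} error(s) found across {total} lines")
    print(f"✅ File is valid! ({total} lines processed)")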

Bulk upload

#!/usr/bin/env python3
# bulk-upload.py

import os
import sys
import argparse
from pathlib import Path
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed

class BulkUploader:
    def __init__(self, token, base_url="https://api.datasnap.com.br"):
        self.token = token
        self.base_url = base_url
        self.headers = {"Authorization": f"Bearer {token}"}
    
    def upload_file(self, schema_slug, file_path):
        """Upload a single file"""
        with open(file_path, 'rb') as f:
            files = {'files': f}

            response = requests.post(
                f"{self.base_url}/api/v1/schemas/{schema_slug}/files",
                files=files,
                headers=self.headers
            )
            
            return {
                'file': str(file_path),
                'status': 'success' if response.status_code == 200 else 'error',
                'response': response.json() if response.status_code == 200 else response.text
            }
    
    def upload_directory(self, schema_slug, directory, pattern="*.jsonl", max_workers=3):
        """Upload every file in a directory"""
        directory = Path(directory)
        files = list(directory.glob(pattern))
        
        if not files:
            print(f"No files found in {directory} matching pattern {pattern}")
            return []
        
        print(f"Found {len(files)} file(s) to upload")
        results = []
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {
                executor.submit(self.upload_file, schema_slug, file): file
                for file in files
            }
            
            for future in as_completed(futures):
                file = futures[future]
                try:
                    result = future.result()
                    results.append(result)
                    
                    if result['status'] == 'success':
                        print(f"✅ {file.name}")
                    else:
                        print(f"❌ {file.name}: {result['response']}")
                        
                except Exception as e:
                    print(f"❌ {file.name}: {str(e)}")
                    results.append({
                        'file': str(file),
                        'status': 'error',
                        'response': str(e)
                    })
        
        return results

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Bulk upload to DataSnap")
    parser.add_argument("schema", help="Schema slug")
    parser.add_argument("directory", help="Directory containing the files")
    parser.add_argument("--pattern", default="*.jsonl", help="File glob pattern")
    parser.add_argument("--workers", type=int, default=3, help="Number of workers")
    
    args = parser.parse_args()
    
    token = os.getenv('DATASNAP_TOKEN')
    if not token:
        print("❌ Error: DATASNAP_TOKEN is not set")
        sys.exit(1)
    
    uploader = BulkUploader(token)
    results = uploader.upload_directory(
        args.schema,
        args.directory,
        args.pattern,
        args.workers
    )
    
    successful = len([r for r in results if r['status'] == 'success'])
    failed = len(results) - successful
    
    print(f"\n📊 Summary: {successful} succeeded, {failed} failed")
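
Bulk uploads can hit transient network or rate-limit errors. One way to harden the uploader is to give it a requests Session with automatic retries; the retry counts and status codes below are assumptions to adjust for your workload.

# Sketch: a retrying session for BulkUploader (retry settings are assumptions)
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

def build_retrying_session(token):
    """Build a Session that retries transient failures with exponential backoff."""
    retry = Retry(
        total=3,                                     # up to 3 retries per request
        backoff_factor=1,                            # roughly 1s, 2s, 4s between attempts
        status_forcelist=[429, 500, 502, 503, 504],  # retry on rate limits and server errors
        allowed_methods=["POST"],                    # uploads are POSTs
    )
    session = requests.Session()
    session.headers.update({"Authorization": f"Bearer {token}"})
    session.mount("https://", HTTPAdapter(max_retries=retry))
    return session

# In BulkUploader.__init__ you could keep `self.session = build_retrying_session(token)`
# and call `self.session.post(...)` instead of `requests.post(...)` inside upload_file.

Note that retrying a POST re-sends the same file, so only enable it if repeated uploads of the same file are acceptable on the server side.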

Debugging and monitoring

Logging configuration

import logging
import sys
from datetime import datetime

# Set up structured logging
def setup_logging(level=logging.INFO):
    """Configure logging for DataSnap development"""
    
    # Custom formatter
    class DataSnapFormatter(logging.Formatter):
        def format(self, record):
            # Add timestamp and context
            if hasattr(record, 'schema'):
                schema_info = f"[{record.schema}] "
            else:
                schema_info = ""
            
            return f"{datetime.now().isoformat()} {schema_info}{record.getMessage()}"
    
    # Set up the handler
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(DataSnapFormatter())
    
    # Set up the logger
    logger = logging.getLogger('datasnap')
    logger.setLevel(level)
    logger.addHandler(handler)
    
    return logger

# Usage
logger = setup_logging()

def debug_query(client, schema_slug, query_data):
    """Helper for debugging queries"""
    logger.info(f"Running query against schema {schema_slug}")
    logger.debug(f"Query: {query_data}")
    
    try:
        result = client.query(schema_slug, query_data)
        logger.info(f"Query succeeded. {len(result.get('data', []))} records returned")
        return result
    except Exception as e:
        logger.error(f"Query failed: {str(e)}", extra={'schema': schema_slug})
        raise

Performance profiling

import time
import functools
from typing import Any, Callable

def profile_datasnap_calls(func: Callable) -> Callable:
    """Decorator that measures the duration of DataSnap calls"""
    
    @functools.wraps(func)
    def wrapper(*args, **kwargs) -> Any:
        start_time = time.time()
        
        try:
            result = func(*args, **kwargs)
            duration = time.time() - start_time
            
            # Log performance
            print(f"⏱️  {func.__name__}: {duration:.3f}s")
            
            # Flag slow calls
            if duration > 5.0:
                print(f"⚠️  Slow call detected: {func.__name__} ({duration:.3f}s)")
            
            return result
            
        except Exception as e:
            duration = time.time() - start_time
            print(f"❌ {func.__name__}: failed after {duration:.3f}s - {str(e)}")
            raise
    
    return wrapper

# Usage
class DataSnapClient:
    @profile_datasnap_calls
    def query(self, schema_slug: str, query_data: dict):
        # Query implementation goes here
        pass
    
    @profile_datasnap_calls
    def upload_file(self, schema_slug: str, file_path: str):
        # Upload implementation goes here
        pass

Automated tests

Test setup

import pytest
import requests
import requests_mock
from unittest.mock import Mock, patch
from datasnap_client import DataSnapClient

class TestDataSnapClient:
    @pytest.fixture
    def client(self):
        return DataSnapClient("test-token", "https://test.datasnap.com")
    
    @pytest.fixture
    def mock_response(self):
        return {
            "data": [
                {"id": 1, "name": "Test", "value": 100},
                {"id": 2, "name": "Test2", "value": 200}
            ],
            "meta": {"total": 2},
            "success": True
        }
    
    def test_query_success(self, client, mock_response):
        """Tests a successful query"""
        with requests_mock.Mocker() as m:
            m.post(
                "https://test.datasnap.com/api/v1/schemas/test/query",
                json=mock_response
            )
            
            query_data = {"select": ["name", "value"], "limit": 10}
            result = client.query("test", query_data)
            
            assert result["success"] is True
            assert len(result["data"]) == 2
            assert result["data"][0]["name"] == "Test"
    
    def test_query_authentication_error(self, client):
        """Tests an authentication error"""
        with requests_mock.Mocker() as m:
            m.post(
                "https://test.datasnap.com/api/v1/schemas/test/query",
                status_code=401,
                json={"error": "Unauthenticated."}
            )
            
            query_data = {"select": ["name"], "limit": 10}
            
            with pytest.raises(requests.exceptions.HTTPError):
                client.query("test", query_data)
    
    @patch('builtins.open', new_callable=lambda: Mock())
    def test_upload_file(self, mock_open, client):
        """Tests a file upload"""
        mock_file = Mock()
        mock_open.return_value.__enter__.return_value = mock_file
        
        with requests_mock.Mocker() as m:
            m.post(
                "https://test.datasnap.com/api/v1/schemas/test/files",
                json={"uploaded": [{"id": 123, "file_name": "test.jsonl"}]}
            )
            
            result = client.upload_file("test", "test.jsonl")
            
            assert "uploaded" in result
            assert result["uploaded"][0]["id"] == 123

# Run the tests
# pytest test_datasnap.py -v
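
To keep the suite runnable without credentials, a conftest.py can skip integration tests when DATASNAP_TOKEN is absent. The "integration" marker name below is an assumption.

# conftest.py (sketch; the "integration" marker name is an assumption)
import os
import pytest

def pytest_configure(config):
    # Register the custom marker so pytest does not warn about it
    config.addinivalue_line("markers", "integration: tests that hit a real DataSnap environment")

def pytest_collection_modifyitems(config, items):
    if os.getenv("DATASNAP_TOKEN"):
        return  # credentials available: run everything
    skip_integration = pytest.mark.skip(reason="DATASNAP_TOKEN not set")
    for item in items:
        if "integration" in item.keywords:
            item.add_marker(skip_integration)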

Continuous integration

# .github/workflows/datasnap-integration.yml
name: DataSnap Integration Tests

on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    
    steps:
    - uses: actions/checkout@v4
    
    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.9'
    
    - name: Install dependencies
      run: |
        pip install -r requirements.txt
        pip install pytest pytest-cov requests-mock
    
    - name: Validate JSONL files
      run: |
        find . -name "*.jsonl" -exec jq empty {} +
    
    - name: Run unit tests
      run: |
        pytest tests/ -v --cov=datasnap_client
    
    - name: Integration tests (staging)
      env:
        DATASNAP_TOKEN: ${{ secrets.DATASNAP_STAGING_TOKEN }}
        DATASNAP_URL: https://staging.datasnap.com.br
      run: |
        python integration_tests.py
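
The workflow calls integration_tests.py, which is not shown above. Below is a minimal sketch of a staging smoke test against the documented query endpoint; the DATASNAP_TEST_SCHEMA variable and the selected field are assumptions.

#!/usr/bin/env python3
# integration_tests.py (sketch; schema slug and fields are assumptions)
import os
import sys

import requests

BASE_URL = os.environ.get("DATASNAP_URL", "https://api.datasnap.com.br")
TOKEN = os.environ["DATASNAP_TOKEN"]  # fail fast if the secret is missing
SCHEMA = os.environ.get("DATASNAP_TEST_SCHEMA", "test-schema")  # hypothetical test schema

def main():
    response = requests.post(
        f"{BASE_URL}/api/v1/schemas/{SCHEMA}/query",
        headers={"Authorization": f"Bearer {TOKEN}"},
        json={"select": ["id"], "limit": 1},  # smallest useful query
        timeout=30,
    )
    response.raise_for_status()
    payload = response.json()
    print(f"✅ Staging query OK: {len(payload.get('data', []))} record(s) returned")

if __name__ == "__main__":
    try:
        main()
    except Exception as exc:
        print(f"❌ Integration test failed: {exc}")
        sys.exit(1)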

Productivity and automation

Makefile for automation

# Makefile for DataSnap projects

.PHONY: help install test lint validate-data upload process query clean

help:
	@echo "Available commands:"
	@echo "  install    - Install dependencies"
	@echo "  test       - Run tests"
	@echo "  lint       - Check the code"
	@echo "  upload     - Upload files to staging"
	@echo "  process    - Start processing"
	@echo "  query      - Run a sample query"
	@echo "  clean      - Remove temporary files"

install:
	pip install -r requirements.txt
	pip install -e .

test:
	pytest tests/ -v --cov=src/

lint:
	black src/
	flake8 src/
	mypy src/

validate-data:
	@echo "Validating JSONL files..."
	@find data/ -name "*.jsonl" -exec jq empty {} +
	@echo "✅ All files are valid"

upload: validate-data
	python scripts/bulk_upload.py $(SCHEMA) data/

process:
	python scripts/process_files.py $(SCHEMA)

query:
	python scripts/example_query.py $(SCHEMA)

clean:
	find . -type f -name "*.pyc" -delete
	find . -type d -name "__pycache__" -delete
	rm -rf .pytest_cache/
	rm -rf .coverage

# Usage: make upload SCHEMA=my-schema
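
The query target calls scripts/example_query.py, which is not shown. A minimal sketch of that script, reusing the documented query endpoint, could look like the following; the selected fields are assumptions.

#!/usr/bin/env python3
# scripts/example_query.py (sketch; example fields are assumptions)
import json
import os
import sys

import requests

def main():
    if len(sys.argv) != 2:
        sys.exit("Usage: example_query.py <schema_slug>")
    schema_slug = sys.argv[1]

    token = os.getenv("DATASNAP_TOKEN")
    if not token:
        sys.exit("❌ DATASNAP_TOKEN is not set")
    base_url = os.getenv("DATASNAP_URL", "https://api.datasnap.com.br")

    query_data = {"select": ["id"], "limit": 10}
    response = requests.post(
        f"{base_url}/api/v1/schemas/{schema_slug}/query",
        headers={"Authorization": f"Bearer {token}"},
        json=query_data,
    )
    response.raise_for_status()
    print(json.dumps(response.json(), indent=2, ensure_ascii=False))

if __name__ == "__main__":
    main()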

Docker for development

# Dockerfile.dev
FROM python:3.9-slim

WORKDIR /app

# Install system dependencies
RUN apt-get update && apt-get install -y \
    jq \
    curl \
    && rm -rf /var/lib/apt/lists/*

# Install Python dependencies
COPY requirements.txt .
RUN pip install -r requirements.txt

# Install development tools
RUN pip install pytest black flake8 mypy ipython

# Declare volumes
VOLUME ["/app/data", "/app/src"]

# Entrypoint script
COPY docker-entrypoint.sh /
RUN chmod +x /docker-entrypoint.sh

ENTRYPOINT ["/docker-entrypoint.sh"]
CMD ["python"]

#!/bin/bash
# docker-entrypoint.sh

# Validate environment variables
if [ -z "$DATASNAP_TOKEN" ]; then
    echo "❌ DATASNAP_TOKEN is not set"
    exit 1
fi

echo "✅ DataSnap environment configured"
echo "🔗 URL: ${DATASNAP_URL:-https://api.datasnap.com.br}"

# Run the given command
exec "$@"

# docker-compose.yml
version: '3.8'

services:
  datasnap-dev:
    build:
      context: .
      dockerfile: Dockerfile.dev
    environment:
      - DATASNAP_TOKEN=${DATASNAP_TOKEN}
      - DATASNAP_URL=${DATASNAP_URL:-https://api.datasnap.com.br}
    volumes:
      - ./src:/app/src
      - ./data:/app/data
      - ./tests:/app/tests
    working_dir: /app
    command: tail -f /dev/null  # Keep the container running

Next steps