Visão Geral

Os webhooks da DataSnap permitem que sua aplicação receba notificações automáticas sobre mudanças no status de processamento dos arquivos. Em vez de consultar constantemente a API para verificar o progresso, você recebe atualizações em tempo real diretamente em seus endpoints.
Benefícios dos webhooks: Reduz o número de requisições à API, fornece notificações instantâneas e permite arquiteturas orientadas a eventos.

Como Funciona

  1. Configuração: Você configura URLs de webhook no painel da DataSnap
  2. Processamento: Quando arquivos são processados, a DataSnap monitora mudanças de status
  3. Notificação: A cada mudança de status, enviamos uma requisição POST para seus endpoints
  4. Confirmação: Seu endpoint deve retornar status HTTP 2xx para confirmar o recebimento

Configuração de Webhooks

No Painel de Controle

Para configurar webhooks através da interface web:
1

Acessar a seção de Webhooks

No painel de controle da DataSnap, navegue até Webhooks no menu principal.
2

Criar novo webhook

Clique em ”+ Criar Webhook” para adicionar um novo endpoint.
3

Configurar endpoint

URL do Webhook: Informe a URL completa onde você quer receber as notificaçõesExemplo: https://sua-aplicacao.com/webhooks/datasnap
4

Salvar configuração

Clique em “Salvar” para ativar o webhook.
Teste seu webhook: Você pode enviar um webhook de teste através do painel para verificar se está funcionando corretamente.

Validação de URLs

Certifique-se de que seu endpoint webhook:
  • Está acessível publicamente (não em localhost ou IPs privados)
  • Aceita requisições POST
  • Responde com status HTTP 2xx (200, 201, 204, etc.)
  • Responde em até 10 segundos
  • Usa HTTPS (recomendado para segurança)

Eventos de Webhook

Tipos de Eventos Suportados

Atualmente, o sistema envia webhooks para eventos relacionados ao processamento de arquivos:
EventoTopicDescrição
Aceitoschemas.run.statusProcessamento foi aceito e adicionado à fila
Iniciadoschemas.run.statusProcessamento começou a executar
Em Andamentoschemas.run.statusProcessamento está sendo executado
Concluídoschemas.run.statusProcessamento finalizado com sucesso
Falhouschemas.run.statusProcessamento falhou com erro

Payload do Webhook

Todas as notificações seguem o mesmo formato base:
{
  "topic": "schemas.run.status",
  "metadata": {
    "run_id": 24,
    "tenant": "sua_empresa",
    "status": "completed",
    "started_at": "2025-08-12T23:14:28-03:00"
  }
}

Campos do Payload

topic
string
Tópico do evento - Identifica o tipo de evento. Atualmente sempre schemas.run.status para processamento de arquivos.
metadata
object
Metadados do evento - Informações específicas sobre a execução do processamento.

Exemplos de Payloads por Status

{
  "topic": "schemas.run.status",
  "metadata": {
    "run_id": 24,
    "tenant": "sua_empresa",
    "status": "accepted",
    "started_at": "2025-08-12T23:14:28-03:00"
  }
}

Implementação do Receptor

Estrutura Básica do Endpoint

Seu endpoint webhook deve:
  1. Aceitar requisições POST
  2. Processar o JSON do payload
  3. Retornar status HTTP 2xx
  4. Processar rapidamente (< 10 segundos)
from flask import Flask, request, jsonify
import json
import logging

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

@app.route('/webhooks/datasnap', methods=['POST'])
def handle_datasnap_webhook():
    """Receptor de webhooks do DataSnap"""
    try:
        # Validar Content-Type
        if request.content_type != 'application/json':
            return jsonify({'error': 'Content-Type deve ser application/json'}), 400
        
        # Obter payload
        payload = request.get_json()
        
        if not payload:
            return jsonify({'error': 'Payload JSON inválido'}), 400
        
        # Log do evento recebido
        logger.info(f"Webhook recebido: {json.dumps(payload)}")
        
        # Processar baseado no tópico
        if payload.get('topic') == 'schemas.run.status':
            processar_status_execucao(payload['metadata'])
        else:
            logger.warning(f"Tópico desconhecido: {payload.get('topic')}")
        
        return jsonify({'status': 'processed'}), 200
        
    except Exception as e:
        logger.error(f"Erro ao processar webhook: {str(e)}")
        return jsonify({'error': 'Erro interno'}), 500

def processar_status_execucao(metadata):
    """Processa eventos de status de execução"""
    run_id = metadata.get('run_id')
    status = metadata.get('status')
    tenant = metadata.get('tenant')
    
    logger.info(f"Processamento {run_id} do tenant {tenant}: {status}")
    
    if status == 'completed':
        # Processamento concluído com sucesso
        files_processed = metadata.get('files_processed', 0)
        total_records = metadata.get('total_records', 0)
        
        logger.info(f"Sucesso: {files_processed} arquivos, {total_records} registros")
        
        # Aqui você pode:
        # - Atualizar banco de dados
        # - Enviar notificação por email
        # - Iniciar próximo processo na pipeline
        # - Atualizar dashboard em tempo real
        
    elif status == 'failed':
        # Processamento falhou
        error_message = metadata.get('error_message', 'Erro desconhecido')
        
        logger.error(f"Falha no processamento: {error_message}")
        
        # Aqui você pode:
        # - Registrar erro no sistema de monitoramento
        # - Enviar alerta para equipe técnica
        # - Agendar retry automático
        # - Notificar usuário sobre o problema
        
    elif status in ['accepted', 'started', 'processing']:
        # Processamento em andamento
        logger.info(f"Status intermediário: {status}")
        
        # Aqui você pode:
        # - Atualizar barra de progresso
        # - Mostrar status em tempo real no frontend
        # - Registrar tempo de início para métricas

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Segurança

Validação de Origem

Para garantir que os webhooks realmente vêm da DataSnap:

Tratamento de Erros e Retry

Sistema de Retry da DataSnap

A DataSnap implementa retry automático para webhooks que falham:
  • Tentativas: Até 5 tentativas por webhook
  • Backoff: Exponencial com jitter (1s, 2s, 4s, 8s, 16s)
  • Timeout: 10 segundos por tentativa
  • Condições de retry: Status HTTP 5xx, timeout, erro de conexão

Status de Falha Permanente

Webhooks são marcados como falha permanente quando:
  • Status HTTP 4xx: Erro do cliente (não será repetido)
  • Após 5 tentativas: Todas as tentativas falharam
  • URL inválida: URL não resolve ou não é acessível

Implementação de Idempotência

Seu endpoint deve ser idempotente para lidar com entregas duplicadas:
# Cache para evitar processamento duplicado
processed_webhooks = set()

@app.route('/webhooks/datasnap', methods=['POST'])
def handle_webhook():
    payload = request.get_json()
    
    # Criar ID único para o webhook
    webhook_id = f"{payload['metadata']['run_id']}_{payload['metadata']['status']}"
    
    if webhook_id in processed_webhooks:
        logger.info(f"Webhook duplicado ignorado: {webhook_id}")
        return jsonify({'status': 'already_processed'}), 200
    
    # Processar webhook
    try:
        processar_status_execucao(payload['metadata'])
        processed_webhooks.add(webhook_id)
        return jsonify({'status': 'processed'}), 200
    except Exception as e:
        # Não adicionar ao cache em caso de erro
        # para permitir retry
        raise

Monitoramento e Debugging

Logs Estruturados

Implemente logging estruturado para facilitar debugging:
import json
import logging
from datetime import datetime

class WebhookLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)
        handler = logging.StreamHandler()
        formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
        handler.setFormatter(formatter)
        self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)
    
    def log_webhook_received(self, payload, request_info):
        """Log quando webhook é recebido"""
        log_data = {
            'event': 'webhook_received',
            'timestamp': datetime.utcnow().isoformat(),
            'run_id': payload.get('metadata', {}).get('run_id'),
            'status': payload.get('metadata', {}).get('status'),
            'tenant': payload.get('metadata', {}).get('tenant'),
            'ip': request_info.get('ip'),
            'user_agent': request_info.get('user_agent')
        }
        self.logger.info(json.dumps(log_data))
    
    def log_processing_result(self, run_id, status, success, error=None):
        """Log resultado do processamento"""
        log_data = {
            'event': 'webhook_processed',
            'timestamp': datetime.utcnow().isoformat(),
            'run_id': run_id,
            'status': status,
            'success': success
        }
        
        if error:
            log_data['error'] = str(error)
        
        level = logging.INFO if success else logging.ERROR
        self.logger.log(level, json.dumps(log_data))

# Uso
webhook_logger = WebhookLogger('datasnap_webhooks')

Métricas de Webhook

Monitore estas métricas importantes:
from collections import defaultdict, Counter
import time

class WebhookMetrics:
    def __init__(self):
        self.counters = defaultdict(int)
        self.response_times = []
        self.errors = Counter()
        self.last_webhook = {}
    
    def record_webhook(self, payload, processing_time, success=True, error=None):
        """Registra métricas de um webhook"""
        status = payload.get('metadata', {}).get('status', 'unknown')
        tenant = payload.get('metadata', {}).get('tenant', 'unknown')
        
        # Contadores
        self.counters[f'webhooks_total'] += 1
        self.counters[f'webhooks_status_{status}'] += 1
        self.counters[f'webhooks_tenant_{tenant}'] += 1
        
        if success:
            self.counters[f'webhooks_success'] += 1
        else:
            self.counters[f'webhooks_failed'] += 1
            if error:
                self.errors[str(error)] += 1
        
        # Tempo de resposta
        self.response_times.append(processing_time)
        
        # Último webhook por tenant
        self.last_webhook[tenant] = time.time()
    
    def get_stats(self):
        """Retorna estatísticas atuais"""
        if not self.response_times:
            return self.counters
        
        return {
            **dict(self.counters),
            'avg_response_time': sum(self.response_times) / len(self.response_times),
            'max_response_time': max(self.response_times),
            'min_response_time': min(self.response_times),
            'total_webhooks_processed': len(self.response_times),
            'common_errors': dict(self.errors.most_common(5))
        }

# Instância global
metrics = WebhookMetrics()

Casos de Uso Avançados

Integração com Sistema de Notificações

def enviar_notificacao_usuario(metadata):
    """Envia notificação para usuário baseado no status"""
    run_id = metadata.get('run_id')
    status = metadata.get('status')
    tenant = metadata.get('tenant')
    
    # Buscar informações do usuário/projeto
    user_info = get_user_info_by_tenant(tenant)
    
    if status == 'completed':
        files_processed = metadata.get('files_processed', 0)
        total_records = metadata.get('total_records', 0)
        
        message = f"""
        ✅ Processamento concluído com sucesso!
        
        Arquivos processados: {files_processed}
        Registros processados: {total_records:,}
        ID da execução: {run_id}
        
        Seus dados estão prontos para consulta.
        """
        
        # Enviar email, SMS, push notification, etc.
        send_email(user_info['email'], "Processamento Concluído", message)
        send_push_notification(user_info['device_token'], message)
        
    elif status == 'failed':
        error_message = metadata.get('error_message', 'Erro desconhecido')
        
        message = f"""
        ❌ Falha no processamento
        
        ID da execução: {run_id}
        Erro: {error_message}
        
        Por favor, verifique seus arquivos e tente novamente.
        """
        
        send_email(user_info['email'], "Erro no Processamento", message)

Integração com Pipeline de Dados

def processar_pipeline_dados(metadata):
    """Integra webhook com pipeline de dados"""
    if metadata.get('status') == 'completed':
        run_id = metadata.get('run_id')
        tenant = metadata.get('tenant')
        
        # Disparar próximo processo na pipeline
        next_process_config = {
            'source_run_id': run_id,
            'tenant': tenant,
            'process_type': 'data_transformation',
            'triggered_by': 'webhook'
        }
        
        # Exemplo: adicionar job em fila
        redis_client.lpush('data_pipeline_queue', json.dumps(next_process_config))
        
        # Ou disparar função lambda/cloud function
        trigger_lambda_function('data-transformer', next_process_config)
        
        logger.info(f"Pipeline iniciada para run_id: {run_id}")

Dashboard em Tempo Real

# Integração com WebSocket para dashboard
import socketio

sio = socketio.Client()

def atualizar_dashboard(metadata):
    """Atualiza dashboard em tempo real via WebSocket"""
    dashboard_data = {
        'type': 'processing_update',
        'run_id': metadata.get('run_id'),
        'status': metadata.get('status'),
        'tenant': metadata.get('tenant'),
        'timestamp': time.time()
    }
    
    if metadata.get('status') == 'completed':
        dashboard_data.update({
            'files_processed': metadata.get('files_processed', 0),
            'total_records': metadata.get('total_records', 0),
            'completed_at': metadata.get('completed_at')
        })
    
    # Emitir para todos os clientes do tenant
    sio.emit('processing_update', dashboard_data, room=f"tenant_{metadata.get('tenant')}")

Troubleshooting

Problemas Comuns

Melhores Práticas

✅ Faça

  • Responda rapidamente: Confirme recebimento em < 2 segundos, processe depois
  • Seja idempotente: Lide com webhooks duplicados graciosamente
  • Use HTTPS: Para segurança das comunicações
  • Implemente logging: Para debugging e auditoria
  • Valide payloads: Sempre valide estrutura dos dados recebidos
  • Use filas: Para processamento assíncrono de tarefas pesadas

❌ Evite

  • Processamento síncrono longo: Não faça operações demoradas no endpoint
  • Ignorar erros HTTP: Sempre retorne status codes apropriados
  • Logs insuficientes: Dificulta debugging de problemas
  • Endpoints públicos sem segurança: Use filtros de IP ou autenticação
  • Falha silenciosa: Sempre log errors e notifique sobre problemas

Suporte e Recursos Adicionais

Precisa de ajuda com webhooks? Nossa equipe está disponível:
URLs de webhook são sensíveis: Certifique-se de que suas URLs estão protegidas e acessíveis apenas quando necessário.