Automação inteligente com DataSnap

Automatize tarefas repetitivas e crie workflows eficientes para maximizar a produtividade com DataSnap, desde uploads até análises complexas.
A automação reduz erros, economiza tempo e garante consistência nos processos de dados.

Tipos de automação

Automação de upload

Configure uploads automáticos baseados em eventos ou agendamentos:

Upload por evento

Monitore diretórios e faça upload automático de novos arquivos

Upload agendado

Execute uploads em intervalos regulares definidos

Upload condicional

Processe apenas arquivos que atendem critérios específicos

Backup automático

Mantenha cópias dos dados em múltiplos ambientes

Pipeline completo

Workflow de dados automatizado

import schedule
import time
import logging
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class WorkflowConfig:
    schema_slug: str
    watch_directory: str
    processed_directory: str
    error_directory: str
    file_pattern: str = "*.jsonl"
    processing_mode: str = "auto"  # auto, scheduled, manual
    schedule_interval: int = 60  # minutos
    webhook_url: str = None

class DataSnapWorkflow:
    def __init__(self, config: WorkflowConfig, datasnap_client):
        self.config = config
        self.client = datasnap_client
        self.logger = self.setup_logging()
        
        # Criar diretórios necessários
        Path(config.processed_directory).mkdir(parents=True, exist_ok=True)
        Path(config.error_directory).mkdir(parents=True, exist_ok=True)
    
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'datasnap_workflow_{self.config.schema_slug}.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)
    
    def discover_files(self) -> List[Path]:
        """Descobre arquivos para processamento"""
        watch_path = Path(self.config.watch_directory)
        files = list(watch_path.glob(self.config.file_pattern))
        
        # Filtrar apenas arquivos não processados
        new_files = []
        for file in files:
            if file.suffix == '.jsonl' and file.stat().st_size > 0:
                new_files.append(file)
        
        return sorted(new_files, key=lambda x: x.stat().st_mtime)
    
    def validate_file(self, file_path: Path) -> Dict[str, Any]:
        """Valida arquivo JSONL"""
        try:
            import json
            errors = []
            line_count = 0
            
            with open(file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line_count += 1
                    line = line.strip()
                    
                    if line:  # Pular linhas vazias
                        try:
                            json.loads(line)
                        except json.JSONDecodeError as e:
                            errors.append(f"Linha {line_num}: {str(e)}")
                            if len(errors) >= 10:  # Limitar erros reportados
                                break
            
            return {
                'valid': len(errors) == 0,
                'line_count': line_count,
                'errors': errors
            }
            
        except Exception as e:
            return {
                'valid': False,
                'line_count': 0,
                'errors': [f"Erro ao ler arquivo: {str(e)}"]
            }
    
    def process_file(self, file_path: Path) -> Dict[str, Any]:
        """Processa um único arquivo"""
        self.logger.info(f"Processando arquivo: {file_path.name}")
        
        # 1. Validar arquivo
        validation = self.validate_file(file_path)
        if not validation['valid']:
            self.logger.error(f"Arquivo inválido: {file_path.name}")
            self.move_file(file_path, self.config.error_directory)
            return {'status': 'error', 'reason': 'validation_failed', 'errors': validation['errors']}
        
        # 2. Upload
        try:
            result = self.client.upload_file(self.config.schema_slug, str(file_path))
            
            # Verificar se upload foi bem-sucedido
            if result.get('uploaded') and len(result['uploaded']) > 0:
                upload_info = result['uploaded'][0]
                
                if upload_info.get('validation') == 'ok':
                    self.logger.info(f"Upload realizado: {file_path.name} -> ID {upload_info.get('id')}")
                    
                    # Mover arquivo para diretório de processados
                    self.move_file(file_path, self.config.processed_directory)
                    
                    # Processar automaticamente se configurado
                    if self.config.processing_mode == 'auto':
                        self.trigger_processing()
                    
                    return {
                        'status': 'success',
                        'file_id': upload_info.get('id'),
                        'lines_processed': validation['line_count']
                    }
                else:
                    self.logger.error(f"Validação falhou: {upload_info.get('errors', [])}")
                    self.move_file(file_path, self.config.error_directory)
                    return {'status': 'error', 'reason': 'server_validation_failed'}
            
        except Exception as e:
            self.logger.error(f"Erro no upload: {str(e)}")
            return {'status': 'error', 'reason': 'upload_failed', 'error': str(e)}
    
    def move_file(self, source: Path, destination_dir: str):
        """Move arquivo para diretório de destino"""
        dest_path = Path(destination_dir) / source.name
        
        # Renomear se já existir
        counter = 1
        while dest_path.exists():
            name_parts = source.stem, counter, source.suffix
            dest_path = Path(destination_dir) / f"{name_parts[0]}_{name_parts[1]}{name_parts[2]}"
            counter += 1
        
        source.rename(dest_path)
        self.logger.info(f"Arquivo movido: {source.name} -> {dest_path}")
    
    def trigger_processing(self):
        """Dispara processamento de arquivos pendentes"""
        try:
            result = self.client.process_files(self.config.schema_slug)
            self.logger.info(f"Processamento iniciado: {result}")
            return result
        except Exception as e:
            self.logger.error(f"Erro ao iniciar processamento: {str(e)}")
    
    def send_notification(self, message: str, level: str = 'info'):
        """Envia notificação via webhook"""
        if not self.config.webhook_url:
            return
        
        try:
            import requests
            
            payload = {
                'timestamp': datetime.now().isoformat(),
                'schema': self.config.schema_slug,
                'level': level,
                'message': message
            }
            
            response = requests.post(
                self.config.webhook_url,
                json=payload,
                timeout=10
            )
            response.raise_for_status()
            
        except Exception as e:
            self.logger.error(f"Erro ao enviar notificação: {str(e)}")
    
    def run_batch(self) -> Dict[str, Any]:
        """Executa processamento em lote"""
        files = self.discover_files()
        
        if not files:
            self.logger.info("Nenhum arquivo encontrado para processamento")
            return {'processed': 0, 'errors': 0, 'files': []}
        
        self.logger.info(f"Encontrados {len(files)} arquivo(s) para processamento")
        
        results = {
            'processed': 0,
            'errors': 0,
            'files': []
        }
        
        for file in files:
            result = self.process_file(file)
            results['files'].append({
                'file': file.name,
                'result': result
            })
            
            if result['status'] == 'success':
                results['processed'] += 1
            else:
                results['errors'] += 1
        
        # Enviar resumo
        summary = f"Processamento concluído: {results['processed']} sucessos, {results['errors']} erros"
        self.logger.info(summary)
        self.send_notification(summary)
        
        return results
    
    def start_scheduler(self):
        """Inicia agendador de tarefas"""
        if self.config.processing_mode == 'scheduled':
            schedule.every(self.config.schedule_interval).minutes.do(self.run_batch)
            
            self.logger.info(f"Agendador iniciado: processamento a cada {self.config.schedule_interval} minutos")
            
            while True:
                schedule.run_pending()
                time.sleep(60)  # Verificar a cada minuto

# Exemplo de uso
config = WorkflowConfig(
    schema_slug="vendas",
    watch_directory="/data/incoming",
    processed_directory="/data/processed",
    error_directory="/data/errors",
    processing_mode="scheduled",
    schedule_interval=30,
    webhook_url="https://sua-app.com/webhooks/datasnap"
)

workflow = DataSnapWorkflow(config, datasnap_client)

# Executar uma vez
results = workflow.run_batch()

# Ou iniciar agendador (executa continuamente)
# workflow.start_scheduler()

Monitoramento de diretórios

Watch automático com notificações

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading

class DataSnapFileHandler(FileSystemEventHandler):
    def __init__(self, workflow: DataSnapWorkflow):
        self.workflow = workflow
        self.processing_lock = threading.Lock()
        
    def on_created(self, event):
        if event.is_directory:
            return
            
        # Filtrar apenas arquivos JSONL
        if event.src_path.endswith('.jsonl'):
            self.workflow.logger.info(f"Novo arquivo detectado: {event.src_path}")
            
            # Aguardar arquivo ser completamente escrito
            self.wait_for_file_completion(event.src_path)
            
            # Processar em thread separada para não bloquear
            with self.processing_lock:
                thread = threading.Thread(
                    target=self.process_new_file,
                    args=(event.src_path,)
                )
                thread.daemon = True
                thread.start()
    
    def wait_for_file_completion(self, file_path: str, timeout: int = 30):
        """Aguarda arquivo ser completamente escrito"""
        from pathlib import Path
        
        file_obj = Path(file_path)
        last_size = 0
        stable_count = 0
        
        for _ in range(timeout):
            try:
                current_size = file_obj.stat().st_size
                if current_size == last_size:
                    stable_count += 1
                    if stable_count >= 3:  # 3 segundos estável
                        break
                else:
                    stable_count = 0
                    last_size = current_size
                
                time.sleep(1)
            except:
                time.sleep(1)
    
    def process_new_file(self, file_path: str):
        """Processa novo arquivo"""
        try:
            from pathlib import Path
            result = self.workflow.process_file(Path(file_path))
            
            if result['status'] == 'success':
                message = f"Arquivo processado automaticamente: {Path(file_path).name}"
                self.workflow.send_notification(message, 'success')
            else:
                message = f"Erro ao processar arquivo: {Path(file_path).name} - {result.get('reason')}"
                self.workflow.send_notification(message, 'error')
                
        except Exception as e:
            self.workflow.logger.error(f"Erro no processamento automático: {str(e)}")

class AutomaticFileProcessor:
    def __init__(self, workflow: DataSnapWorkflow):
        self.workflow = workflow
        self.observer = None
        
    def start_watching(self):
        """Inicia monitoramento automático"""
        event_handler = DataSnapFileHandler(self.workflow)
        self.observer = Observer()
        
        self.observer.schedule(
            event_handler,
            self.workflow.config.watch_directory,
            recursive=False
        )
        
        self.observer.start()
        self.workflow.logger.info(f"Monitoramento iniciado: {self.workflow.config.watch_directory}")
        
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.stop_watching()
    
    def stop_watching(self):
        """Para monitoramento"""
        if self.observer:
            self.observer.stop()
            self.observer.join()
            self.workflow.logger.info("Monitoramento interrompido")

# Uso
processor = AutomaticFileProcessor(workflow)
processor.start_watching()

Integração com sistemas externos

API webhooks para notificações

from flask import Flask, request, jsonify
import requests
import json

app = Flask(__name__)

# Webhook receiver para notificações do DataSnap
@app.route('/webhooks/datasnap', methods=['POST'])
def handle_datasnap_webhook():
    """Recebe webhooks do DataSnap sobre status de processamento"""
    
    try:
        payload = request.get_json()
        
        topic = payload.get('topic')
        metadata = payload.get('metadata', {})
        
        if topic == 'schemas.run.status':
            handle_processing_status(metadata)
        
        return jsonify({'status': 'received'}), 200
        
    except Exception as e:
        app.logger.error(f"Erro ao processar webhook: {str(e)}")
        return jsonify({'error': str(e)}), 500

def handle_processing_status(metadata):
    """Processa notificações de status"""
    run_id = metadata.get('run_id')
    tenant = metadata.get('tenant')
    status = metadata.get('status')
    
    app.logger.info(f"Processamento {run_id} - Status: {status}")
    
    # Ações baseadas no status
    if status == 'completed':
        # Processamento concluído - executar próxima etapa
        trigger_next_workflow_step(run_id)
        
    elif status == 'failed':
        # Processamento falhou - enviar alerta
        send_failure_alert(run_id, metadata)
    
    # Atualizar dashboard/banco de dados
    update_processing_status(run_id, status, metadata)

def trigger_next_workflow_step(run_id):
    """Dispara próxima etapa do workflow"""
    # Exemplo: executar análise automática após processamento
    try:
        # Executar consulta de análise
        query_data = {
            "select": ["categoria", "sum(valor) as total", "count(*) as quantidade"],
            "group_by": ["categoria"],
            "order_by": [{"field": "total", "direction": "desc"}]
        }
        
        # Enviar para sistema de análise
        analysis_result = datasnap_client.query("vendas", query_data)
        
        # Gerar relatório automático
        generate_automated_report(analysis_result)
        
    except Exception as e:
        app.logger.error(f"Erro na próxima etapa: {str(e)}")

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

Integração com sistemas de mensageria

import pika
import json
from datetime import datetime

class DataSnapMessageQueue:
    def __init__(self, rabbitmq_url: str):
        self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
        self.channel = self.connection.channel()
        
        # Declarar filas
        self.channel.queue_declare(queue='datasnap.uploads', durable=True)
        self.channel.queue_declare(queue='datasnap.processing', durable=True)
        self.channel.queue_declare(queue='datasnap.analysis', durable=True)
    
    def publish_upload_event(self, schema_slug: str, file_info: dict):
        """Publica evento de upload"""
        message = {
            'event': 'file_uploaded',
            'timestamp': datetime.now().isoformat(),
            'schema': schema_slug,
            'file_info': file_info
        }
        
        self.channel.basic_publish(
            exchange='',
            routing_key='datasnap.uploads',
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)  # Persistir mensagem
        )
    
    def consume_upload_events(self, callback):
        """Consome eventos de upload"""
        def wrapper(ch, method, properties, body):
            try:
                message = json.loads(body)
                callback(message)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                print(f"Erro ao processar mensagem: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='datasnap.uploads',
            on_message_callback=wrapper
        )
        
        print("Aguardando mensagens...")
        self.channel.start_consuming()

# Worker para processar mensagens
def process_upload_message(message):
    """Processa mensagem de upload"""
    print(f"Processando upload: {message['file_info']['file_name']}")
    
    # Lógica de processamento
    schema = message['schema']
    file_info = message['file_info']
    
    # Verificar se arquivo precisa de processamento imediato
    if file_info.get('size_bytes', 0) > 50 * 1024 * 1024:  # 50MB
        # Arquivo grande - processar imediatamente
        trigger_immediate_processing(schema)
    
    # Log do evento
    log_upload_event(message)

# Uso
mq = DataSnapMessageQueue('amqp://localhost')
mq.consume_upload_events(process_upload_message)

Análise e relatórios automáticos

Sistema de relatórios agendados

import schedule
from datetime import datetime, timedelta
from jinja2 import Template
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase

class AutomatedReportSystem:
    def __init__(self, datasnap_client, email_config):
        self.client = datasnap_client
        self.email_config = email_config
    
    def generate_daily_summary(self, schema_slug: str):
        """Gera relatório diário"""
        end_date = datetime.now()
        start_date = end_date - timedelta(days=1)
        
        # Consultas para o relatório
        queries = {
            'total_records': {
                "select": ["count(*) as total"],
                "where": [
                    {"field": "created_at", "op": ">=", "value": start_date.strftime('%Y-%m-%d')},
                    {"field": "created_at", "op": "<", "value": end_date.strftime('%Y-%m-%d')}
                ]
            },
            'by_category': {
                "select": ["categoria", "count(*) as total"],
                "where": [
                    {"field": "created_at", "op": ">=", "value": start_date.strftime('%Y-%m-%d')},
                    {"field": "created_at", "op": "<", "value": end_date.strftime('%Y-%m-%d')}
                ],
                "group_by": ["categoria"],
                "order_by": [{"field": "total", "direction": "desc"}]
            }
        }
        
        report_data = {}
        for key, query in queries.items():
            try:
                result = self.client.query(schema_slug, query)
                report_data[key] = result['data']
            except Exception as e:
                print(f"Erro na consulta {key}: {e}")
                report_data[key] = []
        
        # Gerar HTML do relatório
        html_report = self.render_report_template(report_data, start_date, end_date)
        
        # Enviar por email
        self.send_email_report(
            subject=f"Relatório Diário - {schema_slug} - {start_date.strftime('%Y-%m-%d')}",
            html_content=html_report
        )
        
        return report_data
    
    def render_report_template(self, data, start_date, end_date):
        """Renderiza template do relatório"""
        template_html = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Relatório DataSnap</title>
            <style>
                body { font-family: Arial, sans-serif; }
                table { border-collapse: collapse; width: 100%; }
                th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
                th { background-color: #f2f2f2; }
                .summary { background-color: #e7f3ff; padding: 15px; margin: 10px 0; }
            </style>
        </head>
        <body>
            <h1>Relatório DataSnap</h1>
            <div class="summary">
                <h2>Resumo do Período</h2>
                <p><strong>Período:</strong> {{ start_date }} a {{ end_date }}</p>
                <p><strong>Total de Registros:</strong> {{ total_records }}</p>
            </div>
            
            <h2>Distribuição por Categoria</h2>
            <table>
                <thead>
                    <tr><th>Categoria</th><th>Total</th></tr>
                </thead>
                <tbody>
                    {% for item in by_category %}
                    <tr>
                        <td>{{ item[0] }}</td>
                        <td>{{ item[1] }}</td>
                    </tr>
                    {% endfor %}
                </tbody>
            </table>
        </body>
        </html>
        """
        
        template = Template(template_html)
        return template.render(
            start_date=start_date.strftime('%Y-%m-%d'),
            end_date=end_date.strftime('%Y-%m-%d'),
            total_records=data['total_records'][0][0] if data['total_records'] else 0,
            by_category=data['by_category']
        )
    
    def send_email_report(self, subject: str, html_content: str):
        """Envia relatório por email"""
        try:
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = self.email_config['from']
            msg['To'] = ', '.join(self.email_config['to'])
            
            html_part = MIMEText(html_content, 'html')
            msg.attach(html_part)
            
            # Enviar email
            server = smtplib.SMTP(self.email_config['smtp_host'], self.email_config['smtp_port'])
            if self.email_config.get('use_tls'):
                server.starttls()
            if self.email_config.get('username'):
                server.login(self.email_config['username'], self.email_config['password'])
            
            server.send_message(msg)
            server.quit()
            
            print(f"Relatório enviado: {subject}")
            
        except Exception as e:
            print(f"Erro ao enviar email: {e}")
    
    def schedule_reports(self, schema_slug: str):
        """Agenda relatórios automáticos"""
        # Relatório diário às 08:00
        schedule.every().day.at("08:00").do(
            self.generate_daily_summary,
            schema_slug
        )
        
        # Relatório semanal às segundas-feiras às 09:00
        schedule.every().monday.at("09:00").do(
            self.generate_weekly_summary,
            schema_slug
        )
        
        print("Relatórios agendados configurados")

# Configuração e uso
email_config = {
    'smtp_host': 'smtp.gmail.com',
    'smtp_port': 587,
    'use_tls': True,
    'username': 'seu-email@gmail.com',
    'password': 'sua-senha',
    'from': 'relatorios@empresa.com',
    'to': ['gerente@empresa.com', 'analista@empresa.com']
}

report_system = AutomatedReportSystem(datasnap_client, email_config)
report_system.schedule_reports("vendas")

# Executar agendador
while True:
    schedule.run_pending()
    time.sleep(60)

Orquestração com Apache Airflow

DAG para pipeline DataSnap

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 8, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'datasnap_processing_pipeline',
    default_args=default_args,
    description='Pipeline completo DataSnap',
    schedule_interval='@hourly',
    catchup=False
)

# Tarefa 1: Descobrir arquivos
discover_files = BashOperator(
    task_id='discover_files',
    bash_command='python /scripts/discover_files.py',
    dag=dag
)

# Tarefa 2: Validar arquivos
def validate_files(**context):
    # Lógica de validação
    pass

validate = PythonOperator(
    task_id='validate_files',
    python_callable=validate_files,
    dag=dag
)

# Tarefa 3: Upload para DataSnap
def upload_to_datasnap(**context):
    # Lógica de upload
    pass

upload = PythonOperator(
    task_id='upload_to_datasnap',
    python_callable=upload_to_datasnap,
    dag=dag
)

# Tarefa 4: Processar dados
def trigger_processing(**context):
    # Lógica de processamento
    pass

process = PythonOperator(
    task_id='trigger_processing',
    python_callable=trigger_processing,
    dag=dag
)

# Tarefa 5: Gerar relatório
def generate_report(**context):
    # Lógica de relatório
    pass

report = PythonOperator(
    task_id='generate_report',
    python_callable=generate_report,
    dag=dag
)

# Definir dependências
discover_files >> validate >> upload >> process >> report

Próximos passos