Skip to main content

Automação inteligente com DataSnap

Automatize tarefas repetitivas e crie workflows eficientes para maximizar a produtividade com DataSnap, desde uploads até análises complexas.
A automação reduz erros, economiza tempo e garante consistência no trabalho com dados.

Tipos de automação

Automação de upload

Configure uploads automáticos baseados em eventos ou agendamentos:

Upload por evento

Monitore diretórios e faça upload automático de novos arquivos

Upload agendado

Execute uploads em intervalos regulares definidos

Upload condicional

Processe apenas arquivos que atendem critérios específicos

Backup automático

Mantenha cópias dos dados em múltiplos ambientes

Pipeline completo

Workflow de dados automatizado

import schedule
import time
import logging
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from typing import List, Dict, Any

@dataclass
class WorkflowConfig:
    schema_slug: str
    watch_directory: str
    processed_directory: str
    error_directory: str
    file_pattern: str = "*.jsonl"
    schedule_interval: int = 60  # minutos

class DataSnapWorkflow:
    def __init__(self, config: WorkflowConfig, datasnap_client):
        self.config = config
        self.client = datasnap_client
        self.logger = self.setup_logging()
        
        # Criar diretórios necessários
        Path(config.processed_directory).mkdir(parents=True, exist_ok=True)
        Path(config.error_directory).mkdir(parents=True, exist_ok=True)
    
    def setup_logging(self):
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler(f'datasnap_workflow_{self.config.schema_slug}.log'),
                logging.StreamHandler()
            ]
        )
        return logging.getLogger(__name__)
    
    def discover_files(self) -> List[Path]:
        """Descobre arquivos para upload"""
        watch_path = Path(self.config.watch_directory)
        files = list(watch_path.glob(self.config.file_pattern))
        
        # Filtrar apenas arquivos não processados
        new_files = []
        for file in files:
            if file.suffix == '.jsonl' and file.stat().st_size > 0:
                new_files.append(file)
        
        return sorted(new_files, key=lambda x: x.stat().st_mtime)
    
    def validate_file(self, file_path: Path) -> Dict[str, Any]:
        """Valida arquivo JSONL"""
        try:
            import json
            errors = []
            line_count = 0
            
            with open(file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line_count += 1
                    line = line.strip()
                    
                    if line:  # Pular linhas vazias
                        try:
                            json.loads(line)
                        except json.JSONDecodeError as e:
                            errors.append(f"Linha {line_num}: {str(e)}")
                            if len(errors) >= 10:  # Limitar erros reportados
                                break
            
            return {
                'valid': len(errors) == 0,
                'line_count': line_count,
                'errors': errors
            }
            
        except Exception as e:
            return {
                'valid': False,
                'line_count': 0,
                'errors': [f"Erro ao ler arquivo: {str(e)}"]
            }
    
    def process_file(self, file_path: Path) -> Dict[str, Any]:
        """Processa um único arquivo"""
        self.logger.info(f"Processando arquivo: {file_path.name}")
        
        # 1. Validar arquivo
        validation = self.validate_file(file_path)
        if not validation['valid']:
            self.logger.error(f"Arquivo inválido: {file_path.name}")
            self.move_file(file_path, self.config.error_directory)
            return {'status': 'error', 'reason': 'validation_failed', 'errors': validation['errors']}
        
        # 2. Upload
        try:
            result = self.client.upload_file(self.config.schema_slug, str(file_path))
            
            # Verificar se upload foi bem-sucedido
            if result.get('uploaded') and len(result['uploaded']) > 0:
                upload_info = result['uploaded'][0]
                
                if upload_info.get('validation') == 'ok':
                    self.logger.info(f"Upload realizado: {file_path.name} -> ID {upload_info.get('id')}")
                    
                    # Mover arquivo para diretório de processados
                    self.move_file(file_path, self.config.processed_directory)
                    
                    
                    return {
                        'status': 'success',
                        'file_id': upload_info.get('id'),
                        'lines_processed': validation['line_count']
                    }
                else:
                    self.logger.error(f"Validação falhou: {upload_info.get('errors', [])}")
                    self.move_file(file_path, self.config.error_directory)
                    return {'status': 'error', 'reason': 'server_validation_failed'}
            
        except Exception as e:
            self.logger.error(f"Erro no upload: {str(e)}")
            return {'status': 'error', 'reason': 'upload_failed', 'error': str(e)}
    
    def move_file(self, source: Path, destination_dir: str):
        """Move arquivo para diretório de destino"""
        dest_path = Path(destination_dir) / source.name
        
        # Renomear se já existir
        counter = 1
        while dest_path.exists():
            name_parts = source.stem, counter, source.suffix
            dest_path = Path(destination_dir) / f"{name_parts[0]}_{name_parts[1]}{name_parts[2]}"
            counter += 1
        
        source.rename(dest_path)
        self.logger.info(f"Arquivo movido: {source.name} -> {dest_path}")
    
    
    
    def run_batch(self) -> Dict[str, Any]:
        """Executa upload em lote"""
        files = self.discover_files()
        
        if not files:
            self.logger.info("Nenhum arquivo encontrado para upload")
            return {'processed': 0, 'errors': 0, 'files': []}
        
        self.logger.info(f"Encontrados {len(files)} arquivo(s) para upload")
        
        results = {
            'processed': 0,
            'errors': 0,
            'files': []
        }
        
        for file in files:
            result = self.process_file(file)
            results['files'].append({
                'file': file.name,
                'result': result
            })
            
            if result['status'] == 'success':
                results['processed'] += 1
            else:
                results['errors'] += 1
        
        # Log do resumo
        summary = f"Upload concluído: {results['processed']} sucessos, {results['errors']} erros"
        self.logger.info(summary)
        
        return results
    
    def start_scheduler(self):
        """Inicia agendador de tarefas"""
        if self.config.processing_mode == 'scheduled':
            schedule.every(self.config.schedule_interval).minutes.do(self.run_batch)
            
            self.logger.info(f"Agendador iniciado: execução a cada {self.config.schedule_interval} minutos")
            
            while True:
                schedule.run_pending()
                time.sleep(60)  # Verificar a cada minuto

# Exemplo de uso
config = WorkflowConfig(
    schema_slug="vendas",
    watch_directory="/data/incoming",
    processed_directory="/data/processed",
    error_directory="/data/errors",
    schedule_interval=30,
)

workflow = DataSnapWorkflow(config, datasnap_client)

# Executar uma vez
results = workflow.run_batch()

# Ou iniciar agendador (executa continuamente)
# workflow.start_scheduler()

Monitoramento de diretórios

Watch automático com notificações

import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading

class DataSnapFileHandler(FileSystemEventHandler):
    def __init__(self, workflow: DataSnapWorkflow):
        self.workflow = workflow
        self.processing_lock = threading.Lock()
        
    def on_created(self, event):
        if event.is_directory:
            return
            
        # Filtrar apenas arquivos JSONL
        if event.src_path.endswith('.jsonl'):
            self.workflow.logger.info(f"Novo arquivo detectado: {event.src_path}")
            
            # Aguardar arquivo ser completamente escrito
            self.wait_for_file_completion(event.src_path)
            
            # Processar em thread separada para não bloquear
            with self.processing_lock:
                thread = threading.Thread(
                    target=self.process_new_file,
                    args=(event.src_path,)
                )
                thread.daemon = True
                thread.start()
    
    def wait_for_file_completion(self, file_path: str, timeout: int = 30):
        """Aguarda arquivo ser completamente escrito"""
        from pathlib import Path
        
        file_obj = Path(file_path)
        last_size = 0
        stable_count = 0
        
        for _ in range(timeout):
            try:
                current_size = file_obj.stat().st_size
                if current_size == last_size:
                    stable_count += 1
                    if stable_count >= 3:  # 3 segundos estável
                        break
                else:
                    stable_count = 0
                    last_size = current_size
                
                time.sleep(1)
            except:
                time.sleep(1)
    
    def process_new_file(self, file_path: str):
        """Processa novo arquivo"""
        try:
            from pathlib import Path
            result = self.workflow.process_file(Path(file_path))
            
                
        except Exception as e:
            self.workflow.logger.error(f"Erro no processamento automático: {str(e)}")

class AutomaticFileProcessor:
    def __init__(self, workflow: DataSnapWorkflow):
        self.workflow = workflow
        self.observer = None
        
    def start_watching(self):
        """Inicia monitoramento automático"""
        event_handler = DataSnapFileHandler(self.workflow)
        self.observer = Observer()
        
        self.observer.schedule(
            event_handler,
            self.workflow.config.watch_directory,
            recursive=False
        )
        
        self.observer.start()
        self.workflow.logger.info(f"Monitoramento iniciado: {self.workflow.config.watch_directory}")
        
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.stop_watching()
    
    def stop_watching(self):
        """Para monitoramento"""
        if self.observer:
            self.observer.stop()
            self.observer.join()
            self.workflow.logger.info("Monitoramento interrompido")

# Uso
processor = AutomaticFileProcessor(workflow)
processor.start_watching()

Integração com sistemas externos

Integração com sistemas de mensageria

import pika
import json
from datetime import datetime

class DataSnapMessageQueue:
    def __init__(self, rabbitmq_url: str):
        self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
        self.channel = self.connection.channel()
        
        # Declarar filas
        self.channel.queue_declare(queue='datasnap.uploads', durable=True)
        self.channel.queue_declare(queue='datasnap.analysis', durable=True)
    
    def publish_upload_event(self, schema_slug: str, file_info: dict):
        """Publica evento de upload"""
        message = {
            'event': 'file_uploaded',
            'timestamp': datetime.now().isoformat(),
            'schema': schema_slug,
            'file_info': file_info
        }
        
        self.channel.basic_publish(
            exchange='',
            routing_key='datasnap.uploads',
            body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)  # Persistir mensagem
        )
    
    def consume_upload_events(self, callback):
        """Consome eventos de upload"""
        def wrapper(ch, method, properties, body):
            try:
                message = json.loads(body)
                callback(message)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                print(f"Erro ao processar mensagem: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='datasnap.uploads',
            on_message_callback=wrapper
        )
        
        print("Aguardando mensagens...")
        self.channel.start_consuming()

# Worker para processar mensagens
def process_upload_message(message):
    """Processa mensagem de upload"""
    print(f"Processando upload: {message['file_info']['file_name']}")
    
    # Lógica de análise
    schema = message['schema']
    file_info = message['file_info']
    
    # Verificar se arquivo precisa de análise imediata
    if file_info.get('size_bytes', 0) > 50 * 1024 * 1024:  # 50MB
    
    # Log do evento
    log_upload_event(message)

# Uso
mq = DataSnapMessageQueue('amqp://localhost')
mq.consume_upload_events(process_upload_message)

Análise e relatórios automáticos

Sistema de relatórios agendados

import schedule
from datetime import datetime, timedelta
from jinja2 import Template
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from email.mime.base import MIMEBase

class AutomatedReportSystem:
    def __init__(self, datasnap_client, email_config):
        self.client = datasnap_client
        self.email_config = email_config
    
    def generate_daily_summary(self, schema_slug: str):
        """Gera relatório diário"""
        end_date = datetime.now()
        start_date = end_date - timedelta(days=1)
        
        # Consultas para o relatório
        queries = {
            'total_records': {
                "select": ["count(*) as total"],
                "where": [
                    {"field": "created_at", "op": ">=", "value": start_date.strftime('%Y-%m-%d')},
                    {"field": "created_at", "op": "<", "value": end_date.strftime('%Y-%m-%d')}
                ]
            },
            'by_category': {
                "select": ["categoria", "count(*) as total"],
                "where": [
                    {"field": "created_at", "op": ">=", "value": start_date.strftime('%Y-%m-%d')},
                    {"field": "created_at", "op": "<", "value": end_date.strftime('%Y-%m-%d')}
                ],
                "group_by": ["categoria"],
                "order_by": [{"field": "total", "direction": "desc"}]
            }
        }
        
        report_data = {}
        for key, query in queries.items():
            try:
                result = self.client.query(schema_slug, query)
                report_data[key] = result['data']
            except Exception as e:
                print(f"Erro na consulta {key}: {e}")
                report_data[key] = []
        
        # Gerar HTML do relatório
        html_report = self.render_report_template(report_data, start_date, end_date)
        
        # Enviar por email
        self.send_email_report(
            subject=f"Relatório Diário - {schema_slug} - {start_date.strftime('%Y-%m-%d')}",
            html_content=html_report
        )
        
        return report_data
    
    def render_report_template(self, data, start_date, end_date):
        """Renderiza template do relatório"""
        template_html = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>Relatório DataSnap</title>
            <style>
                body { font-family: Arial, sans-serif; }
                table { border-collapse: collapse; width: 100%; }
                th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
                th { background-color: #f2f2f2; }
                .summary { background-color: #e7f3ff; padding: 15px; margin: 10px 0; }
            </style>
        </head>
        <body>
            <h1>Relatório DataSnap</h1>
            <div class="summary">
                <h2>Resumo do Período</h2>
                <p><strong>Período:</strong> {{ start_date }} a {{ end_date }}</p>
                <p><strong>Total de Registros:</strong> {{ total_records }}</p>
            </div>
            
            <h2>Distribuição por Categoria</h2>
            <table>
                <thead>
                    <tr><th>Categoria</th><th>Total</th></tr>
                </thead>
                <tbody>
                    {% for item in by_category %}
                    <tr>
                        <td>{{ item[0] }}</td>
                        <td>{{ item[1] }}</td>
                    </tr>
                    {% endfor %}
                </tbody>
            </table>
        </body>
        </html>
        """
        
        template = Template(template_html)
        return template.render(
            start_date=start_date.strftime('%Y-%m-%d'),
            end_date=end_date.strftime('%Y-%m-%d'),
            total_records=data['total_records'][0][0] if data['total_records'] else 0,
            by_category=data['by_category']
        )
    
    def send_email_report(self, subject: str, html_content: str):
        """Envia relatório por email"""
        try:
            msg = MIMEMultipart('alternative')
            msg['Subject'] = subject
            msg['From'] = self.email_config['from']
            msg['To'] = ', '.join(self.email_config['to'])
            
            html_part = MIMEText(html_content, 'html')
            msg.attach(html_part)
            
            # Enviar email
            server = smtplib.SMTP(self.email_config['smtp_host'], self.email_config['smtp_port'])
            if self.email_config.get('use_tls'):
                server.starttls()
            if self.email_config.get('username'):
                server.login(self.email_config['username'], self.email_config['password'])
            
            server.send_message(msg)
            server.quit()
            
            print(f"Relatório enviado: {subject}")
            
        except Exception as e:
            print(f"Erro ao enviar email: {e}")
    
    def schedule_reports(self, schema_slug: str):
        """Agenda relatórios automáticos"""
        # Relatório diário às 08:00
        schedule.every().day.at("08:00").do(
            self.generate_daily_summary,
            schema_slug
        )
        
        # Relatório semanal às segundas-feiras às 09:00
        schedule.every().monday.at("09:00").do(
            self.generate_weekly_summary,
            schema_slug
        )
        
        print("Relatórios agendados configurados")

# Configuração e uso
email_config = {
    'smtp_host': 'smtp.gmail.com',
    'smtp_port': 587,
    'use_tls': True,
    'username': 'seu-email@gmail.com',
    'password': 'sua-senha',
    'from': 'relatorios@empresa.com',
    'to': ['gerente@empresa.com', 'analista@empresa.com']
}

report_system = AutomatedReportSystem(datasnap_client, email_config)
report_system.schedule_reports("vendas")

# Executar agendador
while True:
    schedule.run_pending()
    time.sleep(60)

Orquestração com Apache Airflow

DAG para pipeline DataSnap

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
    'owner': 'data-team',
    'depends_on_past': False,
    'start_date': datetime(2024, 8, 1),
    'email_on_failure': True,
    'email_on_retry': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    'datasnap_pipeline',
    default_args=default_args,
    description='Pipeline DataSnap',
    schedule_interval='@hourly',
    catchup=False
)

# Tarefa 1: Descobrir arquivos
discover_files = BashOperator(
    task_id='discover_files',
    bash_command='python /scripts/discover_files.py',
    dag=dag
)

# Tarefa 2: Validar arquivos
def validate_files(**context):
    # Lógica de validação
    pass

validate = PythonOperator(
    task_id='validate_files',
    python_callable=validate_files,
    dag=dag
)

# Tarefa 3: Upload para DataSnap
def upload_to_datasnap(**context):
    # Lógica de upload
    pass

upload = PythonOperator(
    task_id='upload_to_datasnap',
    python_callable=upload_to_datasnap,
    dag=dag
)


# Tarefa 5: Gerar relatório
def generate_report(**context):
    # Lógica de relatório
    pass

report = PythonOperator(
    task_id='generate_report',
    python_callable=generate_report,
    dag=dag
)

# Definir dependências
discover_files >> validate >> upload >> report

Próximos passos