Configure automated workflows and processing pipelines to streamline your work with DataSnap
import schedule
import time
import logging
from pathlib import Path
from datetime import datetime
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
@dataclass
class WorkflowConfig:
schema_slug: str
watch_directory: str
processed_directory: str
error_directory: str
file_pattern: str = "*.jsonl"
processing_mode: str = "auto" # auto, scheduled, manual
    schedule_interval: int = 60  # minutes
    webhook_url: Optional[str] = None
class DataSnapWorkflow:
def __init__(self, config: WorkflowConfig, datasnap_client):
self.config = config
self.client = datasnap_client
self.logger = self.setup_logging()
        # Create the required directories
Path(config.processed_directory).mkdir(parents=True, exist_ok=True)
Path(config.error_directory).mkdir(parents=True, exist_ok=True)
def setup_logging(self):
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(f'datasnap_workflow_{self.config.schema_slug}.log'),
logging.StreamHandler()
]
)
return logging.getLogger(__name__)
def discover_files(self) -> List[Path]:
"""Descobre arquivos para processamento"""
watch_path = Path(self.config.watch_directory)
files = list(watch_path.glob(self.config.file_pattern))
        # Files are moved out of the watch directory after processing, so anything still here with content is pending
new_files = []
for file in files:
if file.suffix == '.jsonl' and file.stat().st_size > 0:
new_files.append(file)
return sorted(new_files, key=lambda x: x.stat().st_mtime)
    def validate_file(self, file_path: Path) -> Dict[str, Any]:
        """Validate a JSONL file line by line"""
        try:
            import json
            errors = []
            line_count = 0

            with open(file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f, 1):
                    line_count += 1
                    line = line.strip()

                    if line:  # Skip empty lines
                        try:
                            json.loads(line)
                        except json.JSONDecodeError as e:
                            errors.append(f"Line {line_num}: {str(e)}")
                            if len(errors) >= 10:  # Cap the number of reported errors
                                break

            return {
                'valid': len(errors) == 0,
                'line_count': line_count,
                'errors': errors
            }
        except Exception as e:
            return {
                'valid': False,
                'line_count': 0,
                'errors': [f"Failed to read file: {str(e)}"]
            }
    def process_file(self, file_path: Path) -> Dict[str, Any]:
        """Process a single file"""
        self.logger.info(f"Processing file: {file_path.name}")

        # 1. Validate the file locally
        validation = self.validate_file(file_path)
        if not validation['valid']:
            self.logger.error(f"Invalid file: {file_path.name}")
            self.move_file(file_path, self.config.error_directory)
            return {'status': 'error', 'reason': 'validation_failed', 'errors': validation['errors']}

        # 2. Upload
        try:
            result = self.client.upload_file(self.config.schema_slug, str(file_path))

            # Check whether the upload succeeded
            if result.get('uploaded') and len(result['uploaded']) > 0:
                upload_info = result['uploaded'][0]

                if upload_info.get('validation') == 'ok':
                    self.logger.info(f"Upload complete: {file_path.name} -> ID {upload_info.get('id')}")

                    # Move the file to the processed directory
                    self.move_file(file_path, self.config.processed_directory)

                    # Trigger processing automatically if configured
                    if self.config.processing_mode == 'auto':
                        self.trigger_processing()

                    return {
                        'status': 'success',
                        'file_id': upload_info.get('id'),
                        'lines_processed': validation['line_count']
                    }
                else:
                    self.logger.error(f"Server-side validation failed: {upload_info.get('errors', [])}")
                    self.move_file(file_path, self.config.error_directory)
                    return {'status': 'error', 'reason': 'server_validation_failed'}

            # No upload entry in the response: leave the file in place so a later run can retry
            self.logger.error(f"Unexpected upload response for {file_path.name}: {result}")
            return {'status': 'error', 'reason': 'unexpected_response'}
        except Exception as e:
            self.logger.error(f"Upload error: {str(e)}")
            return {'status': 'error', 'reason': 'upload_failed', 'error': str(e)}
    def move_file(self, source: Path, destination_dir: str):
        """Move a file into the destination directory"""
        dest_path = Path(destination_dir) / source.name

        # Rename if a file with the same name already exists
        counter = 1
        while dest_path.exists():
            dest_path = Path(destination_dir) / f"{source.stem}_{counter}{source.suffix}"
            counter += 1

        source.rename(dest_path)
        self.logger.info(f"File moved: {source.name} -> {dest_path}")
    def trigger_processing(self):
        """Trigger processing of pending files"""
        try:
            result = self.client.process_files(self.config.schema_slug)
            self.logger.info(f"Processing started: {result}")
            return result
        except Exception as e:
            self.logger.error(f"Failed to start processing: {str(e)}")
def send_notification(self, message: str, level: str = 'info'):
"""Envia notificação via webhook"""
if not self.config.webhook_url:
return
try:
import requests
payload = {
'timestamp': datetime.now().isoformat(),
'schema': self.config.schema_slug,
'level': level,
'message': message
}
response = requests.post(
self.config.webhook_url,
json=payload,
timeout=10
)
response.raise_for_status()
except Exception as e:
self.logger.error(f"Erro ao enviar notificação: {str(e)}")
def run_batch(self) -> Dict[str, Any]:
"""Executa processamento em lote"""
files = self.discover_files()
if not files:
self.logger.info("Nenhum arquivo encontrado para processamento")
return {'processed': 0, 'errors': 0, 'files': []}
self.logger.info(f"Encontrados {len(files)} arquivo(s) para processamento")
results = {
'processed': 0,
'errors': 0,
'files': []
}
for file in files:
result = self.process_file(file)
results['files'].append({
'file': file.name,
'result': result
})
if result['status'] == 'success':
results['processed'] += 1
else:
results['errors'] += 1
        # Send a summary notification
        summary = f"Batch finished: {results['processed']} succeeded, {results['errors']} failed"
self.logger.info(summary)
self.send_notification(summary)
return results
    def start_scheduler(self):
        """Start the task scheduler"""
        if self.config.processing_mode == 'scheduled':
            schedule.every(self.config.schedule_interval).minutes.do(self.run_batch)
            self.logger.info(f"Scheduler started: running a batch every {self.config.schedule_interval} minutes")

            while True:
                schedule.run_pending()
                time.sleep(60)  # Check once a minute
# Example usage
config = WorkflowConfig(
schema_slug="vendas",
watch_directory="/data/incoming",
processed_directory="/data/processed",
error_directory="/data/errors",
processing_mode="scheduled",
schedule_interval=30,
webhook_url="https://sua-app.com/webhooks/datasnap"
)
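# NOTE: the snippets in this guide assume an authenticated `datasnap_client`
# exposing upload_file(), process_files() and query(). Its construction is not
# shown in this section; the stub below is a hypothetical placeholder so the
# example is self-contained - swap it for your real DataSnap client.
class _StubDataSnapClient:
    def upload_file(self, schema_slug: str, file_path: str) -> dict:
        # Pretend the upload succeeded and passed server-side validation
        return {'uploaded': [{'id': 'example-id', 'validation': 'ok'}]}

    def process_files(self, schema_slug: str) -> dict:
        return {'status': 'queued'}

    def query(self, schema_slug: str, query: dict) -> dict:
        return {'data': []}

datasnap_client = _StubDataSnapClient()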
workflow = DataSnapWorkflow(config, datasnap_client)
# Run a single batch
results = workflow.run_batch()
# Or start the scheduler (runs continuously)
# workflow.start_scheduler()
import time
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import threading
class DataSnapFileHandler(FileSystemEventHandler):
def __init__(self, workflow: DataSnapWorkflow):
self.workflow = workflow
self.processing_lock = threading.Lock()
    def on_created(self, event):
        if event.is_directory:
            return

        # Only react to JSONL files
        if event.src_path.endswith('.jsonl'):
            self.workflow.logger.info(f"New file detected: {event.src_path}")

            # Handle the file in a separate thread so the observer callback is not blocked;
            # waiting for the write to finish and the actual processing happen there
            thread = threading.Thread(
                target=self.process_new_file,
                args=(event.src_path,)
            )
            thread.daemon = True
            thread.start()
    def wait_for_file_completion(self, file_path: str, timeout: int = 30):
        """Wait until the file has been fully written (size stable for a few seconds)"""
        from pathlib import Path

        file_obj = Path(file_path)
        last_size = 0
        stable_count = 0

        for _ in range(timeout):
            try:
                current_size = file_obj.stat().st_size
                if current_size == last_size:
                    stable_count += 1
                    if stable_count >= 3:  # size unchanged for 3 seconds
                        break
                else:
                    stable_count = 0
                    last_size = current_size
                time.sleep(1)
            except OSError:
                time.sleep(1)
    def process_new_file(self, file_path: str):
        """Process a newly detected file"""
        try:
            from pathlib import Path

            # Wait for the file to be fully written, then serialize processing
            self.wait_for_file_completion(file_path)
            with self.processing_lock:
                result = self.workflow.process_file(Path(file_path))

            if result['status'] == 'success':
                message = f"File processed automatically: {Path(file_path).name}"
                self.workflow.send_notification(message, 'success')
            else:
                message = f"Failed to process file: {Path(file_path).name} - {result.get('reason')}"
                self.workflow.send_notification(message, 'error')
        except Exception as e:
            self.workflow.logger.error(f"Error during automatic processing: {str(e)}")
class AutomaticFileProcessor:
def __init__(self, workflow: DataSnapWorkflow):
self.workflow = workflow
self.observer = None
def start_watching(self):
"""Inicia monitoramento automático"""
event_handler = DataSnapFileHandler(self.workflow)
self.observer = Observer()
self.observer.schedule(
event_handler,
self.workflow.config.watch_directory,
recursive=False
)
self.observer.start()
self.workflow.logger.info(f"Monitoramento iniciado: {self.workflow.config.watch_directory}")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
self.stop_watching()
    def stop_watching(self):
        """Stop monitoring"""
        if self.observer:
            self.observer.stop()
            self.observer.join()
            self.workflow.logger.info("Monitoring stopped")

# Usage
processor = AutomaticFileProcessor(workflow)
processor.start_watching()
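# Alternative (a sketch, not part of the original example): instead of the blocking
# call above, run the watcher in a daemon thread and keep periodic batch sweeps in
# the main thread as a safety net for files created while the watcher was down.
#
# watch_thread = threading.Thread(target=processor.start_watching, daemon=True)
# watch_thread.start()
#
# while True:
#     workflow.run_batch()
#     time.sleep(15 * 60)  # sweep interval chosen arbitrarily for illustration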
from flask import Flask, request, jsonify
import requests
import json
app = Flask(__name__)
# Webhook receiver for DataSnap notifications
@app.route('/webhooks/datasnap', methods=['POST'])
def handle_datasnap_webhook():
"""Recebe webhooks do DataSnap sobre status de processamento"""
try:
payload = request.get_json()
topic = payload.get('topic')
metadata = payload.get('metadata', {})
if topic == 'schemas.run.status':
handle_processing_status(metadata)
return jsonify({'status': 'received'}), 200
except Exception as e:
app.logger.error(f"Erro ao processar webhook: {str(e)}")
return jsonify({'error': str(e)}), 500
def handle_processing_status(metadata):
    """Handle processing status notifications"""
    run_id = metadata.get('run_id')
    tenant = metadata.get('tenant')
    status = metadata.get('status')

    app.logger.info(f"Run {run_id} - status: {status}")

    # Act based on the status
    if status == 'completed':
        # Processing finished - kick off the next step
        trigger_next_workflow_step(run_id)
    elif status == 'failed':
        # Processing failed - send an alert
        send_failure_alert(run_id, metadata)

    # Update the dashboard/database
    update_processing_status(run_id, status, metadata)
def trigger_next_workflow_step(run_id):
    """Trigger the next step of the workflow"""
    # Example: run an automated analysis after processing completes
    try:
        # Analysis query
        query_data = {
            "select": ["categoria", "sum(valor) as total", "count(*) as quantidade"],
            "group_by": ["categoria"],
            "order_by": [{"field": "total", "direction": "desc"}]
        }

        # Run the analysis query
        analysis_result = datasnap_client.query("vendas", query_data)

        # Generate an automated report
        generate_automated_report(analysis_result)
    except Exception as e:
        app.logger.error(f"Error in next workflow step: {str(e)}")
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
import pika
import json
from datetime import datetime
class DataSnapMessageQueue:
def __init__(self, rabbitmq_url: str):
self.connection = pika.BlockingConnection(pika.URLParameters(rabbitmq_url))
self.channel = self.connection.channel()
        # Declare the queues
self.channel.queue_declare(queue='datasnap.uploads', durable=True)
self.channel.queue_declare(queue='datasnap.processing', durable=True)
self.channel.queue_declare(queue='datasnap.analysis', durable=True)
def publish_upload_event(self, schema_slug: str, file_info: dict):
"""Publica evento de upload"""
message = {
'event': 'file_uploaded',
'timestamp': datetime.now().isoformat(),
'schema': schema_slug,
'file_info': file_info
}
self.channel.basic_publish(
exchange='',
routing_key='datasnap.uploads',
body=json.dumps(message),
            properties=pika.BasicProperties(delivery_mode=2)  # persistent message
)
    def consume_upload_events(self, callback):
        """Consume upload events"""
        def wrapper(ch, method, properties, body):
            try:
                message = json.loads(body)
                callback(message)
                ch.basic_ack(delivery_tag=method.delivery_tag)
            except Exception as e:
                print(f"Failed to process message: {e}")
                ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)

        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(
            queue='datasnap.uploads',
            on_message_callback=wrapper
        )

        print("Waiting for messages...")
        self.channel.start_consuming()
# Worker that handles queued upload messages
def process_upload_message(message):
    """Handle a single upload message"""
    print(f"Handling upload: {message['file_info']['file_name']}")

    # Processing logic
    schema = message['schema']
    file_info = message['file_info']

    # Decide whether the file needs immediate processing
    if file_info.get('size_bytes', 0) > 50 * 1024 * 1024:  # 50 MB
        # Large file - process right away
        trigger_immediate_processing(schema)

    # Log the event
log_upload_event(message)
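# trigger_immediate_processing and log_upload_event are referenced above but not
# defined in this example; minimal hypothetical placeholders:
def trigger_immediate_processing(schema_slug: str):
    # e.g. call the DataSnap client's process_files() for this schema
    print(f"Triggering immediate processing for schema: {schema_slug}")

def log_upload_event(message: dict):
    print(f"[{message['timestamp']}] Upload event recorded for schema {message['schema']}")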
# Usage
mq = DataSnapMessageQueue('amqp://localhost')
mq.consume_upload_events(process_upload_message)
import schedule
import time
from datetime import datetime, timedelta
from jinja2 import Template
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
class AutomatedReportSystem:
def __init__(self, datasnap_client, email_config):
self.client = datasnap_client
self.email_config = email_config
def generate_daily_summary(self, schema_slug: str):
"""Gera relatório diário"""
end_date = datetime.now()
start_date = end_date - timedelta(days=1)
        # Queries that feed the report
queries = {
'total_records': {
"select": ["count(*) as total"],
"where": [
{"field": "created_at", "op": ">=", "value": start_date.strftime('%Y-%m-%d')},
{"field": "created_at", "op": "<", "value": end_date.strftime('%Y-%m-%d')}
]
},
'by_category': {
"select": ["categoria", "count(*) as total"],
"where": [
{"field": "created_at", "op": ">=", "value": start_date.strftime('%Y-%m-%d')},
{"field": "created_at", "op": "<", "value": end_date.strftime('%Y-%m-%d')}
],
"group_by": ["categoria"],
"order_by": [{"field": "total", "direction": "desc"}]
}
}
        report_data = {}
        for key, query in queries.items():
            try:
                result = self.client.query(schema_slug, query)
                report_data[key] = result['data']
            except Exception as e:
                print(f"Query {key} failed: {e}")
                report_data[key] = []

        # Render the report HTML
        html_report = self.render_report_template(report_data, start_date, end_date)

        # Send it by email
        self.send_email_report(
            subject=f"Daily Report - {schema_slug} - {start_date.strftime('%Y-%m-%d')}",
            html_content=html_report
        )

        return report_data
def render_report_template(self, data, start_date, end_date):
"""Renderiza template do relatório"""
        template_html = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>DataSnap Report</title>
            <style>
                body { font-family: Arial, sans-serif; }
                table { border-collapse: collapse; width: 100%; }
                th, td { border: 1px solid #ddd; padding: 8px; text-align: left; }
                th { background-color: #f2f2f2; }
                .summary { background-color: #e7f3ff; padding: 15px; margin: 10px 0; }
            </style>
        </head>
        <body>
            <h1>DataSnap Report</h1>

            <div class="summary">
                <h2>Period Summary</h2>
                <p><strong>Period:</strong> {{ start_date }} to {{ end_date }}</p>
                <p><strong>Total records:</strong> {{ total_records }}</p>
            </div>

            <h2>Breakdown by Category</h2>
            <table>
                <thead>
                    <tr><th>Category</th><th>Total</th></tr>
                </thead>
                <tbody>
                    {% for item in by_category %}
                    <tr>
                        <td>{{ item[0] }}</td>
                        <td>{{ item[1] }}</td>
                    </tr>
                    {% endfor %}
                </tbody>
            </table>
        </body>
        </html>
        """
template = Template(template_html)
return template.render(
start_date=start_date.strftime('%Y-%m-%d'),
end_date=end_date.strftime('%Y-%m-%d'),
total_records=data['total_records'][0][0] if data['total_records'] else 0,
by_category=data['by_category']
)
def send_email_report(self, subject: str, html_content: str):
"""Envia relatório por email"""
try:
msg = MIMEMultipart('alternative')
msg['Subject'] = subject
msg['From'] = self.email_config['from']
msg['To'] = ', '.join(self.email_config['to'])
html_part = MIMEText(html_content, 'html')
msg.attach(html_part)
            # Send the email
server = smtplib.SMTP(self.email_config['smtp_host'], self.email_config['smtp_port'])
if self.email_config.get('use_tls'):
server.starttls()
if self.email_config.get('username'):
server.login(self.email_config['username'], self.email_config['password'])
server.send_message(msg)
server.quit()
print(f"Relatório enviado: {subject}")
except Exception as e:
print(f"Erro ao enviar email: {e}")
    def schedule_reports(self, schema_slug: str):
        """Schedule automatic reports"""
        # Daily report at 08:00
        schedule.every().day.at("08:00").do(
            self.generate_daily_summary,
            schema_slug
        )

        # Weekly report on Mondays at 09:00
        schedule.every().monday.at("09:00").do(
            self.generate_weekly_summary,
            schema_slug
        )

        print("Report schedules configured")
# Configuration and usage
email_config = {
'smtp_host': 'smtp.gmail.com',
'smtp_port': 587,
'use_tls': True,
    'username': 'your-email@gmail.com',
    'password': 'your-password',
    'from': 'reports@company.com',
    'to': ['manager@company.com', 'analyst@company.com']
}
report_system = AutomatedReportSystem(datasnap_client, email_config)
report_system.schedule_reports("vendas")
# Run the scheduler
while True:
schedule.run_pending()
time.sleep(60)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'data-team',
'depends_on_past': False,
'start_date': datetime(2024, 8, 1),
'email_on_failure': True,
'email_on_retry': False,
'retries': 2,
'retry_delay': timedelta(minutes=5)
}
dag = DAG(
'datasnap_processing_pipeline',
default_args=default_args,
    description='End-to-end DataSnap pipeline',
schedule_interval='@hourly',
catchup=False
)
# Task 1: Discover files
discover_files = BashOperator(
task_id='discover_files',
bash_command='python /scripts/discover_files.py',
dag=dag
)
# Task 2: Validate files
def validate_files(**context):
    # Validation logic (placeholder)
pass
validate = PythonOperator(
task_id='validate_files',
python_callable=validate_files,
dag=dag
)
# Task 3: Upload to DataSnap
def upload_to_datasnap(**context):
    # Upload logic (placeholder)
pass
upload = PythonOperator(
task_id='upload_to_datasnap',
python_callable=upload_to_datasnap,
dag=dag
)
# Task 4: Process the data
def trigger_processing(**context):
    # Processing logic (placeholder)
pass
process = PythonOperator(
task_id='trigger_processing',
python_callable=trigger_processing,
dag=dag
)
# Task 5: Generate a report
def generate_report(**context):
    # Report logic (placeholder)
pass
report = PythonOperator(
task_id='generate_report',
python_callable=generate_report,
dag=dag
)
# Define task dependencies
discover_files >> validate >> upload >> process >> report
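# The PythonOperator callables above are intentionally left as placeholders. As a
# hedged sketch of how data could flow between tasks, the validate step might return
# the list of valid file paths (pushed to XCom automatically) and the upload step
# pull it; the DataSnap client construction and upload call are only indicated here.
def upload_to_datasnap_sketch(**context):
    # Pull the list returned by the 'validate_files' task (task ids as defined above)
    files = context['ti'].xcom_pull(task_ids='validate_files') or []
    uploaded = []
    for path in files:
        # Build the DataSnap client from an Airflow connection or environment
        # variables, then call its upload method for `path` (omitted here).
        uploaded.append({'file': path, 'status': 'uploaded'})
    return uploaded  # the return value is pushed to XCom for downstream tasks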