Guía Práctica Completa de Celery

Configuración Inicial

Instalación y Setup Básico

# Instalación
pip install celery redis

# Verificar servicios
redis-server --version
celery --version

Aplicación Básica

# celery_app.py
from celery import Celery
import time

# Configuración de la app
app = Celery(
    'mi_proyecto',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
    include=['proyecto.tareas']
)

# Configuración general
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Europe/Madrid',
    enable_utc=True,
)

# Tarea de ejemplo
@app.task(bind=True)
def tarea_larga(self, segundos):
    """Tarea que simula procesamiento largo"""
    self.update_state(state='PROGRESS', meta={'porcentaje': 0})
    for i in range(segundos):
        time.sleep(1)
        porcentaje = ((i + 1) / segundos) * 100
        self.update_state(
            state='PROGRESS', 
            meta={'porcentaje': round(porcentaje, 2)}
        )
    return f'Tarea completada después de {segundos} segundos'

Ejemplos Completos por Categoría

1. Sistema de Notificaciones por Email

# tareas/emails.py
from celery_app import app
from smtplib import SMTP
from email.mime.text import MIMEText
import jinja2

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def enviar_email_bienvenida(self, usuario_email, usuario_nombre):
    """Envía email de bienvenida con reintentos automáticos"""
    try:
        # Plantilla del email
        template = f"""
        <h1>Bienvenido {usuario_nombre}!</h1>
        <p>Gracias por registrarte en nuestro servicio.</p>
        """
        # Configurar mensaje
        mensaje = MIMEText(template, 'html')
        mensaje['Subject'] = 'Bienvenida a Nuestra Plataforma'
        mensaje['From'] = 'noreply@empresa.com'
        mensaje['To'] = usuario_email
        # Enviar email
        with SMTP('smtp.empresa.com', 587) as servidor:
            servidor.starttls()
            servidor.login('usuario', 'password')
            servidor.send_message(mensaje)
        return f'Email enviado a {usuario_email}'
    except Exception as exc:
        # Reintento automático en caso de error
        raise self.retry(exc=exc)

@app.task
def enviar_newsletter_masiva(lista_emails, contenido):
    """Envía newsletter a lista masiva de emails"""
    resultados = []
    for email in lista_emails:
        # Encadenar envíos individuales
        resultado = enviar_email_bienvenida.delay(email, 'Cliente')
        resultados.append(resultado.id)
    return f'Newsletter programada para {len(resulta)} destinatarios'

2. Procesamiento de Archivos

# tareas/archivos.py
from celery_app import app
from PIL import Image
import pandas as pd
import os

@app.task(bind=True)
def procesar_imagen(self, ruta_imagen, operaciones):
    """Procesa imagen con múltiples operaciones"""
    try:
        with Image.open(ruta_imagen) as img:
            self.update_state(state='PROGRESS', meta={'etapa': 'Abriendo imagen'})
            # Aplicar operaciones
            for i, operacion in enumerate(operaciones):
                self.update_state(
                    state='PROGRESS', 
                    meta={'etapa': f'Procesando: {operacion}', 'progreso': (i/len(operaciones))*100}
                )
                if operacion['tipo'] == 'redimensionar':
                    img = img.resize(operacion['dimensiones'])
                elif operacion['tipo'] == 'convertir':
                    img = img.convert(operacion['formato'])
                elif operacion['tipo'] == 'filtro':
                    img = img.filter(operacion['filtro'])
            # Guardar resultado
            nombre_base = os.path.splitext(ruta_imagen)[0]
            ruta_salida = f"{nombre_base}_procesada.jpg"
            img.save(ruta_salida, quality=85)
            return {
                'estado': 'completado',
                'archivo_salida': ruta_salida,
                'tamaño_original': os.path.getsize(ruta_imagen),
                'tamaño_final': os.path.getsize(ruta_salida)
            }
    except Exception as e:
        return {'estado': 'error', 'mensaje': str(e)}

@app.task
def procesar_csv_masivo(ruta_csv, operaciones):
    """Procesa archivo CSV grande en chunks"""
    chunksize = 10000
    resultados = []
    for i, chunk in enumerate(pd.read_csv(ruta_csv, chunksize=chunksize)):
        # Procesar cada chunk en paralelo
        resultado = procesar_chunk_csv.delay(chunk.to_dict(), operaciones, i)
        resultados.append(resultado)
    return {
        'total_chunks': len(resultados),
        'chunk_ids': [r.id for r in resultados]
    }

@app.task
def procesar_chunk_csv(chunk_data, operaciones, chunk_num):
    """Procesa un chunk individual de CSV"""
    df = pd.DataFrame(chunk_data)
    # Aplicar operaciones
    for op in operaciones:
        if op['tipo'] == 'filtro':
            df = df.query(op['condicion'])
        elif op['tipo'] == 'transformacion':
            df[op['columna']] = df[op['columna']].apply(op['funcion'])
    return {
        'chunk_num': chunk_num,
        'filas_procesadas': len(df),
        'columnas': list(df.columns)
    }

3. Web Scraping Distribuido

# tareas/scraping.py
from celery_app import app
import requests
from bs4 import BeautifulSoup
import json

@app.task(bind=True, rate_limit='10/m')
def scrapear_pagina(self, url, selectores):
    """Scraping de página individual con rate limiting"""
    try:
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        }
        respuesta = requests.get(url, headers=headers, timeout=10)
        respuesta.raise_for_status()
        soup = BeautifulSoup(respuesta.content, 'html.parser')
        datos = {}
        for clave, selector in selectores.items():
            elementos = soup.select(selector)
            datos[clave] = [elem.get_text(strip=True) for elem in elementos]
        return {
            'url': url,
            'estado': 'éxito',
            'datos': datos,
            'timestamp': str(app.now())
        }
    except Exception as e:
        return {
            'url': url,
            'estado': 'error',
            'error': str(e)
        }

@app.task
def scrapear_sitio_completo(urls_config):
    """Coordina scraping de múltiples URLs"""
    resultados = []
    for config in urls_config:
        resultado = scrapear_pagina.delay(
            config['url'], 
            config['selectores']
        )
        resultados.append(resultado)
    # Esperar y recopilar todos los resultados
    datos_completos = []
    for resultado in resultados:
        try:
            datos = resultado.get(timeout=30)
            datos_completos.append(datos)
        except Exception as e:
            datos_completos.append({'error': str(e)})
    return datos_completos

4. Sistema de Reportes

# tareas/reportes.py
from celery_app import app
import pandas as pd
import matplotlib.pyplot as plt
import io
import base64

@app.task(bind=True)
def generar_reporte_ventas(self, fecha_inicio, fecha_fin, formato='pdf'):
    """Genera reporte de ventas con gráficos"""
    self.update_state(state='PROGRESS', meta={'etapa': 'Obteniendo datos'})
    # Simular obtención de datos
    datos_ventas = obtener_ventas_periodo(fecha_inicio, fecha_fin)
    df = pd.DataFrame(datos_ventas)
    self.update_state(state='PROGRESS', meta={'etapa': 'Procesando datos'})
    # Análisis de datos
    ventas_por_dia = df.groupby('fecha')['monto'].sum()
    productos_populares = df['producto'].value_counts().head(10)
    self.update_state(state='PROGRESS', meta={'etapa': 'Generando gráficos'})
    # Crear visualizaciones
    fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 12))
    # Gráfico de ventas por día
    ventas_por_dia.plot(ax=ax1, kind='line', title='Ventas por Día')
    ax1.set_ylabel('Monto de Ventas')
    # Gráfico de productos populares
    productos_populares.plot(ax=ax2, kind='bar', title='Productos Más Vendidos')
    ax2.set_ylabel('Cantidad Vendida')
    plt.tight_layout()
    # Convertir a base64 para enviar
    buffer = io.BytesIO()
    plt.savefig(buffer, format='png', dpi=150)
    buffer.seek(0)
    imagen_base64 = base64.b64encode(buffer.getvalue()).decode()
    # Métricas del reporte
    metricas = {
        'total_ventas': df['monto'].sum(),
        'transacciones': len(df),
        'promedio_venta': df['monto'].mean(),
        'periodo': f'{fecha_inicio} a {fecha_fin}'
    }
    return {
        'metricas': metricas,
        'grafico': imagen_base64,
        'formato': formato
    }

def obtener_ventas_periodo(fecha_inicio, fecha_fin):
    """Función simulada para obtener datos de ventas"""
    # En implementación real, conectaría a base de datos
    return [
        {'fecha': '2024-01-01', 'producto': 'A', 'monto': 100},
        {'fecha': '2024-01-01', 'producto': 'B', 'monto': 150},
        # ... más datos
    ]

Configuración de Producción

Docker Compose para Desarrollo

# docker-compose.yml
version: '3.8'

services:
  redis:
    image: redis:7-alpine
    ports:
      - "6379:6379"
    volumes:
      - redis_data:/data

  worker:
    build: .
    command: celery -A celery_app worker --loglevel=info --concurrency=4
    volumes:
      - .:/app
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
      - CELERY_RESULT_BACKEND=redis://redis:6379/0

  beat:
    build: .
    command: celery -A celery_app beat --loglevel=info
    volumes:
      - .:/app
    depends_on:
      - redis
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

  flower:
    build: .
    command: celery -A celery_app flower --port=5555
    ports:
      - "5555:5555"
    depends_on:
      - redis
      - worker

volumes:
  redis_data:

Configuración Avanzada de Worker

# config/production.py
class ProductionConfig:
    # Optimización para producción
    worker_prefetch_multiplier = 1
    worker_max_tasks_per_child = 1000
    task_acks_late = True
    task_reject_on_worker_lost = True
    # Queue routing
    task_routes = {
        'tareas.emails.*': {'queue': 'emails'},
        'tareas.archivos.*': {'queue': 'procesamiento'},
        'tareas.scraping.*': {'queue': 'scraping'},
        'tareas.reportes.*': {'queue': 'reportes'},
    }
    # Rate limiting
    task_annotations = {
        'tareas.scraping.scrapear_pagina': {'rate_limit': '10/m'},
        'tareas.emails.*': {'rate_limit': '100/m'},
    }

# Aplicar configuración
app.config_from_object(ProductionConfig)

Monitoreo y Management

Script de Monitoreo

# monitoreo.py
from celery_app import app
from celery.result import AsyncResult

def verificar_estado_tarea(task_id):
    """Verifica el estado de una tarea específica"""
    resultado = AsyncResult(task_id, app=app)
    return {
        'id': task_id,
        'estado': resultado.state,
        'resultado': resultado.result if resultado.ready() else None,
        'exito': resultado.successful(),
        'info': resultado.info if hasattr(resultado, 'info') else None
    }

def estadisticas_colas():
    """Obtiene estadísticas de las colas"""
    inspector = app.control.inspect()
    return {
        'tareas_activas': inspector.active(),
        'tareas_pendientes': inspector.reserved(),
        'workers_activos': inspector.stats(),
        'colas_registradas': inspector.registered_queues()
    }

# Ejemplo de uso
if __name__ == '__main__':
    # Ejecutar tarea de ejemplo
    tarea = app.send_task('tarea_larga', args=[5])
    print(f"Tarea enviada: {tarea.id}")
    # Verificar estado
    import time
    time.sleep(2)
    estado = verificar_estado_tarea(tarea.id)
    print(f"Estado: {estado}")

Comandos Útiles de Terminal

# Iniciar worker con múltiples colas
celery -A celery_app worker --loglevel=info --queues=emails,procesamiento,scraping

# Worker específico para emails
celery -A celery_app worker --loglevel=info --queues=emails --concurrency=2

# Monitoreo con Flower
celery -A celery_app flower --port=5555

# Inspeccionar colas
celery -A celery_app inspect active
celery -A celery_app inspect stats

# Purgar colas (cuidado!)
celery -A celery_app purge

Patrones Comunes Resueltos

Chain (Ejecución Secuencial)

from celery import chain

# Tareas que se ejecutan en secuencia
flujo = chain(
    obtener_datos.s(),
    procesar_datos.s(),
    guardar_resultados.s()
)

resultado = flujo.delay()

Group (Ejecución Paralela)

from celery import group

# Múltiples tareas en paralelo
tareas_paralelas = group(
    procesar_archivo.s(archivo) for archivo in lista_archivos
)

resultado = tareas_paralelas.delay()

Chord (Paralelo + Callback)

from celery import chord

# Procesamiento paralelo seguido de agregación
header = [procesar_item.s(item) for item in items]
callback = agregar_resultados.s()

resultado = chord(header)(callback)

Retry con Exponential Backoff

@app.task(bind=True, max_retries=5)
def tarea_con_reintentos(self, url):
    try:
        response = requests.get(url, timeout=10)
        return response.json()
    except requests.exceptions.RequestException as exc:
        # Exponential backoff: 2^retry segundos
        countdown = 2 ** self.request.retries
        raise self.retry(countdown=countdown, exc=exc)