Celery

Conceptos Fundamentales

¿Qué es Celery?

  • Task Runner asíncrono para Python
  • Gestiona colas de tareas (task queues) en segundo plano
  • Ideal para operaciones que no deben bloquear la respuesta principal
  • Soporta balanceo de carga entre workers
  • Documentación Oficial
  • Docker Python Image
  • Guía Celery OpenWebinars

    Arquitectura Core

  • Client: Aplicación que envía tareas
  • Broker: Sistema de mensajería intermediario
  • Worker: Procesa las tareas de la cola
  • Backend: Almacena resultados de tareas

Componentes Esenciales

Brokers Soportados

  • Redis: Popular para desarrollo y producción pequeña/mediana
  • RabbitMQ: Robustez empresarial, más características avanzadas
  • Amazon SQS, Azure Service Bus para entornos multinube

Características Clave

  • Tareas asíncronas para operaciones no críticas en tiempo real
  • Programación de tareas con ejecución diferida
  • Monitoreo y administración de workers
  • Retry automático para fallos temporales
  • Agrupación de tareas (chords, chains)

Monitoreo y Debugging

Herramientas

  • Flower: Monitor web para Celery
  • Logging estructurado
  • Métricas de rendimiento
  • Tracing distribuido

    Casos de Uso Comunes

Ejemplos Prácticos

  • Envío de correo tras registro de usuario
  • Procesamiento de imágenes/video en segundo plano
  • Análisis de datos pesados
  • Limpieza periódica de bases de datos
  • Generación de reportes complejos
  • Web scraping programado

Configuración en Producción

Consideraciones para Producción

# Configuración básica para producción
app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    timezone='Europe/Madrid',
    enable_utc=True,
    task_track_started=True,
    task_time_limit=30*60  # 30 minutos
)

Escalabilidad

  • Múltiples workers para paralelización
  • Prefetching configurable para optimizar recursos
  • Routing de tareas a colas específicas
  • Prioridades en procesamiento

Dockerización

Configuración con Docker

FROM python:3.11-slim

WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt

COPY . .
CMD ["celery", "-A", "proyecto", "worker", "--loglevel=info"]

Orquestación

  • Supervisord para gestión de procesos
  • Docker Compose para desarrollo local
  • Kubernetes para escalabilidad en producción

Automatización

Integración con Automatizacion y Build

  • CI/CD pipelines que ejecutan tareas Celery
  • Despliegue automático de workers
  • Health checks para monitoreo
  • Escalado automático basado en carga

Patrones Avanzados

Workflows Complejos

# Cadenas de tareas
chain(task1.s() | task2.s() | task3.s()).apply_async()

# Grupos paralelos
group([task1.s(i) for i in range(10)]).apply_async()

# Chord (grupo + callback)
chord(header=[task1.s(), task2.s()])(callback.s()).apply_async()

Seguridad y Robustez

  • Acknowledgment de tareas procesadas
  • Dead letter queues para tareas fallidas
  • Rate limiting para control de recursos
  • Autenticación en brokers

Task Runners: Concepto y Diferencias con Task Queues

¿Qué es un Task Runner?

Un Task Runner es una herramienta o framework que automatiza la ejecución de tareas, scripts y comandos. A diferencia de las Task Queues, los Task Runners se enfocan en la orquestación y ejecución secuencial/paralela de tareas definidas, generalmente en el mismo entorno o máquina.

Diferencias Clave: Task Runners vs Task Queues

Task Runners

  • Ejecución local o directa
  • Orquestación de tareas definidas
  • Dependencias entre tasks
  • Build processes y automatización
  • Desarrollo y deployment

Task Queues

  • Ejecución asíncrona distribuida
  • Procesamiento en background
  • Mensajería y colas
  • Workers distribuidos
  • Escalabilidad horizontal

Task Runners Populares

1. Make (El Clásico)

# Makefile ejemplo
.PHONY: test build deploy

install:
	pip install -r requirements.txt

test:
	pytest tests/ -v

build:
	docker build -t mi-app .

deploy: test build
	docker push mi-app:latest

clean:
	rm -rf __pycache__ *.pyc

2. Invoke (Python)

# tasks.py
from invoke import task

@task
def test(c, coverage=False):
    """Ejecutar tests"""
    cmd = "pytest tests/"
    if coverage:
        cmd += " --cov=app"
    c.run(cmd)

@task
def deploy(c, environment='staging'):
    """Desplegar aplicación"""
    c.run(f"ansible-playbook deploy.yml -i {environment}")

@task
def setup(c):
    """Setup completo del proyecto"""
    test(c)
    c.run("docker-compose build")
    c.run("docker-compose up -d")

3. NPM Scripts (JavaScript)

{
  "scripts": {
    "dev": "webpack serve --mode development",
    "build": "webpack --mode production",
    "test": "jest",
    "lint": "eslint src/",
    "deploy": "npm run build && npm run test && aws s3 sync dist/ s3://my-bucket"
  }
}

4. Rake (Ruby)

# Rakefile
desc "Ejecutar tests"
task :test do
  sh "bundle exec rspec"
end

desc "Desplegar a producción"
task :deploy => :test do
  sh "cap production deploy"
end

desc "Setup del ambiente"
task :setup do
  sh "bundle install"
  sh "rails db:setup"
end

Casos de Uso Típicos

Desarrollo Local

# tasks/dev_tasks.py
@task
def dev_server(c, port=8000):
    """Iniciar servidor de desarrollo"""
    c.run(f"python manage.py runserver {port}")

@task
def migrations(c):
    """Crear y aplicar migraciones"""
    c.run("python manage.py makemigrations")
    c.run("python manage.py migrate")

@task
def seed_data(c):
    """Poblar base de datos con datos de prueba"""
    c.run("python manage.py loaddata fixtures/*.json")

Build y Compilación

@task
def build_frontend(c):
    """Compilar assets frontend"""
    c.run("npm run build")
    c.run("python manage.py collectstatic --noinput")

@task
def build_docker(c, tag="latest"):
    """Construir imagen Docker"""
    c.run(f"docker build -t myapp:{tag} .")
    c.run(f"docker tag myapp:{tag} registry.company.com/myapp:{tag}")

CI/CD Pipelines

@task
def ci_full(c):
    """Pipeline completo de CI"""
    test(c)
    lint(c)
    security_scan(c)
    build_docker(c)
    push_docker(c)

@task
def lint(c):
    """Análisis de código"""
    c.run("flake8 app/")
    c.run("black --check app/")
    c.run("mypy app/")

@task
def security_scan(c):
    """Escaneo de seguridad"""
    c.run("bandit -r app/")
    c.run("safety check")

Características Avanzadas

Dependencias entre Tareas

@task
def clean(c):
    """Limpiar archivos temporales"""
    c.run("rm -rf build/ dist/ *.egg-info")

@task(pre=[clean])
def build(c):
    """Construir paquete"""
    c.run("python setup.py sdist bdist_wheel")

@task(post=[build])
def release(c):
    """Crear release completo"""
    c.run("git tag v1.0.0")
    c.run("git push --tags")

Ejecución Condicional

@task
def deploy(c, environment=None):
    """Despliegue condicional por ambiente"""
    if not environment:
        environment = 'staging'
    if environment == 'production':
        confirm = input("¿Desplegar a producción? (y/n): ")
        if confirm.lower() != 'y':
            return
    c.run(f"ansible-playbook deploy-{environment}.yml")

Parallel Execution

from invoke import run

@task
def test_parallel(c):
    """Ejecutar tests en paralelo"""
    commands = [
        "pytest tests/unit/",
        "pytest tests/integration/", 
        "pytest tests/api/"
    ]
    # Ejecución paralela (depende del task runner)
    for cmd in commands:
        run(cmd, asynchronous=True)

Integración con Herramientas

Docker Integration

@task
def docker_compose(c, command="up"):
    """Gestionar Docker Compose"""
    if command == "build":
        c.run("docker-compose build")
    elif command == "up":
        c.run("docker-compose up -d")
    elif command == "down":
        c.run("docker-compose down")
    elif command == "logs":
        c.run("docker-compose logs -f")

@task
def db_migrate(c):
    """Migrar base de datos en contenedor"""
    c.run("docker-compose exec web python manage.py migrate")

Cloud Integration

@task
def deploy_aws(c, stack_name="my-app"):
    """Desplegar a AWS"""
    c.run(f"aws cloudformation deploy --stack-name {stack_name} --template-file template.yml")
    c.run(f"aws s3 sync dist/ s3://{stack_name}-bucket/")

@task
def kubernetes_deploy(c, namespace="default"):
    """Desplegar a Kubernetes"""
    c.run(f"kubectl apply -f k8s/ -n {namespace}")
    c.run(f"kubectl rollout status deployment/app -n {namespace}")

Task Runners Especializados

Para Data Science

# tasks/ds_tasks.py
@task
def download_dataset(c, dataset_name):
    """Descargar dataset"""
    c.run(f"kaggle datasets download {dataset_name}")
    c.run(f"unzip {dataset_name}.zip")

@task
def train_model(c, model_type="random_forest"):
    """Entrenar modelo de ML"""
    c.run(f"python src/train.py --model {model_type}")

@task
def evaluate_model(c):
    """Evaluar modelo entrenado"""
    c.run("python src/evaluate.py")
    c.run("python src/generate_metrics.py")

Para Documentación

@task
def docs_build(c):
    """Construir documentación"""
    c.run("sphinx-build docs/ build/docs")
    c.run("cp -r build/docs/ public/")

@task
def docs_serve(c, port=8000):
    """Servir documentación localmente"""
    c.run(f"python -m http.server {port} --directory public/")

Buenas Prácticas

Task Naming Conventions

# ✅ Nombres descriptivos y acciones
@task
def database_backup(c): ...

@task 
def deploy_staging(c): ...

@task
def run_migrations(c): ...

# ❌ Nombres ambiguos
@task
def do_stuff(c): ...

@task
def thing(c): ...

Error Handling

@task
def safe_deploy(c):
    """Despliegue con manejo de errores"""
    try:
        c.run("git pull origin main")
        c.run("docker-compose build")
        c.run("docker-compose up -d")
        print("✅ Despliegue exitoso")
    except Exception as e:
        print(f"❌ Error en despliegue: {e}")
        # Rollback automático
        c.run("docker-compose down")
        raise

Configuration Management

from invoke import Config

# Configuración por ambiente
config = Config({
    'dev': {
        'database_url': 'sqlite:///dev.db',
        'debug': True
    },
    'prod': {
        'database_url': 'postgresql://...',
        'debug': False
    }
})

@task
def run_app(c, env='dev'):
    """Ejecutar app con configuración específica"""
    env_config = config[env]
    c.run(f"DATABASE_URL={env_config['database_url']} python app.py")

Integración con Task Queues

Combinando Ambos Patrones

@task
def schedule_maintenance(c):
    """Programar tareas de mantenimiento vía task queue"""
    # Usar task runner para orquestar
    c.run("python manage.py collectstatic --noinput")
    # Programar tareas asíncronas
    from celery_app import app
    app.send_task('cleanup_old_files')
    app.send_task('generate_reports')
    app.send_task('update_search_index')

Hybrid Approach

@task
def data_pipeline(c):
    """Pipeline híbrido: task runner + task queues"""
    # Fase 1: Task runner (preparación)
    c.run("python scripts/download_data.py")
    c.run("python scripts/validate_schema.py")
    # Fase 2: Task queues (procesamiento distribuido)
    from celery_app import app
    chain(
        process_raw_data.s(),
        train_models.s(),
        evaluate_results.s()
    ).delay()

Los Task Runners son esenciales para automatizar flujos de trabajo de desarrollo, build y deployment, complementando (no reemplazando) a las Task Queues en el ecosistema de automatización moderno.