Automatizacion y Build
Celery
- devops
- python
- Docker
- Backend
- Automatizacion y Build
- scraping
- conceptos
- uso para produccion
- tareas asincronas
- task runner
- balanceo de carga
- task queues
- multinube
- brokers
- docs
- Celery Conceptos Avanzados y Configuración Profesional
- Celery Temas Avanzados Varios
- Guía Práctica Completa de Celery
- task queues
- Task Queues Temas Avanzados y Casos de Uso Especializados
-
Task Queues Patrones Empresariales y Optimizaciones
Celery Task Queue para Aplicaciones Distribuidas
Conceptos Fundamentales
¿Qué es Celery?
- Task Runner asíncrono para Python
- Gestiona colas de tareas (task queues) en segundo plano
- Ideal para operaciones que no deben bloquear la respuesta principal
- Soporta balanceo de carga entre workers
- Documentación Oficial
- Docker Python Image
- Guía Celery OpenWebinars
Arquitectura Core
- Client: Aplicación que envía tareas
- Broker: Sistema de mensajería intermediario
- Worker: Procesa las tareas de la cola
- Backend: Almacena resultados de tareas
Componentes Esenciales
Brokers Soportados
- Redis: Popular para desarrollo y producción pequeña/mediana
- RabbitMQ: Robustez empresarial, más características avanzadas
- Amazon SQS, Azure Service Bus para entornos multinube
Características Clave
- Tareas asíncronas para operaciones no críticas en tiempo real
- Programación de tareas con ejecución diferida
- Monitoreo y administración de workers
- Retry automático para fallos temporales
- Agrupación de tareas (chords, chains)
Monitoreo y Debugging
Herramientas
- Flower: Monitor web para Celery
- Logging estructurado
- Métricas de rendimiento
- Tracing distribuido
Casos de Uso Comunes
Ejemplos Prácticos
- Envío de correo tras registro de usuario
- Procesamiento de imágenes/video en segundo plano
- Análisis de datos pesados
- Limpieza periódica de bases de datos
- Generación de reportes complejos
- Web scraping programado
Configuración en Producción
Consideraciones para Producción
# Configuración básica para producción
app.conf.update(
task_serializer='json',
accept_content=['json'],
result_serializer='json',
timezone='Europe/Madrid',
enable_utc=True,
task_track_started=True,
task_time_limit=30*60 # 30 minutos
)
Escalabilidad
- Múltiples workers para paralelización
- Prefetching configurable para optimizar recursos
- Routing de tareas a colas específicas
- Prioridades en procesamiento
Dockerización
Configuración con Docker
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install -r requirements.txt
COPY . .
CMD ["celery", "-A", "proyecto", "worker", "--loglevel=info"]
Orquestación
- Supervisord para gestión de procesos
- Docker Compose para desarrollo local
- Kubernetes para escalabilidad en producción
Automatización
Integración con Automatizacion y Build
- CI/CD pipelines que ejecutan tareas Celery
- Despliegue automático de workers
- Health checks para monitoreo
- Escalado automático basado en carga
Patrones Avanzados
Workflows Complejos
# Cadenas de tareas
chain(task1.s() | task2.s() | task3.s()).apply_async()
# Grupos paralelos
group([task1.s(i) for i in range(10)]).apply_async()
# Chord (grupo + callback)
chord(header=[task1.s(), task2.s()])(callback.s()).apply_async()
Seguridad y Robustez
- Acknowledgment de tareas procesadas
- Dead letter queues para tareas fallidas
- Rate limiting para control de recursos
- Autenticación en brokers
Task Runners: Concepto y Diferencias con Task Queues
¿Qué es un Task Runner?
Un Task Runner es una herramienta o framework que automatiza la ejecución de tareas, scripts y comandos. A diferencia de las Task Queues, los Task Runners se enfocan en la orquestación y ejecución secuencial/paralela de tareas definidas, generalmente en el mismo entorno o máquina.
Diferencias Clave: Task Runners vs Task Queues
Task Runners
- Ejecución local o directa
- Orquestación de tareas definidas
- Dependencias entre tasks
- Build processes y automatización
- Desarrollo y deployment
Task Queues
- Ejecución asíncrona distribuida
- Procesamiento en background
- Mensajería y colas
- Workers distribuidos
- Escalabilidad horizontal
Task Runners Populares
1. Make (El Clásico)
# Makefile ejemplo
.PHONY: test build deploy
install:
pip install -r requirements.txt
test:
pytest tests/ -v
build:
docker build -t mi-app .
deploy: test build
docker push mi-app:latest
clean:
rm -rf __pycache__ *.pyc
2. Invoke (Python)
# tasks.py
from invoke import task
@task
def test(c, coverage=False):
"""Ejecutar tests"""
cmd = "pytest tests/"
if coverage:
cmd += " --cov=app"
c.run(cmd)
@task
def deploy(c, environment='staging'):
"""Desplegar aplicación"""
c.run(f"ansible-playbook deploy.yml -i {environment}")
@task
def setup(c):
"""Setup completo del proyecto"""
test(c)
c.run("docker-compose build")
c.run("docker-compose up -d")
3. NPM Scripts (JavaScript)
{
"scripts": {
"dev": "webpack serve --mode development",
"build": "webpack --mode production",
"test": "jest",
"lint": "eslint src/",
"deploy": "npm run build && npm run test && aws s3 sync dist/ s3://my-bucket"
}
}
4. Rake (Ruby)
# Rakefile
desc "Ejecutar tests"
task :test do
sh "bundle exec rspec"
end
desc "Desplegar a producción"
task :deploy => :test do
sh "cap production deploy"
end
desc "Setup del ambiente"
task :setup do
sh "bundle install"
sh "rails db:setup"
end
Casos de Uso Típicos
Desarrollo Local
# tasks/dev_tasks.py
@task
def dev_server(c, port=8000):
"""Iniciar servidor de desarrollo"""
c.run(f"python manage.py runserver {port}")
@task
def migrations(c):
"""Crear y aplicar migraciones"""
c.run("python manage.py makemigrations")
c.run("python manage.py migrate")
@task
def seed_data(c):
"""Poblar base de datos con datos de prueba"""
c.run("python manage.py loaddata fixtures/*.json")
Build y Compilación
@task
def build_frontend(c):
"""Compilar assets frontend"""
c.run("npm run build")
c.run("python manage.py collectstatic --noinput")
@task
def build_docker(c, tag="latest"):
"""Construir imagen Docker"""
c.run(f"docker build -t myapp:{tag} .")
c.run(f"docker tag myapp:{tag} registry.company.com/myapp:{tag}")
CI/CD Pipelines
@task
def ci_full(c):
"""Pipeline completo de CI"""
test(c)
lint(c)
security_scan(c)
build_docker(c)
push_docker(c)
@task
def lint(c):
"""Análisis de código"""
c.run("flake8 app/")
c.run("black --check app/")
c.run("mypy app/")
@task
def security_scan(c):
"""Escaneo de seguridad"""
c.run("bandit -r app/")
c.run("safety check")
Características Avanzadas
Dependencias entre Tareas
@task
def clean(c):
"""Limpiar archivos temporales"""
c.run("rm -rf build/ dist/ *.egg-info")
@task(pre=[clean])
def build(c):
"""Construir paquete"""
c.run("python setup.py sdist bdist_wheel")
@task(post=[build])
def release(c):
"""Crear release completo"""
c.run("git tag v1.0.0")
c.run("git push --tags")
Ejecución Condicional
@task
def deploy(c, environment=None):
"""Despliegue condicional por ambiente"""
if not environment:
environment = 'staging'
if environment == 'production':
confirm = input("¿Desplegar a producción? (y/n): ")
if confirm.lower() != 'y':
return
c.run(f"ansible-playbook deploy-{environment}.yml")
Parallel Execution
from invoke import run
@task
def test_parallel(c):
"""Ejecutar tests en paralelo"""
commands = [
"pytest tests/unit/",
"pytest tests/integration/",
"pytest tests/api/"
]
# Ejecución paralela (depende del task runner)
for cmd in commands:
run(cmd, asynchronous=True)
Integración con Herramientas
Docker Integration
@task
def docker_compose(c, command="up"):
"""Gestionar Docker Compose"""
if command == "build":
c.run("docker-compose build")
elif command == "up":
c.run("docker-compose up -d")
elif command == "down":
c.run("docker-compose down")
elif command == "logs":
c.run("docker-compose logs -f")
@task
def db_migrate(c):
"""Migrar base de datos en contenedor"""
c.run("docker-compose exec web python manage.py migrate")
Cloud Integration
@task
def deploy_aws(c, stack_name="my-app"):
"""Desplegar a AWS"""
c.run(f"aws cloudformation deploy --stack-name {stack_name} --template-file template.yml")
c.run(f"aws s3 sync dist/ s3://{stack_name}-bucket/")
@task
def kubernetes_deploy(c, namespace="default"):
"""Desplegar a Kubernetes"""
c.run(f"kubectl apply -f k8s/ -n {namespace}")
c.run(f"kubectl rollout status deployment/app -n {namespace}")
Task Runners Especializados
Para Data Science
# tasks/ds_tasks.py
@task
def download_dataset(c, dataset_name):
"""Descargar dataset"""
c.run(f"kaggle datasets download {dataset_name}")
c.run(f"unzip {dataset_name}.zip")
@task
def train_model(c, model_type="random_forest"):
"""Entrenar modelo de ML"""
c.run(f"python src/train.py --model {model_type}")
@task
def evaluate_model(c):
"""Evaluar modelo entrenado"""
c.run("python src/evaluate.py")
c.run("python src/generate_metrics.py")
Para Documentación
@task
def docs_build(c):
"""Construir documentación"""
c.run("sphinx-build docs/ build/docs")
c.run("cp -r build/docs/ public/")
@task
def docs_serve(c, port=8000):
"""Servir documentación localmente"""
c.run(f"python -m http.server {port} --directory public/")
Buenas Prácticas
Task Naming Conventions
# ✅ Nombres descriptivos y acciones
@task
def database_backup(c): ...
@task
def deploy_staging(c): ...
@task
def run_migrations(c): ...
# ❌ Nombres ambiguos
@task
def do_stuff(c): ...
@task
def thing(c): ...
Error Handling
@task
def safe_deploy(c):
"""Despliegue con manejo de errores"""
try:
c.run("git pull origin main")
c.run("docker-compose build")
c.run("docker-compose up -d")
print("✅ Despliegue exitoso")
except Exception as e:
print(f"❌ Error en despliegue: {e}")
# Rollback automático
c.run("docker-compose down")
raise
Configuration Management
from invoke import Config
# Configuración por ambiente
config = Config({
'dev': {
'database_url': 'sqlite:///dev.db',
'debug': True
},
'prod': {
'database_url': 'postgresql://...',
'debug': False
}
})
@task
def run_app(c, env='dev'):
"""Ejecutar app con configuración específica"""
env_config = config[env]
c.run(f"DATABASE_URL={env_config['database_url']} python app.py")
Integración con Task Queues
Combinando Ambos Patrones
@task
def schedule_maintenance(c):
"""Programar tareas de mantenimiento vía task queue"""
# Usar task runner para orquestar
c.run("python manage.py collectstatic --noinput")
# Programar tareas asíncronas
from celery_app import app
app.send_task('cleanup_old_files')
app.send_task('generate_reports')
app.send_task('update_search_index')
Hybrid Approach
@task
def data_pipeline(c):
"""Pipeline híbrido: task runner + task queues"""
# Fase 1: Task runner (preparación)
c.run("python scripts/download_data.py")
c.run("python scripts/validate_schema.py")
# Fase 2: Task queues (procesamiento distribuido)
from celery_app import app
chain(
process_raw_data.s(),
train_models.s(),
evaluate_results.s()
).delay()
Los Task Runners son esenciales para automatizar flujos de trabajo de desarrollo, build y deployment, complementando (no reemplazando) a las Task Queues en el ecosistema de automatización moderno.
¿Te gusta este contenido? Suscríbete vía RSS