FastAPI + Celery: Building Asynchronous Web Applications

Learn how to integrate FastAPI with Celery to create asynchronous web applications.

Modern web applications often need to perform time-consuming operations like sending emails, processing files, or generating reports. Running these tasks synchronously would block the HTTP response, leading to poor user experience and potential timeouts. This is where distributed task queues come in.

In this post, we’ll explore how to build asynchronous web applications using FastAPI and Celery, backed by Redis as the message broker. We’ll examine a complete working example from my fastapi-celery-demo repository.

Understanding the Architecture

Before diving into code, let’s understand the components of our distributed system:

Services

  1. FastAPI Application - The web API that receives HTTP requests and enqueues tasks
  2. Celery Worker - The background processor that executes tasks from the queue
  3. Redis - The message broker that queues tasks and stores results

This separation of concerns allows your API to respond immediately while heavy processing happens in the background.

Why Not Use FastAPI’s Built-in Background Tasks?

FastAPI provides a BackgroundTasks feature for simple operations:

from fastapi import BackgroundTasks, FastAPI

app = FastAPI()

def write_log(message: str) -> None:
    print(message)  # stand-in for real log writing

@app.post("/send-notification/")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"Email: {email}")
    return {"message": "Notification sent"}

This works well for lightweight operations, but has limitations:

  - Tasks run in the same process as the API, so heavy work still competes with request handling for CPU and memory.
  - Queued tasks are lost if the server restarts or crashes; there is no persistence.
  - There are no built-in retries, no progress reporting, and no way to query a task’s status later.
  - You cannot scale the background work independently of the web server or spread it across machines.

Celery solves these problems by providing a robust, distributed task queue system.

Setup

Dependencies

First, let’s take a look at the project dependencies, which can be installed with uv sync.

pyproject.toml
dependencies = [
    "celery>=5.5.3",
    "fastapi[standard]>=0.124.2",
    "mypy>=1.19.0",
    "pydantic>=2.11.7",
    "pydantic-settings>=2.6.1",
    "redis>=6.4.0",
]

Compose

This demo uses Docker Compose to manage all services. Here’s the docker-compose.yaml from my repository:

docker-compose.yaml
services:
  redis:
    container_name: redis
    image: redis:8-alpine
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

  worker:
    build: .
    restart: unless-stopped
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - REDIS_URL=redis://redis:6379/0
      - CELERY_RESULT_EXPIRES=900
    command: celery -A src.worker worker --concurrency=2 --loglevel=INFO

  api:
    build: .
    restart: unless-stopped
    depends_on:
      - worker
    ports:
      - "8000:8000"
    volumes:
      - ./public:/app/public
      - ./src/templates:/app/src/templates
    environment:
      - REDIS_URL=redis://redis:6379/0
    command: uvicorn src.api:app --host 0.0.0.0 --port 8000

Configuration Details

Redis Service:

  - Runs the official redis:8-alpine image and exposes port 6379.
  - A redis-cli ping healthcheck (every 5 seconds, up to 5 retries) lets dependent services wait until Redis is actually ready.

Worker Service:

  - Built from the project Dockerfile and started only once Redis reports healthy.
  - Runs celery -A src.worker worker with a concurrency of 2 and INFO-level logging.
  - Configured through REDIS_URL and CELERY_RESULT_EXPIRES, so task results are kept for 900 seconds.

API Service:

  - Starts after the worker and serves uvicorn on port 8000.
  - Mounts ./public and ./src/templates into the container for static assets and templates.
  - Uses the same REDIS_URL to enqueue tasks and read their results.
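
The environment variables above are read through pydantic-settings. The demo’s src/config.py isn’t reproduced in this post, so here is a minimal sketch of what it might look like; the field names redis_url and celery_result_expires are my assumptions based on the compose file, while worker_max_timeout is referenced by the task code later on.

src/config.py (sketch)
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    # pydantic-settings matches fields to environment variables
    # case-insensitively, so these pick up REDIS_URL and
    # CELERY_RESULT_EXPIRES from docker-compose.yaml.
    redis_url: str = "redis://localhost:6379/0"
    celery_result_expires: int = 900
    # Ceiling (in seconds) enforced inside tasks; default value assumed.
    worker_max_timeout: int = 300


settings = Settings()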

Building the Celery Worker

Create a task definition file to implement your background tasks. Here’s the sleep_task.py from the demo:

src/task_defs/sleep_task.py
import logging
import time

from pydantic import BaseModel, Field

from src.config import settings
from src.worker import celery_app

logger = logging.getLogger(__name__)


class CreateSleepTaskRequest(BaseModel):
    """Request model for creating a sleep task."""

    duration: int = Field(..., gt=0, le=300, description="Task duration in seconds")


@celery_app.task(name="sleep_task", bind=True)
def sleep_task(self, duration: int) -> dict[str, object]:
    """
    Task that sleeps for specified duration with progress updates.

    Args:
        duration: Sleep duration in seconds

    Returns:
        dict with task completion info

    Raises:
        ValueError: If duration exceeds worker_max_timeout
    """
    logger.info(f"Task {self.request.id} starting: sleeping for {duration} seconds")

    # Track progress with direct state updates
    for i in range(duration):
        # Check if we've exceeded the worker max timeout threshold
        if i >= settings.worker_max_timeout:
            error_msg = f"Task duration {duration}s exceeds worker max timeout of {settings.worker_max_timeout}s"
            logger.error(f"Task {self.request.id} failed: {error_msg}")
            raise ValueError(error_msg)

        time.sleep(1)
        progress = ((i + 1) / duration) * 100
        self.update_state(
            state="PROGRESS",
            meta={
                "current": i + 1,
                "total": duration,
                "progress": progress,
            },
        )
        logger.debug(f"Task progress: {progress:.1f}%")

    logger.info(f"Task {self.request.id} completed after {duration} seconds")
    return {"status": "ok"}
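
The task above imports celery_app from src.worker, a module that isn’t reproduced in this post. A minimal sketch of what it could contain, reusing the assumed settings fields from the config sketch earlier:

src/worker.py (sketch)
from celery import Celery

from src.config import settings

celery_app = Celery(
    "worker",
    broker=settings.redis_url,
    backend=settings.redis_url,
    # Import the task module at startup so "sleep_task" gets registered.
    include=["src.task_defs.sleep_task"],
)

# Drop stored results after CELERY_RESULT_EXPIRES seconds (900 in the compose file).
celery_app.conf.result_expires = settings.celery_result_expires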

Key Task Features

  - bind=True gives the task access to self, so it can read self.request.id and publish custom state updates.
  - Input is validated with the CreateSleepTaskRequest Pydantic model, which constrains duration to 1-300 seconds.
  - Progress is reported via self.update_state with a custom PROGRESS state; the API later reads this metadata back through AsyncResult.
  - The loop raises a ValueError once settings.worker_max_timeout is exceeded, so an oversized request can’t tie up a worker indefinitely.

Creating the FastAPI Application

Now let’s build the API that will dispatch tasks to Celery. The models.py defines the response structure:

src/models.py
from datetime import datetime

from pydantic import BaseModel


class TaskResponse(BaseModel):
    """Unified response model for all task-related endpoints.

    Used by:
    - POST /tasks (create_task)
    - GET /tasks (list_tasks)
    - GET /tasks/{task_id} (get_task)
    """

    task_id: str
    state: str
    name: str | None = None
    worker: str | None = None  # Set for GET /tasks responses
    date_done: datetime | str | None = None  # datetime for list, str for detail
    progress: float | None = None  # Progress percentage (0-100)

The API endpoint in api.py creates and monitors tasks:

src/api.py
from fastapi import FastAPI
from celery.result import AsyncResult

from src import tasks
from src.worker import celery_app
from src.task_defs.sleep_task import sleep_task, CreateSleepTaskRequest
from src.models import TaskResponse

app = FastAPI(title="FastAPI + Celery Demo")


@app.post("/tasks/sleep_task", response_model=TaskResponse, status_code=201)
async def create_sleep_task(request: CreateSleepTaskRequest) -> TaskResponse:
    """
    Create a new sleep task.

    Args:
        request: Task parameters (duration in seconds)

    Returns:
        TaskResponse with task_id and initial state
    """
    result = celery_app.send_task(
        "sleep_task", kwargs={"duration": request.duration}
    )
    return TaskResponse(
        task_id=result.id,
        state=result.state,
        name="sleep_task",
    )


@app.get("/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: str) -> TaskResponse:
    """Get status of a specific task"""
    return tasks.get_task(task_id)

The get_task function can be found in tasks.py.

src/tasks.py
from celery.result import AsyncResult

from src.models import TaskResponse
from src.worker import celery_app


def get_task(task_id: str) -> TaskResponse:
    """
    Get detailed status of a specific task by ID.

    Uses Celery's AsyncResult which queries the Redis backend.
    This works for tasks in ANY state (pending, running, completed).

    Args:
        task_id: Unique task identifier

    Returns:
        TaskResponse with current task state and metadata
    """
    result = AsyncResult(task_id, app=celery_app)
    response = TaskResponse(
        task_id=task_id,
        state=result.state,
        name=result.name,
    )

    # Add result for successful tasks
    if result.successful():
        response.date_done = result.date_done

    # Add progress for running tasks
    if result.state == "PROGRESS":
        info: dict = result.info if isinstance(result.info, dict) else {}
        response.progress = info.get("progress") if info else None

    return response
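
The models docstring also references a GET /tasks listing endpoint, which isn’t reproduced here. One way to build something like it is Celery’s inspect API, which asks the running workers for the tasks they are currently executing; this is only a sketch and not necessarily how the demo’s list_tasks is implemented:

from celery.result import AsyncResult

from src.models import TaskResponse
from src.worker import celery_app


def list_tasks() -> list[TaskResponse]:
    """List tasks the workers are currently executing (sketch)."""
    inspector = celery_app.control.inspect()
    # active() returns {worker_name: [task_info, ...]}, or None if no worker replies.
    active = inspector.active() or {}
    responses: list[TaskResponse] = []
    for worker_name, task_infos in active.items():
        for info in task_infos:
            result = AsyncResult(info["id"], app=celery_app)
            responses.append(
                TaskResponse(
                    task_id=info["id"],
                    state=result.state,
                    name=info.get("name"),
                    worker=worker_name,
                )
            )
    return responses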

Running the Application with Docker Compose

Start all services with Docker Compose:

Terminal window
docker compose up

The services will start in the following order:

  1. Redis starts first and must pass its redis-cli ping healthcheck.
  2. The Celery worker starts once Redis reports healthy.
  3. The API starts after the worker and begins serving on port 8000.

You should see log output from all three services. Once you see messages indicating the worker is ready and the API is running, you’re ready to test.
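
Once the stack is up, you can exercise both endpoints from a short Python script. This sketch uses httpx (installed as part of fastapi[standard]); the 10-second duration is just an example:

import time

import httpx

BASE_URL = "http://localhost:8000"

# Enqueue a 10-second sleep task.
created = httpx.post(f"{BASE_URL}/tasks/sleep_task", json={"duration": 10}).json()
task_id = created["task_id"]
print(f"Created task {task_id} in state {created['state']}")

# Poll the status endpoint until the task finishes.
while True:
    status = httpx.get(f"{BASE_URL}/tasks/{task_id}").json()
    print(f"state={status['state']} progress={status.get('progress')}")
    if status["state"] in {"SUCCESS", "FAILURE"}:
        break
    time.sleep(1)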

In the UI, you can create a task and watch its state and result appear in the task table.

(Screenshot: list of running tasks)

When to Use Celery vs. FastAPI Background Tasks

Use FastAPI Background Tasks when:

  - The work is quick and lightweight, such as writing a log entry or firing off a single email.
  - Losing the task on a server restart is acceptable.
  - You don’t need retries, progress tracking, or a result you can query later.

Use Celery when:

  - Tasks are long-running or resource-heavy and shouldn’t share a process with the web server.
  - You need task state, progress reporting, retries, or results that persist after the request ends.
  - You want to scale workers independently of the API or distribute work across machines.

Conclusion

Combining FastAPI and Celery provides a powerful foundation for building asynchronous web applications. FastAPI handles HTTP requests with excellent performance and type safety, while Celery manages the complexity of distributed task processing.

The complete working example is available in my fastapi-celery-demo repository. Clone it, experiment with it, and adapt it to your needs.

Key takeaways:

  1. Separate concerns - Keep your API fast by offloading heavy work to background workers
  2. Monitor actively - Use Flower or custom logging to track task execution
  3. Design for failure - Make tasks idempotent and implement retry logic (see the sketch after this list)
  4. Scale independently - As task volume grows, add capacity by increasing worker concurrency or the number of worker replicas without changing your API
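
On the third takeaway, Celery ships with built-in retry support. Here is a minimal sketch of an idempotent task with automatic retries; fetch_report is a hypothetical example and not part of the demo repository:

import httpx

from src.worker import celery_app


@celery_app.task(
    name="fetch_report",               # hypothetical task, not in the demo repo
    bind=True,
    autoretry_for=(httpx.HTTPError,),  # retry on transient HTTP failures
    retry_backoff=True,                # exponential backoff between attempts
    max_retries=3,
)
def fetch_report(self, url: str) -> dict[str, object]:
    """Idempotent GET: re-running it after a failure is safe."""
    response = httpx.get(url, timeout=10)
    response.raise_for_status()
    return {"status_code": response.status_code, "bytes": len(response.content)}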