FastAPI + Celery: Building Asynchronous Web Applications
Learn how to integrate FastAPI with Celery to create asynchronous web applications.
Modern web applications often need to perform time-consuming operations like sending emails, processing files, or generating reports. Running these tasks synchronously would block the HTTP response, leading to poor user experience and potential timeouts. This is where distributed task queues come in.
In this post, we’ll explore how to build asynchronous web applications using FastAPI and Celery, backed by Redis as the message broker. We’ll examine a complete working example from my fastapi-celery-demo repository.
Understanding the Architecture
Before diving into code, let’s understand the components of our distributed system:
Services
- FastAPI Application - The web API that receives HTTP requests and enqueues tasks
- Celery Worker - The background processor that executes tasks from the queue
- Redis - The message broker that queues tasks and stores results
This separation of concerns allows your API to respond immediately while heavy processing happens in the background.
Why Not Use FastAPI’s Built-in Background Tasks?
FastAPI provides a BackgroundTasks feature for simple operations:
```python
from fastapi import BackgroundTasks

@app.post("/send-notification/")
async def send_notification(email: str, background_tasks: BackgroundTasks):
    background_tasks.add_task(write_log, f"Email: {email}")
    return {"message": "Notification sent"}
```
This works well for lightweight operations, but has limitations:
- Not distributed - Tasks run in the same process as your web server
- No persistence - If the server restarts, queued tasks are lost
- Limited monitoring - No built-in way to track task status or failures
- Resource constraints - Heavy tasks can still impact API performance
Celery solves these problems by providing a robust, distributed task queue system.
Setup
Dependencies
First, let’s take a look at the project dependencies, which can be installed with `uv sync`.
```toml
dependencies = [
    "celery>=5.5.3",
    "fastapi[standard]>=0.124.2",
    "mypy>=1.19.0",
    "pydantic>=2.11.7",
    "pydantic-settings>=2.6.1",
    "redis>=6.4.0",
]
```
Compose
This demo uses Docker Compose to manage all services. Here’s the docker-compose.yaml from my repository:
```yaml
services:
  redis:
    container_name: redis
    image: redis:8-alpine
    ports:
      - "6379:6379"
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 3s
      retries: 5

  worker:
    build: .
    restart: unless-stopped
    depends_on:
      redis:
        condition: service_healthy
    environment:
      - REDIS_URL=redis://redis:6379/0
      - CELERY_RESULT_EXPIRES=900
    command: celery -A src.worker worker --concurrency=2 --loglevel=INFO

  api:
    build: .
    restart: unless-stopped
    depends_on:
      - worker
    ports:
      - "8000:8000"
    volumes:
      - ./public:/app/public
      - ./src/templates:/app/src/templates
    environment:
      - REDIS_URL=redis://redis:6379/0
    command: uvicorn src.api:app --host 0.0.0.0 --port 8000
```
Configuration Details
Redis Service:
- Uses `redis:8-alpine` for a lightweight, up-to-date Redis image
- Health checks ensure dependent services wait for Redis to be ready
- Named container makes debugging and monitoring easier
Worker Service:
- `restart: unless-stopped` ensures workers auto-restart on failures during development
- `--concurrency=2` runs 2 worker processes for parallel task execution
- `CELERY_RESULT_EXPIRES=900` automatically cleans up results after 15 minutes
API Service:
- Depends on worker service to ensure proper startup order
- Volume mounts allow live updates to static files and templates without rebuilding
- Exposes port 8000 for HTTP access
Building the Celery Worker
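The compose file starts the worker with `celery -A src.worker worker`, but `src/worker.py` itself isn’t shown in this post. Here’s a minimal sketch of what that module might look like, assuming the broker and result backend are both taken from `REDIS_URL` (the repository’s actual module may differ):

```python
# src/worker.py -- illustrative sketch only; the demo repository's actual module may differ
import os

from celery import Celery

# Broker and result backend both point at the Redis service from docker-compose
redis_url = os.environ.get("REDIS_URL", "redis://localhost:6379/0")

celery_app = Celery(
    "worker",
    broker=redis_url,
    backend=redis_url,
    include=["src.task_defs.sleep_task"],  # ensure task modules are imported and registered
)

# Mirrors CELERY_RESULT_EXPIRES=900 from the compose file: results are purged after 15 minutes
celery_app.conf.result_expires = int(os.environ.get("CELERY_RESULT_EXPIRES", "900"))
```

With the Celery app in place, the tasks themselves live in separate modules.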
Create a task definition file to implement your background tasks. Here’s the sleep_task.py from the demo:
```python
import logging
import time

from pydantic import BaseModel, Field

from src.config import settings
from src.worker import celery_app

logger = logging.getLogger(__name__)


class CreateSleepTaskRequest(BaseModel):
    """Request model for creating a sleep task."""

    duration: int = Field(..., gt=0, le=300, description="Task duration in seconds")


@celery_app.task(name="sleep_task", bind=True)
def sleep_task(self, duration: int) -> dict[str, object]:
    """
    Task that sleeps for specified duration with progress updates.

    Args:
        duration: Sleep duration in seconds

    Returns:
        dict with task completion info

    Raises:
        ValueError: If duration exceeds worker_max_timeout
    """
    logger.info(f"Task {self.request.id} starting: sleeping for {duration} seconds")

    # Track progress with direct state updates
    for i in range(duration):
        # Check if we've exceeded the worker max timeout threshold
        if i >= settings.worker_max_timeout:
            error_msg = (
                f"Task duration {duration}s exceeds worker max timeout "
                f"of {settings.worker_max_timeout}s"
            )
            logger.error(f"Task {self.request.id} failed: {error_msg}")
            raise ValueError(error_msg)

        time.sleep(1)
        progress = ((i + 1) / duration) * 100

        self.update_state(
            state="PROGRESS",
            meta={
                "current": i + 1,
                "total": duration,
                "progress": progress,
            },
        )
        logger.debug(f"Task progress: {progress:.1f}%")

    logger.info(f"Task {self.request.id} completed after {duration} seconds")

    return {"status": "ok"}
```
Key Task Features
- `bind=True` - Provides access to the task instance (self) for progress updates and request metadata
- Pydantic Validation - `CreateSleepTaskRequest` validates that duration is between 1 and 300 seconds
- Progress Tracking - Uses `self.update_state()` to report current progress, total duration, and percentage
- Timeout Protection - Checks against `worker_max_timeout` to prevent runaway tasks
- Comprehensive Logging - Tracks the task lifecycle with structured log messages
- Task Identification - Uses `self.request.id` to uniquely identify each task execution
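The task reads `settings.worker_max_timeout` from `src.config`, which isn’t shown in the post. Since pydantic-settings is already in the dependency list, a plausible sketch of that module could look like this (the field names and defaults are assumptions, not the repository’s actual values):

```python
# src/config.py -- hypothetical sketch; field names and defaults are assumptions
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    """Application settings loaded from environment variables (e.g. REDIS_URL)."""

    model_config = SettingsConfigDict(env_file=".env", extra="ignore")

    redis_url: str = "redis://localhost:6379/0"
    worker_max_timeout: int = 300  # matches the le=300 bound on CreateSleepTaskRequest.duration


settings = Settings()
```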
Creating the FastAPI Application
Now let’s build the API that will dispatch tasks to Celery. The models.py defines the response structure:
```python
from datetime import datetime

from pydantic import BaseModel


class TaskResponse(BaseModel):
    """Unified response model for all task-related endpoints.

    Used by:
    - POST /tasks (create_task)
    - GET /tasks (list_tasks)
    - GET /tasks/{task_id} (get_task)
    """

    task_id: str
    state: str
    name: str | None = None
    worker: str | None = None  # Set for GET /tasks responses
    date_done: datetime | str | None = None  # datetime for list, str for detail
    progress: float | None = None  # Progress percentage (0-100)
```
The API endpoint in api.py creates and monitors tasks:
```python
from fastapi import FastAPI
from celery.result import AsyncResult

from src import tasks
from src.models import TaskResponse
from src.task_defs.sleep_task import sleep_task, CreateSleepTaskRequest
from src.worker import celery_app

app = FastAPI(title="FastAPI + Celery Demo")


@app.post("/tasks/sleep_task", response_model=TaskResponse, status_code=201)
async def create_sleep_task(request: CreateSleepTaskRequest) -> TaskResponse:
    """
    Create a new sleep task.

    Args:
        request: Task parameters (duration in seconds)

    Returns:
        TaskResponse with task_id and initial state
    """
    result = celery_app.send_task(
        "sleep_task", kwargs={"duration": request.duration}
    )

    return TaskResponse(
        task_id=result.id,
        state=result.state,
        name="sleep_task",
    )


@app.get("/tasks/{task_id}", response_model=TaskResponse)
async def get_task(task_id: str) -> TaskResponse:
    """Get status of a specific task"""
    return tasks.get_task(task_id)
```
The get_task function can be found in tasks.py:
```python
from celery.result import AsyncResult

from src.models import TaskResponse
from src.worker import celery_app


def get_task(task_id: str) -> TaskResponse:
    """
    Get detailed status of a specific task by ID.

    Uses Celery's AsyncResult which queries the Redis backend.
    This works for tasks in ANY state (pending, running, completed).

    Args:
        task_id: Unique task identifier

    Returns:
        TaskResponse with current task state and metadata
    """
    result = AsyncResult(task_id, app=celery_app)

    response = TaskResponse(
        task_id=task_id,
        state=result.state,
        name=result.name,
    )

    # Add result for successful tasks
    if result.successful():
        response.date_done = result.date_done

    # Add progress for running tasks
    if result.state == "PROGRESS":
        info: dict = result.info if isinstance(result.info, dict) else {}
        response.progress = info.get("progress") if info else None

    return response
```
Running the Application with Docker Compose
Start all services with Docker Compose:
```bash
docker compose up
```
The services will start in the following order:
- redis: Starts and waits for health check to pass
- worker: Starts after Redis is healthy and begins processing tasks
- api: Starts after the worker is ready and listens on port 8000
You should see log output from all three services. Once you see messages indicating the worker is ready and the API is running, you’re ready to test.
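For a quick smoke test outside the UI, you can create a task and poll its status over HTTP. Here’s a small client script, assuming the endpoints shown above and using httpx (which ships with `fastapi[standard]`):

```python
# poll_task.py -- illustrative client script; assumes the API is running on localhost:8000
import time

import httpx

# Create a sleep task that runs for 10 seconds
created = httpx.post(
    "http://localhost:8000/tasks/sleep_task", json={"duration": 10}
).json()
task_id = created["task_id"]
print("created task", task_id)

# Poll until the task reaches a terminal state
while True:
    task = httpx.get(f"http://localhost:8000/tasks/{task_id}").json()
    print(task["state"], task.get("progress"))
    if task["state"] in ("SUCCESS", "FAILURE"):
        break
    time.sleep(1)
```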
In the UI, you can create a task and see the result in the table.
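The post doesn’t show how that UI is wired up, but the compose file mounts ./public and ./src/templates into the API container, which suggests a static-files mount plus Jinja2 templates. A hypothetical sketch of that wiring (the route and template name are assumptions):

```python
# Hypothetical UI wiring; the demo's actual routes and template names may differ
from fastapi import FastAPI, Request
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates

app = FastAPI(title="FastAPI + Celery Demo")

# ./public is mounted into the container at /app/public by docker-compose
app.mount("/public", StaticFiles(directory="public"), name="public")

# ./src/templates is mounted at /app/src/templates
templates = Jinja2Templates(directory="src/templates")


@app.get("/")
async def index(request: Request):
    # Render the task-creation page mentioned above
    return templates.TemplateResponse(request, "index.html")
```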
When to Use Celery vs. FastAPI Background Tasks
Use FastAPI Background Tasks when:
- The operation is lightweight (< 1 second)
- You don’t need task persistence
- You don’t need to track task status
- Single-server deployment is sufficient
Use Celery when:
- Tasks take more than a few seconds
- You need task persistence and reliability
- You want to track task progress and status
- You need to scale workers independently
- You require advanced features (retries, routing, rate limiting)
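Most of those advanced features are declarative configuration. As an illustration, hypothetical routing and rate-limiting settings for the sleep task could look like this (queue names and limits are illustrative, not taken from the demo):

```python
# Illustrative Celery configuration for routing and rate limiting; not part of the demo repo
from src.worker import celery_app

# Route sleep_task to a dedicated queue so slow work can be scaled separately
celery_app.conf.task_routes = {"sleep_task": {"queue": "slow"}}

# Allow at most 10 sleep_task executions per minute per worker
celery_app.conf.task_annotations = {"sleep_task": {"rate_limit": "10/m"}}
```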
Conclusion
Combining FastAPI and Celery provides a powerful foundation for building asynchronous web applications. FastAPI handles HTTP requests with excellent performance and type safety, while Celery manages the complexity of distributed task processing.
The complete working example is available in my fastapi-celery-demo repository. Clone it, experiment with it, and adapt it to your needs.
Key takeaways:
- Separate concerns - Keep your API fast by offloading heavy work to background workers
- Monitor actively - Use Flower or custom logging to track task execution
- Design for failure - Make tasks idempotent and implement retry logic (a retry sketch follows below)
- Scale independently - Handle growing task volume by increasing worker concurrency or the number of worker replicas, without changing your API
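On the design-for-failure takeaway, Celery supports declarative retries directly on the task decorator. A hypothetical retry-enabled variant of the sleep task (not part of the demo repository) might opt in like this:

```python
# Hypothetical retry-enabled task; the task name and retried exception are illustrative
from src.worker import celery_app


@celery_app.task(
    name="sleep_task_with_retries",
    bind=True,
    autoretry_for=(ConnectionError,),  # retry automatically on transient failures
    retry_backoff=True,                # exponential backoff between attempts
    max_retries=3,
)
def sleep_task_with_retries(self, duration: int) -> dict[str, object]:
    # Same body as sleep_task, omitted here
    ...
```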