
    Scaling Video Processing with Celery and Render

    Build a scalable, distributed video processing pipeline using celery and render with fastapi


    At Mixpeek, we've developed a highly scalable, fault-tolerant video understanding pipeline capable of processing thousands of videos concurrently. Our system combines Celery workers, FastAPI, AWS S3, AWS SageMaker, and Render for deployment to create a distributed processing architecture that ensures reliability and efficiency. This article dives into the technical details of our implementation, with insights and code snippets to help you replicate the setup.

    System Architecture Overview

    Our video processing pipeline consists of the following key components:

    1. FastAPI endpoint for video upload
    2. S3 storage for uploaded videos
    3. Celery tasks for distributed processing
    4. Custom video chunking and embedding services
    5. Database storage for processed video segments
    6. Render for deployment and scaling
    7. Redis for caching and as a message broker

    Here's a visual representation of our system architecture:

    graph TD
        A[Client] --> B[FastAPI Endpoint]
        B --> C[S3 Storage]
        B --> H[Redis]
        H -->|Message Broker| D[Celery Tasks]
        subgraph Celery Workers
            D --> E[Video Chunking Service]
            D --> F[Video Embedding Service]
        end
        E --> G[Database Storage]
        F --> G
        H -->|Caching| D
        subgraph Render Deployment
            B
            D
            H
        end
        classDef renderService fill:#f9f,stroke:#333,stroke-width:2px;
        class B,D,H renderService;
        classDef externalService fill:#fff,stroke:#333,stroke-width:2px;
        class C,G externalService;

    This architecture allows us to efficiently handle video uploads, process them in a distributed manner, and store the results for further use.

    Deployment Stack on Render

    Our production deployment on Render consists of the following components:

    Redis Cache: Serves as the message broker for Celery and provides caching capabilities (the free tier tends to work just fine for this role).

    API Service: Handles API requests, including the video upload endpoint.

    Celery Workers: Two separate background worker services for processing tasks.

    This setup allows us to scale our video processing pipeline horizontally by adding more Celery workers as needed.

    FastAPI Upload Endpoint

    The entry point of our pipeline is a FastAPI route that handles video uploads:

    @router.post("/upload")
    async def index_upload(
        request: Request,
        file: UploadFile = File(...),
        collection: str = Form(...),
        metadata: str = Form(default=None)
    ):
        # ... (file handling and metadata extraction)
    
        s3_response = upload_file_to_s3(content_io, file_id, request.index_id, collection)
    
        input_data = ProcessInputData(**{
            "index_id": request.index_id,
            "source": "upload",
            "file_id": file_id,
            "file_ext": file_extension,
            "s3_response": {
                **s3_response.dict()
            },
            "collection": collection,
            "metadata": metadata_dict
        })
    
        task = process_file.delay(input_data.dict())
        
        return {"message": "File upload processing started", "task_id": task.id}
    

    This endpoint:

    1. Accepts video uploads and metadata
    2. Uploads the video to S3
    3. Creates a ProcessInputData object with relevant information
    4. Queues a Celery task for processing
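
    The upload_file_to_s3 helper isn't shown above. Here's a minimal sketch of what it might look like, assuming boto3 and a hypothetical S3Response pydantic model (the bucket name and key scheme are placeholders):

    import boto3
    from pydantic import BaseModel

    s3_client = boto3.client("s3")
    BUCKET_NAME = "your-upload-bucket"  # placeholder bucket name

    class S3Response(BaseModel):
        bucket: str
        key: str
        url: str

    def upload_file_to_s3(content_io, file_id: str, index_id: str, collection: str) -> S3Response:
        # Namespace objects by index and collection so downstream services can find them
        key = f"{index_id}/{collection}/{file_id}"
        s3_client.upload_fileobj(content_io, BUCKET_NAME, key)
        return S3Response(
            bucket=BUCKET_NAME,
            key=key,
            url=f"https://{BUCKET_NAME}.s3.amazonaws.com/{key}",
        )

    Returning a pydantic model is consistent with the s3_response.dict() call in the endpoint above.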

    Celery Task Definition

    We use a shared Celery task to process the uploaded videos:

    @shared_task(
        bind=True,
        name="managed.tasks.process_file",
        max_retries=3,
        default_retry_delay=5,
    )
    def process_file(self, input_data: dict):
        processor = FileProcessor(input_data)
        processor.task_id = self.request.id
        processor.process()
    

    The @shared_task decorator offers several advantages over @task:

    1. It allows the task to be used by multiple Celery apps
    2. It provides better integration with Django (if used)
    3. It offers more flexibility in task registration and discovery

    The task is configured with:

    • max_retries=3: Allows the task to be retried up to 3 times on failure (retries are triggered via self.retry or autoretry_for)
    • default_retry_delay=5: Waits 5 seconds between retry attempts
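
    Note that max_retries and default_retry_delay only come into play when the task actually requests a retry. A hedged sketch of one way to wire that up, wrapping the same body shown above:

    @shared_task(
        bind=True,
        name="managed.tasks.process_file",
        max_retries=3,
        default_retry_delay=5,
    )
    def process_file(self, input_data: dict):
        try:
            processor = FileProcessor(input_data)
            processor.task_id = self.request.id
            processor.process()
        except Exception as exc:
            # Re-queue the task; after 3 failed attempts Celery gives up and re-raises
            raise self.retry(exc=exc)

    Alternatively, passing autoretry_for=(Exception,) to the decorator achieves the same effect without the explicit try/except.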

    Video Processing Pipeline

    The core of our video processing is handled by the FileProcessor class:

    class FileProcessor:
        def process(self):
            try:
                self.initialize_file()
                self.handle_file_source()
                self.process_file()
                self.finalize_file()
            except Exception as e:
                self.handle_error(e)
                raise
    

    This class orchestrates the entire processing pipeline, including error handling and cleanup.
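
    The individual steps are implementation-specific, but as a rough illustration, handle_error might log the failure, record a failed status, and clean up temporary files. The update_status call below is a hypothetical placeholder:

    import logging
    import shutil

    logger = logging.getLogger(__name__)

    class FileProcessor:
        # ... other methods ...

        def handle_error(self, error: Exception):
            # Surface the failure alongside the Celery task identifier
            logger.exception("Processing failed for task %s: %s", self.task_id, error)
            # Hypothetical status update so the API can report the failure
            self.update_status("FAILED", reason=str(error))
            # Remove any temporary artifacts left behind by earlier steps
            if getattr(self, "temp_dir", None):
                shutil.rmtree(self.temp_dir, ignore_errors=True)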

    Video Chunking and Embedding

    The heart of our video understanding pipeline lies in the process_video function:

    def process_video(tmp_file_path, s3_url: AnyUrl, input_data: ProcessInputData):
        video_chunker = ChunkService(
            file_path=tmp_file_path,
            target_size_mb=5,
            auto_adjust=False
        )
        embed_service = VideoEmbedService(model="vuse-generic-v1")
        video_segments_db = VideoSegmentsDBService(input_data.index_id)
    
        batch_size = 10  # Process 10 chunks at a time
        temp_dir = tempfile.mkdtemp()
    
        try:
            batch = []
            for i, video_chunk in enumerate(video_chunker.process_video()):
                # Process each chunk (embedding, metadata extraction, etc.)
                # ...
    
                if len(batch) >= batch_size:
                    process_batch(batch, video_segments_db, temp_dir, i // batch_size)
                    batch = []
    
            # Process any remaining chunks
            if batch:
                process_batch(batch, video_segments_db, temp_dir, (i + 1) // batch_size)
    
        finally:
            # Clean up temporary files
            # ...
    

    Key components:

    1. ChunkService: Splits videos into manageable chunks
    2. VideoEmbedService: Generates embeddings for each chunk using our vuse-generic-v1 embedding model hosted on AWS SageMaker
    3. VideoSegmentsDBService: Stores processed video segments
    4. Batch processing: Processes chunks in batches for efficiency
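
    The process_batch helper isn't shown above. A minimal sketch of what it could do, assuming the segments service exposes a bulk-insert style method (insert_many is a placeholder name):

    def process_batch(batch, video_segments_db, temp_dir, batch_index):
        # Write the batch of processed segments (embeddings, timestamps, metadata) in one bulk operation
        video_segments_db.insert_many(batch)
        # temp_dir and batch_index could be used to persist intermediate artifacts for debugging

    Writing segments in batches of 10 keeps database round trips low without holding too many embeddings in memory at once.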

    The ChunkService is responsible for splitting the video into smaller, processable chunks:

    def process_video(self):
        self.initialize_video()
        chunk_count = 0
        while True:
            temp_video_path = os.path.join(self.temp_dir, f"chunk_{chunk_count}.mp4")
            frames_written = self._write_chunk_to_disk(temp_video_path)
            
            if frames_written == 0:
                break
    
            base64_string = self._create_base64_from_file(temp_video_path)
            
            start_time, end_time = self._calculate_time_intervals(frames_written)
            chunk_object = {
                "base64": base64_string,
                "start_time": start_time,
                "end_time": end_time
            }
    
            yield chunk_object
    
            os.remove(temp_video_path)
            chunk_count += 1
    
        self.cap.release()
    

    This generator-based approach allows for efficient memory usage when processing large videos.
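
    Because process_video is a generator, the caller only ever holds one chunk (plus the current batch) in memory. A quick sketch of consuming it, using the same constructor arguments shown earlier (the file path is just an example):

    chunker = ChunkService(
        file_path="/tmp/example.mp4",
        target_size_mb=5,
        auto_adjust=False
    )

    for chunk in chunker.process_video():
        # Each chunk is a dict with base64-encoded video data and its time bounds
        print(chunk["start_time"], chunk["end_time"], len(chunk["base64"]))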

    Scaling with Celery on Render

    To scale our video processing pipeline, we use Celery workers deployed on Render. Here's how we've set up and run our Celery workers:

    Celery Worker Configuration:
    We've deployed two Celery worker instances (celery_1 and celery_2) as Background Worker services on Render. Each worker is started with the following command:

    celery -A db.service.celery_app worker --loglevel=info -n worker1@%h -c 4

    Key components of this command:

      • -A db.service.celery_app: Specifies the Celery app instance
      • --loglevel=info: Sets the logging level
      • -n worker1@%h: Names the worker (replace worker1 with a unique name for each worker)
      • -c 4: Sets the number of concurrent worker processes to 4

    Beyond the worker command itself, a few other pieces complete the setup:

    1. Redis as Message Broker and Result Backend:
      We use a Redis instance on Render as both the message broker and result backend for Celery, which ensures efficient task distribution and result storage.
    2. Additional Disk Space:
      Our Celery workers are configured with additional disk space on Render to handle temporary storage of video chunks and processing artifacts.
    3. Scaling Workers:
      By running multiple Celery worker instances (celery_1 and celery_2), we can easily scale our processing capacity, adding more worker instances on Render as needed to handle increased load.
    4. Monitoring:
      Our dashboard service, also deployed on Render, lets us monitor the performance and status of our Celery workers and the overall pipeline.

    Celery Configuration

    To use Redis as the message broker and result backend, our Celery configuration (celeryconfig.py) looks like this:

    broker_url = 'redis://your-redis-url:6379/0'
    result_backend = 'redis://your-redis-url:6379/0'
    task_serializer = 'json'
    result_serializer = 'json'
    accept_content = ['json']
    timezone = 'UTC'
    enable_utc = True
    

    Replace 'your-redis-url' with the actual URL of your Redis instance on Render.
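
    For completeness, the Celery app instance referenced by the worker command (db.service.celery_app) can be wired to this configuration roughly like so; the app name and task package below mirror the names used in this post and may differ in your project:

    # db/service.py
    from celery import Celery

    celery_app = Celery("managed")

    # Load broker/result-backend settings from celeryconfig.py
    celery_app.config_from_object("celeryconfig")

    # Register tasks declared with @shared_task (e.g. managed.tasks.process_file)
    celery_app.autodiscover_tasks(["managed"])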


    End Result: Video Understanding Demo for Advertisers

    The true power of Mixpeek's video processing pipeline becomes evident when we look at its practical applications. Let's explore how our system processes thousands of social media videos for scene understanding, particularly benefiting the advertising sector.

    [Embedded interactive demo: Insurance Video Analysis — a visual demo of Mixpeek's video analysis capabilities tailored for the insurance industry.]

    Video Intelligence for Advertising

    Our demo showcases the processing of numerous social media videos, extracting key concepts and scenes that are valuable for advertisers.

    Here's what the end result looks like:

    1. Concept Extraction: The system identifies popular concepts within videos, such as:
      • Crowd celebrating
      • Person dancing
      • Breaking the ice
      • Human connection
    2. Scene Matching: For each concept, the system provides:
      • The number of matching scenes
      • A relevance score (e.g., Score: 0.63)
      • The collection the video belongs to (e.g., Media and Advertising)

    Integration and Accessibility

    Advertisers can easily integrate this video intelligence into their existing workflows:

    1. One-Line Integration: Connect your S3 bucket directly or bake the Mixpeek SDK into your code.
    2. Real-Time Updates: The system keeps embeddings fresh by syncing with the client's database, ensuring up-to-date video understanding.
    3. Exploration Interface: A user-friendly interface allows advertisers to explore processed videos, search for concepts, and visualize matches.

    Conclusion

    Mixpeek's video processing pipeline demonstrates how to leverage Celery, FastAPI, and custom services to create a scalable and robust system for handling thousands of videos. Our deployment on Render, utilizing multiple Celery workers and a Redis cache, allows for efficient scaling and management of the processing load.

    Key takeaways:

    1. Use FastAPI for efficient upload handling
    2. Leverage Celery for distributed task processing
    3. Implement custom services for video chunking and embedding
    4. Process videos in batches for improved efficiency
    5. Use S3 for reliable storage of uploaded videos
    6. Implement proper error handling and retries
    7. Deploy on a scalable platform like Render
    8. Utilize Redis for efficient task queuing and result storage
    9. Scale horizontally by adding more Celery workers as needed

    By following these principles and adapting the architecture and code snippets above, you can build a similar system capable of processing large volumes of video content without failures or dropped tasks, while retaining the flexibility to scale as your processing needs grow.

    Ethan Steininger

    August 2, 2024 · 6 min read