
    Building a Multimodal Data Processing Pipeline with Kafka, Airflow, and SageMaker


    In this article, we'll explore how to build a robust multimodal data processing pipeline similar to Mixpeek using Apache Kafka, Apache Airflow, and Amazon SageMaker. This pipeline will handle various file types (image, video, audio, text, and documents) in parallel, process them through custom ML tasks, and store the results in a database.

    Architecture Overview

    Let's start with a high-level overview of our architecture:

    graph TD
        S3[Amazon S3] --> Kafka[Apache Kafka]
        Kafka --> Airflow[Apache Airflow]
        Airflow --> Extract[Extraction]
        Airflow --> Generate[Generation]
        Airflow --> Embed[Embedding]
        Extract --> SageMaker[Amazon SageMaker]
        Generate --> SageMaker
        Embed --> SageMaker
        SageMaker --> DB[Customer Database]

    Components

    1. Amazon S3: Storage for incoming files
    2. Apache Kafka: Message queue for handling file events
    3. Apache Airflow: Workflow management and orchestration
    4. Amazon SageMaker: ML model hosting and inference
    5. Customer Database: Final storage for processed data
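
    The contract between these components is a small JSON message per file. Based on the Lambda producer in Step 1, each event carries just enough to locate and route the object; extra fields are easy to add later. The bucket name and object key below are hypothetical:

    # Example payload published to the s3_events topic (shape used throughout this post)
    {
        "bucket": "your-input-bucket",
        "key": "uploads/video_123.mp4",   # hypothetical object key
        "file_type": "video/mp4"          # S3 ContentType, used for routing in Airflow
    }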

    Step 1: Set Up S3 Event Notifications

    First, we'll set up S3 to send notifications when new files are added. We'll use AWS Lambda to forward these notifications to Kafka.

    import json
    import boto3
    from urllib.parse import unquote_plus
    from kafka import KafkaProducer
    
    def lambda_handler(event, context):
        s3 = boto3.client('s3')
        kafka_producer = KafkaProducer(bootstrap_servers=['your_kafka_broker:9092'])
        
        for record in event['Records']:
            bucket = record['s3']['bucket']['name']
            # S3 event keys are URL-encoded (e.g. spaces become '+'), so decode before use
            key = unquote_plus(record['s3']['object']['key'])
            
            # Get file metadata
            metadata = s3.head_object(Bucket=bucket, Key=key)
            file_type = metadata['ContentType']
            
            # Send message to Kafka
            message = json.dumps({
                'bucket': bucket,
                'key': key,
                'file_type': file_type
            })
            kafka_producer.send('s3_events', message.encode('utf-8'))
        
        kafka_producer.close()
        return {
            'statusCode': 200,
            'body': json.dumps('Messages sent to Kafka successfully')
        }
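
    You'll also need to tell the bucket to invoke this Lambda when objects are created (and grant S3 permission to call the function). As a minimal sketch, assuming the function above is already deployed, the notification can be wired up with boto3; the bucket name and Lambda ARN below are placeholders:

    import boto3

    s3 = boto3.client('s3')

    # Placeholder bucket name and Lambda ARN -- replace with your own values
    s3.put_bucket_notification_configuration(
        Bucket='your-input-bucket',
        NotificationConfiguration={
            'LambdaFunctionConfigurations': [{
                'Id': 's3-to-kafka',
                'LambdaFunctionArn': 'arn:aws:lambda:us-east-1:123456789012:function:s3-to-kafka',
                'Events': ['s3:ObjectCreated:*']
            }]
        }
    )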
    

    Step 2: Set Up Kafka Consumer in Airflow

    Next, we'll create an Airflow DAG that consumes messages from Kafka and triggers the appropriate processing tasks.

    import json

    from airflow import DAG
    from airflow.operators.python_operator import PythonOperator
    from kafka import KafkaConsumer
    from datetime import datetime, timedelta
    
    default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2024, 6, 1),
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }
    
    dag = DAG(
        'process_s3_files',
        default_args=default_args,
        description='Process S3 files based on Kafka messages',
        schedule_interval=timedelta(minutes=5),
    )
    
    def process_kafka_message():
        # consumer_timeout_ms lets iteration stop when no new messages arrive,
        # so the task finishes instead of blocking forever
        consumer = KafkaConsumer(
            's3_events',
            bootstrap_servers=['your_kafka_broker:9092'],
            consumer_timeout_ms=10000,
        )
        for message in consumer:
            file_info = json.loads(message.value.decode('utf-8'))
            trigger_processing_tasks(file_info)
    
    def trigger_processing_tasks(file_info):
        # Trigger appropriate tasks based on file type
        if file_info['file_type'].startswith('image'):
            process_image.execute(context={'file_info': file_info})
        elif file_info['file_type'].startswith('video'):
            process_video.execute(context={'file_info': file_info})
        # Add similar conditions for audio, text, and documents
    
    process_kafka_messages = PythonOperator(
        task_id='process_kafka_messages',
        python_callable=process_kafka_message,
        dag=dag,
    )
    
    # Define task operators for each file type and processing step
    process_image = PythonOperator(
        task_id='process_image',
        python_callable=process_image_file,  # implemented in Step 3
        provide_context=True,
        dag=dag,
    )
    
    # Define similar operators for video, audio, text, and documents
    
    process_kafka_messages >> [process_image, process_video, process_audio, process_text, process_document]
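
    One piece this DAG takes for granted is the s3_events topic itself, which has to exist before the Lambda producer and this consumer can use it. Here's a minimal sketch using kafka-python's admin client; the broker address, partition count, and replication factor are placeholder values you'd tune for your cluster:

    from kafka.admin import KafkaAdminClient, NewTopic

    # Placeholder broker address; multiple partitions let several consumers read in parallel
    admin = KafkaAdminClient(bootstrap_servers=['your_kafka_broker:9092'])
    admin.create_topics([
        NewTopic(name='s3_events', num_partitions=6, replication_factor=2)
    ])
    admin.close()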
    

    Step 3: Implement Processing Tasks

    Now, let's implement the processing tasks for each file type. We'll use SageMaker for ML inference.

    import json
    import boto3

    # Module-level clients so the helper functions below can share them
    s3_client = boto3.client('s3')
    sagemaker_runtime = boto3.client('sagemaker-runtime')

    def process_image_file(**context):
        file_info = context['file_info']
    
        # Download file from S3
        file_content = s3_client.get_object(Bucket=file_info['bucket'], Key=file_info['key'])['Body'].read()
    
        # Extraction
        extracted_data = extract_image_data(file_content)
    
        # Generation
        generated_data = generate_image_metadata(extracted_data)
    
        # Embedding
        embedded_data = create_image_embedding(extracted_data)
    
        # Store results in customer database
        store_results(file_info, extracted_data, generated_data, embedded_data)
    
    def extract_image_data(file_content):
        # Use SageMaker endpoint for image extraction
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName='image-extraction-endpoint',
            ContentType='application/x-image',
            Body=file_content
        )
        return json.loads(response['Body'].read().decode())
    
    def generate_image_metadata(extracted_data):
        # Use SageMaker endpoint for metadata generation
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName='image-metadata-generation-endpoint',
            ContentType='application/json',
            Body=json.dumps(extracted_data)
        )
        return json.loads(response['Body'].read().decode())
    
    def create_image_embedding(extracted_data):
        # Use SageMaker endpoint for embedding creation
        response = sagemaker_runtime.invoke_endpoint(
            EndpointName='image-embedding-endpoint',
            ContentType='application/json',
            Body=json.dumps(extracted_data)
        )
        return json.loads(response['Body'].read().decode())
    
    def store_results(file_info, extracted_data, generated_data, embedded_data):
        # Implement logic to store results in customer database
        pass
    
    # Implement similar functions for video, audio, text, and document processing
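
    The store_results function above is deliberately left as a stub because the destination depends on the customer. Below is a minimal sketch assuming a MongoDB target via pymongo (connection string, database, and collection names are placeholders). It's written as an upsert keyed on the S3 object so a retried task replaces the record instead of duplicating it:

    from pymongo import MongoClient

    # Placeholder connection details
    mongo = MongoClient('mongodb://your_host:27017')
    collection = mongo['your_database']['processed_files']

    def store_results(file_info, extracted_data, generated_data, embedded_data):
        # Upsert keyed on bucket/key keeps the step idempotent:
        # reprocessing the same file overwrites its document rather than adding a new one
        collection.update_one(
            {'bucket': file_info['bucket'], 'key': file_info['key']},
            {'$set': {
                'file_type': file_info['file_type'],
                'extracted': extracted_data,
                'generated': generated_data,
                'embedding': embedded_data,
            }},
            upsert=True,
        )

    This idempotency also happens to be one of the simplest ways to address the data consistency challenge discussed later in this post.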
    

    Step 4: Set Up SageMaker Endpoints

    To use custom ML models in SageMaker, you'll need to create and deploy endpoints for each task. Here's an example of how to deploy a custom model:

    from sagemaker.pytorch import PyTorchModel
    
    model_data = 's3://your-bucket/model.tar.gz'
    role = 'arn:aws:iam::your-account-id:role/SageMakerRole'
    
    pytorch_model = PyTorchModel(model_data=model_data,
                                 role=role,
                                 framework_version='1.8.0',
                                 py_version='py3',
                                 entry_point='inference.py')
    
    predictor = pytorch_model.deploy(instance_type='ml.c5.xlarge',
                                     initial_instance_count=1,
                                     endpoint_name='your-endpoint-name')
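
    A single instance per endpoint won't keep up as volume grows. As a hedged sketch, you can attach a target-tracking scaling policy to the deployed endpoint with the Application Auto Scaling API; the endpoint name, variant name, and capacity limits below are placeholders:

    import boto3

    autoscaling = boto3.client('application-autoscaling')

    # Endpoints deployed with the SDK default to a variant named 'AllTraffic'
    resource_id = 'endpoint/your-endpoint-name/variant/AllTraffic'

    autoscaling.register_scalable_target(
        ServiceNamespace='sagemaker',
        ResourceId=resource_id,
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        MinCapacity=1,
        MaxCapacity=4,
    )

    autoscaling.put_scaling_policy(
        PolicyName='invocations-target-tracking',
        ServiceNamespace='sagemaker',
        ResourceId=resource_id,
        ScalableDimension='sagemaker:variant:DesiredInstanceCount',
        PolicyType='TargetTrackingScaling',
        TargetTrackingScalingPolicyConfiguration={
            'TargetValue': 100.0,  # average invocations per instance per minute (placeholder)
            'PredefinedMetricSpecification': {
                'PredefinedMetricType': 'SageMakerVariantInvocationsPerInstance'
            },
        },
    )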
    

    Everything Together Now

    This pipeline allows for parallel processing of multiple file types using custom ML tasks. The architecture is scalable and can be extended to handle more file types or processing steps as needed.

    Here's a final diagram showing the complete flow:

    graph TD
        S3[Amazon S3] --> Lambda[AWS Lambda]
        Lambda --> Kafka[Apache Kafka]
        Kafka --> Airflow[Apache Airflow]
        Airflow --> |Image| ExtractI[Extract Image]
        Airflow --> |Video| ExtractV[Extract Video]
        Airflow --> |Audio| ExtractA[Extract Audio]
        Airflow --> |Text| ExtractT[Extract Text]
        ExtractI --> GenerateI[Generate Image Metadata]
        ExtractV --> GenerateV[Generate Video Metadata]
        ExtractA --> GenerateA[Generate Audio Metadata]
        ExtractT --> GenerateT[Generate Text Metadata]
        GenerateI --> EmbedI[Embed Image]
        GenerateV --> EmbedV[Embed Video]
        GenerateA --> EmbedA[Embed Audio]
        GenerateT --> EmbedT[Embed Text]
        EmbedI --> SageMaker[Amazon SageMaker]
        EmbedV --> SageMaker
        EmbedA --> SageMaker
        EmbedT --> SageMaker
        SageMaker --> DB[Customer Database]

    Top 3 Critical Challenges

    1. Data Consistency and Integrity:
      • Issue: If any step in the processing pipeline fails, it can lead to incomplete or inconsistent data in the final database.
      • Impact: This can result in unreliable analytics, flawed decision-making, and potential data corruption.
      • Solution: Implement transactional updates and rollback mechanisms. Ensure each processing step is idempotent and can be safely retried. Consider implementing a staging area before final database insertion to verify data integrity.
    2. Scalability and Performance Bottlenecks:
      • Issue: As data volume grows, certain components (e.g., SageMaker endpoints, Kafka brokers) may become overwhelmed, leading to increased latency or system failures.
      • Impact: This can result in delayed data processing, missed real-time insights, and potential data loss during high-traffic periods.
      • Solution: Implement auto-scaling for all components, especially SageMaker endpoints. Use Kafka partitioning effectively. Consider batch processing for non-real-time tasks. Regularly perform load testing to identify and address bottlenecks proactively.
    3. Operational Complexity and Maintenance Overhead:
      • Issue: Managing multiple complex systems (Kafka, Airflow, SageMaker) requires significant DevOps resources and expertise.
      • Impact: This can lead to increased operational costs, longer time-to-market for new features, and higher risk of configuration errors or system downtime.
      • Solution: Invest in comprehensive monitoring and alerting systems. Automate as much of the operational work as possible through Infrastructure as Code. Consider using managed services or platforms like Mixpeek that abstract away much of this complexity, allowing your team to focus on data insights rather than infrastructure management.

    By focusing on these three critical areas, you can address the most significant risks to your multimodal data processing pipeline's reliability, performance, and manageability. Remember, while building such a system offers great flexibility, it also comes with substantial responsibilities. Platforms like Mixpeek can significantly reduce these challenges by providing a more integrated and managed solution.

    Why Build This?

    Use Cases

    1. Content Moderation: Automatically process and flag inappropriate content across various media types.
    2. Intelligent Search: Enable advanced search capabilities across different file types in your data lake.
    3. Recommendation Systems: Generate personalized recommendations based on user interactions with diverse content.
    4. Data Enrichment: Automatically enhance your datasets with AI-generated metadata and embeddings.
    5. Compliance and Legal: Scan documents and communications for potential legal or regulatory issues.

    Benefits

    1. Replication:
      • The Kafka-based architecture allows for easy replication of data streams.
      • Multiple consumers can process the same data independently, enabling parallel workflows.
    2. Consistency:
      • Airflow ensures that processing tasks are executed in a consistent, ordered manner.
      • SageMaker endpoints keep model versions and inference behavior consistent across all data processing.
    3. Scalability:
      • Each component (Kafka, Airflow, SageMaker) can be scaled independently based on workload.
      • Kafka can handle high-volume data ingestion.
      • Airflow can manage an increasing number of concurrent tasks.
      • SageMaker can auto-scale inference endpoints based on demand.
    4. Flexibility:
      • Easy to add new file types or processing steps by extending the Airflow DAG.
      • Custom ML models can be deployed and updated in SageMaker without disrupting the pipeline.

    Simplifying with Mixpeek

    While the system we've described is powerful and flexible, it requires significant setup and maintenance. Mixpeek offers a streamlined solution that accomplishes the same goals with much less complexity. Here's a glimpse of how Mixpeek simplifies this process:

    Creating a Connection

    from mixpeek import Mixpeek
    
    mixpeek = Mixpeek('API_KEY')
    
    mixpeek.connections.create(
        alias="my-mongo-test",
        engine="mongodb",
        details={
            "host": "your_host_address",
            "database": "your_database_name",
            "username": "your_username",
            "password": "your_password"
        }
    )
    

    This single API call replaces the need for manual setup of Kafka consumers and database connections.

    Creating a Pipeline

    💡
    Note: we're passing the pipeline code as a string here, but for production use it's advisable to manage pipeline code through GitHub for CI/CD: https://docs.mixpeek.com/pipelines/github

    from mixpeek import Mixpeek
    mixpeek = Mixpeek("API_KEY")
    
    pipeline_id = mixpeek.pipelines.create(
      alias="VideoProcessingPipeline",
      code="""
    def handler(event):
        mixpeek = Mixpeek("API_KEY")
        file_url = event.file_url(event['bucket'], event['key'])
        # process video into chunks
        processed_videos = mixpeek.tools.video.process(
            url=file_url,
            frame_interval=5,
            resolution=[720, 1280],
            return_base64=True
        )
        results = []
        for index, chunk in enumerate(processed_videos):
            print(f"embedding video chunk: {index}")
            # embed each chunk
            embed_response = mixpeek.embed.video(
                model_id="mixpeek/vuse-generic-v1",
                input=chunk['base64_string'],
                input_type="base64"
            )
            obj = {
                "embedding": embed_response['embedding'],
                "file_url": file_url,
                "metadata": {
                    "time_start": chunk.start_time,
                    "time_end": chunk.end_time,
                    "duration": chunk.duration,
                }
            }
            results.append(obj)
        return results
      """,
      source={
        "connection_id": "conn_456",
        "filters": {
          "content_type": "video/*"
        }
      },
      destination={
        "connection_id": "conn_431",
        "collection": "video_embeddings",
        "metadata": {
          "type": "video_processing"
        }
      }
    )

    For a full walkthrough of this use case, see https://docs.mixpeek.com/use-cases/video_understanding

    This single pipeline creation replaces the entire Airflow DAG setup, SageMaker endpoint management, and custom processing logic we implemented earlier.

    By using Mixpeek, you can achieve the same multimodal data processing capabilities with significantly less code and infrastructure management. This allows you to focus on utilizing the processed data rather than maintaining the processing pipeline itself.

    While building a custom solution gives you ultimate control, platforms like Mixpeek offer a compelling balance of power and simplicity, especially for teams that want to quickly implement advanced data processing without extensive DevOps overhead.

    Join the Discussion

    Have thoughts, questions, or insights about this post? Be the first to start the conversation in our community!

    Ethan Steininger

    June 21, 2024 · 6 min read