Skip to content

Eneza Media Manager

The Media Manager is a Python microservice that processes video files for ads, adding watermarks, credits, and generating multi-resolution variants.

Architecture

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   ENEZA API     │────▶│  MESSAGE BROKER │────▶│  MEDIA MANAGER  │
│                 │     │                 │     │                 │
│ • Upload video  │     │ video-          │     │ • Download      │
│ • Create ad     │     │   processing    │     │ • Validate      │
│                 │     │                 │     │ • Process       │
└─────────────────┘     └─────────────────┘     └─────────────────┘


┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   ENEZA API     │◀────│  MESSAGE BROKER │◀────│  CLOUDFLARE R2  │
│                 │     │                 │     │                 │
│ • Update ad     │     │ video-          │     │ • Upload final  │
│ • Mark ready    │     │   processed     │     │ • Delete source │
└─────────────────┘     └─────────────────┘     └─────────────────┘

Technology Stack

ComponentTechnology
LanguagePython 3.11
Video ProcessingFFmpeg
StorageCloudflare R2 (S3-compatible)
Message BrokerGoogle Pub/Sub / RabbitMQ
HostingCloud Run

Project Structure

media-manager/
├── src/
│   ├── main.py                    # Entry point
│   ├── config.py                  # Configuration
│   ├── video_processing/
│   │   ├── __init__.py
│   │   ├── upload.py             # Upload to storage
│   │   ├── concat.py             # Concatenate videos
│   │   ├── overlay.py            # Add overlays
│   │   ├── resize.py             # Generate variants
│   │   └── qr_code.py            # Generate QR watermarks
│   ├── utils/
│   │   ├── storage_interface.py  # Storage abstraction
│   │   ├── temp_storage.py       # Temp file management
│   │   ├── r2_storage.py         # R2 client
│   │   └── s3_storage.py         # S3 client (legacy)
│   └── messaging/
│       ├── __init__.py
│       └── connection_manager.py
└── requirements.txt

Video Processing Pipeline

Main Entry Point

python
# main.py
def main():
    logger.info("🚀 Starting Eneza Media Manager Service")

    # Initialize storage
    storage = R2Storage()

    # Create and connect to broker
    broker = create_broker()
    broker.connect()

    # Subscribe to video processing topic
    broker.subscribe(
        TOPICS.VIDEO_PROCESSING,
        handle_video_processing_message,
        options=SubscribeOptions(auto_ack=False)
    )

    logger.info("👂 Waiting for video processing requests...")

    # Keep alive
    signal.pause()

Message Handler

python
def handle_video_processing_message(message: BrokerMessage):
    """
    Expected message format:
    {
        "type": "process_ad_video",
        "ad_id": "abc123",
        "video_url": "https://...",
        "operations": ["resize", "watermark", "security"]
    }
    """
    try:
        data = message.data
        ad_id = data.get("ad_id")
        video_url = data.get("video_url")

        if not ad_id or not video_url:
            logger.error("Missing required fields")
            message.ack()
            return

        logger.info(f"🎬 Processing video for ad: {ad_id}")

        # Process the video
        processed_url = process_ad_video(ad_id, video_url, storage)

        # Publish success notification
        publish_completion_notification(ad_id, processed_url=processed_url)

        message.ack()
        logger.info(f"✅ Video processing completed for ad: {ad_id}")

    except Exception as e:
        logger.error(f"❌ Error processing message: {str(e)}")
        publish_completion_notification(ad_id, error=str(e))
        message.nack(requeue=True)

Video Processing Flow

python
def process_ad_video(ad_id: str, video_url: str, storage: StorageInterface) -> str:
    """Process an ad video by adding credits and return the processed video URL"""
    with temp_directory() as temp_dir:
        # 1. Download the source video
        source_video = storage.download_file(video_url, temp_dir, f"{ad_id}_source.mp4")
        logger.info(f"✅ Ad video downloaded: {source_video}")

        # 2. Validate video
        validate_video(source_video)

        # 3. Get credits video (always appended at end)
        credits_video = os.path.expanduser("~/eneza_closing_credit.mp4")

        # 4. Concatenate videos
        output_path = os.path.join(temp_dir, f"{ad_id}_processed.mp4")
        concat_videos(source_video, credits_video, output_path)
        logger.info(f"🎬 Video concatenated with credits")

        # 5. Upload to processed folder
        destination_path = f"processed/{ad_id}.mp4"
        processed_url = storage.upload_file(output_path, destination_path)
        logger.info(f"☁️ Processed video uploaded: {processed_url}")

        # 6. Delete source video
        storage.delete_file(video_url)
        logger.info(f"🗑️ Source video deleted")

        return processed_url

Video Operations

Validation

python
def validate_video(video_path: str, max_size_mb: int = 100, min_duration: int = 5, max_duration: int = 90):
    """Validate video file size and duration"""
    # Check file size
    file_size_mb = os.path.getsize(video_path) / (1024 * 1024)
    if file_size_mb > max_size_mb:
        raise ValueError(f"Video file too large: {file_size_mb:.1f}MB (max {max_size_mb}MB)")

    # Check duration
    duration = get_video_duration(video_path)
    if duration < min_duration:
        raise ValueError(f"Video too short: {duration:.1f}s (min {min_duration}s)")
    if duration > max_duration:
        raise ValueError(f"Video too long: {duration:.1f}s (max {max_duration}s)")

    logger.info(f"✅ Video validated: {file_size_mb:.1f}MB, {duration:.1f}s")
    return duration


def get_video_duration(video_path: str) -> float:
    """Get video duration using ffprobe"""
    cmd = [
        'ffprobe', '-v', 'quiet', '-print_format', 'json',
        '-show_format', video_path
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise ValueError(f"Failed to get video duration: {result.stderr}")
    info = json.loads(result.stdout)
    return float(info.get('format', {}).get('duration', 0))

Video Concatenation

python
# video_processing/concat.py
def concat_videos(video1_path: str, video2_path: str, output_path: str):
    """Concatenate two videos using ffmpeg concat demuxer"""
    # Create concat file
    concat_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
    concat_file.write(f"file '{video1_path}'\n")
    concat_file.write(f"file '{video2_path}'\n")
    concat_file.close()

    # Run ffmpeg
    cmd = [
        'ffmpeg', '-y',
        '-f', 'concat',
        '-safe', '0',
        '-i', concat_file.name,
        '-c', 'copy',  # Stream copy (fast, no re-encoding)
        output_path
    ]

    result = subprocess.run(cmd, capture_output=True)
    if result.returncode != 0:
        raise ValueError(f"Concat failed: {result.stderr.decode()}")

    os.unlink(concat_file.name)

Multi-Resolution Variants

python
# video_processing/resize.py
RESOLUTIONS = {
    '1080p': {'width': 1920, 'height': 1080},
    '720p': {'width': 1280, 'height': 720},
    '480p': {'width': 854, 'height': 480},
    '360p': {'width': 640, 'height': 360},
}

def generate_video_variants(input_path: str, output_dir: str, ad_id: str) -> dict:
    """Generate multiple resolution variants of a video"""
    variants = {}

    for res_name, dimensions in RESOLUTIONS.items():
        output_path = os.path.join(output_dir, f"{ad_id}_{res_name}.mp4")

        cmd = [
            'ffmpeg', '-y',
            '-i', input_path,
            '-vf', f"scale={dimensions['width']}:{dimensions['height']}:force_original_aspect_ratio=decrease",
            '-c:v', 'libx264',
            '-preset', 'fast',
            '-crf', '23',
            '-c:a', 'aac',
            '-b:a', '128k',
            output_path
        ]

        result = subprocess.run(cmd, capture_output=True)
        if result.returncode == 0:
            variants[res_name] = {
                'path': output_path,
                'width': dimensions['width'],
                'height': dimensions['height']
            }

    return variants

QR Code Watermark

python
# video_processing/qr_code.py
import qrcode
from PIL import Image

def generate_qr_watermark(subscription_code: str, size: int = 100) -> Image:
    """Generate a QR code image for watermarking"""
    qr = qrcode.QRCode(
        version=1,
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=4,
        border=1
    )
    qr.add_data(subscription_code)
    qr.make(fit=True)

    img = qr.make_image(fill_color="white", back_color="transparent")
    return img.resize((size, size))


def add_qr_watermark(video_path: str, qr_image_path: str, output_path: str, position: str = 'bottom_right'):
    """Add QR code watermark to video"""
    positions = {
        'bottom_right': 'main_w-overlay_w-10:main_h-overlay_h-10',
        'bottom_left': '10:main_h-overlay_h-10',
        'top_right': 'main_w-overlay_w-10:10',
        'top_left': '10:10'
    }

    overlay_pos = positions.get(position, positions['bottom_right'])

    cmd = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-i', qr_image_path,
        '-filter_complex', f'overlay={overlay_pos}',
        '-c:a', 'copy',
        output_path
    ]

    result = subprocess.run(cmd, capture_output=True)
    if result.returncode != 0:
        raise ValueError(f"Watermark failed: {result.stderr.decode()}")

Storage Interface

python
# utils/storage_interface.py
from abc import ABC, abstractmethod

class StorageInterface(ABC):
    @abstractmethod
    def upload_file(self, local_path: str, remote_path: str) -> str:
        """Upload file and return public URL"""
        pass

    @abstractmethod
    def download_file(self, url: str, local_dir: str, filename: str) -> str:
        """Download file and return local path"""
        pass

    @abstractmethod
    def delete_file(self, url_or_path: str) -> bool:
        """Delete file from storage"""
        pass

R2 Storage Implementation

python
# utils/r2_storage.py
import boto3
from botocore.config import Config

class R2Storage(StorageInterface):
    def __init__(self):
        self.client = boto3.client(
            's3',
            endpoint_url=f"https://{os.getenv('R2_ACCOUNT_ID')}.r2.cloudflarestorage.com",
            aws_access_key_id=os.getenv('R2_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('R2_SECRET_ACCESS_KEY'),
            config=Config(signature_version='s3v4')
        )
        self.bucket = os.getenv('R2_BUCKET_NAME')
        self.public_url = os.getenv('R2_PUBLIC_URL')  # CDN URL

    def upload_file(self, local_path: str, remote_path: str) -> str:
        self.client.upload_file(
            local_path,
            self.bucket,
            remote_path,
            ExtraArgs={'ContentType': 'video/mp4'}
        )
        return f"{self.public_url}/{remote_path}"

    def download_file(self, url: str, local_dir: str, filename: str) -> str:
        local_path = os.path.join(local_dir, filename)

        if url.startswith('http'):
            # Download from URL
            response = requests.get(url, stream=True)
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        else:
            # Download from R2
            self.client.download_file(self.bucket, url, local_path)

        return local_path

    def delete_file(self, url_or_path: str) -> bool:
        # Extract key from URL
        if url_or_path.startswith('http'):
            key = url_or_path.replace(f"{self.public_url}/", '')
        else:
            key = url_or_path

        self.client.delete_object(Bucket=self.bucket, Key=key)
        return True

Completion Notification

python
def publish_completion_notification(ad_id: str, processed_url: str = None, error: str = None):
    """Publish video processing completion to message broker"""
    if not broker or not broker.is_connected():
        logger.error(f"Cannot publish completion for {ad_id}: Broker not connected")
        return

    notification = {
        "ad_id": ad_id,
        "status": "completed" if processed_url else "failed",
    }

    if processed_url:
        notification["processed_url"] = processed_url
    if error:
        notification["error"] = error

    broker.publish(
        TOPICS.VIDEO_PROCESSED,
        notification,
        attributes={
            "adId": ad_id,
            "type": "video_processed"
        }
    )
    logger.info(f"✅ Published completion notification for ad: {ad_id}")

Environment Variables

bash
# Storage (Cloudflare R2)
R2_ACCOUNT_ID=...
R2_ACCESS_KEY_ID=...
R2_SECRET_ACCESS_KEY=...
R2_BUCKET_NAME=eneza-media
R2_PUBLIC_URL=https://media.eneza.app

# Message Broker
MESSAGE_BROKER=pubsub
GCP_PROJECT_ID=eneza-40ab5
PUBSUB_TOPIC_SUFFIX=prod

# Processing
PROCESSED_VIDEOS_PREFIX=processed/

# Cloud Run
PORT=8080

Graceful Shutdown

python
def graceful_shutdown(signum, frame):
    """Handle graceful shutdown on SIGTERM/SIGINT"""
    logger.info("Received shutdown signal, cleaning up...")

    if broker:
        broker.disconnect()
        logger.info("Message broker disconnected")

    sys.exit(0)

# Register handlers
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)

One chat. Everything done.