Eneza Media Manager
The Media Manager is a Python microservice that processes ad video files — adding watermarks and credits, and generating multi-resolution variants.
Architecture
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ ENEZA API │────▶│ MESSAGE BROKER │────▶│ MEDIA MANAGER │
│ │ │ │ │ │
│ • Upload video │ │ video- │ │ • Download │
│ • Create ad │ │ processing │ │ • Validate │
│ │ │ │ │ • Process │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ ENEZA API │◀────│ MESSAGE BROKER │◀────│ CLOUDFLARE R2 │
│ │ │ │ │ │
│ • Update ad │ │ video- │ │ • Upload final │
│ • Mark ready │ │ processed │ │ • Delete source │
└─────────────────┘ └─────────────────┘ └─────────────────┘

Technology Stack
| Component | Technology |
|---|---|
| Language | Python 3.11 |
| Video Processing | FFmpeg |
| Storage | Cloudflare R2 (S3-compatible) |
| Message Broker | Google Pub/Sub / RabbitMQ |
| Hosting | Cloud Run |
Project Structure
media-manager/
├── src/
│ ├── main.py # Entry point
│ ├── config.py # Configuration
│ ├── video_processing/
│ │ ├── __init__.py
│ │ ├── upload.py # Upload to storage
│ │ ├── concat.py # Concatenate videos
│ │ ├── overlay.py # Add overlays
│ │ ├── resize.py # Generate variants
│ │ └── qr_code.py # Generate QR watermarks
│ ├── utils/
│ │ ├── storage_interface.py # Storage abstraction
│ │ ├── temp_storage.py # Temp file management
│ │ ├── r2_storage.py # R2 client
│ │ └── s3_storage.py # S3 client (legacy)
│ └── messaging/
│ ├── __init__.py
│ └── connection_manager.py
└── requirements.txt

Video Processing Pipeline
Main Entry Point
python
# main.py
def main():
    """Service entry point: set up storage and the broker, then block waiting for messages.

    NOTE: handle_video_processing_message (uses `storage`) and
    publish_completion_notification (uses `broker`) reference these names at
    module scope, so they must be bound as module globals here rather than as
    main()-local variables.
    """
    global storage, broker
    logger.info("🚀 Starting Eneza Media Manager Service")
    # Initialize storage
    storage = R2Storage()
    # Create and connect to broker
    broker = create_broker()
    broker.connect()
    # Subscribe to video processing topic. auto_ack is disabled so a message
    # is only acknowledged after the handler finishes (see the handler).
    broker.subscribe(
        TOPICS.VIDEO_PROCESSING,
        handle_video_processing_message,
        options=SubscribeOptions(auto_ack=False)
    )
    logger.info("👂 Waiting for video processing requests...")
    # Keep alive
    signal.pause()

Message Handler
python
def handle_video_processing_message(message: BrokerMessage):
    """
    Handle one video-processing request from the broker.

    Expected message format:
    {
        "type": "process_ad_video",
        "ad_id": "abc123",
        "video_url": "https://...",
        "operations": ["resize", "watermark", "security"]
    }
    """
    # Bind ad_id before the try block: previously it was unbound in the
    # except handler if message.data (or data.get) raised, turning any
    # failure into a NameError.
    ad_id = None
    try:
        data = message.data
        ad_id = data.get("ad_id")
        video_url = data.get("video_url")
        if not ad_id or not video_url:
            # Malformed message: ack it so the broker does not redeliver it forever.
            logger.error("Missing required fields")
            message.ack()
            return
        logger.info(f"🎬 Processing video for ad: {ad_id}")
        # Process the video
        processed_url = process_ad_video(ad_id, video_url, storage)
        # Publish success notification
        publish_completion_notification(ad_id, processed_url=processed_url)
        message.ack()
        logger.info(f"✅ Video processing completed for ad: {ad_id}")
    except Exception as e:
        logger.error(f"❌ Error processing message: {str(e)}")
        # Notify even on failure so the API can mark the ad as errored.
        publish_completion_notification(ad_id, error=str(e))
        message.nack(requeue=True)

Video Processing Flow
python
def process_ad_video(ad_id: str, video_url: str, storage: StorageInterface,
                     credits_path: str | None = None) -> str:
    """Process an ad video by appending the closing credits; return the processed URL.

    Args:
        ad_id: Identifier used to name temp/output files and the remote key.
        video_url: Source location; passed straight to storage.download_file.
        storage: Storage backend used for download/upload/delete.
        credits_path: Optional override for the closing-credits clip; defaults
            to ~/eneza_closing_credit.mp4 (the previously hard-coded location).

    Raises:
        FileNotFoundError: If the credits clip is missing (previously this
            surfaced later as an opaque ffmpeg concat failure).
        ValueError: From validate_video / concat_videos on bad input.
    """
    with temp_directory() as temp_dir:
        # 1. Download the source video
        source_video = storage.download_file(video_url, temp_dir, f"{ad_id}_source.mp4")
        logger.info(f"✅ Ad video downloaded: {source_video}")
        # 2. Validate video (size and duration limits)
        validate_video(source_video)
        # 3. Get credits video (always appended at end)
        credits_video = credits_path or os.path.expanduser("~/eneza_closing_credit.mp4")
        if not os.path.isfile(credits_video):
            raise FileNotFoundError(f"Credits video not found: {credits_video}")
        # 4. Concatenate videos
        output_path = os.path.join(temp_dir, f"{ad_id}_processed.mp4")
        concat_videos(source_video, credits_video, output_path)
        logger.info("🎬 Video concatenated with credits")
        # 5. Upload to processed folder
        destination_path = f"processed/{ad_id}.mp4"
        processed_url = storage.upload_file(output_path, destination_path)
        logger.info(f"☁️ Processed video uploaded: {processed_url}")
        # 6. Delete source only after the processed upload succeeded
        storage.delete_file(video_url)
        logger.info("🗑️ Source video deleted")
        return processed_url

Video Operations
Validation
python
def validate_video(video_path: str, max_size_mb: int = 100, min_duration: int = 5, max_duration: int = 90):
    """Validate a video's file size and duration against the given limits.

    The size check runs first so oversized files are rejected without
    invoking ffprobe. Returns the duration in seconds; raises ValueError
    on any violation.
    """
    bytes_per_mb = 1024 * 1024
    file_size_mb = os.path.getsize(video_path) / bytes_per_mb
    if file_size_mb > max_size_mb:
        raise ValueError(f"Video file too large: {file_size_mb:.1f}MB (max {max_size_mb}MB)")

    duration = get_video_duration(video_path)
    duration_checks = [
        (duration < min_duration, f"Video too short: {duration:.1f}s (min {min_duration}s)"),
        (duration > max_duration, f"Video too long: {duration:.1f}s (max {max_duration}s)"),
    ]
    for failed, message in duration_checks:
        if failed:
            raise ValueError(message)

    logger.info(f"✅ Video validated: {file_size_mb:.1f}MB, {duration:.1f}s")
    return duration
def get_video_duration(video_path: str) -> float:
    """Return the video's duration in seconds, as reported by ffprobe."""
    probe_cmd = [
        'ffprobe',
        '-v', 'quiet',
        '-print_format', 'json',
        '-show_format',
        video_path,
    ]
    result = subprocess.run(probe_cmd, capture_output=True, text=True)
    if result.returncode != 0:
        raise ValueError(f"Failed to get video duration: {result.stderr}")
    # ffprobe emits JSON; the duration lives under the "format" section.
    info = json.loads(result.stdout)
    return float(info.get('format', {}).get('duration', 0))

Video Concatenation
python
# video_processing/concat.py
def concat_videos(video1_path: str, video2_path: str, output_path: str):
    """Concatenate two videos with ffmpeg's concat demuxer (stream copy, no re-encode)."""
    # The concat demuxer reads its input paths from a text "playlist" file.
    # NOTE(review): this temp file is only removed after a successful run —
    # it leaks if ffmpeg fails.
    concat_file = tempfile.NamedTemporaryFile(mode='w', suffix='.txt', delete=False)
    concat_file.write(f"file '{video1_path}'\nfile '{video2_path}'\n")
    concat_file.close()
    # -safe 0 allows absolute paths in the list file; -c copy stream-copies
    # both inputs, so this is fast but requires matching codecs/parameters.
    cmd = [
        'ffmpeg', '-y',
        '-f', 'concat', '-safe', '0',
        '-i', concat_file.name,
        '-c', 'copy',
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True)
    if result.returncode != 0:
        raise ValueError(f"Concat failed: {result.stderr.decode()}")
    os.unlink(concat_file.name)

Multi-Resolution Variants
python
# video_processing/resize.py
# Target output sizes (all 16:9). Insertion order is the order in which
# generate_video_variants() renders the variants.
RESOLUTIONS = {
    '1080p': {'width': 1920, 'height': 1080},
    '720p': {'width': 1280, 'height': 720},
    '480p': {'width': 854, 'height': 480},
    '360p': {'width': 640, 'height': 360},
}
def generate_video_variants(input_path: str, output_dir: str, ad_id: str) -> dict:
    """Generate multiple resolution variants of a video.

    Returns a dict keyed by resolution name ('1080p', ...) mapping to the
    output path and the *target* dimensions. The actual encoded frame may be
    smaller, since the aspect ratio is preserved and the source is never
    upscaled past its own size in either dimension.
    """
    variants = {}
    for res_name, dimensions in RESOLUTIONS.items():
        output_path = os.path.join(output_dir, f"{ad_id}_{res_name}.mp4")
        # force_original_aspect_ratio=decrease fits within the target box;
        # force_divisible_by=2 keeps width/height even, which libx264 requires
        # for yuv420p output — without it, sources with odd scaled dimensions
        # make the encode fail.
        scale_filter = (
            f"scale={dimensions['width']}:{dimensions['height']}"
            f":force_original_aspect_ratio=decrease:force_divisible_by=2"
        )
        cmd = [
            'ffmpeg', '-y',
            '-i', input_path,
            '-vf', scale_filter,
            '-c:v', 'libx264',
            '-preset', 'fast',
            '-crf', '23',
            '-c:a', 'aac',
            '-b:a', '128k',
            output_path
        ]
        result = subprocess.run(cmd, capture_output=True)
        if result.returncode == 0:
            variants[res_name] = {
                'path': output_path,
                'width': dimensions['width'],
                'height': dimensions['height']
            }
        else:
            # Surface the failure instead of silently dropping the variant.
            logger.warning(
                f"Variant {res_name} failed for {ad_id}: "
                f"{result.stderr.decode(errors='replace')}"
            )
    return variants

QR Code Watermark
python
# video_processing/qr_code.py
import qrcode
from PIL import Image
def generate_qr_watermark(subscription_code: str, size: int = 100) -> Image.Image:
    """Generate a QR code image for watermarking.

    Returns a white-on-transparent image resized to (size, size).
    (Annotation fixed: `Image` is the module; the class is `Image.Image`.)
    """
    qr = qrcode.QRCode(
        version=1,
        # Low error correction keeps the module grid coarse, which scans
        # better at small overlay sizes.
        error_correction=qrcode.constants.ERROR_CORRECT_L,
        box_size=4,
        border=1
    )
    qr.add_data(subscription_code)
    qr.make(fit=True)
    img = qr.make_image(fill_color="white", back_color="transparent")
    # NEAREST keeps the QR module edges crisp; the default resampling filter
    # blurs the pattern and can make the code unscannable after overlay.
    return img.resize((size, size), Image.NEAREST)
def add_qr_watermark(video_path: str, qr_image_path: str, output_path: str, position: str = 'bottom_right'):
    """Overlay a QR code image onto a video at one of four corner positions."""
    # x:y overlay expressions in ffmpeg filter coordinates, inset 10px
    # from the chosen corner.
    corner_coords = {
        'bottom_right': 'main_w-overlay_w-10:main_h-overlay_h-10',
        'bottom_left': '10:main_h-overlay_h-10',
        'top_right': 'main_w-overlay_w-10:10',
        'top_left': '10:10',
    }
    if position not in corner_coords:
        position = 'bottom_right'
    overlay_expr = corner_coords[position]
    # Video is re-encoded with the overlay burned in; -c:a copy passes the
    # audio stream through untouched.
    cmd = [
        'ffmpeg', '-y',
        '-i', video_path,
        '-i', qr_image_path,
        '-filter_complex', f'overlay={overlay_expr}',
        '-c:a', 'copy',
        output_path,
    ]
    result = subprocess.run(cmd, capture_output=True)
if result.returncode != 0:
        raise ValueError(f"Watermark failed: {result.stderr.decode()}")

Storage Interface
python
# utils/storage_interface.py
from abc import ABC, abstractmethod
class StorageInterface(ABC):
    """Abstract storage backend.

    Per the project layout, implemented by the R2 client (r2_storage.py) and
    the legacy S3 client (s3_storage.py).
    """
    @abstractmethod
    def upload_file(self, local_path: str, remote_path: str) -> str:
        """Upload file and return public URL"""
        pass
    @abstractmethod
    def download_file(self, url: str, local_dir: str, filename: str) -> str:
        """Download file and return local path"""
        pass
    @abstractmethod
    def delete_file(self, url_or_path: str) -> bool:
        """Delete file from storage"""
        pass

R2 Storage Implementation
python
# utils/r2_storage.py
import boto3
from botocore.config import Config
class R2Storage(StorageInterface):
    """Cloudflare R2 storage backend using the S3-compatible boto3 client."""

    def __init__(self):
        # R2 speaks the S3 API; only the account-scoped endpoint and sigv4
        # signing config differ from plain S3.
        self.client = boto3.client(
            's3',
            endpoint_url=f"https://{os.getenv('R2_ACCOUNT_ID')}.r2.cloudflarestorage.com",
            aws_access_key_id=os.getenv('R2_ACCESS_KEY_ID'),
            aws_secret_access_key=os.getenv('R2_SECRET_ACCESS_KEY'),
            config=Config(signature_version='s3v4')
        )
        self.bucket = os.getenv('R2_BUCKET_NAME')
        # CDN URL. NOTE(review): if R2_PUBLIC_URL is unset, upload_file
        # returns "None/<key>" URLs — confirm the env var is always present.
        self.public_url = os.getenv('R2_PUBLIC_URL')

    def upload_file(self, local_path: str, remote_path: str) -> str:
        """Upload a local file to the bucket and return its public CDN URL."""
        self.client.upload_file(
            local_path,
            self.bucket,
            remote_path,
            ExtraArgs={'ContentType': 'video/mp4'}  # this service only uploads mp4
        )
        return f"{self.public_url}/{remote_path}"

    def download_file(self, url: str, local_dir: str, filename: str) -> str:
        """Download from an HTTP(S) URL or an R2 object key into local_dir.

        Returns the local file path. Raises requests.HTTPError on a 4xx/5xx
        response (previously error pages were silently written to disk).
        """
        local_path = os.path.join(local_dir, filename)
        if url.startswith('http'):
            # Streamed HTTP download; timeout prevents a dead server from
            # hanging the worker forever.
            response = requests.get(url, stream=True, timeout=60)
            response.raise_for_status()
            with open(local_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        else:
            # Treat the argument as an object key within our bucket.
            self.client.download_file(self.bucket, url, local_path)
        return local_path

    def delete_file(self, url_or_path: str) -> bool:
        """Delete an object given either its public URL or its bucket key."""
        if url_or_path.startswith('http'):
            # Strip the CDN prefix to recover the object key.
            key = url_or_path.replace(f"{self.public_url}/", '')
        else:
            key = url_or_path
        self.client.delete_object(Bucket=self.bucket, Key=key)
        return True

Completion Notification
python
def publish_completion_notification(ad_id: str, processed_url: str = None, error: str = None):
    """Publish a video-processing completion (or failure) event to the broker."""
    # Without a connected broker there is nothing useful to do but log.
    if not broker or not broker.is_connected():
        logger.error(f"Cannot publish completion for {ad_id}: Broker not connected")
        return
    # Status is derived from whether a processed URL exists; the optional
    # fields are attached only when present.
    notification = {"ad_id": ad_id}
    notification["status"] = "completed" if processed_url else "failed"
    if processed_url:
        notification["processed_url"] = processed_url
    if error:
        notification["error"] = error
    attrs = {
        "adId": ad_id,
        "type": "video_processed"
    }
    broker.publish(TOPICS.VIDEO_PROCESSED, notification, attributes=attrs)
    logger.info(f"✅ Published completion notification for ad: {ad_id}")

Environment Variables
bash
# Storage (Cloudflare R2)
R2_ACCOUNT_ID=...
R2_ACCESS_KEY_ID=...
R2_SECRET_ACCESS_KEY=...
R2_BUCKET_NAME=eneza-media
R2_PUBLIC_URL=https://media.eneza.app
# Message Broker
MESSAGE_BROKER=pubsub
GCP_PROJECT_ID=eneza-40ab5
PUBSUB_TOPIC_SUFFIX=prod
# Processing
PROCESSED_VIDEOS_PREFIX=processed/
# Cloud Run
PORT=8080

Graceful Shutdown
python
def graceful_shutdown(signum, frame):
    """Handle graceful shutdown on SIGTERM/SIGINT.

    Disconnects the module-level broker (if one was created) before exiting
    so the subscription is torn down cleanly.
    """
    logger.info("Received shutdown signal, cleaning up...")
    if broker:
        broker.disconnect()
        logger.info("Message broker disconnected")
    # Exit with success status; sys.exit raises SystemExit from the handler.
    sys.exit(0)
# Register handlers for both the platform stop signal (SIGTERM) and Ctrl-C (SIGINT)
signal.signal(signal.SIGTERM, graceful_shutdown)
signal.signal(signal.SIGINT, graceful_shutdown)