Eneza Screenshot Verifier

The Screenshot Verifier is a Python microservice that uses AI-powered image analysis to verify that screenshots submitted by users are legitimate proof of posting ads to WhatsApp Status.

Architecture Overview

┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   ENEZA API     │────▶│  GOOGLE PUBSUB  │────▶│  SCREENSHOT     │
│                 │     │                 │     │   VERIFIER      │
│ • Upload        │     │ screenshot-     │     │                 │
│ • Store in R2   │     │   verify        │     │ • Download img  │
│ • Publish msg   │     │                 │     │ • Run pipeline  │
└─────────────────┘     └─────────────────┘     └─────────────────┘


┌─────────────────┐     ┌─────────────────┐     ┌─────────────────┐
│   ENEZA API     │◀────│  GOOGLE PUBSUB  │◀────│ VERIFICATION    │
│                 │     │                 │     │   RESULTS       │
│ • Update status │     │ screenshot-     │     │                 │
│ • Credit user   │     │   processed     │     │ • Pass/Fail     │
│ • Notify        │     │                 │     │ • Score 0-1     │
└─────────────────┘     └─────────────────┘     └─────────────────┘

Technology Stack

Component | Technology
Language | Python 3.11
AI/Vision | Google Gemini Vision API
Image Processing | Pillow, NumPy, OpenCV
Video Processing | FFmpeg
Message Broker | Google Pub/Sub
Hosting | Cloud Run (GPU-enabled)

Project Structure

screenshot-verifier/
├── app.py                    # Entry point, message handling
├── src/
│   ├── db_utils.py          # Database queries
│   ├── verification_pipeline/
│   │   ├── base.py          # Base classes
│   │   ├── verification_pipeline.py  # Main orchestrator
│   │   └── checks/          # Individual verification checks
│   │       ├── __init__.py
│   │       ├── anti_manipulation/
│   │       ├── image_to_video_checker/
│   │       ├── qr_code_matcher/
│   │       ├── screenshot_origin_phone_matcher/
│   │       ├── screenshot_time_difference_matcher/
│   │       ├── user_behavior_analysis/
│   │       ├── view_counter/
│   │       └── watermark_checker/
│   ├── decision_engine/
│   │   └── decision_engine.py  # Automated decision making
│   └── utils/
│       ├── logger.py
│       └── test_utils.py
└── requirements.txt

Verification Pipeline

The verification pipeline runs multiple checks in sequence:

1. Gateway Check: Image-to-Video Matching

This check MUST pass before any other checks run.

python
# image_to_video_checker.py
class ImageToVideoCheck(BaseCheck):
    """
    Verifies screenshot came from the subscribed video.
    Uses perceptual hashing and structural similarity.
    """

    PHASH_MATCH_THRESHOLD = 0.85  # 85% similarity required
    DHASH_MATCH_THRESHOLD = 0.80
    HISTOGRAM_MATCH_THRESHOLD = 0.70
    CREDITS_DURATION_SECONDS = 10  # Exclude credits section

    def execute(self, subscription, screenshot) -> VerificationResult:
        # 1. Download screenshot
        screenshot_image = self._download_image(screenshot['screenshotLink'])

        # 2. Extract frames from video (excluding credits)
        video_url = subscription['ad']['video']
        video_frames = self._extract_video_frames(video_url)

        # 3. Compare screenshot to each frame
        match_result = self._find_best_match(screenshot_image, video_frames)

        if match_result['best_similarity'] >= self.PHASH_MATCH_THRESHOLD:
            return VerificationResult(status=PASS, score=match_result['best_similarity'])
        else:
            return VerificationResult(status=FAIL, score=match_result['best_similarity'])

    def _compute_phash(self, image):
        """Gradient (difference-style) hash: 16x16 comparisons of adjacent pixels,
        returned as a 256-character bit string. Robust to minor transformations."""
        img = image.resize((17, 16)).convert('L')
        pixels = np.array(img, dtype=np.float64)
        diff = pixels[:, 1:] > pixels[:, :-1]
        return ''.join(['1' if b else '0' for b in diff.flatten()])

    def _extract_video_frames(self, video_url):
        """Extract frames using ffmpeg, excluding credits section"""
        duration = self._get_video_duration(video_url)
        end_time = duration - self.CREDITS_DURATION_SECONDS

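        # Assumption: output_pattern is an ffmpeg image-sequence path; its real
        # value is not shown in this excerpt, so this one is hypothetical.
        output_pattern = '/tmp/frames/frame_%03d.jpg'

        cmd = [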
            'ffmpeg', '-i', video_url,
            '-t', str(end_time),  # Stop before credits
            '-vf', 'fps=0.5',     # 1 frame per 2 seconds
            '-frames:v', '30',
            output_pattern
        ]
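
The thresholds above are similarity fractions, so two hash bit strings have to be reduced to a value between 0 and 1 before they can be compared against `PHASH_MATCH_THRESHOLD`. The following is a minimal sketch of one way to do that, using the fraction of matching bits. The names `hash_similarity` and `best_match` are hypothetical and not taken from the service; the real `_find_best_match` may combine several metrics.

```python
def hash_similarity(hash_a: str, hash_b: str) -> float:
    """Fraction of positions where two equal-length bit strings agree."""
    if len(hash_a) != len(hash_b):
        raise ValueError("hashes must have the same length")
    if not hash_a:
        raise ValueError("hashes must not be empty")
    matching = sum(a == b for a, b in zip(hash_a, hash_b))
    return matching / len(hash_a)


def best_match(screenshot_hash: str, frame_hashes: list[str]) -> dict:
    """Return index and similarity of the closest frame (hypothetical helper)."""
    best_index, best_similarity = -1, 0.0
    for index, frame_hash in enumerate(frame_hashes):
        similarity = hash_similarity(screenshot_hash, frame_hash)
        if similarity > best_similarity:
            best_index, best_similarity = index, similarity
    return {"best_index": best_index, "best_similarity": best_similarity}


PHASH_MATCH_THRESHOLD = 0.85  # value from ImageToVideoCheck

# Toy 8-bit hashes; real hashes from _compute_phash would be 256 bits long.
frames = ["11110000", "10101010", "11110001"]
result = best_match("11110011", frames)
passed = result["best_similarity"] >= PHASH_MATCH_THRESHOLD
```

With these toy values the third frame differs in one bit out of eight, giving a similarity of 0.875, which clears the 0.85 threshold.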

2. Core Verification Checks

After the gateway check passes, these checks run:

python
# checks/__init__.py
available_checks = [
    # Breaking checks (critical)
    AntiManipulationCheck(),
    QRCodeMatcherCheck(),
    ScreenshotOriginPhoneCheck(),
    ScreenshotTimeDifferenceCheck(),
    WatermarkCheck(),

    # Aggregate checks
    UserBehaviorAnalysisCheck(),
    ViewCounterCheck()
]
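
Every check in this list extends `BaseCheck` and returns a `VerificationResult`, both of which live in `base.py` and are not shown on this page. The sketch below is an assumption about what such an interface could look like, inferred only from how the checks and the orchestrator use it (`status`, `score`, `details`, `weight`, `to_dict()`). The field defaults and the `AlwaysPassCheck` class are hypothetical.

```python
from dataclasses import asdict, dataclass, field
from enum import Enum
from typing import Any


class VerificationStatus(Enum):
    PASS = "PASS"
    FAIL = "FAIL"
    NEEDS_REVIEW = "NEEDS_REVIEW"


@dataclass
class VerificationResult:
    status: VerificationStatus
    score: float = 0.0  # assumed default, so a FAIL without a score counts as 0
    details: dict[str, Any] = field(default_factory=dict)

    def to_dict(self) -> dict[str, Any]:
        """Serialize to plain types so the result can be published as JSON."""
        data = asdict(self)
        data["status"] = self.status.value
        return data


class BaseCheck:
    name = "base"
    weight = 1.0  # relative contribution to the aggregate score (assumed)

    def execute(self, subscription: dict, screenshot: dict) -> VerificationResult:
        raise NotImplementedError


class AlwaysPassCheck(BaseCheck):
    """Trivial check used only to exercise the interface."""
    name = "always_pass"

    def execute(self, subscription: dict, screenshot: dict) -> VerificationResult:
        return VerificationResult(status=VerificationStatus.PASS, score=1.0)


example = AlwaysPassCheck().execute({}, {}).to_dict()
```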

Anti-Manipulation Check

Detects image manipulation and AI-generated content.

python
class AntiManipulationCheck(BaseCheck):
    """
    Detects:
    - AI-generated images
    - Photoshopped/edited screenshots
    - Screenshot templates
    - Copy-paste artifacts
    """

    def execute(self, subscription, screenshot):
        image = download_image(screenshot['screenshotLink'])

        # Check EXIF data consistency
        exif_valid = self._validate_exif(image)

        # Check for manipulation artifacts
        artifacts = self._detect_manipulation_artifacts(image)

        # AI generation detection via Gemini
        ai_generated = self._check_ai_generation(image)

        if ai_generated:
            return VerificationResult(
                status=FAIL,
                score=0.0,
                details={'reason': 'AI_GENERATED_SUSPECTED', 'is_critical': True}
            )

QR Code Matcher

Verifies the unique QR watermark matches the subscription.

python
class QRCodeMatcherCheck(BaseCheck):
    """
    Each video has a unique QR watermark embedded.
    The screenshot must contain the correct QR code.
    """

    def execute(self, subscription, screenshot):
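        # Download the screenshot (same helper as AntiManipulationCheck),
        # then decode the QR code from it
        screenshot_image = download_image(screenshot['screenshotLink'])
        qr_data = self._decode_qr(screenshot_image)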

        # Compare with subscription code
        expected_code = subscription['code']

        if qr_data == expected_code:
            return VerificationResult(status=PASS, score=1.0)
        elif qr_data is None:
            return VerificationResult(
                status=FAIL,
                score=0.0,
                details={'reason': 'WATERMARK_MISSING', 'is_critical': True}
            )
        else:
            return VerificationResult(
                status=FAIL,
                score=0.0,
                details={'reason': 'WATERMARK_CODE_MISMATCH', 'is_critical': True}
            )

Device Origin Check

Verifies that the screenshot came from the registered device.

python
class ScreenshotOriginPhoneCheck(BaseCheck):
    """
    Compares device info in screenshot EXIF with registered phone info.
    """

    def execute(self, subscription, screenshot):
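        # Download the screenshot (same helper as AntiManipulationCheck),
        # then extract device info from its EXIF data
        screenshot_image = download_image(screenshot['screenshotLink'])
        screenshot_device = self._extract_device_info(screenshot_image)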

        # Get registered device from subscription
        registered_device = subscription['phoneInfo']

        # Compare key identifiers
        if self._devices_match(screenshot_device, registered_device):
            return VerificationResult(status=PASS, score=1.0)
        else:
            return VerificationResult(
                status=FAIL,
                score=0.0,
                details={'reason': 'DEVICE_MISMATCH', 'is_critical': True}
            )

Time Difference Check

Validates that the screenshot was taken within the expected posting window.

python
class ScreenshotTimeDifferenceCheck(BaseCheck):
    """
    Screenshot must be taken:
    - After subscription time
    - Within 24-hour posting window
    - Before current time
    """

    MAX_WINDOW_HOURS = 24

    def execute(self, subscription, screenshot):
        subscribe_time = subscription['subscribeTime']
        screenshot_time = screenshot['screenshotTime']
        submit_time = screenshot['submitTime']

        # Validate timeline
        if screenshot_time < subscribe_time:
            return VerificationResult(
                status=FAIL,
                score=0.0,  # explicit score so the weighted aggregate stays numeric
                details={'reason': 'SCREENSHOT_BEFORE_SUBSCRIPTION'}
            )

        hours_diff = (submit_time - subscribe_time).total_seconds() / 3600
        if hours_diff > self.MAX_WINDOW_HOURS:
            return VerificationResult(
                status=FAIL,
                score=0.0,
                details={'reason': 'WINDOW_EXPIRED'}
            )

        return VerificationResult(status=PASS, score=1.0)
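
The docstring lists three timeline conditions, while the excerpt checks two of them and measures the window from `submitTime`. As a worked illustration, here is a sketch that applies all three conditions to the screenshot time itself. That choice is an assumption made for the example, and the function name and the `SCREENSHOT_IN_FUTURE` reason code are hypothetical.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

MAX_WINDOW = timedelta(hours=24)  # mirrors MAX_WINDOW_HOURS


def timeline_failure(
    subscribe_time: datetime, screenshot_time: datetime, now: datetime
) -> Optional[str]:
    """Return a failure reason, or None when the timeline is valid."""
    if screenshot_time < subscribe_time:
        return "SCREENSHOT_BEFORE_SUBSCRIPTION"
    if screenshot_time > now:
        return "SCREENSHOT_IN_FUTURE"  # hypothetical reason code
    if screenshot_time - subscribe_time > MAX_WINDOW:
        return "WINDOW_EXPIRED"
    return None


subscribed = datetime(2024, 1, 1, 8, 0, tzinfo=timezone.utc)
now = subscribed + timedelta(hours=48)

on_time = timeline_failure(subscribed, subscribed + timedelta(hours=2), now)
too_early = timeline_failure(subscribed, subscribed - timedelta(hours=1), now)
too_late = timeline_failure(subscribed, subscribed + timedelta(hours=30), now)
in_future = timeline_failure(subscribed, subscribed + timedelta(hours=72), now)
```

Using timezone-aware datetimes throughout avoids comparison errors between naive and aware values when timestamps come from different sources.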

View Counter Check

Extracts the view count from a WhatsApp Status screenshot using Gemini Vision.

python
class ViewCounterCheck(BaseCheck):
    """
    Uses Gemini Vision to extract view count from screenshot.
    WhatsApp Status shows "X views" at the bottom.
    """

    def execute(self, subscription, screenshot):
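        # Download the screenshot (same helper as AntiManipulationCheck)
        screenshot_image = download_image(screenshot['screenshotLink'])

        # Use Gemini Vision to extract view count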
        prompt = """
        Analyze this WhatsApp Status screenshot.
        Find and extract the view count shown at the bottom.
        Return ONLY the number, or "NOT_FOUND" if no view count visible.
        """

        response = gemini_model.generate_content([
            prompt,
            screenshot_image
        ])

        extracted_views = self._parse_view_count(response.text)

        if extracted_views is None:
            return VerificationResult(status=NEEDS_REVIEW, score=0.5)

        return VerificationResult(
            status=PASS,
            score=min(1.0, extracted_views / 100),  # Normalize score
            details={'extracted_views': extracted_views}
        )
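
The `_parse_view_count` helper is referenced above but not shown. Since a model may wrap the number in extra text despite the prompt, a tolerant parser is useful. The following is a minimal sketch of what such a parser might do, not the service's actual implementation. It does not handle abbreviated counts such as "1.2K".

```python
import re
from typing import Optional


def parse_view_count(response_text: str) -> Optional[int]:
    """Extract the first integer from the model response, or None if absent."""
    text = response_text.strip()
    if not text or "NOT_FOUND" in text.upper():
        return None
    match = re.search(r"\d[\d,]*", text)  # digits with optional thousands commas
    if match is None:
        return None
    return int(match.group().replace(",", ""))


plain = parse_view_count("42")
with_suffix = parse_view_count(" 1,234 views\n")
not_found = parse_view_count("NOT_FOUND")
no_digits = parse_view_count("no count visible")

# Same normalization as ViewCounterCheck: 100 or more views saturate at 1.0
score_for_plain = min(1.0, plain / 100)
```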

Decision Engine

The decision engine maps the aggregate score and any critical failures to one of four automated outcomes: approve, manual review, reject, or ban.

python
# decision_engine.py
from enum import Enum

class Decision(Enum):
    AUTO_APPROVE = "AUTO_APPROVE"
    MANUAL_REVIEW = "MANUAL_REVIEW"
    AUTO_REJECT = "AUTO_REJECT"
    AUTO_BAN = "AUTO_BAN"

class DecisionEngine:
    """
    STRICT BAN POLICY:
    - 2 critical failures EVER = permanent ban
    - Score < 0.30 = ban
    - Score < 0.50 = reject
    - Score < 0.70 = manual review
    - Score >= 0.70 = approve
    """

    # Critical checks that count toward ban threshold
    CRITICAL_CHECK_NAMES = {
        'watermark',
        'screenshot_origin_phone',
        'anti_manipulation'
    }

    def make_decision(self, verification_results, user_fraud_history=None):
        checks = verification_results['checks_performed']
        score = verification_results['final_score']

        # Identify critical failures; tolerate a missing fraud history (None)
        critical_failures = self._identify_critical_failures(checks)
        prior_critical = (user_fraud_history or {}).get('criticalFailureCount', 0)
        total_critical = prior_critical + len(critical_failures)

        # STRICT: 2 critical failures EVER = permanent ban
        if total_critical >= 2:
            return DecisionResult(
                decision=Decision.AUTO_BAN,
                reason=f"FRAUD_DETECTED: {total_critical} critical failures"
            )

        # Very low score = ban
        if score < 0.30:
            return DecisionResult(decision=Decision.AUTO_BAN, reason="LOW_CONFIDENCE_FRAUD")

        # 1 critical failure = reject with warning
        if len(critical_failures) == 1:
            return DecisionResult(
                decision=Decision.AUTO_REJECT,
                reason=f"CRITICAL_FAILURE: {critical_failures[0]} (WARNING: 1 more = ban)"
            )

        # Low score = reject
        if score < 0.50:
            return DecisionResult(decision=Decision.AUTO_REJECT, reason=f"LOW_SCORE: {score}")

        # Medium score = manual review
        if score < 0.70:
            return DecisionResult(decision=Decision.MANUAL_REVIEW, reason=f"NEEDS_REVIEW: {score}")

        # High score = approve
        return DecisionResult(decision=Decision.AUTO_APPROVE, reason=f"APPROVED: {score}")
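
The order of the rules matters: the ban rules are evaluated before the score bands, so a high score does not rescue a submission with a critical failure. The sketch below restates the same rule order as a standalone function with plain string results so the outcomes can be traced by hand. The function name `decide` and its parameters are hypothetical.

```python
def decide(score: float, new_critical: int, prior_critical: int = 0) -> str:
    """Apply the decision rules in the same order as DecisionEngine.make_decision."""
    if prior_critical + new_critical >= 2:
        return "AUTO_BAN"        # 2 critical failures ever
    if score < 0.30:
        return "AUTO_BAN"        # very low score
    if new_critical == 1:
        return "AUTO_REJECT"     # first critical failure, with warning
    if score < 0.50:
        return "AUTO_REJECT"
    if score < 0.70:
        return "MANUAL_REVIEW"
    return "AUTO_APPROVE"


outcomes = {
    "clean, high score": decide(0.90, new_critical=0),
    "high score, first critical": decide(0.90, new_critical=1),
    "high score, second critical": decide(0.90, new_critical=1, prior_critical=1),
    "medium score": decide(0.60, new_critical=0),
    "low score": decide(0.45, new_critical=0),
    "very low score": decide(0.20, new_critical=0),
}
```

Note that a user with one earlier critical failure and a clean, high-scoring submission is still approved, because only the combined count reaching 2 triggers the ban.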

Message Flow

Receiving Verification Requests

python
# app.py
def process_message(message: BrokerMessage):
    data = message.data
    ad_id = data.get("ad_id")
    subscription_id = data.get("subscription_id")

    # Run verification pipeline
    subscription = get_subscription_by_id(subscription_id)
    screenshot = get_screenshot_by_ad_id(ad_id)
    verification_results = start_verification_pipeline(subscription, screenshot)

    # Publish results
    broker.publish(TOPICS.SCREENSHOT_PROCESSED, {
        "ad_id": ad_id,
        "subscription_id": subscription_id,
        "status": verification_results.status.value,
        "score": verification_results.score,
        "details": verification_results.details
    })

    message.ack()
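
`process_message` reads `ad_id` and `subscription_id` with `data.get`, which returns `None` when a field is absent, so a malformed message would reach the database lookups with missing identifiers. One possible guard, sketched here under the assumption that both fields are always required, is to validate the payload first. `missing_fields` is a hypothetical helper and the ID values are made up for illustration.

```python
from typing import Any, Mapping

REQUIRED_FIELDS = ("ad_id", "subscription_id")  # fields read by process_message


def missing_fields(data: Mapping[str, Any]) -> list[str]:
    """Return the required fields that are absent or None (hypothetical helper)."""
    return [name for name in REQUIRED_FIELDS if data.get(name) is None]


complete = {"ad_id": "ad_123", "subscription_id": "sub_456"}
incomplete = {"ad_id": "ad_123"}

missing_from_complete = missing_fields(complete)
missing_from_incomplete = missing_fields(incomplete)
```

Whether a malformed message should be acknowledged and dropped or left for redelivery depends on the broker configuration, which this page does not describe.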

Main Pipeline Orchestration

python
# verification_pipeline.py
def start_verification_pipeline(subscription, screenshot, user_fraud_history=None):
    # GATEWAY CHECK: Must pass first
    gateway_check = ImageToVideoCheck()
    gateway_result = gateway_check.execute(subscription, screenshot)

    if gateway_result.status == VerificationStatus.FAIL:
        return VerificationResult(
            status=FAIL,
            score=gateway_result.score,
            details={
                "gateway_check_failed": True,
                "decision": "AUTO_REJECT",
                "decision_reason": "Screenshot does not contain the subscribed ad video"
            }
        )

    # Run remaining checks
    all_details = [gateway_result.to_dict()]
    total_score = gateway_result.score * gateway_check.weight
    total_weight = gateway_check.weight

    for check in available_checks:
        result = check.execute(subscription, screenshot)
        all_details.append(result.to_dict())
        total_score += result.score * check.weight
        total_weight += check.weight

    final_score = total_score / total_weight

    # Get decision from decision engine
    decision_engine = DecisionEngine(user_fraud_history)
    decision_result = decision_engine.make_decision({
        "checks_performed": all_details,
        "final_score": final_score
    })

    return VerificationResult(
        status=DECISION_TO_STATUS[decision_result.decision],
        score=decision_result.aggregate_score,
        details={
            "checks_performed": all_details,
            "decision": decision_result.decision.value,
            "decision_reason": decision_result.reason,
            "should_ban": decision_result.should_ban
        }
    )
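
The final score computed by the orchestrator is a weighted average of the individual check scores. A small worked example makes the arithmetic concrete. The weights below are made up for illustration, since the actual per-check weights are not listed on this page.

```python
def weighted_score(results: list[tuple[float, float]]) -> float:
    """Weighted average of (score, weight) pairs, as in the pipeline loop."""
    total_weight = sum(weight for _, weight in results)
    if total_weight == 0:
        raise ValueError("at least one check must have a non-zero weight")
    return sum(score * weight for score, weight in results) / total_weight


# (score, weight) pairs; weights are hypothetical
results = [
    (0.9, 2.0),  # gateway check
    (1.0, 1.0),  # a passing check
    (0.5, 1.0),  # a NEEDS_REVIEW result
]
final_score = weighted_score(results)  # (1.8 + 1.0 + 0.5) / 4.0
```

Here the final score comes to about 0.825, which falls in the approve band of the decision engine provided no critical failure was recorded.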

Environment Variables

bash
# Message Broker
MESSAGE_BROKER=pubsub
GCP_PROJECT_ID=eneza-40ab5
PUBSUB_TOPIC_SUFFIX=prod

# AI/Vision
GEMINI_API_KEY=AIzaSy...

# Database (read-only access)
DATABASE_URL=postgresql://...

# Cloud Run
PORT=8080
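
A service like this typically reads these variables once at startup and fails fast when one is missing. The sketch below shows one way to do that. The `Settings` class and `load_settings` function are hypothetical, and treating every variable except `PORT` as required is an assumption.

```python
import os
from dataclasses import dataclass
from typing import Mapping


@dataclass(frozen=True)
class Settings:
    message_broker: str
    gcp_project_id: str
    pubsub_topic_suffix: str
    gemini_api_key: str
    database_url: str
    port: int


REQUIRED = (
    "MESSAGE_BROKER",
    "GCP_PROJECT_ID",
    "PUBSUB_TOPIC_SUFFIX",
    "GEMINI_API_KEY",
    "DATABASE_URL",
)


def load_settings(env: Mapping[str, str] = os.environ) -> Settings:
    """Build Settings from an environment mapping, failing fast on gaps."""
    missing = [name for name in REQUIRED if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return Settings(
        message_broker=env["MESSAGE_BROKER"],
        gcp_project_id=env["GCP_PROJECT_ID"],
        pubsub_topic_suffix=env["PUBSUB_TOPIC_SUFFIX"],
        gemini_api_key=env["GEMINI_API_KEY"],
        database_url=env["DATABASE_URL"],
        port=int(env.get("PORT", "8080")),  # 8080 matches the value listed above
    )


# Placeholder values for demonstration only
settings = load_settings({
    "MESSAGE_BROKER": "pubsub",
    "GCP_PROJECT_ID": "example-project",
    "PUBSUB_TOPIC_SUFFIX": "dev",
    "GEMINI_API_KEY": "placeholder-key",
    "DATABASE_URL": "postgresql://placeholder",
})
```

Passing the environment as a parameter keeps the loader easy to test without touching the real process environment.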

Health Checks

python
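from http.server import BaseHTTPRequestHandler

class HealthHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path in ('/health', '/'):
            self.send_response(200)
            self.send_header('Content-Type', 'application/json')
            self.end_headers()
            self.wfile.write(b'{"status": "healthy", "service": "screenshot-verifier"}')
        else:
            # Any other path is not a health endpoint
            self.send_error(404)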
