Eneza Screenshot Verifier
The Screenshot Verifier is a Python microservice that uses AI-powered image analysis to verify that screenshots submitted by users are legitimate proof of posting ads to WhatsApp Status.
Architecture Overview
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ ENEZA API │────▶│ GOOGLE PUBSUB │────▶│ SCREENSHOT │
│ │ │ │ │ VERIFIER │
│ • Upload │ │ screenshot- │ │ │
│ • Store in R2 │ │ verify │ │ • Download img │
│ • Publish msg │ │ │ │ • Run pipeline │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
▼
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ ENEZA API │◀────│ GOOGLE PUBSUB │◀────│ VERIFICATION │
│ │ │ │ │ RESULTS │
│ • Update status │ │ screenshot- │ │ │
│ • Credit user │ │ processed │ │ • Pass/Fail │
│ • Notify │ │ │ │ • Score 0-1 │
└─────────────────┘ └─────────────────┘ └─────────────────┘Technology Stack
| Component | Technology |
|---|---|
| Language | Python 3.11 |
| AI/Vision | Google Gemini Vision API |
| Image Processing | Pillow, NumPy, OpenCV |
| Video Processing | FFmpeg |
| Message Broker | Google Pub/Sub |
| Hosting | Cloud Run (GPU-enabled) |
Project Structure
screenshot-verifier/
├── app.py # Entry point, message handling
├── src/
│ ├── db_utils.py # Database queries
│ ├── verification_pipeline/
│ │ ├── base.py # Base classes
│ │ ├── verification_pipeline.py # Main orchestrator
│ │ └── checks/ # Individual verification checks
│ │ ├── __init__.py
│ │ ├── anti_manipulation/
│ │ ├── image_to_video_checker/
│ │ ├── qr_code_matcher/
│ │ ├── screenshot_origin_phone_matcher/
│ │ ├── screenshot_time_difference_matcher/
│ │ ├── user_behavior_analysis/
│ │ ├── view_counter/
│ │ └── watermark_checker/
│ ├── decision_engine/
│ │ └── decision_engine.py # Automated decision making
│ └── utils/
│ ├── logger.py
│ └── test_utils.py
└── requirements.txtVerification Pipeline
The verification pipeline runs multiple checks in sequence:
1. Gateway Check: Image-to-Video Matching
This check MUST pass before any other checks run.
python
# image_to_video_checker.py
class ImageToVideoCheck(BaseCheck):
"""
Verifies screenshot came from the subscribed video.
Uses perceptual hashing and structural similarity.
"""
PHASH_MATCH_THRESHOLD = 0.85 # 85% similarity required
DHASH_MATCH_THRESHOLD = 0.80
HISTOGRAM_MATCH_THRESHOLD = 0.70
CREDITS_DURATION_SECONDS = 10 # Exclude credits section
def execute(self, subscription, screenshot) -> VerificationResult:
# 1. Download screenshot
screenshot_image = self._download_image(screenshot['screenshotLink'])
# 2. Extract frames from video (excluding credits)
video_url = subscription['ad']['video']
video_frames = self._extract_video_frames(video_url)
# 3. Compare screenshot to each frame
match_result = self._find_best_match(screenshot_image, video_frames)
if match_result['best_similarity'] >= self.PHASH_MATCH_THRESHOLD:
return VerificationResult(status=PASS, score=match_result['best_similarity'])
else:
return VerificationResult(status=FAIL, score=match_result['best_similarity'])
def _compute_phash(self, image):
"""Perceptual hash - robust to minor transformations"""
img = image.resize((17, 16)).convert('L')
pixels = np.array(img, dtype=np.float64)
diff = pixels[:, 1:] > pixels[:, :-1]
return ''.join(['1' if b else '0' for b in diff.flatten()])
def _extract_video_frames(self, video_url):
"""Extract frames using ffmpeg, excluding credits section"""
duration = self._get_video_duration(video_url)
end_time = duration - self.CREDITS_DURATION_SECONDS
cmd = [
'ffmpeg', '-i', video_url,
'-t', str(end_time), # Stop before credits
'-vf', 'fps=0.5', # 1 frame per 2 seconds
'-frames:v', '30',
output_pattern
]2. Core Verification Checks
After the gateway check passes, these checks run:
python
# checks/__init__.py
available_checks = [
# Breaking checks (critical)
AntiManipulationCheck(),
QRCodeMatcherCheck(),
ScreenshotOriginPhoneCheck(),
ScreenshotTimeDifferenceCheck(),
WatermarkCheck(),
# Aggregate checks
UserBehaviorAnalysisCheck(),
ViewCounterCheck()
]Anti-Manipulation Check
Detects image manipulation and AI-generated content.
python
class AntiManipulationCheck(BaseCheck):
"""
Detects:
- AI-generated images
- Photoshopped/edited screenshots
- Screenshot templates
- Copy-paste artifacts
"""
def execute(self, subscription, screenshot):
image = download_image(screenshot['screenshotLink'])
# Check EXIF data consistency
exif_valid = self._validate_exif(image)
# Check for manipulation artifacts
artifacts = self._detect_manipulation_artifacts(image)
# AI generation detection via Gemini
ai_generated = self._check_ai_generation(image)
if ai_generated:
return VerificationResult(
status=FAIL,
score=0.0,
details={'reason': 'AI_GENERATED_SUSPECTED', 'is_critical': True}
)QR Code Matcher
Verifies the unique QR watermark matches the subscription.
python
class QRCodeMatcherCheck(BaseCheck):
"""
Each video has a unique QR watermark embedded.
The screenshot must contain the correct QR code.
"""
def execute(self, subscription, screenshot):
# Decode QR from screenshot
qr_data = self._decode_qr(screenshot_image)
# Compare with subscription code
expected_code = subscription['code']
if qr_data == expected_code:
return VerificationResult(status=PASS, score=1.0)
elif qr_data is None:
return VerificationResult(
status=FAIL,
score=0.0,
details={'reason': 'WATERMARK_MISSING', 'is_critical': True}
)
else:
return VerificationResult(
status=FAIL,
score=0.0,
details={'reason': 'WATERMARK_CODE_MISMATCH', 'is_critical': True}
)Device Origin Check
Verifies screenshot came from the registered device.
python
class ScreenshotOriginPhoneCheck(BaseCheck):
"""
Compares device info in screenshot EXIF with registered phone info.
"""
def execute(self, subscription, screenshot):
# Extract device info from screenshot EXIF
screenshot_device = self._extract_device_info(screenshot_image)
# Get registered device from subscription
registered_device = subscription['phoneInfo']
# Compare key identifiers
if self._devices_match(screenshot_device, registered_device):
return VerificationResult(status=PASS, score=1.0)
else:
return VerificationResult(
status=FAIL,
score=0.0,
details={'reason': 'DEVICE_MISMATCH', 'is_critical': True}
)Time Difference Check
Validates screenshot was taken within expected window.
python
class ScreenshotTimeDifferenceCheck(BaseCheck):
"""
Screenshot must be taken:
- After subscription time
- Within 24-hour posting window
- Before current time
"""
MAX_WINDOW_HOURS = 24
def execute(self, subscription, screenshot):
subscribe_time = subscription['subscribeTime']
screenshot_time = screenshot['screenshotTime']
submit_time = screenshot['submitTime']
# Validate timeline
if screenshot_time < subscribe_time:
return VerificationResult(status=FAIL, details={'reason': 'SCREENSHOT_BEFORE_SUBSCRIPTION'})
hours_diff = (submit_time - subscribe_time).total_seconds() / 3600
if hours_diff > self.MAX_WINDOW_HOURS:
return VerificationResult(status=FAIL, details={'reason': 'WINDOW_EXPIRED'})
return VerificationResult(status=PASS, score=1.0)View Counter Check
Extracts view count from WhatsApp Status screenshot using AI.
python
class ViewCounterCheck(BaseCheck):
"""
Uses Gemini Vision to extract view count from screenshot.
WhatsApp Status shows "X views" at the bottom.
"""
def execute(self, subscription, screenshot):
# Use Gemini Vision to extract view count
prompt = """
Analyze this WhatsApp Status screenshot.
Find and extract the view count shown at the bottom.
Return ONLY the number, or "NOT_FOUND" if no view count visible.
"""
response = gemini_model.generate_content([
prompt,
screenshot_image
])
extracted_views = self._parse_view_count(response.text)
if extracted_views is None:
return VerificationResult(status=NEEDS_REVIEW, score=0.5)
return VerificationResult(
status=PASS,
score=min(1.0, extracted_views / 100), # Normalize score
details={'extracted_views': extracted_views}
)Decision Engine
The decision engine makes automated approve/reject/ban decisions:
python
# decision_engine.py
class Decision(Enum):
AUTO_APPROVE = "AUTO_APPROVE"
MANUAL_REVIEW = "MANUAL_REVIEW"
AUTO_REJECT = "AUTO_REJECT"
AUTO_BAN = "AUTO_BAN"
class DecisionEngine:
"""
STRICT BAN POLICY:
- 2 critical failures EVER = permanent ban
- Score < 0.30 = ban
- Score < 0.50 = reject
- Score < 0.70 = manual review
- Score >= 0.70 = approve
"""
# Critical checks that count toward ban threshold
CRITICAL_CHECK_NAMES = {
'watermark',
'screenshot_origin_phone',
'anti_manipulation'
}
def make_decision(self, verification_results, user_fraud_history=None):
checks = verification_results['checks_performed']
score = verification_results['final_score']
# Identify critical failures
critical_failures = self._identify_critical_failures(checks)
total_critical = user_fraud_history.get('criticalFailureCount', 0) + len(critical_failures)
# STRICT: 2 critical failures EVER = permanent ban
if total_critical >= 2:
return DecisionResult(
decision=Decision.AUTO_BAN,
reason=f"FRAUD_DETECTED: {total_critical} critical failures"
)
# Very low score = ban
if score < 0.30:
return DecisionResult(decision=Decision.AUTO_BAN, reason="LOW_CONFIDENCE_FRAUD")
# 1 critical failure = reject with warning
if len(critical_failures) == 1:
return DecisionResult(
decision=Decision.AUTO_REJECT,
reason=f"CRITICAL_FAILURE: {critical_failures[0]} (WARNING: 1 more = ban)"
)
# Low score = reject
if score < 0.50:
return DecisionResult(decision=Decision.AUTO_REJECT, reason=f"LOW_SCORE: {score}")
# Medium score = manual review
if score < 0.70:
return DecisionResult(decision=Decision.MANUAL_REVIEW, reason=f"NEEDS_REVIEW: {score}")
# High score = approve
return DecisionResult(decision=Decision.AUTO_APPROVE, reason=f"APPROVED: {score}")Message Flow
Receiving Verification Requests
python
# app.py
def process_message(message: BrokerMessage):
data = message.data
ad_id = data.get("ad_id")
subscription_id = data.get("subscription_id")
# Run verification pipeline
subscription = get_subscription_by_id(subscription_id)
screenshot = get_screenshot_by_ad_id(ad_id)
verification_results = start_verification_pipeline(subscription, screenshot)
# Publish results
broker.publish(TOPICS.SCREENSHOT_PROCESSED, {
"ad_id": ad_id,
"subscription_id": subscription_id,
"status": verification_results.status.value,
"score": verification_results.score,
"details": verification_results.details
})
message.ack()Main Pipeline Orchestration
python
# verification_pipeline.py
def start_verification_pipeline(subscription, screenshot, user_fraud_history=None):
# GATEWAY CHECK: Must pass first
gateway_check = ImageToVideoCheck()
gateway_result = gateway_check.execute(subscription, screenshot)
if gateway_result.status == VerificationStatus.FAIL:
return VerificationResult(
status=FAIL,
score=gateway_result.score,
details={
"gateway_check_failed": True,
"decision": "AUTO_REJECT",
"decision_reason": "Screenshot does not contain the subscribed ad video"
}
)
# Run remaining checks
all_details = [gateway_result.to_dict()]
total_score = gateway_result.score * gateway_check.weight
total_weight = gateway_check.weight
for check in available_checks:
result = check.execute(subscription, screenshot)
all_details.append(result.to_dict())
total_score += result.score * check.weight
total_weight += check.weight
final_score = total_score / total_weight
# Get decision from decision engine
decision_engine = DecisionEngine(user_fraud_history)
decision_result = decision_engine.make_decision({
"checks_performed": all_details,
"final_score": final_score
})
return VerificationResult(
status=DECISION_TO_STATUS[decision_result.decision],
score=decision_result.aggregate_score,
details={
"checks_performed": all_details,
"decision": decision_result.decision.value,
"decision_reason": decision_result.reason,
"should_ban": decision_result.should_ban
}
)Environment Variables
bash
# Message Broker
MESSAGE_BROKER=pubsub
GCP_PROJECT_ID=eneza-40ab5
PUBSUB_TOPIC_SUFFIX=prod
# AI/Vision
GEMINI_API_KEY=AIzaSy...
# Database (read-only access)
DATABASE_URL=postgresql://...
# Cloud Run
PORT=8080Health Checks
python
class HealthHandler(BaseHTTPRequestHandler):
def do_GET(self):
if self.path == '/health' or self.path == '/':
self.send_response(200)
self.end_headers()
self.wfile.write(b'{"status": "healthy", "service": "screenshot-verifier"}')