FakeCheck - AI-Powered Deepfake Detection
A comprehensive deepfake detection system combining CLIP, Whisper, and Google Gemini for multi-modal video analysis.
🚀 Quick Start Guide
Step 1: API Keys Setup (Required)
Before running, obtain these API keys. The setup cell reads the Gemini key from a Colab Secret named GEMINI_API_KEY, falling back to an environment variable of the same name outside Colab:
- 🔑 Google Gemini API Key: Get from Google AI Studio
- 🤗 Hugging Face Token: Get from HF Settings (for model downloads)
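As a rough sketch of how a key can be picked up without hard-coding it, the helper below tries Colab Secrets first and then an environment variable. `load_secret` and its `environ` parameter are hypothetical names used only for illustration; `google.colab.userdata.get` is the call the setup cell itself relies on.

```python
import os


def load_secret(name, environ=None):
    """Return a secret from Colab Secrets, else from the environment, else None.

    Illustrative helper only; it is not part of the FakeCheck backend.
    """
    try:
        # This import only succeeds inside Google Colab.
        from google.colab import userdata

        value = userdata.get(name)
        if value:
            return value
    except Exception:
        # Not in Colab, or the secret is missing / not shared with this notebook.
        pass
    env = os.environ if environ is None else environ
    return env.get(name)


# Example: key = load_secret("GEMINI_API_KEY")
```

Returning None instead of raising lets the caller decide whether a missing key should disable a feature or stop the run.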
Step 2: Upload Required Files (from Repo) 📁
📋 REQUIRED FILES from the backend/app/core/ directory:
- 📄 flow.py - Optical flow spike detection
- 📄 fusion.py - Score fusion and decision logic
- 📄 gemini.py - Google Gemini AI analysis
- 📄 models.py - CLIP visual scoring & Whisper transcription
- 📄 video.py - Video frame/audio sampling
📤 Upload Instructions:
- Click 'Files' tab in left sidebar
- Click 'Upload to session storage'
- Select all 5 .py files from your local backend/app/core/ folder
- Wait for upload completion ✅
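Before running the setup cells, it can help to confirm that the upload actually landed. The sketch below lists which of the five required files are absent from a directory; `missing_backend_files` is a hypothetical helper, and `/content` is assumed to be the Colab session storage path used elsewhere in this notebook.

```python
from pathlib import Path

# The five backend files named in the list above.
REQUIRED_FILES = ("flow.py", "fusion.py", "gemini.py", "models.py", "video.py")


def missing_backend_files(directory="/content"):
    """Return the required backend files that are absent from `directory`."""
    base = Path(directory)
    return [name for name in REQUIRED_FILES if not (base / name).is_file()]


# Example: an empty list means all five files were uploaded.
# print(missing_backend_files())
```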
Step 3: Upload Your Test Video 🎬
- 📁 Click 'Files' tab in left sidebar
- 📤 Click 'Upload to session storage'
- 🎬 Select a video file (MP4, AVI, MOV, MKV, WebM)
- ✏️ Update the TEST_VIDEO_PATH variable below with your filename
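A quick extension check catches a mistyped path before the pipeline starts. This is a minimal sketch that only looks at the file suffix, using the formats listed above; `is_supported_video` is a hypothetical name, and a matching suffix does not guarantee the file can be decoded.

```python
from pathlib import Path

# Container formats listed in the upload instructions above.
SUPPORTED_EXTENSIONS = {".mp4", ".avi", ".mov", ".mkv", ".webm"}


def is_supported_video(filename):
    """Return True if the file suffix is one of the listed video formats."""
    return Path(filename).suffix.lower() in SUPPORTED_EXTENSIONS


# Example: is_supported_video("/content/fake_test.mp4")
```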
🎯 Detection Pipeline Overview
This system analyzes videos through multiple specialized detectors:
- 🎨 Visual Analysis: CLIP-based authenticity scoring
- 🗣️ Audio Analysis: Whisper transcription + lip-sync verification
- 🤖 AI Analysis: Gemini checks for artifacts, blinks, gibberish text
- 📊 Heuristics: Optical flow anomalies and frame transitions
- 🔀 Fusion: Weighted combination of all signals → final verdict
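To make the fusion step concrete, here is a minimal sketch of a weighted combination. The weights (0.4 for the CLIP score, 0.2 per Gemini flag) and the thresholds (0.6 and 0.3) are taken from the simple fallback in this notebook's pipeline cell; the real logic in fusion.py is not shown in this notebook and may well differ. `fallback_fuse` is a hypothetical name.

```python
def fallback_fuse(clip_score, vis_flag, lip_flag, blink_flag):
    """Weighted fusion mirroring the notebook's fallback, not fusion.py itself."""
    # CLIP contributes up to 0.4; each raised Gemini flag adds 0.2.
    confidence = clip_score * 0.4 + (vis_flag + lip_flag + blink_flag) * 0.2
    if confidence > 0.6:
        label = "LIKELY_FAKE"
    elif confidence < 0.3:
        label = "LIKELY_REAL"
    else:
        label = "UNCERTAIN"
    return confidence, label


# Example: a mid-range CLIP score with one raised flag stays undecided.
# fallback_fuse(0.5, 1, 0, 0)
```

With these weights, a single signal can push a video out of the "real" band but cannot label it fake on its own.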
📝 Video Recommendations
✅ Good Test Videos:
- Clear faces with visible speech/lip movement
- Good lighting and audio quality
- News clips, interviews, vlogs, speeches
- 10-60 seconds duration
⚠️ Challenging Videos:
- Poor lighting or blurry footage
- Multiple speakers or no speech
- Extreme angles or partial face occlusion
⚙️ Processing Details
- 🕐 Duration: At most 30 seconds of each video is processed
- 🎯 Performance: Works best with clear faces and speech
- 🌍 Languages: Non-English speech disables the lip-sync check; the other checks remain active
- ⏱️ Time: Gemini API calls may take 30-120 seconds depending on content
- 📊 Output: Confidence score, verdict, detected issues, and timeline events
💡 Pro Tip: Start with a short (10-20 second) video of someone speaking clearly for best results!
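The language rule above can be written as a small predicate. The sketch below mirrors the gating used in the pipeline cell of this notebook: lip-sync is skipped when Whisper's average no-speech probability exceeds 0.85 or when the detected language is not 'en'. The function name `lipsync_enabled` and its parameter names are hypothetical.

```python
def lipsync_enabled(avg_no_speech_prob, detected_language, no_speech_threshold=0.85):
    """Decide whether the lip-sync check should run for a transcription."""
    if avg_no_speech_prob > no_speech_threshold:
        # Mostly silence or noise: nothing to compare lip movement against.
        return False
    # The check is only attempted for English speech.
    return detected_language == "en"


# Example: lipsync_enabled(0.10, "en") keeps the check on.
```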
In [3]:
# Cell 01: Robust Environment Setup with Force Installation
import sys
import subprocess
import importlib
# 0. Function to force package installation with system override
def force_install_package(package_spec):
    """Force installation ignoring system packages"""
    subprocess.check_call([sys.executable, '-m', 'pip', 'install',
                           '--force-reinstall', '--no-deps', package_spec])

def install_with_deps(package_spec):
    """Install package with dependencies"""
    subprocess.check_call([sys.executable, '-m', 'pip', 'install',
                           '--force-reinstall', package_spec])
# 1. Upgrade pip first
print("--- Step 0: Upgrading pip ---")
subprocess.check_call([sys.executable, '-m', 'pip', 'install', '--upgrade', 'pip'])
# 2. CRITICAL: Uninstall Colab's default packages that conflict
print("\n--- Step 1: Removing conflicting Colab defaults ---")
# Uninstall packages that Colab pre-installs which conflict with our requirements
subprocess.run([sys.executable, '-m', 'pip', 'uninstall', '-y',
'Pillow', 'numpy', 'protobuf'], capture_output=True)
# 3. Install core dependencies with explicit force
print("\n--- Step 2: Installing critical base packages ---")
# Install NumPy 1.26.4 first as many packages depend on it
force_install_package('numpy==1.26.4')
# Install Pillow 9.5.0 to fix the is_directory error
force_install_package('Pillow==9.5.0')
# Install protobuf early (use 4.x since MediaPipe is removed)
force_install_package('protobuf==4.25.3')
# *** NEW STEP: Install a stable google-api-core ***
print("\n--- Step 2.5: Installing stable google-api-core ---")
install_with_deps('google-api-core[grpc]~=2.11.1') # Or another stable 2.x like 2.15.0
# 4. Verify critical packages before proceeding
print("\n--- Verification 1: Base packages ---")
importlib.invalidate_caches()
import numpy
import PIL
import google.protobuf
import google.api_core # Verify this new addition
print(f"NumPy version: {numpy.__version__}")
print(f"Pillow version: {PIL.__version__}")
print(f"Protobuf version: {google.protobuf.__version__}")
print(f"google-api-core version: {google.api_core.__version__}") # Check its version
assert numpy.__version__ == "1.26.4", f"NumPy version mismatch: {numpy.__version__}"
assert PIL.__version__ == "9.5.0", f"Pillow version mismatch: {PIL.__version__}"
assert google.api_core.__version__.startswith("2.11.1"), f"google-api-core version mismatch: {google.api_core.__version__}"
# Test critical imports
try:
    from PIL import ImageFont
    print("✅ PIL.ImageFont imported successfully")
except ImportError as e:
    print(f"❌ PIL.ImageFont import failed: {e}")
    raise
# 5. Install PyTorch with CUDA support
print("\n--- Step 3: Installing PyTorch stack ---")
torch_cmd = [sys.executable, '-m', 'pip', 'install',
'torch==2.2.1+cu118', 'torchvision==0.17.1+cu118',
'torchaudio==2.2.1+cu118',
'--index-url', 'https://download.pytorch.org/whl/cu118']
subprocess.check_call(torch_cmd)
# 6. Install transformers ecosystem with specific versions
print("\n--- Step 4: Installing Transformers stack ---")
# Quote version constraints to avoid shell interpretation
subprocess.check_call([sys.executable, '-m', 'pip', 'install', "transformers>=4.30.0,<4.41.0"])
subprocess.check_call([sys.executable, '-m', 'pip', 'install', "huggingface-hub>=0.20.0"])
subprocess.check_call([sys.executable, '-m', 'pip', 'install', "tokenizers>=0.14.0"])
# 7. Install ftfy before open-clip
print("\n--- Step 5: Installing ftfy and open-clip ---")
install_with_deps('ftfy>=6.0')
force_install_package('open-clip-torch==2.23.0')
# 8. Verify torch and open-clip
print("\n--- Verification 2: Torch and vision stack ---")
importlib.invalidate_caches()
try:
    import torch
    import open_clip
    print(f"✅ PyTorch version: {torch.__version__}")
    print("✅ open-clip imported successfully")
except ImportError as e:
    print(f"❌ Import error: {e}")
    raise
# 9. Install Google Cloud Libraries
print("\n--- Step 6: Installing Google Generative AI ---")
install_with_deps('google-generativeai==0.5.2')
# install_with_deps('google-cloud-vision~=3.4')
# 10. Pre-install compatible versions for Whisper dependencies
print("\n--- Step 7: Pre-installing Whisper dependencies ---")
# Install numba compatible with numpy 1.26.4
install_with_deps('numba==0.58.1')
# Install spacy 3.4.4 to get thinc 8.1.x (compatible with numpy 1.26.4)
# This prevents whisper from pulling thinc 8.3.6 which requires numpy 2.x
install_with_deps('spacy==3.4.4')
install_with_deps('thinc>=8.1.0,<8.2.0')
# 11. Install Whisper
print("\n--- Step 8: Installing OpenAI Whisper ---")
install_with_deps('openai-whisper==20231117')
# 12. Install remaining utilities
print("\n--- Step 9: Installing other utilities ---")
install_with_deps('ffmpeg-python==0.2.0')
install_with_deps('opencv-python-headless==4.9.0.80')
install_with_deps('nest-asyncio==1.6.0')
install_with_deps('langdetect>=1.0.9') # Required by gemini.py
install_with_deps('scikit-image==0.21.0') # Required for SSIM in flow.py
# 13. Force reinstall our exact versions one more time to ensure they stick
print("\n--- Step 10: Final version enforcement ---")
force_install_package('numpy==1.26.4')
force_install_package('Pillow==9.5.0')
# 14. Final comprehensive verification
print("\n--- FINAL VERIFICATION ---")
importlib.invalidate_caches()
overall_setup_ok_final = True
def verify_import(module_name, version_attr='__version__', expected_version=None, critical=False):
    global overall_setup_ok_final  # Use the renamed global
    try:
        module = importlib.import_module(module_name)
        version = getattr(module, version_attr, 'N/A')
        status = "✅"
        message = f"{module_name}: {version}"
        if expected_version:
            message += f" (expected: {expected_version})"
            if version != expected_version:
                status = "⚠️"
                if critical:
                    overall_setup_ok_final = False  # Fail build on critical mismatch
        print(f"{status} {message}")
        if module_name == "PIL" and expected_version == "9.5.0" and version == "9.5.0":
            from PIL import ImageFont  # Test only if Pillow is our target version
            print(" ✅ PIL.ImageFont works with Pillow 9.5.0")
        return True
    except ImportError as e:
        print(f"❌ {module_name}: Import failed - {e}")
        if critical:
            overall_setup_ok_final = False
        return False
    except Exception as e_gen:
        print(f"❌ {module_name}: Verification error - {e_gen}")
        if critical:
            overall_setup_ok_final = False
        return False
# Critical version checks
verify_import('PIL', expected_version='9.5.0', critical=True)
verify_import('numpy', expected_version='1.26.4', critical=True)
verify_import('google.api_core', expected_version='2.11.1', critical=True) # Verify pinned GAC
# Other important checks
verify_import('torch', expected_version='2.2.1+cu118')
verify_import('transformers') # No strict version, just check import
verify_import('huggingface_hub') # No strict version
verify_import('open_clip')
verify_import('whisper', expected_version='20231117') # package version
verify_import('cv2')
verify_import('google.generativeai', expected_version='0.5.2')  # the package exposes __version__, not VERSION
print("\n" + "="*50)
if overall_setup_ok_final:
    print("✅ Environment setup targeted critical versions. Check ⚠️ for non-critical or resolved versions.")
else:
    print("❌ Critical issues remain in environment setup. Check errors above.")
print("="*50)
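The checks above read each module's version attribute after importing it. A lighter alternative, sketched here under the assumption that comparing installed distribution metadata is sufficient, uses the standard-library `importlib.metadata` and does not import the packages at all. `installed_version` and `pin_mismatches` are hypothetical helper names, and the `lookup` parameter exists only so the logic can be exercised without the real packages.

```python
from importlib import metadata


def installed_version(dist_name):
    """Return the installed version of a distribution, or None if absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


def pin_mismatches(pins, lookup=installed_version):
    """Map each mismatched distribution name to (expected, found)."""
    mismatches = {}
    for dist_name, expected in pins.items():
        found = lookup(dist_name)
        if found != expected:
            mismatches[dist_name] = (expected, found)
    return mismatches


# Example, using pins from the cell above:
# print(pin_mismatches({"numpy": "1.26.4", "Pillow": "9.5.0"}))
```

Note that this takes distribution names (for example `Pillow`), not import names (`PIL`).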
In [4]:
# Cell 2: Imports, Backend Module Loading & Model Setup
import os
import sys
import json
import time
import uuid
import asyncio
from pathlib import Path
from typing import Dict, Any, List, Optional, Tuple
from datetime import datetime
from IPython.display import display, Markdown
import nest_asyncio
# Apply nest_asyncio for running asyncio code in Colab cells
nest_asyncio.apply()
# Add /content to Python path for backend modules
if '/content' not in sys.path:
    sys.path.append('/content')
print("📁 BACKEND MODULE SETUP")
print("=" * 60)
print("""
🚨 CRITICAL: You MUST upload 5 backend files before proceeding!
📋 REQUIRED FILES from backend/app/core/ directory:
1. 📄 flow.py - Optical flow spike detection
2. 📄 fusion.py - Score fusion and thresholds
3. 📄 gemini.py - Gemini API interactions (parallel)
4. 📄 models.py - CLIP scoring & Whisper transcription
5. 📄 video.py - Video sampling functions
HOW TO UPLOAD:
1. Click 'Files' tab in left sidebar
2. Click 'Upload to session storage'
3. Select all 5 .py files from your local backend/app/core/ folder
4. Wait for upload to complete
5. Re-run this cell
⚠️ IMPORTANT: These must be the exact files from the repository!
Do NOT create empty files or copy code manually!
""")
# Import backend modules with graceful degradation
modules_status = {}
backend_modules = ['flow', 'fusion', 'gemini', 'models', 'video']
for module_name in backend_modules:
    try:
        # First check if file exists in /content
        module_path = f'/content/{module_name}.py'
        if not os.path.exists(module_path):
            modules_status[module_name] = f'❌ Missing: File not found at {module_path}'
            print(f"⚠️ {module_name}.py not found in /content/ - some features will be disabled")
            continue
        # Import the module from /content specifically
        import importlib.util
        spec = importlib.util.spec_from_file_location(module_name, module_path)
        if spec and spec.loader:
            module = importlib.util.module_from_spec(spec)
            spec.loader.exec_module(module)
            # Verify the module has expected functions
            expected_functions = {
                'video': ['sample_video_content'],
                'models': ['calculate_visual_clip_score', 'transcribe_audio_content'],
                'gemini': ['run_gemini_inspections'],
                'flow': ['detect_spikes'],
                'fusion': ['fuse_detection_scores']
            }
            if module_name in expected_functions:
                missing_functions = []
                for func_name in expected_functions[module_name]:
                    if not hasattr(module, func_name):
                        missing_functions.append(func_name)
                if missing_functions:
                    modules_status[module_name] = f'❌ Missing functions: {", ".join(missing_functions)}'
                    print(f"⚠️ {module_name}.py missing functions: {missing_functions}")
                    continue
            modules_status[module_name] = '✅ Loaded'
            globals()[module_name] = module
            sys.modules[module_name] = module  # Also add to sys.modules
        else:
            modules_status[module_name] = f'❌ Import error: Could not create module spec'
            print(f"⚠️ Failed to create import spec for {module_name}.py")
    except Exception as e:
        modules_status[module_name] = f'❌ Import error: {e}'
        print(f"⚠️ Error importing {module_name}.py: {e}")
print("\n📦 Backend Module Status:")
for module, status in modules_status.items():
    print(f" {module}.py: {status}")
# Debug: Show what files are actually in /content
print("\n🔍 Debug: Files in /content directory:")
try:
    content_files = [f for f in os.listdir('/content') if f.endswith('.py')]
    if content_files:
        for f in content_files:
            file_path = f'/content/{f}'
            size_kb = os.path.getsize(file_path) / 1024
            print(f" 📄 {f} ({size_kb:.1f} KB)")
    else:
        print(" ❌ No .py files found in /content/")
        print("\n💡 UPLOAD REQUIRED:")
        print(" 1. Click 'Files' tab in left sidebar")
        print(" 2. Click 'Upload to session storage'")
        print(" 3. Upload these 5 files: flow.py, fusion.py, gemini.py, models.py, video.py")
        print(" 4. Re-run this cell")
except Exception as e:
    print(f" ❌ Error checking /content: {e}")
# Debug: Show what functions are available in the video module if it was loaded
if 'video' in globals():
    print(f"\n🔍 Debug: Functions in loaded 'video' module:")
    video_functions = [attr for attr in dir(video) if not attr.startswith('_') and callable(getattr(video, attr))]
    if video_functions:
        for func in video_functions:
            print(f" • {func}")
    else:
        print(" ❌ No functions found in video module")
else:
    print(f"\n🔍 Debug: 'video' module not loaded in globals()")
# Essential ML imports
import torch
import numpy as np
import PIL.Image as Image
import open_clip
import whisper
import google.generativeai as genai
# Configuration (matching backend)
CLIP_MODEL_NAME = "ViT-L-14"
CLIP_PRETRAINED = "laion2b_s32b_b82k"
WHISPER_MODEL_NAME = "base.en"
GEMINI_MODEL_NAME = "gemini-2.5-pro-preview-05-06"
TARGET_FPS = 8
MAX_VIDEO_DURATION_SEC = 30
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
print(f"\n🔧 Configuration:")
print(f" Device: {DEVICE}")
print(f" CLIP Model: {CLIP_MODEL_NAME}")
print(f" Whisper Model: {WHISPER_MODEL_NAME}")
print(f" Target FPS: {TARGET_FPS}")
# Google Cloud Authentication (Colab specific)
print("\n🔐 Google Cloud Authentication:")
try:
    from google.colab import auth
    auth.authenticate_user()
    print("✅ Google Cloud authentication successful")
except ImportError:
    print("⚠️ Not running in Google Colab")
# API Key Setup
GEMINI_API_KEY = None
try:
    from google.colab import userdata
    GEMINI_API_KEY = userdata.get('GEMINI_API_KEY')
    if GEMINI_API_KEY:
        genai.configure(api_key=GEMINI_API_KEY)
        print("✅ Gemini API configured")
    else:
        print("⚠️ GEMINI_API_KEY not found in Colab Secrets")
except ImportError:
    GEMINI_API_KEY = os.getenv('GEMINI_API_KEY')
    if GEMINI_API_KEY:
        genai.configure(api_key=GEMINI_API_KEY)
        print("✅ Gemini API configured from environment")
    else:
        print("⚠️ GEMINI_API_KEY not found")
# Initialize Models
print("\n🤖 Loading Models...")
CLIP_MODEL, CLIP_PREPROCESS_FN = None, None
WHISPER_MODEL = None
GEMINI_MODEL = None
# CLIP Model
if 'models' in modules_status and '✅' in modules_status['models']:
    try:
        CLIP_MODEL, _, CLIP_PREPROCESS_FN = open_clip.create_model_and_transforms(
            CLIP_MODEL_NAME, pretrained=CLIP_PRETRAINED, device=DEVICE
        )
        CLIP_MODEL.eval()
        print(f"✅ CLIP Model loaded: {CLIP_MODEL_NAME}")
    except Exception as e:
        print(f"❌ CLIP Model error: {e}")
# Whisper Model
try:
    WHISPER_MODEL = whisper.load_model(WHISPER_MODEL_NAME, device=DEVICE)
    print(f"✅ Whisper Model loaded: {WHISPER_MODEL_NAME}")
except Exception as e:
    print(f"❌ Whisper Model error: {e}")
# Gemini Model
if GEMINI_API_KEY and 'gemini' in modules_status and '✅' in modules_status['gemini']:
    try:
        # Configure Gemini API (simple configuration like the backend)
        genai.configure(api_key=GEMINI_API_KEY)
        # Create model (simple initialization matching backend)
        GEMINI_MODEL = genai.GenerativeModel(GEMINI_MODEL_NAME)
        print(f"✅ Gemini Model initialized: {GEMINI_MODEL_NAME}")
    except Exception as e:
        print(f"❌ Gemini Model error: {e}")
# Status Summary
essential_loaded = all([CLIP_MODEL, WHISPER_MODEL])
print(f"\n📊 Setup Status:")
print(f" Backend Modules: {sum('✅' in status for status in modules_status.values())}/5 loaded")
print(f" Essential Models: {'✅ Ready' if essential_loaded else '❌ Issues'}")
print(f" Gemini Available: {'✅ Yes' if GEMINI_MODEL else '⚠️ Disabled'}")
if not essential_loaded:
    print("\n⚠️ Some components missing - pipeline will run with reduced functionality")
print("=" * 60)
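The module-loading loop above boils down to one reusable step: build an import spec from a file path, execute it, and confirm the expected functions are present. The sketch below isolates that step with the standard-library `importlib.util` calls the cell already uses. `load_module_from_path` is a hypothetical helper that raises instead of recording a status string.

```python
import importlib.util
import sys


def load_module_from_path(name, path, required=()):
    """Load a Python file as module `name` and verify it defines `required`."""
    spec = importlib.util.spec_from_file_location(name, path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Cannot create import spec for {path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the file's top-level code
    missing = [attr for attr in required if not hasattr(module, attr)]
    if missing:
        raise ImportError(f"{name} is missing: {', '.join(missing)}")
    # Register only after validation so later `import name` finds this copy.
    sys.modules[name] = module
    return module


# Example:
# flow = load_module_from_path("flow", "/content/flow.py", required=["detect_spikes"])
```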
In [11]:
# Cell 3: Main Detection Pipeline (Backend Integration)
async def run_detection_pipeline(video_path: str, job_id: str = None) -> Dict[str, Any]:
    """
    Main deepfake detection pipeline matching backend implementation.
    Returns the same JSON structure as the API.
    """
    start_time = time.time()
    video_basename = os.path.basename(video_path)
    run_id = f"{Path(video_basename).stem}_{(job_id or uuid.uuid4().hex)[:6]}"
    result = {
        "input_video": video_basename,
        "run_id": run_id,
        "pipeline_version": "notebook_backend_integration_v1"
    }
    temp_audio_path = None
    try:
        print(f"🔍 Processing: {video_basename}")
        # Step 1: Sample video content
        print("📹 Step 1: Sampling video content...")
        if 'video' not in globals():
            raise RuntimeError("video.py module not loaded - please upload video.py to /content/")
        # Get video module from globals to avoid UnboundLocalError
        video_module = globals()['video']
        # Try alternative access methods if hasattr fails
        if not hasattr(video_module, 'sample_video_content'):
            print("🔧 Video function not found, trying direct import...")
            # Try re-importing directly
            try:
                import importlib.util
                spec = importlib.util.spec_from_file_location("video_direct", "/content/video.py")
                video_direct = importlib.util.module_from_spec(spec)
                spec.loader.exec_module(video_direct)
                sample_func_direct = getattr(video_direct, 'sample_video_content', None)
                if sample_func_direct:
                    print("✅ Using directly imported video module")
                    video_module = video_direct  # Replace the problematic module
                    globals()['video'] = video_direct  # Update global reference
                else:
                    raise RuntimeError("sample_video_content not found in direct import")
            except Exception as e:
                raise RuntimeError(f"video.sample_video_content function not available: {e}")
        frames, temp_audio_path, original_dur, processed_dur = video_module.sample_video_content(
            video_path,
            target_fps=TARGET_FPS,
            max_duration_sec=MAX_VIDEO_DURATION_SEC
        )
        result.update({
            "video_original_duration_sec": float(round(original_dur, 2)),
            "video_processed_duration_sec": float(round(processed_dur, 2)),
            "num_frames_sampled": int(len(frames))
        })
        if not frames:
            raise RuntimeError("Frame sampling returned no frames")
        # Step 2: CLIP visual score
        print("🎨 Step 2: Calculating CLIP visual score...")
        clip_score = 0.0
        if CLIP_MODEL and CLIP_PREPROCESS_FN and 'models' in globals():
            clip_score = models.calculate_visual_clip_score(
                frames, CLIP_MODEL, CLIP_PREPROCESS_FN, DEVICE
            )
        result["score_visual_clip"] = float(round(clip_score, 3))
        # Step 3: Whisper transcription
        print("🎤 Step 3: Transcribing audio with Whisper...")
        transcription = {"text": "", "words": [], "avg_no_speech_prob": 1.0, "language": "unknown"}
        if WHISPER_MODEL and temp_audio_path and 'models' in globals():
            transcription = models.transcribe_audio_content(temp_audio_path, WHISPER_MODEL)
        # Check for valid English speech
        avg_no_speech_prob = transcription.get("avg_no_speech_prob", 0.0)
        detected_lang = transcription.get("language", "unknown")
        lipsync_enabled = True
        result["detected_language"] = detected_lang
        if avg_no_speech_prob > 0.85:
            print(f"⚠️ High 'no speech' probability ({avg_no_speech_prob:.2f}) detected. Disabling lip-sync check.")
            lipsync_enabled = False
            transcription["text"] = "[No speech detected]"
        elif detected_lang != 'en':
            print(f"⚠️ Detected language is '{detected_lang}', not 'en'. Disabling lip-sync check.")
            lipsync_enabled = False
            transcription["text"] = f"[Non-English language detected: {detected_lang}]"
        result["transcript_snippet"] = (
            transcription["text"][:150] + "..." if transcription["text"] else "[No Speech/Audio Error]"
        )
        # Step 4: Gemini inspections
        print("🔮 Step 4: Running Gemini inspections...")
        vis_flag = lip_flag = blink_flag = 0
        gibberish_score = 0.0
        gemini_events = []
        if GEMINI_MODEL and 'gemini' in globals():
            try:
                print(f"🔍 Debug: Gemini model type: {type(GEMINI_MODEL)}")
                print(f"🔍 Debug: Gemini model available: {GEMINI_MODEL is not None}")
                print(f"🔍 Debug: Number of frames: {len(frames)}")
                print(f"🔍 Debug: Video duration: {processed_dur:.2f}s")
                print(f"🔍 Debug: Lipsync enabled: {lipsync_enabled}")
                print(f"🔍 Debug: Video path exists: {os.path.exists(video_path)}")
                print(f"🔍 Debug: Transcript text length: {len(transcription.get('text', ''))}")
                vis_flag, lip_flag, blink_flag, gibberish_score, gemini_events = await gemini.run_gemini_inspections(
                    frames,
                    video_path,
                    transcription["text"],
                    GEMINI_MODEL,
                    fps=TARGET_FPS,
                    enable_visual_artifacts=True,
                    enable_lipsync=lipsync_enabled,
                    enable_abnormal_blinks=True,
                    enable_ocr_gibberish=True
                )
                print(f"🔍 Debug: Gemini results - vis:{vis_flag}, lip:{lip_flag}, blink:{blink_flag}, gibberish:{gibberish_score:.3f}")
            except Exception as e:
                print(f"⚠️ Gemini inspections failed: {e}")
                import traceback
                print(f"🔍 Debug: Full traceback:\n{traceback.format_exc()}")
        else:
            if not GEMINI_MODEL:
                print("⚠️ GEMINI_MODEL is None - Gemini checks disabled")
                print(f"🔍 Debug: GEMINI_API_KEY present: {GEMINI_API_KEY is not None}")
                print(f"🔍 Debug: gemini module status: {modules_status.get('gemini', 'not found')}")
            if 'gemini' not in globals():
                print("⚠️ gemini module not in globals - Gemini checks disabled")
                print(f"🔍 Debug: Available modules in globals: {[k for k in globals().keys() if not k.startswith('_')]}")
        result.update({
            "flag_gemini_visual_artifact": int(vis_flag),
            "flag_gemini_lipsync_issue": int(lip_flag),
            "flag_gemini_abnormal_blinks": int(blink_flag)
        })
        # Step 5: Heuristic detectors
        print("🔬 Step 5: Running heuristic detectors...")
        flow_result = {"score": 0.0, "events": [], "tags": []}
        if 'flow' in globals():
            try:
                flow_result = flow.detect_spikes(frames, TARGET_FPS)
            except Exception as e:
                print(f"⚠️ Flow detection failed: {e}")
        # Step 6: Score fusion
        print("⚖️ Step 6: Fusing detection scores...")
        if 'fusion' in globals():
            try:
                other_scores = {
                    "gibberish": gibberish_score,
                    "flow": flow_result.get("score", 0.0),
                }
                final_conf, final_label, fusion_tags, label_confidence = fusion.fuse_detection_scores(
                    clip_score,
                    vis_flag,
                    lip_flag,
                    blink_flag,
                    other_scores=other_scores
                )
            except Exception as e:
                print(f"⚠️ Score fusion failed: {e}")
                final_conf, final_label, fusion_tags, label_confidence = 0.5, "UNCERTAIN", [], 0.5
        else:
            # Simple fallback fusion
            final_conf = (clip_score * 0.4 + (vis_flag + lip_flag + blink_flag) * 0.2)
            final_label = "LIKELY_FAKE" if final_conf > 0.6 else "LIKELY_REAL" if final_conf < 0.3 else "UNCERTAIN"
            fusion_tags = []
            label_confidence = 0.5
        result.update({
            "deepfake_confidence_overall": float(final_conf),
            "label_confidence": float(label_confidence),
            "final_predicted_label": final_label,
        })
        # Aggregate anomaly tags and events
        all_tags = list(fusion_tags)
        all_tags.extend(flow_result.get("tags", []))
        if gibberish_score > 0:
            all_tags.append("gibberish_text")
        result["anomaly_tags_detected"] = sorted(list(set(all_tags)))
        # Timeline events
        timeline_events = []
        timeline_events.extend(flow_result.get("events", []))
        timeline_events.extend(gemini_events)
        timeline_events.sort(key=lambda ev: (ev.get("module", ""), ev.get("ts", 0.0)))
        result["events"] = timeline_events
        # Heuristic checks detail (ensure JSON serializable)
        result["heuristicChecks"] = {
            "visual_clip": float(clip_score),
            "gemini_visual_artifacts": int(vis_flag),
            "gemini_lipsync_issue": int(lip_flag),
            "gemini_blink_abnormality": int(blink_flag),
            "gibberish": float(gibberish_score),
            "flow": float(flow_result.get("score", 0.0)),
        }
        result["processing_time"] = float(round(time.time() - start_time, 2))
        print(f"✅ Pipeline completed successfully in {result['processing_time']:.2f}s")
    except Exception as e:
        import traceback
        error_msg = f"Pipeline error for {video_basename}: {e}"
        print(f"❌ {error_msg}")
        print(traceback.format_exc())
        result.update({
            "error": error_msg,
            "trace": traceback.format_exc(),
            "final_predicted_label": "ERROR_IN_PROCESSING",
            "deepfake_confidence_overall": 0.5,
            "label_confidence": 0.5,
            "anomaly_tags_detected": ["PIPELINE_ERROR"],
            "heuristicChecks": {},
            "events": [],
            "score_visual_clip": 0.0,
            "flag_gemini_visual_artifact": 0,
            "flag_gemini_lipsync_issue": 0,
            "flag_gemini_abnormal_blinks": 0,
            "processing_time": float(round(time.time() - start_time, 2))
        })
    finally:
        if temp_audio_path and os.path.exists(temp_audio_path):
            try:
                os.remove(temp_audio_path)
            except OSError:
                pass
    return result
def format_api_response(pipeline_result: Dict[str, Any], job_id: str) -> Dict[str, Any]:
    """
    Format pipeline result to match the API response structure from backend/app/schemas.py
    """
    processing_time = pipeline_result.get("processing_time", 0.0)
    # Map internal anomaly tags to user-friendly descriptions
    tag_mapping = {
        "VISUAL_CLIP_ANOMALY": "Visual Anomaly Detected",
        "GEMINI_VISUAL_ARTIFACTS": "Visual Artifacts Detected",
        "GEMINI_LIPSYNC_ISSUE": "Lip-sync Issue Detected",
        "GEMINI_ABNORMAL_BLINKS": "Abnormal Blinking Pattern",
        "gibberish_text": "Gibberish Text Detected",
        "flow_spike": "Motion Flow Anomaly",
        "PIPELINE_ERROR": "Processing Error"
    }
    mapped_tags = [tag_mapping.get(tag, tag) for tag in pipeline_result.get("anomaly_tags_detected", [])]
    return {
        "job_id": job_id,
        "status": "completed",
        "result": {
            "id": pipeline_result.get("run_id", job_id),
            "isReal": pipeline_result.get("final_predicted_label", "ERROR_IN_PROCESSING") == "LIKELY_REAL",
            "label": pipeline_result.get("final_predicted_label", "ERROR_IN_PROCESSING"),
            "confidenceScore": pipeline_result.get("label_confidence", 0.5),
            "processedAt": datetime.utcnow().isoformat() + "Z",
            "tags": mapped_tags,
            "details": {
                "visualScore": pipeline_result.get("score_visual_clip", 0.0),
                "processingTime": processing_time,
                "videoLength": pipeline_result.get("video_processed_duration_sec", 0.0),
                "originalVideoLength": pipeline_result.get("video_original_duration_sec", 0.0),
                "pipelineVersion": pipeline_result.get("pipeline_version", "unknown"),
                "transcriptSnippet": pipeline_result.get("transcript_snippet", "N/A"),
                "geminiChecks": {
                    "visualArtifacts": bool(pipeline_result.get("flag_gemini_visual_artifact", 0)),
                    "lipsyncIssue": bool(pipeline_result.get("flag_gemini_lipsync_issue", 0)),
                    "abnormalBlinks": bool(pipeline_result.get("flag_gemini_abnormal_blinks", 0))
                },
                "heuristicChecks": pipeline_result.get("heuristicChecks", {}),
                "detectedLanguage": pipeline_result.get("detected_language", "unknown"),
                "error_message": pipeline_result.get("error"),
                "error_trace": pipeline_result.get("trace")
            },
            "events": pipeline_result.get("events", [])
        },
        "processing_time": processing_time
    }
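# Helper function to run async code from notebook
def run_async(coro):
    """Run async code in notebook"""
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running loop (plain Python script): start a fresh one.
        return asyncio.run(coro)
    # A loop is already running (Jupyter/Colab). Re-entering it only works
    # because Cell 2 called nest_asyncio.apply(). Kept outside the try block so
    # a RuntimeError raised by the coroutine itself is not mistaken for
    # "no running loop".
    return loop.run_until_complete(coro)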
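The helper above depends on nest_asyncio to re-enter a loop that is already running. As a sketch of an alternative that needs no patching, assuming it is acceptable to run the coroutine on a short-lived worker thread with its own event loop, the coroutine can be handed to `asyncio.run` in a separate thread. `run_coro_blocking` is a hypothetical name, not part of the notebook.

```python
import asyncio
import threading


def run_coro_blocking(coro):
    """Run `coro` to completion and return its result, with or without a running loop."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: the simple path.
        return asyncio.run(coro)

    # A loop is already running here, so use a worker thread with its own loop.
    outcome = {}

    def worker():
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:  # re-raised in the calling thread below
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()  # blocks the caller (and its loop) until the coroutine finishes
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]
```

The trade-off is that the calling loop is blocked while the worker runs, which is fine for a notebook cell but not for a server.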
In [12]:
# Cell 4: Demo & Testing Setup
print("🎬 DEEPFAKE DETECTION DEMO")
print("=" * 60)
# Check if backend modules are loaded first
missing_modules = []
required_modules = ['flow', 'fusion', 'gemini', 'models', 'video']
for mod in required_modules:
    if mod not in globals():
        missing_modules.append(mod)
if missing_modules:
    print("❌ SETUP INCOMPLETE!")
    print("Missing backend modules:", ', '.join(missing_modules))
    print("""
🚨 REQUIRED SETUP STEPS:
1. 📄 Upload Backend Files:
• Go to Files tab → Upload to session storage
• Upload ALL 5 files: flow.py, fusion.py, gemini.py, models.py, video.py
• These files are from the backend/app/core/ directory
2. 📤 Run Cell 2 again to load the modules
3. 📹 Then upload your test video and run this demo
⚠️ The backend files must be the actual files from the repository!
""")
else:
    print("✅ Backend modules loaded successfully!")
    print("""
📋 VIDEO UPLOAD INSTRUCTIONS:
1. 📁 Click the 'Files' tab in the left sidebar of Colab
2. 📤 Click 'Upload to session storage'
3. 🎬 Select a video file (MP4, AVI, MOV, MKV, WebM)
4. ✏️ Update the TEST_VIDEO_PATH variable below with your filename
SAMPLE VIDEOS RECOMMENDED:
• Real video: Genuine person speaking (news clips, interviews)
• Fake video: Known deepfake content for comparison
PROCESSING NOTES:
• Videos processed for up to 30 seconds maximum
• Works best with clear faces and speech
• Non-English speech disables lip-sync but other checks remain active
• Gemini API calls may take 30-120 seconds depending on content
""")
# Configure your test video path here
TEST_VIDEO_PATH = "/content/fake_test.mp4" # ⚠️ CHANGE THIS TO YOUR UPLOADED VIDEO
# Demo helper functions
def find_uploaded_videos():
    """Find video files in /content directory"""
    video_extensions = ['.mp4', '.avi', '.mov', '.mkv', '.webm', '.m4v']
    found_videos = []
    try:
        for file in os.listdir('/content'):
            if any(file.lower().endswith(ext) for ext in video_extensions):
                file_path = f"/content/{file}"
                if os.path.getsize(file_path) > 1000:  # At least 1KB
                    size_mb = os.path.getsize(file_path) / (1024*1024)
                    found_videos.append((file, size_mb))
    except OSError:
        pass
    return found_videos
def check_video_file(video_path):
    """Check if video file exists and is valid"""
    if not os.path.exists(video_path):
        print(f"❌ Video not found: {video_path}")
        # Try to find available videos
        found_videos = find_uploaded_videos()
        if found_videos:
            print(f"\n💡 Found {len(found_videos)} video file(s) in /content:")
            for video, size_mb in found_videos:
                print(f" • {video} ({size_mb:.1f} MB)")
            print(f"\n📝 Try setting: TEST_VIDEO_PATH = \"/content/{found_videos[0][0]}\"")
        else:
            print("\n🔧 SETUP REQUIRED:")
            print("1. Upload a video file to Colab using the Files tab")
            print("2. Update TEST_VIDEO_PATH variable above")
            print("3. Re-run this cell")
        return False
    size_mb = os.path.getsize(video_path) / (1024*1024)
    if size_mb < 0.001:  # Less than 1KB
        print(f"❌ Video file too small: {size_mb:.3f} MB")
        return False
    print(f"✅ Video found: {os.path.basename(video_path)} ({size_mb:.1f} MB)")
    return True
async def run_demo():
    """Run the detection pipeline and display results"""
    if not check_video_file(TEST_VIDEO_PATH):
        return
    print(f"\n🚀 Starting deepfake detection pipeline...")
    print(f"📁 Input: {os.path.basename(TEST_VIDEO_PATH)}")
    # Generate demo job ID
    demo_job_id = f"demo_{uuid.uuid4().hex[:8]}"
    # Run the pipeline
    pipeline_result = await run_detection_pipeline(TEST_VIDEO_PATH, demo_job_id)
    # Format as API response
    api_response = format_api_response(pipeline_result, demo_job_id)
    # Display results
    print("\n" + "="*80)
    print("🎯 DETECTION RESULTS")
    print("="*80)
    result = api_response["result"]
    # Main verdict
    print(f"🏷️ VERDICT: {result['label']}")
    print(f"🎭 Is Real: {'✅ YES' if result['isReal'] else '❌ NO'}")
    print(f"📊 Confidence: {result['confidenceScore']:.1%}")
    print(f"⏱️ Processing Time: {api_response['processing_time']:.1f}s")
    # Tags
    if result['tags']:
        print(f"🚩 Detected Issues: {', '.join(result['tags'])}")
    else:
        print("🚩 Detected Issues: None")
    # Technical details
    details = result['details']
    print(f"\n📹 Video Info:")
    print(f" • Length: {details['videoLength']:.1f}s (original: {details['originalVideoLength']:.1f}s)")
    print(f" • Language: {details.get('detectedLanguage', 'unknown')}")
    print(f" • Transcript: {details['transcriptSnippet']}")
    print(f"\n🔍 Component Scores:")
    heuristics = details['heuristicChecks']
    for component, score in heuristics.items():
        if isinstance(score, (int, float)):
            print(f" • {component.replace('_', ' ').title()}: {score:.3f}")
    print(f"\n🔮 Gemini Checks:")
    # Named gemini_checks so it is not confused with the loaded gemini module
    gemini_checks = details['geminiChecks']
    for check, detected in gemini_checks.items():
        status = "🔴 DETECTED" if detected else "🟢 CLEAN"
        print(f" • {check.replace('_', ' ').title()}: {status}")
    # Timeline events
    events = result.get('events', [])
    if events:
        print(f"\n⏰ Timeline Events ({len(events)} total):")
        for i, event in enumerate(events[:5]):  # Show first 5 events
            module = event.get('module', 'unknown')
            event_type = event.get('event', 'unknown')
            ts = event.get('ts', 0)
            print(f" {i+1}. [{module}] {event_type} @ {ts:.1f}s")
        if len(events) > 5:
            print(f" ... and {len(events) - 5} more events")
    else:
        print("\n⏰ Timeline Events: None detected")
    # Error handling
    if details.get('error_message'):
        print(f"\n⚠️ Pipeline Error: {details['error_message']}")
    print("\n" + "="*80)
    print("📄 Raw API Response (JSON):")
    print("="*80)
    print(json.dumps(api_response, indent=2))
    return api_response
# Synchronous wrapper for easy execution
def run_demo_sync():
    """Synchronous wrapper for the demo"""
    return run_async(run_demo())
# Check for uploaded videos on cell execution
found_videos = find_uploaded_videos()
if found_videos:
    print(f"📂 Found {len(found_videos)} video file(s) in /content:")
    for video, size_mb in found_videos:
        print(f" • {video} ({size_mb:.1f} MB)")
    print("\n🎬 Ready to analyze! Update TEST_VIDEO_PATH above and run the cell below.")
print("=" * 60)
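The Gemini check keys in the API response are camelCase (`visualArtifacts`, `lipsyncIssue`, `abnormalBlinks`), so replacing underscores alone leaves each label as a single word. A small sketch of a display helper that handles both snake_case and camelCase keys follows; `humanize_key` is a hypothetical name and is not part of the notebook.

```python
import re


def humanize_key(key):
    """Turn a snake_case or camelCase key into a title-cased label."""
    # Insert a space at each lowercase/digit-to-uppercase boundary.
    spaced = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", key.replace("_", " "))
    return spaced.title()


# Example: humanize_key("visualArtifacts")
```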
In [13]:
# Cell 5: Execute Detection Demo
# 🎬 EXECUTE DETECTION
# Run this cell after uploading your video and updating TEST_VIDEO_PATH above
run_demo_sync()