How It Works
This guide provides a deep technical understanding of the Vimeo Connector's architecture, data flow, and implementation details.
System Architecture
The Vimeo Connector follows a layered architecture with clear separation of concerns:
┌──────────────────────────────────────────────────────────┐
│ Presentation Layer                                        │
│   Dashboard (React + Next.js)                             │
│     - URL Input & Validation                              │
│     - Real-time Progress Tracking                         │
│     - Job History & Results Display                       │
└────────────────────────────┬─────────────────────────────┘
                             │  REST API (JSON/HTTPS)
                             ▼
┌──────────────────────────────────────────────────────────┐
│ API Layer (FastAPI)                                       │
│   /api/v1/sync               - Start sync job             │
│   /api/v1/sync/{id}          - Get job status/progress    │
│   /api/v1/health             - Health check               │
│   /api/v1/videos/{id}        - Get video metadata         │
│   /api/v1/transcripts/{id}   - Get transcript             │
└────────────────────────────┬─────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│ Business Logic Layer                                      │
│   Sync Engine                                             │
│     - Job Queue Management                                │
│     - Progress Tracking                                   │
│     - Checksum Comparison (Incremental Sync)              │
│     - Error Handling & Retry Logic                        │
│   Content Extraction                                      │
│     - Transcript Parsing (VTT/SRT → Plain Text)           │
│     - Metadata Normalization                              │
│     - Checksum Generation (SHA256)                        │
└────────────────────────────┬─────────────────────────────┘
                             │
                             ▼
┌──────────────────────────────────────────────────────────┐
│ HTTP Client Layer                                         │
│   Token Pool (Optional)                                   │
│     - Round-robin token selection                         │
│     - Health tracking (HEALTHY/COOLDOWN/FAILED)           │
│     - Automatic failover on 429/401/403                   │
│   Resilient HTTP Client (HTTPX)                           │
│     - Rate Limiting (Token Bucket)                        │
│     - Exponential Backoff Retry                           │
│     - Circuit Breaker                                     │
│     - Connection Pooling                                  │
└────────────────────────────┬─────────────────────────────┘
                             │  HTTPS
                             ▼
┌──────────────────────────────────────────────────────────┐
│ Vimeo API                                                 │
│   - GET /users/{id}/videos                                │
│   - GET /albums/{id}/videos                               │
│   - GET /videos/{id}/texttracks                           │
└──────────────────────────────────────────────────────────┘
Data Flow: Complete Sync Workflow
Let's trace a complete sync job from start to finish:
Phase 1: User Initiates Sync
User Action: Pastes URL and clicks "Import Collection"
Dashboard (Frontend):
// 1. Validate URL format
const isValid = validateVimeoUrl(url); // Client-side regex check

// 2. Send POST request to backend and read the job ID from the response
const response = await fetch('/api/v1/sync', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    url: 'https://vimeo.com/showcase/11708791',
    mode: 'auto'
  })
});
const { job_id: jobId } = await response.json();

// 3. Start polling for progress
pollJobStatus(jobId);
API Layer:
@router.post("/sync")
async def create_sync_job(request: SyncRequest):
    # 1. Create job with unique ID
    job = Job(
        id=generate_job_id(),  # e.g., "sync_a1b2c3d4"
        url=request.url,
        mode=request.mode,
        status="queued"
    )

    # 2. Add to job queue
    await job_queue.enqueue(job)

    # 3. Return job ID to client
    return {"job_id": job.id, "status": "queued"}
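The status endpoint that the dashboard polls in Phase 6 is the counterpart to this handler. A minimal sketch, assuming a hypothetical `job_queue.get()` lookup and mirroring the job fields used by the polling code later in this guide:
from fastapi import HTTPException

@router.get("/sync/{job_id}")
async def get_sync_job(job_id: str):
    # Look up the job created by create_sync_job (lookup helper is hypothetical)
    job = await job_queue.get(job_id)
    if job is None:
        raise HTTPException(status_code=404, detail="Job not found")
    return {
        "job_id": job.id,
        "status": job.status,
        "processed": job.processed,
        "total": job.total,
        "current_video": job.current_video,
        "results": job.results,
    }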
Phase 2: URL Parsing & Routing
Sync Engine:
# 1. Parse URL to extract type and ID
parsed = parse_vimeo_url(job.url)
# Returns: {"type": "showcase", "id": "11708791"}

# 2. Route to appropriate fetcher
if parsed["type"] == "showcase":
    videos = await fetch_showcase_videos(parsed["id"])
elif parsed["type"] == "user_videos":
    videos = await fetch_user_videos(parsed["id"])
# ... other URL types
URL Parsing Logic:
import re

def parse_vimeo_url(url: str) -> dict:
    patterns = {
        r'vimeo\.com/showcase/(\d+)': 'showcase',
        r'vimeo\.com/user(\d+)/albums': 'albums',
        r'vimeo\.com/user(\d+)/videos': 'user_videos',
        # ... other patterns
    }
    for pattern, url_type in patterns.items():
        # Use search, not match: full URLs start with the https:// scheme
        match = re.search(pattern, url)
        if match:
            return {"type": url_type, "id": match.group(1)}
    raise InvalidURLError(f"Unsupported URL format: {url}")
Phase 3: API Calls to Vimeo
HTTP Client Layer:
async def fetch_showcase_videos(showcase_id: str) -> list:
    # 1. Get token from pool (if enabled) and resolve the credential it tracks
    token_id = await token_pool.get_next_token()
    token = token_pool.tokens[token_id]["token"]

    # 2. Acquire rate limit slot
    await rate_limiter.acquire(token_id)

    # 3. Make API request with retry logic
    response = await http_client.get(
        f"/albums/{showcase_id}/videos",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30
    )

    # 4. Handle pagination
    videos = response.json()["data"]
    while response.json().get("paging", {}).get("next"):
        next_url = response.json()["paging"]["next"]
        response = await http_client.get(
            next_url,
            headers={"Authorization": f"Bearer {token}"}
        )
        videos.extend(response.json()["data"])
    return videos
Rate Limiting (Token Bucket Algorithm):
class RateLimiter:
    def __init__(self, capacity=600, window=600):
        self.capacity = capacity      # 600 calls
        self.window = window          # 10 minutes (600 seconds)
        self.tokens = capacity
        self.last_refill = time.time()

    async def acquire(self, token_id: str):
        await self._refill()
        if self.tokens < 1:
            # Calculate wait time until the window resets
            wait_time = self.window - (time.time() - self.last_refill)
            raise RateLimitError(f"Wait {wait_time}s")
        self.tokens -= 1

    async def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill
        if elapsed >= self.window:
            self.tokens = self.capacity
            self.last_refill = now
Phase 4: Transcript Fetching
For Each Video:
async def fetch_video_with_transcript(video_id: str) -> dict:
    # 1. Fetch video metadata
    video = await vimeo_client.get_video(video_id)

    # 2. Check for transcripts
    texttracks = await vimeo_client.get_texttracks(video_id)
    if not texttracks:
        return {**video, "transcript": None, "has_transcript": False}

    # 3. Download transcript file
    transcript_url = texttracks[0]["link"]
    transcript_response = await http_client.get(transcript_url)

    # 4. Parse VTT/SRT to plain text
    transcript_text = parse_transcript(
        transcript_response.text,
        format=texttracks[0]["type"]  # "captions" (VTT) or "subtitles" (SRT)
    )

    return {
        **video,
        "transcript": transcript_text,
        "has_transcript": True
    }
Transcript Parsing:
def parse_vtt(content: str) -> str:
    """Strip VTT timing codes, return plain text."""
    lines = content.split('\n')
    text_lines = []
    for line in lines:
        # Skip WEBVTT header
        if line.startswith('WEBVTT'):
            continue
        # Skip timing lines (00:01:23.456 --> 00:01:25.789)
        if '-->' in line:
            continue
        # Skip empty lines and cue identifiers
        if not line.strip() or line.strip().isdigit():
            continue
        text_lines.append(line.strip())
    return '\n\n'.join(text_lines)
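SRT tracks are handled the same way. A minimal sketch of an equivalent SRT parser (illustrative; the shipped parser may differ) strips the sequence numbers and comma-formatted timing lines:
def parse_srt(content: str) -> str:
    """Strip SRT sequence numbers and timing codes, return plain text."""
    text_lines = []
    for line in content.split('\n'):
        # Skip sequence numbers (1, 2, 3, ...)
        if line.strip().isdigit():
            continue
        # Skip timing lines (00:01:23,456 --> 00:01:25,789)
        if '-->' in line:
            continue
        # Skip empty lines
        if not line.strip():
            continue
        text_lines.append(line.strip())
    return '\n\n'.join(text_lines)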
Phase 5: Checksum Comparison (Incremental Sync)
Sync Engine:
import hashlib

async def process_video(video: dict, job: Job) -> str:
    # 1. Generate checksum from content
    checksum = generate_checksum(video)

    # 2. Load previous checksum (if exists)
    previous_checksum = await load_checksum(video["uri"])

    # 3. Compare checksums
    if previous_checksum is None:
        status = "added"
    elif checksum == previous_checksum:
        status = "unchanged"
        if job.mode == "incremental":
            return status  # Skip re-processing
    else:
        status = "updated"

    # 4. Store video data
    await store_video_metadata(video)
    if video.get("transcript"):
        await store_transcript(video["transcript"])
    await store_checksum(video["uri"], checksum)
    return status

def generate_checksum(video: dict) -> str:
    """Generate SHA256 hash of video content."""
    content = f"{video['name']}{video['description']}"
    content += f"{video['duration']}{video['created_time']}"
    if video.get("transcript"):
        content += video["transcript"]
    return hashlib.sha256(content.encode()).hexdigest()
Phase 6: Progress Updates
Sync Engine:
async def sync_videos(job: Job, videos: list):
    job.total = len(videos)

    for i, video in enumerate(videos):
        # 1. Update current video
        job.current_video = video["name"]
        job.processed = i
        await update_job_status(job)

        # 2. Process video
        status = await process_video(video, job)

        # 3. Track results
        job.results[status] += 1

    # 4. Mark job complete
    job.status = "completed"
    await update_job_status(job)
Dashboard Polling:
// Poll every 1.5 seconds
const pollInterval = setInterval(async () => {
  const response = await fetch(`/api/v1/sync/${jobId}`);
  const job = await response.json();

  // Update UI
  updateProgressBar(job.processed / job.total * 100);
  updateCurrentVideo(job.current_video);

  if (job.status === 'completed') {
    clearInterval(pollInterval);
    displayResults(job.results);
  }
}, 1500);
Token Pool Deep-Dive
The Token Pool provides 6x capacity through intelligent load balancing:
Round-Robin Selection
class TokenPool:
    def __init__(self, tokens: list):
        self.tokens = {
            token.identifier: {
                "token": token,
                "health": "healthy",
                "rate_limiter": RateLimiter(),
                "cooldown_until": None
            }
            for token in tokens
        }
        self.current_index = 0
        self.token_ids = list(self.tokens.keys())

    async def get_next_token(self) -> str:
        """Select next healthy token using round-robin."""
        attempts = 0
        max_attempts = len(self.tokens)

        while attempts < max_attempts:
            token_id = self.token_ids[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.tokens)
            token_state = self.tokens[token_id]

            # Check health
            if token_state["health"] == "healthy":
                return token_id

            # Check if cooldown expired
            if token_state["health"] == "cooldown":
                if time.time() >= token_state["cooldown_until"]:
                    token_state["health"] = "healthy"
                    return token_id

            attempts += 1

        raise AllTokensCooldownError("All tokens unavailable")
Automatic Failover
async def make_request_with_failover(url: str):
    attempts = 0
    max_attempts = 3

    while attempts < max_attempts:
        # 1. Get token from pool and resolve the credential it tracks
        token_id = await token_pool.get_next_token()
        token = token_pool.tokens[token_id]["token"]
        try:
            # 2. Make request
            response = await http_client.get(
                url,
                headers={"Authorization": f"Bearer {token}"}
            )
            # 3. Mark success
            token_pool.mark_success(token_id)
            return response

        except RateLimitError:
            # 4. Mark cooldown, try next token
            token_pool.mark_cooldown(token_id, duration=600)
            metrics.record_failover(reason="rate_limit")
            attempts += 1
            continue

        except AuthenticationError:
            # 5. Mark failed, try next token
            token_pool.mark_failed(token_id)
            metrics.record_failover(reason="auth_error")
            attempts += 1
            continue

    raise MaxRetriesExceeded("All failover attempts exhausted")
Error Handling Strategy
The connector implements a multi-level error handling strategy:
Level 1: Retryable Errors (Transient)
Network errors, timeouts, 5xx errors
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=2, min=1, max=10),
    retry=retry_if_exception_type((NetworkError, TimeoutError))
)
async def fetch_with_retry(url: str):
    return await http_client.get(url)
Retry Schedule:
- Attempt 1: Immediate
- Attempt 2: After a ~2 second wait
- Attempt 3: After a ~4 second wait
- After the third failed attempt the error propagates (stop_after_attempt(3))
Level 2: Non-Retryable Errors (Permanent)
401/403 auth errors, 404 not found, 429 rate limit
async def fetch_with_error_classification(url: str):
    try:
        return await http_client.get(url)
    except HTTPError as e:
        if e.status_code in [401, 403]:
            raise AuthenticationError("Invalid token")
        elif e.status_code == 404:
            raise ResourceNotFoundError("Video deleted")
        elif e.status_code == 429:
            raise RateLimitError("Rate limit exceeded")
        else:
            raise  # Re-raise for retry layer
Why 429 is Non-Retryable:
- Immediate retry would trigger IP ban
- Token Pool handles failover instead
- Retry after cooldown (10 minutes)
Level 3: Circuit Breaker
Prevents cascading failures
class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.state = "closed"  # closed, open, half-open
        self.opened_at = None

    async def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.opened_at >= self.timeout:
                self.state = "half-open"
            else:
                raise CircuitBreakerOpenError("Too many failures")

        try:
            result = await func(*args, **kwargs)
            if self.state == "half-open":
                self.state = "closed"
                self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.time()
            raise
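For illustration, wrapping an outbound call in the breaker looks like this (a sketch; the actual wiring inside the HTTP client layer may differ):
breaker = CircuitBreaker(failure_threshold=5, timeout=60)

async def fetch_video_guarded(video_id: str):
    # Once the breaker opens, callers fail fast with CircuitBreakerOpenError
    # instead of piling more requests onto a struggling API.
    return await breaker.call(http_client.get, f"/videos/{video_id}")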
Observability
Structured Logging
# Log with context
log = get_logger(__name__)
bind_context(job_id="sync_a1b2c3d4", user_id=123)
log.info("Starting sync", url="https://vimeo.com/showcase/...")
# Output:
# {
# "timestamp": "2025-10-17T14:22:10Z",
# "level": "info",
# "message": "Starting sync",
# "job_id": "sync_a1b2c3d4",
# "user_id": 123,
# "url": "https://vimeo.com/showcase/...",
# "correlation_id": "req-abc-123"
# }
Metrics Collection
# Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge

# Request counters
http_requests_total = Counter(
    'vimeo_http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

# Latency histogram
http_request_duration = Histogram(
    'vimeo_http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)

# Active jobs gauge
active_jobs = Gauge(
    'vimeo_sync_jobs_active',
    'Number of active sync jobs'
)

# Usage
with http_request_duration.labels('GET', '/videos').time():
    response = await http_client.get('/videos/123')
http_requests_total.labels('GET', '/videos', response.status_code).inc()
Distributed Tracing
from opentelemetry import trace

tracer = trace.get_tracer(__name__)

async def fetch_video_with_transcript(video_id: str):
    with tracer.start_as_current_span("fetch_video_with_transcript") as span:
        span.set_attribute("video_id", video_id)

        # Child span: fetch metadata
        with tracer.start_as_current_span("fetch_metadata"):
            video = await vimeo_client.get_video(video_id)

        # Child span: fetch transcript
        with tracer.start_as_current_span("fetch_transcript"):
            transcript = await vimeo_client.get_transcript(video_id)

        span.set_attribute("has_transcript", transcript is not None)
        return {**video, "transcript": transcript}
Trace Example:
fetch_video_with_transcript (120ms)
├── fetch_metadata (50ms)
│   └── http_get /videos/123 (45ms)
└── fetch_transcript (65ms)
    ├── http_get /videos/123/texttracks (20ms)
    └── http_get <transcript_url> (40ms)
Performance Optimizations
Connection Pooling
# Reuse HTTP connections
http_client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_connections=100,            # Total connections
        max_keepalive_connections=20    # Idle connections kept alive
    ),
    timeout=httpx.Timeout(30.0)
)
Benefits:
- Avoid TCP handshake overhead (~100ms per connection)
- Reuse TLS sessions (save ~200ms per request)
- Reduce server load (fewer connection setups)
Async/Await Concurrency
import asyncio

async def fetch_multiple_videos(video_ids: list):
    # Process videos concurrently
    tasks = [fetch_video_with_transcript(vid) for vid in video_ids]
    return await asyncio.gather(*tasks)
Performance Gain:
- Serial: 500 videos × 2s each = 1000s (16.7 minutes)
- Parallel (10 concurrent): 500 videos / 10 × 2s = 100s (1.7 minutes)
Rate Limit Aware:
# Limit concurrency to avoid hitting rate limits
semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests

async def fetch_with_semaphore(video_id):
    async with semaphore:
        return await fetch_video_with_transcript(video_id)
Checksum Caching
# Cache checksums in memory for fast lookups during sync
checksum_cache = {}

async def load_checksum(video_uri: str) -> str:
    if video_uri in checksum_cache:
        return checksum_cache[video_uri]
    checksum = await db.load_checksum(video_uri)
    checksum_cache[video_uri] = checksum
    return checksum
Security Measures
Secret Masking in Logs
def mask_secret(secret: str) -> str:
    """Show only first and last 4 characters."""
    if len(secret) <= 8:
        return "xxxx"
    return f"{secret[:4]}...{secret[-4:]}"

# Usage
log.info("Using token", token=mask_secret(api_token))
# Output: token="ab13...447f"
PII Redaction
import re

PII_PATTERNS = {
    'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
}

def redact_pii(text: str) -> str:
    for pii_type, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f'[REDACTED_{pii_type.upper()}]', text)
    return text
# Applied to all log messages automatically
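One way this automatic application can be wired is as a structlog processor that runs before rendering. A sketch of assumed wiring; the connector's actual processor chain may differ:
import structlog

def redact_processor(logger, method_name, event_dict):
    # Run every string value in the event through redact_pii before rendering
    for key, value in event_dict.items():
        if isinstance(value, str):
            event_dict[key] = redact_pii(value)
    return event_dict

structlog.configure(
    processors=[redact_processor, structlog.processors.JSONRenderer()]
)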
Rate Limit Protection
Prevents IP bans:
- Stay at 80% of rate limit capacity (480 of 600 calls; see the sketch after this list)
- Use Token Pool for distribution
- Implement cooldown on 429 responses
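In code, the 80% headroom is just a smaller bucket. A sketch using the RateLimiter class from Phase 3 (constant names are illustrative):
VIMEO_LIMIT = 600      # per-token budget used throughout this guide (10-minute window)
SAFETY_FACTOR = 0.8    # stay at 80% to leave headroom for bursts

rate_limiter = RateLimiter(
    capacity=int(VIMEO_LIMIT * SAFETY_FACTOR),  # 480 calls
    window=600                                  # 10 minutes
)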
Technology Stack
Backend:
- Python 3.11 - Modern async/await, type hints
- FastAPI - High-performance async web framework
- HTTPX - Async HTTP client with HTTP/2 support
- Pydantic - Data validation and settings management
- Structlog - Structured logging
- Tenacity - Retry logic with exponential backoff
Frontend:
- React 18 - UI components
- Next.js - Server-side rendering, routing
- TailwindCSS - Utility-first styling
Observability:
- Prometheus - Metrics collection
- Grafana - Metrics visualization
- Jaeger - Distributed tracing
- OpenTelemetry - Instrumentation standard
Infrastructure:
- Docker Compose - Local development
- Heroku - Cloud deployment (PaaS)
- Kubernetes - Scalable production (optional)
Deployment Architecture
                   ┌─────────────────────┐
                   │    Load Balancer    │
                   │   (Heroku Router)   │
                   └──────────┬──────────┘
                              │
              ┌───────────────┴───────────────┐
              │                               │
   ┌──────────▼──────────┐         ┌──────────▼──────────┐
   │ Backend Instance 1  │         │ Backend Instance 2  │
   │     (Web Dyno)      │         │     (Web Dyno)      │
   └──────────┬──────────┘         └──────────┬──────────┘
              │                               │
              └───────────────┬───────────────┘
                              │
                   ┌──────────▼──────────┐
                   │  Background Worker  │
                   │    (Worker Dyno)    │
                   │  - Job Queue        │
                   │  - Sync Processing  │
                   └──────────┬──────────┘
                              │
                   ┌──────────▼──────────┐
                   │     PostgreSQL      │
                   │     (Job State)     │
                   └─────────────────────┘
Scalability:
- Horizontal: Add more web/worker dynos
- Vertical: Upgrade dyno sizes for more CPU/RAM
- Database: Use connection pooling (PgBouncer); an application-side pool sketch follows
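On the application side, a shared pool created at startup covers the same concern. A sketch using asyncpg (an assumption; the connector may use a different driver, and PgBouncer sits in front of the database in production). The DSN is a placeholder:
import asyncpg

async def init_db_pool() -> asyncpg.Pool:
    # Created once at startup and reused by web and worker processes,
    # so sync jobs reuse connections instead of opening a new one per query.
    return await asyncpg.create_pool(
        dsn="postgresql://user:pass@host:5432/connector",  # placeholder DSN
        min_size=2,
        max_size=10,
    )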
Related Documentation
For Users:
- What is the Vimeo Connector? - High-level overview
- Getting Started - Quick start guide
For Developers:
- REST API Reference - API endpoints
- GitHub Repository - Source code
For Administrators:
- How to Set Up Token Pool - 6x capacity
- Troubleshooting - Common issues