
How It Works

This guide provides a deep technical understanding of the Vimeo Connector's architecture, data flow, and implementation details.

System Architecture

The Vimeo Connector follows a layered architecture with clear separation of concerns:

┌──────────────────────────────────────────────────────────────┐
│                      Presentation Layer                      │
│  Dashboard (React + Next.js)                                 │
│    - URL Input & Validation                                  │
│    - Real-time Progress Tracking                             │
│    - Job History & Results Display                           │
└──────────────────────────────┬───────────────────────────────┘
                               │ REST API (JSON/HTTPS)
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                     API Layer (FastAPI)                      │
│  /api/v1/sync              - Start sync job                  │
│  /api/v1/sync/{id}         - Get job status/progress         │
│  /api/v1/health            - Health check                    │
│  /api/v1/videos/{id}       - Get video metadata              │
│  /api/v1/transcripts/{id}  - Get transcript                  │
└──────────────────────────────┬───────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                     Business Logic Layer                     │
│  Sync Engine                                                 │
│    - Job Queue Management                                    │
│    - Progress Tracking                                       │
│    - Checksum Comparison (Incremental Sync)                  │
│    - Error Handling & Retry Logic                            │
│  Content Extraction                                          │
│    - Transcript Parsing (VTT/SRT → Plain Text)               │
│    - Metadata Normalization                                  │
│    - Checksum Generation (SHA256)                            │
└──────────────────────────────┬───────────────────────────────┘
                               │
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                      HTTP Client Layer                       │
│  Token Pool (Optional)                                       │
│    - Round-robin token selection                             │
│    - Health tracking (HEALTHY/COOLDOWN/FAILED)               │
│    - Automatic failover on 429/401/403                       │
│  Resilient HTTP Client (HTTPX)                               │
│    - Rate Limiting (Token Bucket)                            │
│    - Exponential Backoff Retry                               │
│    - Circuit Breaker                                         │
│    - Connection Pooling                                      │
└──────────────────────────────┬───────────────────────────────┘
                               │ HTTPS
                               ▼
┌──────────────────────────────────────────────────────────────┐
│                           Vimeo API                          │
│    - GET /users/{id}/videos                                  │
│    - GET /albums/{id}/videos                                 │
│    - GET /videos/{id}/texttracks                             │
└──────────────────────────────────────────────────────────────┘

Data Flow: Complete Sync Workflow

Let's trace a complete sync job from start to finish:

Phase 1: User Initiates Sync

User Action: Pastes URL and clicks "Import Collection"

Dashboard (Frontend):

// 1. Validate URL format
const isValid = validateVimeoUrl(url); // Client-side regex check

// 2. Send POST request to backend
const response = await fetch('/api/v1/sync', {
  method: 'POST',
  headers: { 'Content-Type': 'application/json' },
  body: JSON.stringify({
    url: 'https://vimeo.com/showcase/11708791',
    mode: 'auto'
  })
});
const { job_id: jobId } = await response.json();

// 3. Start polling for progress
pollJobStatus(jobId);

API Layer:

@router.post("/sync")
async def create_sync_job(request: SyncRequest):
    # 1. Create job with unique ID
    job = Job(
        id=generate_job_id(),  # e.g., "sync_a1b2c3d4"
        url=request.url,
        mode=request.mode,
        status="queued"
    )

    # 2. Add to job queue
    await job_queue.enqueue(job)

    # 3. Return job ID to client
    return {"job_id": job.id, "status": "queued"}

Phase 2: URL Parsing & Routing

Sync Engine:

# 1. Parse URL to extract type and ID
parsed = parse_vimeo_url(job.url)
# Returns: {"type": "showcase", "id": "11708791"}

# 2. Route to appropriate fetcher
if parsed["type"] == "showcase":
    videos = await fetch_showcase_videos(parsed["id"])
elif parsed["type"] == "user_videos":
    videos = await fetch_user_videos(parsed["id"])
# ... other URL types

URL Parsing Logic:

import re

def parse_vimeo_url(url: str) -> dict:
    patterns = {
        r'vimeo\.com/showcase/(\d+)': 'showcase',
        r'vimeo\.com/user(\d+)/albums': 'albums',
        r'vimeo\.com/user(\d+)/videos': 'user_videos',
        # ... other patterns
    }

    for pattern, url_type in patterns.items():
        match = re.search(pattern, url)  # search, not match: URLs carry an https:// prefix
        if match:
            return {"type": url_type, "id": match.group(1)}

    raise InvalidURLError(f"Unsupported URL format: {url}")

Phase 3: API Calls to Vimeo

HTTP Client Layer:

async def fetch_showcase_videos(showcase_id: str) -> list:
    # 1. Get token from pool (if enabled)
    token_id = await token_pool.get_next_token()
    token = token_pool.tokens[token_id]["token"]

    # 2. Acquire rate limit slot
    await rate_limiter.acquire(token_id)

    # 3. Make API request with retry logic
    response = await http_client.get(
        f"/albums/{showcase_id}/videos",
        headers={"Authorization": f"Bearer {token}"},
        timeout=30
    )

    # 4. Handle pagination
    page = response.json()
    videos = page["data"]
    while page.get("paging", {}).get("next"):
        next_url = page["paging"]["next"]
        response = await http_client.get(
            next_url,
            headers={"Authorization": f"Bearer {token}"},
            timeout=30
        )
        page = response.json()
        videos.extend(page["data"])

    return videos

Rate Limiting (Token Bucket Algorithm):

import time

class RateLimiter:
    def __init__(self, capacity=600, window=600):
        self.capacity = capacity      # 600 calls
        self.window = window          # 10 minutes
        self.tokens = capacity
        self.last_refill = time.time()

    async def acquire(self, token_id: str):
        await self._refill()

        if self.tokens < 1:
            # Calculate wait time until the window resets
            wait_time = self.window - (time.time() - self.last_refill)
            raise RateLimitError(f"Wait {wait_time:.0f}s")

        self.tokens -= 1

    async def _refill(self):
        now = time.time()
        elapsed = now - self.last_refill

        if elapsed >= self.window:
            self.tokens = self.capacity
            self.last_refill = now

Phase 4: Transcript Fetching

For Each Video:

async def fetch_video_with_transcript(video_id: str) -> dict:
    # 1. Fetch video metadata
    video = await vimeo_client.get_video(video_id)

    # 2. Check for transcripts
    texttracks = await vimeo_client.get_texttracks(video_id)

    if not texttracks:
        return {**video, "transcript": None, "has_transcript": False}

    # 3. Download transcript file
    transcript_url = texttracks[0]["link"]
    transcript_response = await http_client.get(transcript_url)

    # 4. Parse VTT/SRT to plain text
    transcript_text = parse_transcript(
        transcript_response.text,
        format=texttracks[0]["type"]  # "captions" (VTT) or "subtitles" (SRT)
    )

    return {
        **video,
        "transcript": transcript_text,
        "has_transcript": True
    }

Transcript Parsing:

def parse_vtt(content: str) -> str:
    """Strip VTT timing codes, return plain text."""
    lines = content.split('\n')
    text_lines = []

    for line in lines:
        # Skip WEBVTT header
        if line.startswith('WEBVTT'):
            continue

        # Skip timing lines (00:01:23.456 --> 00:01:25.789)
        if '-->' in line:
            continue

        # Skip empty lines and numeric cue identifiers
        if not line.strip() or line.strip().isdigit():
            continue

        text_lines.append(line.strip())

    return '\n\n'.join(text_lines)
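
The SRT branch mentioned in the Content Extraction layer works the same way. A minimal sketch (illustrative, not the connector's exact implementation):

def parse_srt(content: str) -> str:
    """Strip SRT cue numbers and timing lines, return plain text."""
    text_lines = []
    for line in content.split('\n'):
        stripped = line.strip()
        # Skip cue numbers, timing lines (00:01:23,456 --> 00:01:25,789), and blanks
        if not stripped or stripped.isdigit() or '-->' in stripped:
            continue
        text_lines.append(stripped)
    return '\n\n'.join(text_lines)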

Phase 5: Checksum Comparison (Incremental Sync)

Sync Engine:

async def process_video(video: dict, job: Job) -> str:
    # 1. Generate checksum from content
    checksum = generate_checksum(video)

    # 2. Load previous checksum (if exists)
    previous_checksum = await load_checksum(video["uri"])

    # 3. Compare checksums
    if previous_checksum is None:
        status = "added"
    elif checksum == previous_checksum:
        status = "unchanged"
        if job.mode == "incremental":
            return status  # Skip re-processing
    else:
        status = "updated"

    # 4. Store video data
    await store_video_metadata(video)
    if video.get("transcript"):
        await store_transcript(video["transcript"])
    await store_checksum(video["uri"], checksum)

    return status

import hashlib

def generate_checksum(video: dict) -> str:
    """Generate SHA256 hash of video content."""
    content = f"{video['name']}{video['description']}"
    content += f"{video['duration']}{video['created_time']}"
    if video.get("transcript"):
        content += video["transcript"]

    return hashlib.sha256(content.encode()).hexdigest()

Phase 6: Progress Updates

Sync Engine:

async def sync_videos(job: Job, videos: list):
    job.total = len(videos)

    for i, video in enumerate(videos):
        # 1. Update current video
        job.current_video = video["name"]
        job.processed = i
        await update_job_status(job)

        # 2. Process video
        status = await process_video(video, job)

        # 3. Track results
        job.results[status] += 1

    # 4. Mark job complete
    job.processed = job.total
    job.status = "completed"
    await update_job_status(job)

Dashboard Polling:

// Poll every 1.5 seconds
setInterval(async () => {
const response = await fetch(`/api/v1/sync/${jobId}`);
const job = await response.json();

// Update UI
updateProgressBar(job.processed / job.total * 100);
updateCurrentVideo(job.current_video);

if (job.status === 'completed') {
clearInterval(pollInterval);
displayResults(job.results);
}
}, 1500);

Token Pool Deep-Dive

The Token Pool scales effective API capacity with the number of configured tokens (for example, six tokens give roughly 6x the single-token rate limit) through intelligent load balancing:

Round-Robin Selection

import time

class TokenPool:
    def __init__(self, tokens: list):
        self.tokens = {
            token.identifier: {
                "token": token,
                "health": "healthy",
                "rate_limiter": RateLimiter(),
                "cooldown_until": None
            }
            for token in tokens
        }
        self.current_index = 0
        self.token_ids = list(self.tokens.keys())

    async def get_next_token(self) -> str:
        """Select next healthy token using round-robin."""
        attempts = 0
        max_attempts = len(self.tokens)

        while attempts < max_attempts:
            token_id = self.token_ids[self.current_index]
            self.current_index = (self.current_index + 1) % len(self.tokens)

            token_state = self.tokens[token_id]

            # Check health
            if token_state["health"] == "healthy":
                return token_id

            # Check if cooldown expired
            if token_state["health"] == "cooldown":
                if time.time() >= token_state["cooldown_until"]:
                    token_state["health"] = "healthy"
                    return token_id

            attempts += 1

        raise AllTokensCooldownError("All tokens unavailable")

Automatic Failover

async def make_request_with_failover(url: str):
    attempts = 0
    max_attempts = 3

    while attempts < max_attempts:
        try:
            # 1. Get token from pool
            token_id = await token_pool.get_next_token()
            token = token_pool.tokens[token_id]["token"]

            # 2. Make request
            response = await http_client.get(
                url,
                headers={"Authorization": f"Bearer {token}"}
            )

            # 3. Mark success
            token_pool.mark_success(token_id)
            return response

        except RateLimitError:
            # 4. Mark cooldown, try next token
            token_pool.mark_cooldown(token_id, duration=600)
            metrics.record_failover(reason="rate_limit")
            attempts += 1
            continue

        except AuthenticationError:
            # 5. Mark failed, try next token
            token_pool.mark_failed(token_id)
            metrics.record_failover(reason="auth_error")
            attempts += 1
            continue

    raise MaxRetriesExceeded("All failover attempts exhausted")
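
The mark_success, mark_cooldown, and mark_failed helpers referenced above are not shown earlier. A minimal sketch against the TokenPool state dictionary from the previous snippet:

class TokenPool:
    # ... (__init__ and get_next_token as above)

    def mark_success(self, token_id: str):
        # A successful call keeps the token in the healthy rotation
        self.tokens[token_id]["health"] = "healthy"
        self.tokens[token_id]["cooldown_until"] = None

    def mark_cooldown(self, token_id: str, duration: int = 600):
        # A rate-limited token sits out until its window resets
        self.tokens[token_id]["health"] = "cooldown"
        self.tokens[token_id]["cooldown_until"] = time.time() + duration

    def mark_failed(self, token_id: str):
        # Auth failures remove the token from rotation until reconfigured
        self.tokens[token_id]["health"] = "failed"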

Error Handling Strategy

The connector implements a multi-level error handling strategy:

Level 1: Retryable Errors (Transient)

Network errors, timeouts, 5xx errors

from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

@retry(
    stop=stop_after_attempt(4),  # 1 initial attempt + 3 retries
    wait=wait_exponential(multiplier=2, min=1, max=10),
    retry=retry_if_exception_type((NetworkError, TimeoutError))
)
async def fetch_with_retry(url: str):
    return await http_client.get(url)

Retry Schedule:

  • Attempt 1: Immediate
  • Attempt 2: Wait 2 seconds
  • Attempt 3: Wait 4 seconds
  • Attempt 4: Wait 8 seconds (max 3 retries)
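
These delays follow an exponential formula, multiplier × 2^(n−1) clamped to [min, max] for the n-th retry (assuming tenacity's wait_exponential behavior). A quick check of the schedule above:

def backoff_delay(retry_number, multiplier=2, min_wait=1, max_wait=10):
    # Exponential backoff with clamping, matching the schedule above
    return max(min_wait, min(multiplier * 2 ** (retry_number - 1), max_wait))

for n in range(1, 4):
    print(f"wait before retry {n}: {backoff_delay(n)}s")
# wait before retry 1: 2s
# wait before retry 2: 4s
# wait before retry 3: 8s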

Level 2: Non-Retryable Errors (Permanent)

401/403 auth errors, 404 not found, 429 rate limit

async def fetch_with_error_classification(url: str):
    try:
        return await http_client.get(url)
    except HTTPError as e:
        if e.status_code in [401, 403]:
            raise AuthenticationError("Invalid token")
        elif e.status_code == 404:
            raise ResourceNotFoundError("Video deleted")
        elif e.status_code == 429:
            raise RateLimitError("Rate limit exceeded")
        else:
            raise  # Re-raise for retry layer

Why 429 is Non-Retryable:

  • Immediate retry would trigger IP ban
  • Token Pool handles failover instead
  • Retry after cooldown (10 minutes)

Level 3: Circuit Breaker

Prevents cascading failures

import time

class CircuitBreaker:
    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = 0
        self.state = "closed"  # closed, open, half-open
        self.opened_at = None

    async def call(self, func, *args, **kwargs):
        if self.state == "open":
            if time.time() - self.opened_at >= self.timeout:
                self.state = "half-open"
            else:
                raise CircuitBreakerOpenError("Too many failures")

        try:
            result = await func(*args, **kwargs)
            if self.state == "half-open":
                self.state = "closed"
                self.failures = 0
            return result
        except Exception:
            self.failures += 1
            if self.failures >= self.failure_threshold:
                self.state = "open"
                self.opened_at = time.time()
            raise
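
Usage is a thin wrapper around the HTTP layer. A minimal sketch (names assumed for illustration):

vimeo_breaker = CircuitBreaker(failure_threshold=5, timeout=60)

async def fetch_video_protected(video_id: str):
    # After 5 consecutive failures the breaker opens and calls fail fast
    # for 60 seconds before a half-open probe is allowed through.
    return await vimeo_breaker.call(fetch_with_retry, f"/videos/{video_id}")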

Observability

Structured Logging

# Log with context
log = get_logger(__name__)
bind_context(job_id="sync_a1b2c3d4", user_id=123)

log.info("Starting sync", url="https://vimeo.com/showcase/...")
# Output:
# {
#   "timestamp": "2025-10-17T14:22:10Z",
#   "level": "info",
#   "message": "Starting sync",
#   "job_id": "sync_a1b2c3d4",
#   "user_id": 123,
#   "url": "https://vimeo.com/showcase/...",
#   "correlation_id": "req-abc-123"
# }

Metrics Collection

# Prometheus metrics
from prometheus_client import Counter, Histogram, Gauge

# Request counters
http_requests_total = Counter(
    'vimeo_http_requests_total',
    'Total HTTP requests',
    ['method', 'endpoint', 'status']
)

# Latency histogram
http_request_duration = Histogram(
    'vimeo_http_request_duration_seconds',
    'HTTP request latency',
    ['method', 'endpoint']
)

# Active jobs gauge
active_jobs = Gauge(
    'vimeo_sync_jobs_active',
    'Number of active sync jobs'
)

# Usage
with http_request_duration.labels('GET', '/videos').time():
    response = await http_client.get('/videos/123')
http_requests_total.labels('GET', '/videos', response.status_code).inc()
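
For Prometheus to scrape these metrics, the FastAPI app has to expose them. One common approach (a sketch, assuming the stock prometheus_client ASGI app is mounted on the existing FastAPI instance):

from prometheus_client import make_asgi_app

# Expose the default prometheus_client registry at /metrics for scraping
app.mount("/metrics", make_asgi_app())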

Distributed Tracing

from opentelemetry import trace

tracer = trace.get_tracer(__name__)

async def fetch_video_with_transcript(video_id: str):
    with tracer.start_as_current_span("fetch_video_with_transcript") as span:
        span.set_attribute("video_id", video_id)

        # Child span: fetch metadata
        with tracer.start_as_current_span("fetch_metadata"):
            video = await vimeo_client.get_video(video_id)

        # Child span: fetch transcript
        with tracer.start_as_current_span("fetch_transcript"):
            transcript = await vimeo_client.get_transcript(video_id)

        span.set_attribute("has_transcript", transcript is not None)
        return {**video, "transcript": transcript}

Trace Example:

fetch_video_with_transcript (120ms)
├── fetch_metadata (50ms)
│   └── http_get /videos/123 (45ms)
└── fetch_transcript (65ms)
    ├── http_get /videos/123/texttracks (20ms)
    └── http_get <transcript_url> (40ms)

Performance Optimizations

Connection Pooling

import httpx

# Reuse HTTP connections
http_client = httpx.AsyncClient(
    limits=httpx.Limits(
        max_connections=100,           # Total connections
        max_keepalive_connections=20   # Idle connections kept warm
    ),
    timeout=httpx.Timeout(30.0)
)

Benefits:

  • Avoid TCP handshake overhead (~100ms per new connection)
  • Reuse TLS sessions (save ~200ms per avoided handshake)
  • Reduce server load (fewer connection setups)

Async/Await Concurrency

import asyncio

async def fetch_multiple_videos(video_ids: list):
    # Process videos concurrently
    tasks = [fetch_video_with_transcript(vid) for vid in video_ids]
    return await asyncio.gather(*tasks)

Performance Gain:

  • Serial: 500 videos × 2s each = 1000s (16.7 minutes)
  • Parallel (10 concurrent): 500 videos / 10 × 2s = 100s (1.7 minutes)

Rate Limit Aware:

# Limit concurrency to avoid hitting rate limits
semaphore = asyncio.Semaphore(5)  # Max 5 concurrent requests

async def fetch_with_semaphore(video_id):
    async with semaphore:
        return await fetch_video_with_transcript(video_id)

Checksum Caching

# Cache checksums in memory for fast lookups during sync
checksum_cache = {}

async def load_checksum(video_uri: str) -> str:
    if video_uri in checksum_cache:
        return checksum_cache[video_uri]

    checksum = await db.load_checksum(video_uri)
    checksum_cache[video_uri] = checksum
    return checksum

Security Measures

Secret Masking in Logs

def mask_secret(secret: str) -> str:
    """Show only first and last 4 characters."""
    if len(secret) <= 8:
        return "xxxx"
    return f"{secret[:4]}...{secret[-4:]}"

# Usage
log.info("Using token", token=mask_secret(api_token))
# Output: token="ab13...447f"

PII Redaction

import re

PII_PATTERNS = {
    'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'
}

def redact_pii(text: str) -> str:
    for pii_type, pattern in PII_PATTERNS.items():
        text = re.sub(pattern, f'[REDACTED_{pii_type.upper()}]', text)
    return text

# Applied to all log messages automatically
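
One way to apply this automatically is a structlog processor (a sketch, assuming structlog's processor pipeline rather than the connector's exact wiring):

import structlog

def redact_pii_processor(logger, method_name, event_dict):
    # Run every string value in the log event through redact_pii
    for key, value in event_dict.items():
        if isinstance(value, str):
            event_dict[key] = redact_pii(value)
    return event_dict

structlog.configure(
    processors=[
        redact_pii_processor,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.JSONRenderer(),
    ]
)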

Rate Limit Protection

Prevents IP bans:

  • Stay at 80% of rate limit capacity (480 of 600 calls)
  • Use Token Pool for distribution
  • Implement cooldown on 429 responses
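
In terms of the RateLimiter shown earlier, the 80% headroom is simply a smaller bucket (the capacity value here is illustrative, not a confirmed default):

# Keep ~20% headroom below Vimeo's 600 calls / 10 min limit
conservative_limiter = RateLimiter(capacity=480, window=600)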

Technology Stack

Backend:

  • Python 3.11 - Modern async/await, type hints
  • FastAPI - High-performance async web framework
  • HTTPX - Async HTTP client with HTTP/2 support
  • Pydantic - Data validation and settings management
  • Structlog - Structured logging
  • Tenacity - Retry logic with exponential backoff

Frontend:

  • React 18 - UI components
  • Next.js - Server-side rendering, routing
  • TailwindCSS - Utility-first styling

Observability:

  • Prometheus - Metrics collection
  • Grafana - Metrics visualization
  • Jaeger - Distributed tracing
  • OpenTelemetry - Instrumentation standard

Infrastructure:

  • Docker Compose - Local development
  • Heroku - Cloud deployment (PaaS)
  • Kubernetes - Scalable production (optional)

Deployment Architecture

            ┌─────────────────────┐
            │    Load Balancer    │
            │   (Heroku Router)   │
            └──────────┬──────────┘
                       │
        ┌──────────────┴──────────────┐
        │                             │
┌───────▼─────────────┐   ┌───────────▼─────────┐
│ Backend Instance 1  │   │ Backend Instance 2  │
│     (Web Dyno)      │   │     (Web Dyno)      │
└──────────┬──────────┘   └──────────┬──────────┘
           │                         │
           └────────────┬────────────┘
                        │
            ┌───────────▼─────────┐
            │  Background Worker  │
            │   (Worker Dyno)     │
            │   - Job Queue       │
            │   - Sync Processing │
            └──────────┬──────────┘
                       │
            ┌──────────▼──────────┐
            │     PostgreSQL      │
            │     (Job State)     │
            └─────────────────────┘

Scalability:

  • Horizontal: Add more web/worker dynos
  • Vertical: Upgrade dyno sizes for more CPU/RAM
  • Database: Use connection pooling (PgBouncer)
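
Database connections deserve the same treatment. A minimal sketch of an application-side pool, shown with asyncpg as an assumption (the stack above only names PostgreSQL and PgBouncer):

import asyncpg

async def init_db_pool():
    # Size the pool so (dynos × max_size) stays within PgBouncer / Postgres limits
    return await asyncpg.create_pool(
        dsn="postgresql://user:pass@host:5432/vimeo_connector",  # illustrative DSN
        min_size=2,
        max_size=10,
    )

async def load_checksum_from_db(pool, video_uri: str):
    # Borrow a pooled connection instead of opening a new one per query
    async with pool.acquire() as conn:
        return await conn.fetchval(
            "SELECT checksum FROM video_checksums WHERE video_uri = $1",  # hypothetical table
            video_uri,
        )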
