Coding Problems
Drillable problems for full-stack DS loops — heavy on feature engineering, evaluation, and production patterns; lighter on classical LeetCode. Multiple approaches per problem.
A · Feature engineering
P1. Compute entity-velocity features
Given events(user_id, ssn, device_id, applied_at), for each row compute: distinct SSNs touched by this device in the trailing 24h; distinct devices touched by this SSN in the trailing 24h.
Show solution
import pandas as pd

def velocity_features(events: pd.DataFrame) -> pd.DataFrame:
    events = events.sort_values('applied_at').reset_index(drop=True)
    results = events.copy()
    results['distinct_ssns_per_device_24h'] = 0
    results['distinct_devices_per_ssn_24h'] = 0
    # For each row, find rows within the trailing 24h window per entity
    for i, row in events.iterrows():
        window_start = row['applied_at'] - pd.Timedelta('24h')
        in_window = events[
            (events['applied_at'] >= window_start) &
            (events['applied_at'] <= row['applied_at'])
        ]
        same_device = in_window[in_window['device_id'] == row['device_id']]
        same_ssn = in_window[in_window['ssn'] == row['ssn']]
        results.at[i, 'distinct_ssns_per_device_24h'] = same_device['ssn'].nunique()
        results.at[i, 'distinct_devices_per_ssn_24h'] = same_ssn['device_id'].nunique()
    return results
Production discussion: the naive solution is O(n²). At scale, push this to SQL (windowed self-joins or partitioned analytics) or to a precomputed feature store. Leakage: for serving correctness the window must be strictly earlier than the current event; the version above includes the current row, which is fine for training but means the feature "looks at itself" in production.
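A minimal sketch of the leakage-safe variant, assuming the same events schema as above; only the window predicate changes (strictly earlier than the current event), shown for one of the two features. The helper name is illustrative, not part of the original solution.

import pandas as pd

def distinct_ssns_per_device_24h_serving(events: pd.DataFrame) -> pd.Series:
    # Only events strictly before the current application are eligible,
    # mirroring what a feature store would have known at decision time.
    events = events.sort_values('applied_at').reset_index(drop=True)
    out = pd.Series(0, index=events.index)
    for i, row in events.iterrows():
        window_start = row['applied_at'] - pd.Timedelta('24h')
        in_window = events[
            (events['applied_at'] >= window_start) &
            (events['applied_at'] < row['applied_at']) &   # strict: excludes the current row
            (events['device_id'] == row['device_id'])
        ]
        out.at[i] = in_window['ssn'].nunique()
    return out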
P2. Time-since-last-event
Given events(user_id, event_at), compute time_since_last_event_minutes for each row (NULL for first event per user).
Show solution
import pandas as pd

def time_since_last(df: pd.DataFrame) -> pd.DataFrame:
    df = df.sort_values(['user_id', 'event_at']).reset_index(drop=True)
    # Previous event timestamp per user; NaT for each user's first event
    prev = df.groupby('user_id')['event_at'].shift(1)
    df['time_since_last_event_minutes'] = (df['event_at'] - prev).dt.total_seconds() / 60
    return df
One line if you're fluent with groupby + shift. The interviewer will probably ask to extend: rolling window stats (events in last hour), or vectorize without pandas for streaming.
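One way to sketch the rolling-window extension with pandas time-based windows; the column name events_last_hour and the choice to include the current event are assumptions, not part of the original problem.

import pandas as pd

def events_in_last_hour(df: pd.DataFrame) -> pd.DataFrame:
    # Rolling count of each user's events in the trailing hour, current event included.
    df = df.sort_values(['user_id', 'event_at']).reset_index(drop=True)
    counts = (
        df.set_index('event_at')
          .groupby('user_id')['user_id']
          .rolling('1h')
          .count()
    )
    # Positional alignment relies on the sort above: groups come back in
    # (user_id, event_at) order, matching the sorted frame row for row.
    df['events_last_hour'] = counts.to_numpy()
    return df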
P3. Detect stuck-sensor runs
Given a series of sensor readings (timestamp, value), flag any run of 10+ consecutive samples with identical value.
Show solution
import pandas as pd

def flag_stuck(series: pd.Series, run_threshold: int = 10) -> pd.Series:
    # New run id every time the value changes
    run_id = (series != series.shift()).cumsum()
    # Length of the run each sample belongs to
    run_length = series.groupby(run_id).transform('size')
    # Flag every sample in any run of run_threshold or more identical values
    return run_length >= run_threshold
Trap: a rolling "sum of same-as-previous flags >= threshold" check only fires at the tail of a run and is off by one, because a run of k identical values produces only k-1 "same as previous" flags. Grouping on value-change boundaries and comparing each run's size flags every sample in the run, including the leading ones.
B · Evaluation
P4. Lift at top K%
Given scores and labels (0/1), compute the lift at the top 5%: ratio of positive rate among top 5% scored to overall positive rate.
Show solution
import numpy as np

def lift_at_k(scores: np.ndarray, labels: np.ndarray, k_pct: float = 0.05) -> float:
    n = len(scores)
    k = int(np.ceil(n * k_pct))
    # Indices of top k by score (descending)
    top_idx = np.argpartition(-scores, k - 1)[:k]
    top_rate = labels[top_idx].mean()
    base_rate = labels.mean()
    return top_rate / base_rate if base_rate > 0 else float('inf')
argpartition is O(n) for picking top k; argsort would be O(n log n) — both work, but argpartition matters at scale.
P5. Pick threshold for fixed precision
Given scores and labels, find the lowest threshold such that precision ≥ 0.8 among flagged.
Show solution
import numpy as np

def threshold_for_precision(scores, labels, target_precision=0.8):
    # Sort descending by score
    order = np.argsort(-scores)
    sorted_labels = labels[order]
    sorted_scores = scores[order]
    # Cumulative TP at each cutoff
    cum_tp = np.cumsum(sorted_labels)
    n_flagged = np.arange(1, len(scores) + 1)
    precision = cum_tp / n_flagged
    # Find the largest n_flagged where precision >= target (lowest threshold)
    valid = precision >= target_precision
    if not valid.any():
        return None  # impossible
    last_valid_idx = np.where(valid)[0].max()
    return float(sorted_scores[last_valid_idx])
Edge case: when no threshold achieves target precision, return None. Trap: with ties in scores, the threshold isn't unique — pick the lower one and document.
P6. PSI between two distributions
Compute the Population Stability Index between a baseline distribution and a current distribution of a feature.
Show solution
import numpy as np

def psi(baseline: np.ndarray, current: np.ndarray, n_bins: int = 10) -> float:
    # Use the baseline's quantiles to define bin edges; widen the extremes to catch outliers
    edges = np.quantile(baseline, np.linspace(0, 1, n_bins + 1))
    edges[0], edges[-1] = -np.inf, np.inf
    base_counts, _ = np.histogram(baseline, bins=edges)
    curr_counts, _ = np.histogram(current, bins=edges)
    # Laplace smoothing avoids log(0) on empty bins
    base_pct = (base_counts + 1) / (base_counts.sum() + n_bins)
    curr_pct = (curr_counts + 1) / (curr_counts.sum() + n_bins)
    return float(np.sum((curr_pct - base_pct) * np.log(curr_pct / base_pct)))
Common thresholds: PSI < 0.1 = no shift; 0.1–0.25 = moderate; > 0.25 = significant shift requiring attention. The +1 Laplace smoothing prevents the log(0) trap on sparse bins.
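A quick sanity check using synthetic normal samples (the seed, sample sizes, and shift are arbitrary) and the psi function above:

import numpy as np

rng = np.random.default_rng(0)
baseline = rng.normal(0.0, 1.0, 50_000)
same_dist = rng.normal(0.0, 1.0, 50_000)   # fresh sample from the same distribution
shifted = rng.normal(0.3, 1.0, 50_000)     # mean shift simulating drift

print(psi(baseline, same_dist))  # expected to be near 0
print(psi(baseline, shifted))    # expected to be noticeably larger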
C · Production patterns
P7. LRU feature cache with TTL
Implement a cache that evicts the least-recently-used entry when capacity is exceeded, and treats entries older than TTL as missing.
Show solution
from collections import OrderedDict
import time

class TTLLRUCache:
    def __init__(self, capacity: int, ttl_seconds: float):
        self.capacity = capacity
        self.ttl = ttl_seconds
        self.data: OrderedDict = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        value, ts = self.data[key]
        if time.time() - ts > self.ttl:
            # Expired: treat as missing and drop it
            del self.data[key]
            return None
        self.data.move_to_end(key)  # mark as most recently used
        return value

    def set(self, key, value):
        self.data[key] = (value, time.time())
        self.data.move_to_end(key)
        if len(self.data) > self.capacity:
            self.data.popitem(last=False)  # evict LRU
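A brief usage sketch; the keys and feature values are illustrative only.

cache = TTLLRUCache(capacity=2, ttl_seconds=300)
cache.set('user:42', {'distinct_devices_24h': 3})
cache.set('user:7', {'distinct_devices_24h': 1})

print(cache.get('user:42'))                          # hit while fresh; also marks it most recently used
cache.set('user:99', {'distinct_devices_24h': 0})    # capacity 2 exceeded: evicts the LRU entry ('user:7')
print(cache.get('user:7'))                           # None: evicted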
P8. Rate limiter for an inference service
Implement a per-customer rate limiter that allows N requests per minute, sliding window.
Show solution
from collections import defaultdict, deque
import time

class SlidingWindowLimiter:
    def __init__(self, max_per_minute: int):
        self.limit = max_per_minute
        self.windows: dict[str, deque] = defaultdict(deque)

    def allow(self, customer_id: str) -> bool:
        now = time.time()
        window = self.windows[customer_id]
        # Drop entries older than 60s
        while window and window[0] < now - 60:
            window.popleft()
        if len(window) >= self.limit:
            return False
        window.append(now)
        return True
Tradeoff discussion to mention unprompted: this is in-memory and per-process. At scale you'd reach for Redis sorted sets or a token bucket (sketched below), both more memory-efficient at high QPS. The sliding-window log is exact but uses O(N) memory per customer.
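For comparison, a minimal token-bucket sketch; it is not part of the original solution, and the class and parameter names are assumptions. It stores two numbers per customer instead of a timestamp log.

import time
from collections import defaultdict

class TokenBucketLimiter:
    def __init__(self, rate_per_minute: int, burst: int | None = None):
        self.rate = rate_per_minute / 60.0      # tokens refilled per second
        self.burst = burst or rate_per_minute   # bucket capacity
        # Per-customer state: (current tokens, last refill timestamp)
        self.buckets: dict[str, tuple[float, float]] = defaultdict(
            lambda: (float(self.burst), time.time())
        )

    def allow(self, customer_id: str) -> bool:
        tokens, last = self.buckets[customer_id]
        now = time.time()
        # Refill in proportion to elapsed time, capped at the burst size
        tokens = min(self.burst, tokens + (now - last) * self.rate)
        if tokens < 1.0:
            self.buckets[customer_id] = (tokens, now)
            return False
        self.buckets[customer_id] = (tokens - 1.0, now)
        return True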
D · Classic algorithms (lighter)
P9. Two-sum
Given an array of ints and a target, return indices of two elements summing to target.
Show solution
def two_sum(nums: list[int], target: int) -> tuple[int, int] | None:
    seen = {}
    for i, n in enumerate(nums):
        if target - n in seen:
            return (seen[target - n], i)
        seen[n] = i
    return None
P10. Subarray sum equals K
Return the number of contiguous subarrays whose sum equals K. Negative numbers allowed.
Show solution
def subarray_sum_eq_k(nums: list[int], k: int) -> int:
    seen = {0: 1}
    running = 0
    answer = 0
    for n in nums:
        running += n
        answer += seen.get(running - k, 0)
        seen[running] = seen.get(running, 0) + 1
    return answer
Prefix-sum + hashmap trick. O(n) time, O(n) space. Handles negatives — that's the difference from a sliding-window approach.
Drill protocol
Enable drill mode. Read each problem, give yourself 20 minutes on paper or in a notebook. Speak through approach + complexity out loud before writing. Reveal the solution. Compare. Mark "practiced." Aim for 6–7 cleared before any onsite — particularly the feature-engineering ones if you're targeting SentiLink.