Challenge: Design a URL Shortener Service
Problem Statement
Design a URL shortener service similar to bit.ly. The service should:
- Shorten a URL: Accept a long URL and return a shortened version.
- Redirect a URL: Accept a shortened URL and redirect to the original URL.
- Track Clicks: Track the number of times a shortened URL has been accessed.
- Prevent Duplicates: If the same long URL is shortened multiple times, return the same shortened URL.
Requirements:
- Use a database (e.g., SQLite, PostgreSQL, or any in-memory store) to store mappings between short and long URLs.
- Ensure the shortened URL is unique and as short as possible.
- Handle edge cases (e.g., invalid URLs, duplicate URLs).
- Write unit tests to validate your implementation.
Example Input/Output
# Example usage
shortener = URLShortener()
# Shorten a URL
short_url = shortener.shorten_url("https://www.example.com/very/long/url")
print(short_url) # Output: "https://short.url/abc123"
# Redirect to the original URL
original_url = shortener.redirect("https://short.url/abc123")
print(original_url) # Output: "https://www.example.com/very/long/url"
# Get click count
click_count = shortener.get_click_count("https://short.url/abc123")
print(click_count) # Output: 1 (after one access)
Solution
Step 1: Design the System
- Database Schema:
id: Primary key (auto-incrementing integer).
long_url: The original long URL.
short_url: The shortened URL.
click_count: Number of times the short URL has been accessed.
- Shortening Algorithm:
- Use a base-62 encoding (a-z, A-Z, 0-9) to generate a unique short code from the id.
- This ensures the short URL is as short as possible.
- Thread Safety:
- Use a threading.Lock to ensure thread safety when accessing shared resources.
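Before the full implementation, the base-62 step can be sketched in isolation. This is a minimal sketch, assuming the database supplies an auto-incrementing integer id; encode_base62 and decode_base62 are illustrative helper names, not part of the solution below.

```python
import string

# Base-62 alphabet: a-z, A-Z, 0-9 (62 characters)
BASE62 = string.ascii_letters + string.digits

def encode_base62(n):
    """Convert a non-negative integer id into a base-62 short code."""
    if n == 0:
        return BASE62[0]
    code = ""
    while n > 0:
        n, remainder = divmod(n, 62)
        code = BASE62[remainder] + code
    return code

def decode_base62(code):
    """Convert a base-62 short code back into the integer id."""
    n = 0
    for char in code:
        n = n * 62 + BASE62.index(char)
    return n
```

Because each id maps to exactly one code and back, the short code never needs to be stored separately from the row id.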
Step 2: Implementation
import string
import threading
from collections import defaultdict

# Base-62 characters for encoding
BASE62 = string.ascii_letters + string.digits

class URLShortener:
    def __init__(self):
        self.url_mapping = {}  # Stores short URL to long URL mappings
        self.click_count = defaultdict(int)  # Tracks click counts
        self.lock = threading.Lock()  # Ensures thread safety

    def shorten_url(self, long_url):
        """Shortens a long URL and stores the mapping."""
        with self.lock:
            # Check if the long URL already exists
            for short_url, url in self.url_mapping.items():
                if url == long_url:
                    return short_url
            # Generate a unique short code
            short_code = self._generate_short_code()
            short_url = f"https://short.url/{short_code}"
            # Store the mapping
            self.url_mapping[short_url] = long_url
            return short_url

    def redirect(self, short_url):
        """Redirects to the original URL and increments the click count."""
        with self.lock:
            if short_url not in self.url_mapping:
                raise ValueError("Short URL not found")
            # Increment click count
            self.click_count[short_url] += 1
            # Return the original URL
            return self.url_mapping[short_url]

    def get_click_count(self, short_url):
        """Returns the click count for a given short URL."""
        return self.click_count.get(short_url, 0)

    def _generate_short_code(self):
        """Generates a unique short code using base-62 encoding."""
        # For simplicity, use the length of the mapping as the ID
        id = len(self.url_mapping)
        short_code = ""
        while id > 0:
            id, remainder = divmod(id, 62)
            short_code = BASE62[remainder] + short_code
        return short_code or BASE62[0]
Step 3: Unit Tests
import unittest

class TestURLShortener(unittest.TestCase):
    def test_shorten_url(self):
        shortener = URLShortener()
        long_url = "https://www.example.com"
        short_url = shortener.shorten_url(long_url)
        # Test if the short URL is generated
        self.assertTrue(short_url.startswith("https://short.url/"))
        # Test if the same long URL returns the same short URL
        self.assertEqual(shortener.shorten_url(long_url), short_url)

    def test_redirect(self):
        shortener = URLShortener()
        long_url = "https://www.example.com"
        short_url = shortener.shorten_url(long_url)
        # Test redirect
        self.assertEqual(shortener.redirect(short_url), long_url)
        # Test click count
        self.assertEqual(shortener.get_click_count(short_url), 1)

    def test_invalid_short_url(self):
        shortener = URLShortener()
        with self.assertRaises(ValueError):
            shortener.redirect("https://short.url/invalid")

if __name__ == "__main__":
    unittest.main()
Explanation
- Base-62 Encoding:
- Converts an integer id into a short string using characters a-z, A-Z, and 0-9.
- Ensures the short URL is compact and unique.
- Thread Safety:
- The threading.Lock ensures that concurrent access to shared resources (e.g., url_mapping and click_count) is handled safely.
- Database Integration:
- For a production-level solution, replace the in-memory url_mapping and click_count with a database like PostgreSQL or SQLite.
- Scalability:
- The system can be scaled by using distributed databases (e.g., Redis) and load balancers.
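The database integration point above could look like the following minimal SQLite sketch. The urls table and its column names mirror the schema from Step 1 but are illustrative assumptions, not the original solution; the row id would feed the same base-62 encoding.

```python
import sqlite3

# Illustrative schema matching the design in Step 1 (names are assumptions)
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE IF NOT EXISTS urls (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        long_url TEXT UNIQUE NOT NULL,
        click_count INTEGER DEFAULT 0
    )
""")

def shorten(long_url):
    """Insert the URL if new, then return its row id (the input to base-62 encoding)."""
    # UNIQUE on long_url plus INSERT OR IGNORE handles the duplicate-URL requirement
    conn.execute("INSERT OR IGNORE INTO urls (long_url) VALUES (?)", (long_url,))
    conn.commit()
    row = conn.execute("SELECT id FROM urls WHERE long_url = ?", (long_url,)).fetchone()
    return row[0]

def record_click(row_id):
    """Increment the click counter for a stored URL."""
    conn.execute("UPDATE urls SET click_count = click_count + 1 WHERE id = ?", (row_id,))
    conn.commit()
```

With this layout the database enforces uniqueness and persistence, so the linear scan over url_mapping in shorten_url disappears entirely.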
Bonus Features
- Custom Short URLs: Allow users to provide a custom short code.
- Expiration: Add an expiration time for short URLs.
- Analytics: Provide detailed analytics (e.g., referrers, geolocation).
Challenge: Process and Analyze Large Log Files
Problem Statement
You are given a large log file (e.g., several GBs in size) containing records of user activities on a website. Each line in the log file has the following format:
<timestamp> <user_id> <activity_type> <duration>
Example:
2023-10-01T12:00:00 user1 login 5
2023-10-01T12:01:00 user2 purchase 10
2023-10-01T12:02:00 user1 logout 3
Your task is to process this log file and generate the following insights:
- Total number of activities for each user.
- Total duration of activities for each user.
- Most frequent activity type across all users.
- Top 5 users with the highest total duration.
Requirements:
- The log file is too large to fit into memory, so you must process it line by line.
- Optimize for performance and memory usage.
- Write clean, modular, and well-documented code.
- Include unit tests to validate your implementation.
Example Input/Output
Input (log file):
2023-10-01T12:00:00 user1 login 5
2023-10-01T12:01:00 user2 purchase 10
2023-10-01T12:02:00 user1 logout 3
2023-10-01T12:03:00 user2 login 7
2023-10-01T12:04:00 user3 login 2
Output:
{
  "user_stats": {
    "user1": {"total_activities": 2, "total_duration": 8},
    "user2": {"total_activities": 2, "total_duration": 17},
    "user3": {"total_activities": 1, "total_duration": 2}
  },
  "most_frequent_activity": "login",
  "top_5_users": ["user2", "user1", "user3"]
}
Solution
Step 1: Design the System
- Process the File Line by Line:
- Use a generator to read the file line by line to avoid loading the entire file into memory.
- Track User Activity:
- Use a dictionary to store user-specific data (e.g., total activities and total duration).
- Track Activity Frequency:
- Use a Counter to track the frequency of each activity type.
- Optimize for Performance:
- Avoid unnecessary computations and use efficient data structures.
Step 2: Implementation
from collections import defaultdict, Counter

def process_log_file(file_path):
    """Processes a large log file and generates insights."""
    user_stats = defaultdict(lambda: {"total_activities": 0, "total_duration": 0})
    activity_counter = Counter()
    with open(file_path, "r") as file:
        for line in file:
            # Parse the log line
            timestamp, user_id, activity_type, duration = line.strip().split()
            duration = int(duration)
            # Update user stats
            user_stats[user_id]["total_activities"] += 1
            user_stats[user_id]["total_duration"] += duration
            # Update activity frequency
            activity_counter[activity_type] += 1
    # Find the most frequent activity
    most_frequent_activity = activity_counter.most_common(1)[0][0]
    # Find the top 5 users by total duration
    top_5_users = sorted(
        user_stats.keys(),
        key=lambda user: user_stats[user]["total_duration"],
        reverse=True
    )[:5]
    # Prepare the result
    result = {
        "user_stats": user_stats,
        "most_frequent_activity": most_frequent_activity,
        "top_5_users": top_5_users
    }
    return result
Step 3: Unit Tests
import unittest
import tempfile

class TestLogProcessor(unittest.TestCase):
    def test_process_log_file(self):
        # Create a temporary log file
        log_data = [
            "2023-10-01T12:00:00 user1 login 5\n",
            "2023-10-01T12:01:00 user2 purchase 10\n",
            "2023-10-01T12:02:00 user1 logout 3\n",
            "2023-10-01T12:03:00 user2 login 7\n",
            "2023-10-01T12:04:00 user3 login 2\n"
        ]
        with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
            temp_file.writelines(log_data)
            temp_file_path = temp_file.name
        # Process the log file
        result = process_log_file(temp_file_path)
        # Validate the results
        self.assertEqual(result["user_stats"]["user1"]["total_activities"], 2)
        self.assertEqual(result["user_stats"]["user1"]["total_duration"], 8)
        self.assertEqual(result["user_stats"]["user2"]["total_activities"], 2)
        self.assertEqual(result["user_stats"]["user2"]["total_duration"], 17)
        self.assertEqual(result["most_frequent_activity"], "login")
        self.assertEqual(result["top_5_users"], ["user2", "user1", "user3"])

if __name__ == "__main__":
    unittest.main()
Explanation
- Memory Efficiency:
- The file is read line by line by iterating over the file object, so the whole file is never loaded into memory.
- Data Structures:
- defaultdict is used to simplify user stats tracking.
- Counter is used to efficiently count activity frequencies.
- Performance:
- The solution is optimized for large files and avoids unnecessary computations.
Bonus Features
- Parallel Processing:
- Use multiprocessing to process chunks of the log file in parallel.
- Error Handling:
- Handle malformed log lines gracefully.
- Visualization:
- Generate charts or graphs for the insights (e.g., using matplotlib).
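The parallel-processing and error-handling bonuses can be sketched together with multiprocessing.Pool: split the lines into chunks, count activity types per chunk while skipping malformed lines, then merge the partial Counters. The helper names are illustrative, and the sketch assumes each chunk fits in memory.

```python
from collections import Counter
from multiprocessing import Pool

def count_activities(lines):
    """Count activity types (the third field) in one chunk of log lines."""
    counter = Counter()
    for line in lines:
        parts = line.split()
        if len(parts) == 4:  # skip malformed lines gracefully
            counter[parts[2]] += 1
    return counter

def parallel_activity_counts(lines, workers=4, chunk_size=1000):
    """Split lines into chunks, count each chunk in parallel, merge the results."""
    chunks = [lines[i:i + chunk_size] for i in range(0, len(lines), chunk_size)]
    with Pool(workers) as pool:
        partials = pool.map(count_activities, chunks)
    return sum(partials, Counter())  # Counters add field-by-field
```

The per-user stats could be parallelized the same way, since per-chunk dictionaries merge by summing matching keys.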
Python Coding Challenge Example
Problem: Build a Task Scheduler with Priority Queue
You are tasked with designing a Task Scheduler that can manage and execute tasks based on their priority. The scheduler should support the following features:
- Add a Task: Each task has a unique ID, a priority level (integer), and a function to execute.
- Execute the Highest Priority Task: The scheduler should execute the task with the highest priority. If two tasks have the same priority, execute the one that was added first (FIFO order).
- Remove a Task: Remove a task by its ID.
- List All Tasks: Return a list of all tasks in order of their execution priority.
Requirements:
- Use Python’s heapq module or a custom priority queue implementation.
- Ensure thread safety if multiple threads are adding or executing tasks.
- Write unit tests to validate your implementation.
Example Input/Output
# Example usage
scheduler = TaskScheduler()
# Add tasks
scheduler.add_task("task1", 2, lambda: print("Running Task 1"))
scheduler.add_task("task2", 1, lambda: print("Running Task 2"))
scheduler.add_task("task3", 3, lambda: print("Running Task 3"))
# Execute the highest priority task
scheduler.execute_task() # Output: "Running Task 3"
# List all tasks
print(scheduler.list_tasks()) # Output: [("task3", 3), ("task1", 2), ("task2", 1)]
# Remove a task
scheduler.remove_task("task3")
# Execute the next highest priority task
scheduler.execute_task() # Output: "Running Task 1"
Solution Guidelines
- Use a min-heap or max-heap to manage task priorities efficiently.
- Track the order of task insertion for tie-breaking.
- Implement thread safety using threading.Lock if required.
- Write clean, modular, and well-documented code.
- Include unit tests using unittest or pytest.
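Following those guidelines, one possible sketch (not the only valid design) uses heapq with negated priorities for max-heap behavior, an itertools counter for FIFO tie-breaking, and lazy deletion for remove_task:

```python
import heapq
import itertools
import threading

class TaskScheduler:
    """Max-priority scheduler; FIFO order breaks ties between equal priorities."""

    def __init__(self):
        self._heap = []                    # entries: (-priority, insertion_order, task_id)
        self._tasks = {}                   # task_id -> (priority, func)
        self._counter = itertools.count()  # monotonically increasing tie-breaker
        self._lock = threading.Lock()

    def add_task(self, task_id, priority, func):
        with self._lock:
            self._tasks[task_id] = (priority, func)
            heapq.heappush(self._heap, (-priority, next(self._counter), task_id))

    def remove_task(self, task_id):
        # Lazy deletion: drop the dict entry; stale heap entries are skipped on pop.
        with self._lock:
            self._tasks.pop(task_id, None)

    def execute_task(self):
        with self._lock:
            while self._heap:
                neg_priority, _, task_id = heapq.heappop(self._heap)
                entry = self._tasks.pop(task_id, None)
                if entry is not None and entry[0] == -neg_priority:
                    return entry[1]()
            raise IndexError("No tasks to execute")

    def list_tasks(self):
        with self._lock:
            # Highest priority first; sorted() is stable, so FIFO order survives ties.
            return sorted(
                ((tid, prio) for tid, (prio, _) in self._tasks.items()),
                key=lambda item: -item[1],
            )
```

Lazy deletion keeps remove_task at O(1) at the cost of occasional skipped pops, which is the usual trade-off when heapq lacks a decrease-key operation.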
Bonus Points
- Implement a feature to schedule tasks at specific times (e.g., using datetime).
- Add support for recurring tasks.
- Optimize the scheduler for large-scale task management.
This type of challenge tests:
- Advanced Python skills (e.g., working with heapq, threading, and lambdas).
- Problem-solving and algorithmic thinking.
- Software design and best practices (e.g., modularity, testing, and documentation).
Challenge 5: LRU Cache Implementation (Focus: Data Structures, Algorithm Efficiency)
- Problem: Implement an LRU (Least Recently Used) cache. An LRU cache is a data structure that stores a limited number of items. When the cache is full, the least recently used item is evicted to make room for a new item.
- Requirements:
- Implement the LRU cache as a class with get(key) and put(key, value) methods.
- get(key): Returns the value associated with the key if it exists in the cache, otherwise returns -1.
- put(key, value): Updates the value of the key if the key exists, otherwise adds the key-value pair to the cache. If the cache is full, evicts the least recently used item before adding the new item.
- The cache should have a fixed capacity.
- Optimize for both get and put operations (ideally O(1) average time complexity).
- Solution (using collections.OrderedDict):
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)  # Remove, then re-insert to mark as most recently used
        self.cache[key] = value
        return value

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)  # Update and move to end
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Evict LRU (first item)
        self.cache[key] = value

# Example usage:
cache = LRUCache(2)
cache.put(1, 1)
cache.put(2, 2)
print(cache.get(1))  # Output: 1
cache.put(3, 3)  # Evicts key 2
print(cache.get(2))  # Output: -1
cache.put(1, 4)  # Updates key 1
print(cache.get(1))  # Output: 4
- Explanation: OrderedDict maintains the order of insertion, making it easy to track the least recently used item. pop(key) removes the item and returns the value, and then we re-insert it to make it the most recently used. popitem(last=False) removes and returns the first item (LRU).
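An equivalent variant worth knowing: OrderedDict.move_to_end (available since Python 3.2) expresses the recency update directly, avoiding the pop-and-reinsert step. A sketch with the same behavior:

```python
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        self.cache.move_to_end(key)  # mark as most recently used
        return self.cache[key]

    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)  # refresh recency before updating
        self.cache[key] = value
        if len(self.cache) > self.capacity:
            self.cache.popitem(last=False)  # evict least recently used
```

Both versions are O(1) average for get and put; move_to_end simply states the intent more clearly.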
Challenge 6: Implement a Thread Pool (Focus: Concurrency, Design)
- Problem: Design and implement a thread pool. A thread pool manages a fixed number of threads to execute tasks concurrently. This can improve performance by reducing the overhead of creating and destroying threads for each task.
- Requirements:
- Implement a ThreadPool class with a submit(task) method that adds a task to the pool’s queue.
- The pool should have a fixed number of worker threads that continuously take tasks from the queue and execute them.
- Handle exceptions gracefully within the worker threads.
- Provide a way to shut down the thread pool.
- Solution (Simplified Example):
import threading
import queue

class ThreadPool:
    def __init__(self, num_threads):
        self.queue = queue.Queue()
        self.threads = []
        for _ in range(num_threads):
            thread = threading.Thread(target=self._worker)
            self.threads.append(thread)
            thread.daemon = True  # Allow main thread to exit
            thread.start()

    def submit(self, task):
        self.queue.put(task)

    def _worker(self):
        while True:
            try:
                task = self.queue.get(timeout=1)  # Block with timeout
                if task is None:  # Sentinel value for shutdown
                    break
                task()  # Execute the task
                self.queue.task_done()
            except queue.Empty:
                continue  # Just loop around
            except Exception as e:
                print(f"Error executing task: {e}")

    def shutdown(self):
        for _ in range(len(self.threads)):
            self.queue.put(None)  # Put sentinel values to stop workers
        for thread in self.threads:
            thread.join()  # Wait for threads to finish

# Example usage:
pool = ThreadPool(4)
for i in range(10):
    pool.submit(lambda x=i: print(f"Task {x} executed by {threading.current_thread()}"))
pool.shutdown()
- Explanation: A queue.Queue is used to hold the tasks. Worker threads continuously try to get tasks from the queue. task_done() signals that a task is complete. A sentinel value (None) is used to signal the threads to exit gracefully. Error handling is included.
These examples provide a starting point. Real-world thread pool implementations often have more advanced features (e.g., managing timeouts, returning results, more sophisticated shutdown procedures). The key is to demonstrate a solid understanding of concurrency and design principles.
Let’s refine the ThreadPool example to be more robust and practical, addressing potential issues and adding some useful features.
import threading
import queue
import time
import concurrent.futures  # Provides the Future class used below

class ThreadPool:
    def __init__(self, num_threads, max_queue_size=0):  # Added max_queue_size
        self.queue = queue.Queue(maxsize=max_queue_size)  # Bounded queue
        self.threads = []
        self.shutdown_event = threading.Event()  # For clean shutdown
        self.exceptions = []  # To collect exceptions from threads
        for _ in range(num_threads):
            thread = threading.Thread(target=self._worker)
            self.threads.append(thread)
            thread.daemon = True  # Allow main thread to exit
            thread.start()

    def submit(self, func, *args, **kwargs):
        if self.shutdown_event.is_set():
            raise RuntimeError("ThreadPool is shut down.")
        future = concurrent.futures.Future()  # To get return value and exceptions

        def task_wrapper():
            try:
                result = func(*args, **kwargs)
                future.set_result(result)
            except Exception as e:
                self.exceptions.append(e)  # Store exception
                future.set_exception(e)  # Set exception on the Future as well
                print(f"Error executing task: {e}")  # Print but continue execution
            finally:
                self.queue.task_done()

        self.queue.put(task_wrapper)
        return future  # Return the future

    def _worker(self):
        while True:
            try:
                task = self.queue.get(timeout=1)  # Block with timeout
            except queue.Empty:
                continue  # Just loop around
            if task is None:  # Sentinel value for shutdown
                self.queue.task_done()
                break
            try:
                task()  # Execute the task (the wrapper handles its own errors)
            except Exception as e:  # Catch any unexpected worker exceptions
                print(f"Worker thread exception: {e}")
                self.exceptions.append(e)  # Capture it for later processing

    def shutdown(self, wait=True):
        self.shutdown_event.set()  # Reject new submissions
        for _ in range(len(self.threads)):
            self.queue.put(None)  # Sentinels stop workers once queued tasks drain
        if wait:  # Wait for threads to complete
            for thread in self.threads:
                thread.join()
        if self.exceptions:  # Raise collected exceptions (ExceptionGroup: Python 3.11+)
            raise ExceptionGroup("Exceptions occurred in threads", self.exceptions)
# Example usage:
def my_task(x):
    if x == 5:
        raise ValueError("Something went wrong!")
    time.sleep(1)
    return x * 2

pool = ThreadPool(4, max_queue_size=5)  # Example with max queue size
futures = []
for i in range(10):
    try:  # Handle potential queue-full errors
        future = pool.submit(my_task, i)
        futures.append(future)
    except queue.Full:
        print("Queue is full, task rejected")

try:
    pool.shutdown()  # Wait for all tasks to complete
except Exception as e:  # shutdown re-raises exceptions collected from workers
    print(f"ThreadPool exception: {e}")

for future in futures:  # Retrieve results or handle exceptions
    try:
        result = future.result()
        print(f"Result: {result}")
    except Exception as e:
        print(f"Task exception: {e}")

print("ThreadPool shut down cleanly.")
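For comparison, the standard library's concurrent.futures.ThreadPoolExecutor already provides task submission, Future objects, and clean shutdown. Hand-rolling a pool is instructive, but production code would usually prefer something like this sketch:

```python
import concurrent.futures

def my_task(x):
    """Same task as above: fails on x == 5, otherwise doubles its input."""
    if x == 5:
        raise ValueError("Something went wrong!")
    return x * 2

# The with-block shuts the pool down and waits for all tasks to finish
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(my_task, i) for i in range(10)]

results = []
for future in futures:
    try:
        results.append(future.result())  # re-raises the task's exception, if any
    except ValueError as e:
        results.append(f"failed: {e}")
```

The executor handles worker lifecycle, sentinel-free shutdown, and exception propagation through futures, which is exactly the machinery the custom class builds by hand.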