Most common Python Problem Solving

Challenge: Design a URL Shortener Service

Problem Statement

Design a URL shortener service similar to bit.ly. The service should:

  1. Shorten a URL: Accept a long URL and return a shortened version.
  2. Redirect a URL: Accept a shortened URL and redirect to the original URL.
  3. Track Clicks: Track the number of times a shortened URL has been accessed.
  4. Prevent Duplicates: If the same long URL is shortened multiple times, return the same shortened URL.

Requirements:

  • Use a database (e.g., SQLite, PostgreSQL, or any in-memory store) to store mappings between short and long URLs.
  • Ensure the shortened URL is unique and as short as possible.
  • Handle edge cases (e.g., invalid URLs, duplicate URLs).
  • Write unit tests to validate your implementation.

Example Input/Output

# Example usage
shortener = URLShortener()

# Shorten a URL
short_url = shortener.shorten_url("https://www.example.com/very/long/url")
print(short_url)  # Output: "https://short.url/abc123"

# Redirect to the original URL
original_url = shortener.redirect("https://short.url/abc123")
print(original_url)  # Output: "https://www.example.com/very/long/url"

# Get click count
click_count = shortener.get_click_count("https://short.url/abc123")
print(click_count)  # Output: 1 (after one access)

Solution

Step 1: Design the System

  1. Database Schema:
    • id: Primary key (auto-incrementing integer).
    • long_url: The original long URL.
    • short_url: The shortened URL.
    • click_count: Number of times the short URL has been accessed.
  2. Shortening Algorithm:
    • Use base-62 encoding (a-z, A-Z, 0-9) to generate a unique short code from the id.
    • This keeps the short URL as compact as possible.
  3. Thread Safety:
    • Use a threading.Lock to guard shared resources against concurrent access.
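The base-62 step can be sketched in isolation before the full implementation; the helper names encode_base62 and decode_base62 are illustrative, and the alphabet matches the BASE62 constant used in Step 2:

```python
import string

# Same 62-character alphabet as the implementation below:
# lowercase, uppercase, then digits.
BASE62 = string.ascii_letters + string.digits

def encode_base62(n):
    """Convert a non-negative integer ID into a base-62 string."""
    if n == 0:
        return BASE62[0]
    digits = []
    while n > 0:
        n, remainder = divmod(n, 62)
        digits.append(BASE62[remainder])
    return "".join(reversed(digits))

def decode_base62(code):
    """Convert a base-62 string back into the integer ID."""
    n = 0
    for ch in code:
        n = n * 62 + BASE62.index(ch)
    return n
```

Note that the encoding is reversible, which is why a database row ID is enough to derive the short code and back.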

Step 2: Implementation

import string
import threading
from collections import defaultdict

# Base-62 characters for encoding
BASE62 = string.ascii_letters + string.digits

class URLShortener:
    def __init__(self):
        self.url_mapping = {}  # Stores short URL to long URL mappings
        self.click_count = defaultdict(int)  # Tracks click counts
        self.lock = threading.Lock()  # Ensures thread safety

    def shorten_url(self, long_url):
        """Shortens a long URL and stores the mapping."""
        with self.lock:
            # Check if the long URL already exists (linear scan; a reverse
            # dict would make this O(1) at the cost of extra memory)
            for short_url, url in self.url_mapping.items():
                if url == long_url:
                    return short_url

            # Generate a unique short code
            short_code = self._generate_short_code()
            short_url = f"https://short.url/{short_code}"

            # Store the mapping
            self.url_mapping[short_url] = long_url
            return short_url

    def redirect(self, short_url):
        """Redirects to the original URL and increments the click count."""
        with self.lock:
            if short_url not in self.url_mapping:
                raise ValueError("Short URL not found")

            # Increment click count
            self.click_count[short_url] += 1

            # Return the original URL
            return self.url_mapping[short_url]

    def get_click_count(self, short_url):
        """Returns the click count for a given short URL."""
        return self.click_count.get(short_url, 0)

    def _generate_short_code(self):
        """Generates a short code using base-62 encoding."""
        # For simplicity, use the current mapping size as the ID.
        # (In production, use a database auto-increment ID so codes
        # stay unique even if entries are ever deleted.)
        n = len(self.url_mapping)
        short_code = ""
        while n > 0:
            n, remainder = divmod(n, 62)
            short_code = BASE62[remainder] + short_code
        return short_code or BASE62[0]

Step 3: Unit Tests

import unittest

class TestURLShortener(unittest.TestCase):
    def test_shorten_url(self):
        shortener = URLShortener()
        long_url = "https://www.example.com"
        short_url = shortener.shorten_url(long_url)

        # Test if the short URL is generated
        self.assertTrue(short_url.startswith("https://short.url/"))

        # Test if the same long URL returns the same short URL
        self.assertEqual(shortener.shorten_url(long_url), short_url)

    def test_redirect(self):
        shortener = URLShortener()
        long_url = "https://www.example.com"
        short_url = shortener.shorten_url(long_url)

        # Test redirect
        self.assertEqual(shortener.redirect(short_url), long_url)

        # Test click count
        self.assertEqual(shortener.get_click_count(short_url), 1)

    def test_invalid_short_url(self):
        shortener = URLShortener()
        with self.assertRaises(ValueError):
            shortener.redirect("https://short.url/invalid")

if __name__ == "__main__":
    unittest.main()

Explanation

  1. Base-62 Encoding:
    • Converts an integer id into a short string using the characters a-z, A-Z, and 0-9.
    • Keeps the short URL compact and unique.
  2. Thread Safety:
    • The threading.Lock ensures that concurrent access to shared resources (url_mapping and click_count) is handled safely.
  3. Database Integration:
    • For a production-level solution, replace the in-memory url_mapping and click_count with a database such as SQLite or PostgreSQL.
  4. Scalability:
    • The system can be scaled with a distributed store (e.g., Redis) and load balancers.
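The database-integration point can be sketched with the standard library's sqlite3 module. The class name SQLiteURLShortener is illustrative, and the schema follows Step 1; this is a minimal sketch, not a production design (no connection pooling, no URL validation):

```python
import sqlite3
import string

BASE62 = string.ascii_letters + string.digits

class SQLiteURLShortener:
    """Illustrative SQLite-backed variant of URLShortener."""

    def __init__(self, db_path=":memory:"):
        self.conn = sqlite3.connect(db_path)
        self.conn.execute(
            """CREATE TABLE IF NOT EXISTS urls (
                   id INTEGER PRIMARY KEY AUTOINCREMENT,
                   long_url TEXT UNIQUE,
                   short_url TEXT UNIQUE,
                   click_count INTEGER DEFAULT 0
               )"""
        )

    def shorten_url(self, long_url):
        row = self.conn.execute(
            "SELECT short_url FROM urls WHERE long_url = ?", (long_url,)
        ).fetchone()
        if row:  # Duplicate long URL: return the existing short URL
            return row[0]
        # Insert first so the auto-increment id can seed the short code
        cur = self.conn.execute("INSERT INTO urls (long_url) VALUES (?)", (long_url,))
        short_url = f"https://short.url/{self._encode(cur.lastrowid)}"
        self.conn.execute(
            "UPDATE urls SET short_url = ? WHERE id = ?", (short_url, cur.lastrowid)
        )
        self.conn.commit()
        return short_url

    def redirect(self, short_url):
        row = self.conn.execute(
            "SELECT long_url FROM urls WHERE short_url = ?", (short_url,)
        ).fetchone()
        if row is None:
            raise ValueError("Short URL not found")
        self.conn.execute(
            "UPDATE urls SET click_count = click_count + 1 WHERE short_url = ?",
            (short_url,),
        )
        self.conn.commit()
        return row[0]

    @staticmethod
    def _encode(n):
        code = ""
        while n > 0:
            n, r = divmod(n, 62)
            code = BASE62[r] + code
        return code or BASE62[0]
```

The database's UNIQUE constraints and auto-increment id replace both the linear duplicate scan and the hand-rolled ID counter of the in-memory version.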

Bonus Features

  1. Custom Short URLs: Allow users to provide a custom short code.
  2. Expiration: Add an expiration time for short URLs.
  3. Analytics: Provide detailed analytics (e.g., referrers, geolocation).

Challenge: Process and Analyze Large Log Files

Problem Statement

You are given a large log file (e.g., several GBs in size) containing records of user activities on a website. Each line in the log file has the following format:

<timestamp> <user_id> <activity_type> <duration>

Example:

2023-10-01T12:00:00 user1 login 5

2023-10-01T12:01:00 user2 purchase 10

2023-10-01T12:02:00 user1 logout 3

Your task is to process this log file and generate the following insights:

  1. Total number of activities for each user.
  2. Total duration of activities for each user.
  3. Most frequent activity type across all users.
  4. Top 5 users with the highest total duration.

Requirements:

  • The log file is too large to fit into memory, so you must process it line by line.
  • Optimize for performance and memory usage.
  • Write clean, modular, and well-documented code.
  • Include unit tests to validate your implementation.

Example Input/Output

Input (log file):

2023-10-01T12:00:00 user1 login 5
2023-10-01T12:01:00 user2 purchase 10
2023-10-01T12:02:00 user1 logout 3
2023-10-01T12:03:00 user2 login 7
2023-10-01T12:04:00 user3 login 2

Output:

{
    "user_stats": {
        "user1": {"total_activities": 2, "total_duration": 8},
        "user2": {"total_activities": 2, "total_duration": 17},
        "user3": {"total_activities": 1, "total_duration": 2}
    },
    "most_frequent_activity": "login",
    "top_5_users": ["user2", "user1", "user3"]
}

Solution

Step 1: Design the System

  1. Process the File Line by Line:
    • Iterate over the file object (a lazy line generator) to avoid loading the entire file into memory.
  2. Track User Activity:
    • Use a dictionary to store user-specific data (total activities and total duration).
  3. Track Activity Frequency:
    • Use a Counter to track the frequency of each activity type.
  4. Optimize for Performance:
    • Avoid unnecessary computations and use efficient data structures.
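The line-by-line idea from step 1 can also be factored into a small generator that yields already-parsed records; parse_log_lines is a hypothetical helper name, not part of the solution below:

```python
def parse_log_lines(file_path):
    """Lazily yield (timestamp, user_id, activity_type, duration) tuples.

    The file is read one line at a time, so memory use stays constant
    regardless of file size.
    """
    with open(file_path, "r") as file:
        for line in file:
            timestamp, user_id, activity_type, duration = line.strip().split()
            yield timestamp, user_id, activity_type, int(duration)
```

Separating parsing from aggregation this way keeps the processing function free of string handling and makes each piece testable on its own.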

Step 2: Implementation

from collections import defaultdict, Counter

def process_log_file(file_path):
    """Processes a large log file and generates insights."""
    user_stats = defaultdict(lambda: {"total_activities": 0, "total_duration": 0})
    activity_counter = Counter()

    with open(file_path, "r") as file:
        for line in file:
            # Parse the log line
            timestamp, user_id, activity_type, duration = line.strip().split()
            duration = int(duration)

            # Update user stats
            user_stats[user_id]["total_activities"] += 1
            user_stats[user_id]["total_duration"] += duration

            # Update activity frequency
            activity_counter[activity_type] += 1

    # Find the most frequent activity
    most_frequent_activity = activity_counter.most_common(1)[0][0]

    # Find the top 5 users by total duration
    top_5_users = sorted(
        user_stats.keys(),
        key=lambda user: user_stats[user]["total_duration"],
        reverse=True
    )[:5]

    # Prepare the result
    result = {
        "user_stats": user_stats,
        "most_frequent_activity": most_frequent_activity,
        "top_5_users": top_5_users
    }
    return result

Step 3: Unit Tests

import unittest
import tempfile

class TestLogProcessor(unittest.TestCase):
    def test_process_log_file(self):
        # Create a temporary log file
        log_data = [
            "2023-10-01T12:00:00 user1 login 5\n",
            "2023-10-01T12:01:00 user2 purchase 10\n",
            "2023-10-01T12:02:00 user1 logout 3\n",
            "2023-10-01T12:03:00 user2 login 7\n",
            "2023-10-01T12:04:00 user3 login 2\n"
        ]

        with tempfile.NamedTemporaryFile(mode="w+", delete=False) as temp_file:
            temp_file.writelines(log_data)
            temp_file_path = temp_file.name

        # Process the log file
        result = process_log_file(temp_file_path)

        # Validate the results
        self.assertEqual(result["user_stats"]["user1"]["total_activities"], 2)
        self.assertEqual(result["user_stats"]["user1"]["total_duration"], 8)
        self.assertEqual(result["user_stats"]["user2"]["total_activities"], 2)
        self.assertEqual(result["user_stats"]["user2"]["total_duration"], 17)
        self.assertEqual(result["most_frequent_activity"], "login")
        self.assertEqual(result["top_5_users"], ["user2", "user1", "user3"])

if __name__ == "__main__":
    unittest.main()

Explanation

  1. Memory Efficiency:
    • The file is processed line by line (the file object is iterated lazily), keeping memory usage minimal.
  2. Data Structures:
    • defaultdict simplifies user stats tracking.
    • Counter efficiently counts activity frequencies.
  3. Performance:
    • The solution streams the file in a single pass, so it scales to files that do not fit in memory.

Bonus Features

  1. Parallel Processing: Use multiprocessing to process chunks of the log file in parallel.
  2. Error Handling: Handle malformed log lines gracefully.
  3. Visualization: Generate charts or graphs for the insights (e.g., using matplotlib).
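The error-handling bonus can be sketched by validating each line before unpacking it. The parse_line helper and its return-None-for-bad-lines contract are illustrative choices, assuming the caller counts or logs the skipped lines:

```python
def parse_line(line):
    """Parse one log line; return None for malformed lines instead of raising.

    A line is malformed if it does not have exactly four fields or if the
    duration field is not an integer.
    """
    parts = line.strip().split()
    if len(parts) != 4:
        return None
    timestamp, user_id, activity_type, duration = parts
    try:
        return timestamp, user_id, activity_type, int(duration)
    except ValueError:  # Non-numeric duration
        return None
```

The main loop would then skip None results (optionally incrementing a skipped-line counter) instead of crashing mid-file on one bad record.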

Python Coding Challenge Example

Problem: Build a Task Scheduler with Priority Queue

You are tasked with designing a Task Scheduler that can manage and execute tasks based on their priority. The scheduler should support the following features:

  1. Add a Task: Each task has a unique ID, a priority level (integer), and a function to execute.
  2. Execute the Highest Priority Task: The scheduler should execute the task with the highest priority. If two tasks have the same priority, execute the one that was added first (FIFO order).
  3. Remove a Task: Remove a task by its ID.
  4. List All Tasks: Return a list of all tasks in order of their execution priority.

Requirements:

  • Use Python’s heapq module or a custom priority queue implementation.
  • Ensure thread safety if multiple threads are adding or executing tasks.
  • Write unit tests to validate your implementation.

Example Input/Output

# Example usage
scheduler = TaskScheduler()

# Add tasks
scheduler.add_task("task1", 2, lambda: print("Running Task 1"))
scheduler.add_task("task2", 1, lambda: print("Running Task 2"))
scheduler.add_task("task3", 3, lambda: print("Running Task 3"))

# Execute the highest priority task
scheduler.execute_task()  # Output: "Running Task 3"

# List all tasks
print(scheduler.list_tasks())  # Output: [("task3", 3), ("task1", 2), ("task2", 1)]

# Remove a task
scheduler.remove_task("task3")

# Execute the next highest priority task
scheduler.execute_task()  # Output: "Running Task 1"

Solution Guidelines

  1. Use a min-heap or max-heap to manage task priorities efficiently.
  2. Track the order of task insertion for tie-breaking.
  3. Implement thread safety using threading.Lock if required.
  4. Write clean, modular, and well-documented code.
  5. Include unit tests using unittest or pytest.
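Following these guidelines, here is one possible sketch. The class and method names mirror the example usage above; lazy deletion in remove_task, and leaving a task scheduled after execution (as the example's output implies), are design assumptions rather than requirements:

```python
import heapq
import itertools
import threading

class TaskScheduler:
    """Priority scheduler: highest priority first, FIFO among equal priorities."""

    def __init__(self):
        self._heap = []                    # entries: (-priority, seq, task_id)
        self._tasks = {}                   # task_id -> (priority, func)
        self._counter = itertools.count()  # insertion order for tie-breaking
        self._lock = threading.Lock()

    def add_task(self, task_id, priority, func):
        with self._lock:
            if task_id in self._tasks:
                raise ValueError(f"Duplicate task ID: {task_id}")
            self._tasks[task_id] = (priority, func)
            # Negate priority so heapq's min-heap surfaces the highest priority.
            heapq.heappush(self._heap, (-priority, next(self._counter), task_id))

    def remove_task(self, task_id):
        with self._lock:
            # Lazy deletion: drop the mapping; the stale heap entry is
            # discarded the next time it reaches the top.
            self._tasks.pop(task_id, None)

    def execute_task(self):
        """Run the highest-priority task (it stays scheduled, as in the example)."""
        with self._lock:
            while self._heap:
                _, _, task_id = self._heap[0]
                if task_id not in self._tasks:
                    heapq.heappop(self._heap)  # Drop stale entry left by remove_task
                    continue
                func = self._tasks[task_id][1]
                break
            else:
                raise IndexError("No tasks to execute")
        return func()  # Run outside the lock to avoid deadlocks

    def list_tasks(self):
        with self._lock:
            # sorted() is stable, so equal priorities keep insertion (FIFO) order
            return sorted(
                ((tid, prio) for tid, (prio, _) in self._tasks.items()),
                key=lambda item: -item[1],
            )
```

Lazy deletion keeps remove_task O(1) at the cost of occasional stale heap entries; the alternative (rebuilding the heap on removal) trades that for O(n) removals.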

Bonus Points

  • Implement a feature to schedule tasks at specific times (e.g., using datetime).
  • Add support for recurring tasks.
  • Optimize the scheduler for large-scale task management.

This type of challenge tests:

  • Advanced Python skills (e.g., working with heapq, threading, and lambdas).
  • Problem-solving and algorithmic thinking.
  • Software design and best practices (e.g., modularity, testing, and documentation).

Challenge 5: LRU Cache Implementation (Focus: Data Structures, Algorithm Efficiency)

  • Problem: Implement an LRU (Least Recently Used) cache. An LRU cache is a data structure that stores a limited number of items. When the cache is full, the least recently used item is evicted to make room for a new item.
  • Requirements:
    • Implement the LRU cache as a class with get(key) and put(key, value) methods.
    • get(key): Returns the value associated with the key if it exists in the cache, otherwise returns -1.
    • put(key, value): Updates the value of the key if the key exists, otherwise adds the key-value pair to the cache. If the cache is full, evicts the least recently used item before adding the new item.
    • The cache should have a fixed capacity.
    • Optimize for both get and put operations (ideally O(1) average time complexity).
  • Solution (using collections.OrderedDict):
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        value = self.cache.pop(key)
        self.cache[key] = value  # Re-insert to mark as most recently used
        return value

    def put(self, key, value):
        if key in self.cache:
            self.cache.pop(key)  # Remove so re-insertion moves it to the end
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Evict LRU (first item)
        self.cache[key] = value

# Example usage:
cache = LRUCache(2)
cache.put(1, 1)
cache.put(2, 2)
print(cache.get(1))   # Output: 1
cache.put(3, 3)       # Evicts key 2
print(cache.get(2))   # Output: -1
cache.put(1, 4)       # Updates key 1
print(cache.get(1))   # Output: 4
  • Explanation: OrderedDict maintains the order of insertion, making it easy to track the least recently used item. pop(key) removes the item and returns the value, and then we re-insert it to make it the most recently used. popitem(last=False) removes and returns the first item (LRU).
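The same behavior can be expressed with OrderedDict.move_to_end, which states the intent directly instead of popping and re-inserting; this is a drop-in variant of the class above:

```python
from collections import OrderedDict

class LRUCache:
    """Same behavior as above, using move_to_end instead of pop/re-insert."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.cache = OrderedDict()

    def get(self, key):
        if key not in self.cache:
            return -1
        self.cache.move_to_end(key)  # Mark as most recently used
        return self.cache[key]

    def put(self, key, value):
        if key in self.cache:
            self.cache.move_to_end(key)  # Refresh recency before updating
        elif len(self.cache) == self.capacity:
            self.cache.popitem(last=False)  # Evict LRU (first item)
        self.cache[key] = value
```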

Challenge 6: Implement a Thread Pool (Focus: Concurrency, Design)

  • Problem: Design and implement a thread pool. A thread pool manages a fixed number of threads to execute tasks concurrently. This can improve performance by reducing the overhead of creating and destroying threads for each task.
  • Requirements:
    • Implement a ThreadPool class with a submit(task) method that adds a task to the pool’s queue.
    • The pool should have a fixed number of worker threads that continuously take tasks from the queue and execute them.
    • Handle exceptions gracefully within the worker threads.
    • Provide a way to shut down the thread pool.
  • Solution (Simplified Example):
import threading
import queue

class ThreadPool:
    def __init__(self, num_threads):
        self.queue = queue.Queue()
        self.threads = []
        for _ in range(num_threads):
            thread = threading.Thread(target=self._worker)
            self.threads.append(thread)
            thread.daemon = True  # Allow main thread to exit
            thread.start()

    def submit(self, task):
        self.queue.put(task)

    def _worker(self):
        while True:
            try:
                task = self.queue.get(timeout=1)  # Block with timeout
                if task is None:  # Sentinel value for shutdown
                    break
                try:
                    task()  # Execute the task
                except Exception as e:
                    print(f"Error executing task: {e}")
                finally:
                    self.queue.task_done()  # Mark complete even if the task raised
            except queue.Empty:
                continue  # No task yet; loop around

    def shutdown(self):
        for _ in range(len(self.threads)):
            self.queue.put(None)  # Put sentinel values to stop workers

        for thread in self.threads:
            thread.join()  # Wait for threads to finish

# Example usage:
pool = ThreadPool(4)

for i in range(10):
    pool.submit(lambda x=i: print(f"Task {x} executed by {threading.current_thread()}"))
pool.shutdown()
  • Explanation: A queue.Queue is used to hold the tasks. Worker threads continuously try to get tasks from the queue. task_done() signals that a task is complete. A sentinel value (None) is used to signal the threads to exit gracefully. Error handling is included.

These examples provide a starting point. Real-world thread pool implementations often have more advanced features (e.g., managing timeouts, returning results, more sophisticated shutdown procedures). The key is to demonstrate a solid understanding of concurrency and design principles.

Let’s refine the ThreadPool example to be more robust and practical, addressing potential issues and adding some useful features.

import threading
import queue
import time
import concurrent.futures  # For a higher-level abstraction (optional)

class ThreadPool:
    def __init__(self, num_threads, max_queue_size=0):  # Added max_queue_size
        self.queue = queue.Queue(maxsize=max_queue_size) # Bounded queue
        self.threads = []
        self.shutdown_event = threading.Event()  # For clean shutdown
        self.exceptions = []  # To collect exceptions from threads
        for _ in range(num_threads):
            thread = threading.Thread(target=self._worker)
            self.threads.append(thread)
            thread.daemon = True  # Allow main thread to exit
            thread.start()

    def submit(self, func, *args, **kwargs):
        if self.shutdown_event.is_set():
            raise RuntimeError("ThreadPool is shut down.")
        future = concurrent.futures.Future() # To get return val and exceptions

        def task_wrapper():
            if self.shutdown_event.is_set(): # Check inside task too
                future.set_exception(RuntimeError("Task submitted after shutdown."))
                return

            try:
                result = func(*args, **kwargs)
                future.set_result(result)

            except Exception as e:
                self.exceptions.append(e) # Store exception
                future.set_exception(e)  # Set exception on Future as well
                print(f"Error executing task: {e}") #Print but continue execution

            finally:
                self.queue.task_done()
        # put_nowait raises queue.Full immediately when the bounded queue is
        # full (matching the queue.Full handling in the example below),
        # instead of blocking the caller.
        self.queue.put_nowait(task_wrapper)
        return future  # Return the future

    def _worker(self):
        while True:
            try:
                task = self.queue.get(timeout=1)  # Block with timeout
                if task is None:  # Sentinel value for shutdown
                    break
                if self.shutdown_event.is_set():  # Check before executing
                    self.queue.task_done()
                    break
                task()  # Execute the task (task_wrapper calls task_done)
            except queue.Empty:
                if self.shutdown_event.is_set():  # Don't linger after shutdown
                    break
                continue  # No task yet; loop around
            except Exception as e:  # Catch any thread-related exceptions
                print(f"Worker thread exception: {e}")
                self.exceptions.append(e)  # Capture for later processing

    def shutdown(self, wait=True):
        self.shutdown_event.set()  # Signal shutdown
        for _ in range(len(self.threads)):
            self.queue.put(None)  # Put sentinel values to stop workers

        if wait: # Wait for threads to complete
            for thread in self.threads:
                thread.join()

        if self.exceptions:  # Raise if any occurred
            # Note: ExceptionGroup requires Python 3.11+
            raise ExceptionGroup("Exceptions occurred in threads", self.exceptions)

# Example usage:
def my_task(x):
    if x == 5:
        raise ValueError("Something went wrong!")
    time.sleep(1)
    return x * 2

pool = ThreadPool(4, max_queue_size=5) # Example with max queue size
futures = []

for i in range(10):
    try: # Handle potential queue full errors
        future = pool.submit(my_task, i)
        futures.append(future)
    except queue.Full:
        print("Queue is full, task rejected")

try:
    pool.shutdown()  # Wait for all tasks to complete
    for future in futures:  # Retrieve results or handle exceptions
        try:
            result = future.result()
            print(f"Result: {result}")
        except Exception as e:
            print(f"Task exception: {e}")
except Exception as e:  # Handle exceptions raised during shutdown
    print(f"ThreadPool exception: {e}")

print("ThreadPool shut down cleanly.")
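For comparison, the standard library's concurrent.futures.ThreadPoolExecutor already provides bounded workers, futures, and clean shutdown out of the box. A sketch of the same workload (my_task redefined here without the sleep, for brevity):

```python
from concurrent.futures import ThreadPoolExecutor

def my_task(x):
    if x == 5:
        raise ValueError("Something went wrong!")
    return x * 2

results, errors = [], []
# The with-block calls shutdown(wait=True) on exit, so all futures
# are complete before we collect results.
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(my_task, i) for i in range(10)]

for future in futures:
    try:
        results.append(future.result())  # Re-raises the task's exception
    except ValueError as e:
        errors.append(e)
```

Hand-rolling a pool is valuable for interviews; in production code, reaching for the standard executor is usually the better design choice.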
