Advanced Programming

Positive Even Squares

Write a function that accepts any number of positional arguments, all of which you may assume will be lists of integers. Your function should filter all of these lists such that they only contain even positive integers and combine all of the lists into one list of integers. Your function should then modify the combined list such that it contains the squares of all of the elements and return that list.

Use a combination of the map, filter and lambda functions/keywords to modify the lists.

See the sample input for an example.

Sample Input #1

*args = [[-5, 2, 3, 4, 5], [1, 3, 5, 6, 7], [-9, -8, 10]] # arguments will be passed positionally to the function like this: positive_even_squares([-5, 2, 3, 4, 5], [1, 3, 5, 6, 7], [-9, -8, 10])

Sample Output #1

[4, 16, 36, 100] # The combined list of positive even integers is: [2, 4, 6, 10], the result is the squares of all of these values.

Solution

def positive_even_squares(*args):
    positive_even_nums = []

    for lst in args:
        filtered_list = filter(lambda x: x > 0 and x % 2 == 0, lst)
        positive_even_nums.extend(filtered_list)

    squares = map(lambda x: x ** 2, positive_even_nums)
    return list(squares)

Integer Sum

Write a function named integer_sum that accepts any number of positional arguments, which are assumed to be integers. This function should return the sum of all of these integers.

To handle invalid input (arguments that are not integers) you must write the following decorators and use them to decorate the integer_sum function.

  • flatten_lists: this decorator should flatten any list arguments for the decorated function by extracting their elements and passing them as individual arguments instead of the list. For example, if [1, 2, True] is an argument, then 1, 2 and True should be extracted and passed as arguments instead of the list to the decorated function.
  • convert_strings_to_ints: this decorator should convert any string arguments that are valid integers to integers and pass them to the decorated function. Any string that is not a valid integer should be removed as an argument to the decorated function.
  • filter_integers: this decorator should remove any argument that is not an integer and call the decorated function with only integer arguments.

You may assume all arguments passed to integer_sum will be of type float, int, str or list.

Sample Input #1

args = ["1", "2", -0.9, 4, [5, "hi", "3"]] # arguments will be passed positionally to the function like this: integer_sum("1", "2", -0.9, 4, [5, "hi", "3"])

Sample Output #1

15# the sum of "1", "2", 4, 5 and "3".

Solution

def flatten_lists(func):
    def wrapper(*args):
        new_args = []
        for arg in args:
            if isinstance(arg, list):
                new_args.extend(arg)
            else:
                new_args.append(arg)

        result = func(*new_args)
        return result

    return wrapper


def convert_strings_to_ints(func):
    def wrapper(*args):
        new_args = []
        for arg in args:
            if isinstance(arg, str) and arg.isdigit():
                new_args.append(int(arg))
            else:
                new_args.append(arg)

        result = func(*new_args)
        return result

    return wrapper


def filter_integers(func):
    def wrapper(*args):
        new_args = []
        for arg in args:
            if isinstance(arg, int):
                new_args.append(arg)

        result = func(*new_args)
        return result

    return wrapper


@flatten_lists
@convert_strings_to_ints
@filter_integers
def integer_sum(*args):
    return sum(args)

Generate String

Write a generator that accepts a string and an integer called frequency and generates a sequence as follows: string[0] * frequency + string[1] * frequency + … + string[-2] * frequency + string[-1] * frequency. Your generator should not store this string, it should generate the next element in the sequence each time its next method is called.

You should create this generator in both a functional and class based way. Your functional generator should be named generate_string and your class based generator (a.k.a iterator) should be named GenerateString.

You may assume that frequency >= 0.

Sample Input #1

string = "hello" frequency = 3

Sample Output #1

"hhheeellllllooo" # The sequence that should be generated by both generators

Solution

def generate_string(string, frequency):
    for char in string:
        for _ in range(frequency):
            yield char


class GenerateString:
    def __init__(self, string, frequency):
        self.string = string
        self.frequency = frequency

    def __iter__(self):
        self.current_char_index = 0
        self.char_counter = 0
        return self

    def __next__(self):
        if self.char_counter >= self.frequency:
            self.char_counter = 0
            self.current_char_index += 1

        if self.current_char_index >= len(self.string):
            raise StopIteration

        self.char_counter += 1
        return self.string[self.current_char_index]

Thread Safe Counter

Write a WordCounter class that is meant to be able to count words in large texts, so that a user of that class can quickly calculate how many times a specific word occurs in a string.

WordCounter should implement the following methods:

  • process_text(text) should take in a string, text, and update the internal attributes of WordCounter in a thread-safe manner. You may assume naively that text.split(" ") is good enough to return the list of words in the passed text.
  • get_word_count(word) should take in a string, word, and check how many times that word has been seen in all the texts that this WordCounter has processed. If this word has never been seen, you should return 0.

NOTE: This class must be thread-safe; meaning that many threads should be able to use the WordCounter at the same time, and the calculations must remain accurate as if only a single thread was using the instance of WordCounter.

NOTE: You may not use the Counter class of the collections standard library.

>>> wc = WordCounter()
>>> wc.process_text("the cat is in the bag")
>>> wc.get_word_count("the")
2
>>> wc.get_word_count("bag")
1
>>> wc.get_word_count("dog")
0

Solution

import threading


class WordCounter:
    def __init__(self):
        self.lock = threading.Lock()
        self.word_counts = {}

    def process_text(self, text):
        words = text.split(" ")
        for word in words:
            self._increment_word_count(word)

    def get_word_count(self, word):
        self.lock.acquire()
        count = self.word_counts.get(word, 0)
        self.lock.release()
        return count

    def _increment_word_count(self, word):
        self.lock.acquire()
        self.word_counts[word] = self.word_counts.get(word, 0) + 1
        self.lock.release()

Asynchronous Fetcher

Write a BatchFetcher class that is meant to fetch lots of records from a database very quickly.

Your constructor takes in a database object that has an async method called async_fetch. This method takes a record identifier (or record_id) and returns whatever the database has in storage for that record.

BatchFetcher should implement the async method fetch_records, which takes in a list, record_ids, and should return the list of records corresponding to those record_ids.

>>> fetcher = BatchFetcher(database)
>>> fetcher.fetch_records(["A", "B", "C"])
[{"data": "data of record A"}, {"data": "data of record B"}, {"data": "data of record C"}]

Solution

import asyncio


class BatchFetcher:
    def __init__(self, database):
        self.database = database

    async def fetch_records(self, record_ids):
        pending_records = []
        for record_id in record_ids:
            pending_records.append(self.database.async_fetch(record_id))

        return await asyncio.gather(*pending_records)

import asyncio


class BatchFetcher:
    def __init__(self, database):
        self.database = database

    async def fetch_records(self, record_ids):
        pending_records = []
        for record_id in record_ids:
            task = asyncio.create_task(self.database.async_fetch(record_id))
            pending_records.append(task)

        records = []
        for pending_record in pending_records:
            records.append(await pending_record)
        return records