Concurrent pool of functions execution

Posted on Oct 12, 2024
# Concurrent execution of a pool of functions
# Date: 12/10/2024 (DD/MM/YYYY)
# Written by Nandan M

import asyncio
import concurrent.futures
import inspect
from dataclasses import dataclass
from typing import Any, Callable, Dict, List


@dataclass
class IOTaskPoolExecutor:
    """
    Run a pool of tasks (sync or async callables) concurrently on threads.

    Every task is submitted to a ``ThreadPoolExecutor``; as long as the number
    of tasks does not exceed the pool's default worker count, the total time is
    roughly the duration of the slowest task.

    All keyword arguments given to ``executor()`` form one shared pool of
    values. For each task, only the values whose names match that task's
    signature are forwarded, so tasks may take different (or no) arguments.
    The limitation is that one argument name can carry only one value for
    the whole run.

    Example usage:
        def add_num(a, b): return a + b
        def sub_num(x, y): return x - y
        def mul_num(a, y): return a * y
        def zero(): return 0
        calc_task_list = [add_num, sub_num, mul_num, zero]
        fin_result = IOTaskPoolExecutor.executor(
                        task_list=calc_task_list,
                        # Arguments of all tasks are passed together; each
                        # task receives only the ones it declares.
                        a=1,
                        b=2,
                        x=3,
                        y=4,
                    )
        # OUTPUT of fin_result
        # {'add_num': 3, 'sub_num': -1, 'mul_num': 4, 'zero': 0}

    Results are aggregated into one dictionary keyed by task name:
        {'func_1_name': <return>, ...}
    Tasks sharing the same name (e.g. several lambdas) overwrite each other's
    entry; the last one in ``task_list`` wins.
    """

    @staticmethod
    def worker(kwargs_dict: Dict[str, Any]) -> Dict[str, Any]:
        """
        Execute a single task with the arguments it declares.

        Args:
            kwargs_dict: Mapping holding the callable under the key ``"task"``
                plus the shared pool of candidate argument values.

        Returns:
            A one-item dict ``{task name: task return value}``. Callables
            without a ``__name__`` attribute are keyed by their ``repr()``.

        Raises:
            KeyError: If ``"task"`` is missing, or a parameter without a
                default value has no entry in ``kwargs_dict``.

        Note:
            A coroutine returned by the task is driven to completion with
            ``asyncio.run``, which cannot be called from a thread that already
            has a running event loop.
        """
        task: Callable = kwargs_dict["task"]

        # Forward only the values the task's signature asks for.
        func_param: Dict[str, Any] = {}
        for name, param in inspect.signature(task).parameters.items():
            # *args / **kwargs are collectors, not named values to look up.
            if param.kind in (param.VAR_POSITIONAL, param.VAR_KEYWORD):
                continue
            if name in kwargs_dict:
                func_param[name] = kwargs_dict[name]
            elif param.default is param.empty:
                # Required parameter with no supplied value.
                raise KeyError(name)
            # Otherwise: leave it out so the task's own default applies.

        result: Any = task(**func_param)

        # Async tasks hand back a coroutine; run it so the caller gets the
        # real value instead of a never-awaited coroutine object.
        if inspect.iscoroutine(result):
            result = asyncio.run(result)

        task_name: str = getattr(task, "__name__", None) or repr(task)
        return {task_name: result}

    @classmethod
    def executor(cls, task_list: List[Callable], **kwargs) -> Dict[str, Any]:
        """
        Run every task in ``task_list`` concurrently and collect the results.

        Args:
            task_list: Callables to execute.
            **kwargs: Shared pool of argument values; each task receives the
                subset named in its signature. The name ``task`` is reserved
                for internal use.

        Returns:
            Dict mapping each task's name to its return value (empty when
            ``task_list`` is empty).

        Raises:
            TypeError: If ``task`` is passed as a keyword argument, since it
                would replace the callable handed to the worker.
            Exception: Anything raised inside a task propagates to the caller.
        """
        if "task" in kwargs:
            raise TypeError(
                "'task' is a reserved keyword and cannot be passed as a task argument"
            )

        args_list: List[Dict[str, Any]] = [
            {"task": task, **kwargs} for task in task_list
        ]

        with concurrent.futures.ThreadPoolExecutor() as pool_executor:
            # map() preserves task_list order; list() waits for completion.
            results_list: List[Dict[str, Any]] = list(
                pool_executor.map(cls.worker, args_list)
            )

        results_dict: Dict[str, Any] = {}
        for result in results_list:
            results_dict.update(result)
        return results_dict