jarvis.ProcessPool

View Source
#
# Copyright (c) 2020 by Philipp Scheer. All Rights Reserved.
#

import multiprocessing
import time


class ProcessPool:
    """
    ProcessPool stores a list of background processes and provides several features to control these processes
    """

    def __init__(self, logging_instance: any = None) -> None:
        """
        Initialize an empty ProcessPool with a given `logging_instance`  
        `logging_instance` should be a `Logger` instance
        """
        self.processes = []
        self.logging_instance = logging_instance

    def terminate(self, p: multiprocessing.Process, name: str, max_tries: int = 3, time_between_tries: int = 2) -> None:
        """
        Terminate a p `process` with a given `name`
        * `max_tries` specifies the max tries with a terminate signal before killing the process
        * `time_between_tries` sets the seconds between each try
        """
        p.terminate()
        print(p)
        i = 0
        while p.is_alive():
            i += 1
            if i > max_tries:
                if self.logging_instance is not None:
                    self.logging_instance.i(
                        "process", f"killing process '{name}', didn't react to terminate signals")
                p.kill()
                return
            else:
                if self.logging_instance is not None:
                    self.logging_instance.i(
                        "process", f"waiting for the '{name}' process to terminate (try {i})")
                time.sleep(time_between_tries)

    def register(self, target_function: any, process_name: str, args: list = []) -> None:
        """
        Register a background process and add it to the ProcessPool
        * `target_function` specifies the function which should be run in background
        * `process_name` specifies a short and descriptive name what this function is doing
        * `args` specifies a list of arguments which should be passed to the function
        """
        p = multiprocessing.Process(
            target=target_function, name=process_name, args=args)
        p.start()
        self.processes.append(p)

    def stop_all(self) -> None:
        """
        Stop all processes in the ProcessPool  
        Try terminating the processes before killing them
        """
        for p in self.processes:
            self.terminate(p, p.name)
#   class ProcessPool:
View Source
class ProcessPool:
    """
    ProcessPool stores a list of background processes and provides several features to control these processes
    """

    def __init__(self, logging_instance: any = None) -> None:
        """
        Initialize an empty ProcessPool with a given `logging_instance`  
        `logging_instance` should be a `Logger` instance
        """
        self.processes = []
        self.logging_instance = logging_instance

    def terminate(self, p: multiprocessing.Process, name: str, max_tries: int = 3, time_between_tries: int = 2) -> None:
        """
        Terminate a p `process` with a given `name`
        * `max_tries` specifies the max tries with a terminate signal before killing the process
        * `time_between_tries` sets the seconds between each try
        """
        p.terminate()
        print(p)
        i = 0
        while p.is_alive():
            i += 1
            if i > max_tries:
                if self.logging_instance is not None:
                    self.logging_instance.i(
                        "process", f"killing process '{name}', didn't react to terminate signals")
                p.kill()
                return
            else:
                if self.logging_instance is not None:
                    self.logging_instance.i(
                        "process", f"waiting for the '{name}' process to terminate (try {i})")
                time.sleep(time_between_tries)

    def register(self, target_function: any, process_name: str, args: list = []) -> None:
        """
        Register a background process and add it to the ProcessPool
        * `target_function` specifies the function which should be run in background
        * `process_name` specifies a short and descriptive name what this function is doing
        * `args` specifies a list of arguments which should be passed to the function
        """
        p = multiprocessing.Process(
            target=target_function, name=process_name, args=args)
        p.start()
        self.processes.append(p)

    def stop_all(self) -> None:
        """
        Stop all processes in the ProcessPool  
        Try terminating the processes before killing them
        """
        for p in self.processes:
            self.terminate(p, p.name)

ProcessPool stores a list of background processes and provides several features to control these processes

#   ProcessPool(logging_instance: <built-in function any> = None)
View Source
    def __init__(self, logging_instance: any = None) -> None:
        """
        Initialize an empty ProcessPool with a given `logging_instance`  
        `logging_instance` should be a `Logger` instance
        """
        self.processes = []
        self.logging_instance = logging_instance

Initialize an empty ProcessPool with a given logging_instance
logging_instance should be a Logger instance

#   def terminate( self, p: multiprocessing.context.Process, name: str, max_tries: int = 3, time_between_tries: int = 2 ) -> None:
View Source
    def terminate(self, p: multiprocessing.Process, name: str, max_tries: int = 3, time_between_tries: int = 2) -> None:
        """
        Terminate a p `process` with a given `name`
        * `max_tries` specifies the max tries with a terminate signal before killing the process
        * `time_between_tries` sets the seconds between each try
        """
        p.terminate()
        print(p)
        i = 0
        while p.is_alive():
            i += 1
            if i > max_tries:
                if self.logging_instance is not None:
                    self.logging_instance.i(
                        "process", f"killing process '{name}', didn't react to terminate signals")
                p.kill()
                return
            else:
                if self.logging_instance is not None:
                    self.logging_instance.i(
                        "process", f"waiting for the '{name}' process to terminate (try {i})")
                time.sleep(time_between_tries)

Terminate a p process with a given name

  • max_tries specifies the max tries with a terminate signal before killing the process
  • time_between_tries sets the seconds between each try
#   def register( self, target_function: <built-in function any>, process_name: str, args: list = [] ) -> None:
View Source
    def register(self, target_function: any, process_name: str, args: list = []) -> None:
        """
        Register a background process and add it to the ProcessPool
        * `target_function` specifies the function which should be run in background
        * `process_name` specifies a short and descriptive name what this function is doing
        * `args` specifies a list of arguments which should be passed to the function
        """
        p = multiprocessing.Process(
            target=target_function, name=process_name, args=args)
        p.start()
        self.processes.append(p)

Register a background process and add it to the ProcessPool

  • target_function specifies the function which should be run in background
  • process_name specifies a short and descriptive name what this function is doing
  • args specifies a list of arguments which should be passed to the function
#   def stop_all(self) -> None:
View Source
    def stop_all(self) -> None:
        """
        Stop all processes in the ProcessPool  
        Try terminating the processes before killing them
        """
        for p in self.processes:
            self.terminate(p, p.name)

Stop all processes in the ProcessPool
Try terminating the processes before killing them