9 File in charge of setting up the cron jobs for the server.
12from typing
import Union, Any, Dict, Tuple
13from apscheduler.job
import Job
14from apscheduler.schedulers.background
import BackgroundScheduler
15from apscheduler.schedulers
import SchedulerAlreadyRunningError, SchedulerNotRunningError
16from display_tty
import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
21 This is the class that is in charge of scheduling background tasks that need to run on intervals
24 def __init__(self, success: int = 0, error: int = 84, debug: bool =
False) ->
None:
38 logger=self.__class__.__name__
43 The destructor of the class
45 self.
disp.log_info(
"Stopping background tasks.",
"__del__")
47 msg = f
"The cron exited with status {exit_code}."
49 self.
disp.log_error(msg,
"__del__")
51 self.
disp.log_debug(msg,
"__del__")
53 def safe_add_task(self, func: callable, args: Union[Tuple,
None] =
None, kwargs: Union[Dict,
None] =
None, trigger: Union[str, Any] =
"interval", seconds: int = 5) -> Union[int, Job]:
55 A non-crashing implementation of the add_task function.
58 func (callable): _description_: The function to be called when it is time to run the job
59 args (Union[Tuple, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
60 kwargs (Union[Dict, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
61 trigger (Union[str, Any], optional): _description_. Defaults to "interval".
62 seconds (int, optional): _description_. Defaults to 5. The amount of seconds to wait before executing the task again (I don't think it is effective for the cron option)
65 Union[int, Job]: _description_: returns self.error if there was an error, otherwise, returns a Job instance.
75 except ValueError
as e:
77 f
"Runtime Error for add_task. {e}",
84 This function is in charge of starting the scheduler. In a non-breaking way.
87 int: _description_: Will return self.success if it worked, otherwise self.error.
91 except RuntimeError
as e:
93 f
"Runtime Error for start. {e}",
100 This function is in charge of pausing the scheduler. In a non-breaking way.
103 pause (bool, optional): _description_: This is the boolean that will determine if the scheduler should be paused or not. Defaults to True.
106 int: _description_: Will return self.success if it worked, otherwise self.error
109 return self.
pause(pause=pause)
110 except RuntimeError
as e:
112 f
"Runtime Error for start. {e}",
119 This function is in charge of resuming the scheduler. In a non-breaking way.
122 int: _description_: Will return self.success if it worked, otherwise self.error.
126 except RuntimeError
as e:
128 f
"Runtime Error for start. {e}",
135 This function is in charge of stopping the scheduler. In a non-breaking way.
138 wait (bool, optional): _description_: Wait for the running tasks to finish. Defaults to True.
141 int: _description_: will return self.success if it succeeds, otherwise self.error
144 return self.
stop(wait=wait)
145 except RuntimeError
as e:
147 f
"Runtime Error for start. {e}",
152 def _to_dict(self, data: Union[Any,
None] =
None) -> dict:
154 Convert any data input into a dictionnary.
156 data (Union[Any, None], optional): _description_. Defaults to None. This is the data you are providing.
159 dict: _description_: A dictionnary crea ted with what could be extracted from the data.
162 return {
"none":
None}
163 if isinstance(data, dict)
is True:
165 if isinstance(data, (list, tuple))
is True:
170 return {
"data": data}
172 def add_task(self, func: callable, args: Union[Tuple,
None] =
None, kwargs: Union[Dict,
None] =
None, trigger: Union[str, Any] =
"interval", seconds: int = 5) -> Union[Job,
None]:
174 Function in charge of adding an automated call to functions that are meant to run in the background.
175 They are meant to run on interval.
178 func (callable): _description_: The function to be called when it is time to run the job
179 args (Union[Tuple, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
180 kwargs (Union[Dict, None], optional): _description_. Defaults to None.: Arguments you wish to pass to the function when executed.
181 trigger (Union[str, Any], optional): _description_. Defaults to "interval".
182 seconds (int, optional): _description_. Defaults to 5. The amount of seconds to wait before executing the task again (I don't think it is effective for the cron option)
185 Union[int,Job]: _description_: will raise a ValueError when an error occurs, otherwise, returns an instance of Job.
187 if callable(func)
is False:
189 f
"The provided function is not callable: {func}.",
192 raise ValueError(
"The function must be callable.")
193 if args
is not None and isinstance(args, tuple)
is False:
194 msg = f
"The provided args for {func.__name__} are not tuples.\n"
195 msg += f
"Converting args: '{args}' to tuples."
196 self.
disp.log_warning(msg,
"add_task")
197 args = tuple((args,))
198 if kwargs
is not None and isinstance(kwargs, dict)
is False:
199 msg = f
"The provided kwargs for {func.__name__}"
200 msg +=
"are not dictionaries.\n"
201 msg += f
"Converting kwargs: '{kwargs}' to dictionaries."
202 self.
disp.log_warning(msg,
"add_task")
204 self.
disp.log_warning(f
"Converted data = {kwargs}.",
"add_task")
205 if trigger
is not None and isinstance(trigger, str)
is False:
207 f
"The provided trigger is not a string: {trigger}.",
210 raise ValueError(
"The trigger must be a string.")
211 if isinstance(seconds, int)
is False:
213 f
"The provided seconds is not an integer: {seconds}.",
216 raise ValueError(
"The seconds must be an integer.")
217 msg = f
"Adding job: {func.__name__} "
218 msg += f
"with trigger: {trigger}, "
219 msg += f
"seconds = {seconds}, "
220 msg += f
"args = {args}, "
221 msg += f
"kwargs = {kwargs}."
222 self.
disp.log_debug(msg,
"add_task")
231 def start(self) -> Union[int, None]:
233 The function in charge of starting the scheduler loop.
236 RuntimeError: _description_: Will raise a runtile error if the underlying functions failled.
239 Union[int, None]: _description_: Will return self.success if it worked, otherwise None because it will have raised an error.
243 self.
disp.log_info(
"Scheduler started...",
"start")
245 except SchedulerAlreadyRunningError:
246 self.
disp.log_info(
"Scheduler is already running...",
"start")
248 except RuntimeError
as e:
250 f
"An error occurred while starting the scheduler: {e}",
253 msg = f
"Error({self.__class__.__name__}): "
254 msg +=
"Failed to call the scheduler's start wrapper function."
255 raise RuntimeError(msg)
from e
256 except Exception
as e:
258 f
"An error occurred while starting the scheduler: {e}",
"start"
260 msg = f
"Error({self.__class__.__name__}): "
261 msg +=
"Failed to call the scheduler's start wrapper function."
262 raise RuntimeError(msg)
from e
264 def pause(self, pause: bool =
True) -> Union[int,
None]:
266 This function is in charge of pausing the scheduler if it was running.
269 pause (bool, optional): _description_: This is the boolean that will determine if the scheduler should be paused or not. Defaults to True.
272 Union[int, None]: _description_: Will return self.success if it worked, otherwise None because it will have raised an error.
277 self.
disp.log_info(
"Scheduler paused.",
"pause")
280 self.
disp.log_info(
"Scheduler resumed.",
"pause")
282 except Exception
as e:
284 f
"An error occurred while pausing the scheduler: {e}",
287 msg = f
"Error({self.__class__.__name__}): "
288 msg +=
"Failed to call the chron pause wrapper function."
289 raise RuntimeError(msg)
from e
293 This function is in charge of resuming the scheduler loop if it was paused.
296 Union[int]: _description_: Will return self.success if it worked, otherwise None because it will have raised an error.
298 return self.
pause(pause=
False)
300 def stop(self, wait: bool =
True) -> Union[int,
None]:
302 This function is responsible for shutting down the scheduler, terminating any running jobs, and optionally waiting for those jobs to complete before exiting.
305 wait (bool, optional): _description_. Defaults to True. Wait for the running tasks to finish.
308 RuntimeError: _description_: The function failed to call the underlying processes that were required for it to run.
311 Union[int, None]: _description_: will return self.success if it succeeds, or none if it raised an error.
315 self.
disp.log_info(
"Scheduler stopped.",
"stop")
317 except SchedulerNotRunningError:
318 self.
disp.log_info(
"Scheduler is already stopped.",
"stop")
320 except Exception
as e:
322 f
"An error occurred while stopping the scheduler: {e}",
"stop"
324 msg = f
"Error({self.__class__.__name__}): "
325 msg +=
"Failed to call the chron stop wrapper function."
326 raise RuntimeError(msg)
from e
329if __name__ ==
"__main__":
331 from time
import sleep
332 from datetime
import datetime
336 This is a test function that will print the current date.
338 date (datetime): _description_
343 date = datetime.now()
344 if callable(date)
is True:
345 print(f
"(test_current_date) (Called) Current date: {date()}")
347 print(f
"(test_current_date) (Not called) Current date: {date}",)
351 This is a test function that will print "Hello, World!"
353 print(
"Hello, World!")
357 This is a test function that will print "Pending, World!"
359 print(
"Pending, World!")
363 This is a test function that will print "Goodbye, World!"
365 print(
"Goodbye, World!")
367 print(
"Testing declared functions.")
372 print(
"Declared functions tested.")
382 MAIN_THREAD_DELAY = int((SECONDS*NB_FUNCTIONS)*NB_REPEATS)
385 f
"Statuses:\nSUCCESS = {SUCCES}, ERROR = {ERROR}\n"
386 f
"DEBUG = {DEBUG}, KIND_KILL = {KIND_KILL}, "
387 f
"NB_REPEATS = {NB_REPEATS}, "
388 f
"TRIGGER = {TRIGGER}, SECONDS = {SECONDS}, "
389 f
"NB_FUNCTIONS = {NB_FUNCTIONS}, "
390 f
"MAIN_THREAD_DELAY = {MAIN_THREAD_DELAY}"
393 print(
"Initialising class BackgroundTasks.")
399 print(
"Class BackgroundTasks initialised.")
401 print(
"Adding tasks to the scheduler.")
402 status = BTI.safe_add_task(
403 func=test_current_date,
404 args=(datetime.now,),
409 print(f
"status {status}")
410 status = BTI.add_task(
417 print(f
"status {status}")
418 status = BTI.safe_add_task(
425 print(f
"status {status}")
426 status = BTI.add_task(
433 print(f
"status {status}")
434 status = BTI.add_task(
435 func=test_current_date,
441 print(f
"status {status}")
442 print(
"Added tasks to the scheduler.")
444 print(
"Startins scheduler.")
445 print(f
"Status: {BTI.safe_start()}")
446 print(
"Scheduler started.")
447 print(f
"Waiting {MAIN_THREAD_DELAY} on the main thread.")
448 sleep(MAIN_THREAD_DELAY)
449 print(f
"Waited {MAIN_THREAD_DELAY} on the main thread.")
450 print(
"Stopping scheduler.")
451 status = BTI.safe_stop(KIND_KILL)
452 print(f
"Status: {status}")
Union[Job, None] add_task(self, callable func, Union[Tuple, None] args=None, Union[Dict, None] kwargs=None, Union[str, Any] trigger="interval", int seconds=5)
Union[int, Job] safe_add_task(self, callable func, Union[Tuple, None] args=None, Union[Dict, None] kwargs=None, Union[str, Any] trigger="interval", int seconds=5)
None __init__(self, int success=0, int error=84, bool debug=False)
Union[int, None] start(self)
dict _to_dict(self, Union[Any, None] data=None)
int safe_pause(self, bool pause=True)
Union[int, None] pause(self, bool pause=True)
int safe_stop(self, bool wait=True)
Union[int, None] stop(self, bool wait=True)
None test_current_date(*Any args)