Terarea  2
The automation project
Loading...
Searching...
No Matches
background_tasks.py
Go to the documentation of this file.
7
8"""
9 File in charge of setting up the cron jobs for the server.
10"""
11
12from typing import Union, Any, Dict, Tuple
13from apscheduler.job import Job
14from apscheduler.schedulers.background import BackgroundScheduler
15from apscheduler.schedulers import SchedulerAlreadyRunningError, SchedulerNotRunningError
16from display_tty import Disp, TOML_CONF, FILE_DESCRIPTOR, SAVE_TO_FILE, FILE_NAME
17
18
20 """_summary_
21 This is the class that is in charge of scheduling background tasks that need to run on intervals
22 """
23
def __init__(self, success: int = 0, error: int = 84, debug: bool = False) -> None:
    """_summary_
        Store the status codes, create the scheduler and set up the logger.

    Args:
        success (int, optional): _description_: The value kept in self.success (returned by the methods when they succeed). Defaults to 0.
        error (int, optional): _description_: The value kept in self.error (returned by the safe_* methods when they fail). Defaults to 84.
        debug (bool, optional): _description_: The flag forwarded to the Disp logger as its debug option. Defaults to False.
    """
    # ------------------ Status codes and debug flag -------------------
    self.success: int = success
    self.error: int = error
    self.debug: bool = debug
    # -------------- The scheduler that will hold the jobs -------------
    self.scheduler = BackgroundScheduler()
    # ------------- The logger, named after the current class ----------
    logger_name = self.__class__.__name__
    self.disp: Disp = Disp(
        TOML_CONF,
        FILE_DESCRIPTOR,
        SAVE_TO_FILE,
        FILE_NAME,
        debug=self.debug,
        logger=logger_name
    )
40
41 def __del__(self) -> None:
42 """_summary_
43 The destructor of the class
44 """
45 self.disp.log_info("Stopping background tasks.", "__del__")
46 exit_code = self.safe_stop()
47 msg = f"The cron exited with status {exit_code}."
48 if exit_code != self.success:
49 self.disp.log_error(msg, "__del__")
50 else:
51 self.disp.log_debug(msg, "__del__")
52
def safe_add_task(self, func: callable, args: Union[Tuple, None] = None, kwargs: Union[Dict, None] = None, trigger: Union[str, Any] = "interval", seconds: int = 5) -> Union[int, Job]:
    """_summary_
        A non-crashing implementation of the add_task function.

        The arguments are forwarded unchanged to add_task. The validation errors
        raised by add_task (ValueError) as well as the errors the scheduler may raise
        while building the job (TypeError for arguments the trigger does not accept,
        LookupError for an unknown trigger name) are logged and turned into self.error.

    Args:
        func (callable): _description_: The function to be called when it is time to run the job
        args (Union[Tuple, None], optional): _description_: Positional arguments you wish to pass to the function when executed. Defaults to None.
        kwargs (Union[Dict, None], optional): _description_: Keyword arguments you wish to pass to the function when executed. Defaults to None.
        trigger (Union[str, Any], optional): _description_: The name of the trigger to use. Defaults to "interval".
        seconds (int, optional): _description_: The amount of seconds to wait before executing the task again. Defaults to 5.

    Returns:
        Union[int, Job]: _description_: returns self.error if there was an error, otherwise, returns what add_task returned (a Job instance).
    """
    try:
        return self.add_task(
            func=func,
            args=args,
            kwargs=kwargs,
            trigger=trigger,
            seconds=seconds
        )
    except (ValueError, TypeError, LookupError) as e:
        # add_task does not wrap the scheduler's own exceptions, so the
        # "safe" variant has to absorb them here to honour its contract.
        self.disp.log_error(
            f"Runtime Error for add_task. {e}",
            "safe_add_task"
        )
        return self.error
81
def safe_start(self) -> int:
    """_summary_
        Start the scheduler without letting a RuntimeError escape.

    Returns:
        int: _description_: The value returned by start (self.success) when it worked, otherwise self.error once the RuntimeError has been logged.
    """
    try:
        status = self.start()
    except RuntimeError as e:
        error_message = f"Runtime Error for start. {e}"
        self.disp.log_error(error_message, "safe_start")
        return self.error
    return status
97
def safe_pause(self, pause: bool = True) -> int:
    """_summary_
        This function is in charge of pausing the scheduler. In a non-breaking way.

        It calls pause and converts any RuntimeError it raises into self.error
        after logging it.

    Args:
        pause (bool, optional): _description_: This is the boolean that will determine if the scheduler should be paused or not. It is forwarded to pause as is. Defaults to True.

    Returns:
        int: _description_: Will return self.success if it worked, otherwise self.error
    """
    try:
        return self.pause(pause=pause)
    except RuntimeError as e:
        # The message names the operation that actually failed (pause).
        self.disp.log_error(
            f"Runtime Error for pause. {e}",
            "safe_pause"
        )
        return self.error
116
def safe_resume(self) -> int:
    """_summary_
        This function is in charge of resuming the scheduler. In a non-breaking way.

        It calls resume and converts any RuntimeError it raises into self.error
        after logging it.

    Returns:
        int: _description_: Will return self.success if it worked, otherwise self.error.
    """
    try:
        return self.resume()
    except RuntimeError as e:
        # The message names the operation that actually failed (resume).
        self.disp.log_error(
            f"Runtime Error for resume. {e}",
            "safe_resume"
        )
        return self.error
132
def safe_stop(self, wait: bool = True) -> int:
    """_summary_
        This function is in charge of stopping the scheduler. In a non-breaking way.

        It calls stop and converts any RuntimeError it raises into self.error
        after logging it.

    Args:
        wait (bool, optional): _description_: Wait for the running tasks to finish. It is forwarded to stop as is. Defaults to True.

    Returns:
        int: _description_: will return self.success if it succeeds, otherwise self.error
    """
    try:
        return self.stop(wait=wait)
    except RuntimeError as e:
        # The message names the operation that actually failed (stop).
        self.disp.log_error(
            f"Runtime Error for stop. {e}",
            "safe_stop"
        )
        return self.error
151
152 def _to_dict(self, data: Union[Any, None] = None) -> dict:
153 """_summary_
154 Convert any data input into a dictionnary.
155 Args:
156 data (Union[Any, None], optional): _description_. Defaults to None. This is the data you are providing.
157
158 Returns:
159 dict: _description_: A dictionnary crea ted with what could be extracted from the data.
160 """
161 if data is None:
162 return {"none": None}
163 if isinstance(data, dict) is True:
164 return data
165 if isinstance(data, (list, tuple)) is True:
166 res = {}
167 for i in list(data):
168 res[i] = None
169 return res
170 return {"data": data}
171
def add_task(self, func: callable, args: Union[Tuple, None] = None, kwargs: Union[Dict, None] = None, trigger: Union[str, Any] = "interval", seconds: int = 5) -> Union[Job, None]:
    """_summary_
        Function in charge of adding an automated call to functions that are meant to run in the background.
        They are meant to run on interval.

        The inputs are checked before the job is handed to the scheduler:
        * args that are not a tuple are wrapped in a one item tuple (a warning is logged)
        * kwargs that are not a dictionary are converted with _to_dict (a warning is logged)

    Args:
        func (callable): _description_: The function to be called when it is time to run the job
        args (Union[Tuple, None], optional): _description_: Positional arguments you wish to pass to the function when executed. Defaults to None.
        kwargs (Union[Dict, None], optional): _description_: Keyword arguments you wish to pass to the function when executed. Defaults to None.
        trigger (Union[str, Any], optional): _description_: The name of the trigger, it must be a string or None. Defaults to "interval".
        seconds (int, optional): _description_: The amount of seconds to wait before executing the task again, it is always forwarded to the scheduler as the 'seconds' option. Defaults to 5.

    Raises:
        ValueError: _description_: func is not callable, trigger is neither None nor a string, or seconds is not an integer.

    Returns:
        Union[Job, None]: _description_: The value returned by the scheduler's add_job (a Job instance).
    """
    if not callable(func):
        self.disp.log_error(
            f"The provided function is not callable: {func}.",
            "add_task"
        )
        raise ValueError("The function must be callable.")
    # Not every callable has a __name__ (functools.partial objects, instances
    # defining __call__), so fall back to its repr for the log messages.
    func_name = getattr(func, "__name__", repr(func))
    if args is not None and not isinstance(args, tuple):
        msg = f"The provided args for {func_name} are not tuples.\n"
        msg += f"Converting args: '{args}' to tuples."
        self.disp.log_warning(msg, "add_task")
        args = (args,)
    if kwargs is not None and not isinstance(kwargs, dict):
        msg = f"The provided kwargs for {func_name} "
        msg += "are not dictionaries.\n"
        msg += f"Converting kwargs: '{kwargs}' to dictionaries."
        self.disp.log_warning(msg, "add_task")
        kwargs = self._to_dict(kwargs)
        self.disp.log_warning(f"Converted data = {kwargs}.", "add_task")
    if trigger is not None and not isinstance(trigger, str):
        self.disp.log_error(
            f"The provided trigger is not a string: {trigger}.",
            "add_task"
        )
        raise ValueError("The trigger must be a string.")
    if not isinstance(seconds, int):
        self.disp.log_error(
            f"The provided seconds is not an integer: {seconds}.",
            "add_task"
        )
        raise ValueError("The seconds must be an integer.")
    msg = f"Adding job: {func_name} "
    msg += f"with trigger: {trigger}, "
    msg += f"seconds = {seconds}, "
    msg += f"args = {args}, "
    msg += f"kwargs = {kwargs}."
    self.disp.log_debug(msg, "add_task")
    return self.scheduler.add_job(
        func=func,
        trigger=trigger,
        seconds=seconds,
        args=args,
        kwargs=kwargs
    )
230
def start(self) -> Union[int, None]:
    """_summary_
        Start the scheduler loop.

        A scheduler that is already running is not treated as a failure:
        it is logged and self.success is returned.

    Raises:
        RuntimeError: _description_: Raised (chained to the original exception) when starting the scheduler failed for any other reason.

    Returns:
        Union[int, None]: _description_: self.success when the scheduler is started (or already was), nothing otherwise because an error is raised.
    """
    try:
        self.scheduler.start()
        self.disp.log_info("Scheduler started...", "start")
        return self.success
    except SchedulerAlreadyRunningError:
        self.disp.log_info("Scheduler is already running...", "start")
        return self.success
    except Exception as e:
        # RuntimeError is an Exception, a single handler covers both cases.
        self.disp.log_error(
            f"An error occurred while starting the scheduler: {e}",
            "start"
        )
        prefix = f"Error({self.__class__.__name__}): "
        details = "Failed to call the scheduler's start wrapper function."
        raise RuntimeError(prefix + details) from e
263
def pause(self, pause: bool = True) -> Union[int, None]:
    """_summary_
        Pause the scheduler, or resume it when pause is anything other than True.

    Args:
        pause (bool, optional): _description_: True pauses the scheduler, any other value resumes it. Defaults to True.

    Raises:
        RuntimeError: _description_: Raised (chained to the original exception) when the scheduler call or the logging failed.

    Returns:
        Union[int, None]: _description_: self.success when it worked, nothing otherwise because an error is raised.
    """
    try:
        # Pick the scheduler action and the matching log line first,
        # then run them in the same order for both cases.
        if pause is True:
            action, report = self.scheduler.pause, "Scheduler paused."
        else:
            action, report = self.scheduler.resume, "Scheduler resumed."
        action()
        self.disp.log_info(report, "pause")
        return self.success
    except Exception as e:
        self.disp.log_error(
            f"An error occurred while pausing the scheduler: {e}",
            "pause"
        )
        prefix = f"Error({self.__class__.__name__}): "
        raise RuntimeError(
            prefix + "Failed to call the chron pause wrapper function."
        ) from e
290
def resume(self) -> Union[int]:
    """_summary_
        Resume the scheduler loop by calling pause with pause=False.

    Returns:
        Union[int]: _description_: The value returned by pause (self.success when it worked); errors raised by pause are not caught here.
    """
    status = self.pause(pause=False)
    return status
299
def stop(self, wait: bool = True) -> Union[int, None]:
    """_summary_
        Shut the scheduler down.

        A scheduler that is not running is not treated as a failure:
        it is logged and self.success is returned.

    Args:
        wait (bool, optional): _description_: Forwarded to the scheduler's shutdown as its wait option (wait for the running tasks to finish). Defaults to True.

    Raises:
        RuntimeError: _description_: Raised (chained to the original exception) when shutting the scheduler down failed for any other reason.

    Returns:
        Union[int, None]: _description_: self.success when the scheduler is stopped (or already was), nothing otherwise because an error is raised.
    """
    try:
        self.scheduler.shutdown(wait=wait)
        self.disp.log_info("Scheduler stopped.", "stop")
        return self.success
    except SchedulerNotRunningError:
        self.disp.log_info("Scheduler is already stopped.", "stop")
        return self.success
    except Exception as e:
        self.disp.log_error(
            f"An error occurred while stopping the scheduler: {e}",
            "stop"
        )
        prefix = f"Error({self.__class__.__name__}): "
        raise RuntimeError(
            prefix + "Failed to call the chron stop wrapper function."
        ) from e
327
328
329if __name__ == "__main__":
330 import sys
331 from time import sleep
332 from datetime import datetime
333
def test_current_date(*args: Any) -> None:
    """_summary_
        Test function that prints a date.

        The first positional argument is used when there is one, otherwise the current date is taken.
        When that value is callable, it is called and its result is printed.

    Args:
        *args (Any): _description_: Optional, only the first item is used: a date or a callable returning one.
    """
    date = args[0] if args else datetime.now()
    if callable(date):
        print(f"(test_current_date) (Called) Current date: {date()}")
        return
    print(f"(test_current_date) (Not called) Current date: {date}")
348
def hello_world() -> None:
    """_summary_
        Test function: writes the line "Hello, World!" on the standard output.
    """
    greeting = "Hello, World!"
    print(greeting)
354
def pending_world() -> None:
    """_summary_
        Test function: writes the line "Pending, World!" on the standard output.
    """
    notice = "Pending, World!"
    print(notice)
360
def goodbye_world() -> None:
    """_summary_
        Test function: writes the line "Goodbye, World!" on the standard output.
    """
    farewell = "Goodbye, World!"
    print(farewell)
366
367 print("Testing declared functions.")
372 print("Declared functions tested.")
373
# Status codes handed to the class under test.
SUCCES = 0
ERROR = 84
# Options of the demonstration run.
DEBUG = True
KIND_KILL = True
# Scheduling parameters shared by every task added below.
TRIGGER = "interval"
SECONDS = 2
NB_REPEATS = 2
NB_FUNCTIONS = 5
# Time the main thread sleeps: interval * number of tasks * number of repeats.
MAIN_THREAD_DELAY = int(SECONDS * NB_FUNCTIONS * NB_REPEATS)

settings_summary = (
    f"Statuses:\nSUCCESS = {SUCCES}, ERROR = {ERROR}\n"
    f"DEBUG = {DEBUG}, KIND_KILL = {KIND_KILL}, "
    f"NB_REPEATS = {NB_REPEATS}, "
    f"TRIGGER = {TRIGGER}, SECONDS = {SECONDS}, "
    f"NB_FUNCTIONS = {NB_FUNCTIONS}, "
    f"MAIN_THREAD_DELAY = {MAIN_THREAD_DELAY}"
)
print(settings_summary)
392
393 print("Initialising class BackgroundTasks.")
395 success=SUCCES,
396 error=ERROR,
397 debug=DEBUG
398 )
399 print("Class BackgroundTasks initialised.")
400
# Demonstration run: register five interval jobs, start the scheduler,
# let it work while the main thread sleeps, then stop it and exit.
# NOTE(review): BTI is presumably the instance built just above from
# SUCCES / ERROR / DEBUG; its assignment line is not visible in this listing.
print("Adding tasks to the scheduler.")
# Job 1: safe variant, the callable datetime.now is passed in a proper tuple.
status = BTI.safe_add_task(
    func=test_current_date,
    args=(datetime.now,),
    kwargs=None,
    trigger=TRIGGER,
    seconds=SECONDS
)
print(f"status {status}")
# Job 2: raising variant (add_task raises ValueError on invalid input).
status = BTI.add_task(
    hello_world,
    args=None,
    kwargs=None,
    trigger=TRIGGER,
    seconds=SECONDS
)
print(f"status {status}")
# Job 3: safe variant again, function given positionally.
status = BTI.safe_add_task(
    pending_world,
    args=None,
    kwargs=None,
    trigger=TRIGGER,
    seconds=SECONDS
)
print(f"status {status}")
# Job 4: raising variant.
status = BTI.add_task(
    goodbye_world,
    args=None,
    kwargs=None,
    trigger=TRIGGER,
    seconds=SECONDS
)
print(f"status {status}")
# Job 5: args is a bare callable instead of a tuple; add_task logs a
# warning and wraps it into a one item tuple.
status = BTI.add_task(
    func=test_current_date,
    args=datetime.now,
    kwargs=None,
    trigger=TRIGGER,
    seconds=SECONDS
)
print(f"status {status}")
print("Added tasks to the scheduler.")

print("Startins scheduler.")
print(f"Status: {BTI.safe_start()}")
print("Scheduler started.")
# The main thread only sleeps here, for MAIN_THREAD_DELAY seconds.
print(f"Waiting {MAIN_THREAD_DELAY} on the main thread.")
sleep(MAIN_THREAD_DELAY)
print(f"Waited {MAIN_THREAD_DELAY} on the main thread.")
print("Stopping scheduler.")
# KIND_KILL is handed to safe_stop as its 'wait' argument.
status = BTI.safe_stop(KIND_KILL)
print(f"Status: {status}")
# The status returned by safe_stop becomes the exit code of the process.
sys.exit(status)
Union[Job, None] add_task(self, callable func, Union[Tuple, None] args=None, Union[Dict, None] kwargs=None, Union[str, Any] trigger="interval", int seconds=5)
Union[int, Job] safe_add_task(self, callable func, Union[Tuple, None] args=None, Union[Dict, None] kwargs=None, Union[str, Any] trigger="interval", int seconds=5)
None __init__(self, int success=0, int error=84, bool debug=False)
dict _to_dict(self, Union[Any, None] data=None)
Union[int, None] pause(self, bool pause=True)
Union[int, None] stop(self, bool wait=True)