Terarea  2
The automation project
Loading...
Searching...
No Matches
test_background_tasks.py
Go to the documentation of this file.
1"""_summary_
2 File in charge of testing the background task scheduler function.
3"""
4
5import os
6import sys
7from typing import Any
8from time import sleep
9from datetime import datetime
10
11import pytest
12from apscheduler.job import Job
13import constants as TCONST
14
# Add the current working directory to the import path so that the `src`
# package imported just below can be resolved.
sys.path.append(os.getcwd())
try:
    from src.lib.components.background_tasks import BackgroundTasks
except ImportError as e:
    # Re-raise with an explicit message, keeping the original error chained.
    raise ImportError("Failed to import the src module") from e


# Constants
# Local aliases of the values exposed by the `constants` module (TCONST);
# they are the arguments given to BackgroundTasks in the tests of this file.
SUCCESS = TCONST.SUCCESS
ERROR = TCONST.ERROR
DEBUG = TCONST.DEBUG
26
27# Create a test function for each test scenario
28
29
def test_current_date(*args: Any) -> None:
    """_summary_
        Print a date on the standard output.

    Args:
        *args (Any): Optional positional arguments; only the first one is used.
            If it is callable, it is called and its result is printed.
            Otherwise it is printed as-is.
            When no argument is given, the value of datetime.now() is printed.
    """
    # Use the first positional argument when there is one, else the current time.
    value = args[0] if args else datetime.now()
    if callable(value):
        print(f"(test_current_date) (Called) Current date: {value()}")
        return
    print(f"(test_current_date) (Not called) Current date: {value}")
44
45
def hello_world() -> None:
    """_summary_
        Dummy task: write the "Hello, World!" greeting to the standard output.
    """
    greeting = "Hello, World!"
    print(greeting)
51
52
def pending_world() -> None:
    """_summary_
        Dummy task: write the "Pending, World!" message to the standard output.
    """
    message = "Pending, World!"
    print(message)
58
59
def goodbye_world() -> None:
    """_summary_
        Dummy task: write the "Goodbye, World!" farewell to the standard output.
    """
    farewell = "Goodbye, World!"
    print(farewell)
65
66
68 """
69 Test that BackgroundTasks initializes with correct parameters.
70 """
71 bgt = BackgroundTasks(success=SUCCESS, error=ERROR, debug=DEBUG)
72 status_success = bgt.success
73 status_error = bgt.error
74 status_debug = bgt.debug
75 del bgt
76 assert status_success == SUCCESS
77 assert status_error == ERROR
78 assert status_debug is DEBUG
79
80
82 """
83 Test safe_add_task adds a task successfully.
84 """
85
86 def mock_func():
87 return "mocked function"
88
89 bgt = BackgroundTasks(success=SUCCESS, error=ERROR, debug=DEBUG)
90
91 status = bgt.safe_add_task(
92 func=mock_func,
93 args=None,
94 kwargs=None,
95 trigger="interval",
96 seconds=2
97 )
98 status = isinstance(status, Job)
99 del bgt
100 assert status is True, "Task failed to be added"
101
102
104 """
105 Test that the scheduler starts and stops correctly.
106 """
107 bgt = BackgroundTasks(success=SUCCESS, error=ERROR, debug=DEBUG)
108 status_start = bgt.safe_start()
109 status_stop = bgt.safe_stop(wait=True)
110 del bgt
111
112 assert status_start == SUCCESS
113 assert status_stop == SUCCESS
114
115
117 """
118 Test adding multiple tasks to the scheduler.
119 """
120
121 bgt = BackgroundTasks(success=SUCCESS, error=ERROR, debug=DEBUG)
122
123 status_task1 = bgt.safe_add_task(
124 func=test_current_date,
125 args=(datetime.now,),
126 kwargs=None,
127 trigger="interval",
128 seconds=2
129 )
130 status_task2 = bgt.add_task(
131 hello_world,
132 args=None,
133 kwargs=None,
134 trigger="interval",
135 seconds=2
136 )
137 status_task3 = bgt.safe_add_task(
138 pending_world,
139 args=None,
140 kwargs=None,
141 trigger="interval",
142 seconds=2
143 )
144 status_task4 = bgt.add_task(
145 goodbye_world,
146 args=None,
147 kwargs=None,
148 trigger="interval",
149 seconds=2
150 )
151 status_start = bgt.safe_start()
152 sleep(10)
153 status_stop = bgt.safe_stop(wait=True)
154 status_task1 = isinstance(status_task1, Job)
155 status_task2 = isinstance(status_task2, Job)
156 status_task3 = isinstance(status_task3, Job)
157 status_task4 = isinstance(status_task4, Job)
158 del bgt
159
160 assert status_task1 is True
161 assert status_task2 is True
162 assert status_task3 is True
163 assert status_task4 is True
164 assert status_start == SUCCESS
165 assert status_stop == SUCCESS
166
167
169 """
170 Test that adding an invalid task (non-callable) raises an error.
171 """
172 bgt = BackgroundTasks(success=SUCCESS, error=ERROR, debug=DEBUG)
173
174 with pytest.raises(ValueError):
175 bgt.add_task(
176 func="not_callable", # this is invalid
177 args=None,
178 kwargs=None,
179 trigger="interval",
180 seconds=2
181 )
182
183
185 """
186 Test safe methods like safe_pause, safe_resume.
187 """
188 bgt = BackgroundTasks(success=SUCCESS, error=ERROR, debug=DEBUG)
189 status_start = bgt.safe_start()
190 status_pause = bgt.safe_pause(pause=True)
191 status_resume = bgt.safe_resume()
192 status_stop = bgt.safe_stop(wait=True)
193 del bgt
194
195 assert status_start == SUCCESS
196 assert status_pause == SUCCESS
197 assert status_resume == SUCCESS
198 assert status_stop == SUCCESS
None test_current_date(*Any args)