tests: Correct timing of python tests with python3 < 3.7
[lttng-tools.git] / tests / utils / lttngtest / tap_generator.py
1 #!/usr/bin/env python3
2 #
3 # Copyright (C) 2022 Jérémie Galarneau <jeremie.galarneau@efficios.com>
4 #
5 # SPDX-License-Identifier: GPL-2.0-only
6 #
7
8 import contextlib
9 import os
10 import sys
11 import time
12 from typing import Iterator, Optional
13
14
15 def _get_time_ns():
16 assert sys.version_info > (3, 3, 0)
17 # time.monotonic_ns is only available for python >= 3.8
18 return time.monotonic() * 1000000000
19
20
21 class InvalidTestPlan(RuntimeError):
22 def __init__(self, msg):
23 # type: (str) -> None
24 super().__init__(msg)
25
26
27 class BailOut(RuntimeError):
28 def __init__(self, msg):
29 # type: (str) -> None
30 super().__init__(msg)
31
32
33 class TestCase:
34 def __init__(
35 self,
36 tap_generator, # type: "TapGenerator"
37 description, # type: str
38 ):
39 self._tap_generator = tap_generator # type: "TapGenerator"
40 self._result = None # type: Optional[bool]
41 self._description = description # type: str
42
43 @property
44 def result(self):
45 # type: () -> Optional[bool]
46 return self._result
47
48 @property
49 def description(self):
50 # type: () -> str
51 return self._description
52
53 def _set_result(self, result):
54 # type: (bool) -> None
55 if self._result is not None:
56 raise RuntimeError("Can't set test case result twice")
57
58 self._result = result
59 self._tap_generator.test(result, self._description)
60
61 def success(self):
62 # type: () -> None
63 self._set_result(True)
64
65 def fail(self):
66 # type: () -> None
67 self._set_result(False)
68
69
70 # Produces a test execution report in the TAP format.
71 class TapGenerator:
72 def __init__(self, total_test_count):
73 # type: (int) -> None
74 if total_test_count <= 0:
75 raise ValueError("Test count must be greater than zero")
76
77 self._total_test_count = total_test_count # type: int
78 self._last_test_case_id = 0 # type: int
79 self._printed_plan = False # type: bool
80 self._has_failure = False # type: bool
81 self._time_tests = True # type: bool
82 if os.getenv("TAP_AUTOTIME", "1") == "" or os.getenv("TAP_AUTOTIME", "1") == "0":
83 self._time_tests = False
84 self._last_time = _get_time_ns()
85
86 def __del__(self):
87 if self.remaining_test_cases > 0:
88 self.bail_out(
89 "Missing {remaining_test_cases} test cases".format(
90 remaining_test_cases=self.remaining_test_cases
91 )
92 )
93
94 @property
95 def remaining_test_cases(self):
96 # type: () -> int
97 return self._total_test_count - self._last_test_case_id
98
99 def _print(self, msg):
100 # type: (str) -> None
101 if not self._printed_plan:
102 print(
103 "1..{total_test_count}".format(total_test_count=self._total_test_count),
104 flush=True,
105 )
106 self._printed_plan = True
107
108 print(msg, flush=True)
109
110 def skip_all(self, reason):
111 # type: (str) -> None
112 if self._last_test_case_id != 0:
113 raise RuntimeError("Can't skip all tests after running test cases")
114
115 if reason:
116 self._print("1..0 # Skip all: {reason}".format(reason=reason))
117
118 self._last_test_case_id = self._total_test_count
119
120 def skip(self, reason, skip_count=1):
121 # type: (str, int) -> None
122 for i in range(skip_count):
123 self._last_test_case_id = self._last_test_case_id + 1
124 self._print(
125 "ok {test_number} # Skip: {reason}".format(
126 reason=reason, test_number=(i + self._last_test_case_id)
127 )
128 )
129
130 def bail_out(self, reason):
131 # type: (str) -> None
132 self._print("Bail out! {reason}".format(reason=reason))
133 self._last_test_case_id = self._total_test_count
134 raise BailOut(reason)
135
136 def test(self, result, description):
137 # type: (bool, str) -> None
138 duration = (_get_time_ns() - self._last_time) / 1000000
139 if self._last_test_case_id == self._total_test_count:
140 raise InvalidTestPlan("Executing too many tests")
141
142 if result is False:
143 self._has_failure = True
144
145 result_string = "ok" if result else "not ok"
146 self._last_test_case_id = self._last_test_case_id + 1
147 self._print(
148 "{result_string} {case_id} - {description}".format(
149 result_string=result_string,
150 case_id=self._last_test_case_id,
151 description=description,
152 )
153 )
154 if self._time_tests:
155 self._print("---\n duration_ms: {}\n...\n".format(duration))
156 self._last_time = _get_time_ns()
157
158 def ok(self, description):
159 # type: (str) -> None
160 self.test(True, description)
161
162 def fail(self, description):
163 # type: (str) -> None
164 self.test(False, description)
165
166 @property
167 def is_successful(self):
168 # type: () -> bool
169 return (
170 self._last_test_case_id == self._total_test_count and not self._has_failure
171 )
172
173 @contextlib.contextmanager
174 def case(self, description):
175 # type: (str) -> Iterator[TestCase]
176 test_case = TestCase(self, description)
177 try:
178 yield test_case
179 except Exception as e:
180 self.diagnostic(
181 "Exception `{exception_type}` thrown during test case `{description}`, marking as failure.".format(
182 description=test_case.description, exception_type=type(e).__name__
183 )
184 )
185
186 if str(e) != "":
187 self.diagnostic(str(e))
188
189 test_case.fail()
190 finally:
191 if test_case.result is None:
192 test_case.success()
193
194 def diagnostic(self, msg):
195 # type: (str) -> None
196 print("# {msg}".format(msg=msg), file=sys.stderr, flush=True)
This page took 0.033514 seconds and 4 git commands to generate.