import pprint
import sys
import time
+import json
from collections import defaultdict
-NSEC_PER_SEC = 1000000000
-
try:
- from babeltrace import TraceCollection
+ import bt2
except ImportError:
# quick fix for debian-based distros
- sys.path.append("/usr/local/lib/python%d.%d/site-packages" %
- (sys.version_info.major, sys.version_info.minor))
- from babeltrace import TraceCollection
+ sys.path.append(
+ "/usr/local/lib/python%d.%d/site-packages"
+ % (sys.version_info.major, sys.version_info.minor)
+ )
+ import bt2
+
+NSEC_PER_SEC = 1000000000
class TraceParser:
- def __init__(self, trace, pid):
- self.trace = trace
+ def __init__(self, trace_msg_iter, pid):
+ self.trace = trace_msg_iter
self.pid = pid
# This dictionnary holds the results of each testcases of a test.
# Each test classes checks the payload of different events. Each of
# those checks are stored in a event_name specific dictionnary in this
# data structure.
- self.expect = defaultdict(lambda : defaultdict(int))
+ self.expect = defaultdict(lambda: defaultdict(int))
# This dictionnary holds the value recorded in the trace that are
# tested. Its content is use to print the values that caused a test to
self.recorded_values = {}
def ns_to_hour_nsec(self, ns):
- d = time.localtime(ns/NSEC_PER_SEC)
- return "%02d:%02d:%02d.%09d" % (d.tm_hour, d.tm_min, d.tm_sec,
- ns % NSEC_PER_SEC)
+ d = time.localtime(ns / NSEC_PER_SEC)
+ return "%02d:%02d:%02d.%09d" % (
+ d.tm_hour,
+ d.tm_min,
+ d.tm_sec,
+ ns % NSEC_PER_SEC,
+ )
def parse(self):
# iterate over all the events
- for event in self.trace.events:
- if self.pid is not None and event["pid"] != self.pid:
+ for msg in self.trace:
+ if type(msg) is not bt2._EventMessageConst:
+ continue
+
+ if self.pid is not None and msg.event["pid"] != self.pid:
continue
- method_name = "handle_%s" % event.name.replace(":", "_").replace(
- "+", "_")
+ method_name = "handle_%s" % msg.event.name.replace(":", "_").replace(
+ "+", "_"
+ )
# call the function to handle each event individually
if hasattr(TraceParser, method_name):
func = getattr(TraceParser, method_name)
- func(self, event)
+ func(self, msg.event)
ret = 0
# For each event of the test case, check all entries for failed
self.epoll_wait_exit(event)
def handle_compat_syscall_entry_epoll_pwait(self, event):
- self.epoll_wait_entry(event)
+ self.epoll_pwait_entry(event)
def handle_compat_syscall_exit_epoll_pwait(self, event):
- self.epoll_wait_exit(event)
+ self.epoll_pwait_exit(event)
def handle_syscall_entry_epoll_pwait(self, event):
- self.epoll_wait_entry(event)
+ self.epoll_pwait_entry(event)
def handle_syscall_exit_epoll_pwait(self, event):
- self.epoll_wait_exit(event)
+ self.epoll_pwait_exit(event)
def epoll_wait_entry(self, event):
pass
def epoll_wait_exit(self, event):
pass
+ def epoll_pwait_entry(self, event):
+ self.epoll_wait_entry(event)
+
+ def epoll_pwait_exit(self, event):
+ self.epoll_wait_exit(event)
+
## poll + ppoll
def handle_compat_syscall_entry_poll(self, event):
self.poll_entry(event)
pass
-class Test1(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class WorkingCases(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args["pid"])
+
+ # Values expected in the trace
+ self.epoll_wait_fd = validation_args["epoll_wait_fd"]
+ self.epoll_pwait_fd = validation_args["epoll_pwait_fd"]
+
self.expect["select_entry"]["select_in_fd0"] = 0
self.expect["select_entry"]["select_in_fd1023"] = 0
self.expect["select_exit"]["select_out_fd0"] = 0
self.expect["epoll_ctl_exit"]["epoll_ctl_out_ok"] = 0
self.expect["epoll_wait_entry"]["epoll_wait_in_ok"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_out_fd0"] = 0
+ self.expect["epoll_pwait_entry"]["epoll_pwait_in_ok"] = 0
+ self.expect["epoll_pwait_exit"]["epoll_pwait_out_fd0"] = 0
def select_entry(self, event):
n = event["n"]
exceptfd_127 = event["exceptfds"][127]
# check that the FD 1023 is actually set in the readfds
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and overflow == 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and overflow == 0
+ ):
self.expect["select_entry"]["select_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
readfd_127 = event["readfds"][127]
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and tvp == 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and tvp == 0
+ ):
self.expect["select_exit"]["select_out_fd1023"] = 1
# Save values of local variables to print in case of test failure
# the raw value matches the events bit field.
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x3 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_entry"]["poll_in_nfds1"] = 1
# Save values of local variables to print in case of test failure
# the raw value matches the events bit field.
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x1 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x1
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_exit"]["poll_out_nfds1"] = 1
# Save values of local variables to print in case of test failure
# check that we have FD 0 waiting for EPOLLIN|EPOLLPRI and that
# data.fd = 0
- if epfd == 3 and op_enum == "EPOLL_CTL_ADD" and fd == 0 and \
- _event["data_union"]["fd"] == 0 and \
- _event["events"]["EPOLLIN"] == 1 and \
- _event["events"]["EPOLLPRI"] == 1:
+ if (
+ (epfd == self.epoll_wait_fd or epfd == self.epoll_pwait_fd)
+ and "EPOLL_CTL_ADD" in op_enum.labels
+ and fd == 0
+ and _event["data_union"]["fd"] == 0
+ and _event["events"]["EPOLLIN"] == 1
+ and _event["events"]["EPOLLPRI"] == 1
+ ):
self.expect["epoll_ctl_entry"]["epoll_ctl_in_add"] = 1
# Save values of local variables to print in case of test failure
maxevents = event["maxevents"]
timeout = event["timeout"]
- if epfd == 3 and maxevents == 1 and timeout == -1:
+ if epfd == self.epoll_wait_fd and maxevents == 1 and timeout == -1:
self.expect["epoll_wait_entry"]["epoll_wait_in_ok"] = 1
# Save values of local variables to print in case of test failure
# check that FD 0 returned with EPOLLIN and the right data.fd
if ret == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if overflow == 0 and fd_0["data_union"]["fd"] == 0 and \
- fd_0["events"]["EPOLLIN"] == 1:
+ if (
+ overflow == 0
+ and fd_0["data_union"]["fd"] == 0
+ and fd_0["events"]["EPOLLIN"] == 1
+ ):
self.expect["epoll_wait_exit"]["epoll_wait_out_fd0"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_wait_exit"] = locals()
+ def epoll_pwait_entry(self, event):
+ epfd = event["epfd"]
+ maxevents = event["maxevents"]
+ timeout = event["timeout"]
+
+ if epfd == self.epoll_pwait_fd and maxevents == 1 and timeout == -1:
+ self.expect["epoll_pwait_entry"]["epoll_pwait_in_ok"] = 1
+
+ # Save values of local variables to print in case of test failure
+ self.recorded_values["epoll_pwait_entry"] = locals()
+
+ def epoll_pwait_exit(self, event):
+ ret = event["ret"]
+ fds_length = event["fds_length"]
+ overflow = event["overflow"]
+
+ # check that FD 0 returned with EPOLLIN and the right data.fd
+ if ret == 1 and fds_length == 1:
+ fd_0 = event["fds"][0]
+ if (
+ overflow == 0
+ and fd_0["data_union"]["fd"] == 0
+ and fd_0["events"]["EPOLLIN"] == 1
+ ):
+ self.expect["epoll_pwait_exit"]["epoll_pwait_out_fd0"] = 1
+
+ # Save values of local variables to print in case of test failure
+ self.recorded_values["epoll_pwait_exit"] = locals()
+
-class Test2(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class WorkingCasesTimeout(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["select_timeout_in_fd0"] = 0
self.expect["select_entry"]["select_timeout_in_fd1023"] = 0
self.expect["select_exit"]["select_timeout_out"] = 0
writefd_127 = event["writefds"][127]
exceptfd_127 = event["exceptfds"][127]
- if readfd_127 == 0x40 and writefd_127 == 0 and \
- exceptfd_127 == 0 and tvp != 0:
+ if (
+ readfd_127 == 0x40
+ and writefd_127 == 0
+ and exceptfd_127 == 0
+ and tvp != 0
+ ):
self.expect["select_entry"]["select_timeout_in_fd1023"] = 1
# Save values of local variables to print in case of test failure
# field matches the value of POLLIN
if nfds == 1 and fds_length == 1:
fd_0 = event["fds"][0]
- if fd_0["raw_events"] == 0x3 and \
- fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ ):
self.expect["poll_entry"]["poll_timeout_in"] = 1
# Save values of local variables to print in case of test failure
_event = event["event"]
# make sure we see a EPOLLIN|EPOLLPRI
- if op_enum == "EPOLL_CTL_ADD" and \
- _event["events"]["EPOLLIN"] == 1 and \
- _event["events"]["EPOLLPRI"] == 1:
+ if (
+ "EPOLL_CTL_ADD" in op_enum.labels
+ and _event["events"]["EPOLLIN"] == 1
+ and _event["events"]["EPOLLPRI"] == 1
+ ):
self.expect["epoll_ctl_entry"]["epoll_ctl_timeout_in_add"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_wait_exit"] = locals()
-class Test3(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class PselectInvalidFd(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["select_invalid_fd_in"] = 0
self.expect["select_exit"]["select_invalid_fd_out"] = 0
self.recorded_values["select_exit"] = locals()
-class Test4(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class PpollBig(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["big_poll_in"] = 0
self.expect["poll_exit"]["big_poll_out"] = 0
if nfds == 2047 and fds_length == 512 and overflow == 1:
fd_0 = event["fds"][0]
fd_511 = event["fds"][511]
- if fd_0["raw_events"] == 0x3 and fd_0["events"]["POLLIN"] == 1 and \
- fd_0["events"]["padding"] == 0 and \
- fd_511["events"]["POLLIN"] == 1 and \
- fd_511["events"]["POLLPRI"] == 1:
+ if (
+ fd_0["raw_events"] == 0x3
+ and fd_0["events"]["POLLIN"] == 1
+ and fd_0["events"]["padding"] == 0
+ and fd_511["events"]["POLLIN"] == 1
+ and fd_511["events"]["POLLPRI"] == 1
+ ):
self.expect["poll_entry"]["big_poll_in"] = 1
# Save values of local variables to print in case of test failure
# test of big list of FDs and the behaviour of the overflow
if ret == 2047 and nfds == 2047 and fds_length == 512 and overflow == 1:
- fd_0 = event["fds"][0]
- fd_511 = event["fds"][511]
- if fd_0["events"]["POLLIN"] == 1 and fd_511["events"]["POLLIN"] == 1:
- self.expect["poll_exit"]["big_poll_out"] = 1
+ fd_0 = event["fds"][0]
+ fd_511 = event["fds"][511]
+ if fd_0["events"]["POLLIN"] == 1 and fd_511["events"]["POLLIN"] == 1:
+ self.expect["poll_exit"]["big_poll_out"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["poll_exit"] = locals()
-class Test5(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+
+class PpollFdsBufferOverflow(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["poll_overflow_in"] = 0
self.expect["poll_exit"]["poll_overflow_out"] = 0
self.recorded_values["poll_exit"] = locals()
-class Test6(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class PselectInvalidPointer(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args["pid"])
self.expect["select_entry"]["pselect_invalid_in"] = 0
self.expect["select_exit"]["pselect_invalid_out"] = 0
self.recorded_values["select_exit"] = locals()
-class Test7(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class PpollFdsULongMax(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args["pid"])
self.expect["poll_entry"]["poll_max_in"] = 0
self.expect["poll_exit"]["poll_max_out"] = 0
# Save values of local variables to print in case of test failure
self.recorded_values["poll_entry"] = locals()
-
def poll_exit(self, event):
ret = event["ret"]
nfds = event["nfds"]
self.recorded_values["poll_exit"] = locals()
-class Test8(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class EpollPwaitInvalidPointer(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args["pid"])
+
+ # Values expected in the trace
+ self.epoll_fd = validation_args["epollfd"]
+
self.expect["epoll_wait_entry"]["epoll_wait_invalid_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_invalid_out"] = 0
# test that event in valid even though the target buffer pointer is
# invalid and the program segfaults
- if epfd == 3 and maxevents == 1 and timeout == -1:
+ if epfd == self.epoll_fd and maxevents == 1 and timeout == -1:
self.expect["epoll_wait_entry"]["epoll_wait_invalid_in"] = 1
# Save values of local variables to print in case of test failure
self.recorded_values["epoll_wait_exit"] = locals()
-class Test9(TraceParser):
- def __init__(self, trace, pid):
- super().__init__(trace, pid)
+class EpollPwaitIntMax(TraceParser):
+ def __init__(self, trace, validation_args):
+ super().__init__(trace, validation_args["pid"])
+
+ # Values expected in the trace
+ self.epoll_fd = validation_args["epollfd"]
+
self.expect["epoll_wait_entry"]["epoll_wait_max_in"] = 0
self.expect["epoll_wait_exit"]["epoll_wait_max_out"] = 0
timeout = event["timeout"]
# check the proper working of INT_MAX maxevent value
- if epfd == 3 and maxevents == 2147483647 and timeout == -1:
+ if epfd == self.epoll_fd and maxevents == 2147483647 and timeout == -1:
self.expect["epoll_wait_entry"]["epoll_wait_max_in"] = 1
# Save values of local variables to print in case of test failure
if __name__ == "__main__":
- parser = argparse.ArgumentParser(description='Trace parser')
- parser.add_argument('path', metavar="<path/to/trace>", help='Trace path')
- parser.add_argument('-t', '--test', type=int, help='Test to validate')
- parser.add_argument('-p', '--pid', type=int, help='PID of the app')
+ parser = argparse.ArgumentParser(description="Trace parser")
+ parser.add_argument("path", metavar="<path/to/trace>", help="Trace path")
+ parser.add_argument("-t", "--test", type=str, help="Test to validate")
+ parser.add_argument(
+ "-o", "--validation-file", type=str, help="Validation file path"
+ )
args = parser.parse_args()
if not args.test:
- print("Need to pass a test to validate (-t)")
+ print("Need to pass a test to validate (--test/-t)")
sys.exit(1)
- if not args.pid:
- print("Need to pass the PID to check (-p)")
+ if not args.validation_file:
+ print("Need to pass the test validation file (--validation-file/-o)")
sys.exit(1)
- traces = TraceCollection()
- handle = traces.add_traces_recursive(args.path, "ctf")
- if handle is None:
- sys.exit(1)
+ traces = bt2.TraceCollectionMessageIterator(args.path)
+
+ with open(args.validation_file) as f:
+ try:
+ test_validation_args = json.load(f)
+ except Exception as e:
+ print("Failed to parse validation file: " + str(e))
+ sys.exit(1)
t = None
- if args.test == 1:
- t = Test1(traces, args.pid)
- elif args.test == 2:
- t = Test2(traces, args.pid)
- elif args.test == 3:
- t = Test3(traces, args.pid)
- elif args.test == 4:
- t = Test4(traces, args.pid)
- elif args.test == 5:
- t = Test5(traces, args.pid)
- elif args.test == 6:
- t = Test6(traces, args.pid)
- elif args.test == 7:
- t = Test7(traces, args.pid)
- elif args.test == 8:
- t = Test8(traces, args.pid)
- elif args.test == 9:
- t = Test9(traces, args.pid)
- elif args.test == 10:
+ if args.test == "working_cases":
+ t = WorkingCases(traces, test_validation_args)
+ elif args.test == "working_cases_timeout":
+ t = WorkingCasesTimeout(traces, test_validation_args)
+ elif args.test == "pselect_invalid_fd":
+ t = PselectInvalidFd(traces, test_validation_args)
+ elif args.test == "ppoll_big":
+ t = PpollBig(traces, test_validation_args)
+ elif args.test == "ppoll_fds_buffer_overflow":
+ t = PpollFdsBufferOverflow(traces, test_validation_args)
+ elif args.test == "pselect_invalid_pointer":
+ t = PselectInvalidPointer(traces, test_validation_args)
+ elif args.test == "ppoll_fds_ulong_max":
+ t = PpollFdsULongMax(traces, test_validation_args)
+ elif args.test == "epoll_pwait_invalid_pointer":
+ t = EpollPwaitInvalidPointer(traces, test_validation_args)
+ elif args.test == "epoll_pwait_int_max":
+ t = EpollPwaitIntMax(traces, test_validation_args)
+ elif args.test == "ppoll_concurrent_write":
# stress test, nothing reliable to check
ret = 0
- elif args.test == 11:
+ elif args.test == "epoll_pwait_concurrent_munmap":
# stress test, nothing reliable to check
ret = 0
else:
if t is not None:
ret = t.parse()
- for h in handle.values():
- traces.remove_trace(h)
-
sys.exit(ret)