tests: Add mechanism to start relayd in python testing environment
[lttng-tools.git] / tests / utils / lttngtest / lttngctl.py
index 0301b6e093afd1748e28f49d9547a7dd6116a7fb..5b937dc8acfd8cecc885fdb47af322f75236c91e 100644 (file)
@@ -22,7 +22,8 @@ control functionality that is used by tests.
 """
 
 
-def _generate_random_string(length: int) -> str:
+def _generate_random_string(length):
+    # type: (int) -> str
     return "".join(
         random.choice(string.ascii_lowercase + string.digits) for _ in range(length)
     )
@@ -55,16 +56,22 @@ class VgidContextType(ContextType):
 class JavaApplicationContextType(ContextType):
     """A java application-specific context field is a piece of state which the application provides."""
 
-    def __init__(self, retriever_name: str, field_name: str):
-        self._retriever_name: str = retriever_name
-        self._field_name: str = field_name
+    def __init__(
+        self,
+        retriever_name,  # type: str
+        field_name,  # type: str
+    ):
+        self._retriever_name = retriever_name  # type: str
+        self._field_name = field_name  # type: str
 
     @property
-    def retriever_name(self) -> str:
+    def retriever_name(self):
+        # type: () -> str
         return self._retriever_name
 
     @property
-    def field_name(self) -> str:
+    def field_name(self):
+        # type: () -> str
         return self._field_name
 
 
@@ -82,6 +89,17 @@ class TracingDomain(enum.Enum):
         return "<%s.%s>" % (self.__class__.__name__, self.name)
 
 
+@enum.unique
+class BufferSharingPolicy(enum.Enum):
+    """Buffer sharing policy."""
+
+    PerUID = "Per-UID buffering"
+    PerPID = "Per-PID buffering"
+
+    def __repr__(self):
+        return "<%s.%s>" % (self.__class__.__name__, self.name)
+
+
 class EventRule(abc.ABC):
     """Event rule base class, see LTTNG-EVENT-RULE(7)."""
 
@@ -89,75 +107,278 @@ class EventRule(abc.ABC):
 
 
 class LogLevelRule:
+    def __eq__(self, other):
+        # type (LogLevelRule) -> bool
+        if type(self) != type(other):
+            return False
+
+        return self.level == other.level
+
+
+@enum.unique
+class LogLevel(enum.Enum):
     pass
 
 
+@enum.unique
+class UserLogLevel(LogLevel):
+    EMERGENCY = 0
+    ALERT = 1
+    CRITICAL = 2
+    ERROR = 3
+    WARNING = 4
+    NOTICE = 5
+    INFO = 6
+    DEBUG_SYSTEM = 7
+    DEBUG_PROGRAM = 8
+    DEBUG_PROCESS = 9
+    DEBUG_MODULE = 10
+    DEBUG_UNIT = 11
+    DEBUG_FUNCTION = 12
+    DEBUG_LINE = 13
+    DEBUG = 14
+
+
+@enum.unique
+class JULLogLevel(LogLevel):
+    OFF = 2147483647
+    SEVERE = 1000
+    WARNING = 900
+    INFO = 800
+    CONFIG = 700
+    FINE = 500
+    FINER = 400
+    FINEST = 300
+    ALL = -2147483648
+
+
+@enum.unique
+class Log4jLogLevel(LogLevel):
+    OFF = 2147483647
+    FATAL = 50000
+    ERROR = 40000
+    WARN = 30000
+    INFO = 20000
+    DEBUG = 10000
+    TRACE = 5000
+    ALL = -2147483648
+
+
+@enum.unique
+class PythonLogLevel(LogLevel):
+    CRITICAL = 50
+    ERROR = 40
+    WARNING = 30
+    INFO = 20
+    DEBUG = 10
+    NOTSET = 0
+
+
 class LogLevelRuleAsSevereAs(LogLevelRule):
-    def __init__(self, level: int):
+    def __init__(self, level):
+        # type: (LogLevel)
         self._level = level
 
     @property
-    def level(self) -> int:
+    def level(self):
+        # type: () -> LogLevel
         return self._level
 
 
 class LogLevelRuleExactly(LogLevelRule):
-    def __init__(self, level: int):
+    def __init__(self, level):
+        # type: (LogLevel)
         self._level = level
 
     @property
-    def level(self) -> int:
+    def level(self):
+        # type: () -> LogLevel
         return self._level
 
 
 class TracepointEventRule(EventRule):
     def __init__(
         self,
-        name_pattern: Optional[str] = None,
-        filter_expression: Optional[str] = None,
-        log_level_rule: Optional[LogLevelRule] = None,
-        name_pattern_exclusions: Optional[List[str]] = None,
+        name_pattern=None,  # type: Optional[str]
+        filter_expression=None,  # type: Optional[str]
     ):
-        self._name_pattern: Optional[str] = name_pattern
-        self._filter_expression: Optional[str] = filter_expression
-        self._log_level_rule: Optional[LogLevelRule] = log_level_rule
-        self._name_pattern_exclusions: Optional[List[str]] = name_pattern_exclusions
+        self._name_pattern = name_pattern  # type: Optional[str]
+        self._filter_expression = filter_expression  # type: Optional[str]
+
+    def _equals(self, other):
+        # type (TracepointEventRule) -> bool
+        # Overridden by derived classes that have supplementary attributes.
+        return True
+
+    def __eq__(self, other):
+        # type (TracepointEventRule) -> bool
+        if type(self) != type(other):
+            return False
+
+        if self.name_pattern != other.name_pattern:
+            return False
+
+        if self.filter_expression != other.filter_expression:
+            return False
+
+        return self._equals(other)
 
     @property
-    def name_pattern(self) -> Optional[str]:
+    def name_pattern(self):
+        # type: () -> Optional[str]
         return self._name_pattern
 
     @property
-    def filter_expression(self) -> Optional[str]:
+    def filter_expression(self):
+        # type: () -> Optional[str]
         return self._filter_expression
 
+
+class UserTracepointEventRule(TracepointEventRule):
+    def __init__(
+        self,
+        name_pattern=None,  # type: Optional[str]
+        filter_expression=None,  # type: Optional[str]
+        log_level_rule=None,  # type: Optional[LogLevelRule]
+        name_pattern_exclusions=None,  # type: Optional[List[str]]
+    ):
+        TracepointEventRule.__init__(self, name_pattern, filter_expression)
+        self._log_level_rule = log_level_rule  # type: Optional[LogLevelRule]
+        self._name_pattern_exclusions = (
+            name_pattern_exclusions
+        )  # type: Optional[List[str]]
+
+        if log_level_rule and not isinstance(log_level_rule.level, UserLogLevel):
+            raise ValueError("Log level rule must use a UserLogLevel as its value")
+
+    def _equals(self, other):
+        # type (UserTracepointEventRule) -> bool
+        return (
+            self.log_level_rule == other.log_level_rule
+            and self.name_pattern_exclusions == other.name_pattern_exclusions
+        )
+
     @property
-    def log_level_rule(self) -> Optional[LogLevelRule]:
+    def log_level_rule(self):
+        # type: () -> Optional[LogLevelRule]
         return self._log_level_rule
 
     @property
-    def name_pattern_exclusions(self) -> Optional[List[str]]:
+    def name_pattern_exclusions(self):
+        # type: () -> Optional[List[str]]
         return self._name_pattern_exclusions
 
 
-class UserTracepointEventRule(TracepointEventRule):
+class Log4jTracepointEventRule(TracepointEventRule):
     def __init__(
         self,
-        name_pattern: Optional[str] = None,
-        filter_expression: Optional[str] = None,
-        log_level_rule: Optional[LogLevelRule] = None,
-        name_pattern_exclusions: Optional[List[str]] = None,
+        name_pattern=None,  # type: Optional[str]
+        filter_expression=None,  # type: Optional[str]
+        log_level_rule=None,  # type: Optional[LogLevelRule]
+        name_pattern_exclusions=None,  # type: Optional[List[str]]
     ):
-        TracepointEventRule.__init__(**locals())
+        TracepointEventRule.__init__(self, name_pattern, filter_expression)
+        self._log_level_rule = log_level_rule  # type: Optional[LogLevelRule]
+        self._name_pattern_exclusions = (
+            name_pattern_exclusions
+        )  # type: Optional[List[str]]
+
+        if log_level_rule and not isinstance(log_level_rule.level, Log4jLogLevel):
+            raise ValueError("Log level rule must use a Log4jLogLevel as its value")
+
+    def _equals(self, other):
+        # type (Log4jTracepointEventRule) -> bool
+        return (
+            self.log_level_rule == other.log_level_rule
+            and self.name_pattern_exclusions == other.name_pattern_exclusions
+        )
+
+    @property
+    def log_level_rule(self):
+        # type: () -> Optional[LogLevelRule]
+        return self._log_level_rule
+
+    @property
+    def name_pattern_exclusions(self):
+        # type: () -> Optional[List[str]]
+        return self._name_pattern_exclusions
+
+
+class JULTracepointEventRule(TracepointEventRule):
+    def __init__(
+        self,
+        name_pattern=None,  # type: Optional[str]
+        filter_expression=None,  # type: Optional[str]
+        log_level_rule=None,  # type: Optional[LogLevelRule]
+        name_pattern_exclusions=None,  # type: Optional[List[str]]
+    ):
+        TracepointEventRule.__init__(self, name_pattern, filter_expression)
+        self._log_level_rule = log_level_rule  # type: Optional[LogLevelRule]
+        self._name_pattern_exclusions = (
+            name_pattern_exclusions
+        )  # type: Optional[List[str]]
+
+        if log_level_rule and not isinstance(log_level_rule.level, JULLogLevel):
+            raise ValueError("Log level rule must use a JULLogLevel as its value")
+
+    def _equals(self, other):
+        # type (JULTracepointEventRule) -> bool
+        return (
+            self.log_level_rule == other.log_level_rule
+            and self.name_pattern_exclusions == other.name_pattern_exclusions
+        )
+
+    @property
+    def log_level_rule(self):
+        # type: () -> Optional[LogLevelRule]
+        return self._log_level_rule
+
+    @property
+    def name_pattern_exclusions(self):
+        # type: () -> Optional[List[str]]
+        return self._name_pattern_exclusions
+
+
+class PythonTracepointEventRule(TracepointEventRule):
+    def __init__(
+        self,
+        name_pattern=None,  # type: Optional[str]
+        filter_expression=None,  # type: Optional[str]
+        log_level_rule=None,  # type: Optional[LogLevelRule]
+        name_pattern_exclusions=None,  # type: Optional[List[str]]
+    ):
+        TracepointEventRule.__init__(self, name_pattern, filter_expression)
+        self._log_level_rule = log_level_rule  # type: Optional[LogLevelRule]
+        self._name_pattern_exclusions = (
+            name_pattern_exclusions
+        )  # type: Optional[List[str]]
+
+        if log_level_rule and not isinstance(log_level_rule.level, PythonLogLevel):
+            raise ValueError("Log level rule must use a PythonLogLevel as its value")
+
+    def _equals(self, other):
+        # type (PythonTracepointEventRule) -> bool
+        return (
+            self.log_level_rule == other.log_level_rule
+            and self.name_pattern_exclusions == other.name_pattern_exclusions
+        )
+
+    @property
+    def log_level_rule(self):
+        # type: () -> Optional[LogLevelRule]
+        return self._log_level_rule
+
+    @property
+    def name_pattern_exclusions(self):
+        # type: () -> Optional[List[str]]
+        return self._name_pattern_exclusions
 
 
 class KernelTracepointEventRule(TracepointEventRule):
     def __init__(
         self,
-        name_pattern: Optional[str] = None,
-        filter_expression: Optional[str] = None,
-        log_level_rule: Optional[LogLevelRule] = None,
-        name_pattern_exclusions: Optional[List[str]] = None,
+        name_pattern=None,  # type: Optional[str]
+        filter_expression=None,  # type: Optional[str]
     ):
         TracepointEventRule.__init__(**locals())
 
@@ -169,26 +390,31 @@ class Channel(abc.ABC):
     """
 
     @staticmethod
-    def _generate_name() -> str:
+    def _generate_name():
+        # type: () -> str
         return "channel_{random_id}".format(random_id=_generate_random_string(8))
 
     @abc.abstractmethod
-    def add_context(self, context_type: ContextType) -> None:
-        pass
+    def add_context(self, context_type):
+        # type: (ContextType) -> None
+        raise NotImplementedError
 
     @property
     @abc.abstractmethod
-    def domain(self) -> TracingDomain:
-        pass
+    def domain(self):
+        # type: () -> TracingDomain
+        raise NotImplementedError
 
     @property
     @abc.abstractmethod
-    def name(self) -> str:
-        pass
+    def name(self):
+        # type: () -> str
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def add_recording_rule(self, rule: Type[EventRule]) -> None:
-        pass
+    def add_recording_rule(self, rule) -> None:
+        # type: (Type[EventRule]) -> None
+        raise NotImplementedError
 
 
 class SessionOutputLocation(abc.ABC):
@@ -196,14 +422,27 @@ class SessionOutputLocation(abc.ABC):
 
 
 class LocalSessionOutputLocation(SessionOutputLocation):
-    def __init__(self, trace_path: pathlib.Path):
+    def __init__(self, trace_path):
+        # type: (pathlib.Path)
         self._path = trace_path
 
     @property
-    def path(self) -> pathlib.Path:
+    def path(self):
+        # type: () -> pathlib.Path
         return self._path
 
 
+class NetworkSessionOutputLocation(SessionOutputLocation):
+    def __init__(self, set_url):
+        # type (str)
+        self._set_url = set_url
+
+    @property
+    def url(self):
+        # type: () -> str
+        return self._set_url
+
+
 class ProcessAttributeTracker(abc.ABC):
     """
     Process attribute tracker used to filter before the evaluation of event
@@ -226,167 +465,195 @@ class ProcessAttributeTracker(abc.ABC):
         def __repr__(self):
             return "<%s.%s>" % (self.__class__.__name__, self.name)
 
-    def __init__(self, policy: TrackingPolicy):
+    def __init__(self, policy):
+        # type: (TrackingPolicy)
         self._policy = policy
 
     @property
-    def tracking_policy(self) -> TrackingPolicy:
+    def tracking_policy(self):
+        # type: () -> TrackingPolicy
         return self._policy
 
 
 class ProcessIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, pid: int) -> None:
-        pass
+    def track(self, pid):
+        # type: (int) -> None
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def untrack(self, pid: int) -> None:
-        pass
+    def untrack(self, pid):
+        # type: (int) -> None
+        raise NotImplementedError
 
 
 class VirtualProcessIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, vpid: int) -> None:
-        pass
+    def track(self, vpid):
+        # type: (int) -> None
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def untrack(self, vpid: int) -> None:
-        pass
+    def untrack(self, vpid):
+        # type: (int) -> None
+        raise NotImplementedError
 
 
 class UserIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, uid: Union[int, str]) -> None:
-        pass
+    def track(self, uid):
+        # type: (Union[int, str]) -> None
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def untrack(self, uid: Union[int, str]) -> None:
-        pass
+    def untrack(self, uid):
+        # type: (Union[int, str]) -> None
+        raise NotImplementedError
 
 
 class VirtualUserIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, vuid: Union[int, str]) -> None:
-        pass
+    def track(self, vuid):
+        # type: (Union[int, str]) -> None
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def untrack(self, vuid: Union[int, str]) -> None:
-        pass
+    def untrack(self, vuid):
+        # type: (Union[int, str]) -> None
+        raise NotImplementedError
 
 
 class GroupIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, gid: Union[int, str]) -> None:
-        pass
+    def track(self, gid):
+        # type: (Union[int, str]) -> None
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def untrack(self, gid: Union[int, str]) -> None:
-        pass
+    def untrack(self, gid):
+        # type: (Union[int, str]) -> None
+        raise NotImplementedError
 
 
 class VirtualGroupIDProcessAttributeTracker(ProcessAttributeTracker):
     @abc.abstractmethod
-    def track(self, vgid: Union[int, str]) -> None:
-        pass
+    def track(self, vgid):
+        # type: (Union[int, str]) -> None
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def untrack(self, vgid: Union[int, str]) -> None:
-        pass
+    def untrack(self, vgid):
+        # type: (Union[int, str]) -> None
+        raise NotImplementedError
 
 
 class Session(abc.ABC):
     @staticmethod
-    def _generate_name() -> str:
+    def _generate_name():
+        # type: () -> str
         return "session_{random_id}".format(random_id=_generate_random_string(8))
 
     @property
     @abc.abstractmethod
-    def name(self) -> str:
-        pass
+    def name(self):
+        # type: () -> str
+        raise NotImplementedError
 
     @property
     @abc.abstractmethod
-    def output(self) -> Optional[Type[SessionOutputLocation]]:
-        pass
+    def output(self):
+        # type: () -> Optional[Type[SessionOutputLocation]]
+        raise NotImplementedError
 
     @abc.abstractmethod
     def add_channel(
-        self, domain: TracingDomain, channel_name: Optional[str] = None
-    ) -> Channel:
+        self,
+        domain,
+        channel_name=None,
+        buffer_sharing_policy=BufferSharingPolicy.PerUID,
+    ):
+        # type: (TracingDomain, Optional[str], BufferSharingPolicy) -> Channel
         """Add a channel with default attributes to the session."""
-        pass
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def start(self) -> None:
-        pass
+    def start(self):
+        # type: () -> None
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def stop(self):
+        # type: () -> None
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def destroy(self):
+        # type: () -> None
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def stop(self) -> None:
-        pass
+    def is_active(self):
+        # type: () -> bool
+        raise NotImplementedError
 
     @abc.abstractmethod
-    def destroy(self) -> None:
-        pass
+    def rotate(self):
+        # type: () -> None
+        raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_pid_process_attribute_tracker(
-        self,
-    ) -> Type[ProcessIDProcessAttributeTracker]:
+    def kernel_pid_process_attribute_tracker(self):
+        # type: () -> Type[ProcessIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_vpid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualProcessIDProcessAttributeTracker]:
+    def kernel_vpid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualProcessIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
     def user_vpid_process_attribute_tracker(
         self,
     ) -> Type[VirtualProcessIDProcessAttributeTracker]:
+        # type: () -> Type[VirtualProcessIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_gid_process_attribute_tracker(
-        self,
-    ) -> Type[GroupIDProcessAttributeTracker]:
+    def kernel_gid_process_attribute_tracker(self):
+        # type: () -> Type[GroupIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_vgid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualGroupIDProcessAttributeTracker]:
+    def kernel_vgid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualGroupIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def user_vgid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualGroupIDProcessAttributeTracker]:
+    def user_vgid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualGroupIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_uid_process_attribute_tracker(
-        self,
-    ) -> Type[UserIDProcessAttributeTracker]:
+    def kernel_uid_process_attribute_tracker(self):
+        # type: () -> Type[UserIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def kernel_vuid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualUserIDProcessAttributeTracker]:
+    def kernel_vuid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualUserIDProcessAttributeTracker]
         raise NotImplementedError
 
     @abc.abstractproperty
-    def user_vuid_process_attribute_tracker(
-        self,
-    ) -> Type[VirtualUserIDProcessAttributeTracker]:
+    def user_vuid_process_attribute_tracker(self):
+        # type: () -> Type[VirtualUserIDProcessAttributeTracker]
         raise NotImplementedError
 
 
 class ControlException(RuntimeError):
     """Base type for exceptions thrown by a controller."""
 
-    def __init__(self, msg: str):
+    def __init__(self, msg):
+        # type: (str)
         super().__init__(msg)
 
 
@@ -398,11 +665,114 @@ class Controller(abc.ABC):
     """
 
     @abc.abstractmethod
-    def create_session(
-        self, name: Optional[str] = None, output: Optional[SessionOutputLocation] = None
-    ) -> Session:
+    def create_session(self, name=None, output=None):
+        # type: (Optional[str], Optional[SessionOutputLocation]) -> Session
         """
         Create a session with an output. Don't specify an output
         to create a session without an output.
         """
-        pass
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def start_session_by_name(self, name):
+        # type: (str) -> None
+        """
+        Start a session by name.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def start_session_by_glob_pattern(self, pattern):
+        # type: (str) -> None
+        """
+        Start sessions whose name matches `pattern`, see GLOB(7).
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def start_sessions_all(self):
+        """
+        Start all sessions visible to the current user.
+        """
+        # type: () -> None
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def stop_session_by_name(self, name):
+        # type: (str) -> None
+        """
+        Stop a session by name.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def stop_session_by_glob_pattern(self, pattern):
+        # type: (str) -> None
+        """
+        Stop sessions whose name matches `pattern`, see GLOB(7).
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def stop_sessions_all(self):
+        """
+        Stop all sessions visible to the current user.
+        """
+        # type: () -> None
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def destroy_session_by_name(self, name):
+        # type: (str) -> None
+        """
+        Destroy a session by name.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def destroy_session_by_glob_pattern(self, pattern):
+        # type: (str) -> None
+        """
+        Destroy sessions whose name matches `pattern`, see GLOB(7).
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def destroy_sessions_all(self):
+        # type: () -> None
+        """
+        Destroy all sessions visible to the current user.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def list_sessions(self):
+        # type: () -> List[Session]
+        """
+        List all sessions visible to the current user.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def rotate_session_by_name(self, name, wait=True):
+        # type: (str, bool) -> None
+        """
+        Rotate a session
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def schedule_size_based_rotation(self, name, size_bytes):
+        # type: (str, int) -> None
+        """
+        Schedule automatic size-based rotations.
+        """
+        raise NotImplementedError
+
+    @abc.abstractmethod
+    def schedule_time_based_rotation(self, name, period_seconds):
+        # type: (str, int) -> None
+        """
+        Schedule automatic time-based rotations.
+        """
+        raise NotImplementedError
This page took 0.038803 seconds and 4 git commands to generate.