diff --git a/python/fusion_engine_client/applications/p1_capture.py b/python/fusion_engine_client/applications/p1_capture.py index 75cf0732..8e545a6e 100755 --- a/python/fusion_engine_client/applications/p1_capture.py +++ b/python/fusion_engine_client/applications/p1_capture.py @@ -82,8 +82,12 @@ def main(): - kernel-sw - Log kernel SW timestamps. This is only available for socket connections. - hw - Log HW timestamps from device driver. This needs HW driver support. Run `./fusion_engine_client/utils/socket_timestamping.py` to test.""") file_group.add_argument( - '-o', '--output', type=str, - help="If specified, save the incoming data in the specified file.") + '-o', '--output', metavar='PATH', type=str, + help=f"""\ +If specified, save the incoming data in the specified file or transport. + +Supported formats include: +{TRANSPORT_HELP_OPTIONS}""") parser.add_argument( 'transport', type=str, @@ -94,17 +98,24 @@ def main(): if options.quiet: options.display = False + # If the user is sending output to stdout, route all other messages to stderr so the logging prints and the data + # don't get mixed up. Otherwise, print to stdout. + if options.output in ('', '-', 'file://-'): + logging_stream = sys.stderr + else: + logging_stream = sys.stdout + # Configure logging. if options.verbose >= 1: logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s', - stream=sys.stdout) + stream=logging_stream) if options.verbose == 1: logging.getLogger('point_one.fusion_engine.parsers').setLevel(logging.DEBUG) else: logging.getLogger('point_one.fusion_engine.parsers').setLevel( logging.getTraceLevel(depth=options.verbose - 1)) else: - logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout) + logging.basicConfig(level=logging.INFO, format='%(message)s', stream=logging_stream) HighlightFormatter.install(color=True, standoff_level=logging.WARNING) BrokenPipeStreamHandler.install() @@ -130,7 +141,7 @@ def main(): # Connect to the device using the specified transport. try: - transport = create_transport(options.transport) + transport = create_transport(options.transport, mode='input') except Exception as e: _logger.error(str(e)) sys.exit(1) @@ -143,7 +154,10 @@ def main(): if os.path.exists(p1i_path): os.remove(p1i_path) - output_file = open(options.output, 'wb') + output_file = create_transport(options.output, mode='output') + + if isinstance(output_file, VirtualSerial): + _logger.info(f'Writing output to: {output_file}') if options.log_timestamp_source and options.output_format != 'csv': timestamp_file = open(options.output + TIMESTAMP_FILE_ENDING, 'wb') @@ -170,9 +184,11 @@ def main(): # If this is a serial port, configure its read timeout. else: if options.log_timestamp_source and options.log_timestamp_source != 'user-sw': - _logger.error(f'--log-timestamp-source={options.log_timestamp_source} is not supported. Only "user-sw" timestamps are supported on serial port captures.') + _logger.error(f'--log-timestamp-source={options.log_timestamp_source} is not supported. Only "user-sw" timestamps are supported on non-socket captures.') sys.exit(1) - transport.timeout = read_timeout_sec + + if isinstance(transport, serial.Serial): + transport.timeout = read_timeout_sec # Listen for incoming data. decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True) @@ -189,7 +205,7 @@ def main(): def _print_status(now): if options.summary: # Clear the terminal. - print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='') + print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='', file=logging_stream) _logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' % (bytes_received, messages_received, (now - start_time).total_seconds())) if options.summary: @@ -209,7 +225,7 @@ def _print_status(now): received_data, kernel_ts, hw_ts = recv(transport, 1024) else: received_data = [] - # If this is a serial port, we set the read timeout above. + # If this is a serial port or file, we set the read timeout above. else: received_data = transport.read(1024) diff --git a/python/fusion_engine_client/applications/p1_filter.py b/python/fusion_engine_client/applications/p1_filter.py index 35a43fe3..5e9b2bb3 100755 --- a/python/fusion_engine_client/applications/p1_filter.py +++ b/python/fusion_engine_client/applications/p1_filter.py @@ -15,26 +15,59 @@ root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) sys.path.insert(0, root_dir) -from fusion_engine_client.messages import MessagePayload, message_type_by_name +from fusion_engine_client.messages import InputDataType, MessagePayload, MessageType, message_type_by_name from fusion_engine_client.parsers import FusionEngineDecoder from fusion_engine_client.utils.argument_parser import ArgumentParser, ExtendedBooleanAction +from fusion_engine_client.utils.transport_utils import * -if __name__ == "__main__": +def main(): parser = ArgumentParser(description="""\ -Filter FusionEngine data coming through stdin. Examples: - netcat 192.168.1.138 30210 | \ - ./p1_filter.py --blacklist -m GNSSSatellite --display > /tmp/out.p1log +Filter FusionEngine data coming from a device, or via stdin, and send the +filtered result to stdout. + +Examples: + # Remove GNSSSatellite from the data stream of a device connected over TCP. + ./p1_filter.py tcp://192.168.1.138:30202 \ + --invert -m GNSSSatellite --display > /tmp/out.p1log + + # Same as above, but capture data using netcat. + netcat 192.168.1.138 30202 | \ + ./p1_filter.py --invert -m GNSSSatellite --display > /tmp/out.p1log + + # Only keep Pose messages from a recorded data file. cat /tmp/out.p1log | ./p1_filter.py -m Pose > /tmp/pose_out.p1log + + # Only keep Pose messages from an incoming serial data stream. + ./p1_filter.py tty:///dev/ttyUSB0:460800 \ + -m Pose > /tmp/pose_out.p1log + + # Similar to above, but open the serial port manually using stty and cat. stty -F /dev/ttyUSB0 speed 460800 cs8 \ - -cstopb -parenb -icrnl -ixon -ixoff -opost -isig -icanon -echo && \ - cat /dev/ttyUSB0 | \ - ./p1_filter.py -m Pose > /tmp/pose_out.p1log + -cstopb -parenb -icrnl -ixon -ixoff -opost -isig -icanon -echo && \ + cat /dev/ttyUSB0 | \ + ./p1_filter.py -m Pose > /tmp/pose_out.p1log + + # Extract GNSS receiver data in its native format (RTCM, SBF, etc.) from a + # remote Point One device, and pass the data to another application to be + # parsed and displayed. + ./p1_filter.py tcp://192.168.1.138:30202 \ + --unwrap --data-type EXTERNAL_UNFRAMED_GNSS | \ + rtcm_print """) + parser.add_argument( + '-V', '--invert', action=ExtendedBooleanAction, default=False, + help="""\ +If specified, discard all message types specified with --message-type and output everything else. + +By default, all specified message types are output and all others are discarded.""") + parser.add_argument( + '--display', action=ExtendedBooleanAction, default=False, + help="Periodically print status on stderr.") parser.add_argument( '-m', '--message-type', type=str, action='append', - help="An list of class names corresponding with the message types to forward or discard (see --blacklist).\n" + help="An list of class names corresponding with the message types to forward or discard (see --invert).\n" "\n" "May be specified multiple times (-m Pose -m PoseAux), or as a comma-separated list (-m Pose,PoseAux). " "All matches are case-insensitive.\n" @@ -44,20 +77,43 @@ "\n" "Supported types:\n%s" % '\n'.join(['- %s' % c for c in message_type_by_name.keys()])) parser.add_argument( - '--blacklist', action=ExtendedBooleanAction, + '-o', '--output', metavar='PATH', type=str, + help=f"""\ +If specified, write output to the specified file. Otherwise, output is sent to +stdout by default. + +Supported formats include: +{TRANSPORT_HELP_OPTIONS}""") + + wrapper_group = parser.add_argument_group('InputDataWrapper Support') + wrapper_group.add_argument( + '-d', '--data-type', type=str, action='append', + help="If specified, discard InputDataWrapper messages for data types other than the listed values.") + wrapper_group.add_argument( + '-u', '--unwrap', action=ExtendedBooleanAction, default=False, help="""\ -If specified, discard all message types specified with --message-type and output everything else. +Unwrap incoming InputDataWrapper messages and output their contents without FusionEngine framing. Discard all other +FusionEngine messages. + +Note that we strongly recommend using this option with a single --data-type specified. When --data-type is not +specified, or when multiple data types are specified, the unwrapped stream will contain multiple interleaved binary +data streams with no frame alignment enforced.""") -By default, all specified message types are output and all others are discarded.""") parser.add_argument( - '--display', action=ExtendedBooleanAction, - help="Periodically print status on stderr.") + 'input', metavar='PATH', type=str, nargs='?', default='-', + help=TRANSPORT_HELP_STRING) options = parser.parse_args() # If the user specified a set of message names, lookup their type values. Below, we will limit the printout to only # those message types. message_types = set() - if options.message_type is not None: + if options.unwrap: + if options.message_type is not None: + print('Error: You cannot specify both --unwrap and --message-type.') + sys.exit(1) + + message_types = {MessageType.INPUT_DATA_WRAPPER} + elif options.message_type is not None: # Pattern match to any of: # -m Type1 # -m Type1 -m Type2 @@ -73,6 +129,29 @@ print(str(e)) sys.exit(1) + # For InputDataWrapper messages, if the user specified desired data types, limit the output to only those. + input_data_types = set() + if options.data_type is not None: + try: + input_data_types = InputDataType.find_matching_values(options.data_type, prefix='M_TYPE_', print_func=print) + if len(input_data_types) == 0: + # find_matching_values() will print an error. + sys.exit(1) + except ValueError as e: + print(str(e)) + sys.exit(1) + + # Open the output stream/data file. + if options.output is None: + options.output = 'file://-' + output_transport = create_transport(options.output, mode='output', stdout=original_stdout) + if isinstance(output_transport, VirtualSerial): + print(f'Writing output to: {output_transport}') + + # Open the input stream/data file. + input_transport = create_transport(options.input, mode='input') + + # Listen for incoming data. start_time = datetime.now() last_print_time = datetime.now() bytes_received = 0 @@ -80,26 +159,58 @@ messages_received = 0 messages_forwarded = 0 - # Listen for incoming data. decoder = FusionEngineDecoder(return_bytes=True) try: while True: # Need to specify read size or read waits for end of file character. # This returns immediately even if 0 bytes are available. - received_data = sys.stdin.buffer.read(64) + if isinstance(input_transport, socket.socket): + received_data = input_transport.recv(64) + else: + received_data = input_transport.read(64) + if len(received_data) == 0: time.sleep(0.1) else: bytes_received += len(received_data) messages = decoder.on_data(received_data) for (header, message, raw_data) in messages: + # In unwrap mode, discard all but InputDataWrapper messages. + if options.unwrap and header.message_type != MessageType.INPUT_DATA_WRAPPER: + continue + messages_received += 1 - pass_through_message = (options.blacklist and header.message_type not in message_types) or ( - not options.blacklist and header.message_type in message_types) + + # In unwrap mode, the input message is always an InputDataWrapper. + if options.unwrap: + pass_through_message = True + # Otherwise, see if this is in the list of user-specified message types to keep. If the list is + # empty, keep all messages. + else: + pass_through_message = ( + len(message_types) == 0 or + (options.invert and header.message_type not in message_types) or + (not options.invert and header.message_type in message_types) + ) + + # If this is an InputDataWrapper and the user specified a list of data types to keep, keep only the + # messages with that kind of data. If the list is empty, keep all messages. + if pass_through_message and header.message_type == MessageType.INPUT_DATA_WRAPPER: + pass_through_message = ( + len(input_data_types) == 0 or + (options.invert and message.data_type not in input_data_types) or + (not options.invert and message.data_type in input_data_types) + ) + + # If the message passed the filters above, output it now. if pass_through_message: messages_forwarded += 1 - bytes_forwarded += len(raw_data) - original_stdout.buffer.write(raw_data) + if options.unwrap: + bytes_forwarded += len(message.data) + output_transport.write(message.data) + else: + bytes_forwarded += len(raw_data) + output_transport.write(raw_data) if options.display: now = datetime.now() @@ -111,3 +222,7 @@ except KeyboardInterrupt: pass + + +if __name__ == "__main__": + main() diff --git a/python/fusion_engine_client/utils/enum_utils.py b/python/fusion_engine_client/utils/enum_utils.py index dae9c284..7e77654f 100644 --- a/python/fusion_engine_client/utils/enum_utils.py +++ b/python/fusion_engine_client/utils/enum_utils.py @@ -1,7 +1,8 @@ from enum import EnumMeta, IntEnum as IntEnumBase import functools import inspect -from typing import List, Union +import re +from typing import List, Set, Union from aenum import extend_enum @@ -109,6 +110,99 @@ def to_string(self, include_value=True): else: return str(self) + @classmethod + def find_matching_values(cls, pattern: Union[int, str, List[int], List[str]], raise_on_unrecognized: bool = False, + prefix: str = None, print_func=None) -> Set['IntEnum']: + """! + @brief Find one or more enum values that match the specified pattern(s). + + Examples: + ```py + class MyType(IntEnum): + THING_A = 1 + THING_B = 2 + + MyType.find_matching_values('thing_abc') # {MyType.THING_ABC} + MyType.find_matching_values('THING_ABC') # {MyType.THING_ABC} + MyType.find_matching_values('THING_A') # {MyType.THING_ABC} + MyType.find_matching_values('thing') # ValueError - multiple possible matches + MyType.find_matching_values('thing*') # {MyType.THING_DEF, MyType.THING_DEF} + MyType.find_matching_values('thing_abc,thing_def') # {MyType.THING_ABC, MyType.THING_DEF} + MyType.find_matching_values(['thing_abc', 'thing_def']) # {MyType.THING_ABC, MyType.THING_DEF} + MyType.find_matching_values(1) # {MyType.THING_ABC} + MyType.find_matching_values([1, 2]) # {MyType.THING_ABC, MyType.THING_DEF} + MyType.find_matching_values(3) # {MyType._U_3} + ``` + + @param pattern A `list` or a comma-separated string containing one or more search patterns. Patterns may match + part or all of an enum name. Patterns may include wildcards (`*`) to match multiple enums. If no + wildcards are specified and multiple enums match, a single result will be returned if there is an exact + match (e.g., `thing_a` will match to `MyType.THING_ABC`, not `MyType.THING_DEF`). All matches are + case-insensitive. + @param raise_on_unrecognized If `True`, raise an exception for any unrecognized integer values. Unrecognized + values will automatically create new enum entries by default. + @param prefix If specified, prepend the prefix to all string values if they do not already start with it. + + @return A set containing the matching enum values. + """ + # Convert to a list of strings for consistency. + if isinstance(pattern, int): + patterns = [f'{pattern}'] + elif isinstance(pattern, str): + patterns = [pattern] + else: + if len(pattern) > 0 and isinstance(pattern[0], int): + patterns = [f'{p}' for p in pattern] + else: + patterns = pattern + + # Split and flatten comma-separated lists of names/patterns: + # ['VersionInfoMessage', 'PoseMessage,GNSS*'] -> + # ['VersionInfoMessage', 'PoseMessage', 'GNSS*'] + requested_types = [p.strip() for entry in patterns for p in entry.split(',')] + + # Now find matches to each pattern. + result = set() + for pattern in requested_types: + # Check if pattern is the message integer value. + try: + if pattern.lower().startswith('0x'): + int_val = int(pattern, base=16) + else: + int_val = int(pattern) + enum_val = cls(int_val, raise_on_unrecognized=raise_on_unrecognized) + result.add(enum_val) + if str(enum_val) == '(Unrecognized)' and print_func: + print_func(f"{pattern} is an unknown {cls.__name__} value.") + except: + if prefix is not None and not pattern.startswith(prefix): + pattern = f'{prefix}{pattern}' + + allow_multiple = '*' in pattern + re_pattern = pattern.replace('*', '.*') + # if pattern[0] != '^': + # re_pattern = r'.*' + re_pattern + # if pattern[-1] != '$': + # re_pattern += '.*' + + # Check for matches. + matched_types = [v for k, v in cls._member_map_.items() + if re.match(re_pattern, k, flags=re.IGNORECASE)] + if len(matched_types) == 0 and print_func: + print_func("No message types matching pattern '%s'." % pattern) + continue + + # If there are too many matches, fail. + if len(matched_types) > 1 and not allow_multiple: + raise ValueError("Pattern '%s' matches multiple message types:%s\n\nAdd a wildcard (%s*) to display " + "all matching types." % + (pattern, ''.join(['\n %s' % c for c in cls._member_map_.keys()]), pattern)) + # Otherwise, update the set of message types. + else: + result.update(matched_types) + + return result + def enum_bitmask(enum_type, offset=0, define_bits=True, predicate=None): """! diff --git a/python/fusion_engine_client/utils/transport_utils.py b/python/fusion_engine_client/utils/transport_utils.py index deace9df..677059f7 100644 --- a/python/fusion_engine_client/utils/transport_utils.py +++ b/python/fusion_engine_client/utils/transport_utils.py @@ -1,10 +1,11 @@ import re import socket -from typing import Callable, Union +import sys +from typing import BinaryIO, Callable, TextIO, Union +# WebSocket support is optional. To use, install with: +# pip install websockets try: - # WebSocket support is optional. To use, install with: - # pip install websockets import websockets.sync.client as ws ws_supported = True except ImportError: @@ -13,9 +14,9 @@ class ws: class ClientConnection: pass +# Serial port support is optional. To use, install with: +# pip install pyserial try: - # Serial port support is optional. To use, install with: - # pip install pyserial import serial serial_supported = True @@ -38,24 +39,170 @@ class serial: class Serial: pass class SerialException(Exception): pass -TRANSPORT_HELP_STRING = """\ -The method used to communicate with the target device: +# Virtual serial port support is optional. To use, install with: +# pip install pyvirtualserialports pyserial +try: + if not serial_supported: + raise ImportError() + + from virtualserialports import VirtualSerialPorts + virtual_serial_supported = True + + class VirtualSerial(serial.Serial): + def __init__(self): + # Virtual ports work as a pair: + # - ports[0] is used internally by the application to send to/receive from ports[1] + # - ports[1] is what the user actually connects to + # + # Note that baud rate doesn't matter for virtual serial ports. The user can connect to ports[1] with any + # baud rate and it'll work. + self.virtual_serial = VirtualSerialPorts(2) + self.virtual_serial.open() + self.virtual_serial.start() + self.internal_port = self.virtual_serial.ports[0] + self.external_port = self.virtual_serial.ports[1] + super().__init__(port=self.internal_port) + + def close(self): + super().close() + self.virtual_serial.stop() + self.virtual_serial.close() + + def __str__(self): + return f'tty://{self.external_port}' +except ImportError: + virtual_serial_supported = False + class VirtualSerial: pass + + +class FileTransport: + def __init__(self, input: Union[str, BinaryIO, TextIO] = None, output: Union[str, BinaryIO, TextIO] = None): + # If input is a path, open the specified file. If '-', read from stdin. + self.close_input = False + if isinstance(input, str): + if input in ('', '-'): + self.input = sys.stdin.buffer + self.input_path = 'stdin' + else: + self.input = open(input, 'rb') + self.input_path = input + self.close_input = True + # Otherwise, assume input is a file-like object and use it as is. + elif isinstance(input, TextIO): + self.input = input.buffer + self.input_path = input.name if input else None + elif isinstance(input, BinaryIO): + self.input = input + self.input_path = input.name if input else None + elif input is None: + self.input = None + self.input_path = None + else: + raise ValueError('Unsupported input type.') + + # If output is a path, open the specified file. If '-', write to stdout. + self.close_output = False + if isinstance(output, str): + if output in ('', '-'): + self.output = sys.stdout.buffer + self.output_path = 'stdout' + else: + self.output = open(output, 'wb') + self.output_path = output + self.close_output = True + # Otherwise, assume output is a file-like object and use it as is. + elif isinstance(output, TextIO): + self.output = output.buffer + self.output_path = output.name if output else None + elif isinstance(output, BinaryIO): + self.output = output + self.output_path = output.name if output else None + elif output is None: + self.output = None + self.output_path = None + else: + raise ValueError('Unsupported input type.') + + def close(self): + if self.close_input: + self.input.close() + if self.close_output: + self.output.close() + + def read(self, size: int = -1) -> bytes: + if self.input: + return self.input.read(size) + else: + raise RuntimeError('Input file not opened.') + + def write(self, data: Union[bytes, bytearray]) -> int: + if self.output: + return self.output.write(data) + else: + raise RuntimeError('Output file not opened.') + + +TRANSPORT_HELP_OPTIONS = """\ +- - Read from stdin and/or write to stdout +- [file://](PATH|-) - Read from/write to the specified file, or to stdin/stdout + if PATH is '-'. - tcp://HOSTNAME[:PORT] - Connect to the specified hostname (or IP address) and - port over TCP (e.g., tty://192.168.0.3:30202); defaults to port 30200 + port over TCP (e.g., tty://192.168.0.3:30202); defaults to port 30200. - udp://:PORT - Listen for incoming data on the specified UDP port (e.g., - udp://:12345) + udp://:12345). Note: When using UDP, you must configure the device to send data to your machine. +- unix://FILENAME - Connect to the specified UNIX domain socket file. - ws://HOSTNAME:PORT - Connect to the specified hostname (or IP address) and - port over WebSocket (e.g., ws://192.168.0.3:30300) -- unix://FILENAME - Connect to the specified UNIX domain socket file + port over WebSocket (e.g., ws://192.168.0.3:30300). - [(serial|tty)://]DEVICE:BAUD - Connect to a serial device with the specified - baud rate (e.g., tty:///dev/ttyUSB0:460800 or /dev/ttyUSB0:460800) + baud rate (e.g., tty:///dev/ttyUSB0:460800 or /dev/ttyUSB0:460800). +- (serial|tty)://virtual - Create a virtual serial port (PTS) that another + application can connect to. +""" + +TRANSPORT_HELP_STRING = f"""\ +The method used to communicate with the target device: +{TRANSPORT_HELP_OPTIONS} """ -def create_transport(descriptor: str, timeout_sec: float = None, print_func: Callable = None) -> \ - Union[socket.socket, serial.Serial, ws.ClientConnection]: +def create_transport(descriptor: str, timeout_sec: float = None, print_func: Callable = None, mode: str = 'both', + stdout=sys.stdout) -> \ + Union[socket.socket, serial.Serial, ws.ClientConnection, FileTransport]: + # File: path, '-' (stdin/stdout), empty string (stdin/stdout) + if descriptor in ('', '-'): + descriptor = 'file://-' + + m = re.match(r'^(?:file://)?([a-zA-Z0-9-_./]+)$', descriptor) + if m: + path = m.group(1) + if mode == 'both': + if path != '-': + raise ValueError("Cannot open a file for both read and write access.") + + if print_func is not None: + print_func(f'Connecting to stdin/stdout.') + transport = FileTransport(input='-', output='-') + elif mode == 'input': + if print_func is not None: + if path == '-': + print_func(f'Reading from stdin.') + else: + print_func(f'Reading from {path}.') + transport = FileTransport(input=path, output=None) + elif mode == 'output': + if print_func is not None: + if path == '-': + print_func(f'Writing to stdout.') + else: + print_func(f'Writing to {path}.') + transport = FileTransport(input=None, output=stdout.buffer) + else: + raise ValueError(f"Unsupported file mode '{mode}'.") + return transport + + # TCP client m = re.match(r'^tcp://([a-zA-Z0-9-_.]+)?(?::([0-9]+))?$', descriptor) if m: hostname = m.group(1) @@ -73,6 +220,7 @@ def create_transport(descriptor: str, timeout_sec: float = None, print_func: Cal raise socket.timeout(f'Timed out connecting to tcp://{ip_address}:{port}.') return transport + # UDP client m = re.match(r'^udp://:([0-9]+)$', descriptor) if m: port = int(m.group(1)) @@ -86,6 +234,7 @@ def create_transport(descriptor: str, timeout_sec: float = None, print_func: Cal transport.bind(('', port)) return transport + # Websocket client m = re.match(r'^ws://([a-zA-Z0-9-_.]+):([0-9]+)$', descriptor) if m: hostname = m.group(1) @@ -107,6 +256,7 @@ def create_transport(descriptor: str, timeout_sec: float = None, print_func: Cal raise TimeoutError(f'Timed out connecting to {url}.') return transport + # UNIX domain socket m = re.match(r'^unix://([a-zA-Z0-9-_./]+)$', descriptor) if m: path = m.group(1) @@ -119,7 +269,20 @@ def create_transport(descriptor: str, timeout_sec: float = None, print_func: Cal transport.connect(path) return transport - m = re.match(r'^(?:(?:serial|tty)://)?([^:]+)(?::([0-9]+))?$', descriptor) + # Virtual serial port + m = re.match(r'^(?:serial|tty)://virtual$', descriptor) + if m: + if not virtual_serial_supported: + raise RuntimeError(f'Virtual serial port support not found.' + f'Please install (pip install pyvirtualserialports pyserial) and run again.') + + transport = VirtualSerial() + if print_func is not None: + print_func(f'Connecting to {str(transport)}.') + return transport + + # Serial port + m = re.match(r'^(?:(?:serial|tty)://)?([^:]+)(?::([0-9]+))$', descriptor) if m: path = m.group(1) if m.group(2) is None: @@ -136,4 +299,4 @@ def create_transport(descriptor: str, timeout_sec: float = None, print_func: Cal transport = serial.Serial(port=path, baudrate=baud_rate, timeout=timeout_sec) return transport - raise ValueError('Unsupported transport descriptor.') + raise ValueError(f"Unsupported transport descriptor '{descriptor}'.") diff --git a/python/setup.py b/python/setup.py index e0355d90..554a8450 100644 --- a/python/setup.py +++ b/python/setup.py @@ -85,6 +85,7 @@ def find_version(*file_paths): 'p1_capture = fusion_engine_client.applications.p1_capture:main', 'p1_display = fusion_engine_client.applications.p1_display:main', 'p1_extract = fusion_engine_client.applications.p1_extract:main', + 'p1_filter = fusion_engine_client.applications.p1_filter:main', 'p1_lband_extract = fusion_engine_client.applications.p1_lband_extract:main', 'p1_print = fusion_engine_client.applications.p1_print:main', ] diff --git a/python/tests/test_enum_utils.py b/python/tests/test_enum_utils.py index 04769d5b..acc29553 100644 --- a/python/tests/test_enum_utils.py +++ b/python/tests/test_enum_utils.py @@ -75,6 +75,41 @@ def test_unrecognized(Enum): assert int(value) == -2 +def test_find(): + class TestEnum(IntEnum): + THING_ABC = 1 + THING_DEF = 2 + + # Search by integer. + assert TestEnum.find_matching_values(1) == {TestEnum.THING_ABC} + assert TestEnum.find_matching_values(2) == {TestEnum.THING_DEF} + assert TestEnum.find_matching_values([1, 2]) == {TestEnum.THING_ABC, TestEnum.THING_DEF} + assert TestEnum.find_matching_values(0x2) == {TestEnum.THING_DEF} + + assert TestEnum.find_matching_values('THING_ABC') == {TestEnum.THING_ABC} + assert TestEnum.find_matching_values('THING_DEF') == {TestEnum.THING_DEF} + assert TestEnum.find_matching_values(['THING_ABC', 'THING_DEF']) == {TestEnum.THING_ABC, TestEnum.THING_DEF} + + assert TestEnum.find_matching_values(['THING_A']) == {TestEnum.THING_ABC} + assert TestEnum.find_matching_values(['THING*']) == {TestEnum.THING_ABC, TestEnum.THING_DEF} + with pytest.raises(ValueError): + assert TestEnum.find_matching_values(['THING']) + + assert TestEnum.find_matching_values(['ABC', 'DEF'], prefix='THING_') == {TestEnum.THING_ABC, TestEnum.THING_DEF} + assert TestEnum.find_matching_values(['*'], prefix='THING_') == {TestEnum.THING_ABC, TestEnum.THING_DEF} + assert TestEnum.find_matching_values(['G_*'], prefix='THIN') == {TestEnum.THING_ABC, TestEnum.THING_DEF} + + # Unrecognized value. + r = TestEnum.find_matching_values(3) + assert len(r) == 1 and list(r)[0].name == '_U_3' + r = TestEnum.find_matching_values(0x4) + assert len(r) == 1 and list(r)[0].name == '_U_4' + r = TestEnum.find_matching_values('5') + assert len(r) == 1 and list(r)[0].name == '_U_5' + r = TestEnum.find_matching_values('0x6') + assert len(r) == 1 and list(r)[0].name == '_U_6' + + def test_bitmask_decorator(Enum): @enum_bitmask(Enum) class EnumMask: pass