Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 26 additions & 10 deletions python/fusion_engine_client/applications/p1_capture.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,12 @@ def main():
- kernel-sw - Log kernel SW timestamps. This is only available for socket connections.
- hw - Log HW timestamps from device driver. This needs HW driver support. Run `./fusion_engine_client/utils/socket_timestamping.py` to test.""")
file_group.add_argument(
'-o', '--output', type=str,
help="If specified, save the incoming data in the specified file.")
'-o', '--output', metavar='PATH', type=str,
help=f"""\
If specified, save the incoming data in the specified file or transport.

Supported formats include:
{TRANSPORT_HELP_OPTIONS}""")

parser.add_argument(
'transport', type=str,
Expand All @@ -94,17 +98,24 @@ def main():
if options.quiet:
options.display = False

# If the user is sending output to stdout, route all other messages to stderr so the logging prints and the data
# don't get mixed up. Otherwise, print to stdout.
if options.output in ('', '-', 'file://-'):
logging_stream = sys.stderr
else:
logging_stream = sys.stdout

# Configure logging.
if options.verbose >= 1:
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(name)s:%(lineno)d - %(message)s',
stream=sys.stdout)
stream=logging_stream)
if options.verbose == 1:
logging.getLogger('point_one.fusion_engine.parsers').setLevel(logging.DEBUG)
else:
logging.getLogger('point_one.fusion_engine.parsers').setLevel(
logging.getTraceLevel(depth=options.verbose - 1))
else:
logging.basicConfig(level=logging.INFO, format='%(message)s', stream=sys.stdout)
logging.basicConfig(level=logging.INFO, format='%(message)s', stream=logging_stream)

HighlightFormatter.install(color=True, standoff_level=logging.WARNING)
BrokenPipeStreamHandler.install()
Expand All @@ -130,7 +141,7 @@ def main():

# Connect to the device using the specified transport.
try:
transport = create_transport(options.transport)
transport = create_transport(options.transport, mode='input')
except Exception as e:
_logger.error(str(e))
sys.exit(1)
Expand All @@ -143,7 +154,10 @@ def main():
if os.path.exists(p1i_path):
os.remove(p1i_path)

output_file = open(options.output, 'wb')
output_file = create_transport(options.output, mode='output')

if isinstance(output_file, VirtualSerial):
_logger.info(f'Writing output to: {output_file}')

if options.log_timestamp_source and options.output_format != 'csv':
timestamp_file = open(options.output + TIMESTAMP_FILE_ENDING, 'wb')
Expand All @@ -170,9 +184,11 @@ def main():
# If this is a serial port, configure its read timeout.
else:
if options.log_timestamp_source and options.log_timestamp_source != 'user-sw':
_logger.error(f'--log-timestamp-source={options.log_timestamp_source} is not supported. Only "user-sw" timestamps are supported on serial port captures.')
_logger.error(f'--log-timestamp-source={options.log_timestamp_source} is not supported. Only "user-sw" timestamps are supported on non-socket captures.')
sys.exit(1)
transport.timeout = read_timeout_sec

if isinstance(transport, serial.Serial):
transport.timeout = read_timeout_sec

# Listen for incoming data.
decoder = FusionEngineDecoder(warn_on_unrecognized=not options.quiet and not options.summary, return_bytes=True)
Expand All @@ -189,7 +205,7 @@ def main():
def _print_status(now):
if options.summary:
# Clear the terminal.
print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='')
print(colorama.ansi.CSI + 'H' + colorama.ansi.CSI + 'J', end='', file=logging_stream)
_logger.info('Status: [bytes_received=%d, messages_received=%d, elapsed_time=%d sec]' %
(bytes_received, messages_received, (now - start_time).total_seconds()))
if options.summary:
Expand All @@ -209,7 +225,7 @@ def _print_status(now):
received_data, kernel_ts, hw_ts = recv(transport, 1024)
else:
received_data = []
# If this is a serial port, we set the read timeout above.
# If this is a serial port or file, we set the read timeout above.
else:
received_data = transport.read(1024)

Expand Down
157 changes: 136 additions & 21 deletions python/fusion_engine_client/applications/p1_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,59 @@
root_dir = os.path.normpath(os.path.join(os.path.dirname(__file__), '..'))
sys.path.insert(0, root_dir)

from fusion_engine_client.messages import MessagePayload, message_type_by_name
from fusion_engine_client.messages import InputDataType, MessagePayload, MessageType, message_type_by_name
from fusion_engine_client.parsers import FusionEngineDecoder
from fusion_engine_client.utils.argument_parser import ArgumentParser, ExtendedBooleanAction
from fusion_engine_client.utils.transport_utils import *


if __name__ == "__main__":
def main():
parser = ArgumentParser(description="""\
Filter FusionEngine data coming through stdin. Examples:
netcat 192.168.1.138 30210 | \
./p1_filter.py --blacklist -m GNSSSatellite --display > /tmp/out.p1log
Filter FusionEngine data coming from a device, or via stdin, and send the
filtered result to stdout.

Examples:
# Remove GNSSSatellite from the data stream of a device connected over TCP.
./p1_filter.py tcp://192.168.1.138:30202 \
--invert -m GNSSSatellite --display > /tmp/out.p1log

# Same as above, but capture data using netcat.
netcat 192.168.1.138 30202 | \
./p1_filter.py --invert -m GNSSSatellite --display > /tmp/out.p1log

# Only keep Pose messages from a recorded data file.
cat /tmp/out.p1log | ./p1_filter.py -m Pose > /tmp/pose_out.p1log

# Only keep Pose messages from an incoming serial data stream.
./p1_filter.py tty:///dev/ttyUSB0:460800 \
-m Pose > /tmp/pose_out.p1log

# Similar to above, but open the serial port manually using stty and cat.
stty -F /dev/ttyUSB0 speed 460800 cs8 \
-cstopb -parenb -icrnl -ixon -ixoff -opost -isig -icanon -echo && \
cat /dev/ttyUSB0 | \
./p1_filter.py -m Pose > /tmp/pose_out.p1log
-cstopb -parenb -icrnl -ixon -ixoff -opost -isig -icanon -echo && \
cat /dev/ttyUSB0 | \
./p1_filter.py -m Pose > /tmp/pose_out.p1log

# Extract GNSS receiver data in its native format (RTCM, SBF, etc.) from a
# remote Point One device, and pass the data to another application to be
# parsed and displayed.
./p1_filter.py tcp://192.168.1.138:30202 \
--unwrap --data-type EXTERNAL_UNFRAMED_GNSS | \
rtcm_print
""")

parser.add_argument(
'-V', '--invert', action=ExtendedBooleanAction, default=False,
help="""\
If specified, discard all message types specified with --message-type and output everything else.

By default, all specified message types are output and all others are discarded.""")
parser.add_argument(
'--display', action=ExtendedBooleanAction, default=False,
help="Periodically print status on stderr.")
parser.add_argument(
'-m', '--message-type', type=str, action='append',
help="An list of class names corresponding with the message types to forward or discard (see --blacklist).\n"
help="An list of class names corresponding with the message types to forward or discard (see --invert).\n"
"\n"
"May be specified multiple times (-m Pose -m PoseAux), or as a comma-separated list (-m Pose,PoseAux). "
"All matches are case-insensitive.\n"
Expand All @@ -44,20 +77,43 @@
"\n"
"Supported types:\n%s" % '\n'.join(['- %s' % c for c in message_type_by_name.keys()]))
parser.add_argument(
'--blacklist', action=ExtendedBooleanAction,
'-o', '--output', metavar='PATH', type=str,
help=f"""\
If specified, write output to the specified file. Otherwise, output is sent to
stdout by default.

Supported formats include:
{TRANSPORT_HELP_OPTIONS}""")

wrapper_group = parser.add_argument_group('InputDataWrapper Support')
wrapper_group.add_argument(
'-d', '--data-type', type=str, action='append',
help="If specified, discard InputDataWrapper messages for data types other than the listed values.")
wrapper_group.add_argument(
'-u', '--unwrap', action=ExtendedBooleanAction, default=False,
help="""\
If specified, discard all message types specified with --message-type and output everything else.
Unwrap incoming InputDataWrapper messages and output their contents without FusionEngine framing. Discard all other
FusionEngine messages.

Note that we strongly recommend using this option with a single --data-type specified. When --data-type is not
specified, or when multiple data types are specified, the unwrapped stream will contain multiple interleaved binary
data streams with no frame alignment enforced.""")

By default, all specified message types are output and all others are discarded.""")
parser.add_argument(
'--display', action=ExtendedBooleanAction,
help="Periodically print status on stderr.")
'input', metavar='PATH', type=str, nargs='?', default='-',
help=TRANSPORT_HELP_STRING)
options = parser.parse_args()

# If the user specified a set of message names, lookup their type values. Below, we will limit the printout to only
# those message types.
message_types = set()
if options.message_type is not None:
if options.unwrap:
if options.message_type is not None:
print('Error: You cannot specify both --unwrap and --message-type.')
sys.exit(1)

message_types = {MessageType.INPUT_DATA_WRAPPER}
elif options.message_type is not None:
# Pattern match to any of:
# -m Type1
# -m Type1 -m Type2
Expand All @@ -73,33 +129,88 @@
print(str(e))
sys.exit(1)

# For InputDataWrapper messages, if the user specified desired data types, limit the output to only those.
input_data_types = set()
if options.data_type is not None:
try:
input_data_types = InputDataType.find_matching_values(options.data_type, prefix='M_TYPE_', print_func=print)
if len(input_data_types) == 0:
# find_matching_values() will print an error.
sys.exit(1)
except ValueError as e:
print(str(e))
sys.exit(1)

# Open the output stream/data file.
if options.output is None:
options.output = 'file://-'
output_transport = create_transport(options.output, mode='output', stdout=original_stdout)
if isinstance(output_transport, VirtualSerial):
print(f'Writing output to: {output_transport}')

# Open the input stream/data file.
input_transport = create_transport(options.input, mode='input')

# Listen for incoming data.
start_time = datetime.now()
last_print_time = datetime.now()
bytes_received = 0
bytes_forwarded = 0
messages_received = 0
messages_forwarded = 0

# Listen for incoming data.
decoder = FusionEngineDecoder(return_bytes=True)
try:
while True:
# Need to specify read size or read waits for end of file character.
# This returns immediately even if 0 bytes are available.
received_data = sys.stdin.buffer.read(64)
if isinstance(input_transport, socket.socket):
received_data = input_transport.recv(64)
else:
received_data = input_transport.read(64)

if len(received_data) == 0:
time.sleep(0.1)
else:
bytes_received += len(received_data)
messages = decoder.on_data(received_data)
for (header, message, raw_data) in messages:
# In unwrap mode, discard all but InputDataWrapper messages.
if options.unwrap and header.message_type != MessageType.INPUT_DATA_WRAPPER:
continue

messages_received += 1
pass_through_message = (options.blacklist and header.message_type not in message_types) or (
not options.blacklist and header.message_type in message_types)

# In unwrap mode, the input message is always an InputDataWrapper.
if options.unwrap:
pass_through_message = True
# Otherwise, see if this is in the list of user-specified message types to keep. If the list is
# empty, keep all messages.
else:
pass_through_message = (
len(message_types) == 0 or
(options.invert and header.message_type not in message_types) or
(not options.invert and header.message_type in message_types)
)

# If this is an InputDataWrapper and the user specified a list of data types to keep, keep only the
# messages with that kind of data. If the list is empty, keep all messages.
if pass_through_message and header.message_type == MessageType.INPUT_DATA_WRAPPER:
pass_through_message = (
len(input_data_types) == 0 or
(options.invert and message.data_type not in input_data_types) or
(not options.invert and message.data_type in input_data_types)
)

# If the message passed the filters above, output it now.
if pass_through_message:
messages_forwarded += 1
bytes_forwarded += len(raw_data)
original_stdout.buffer.write(raw_data)
if options.unwrap:
bytes_forwarded += len(message.data)
output_transport.write(message.data)
else:
bytes_forwarded += len(raw_data)
output_transport.write(raw_data)

if options.display:
now = datetime.now()
Expand All @@ -111,3 +222,7 @@

except KeyboardInterrupt:
pass


if __name__ == "__main__":
main()
Loading
Loading