#!/usr/bin/env python3
#
# Version: 1.0.0
#
# Changelog:
#   1.0.0 - 11.03.19: First release
#
# Contributors:
#   Ralf Stemmer - ralf.stemmer@uni-oldenburg.de
#
"""Reader for SDF time measurement values sent over a serial device.

Packet layout as decoded by this script:

* One header byte: bit 7 = sync (must be 1), bit 6 = first, bit 5 = zero
  (must be 0), bit 4 = overflow, bit 3 = tmberror, bit 2 = tmcerror,
  bits 1..0 = number of data bytes minus one.
* 1 to 4 data bytes: bit 7 must be 0, bits 6..0 carry 7 bits of the value.
  The byte at position ``i`` is shifted left by ``7 * i`` bits.
"""

import sys
import os
import time
import argparse
import binascii

try:
    from serial import Serial
except ImportError:
    # Reported by main(), so that HEADER, PACKET and ReadPacket stay
    # importable on machines without pyserial.
    Serial = None

ShutdownReceiver = False

cli = argparse.ArgumentParser(
    description="Reader for SDF time measurement values.",
    formatter_class=argparse.ArgumentDefaultsHelpFormatter
    )
cli.add_argument("-s", "--silent", default=False, action="store_true",
    help="Do not show progress")
cli.add_argument("datadevice", type=str, action="store",
    help="Path to the serial communication device for reading measurement data.")
cli.add_argument("n", type=int, action="store",
    help="Number of measurements.")
cli.add_argument("outfile", type=str, action="store",
    help="Path where the measured data will be stored.")


class HEADER(object):
    """Decoded header byte of a measurement packet.

    Attributes (each flag is 0 or 1):
        sync, first, zero, overflow, tmberror, tmcerror: single header bits.
        size: value of the two lowest bits (number of data bytes minus one).
    """

    def __init__(self, headerbyte):
        """Decode ``headerbyte``.

        Args:
            headerbyte: ``bytes`` object as returned by ``uart.read(1)``.

        Raises:
            TypeError: if ``headerbyte`` is not a ``bytes`` object.
        """
        if not isinstance(headerbyte, bytes):
            raise TypeError("Header-Byte should be of type bytes as returned by uart.read(1)!")

        header = int.from_bytes(headerbyte, byteorder="little", signed=False)
        self.sync     = (header >> 7) & 1
        self.first    = (header >> 6) & 1
        self.zero     = (header >> 5) & 1
        self.overflow = (header >> 4) & 1
        self.tmberror = (header >> 3) & 1
        self.tmcerror = (header >> 2) & 1
        self.size     = (header >> 0) & 3

    def isValid(self):
        """Return True if the sync bit is 1 and the zero bit is 0."""
        return self.sync == 1 and self.zero == 0

    def hasErrorFlags(self):
        """Return True if the overflow, tmberror or tmcerror flag is set."""
        return any(flag == 1 for flag in (self.overflow, self.tmberror, self.tmcerror))

    def GetDataSize(self):
        """Return the number of data bytes that follow this header (1..4)."""
        return self.size + 1


class PACKET(object):
    """A header together with the value assembled from its data bytes.

    Attributes:
        header: the ``HEADER`` instance this packet belongs to.
        data: integer value accumulated via ``AddByte`` (initially 0).
    """

    def __init__(self, header):
        """Create an empty packet for ``header``.

        Raises:
            TypeError: if ``header`` is not a ``HEADER`` instance.
        """
        if not isinstance(header, HEADER):
            raise TypeError("header must be an instance of class HEADER!")

        self.header = header
        self.data   = 0

    def AddByte(self, position, byte):
        """Merge one data byte into ``self.data``.

        Args:
            position: zero-based index of the data byte inside the packet.
            byte: ``bytes`` object as returned by ``uart.read(1)``.

        Raises:
            TypeError: if ``position`` is not an int or ``byte`` is not bytes.
            ValueError: if the sync bit (bit 7) of the byte is set, or if
                ``position`` is beyond the size announced by the header.
        """
        if not isinstance(position, int):
            raise TypeError("position must be of type int")
        if not isinstance(byte, bytes):
            raise TypeError("byte must be of type bytes")

        byte = int.from_bytes(byte, byteorder="little", signed=False)

        if (byte >> 7) == 1:
            raise ValueError("Invalid Sync-Bit. Bit is set to 1 but should be 0 for data bytes!")
        if position >= self.header.GetDataSize():
            raise ValueError("Position of byte exceeds range of expected bytes regarding packet header")

        # Each data byte contributes 7 payload bits.
        self.data |= (byte & 0x7F) << (position * 7)


def ReadPacket(uart):
    """Read one complete packet from ``uart``.

    Bytes without sync bit are reported and skipped until a valid header
    byte is found. Then as many data bytes as the header announces are read.

    Args:
        uart: object providing ``read(1)`` that returns a ``bytes`` object.

    Returns:
        The assembled ``PACKET``.

    Raises:
        ValueError: if a byte has sync bit and zero bit set, or a data byte
            is rejected by ``PACKET.AddByte``.
        Exception: whatever ``uart.read`` raises is re-raised unchanged.
    """
    # 1.: Read Header (Skip byte if not header)
    while True:
        try:
            byte = uart.read(1)
        except Exception:
            print("\033[1;31mReading byte from UART failed!\033[0m")
            raise

        header = HEADER(byte)
        if header.isValid():
            break

        print("\033[1;33mInvalid Header \033[1;35m%s\033[1;33m: "%(byte.hex()), end="")
        if header.sync == 0:
            print("\033[1;33mSync-Bit is 0 \033[1;30m(This is a data-byte. Will be dropped.)")
            continue

        # Invalid although the sync bit is set: the zero bit must be 1.
        print("\033[1;31mSync-Bit and Zero-Bit are 1 \033[1;30m(This should never happen! Physical error?)")
        raise ValueError("Sync-Bit and Zero-Bit are 1")

    packet = PACKET(header)

    # 2.: Read data bytes
    for position in range(header.GetDataSize()):
        try:
            byte = uart.read(1)
        except Exception:
            print("\033[1;31mReading byte from UART failed!\033[0m")
            raise

        try:
            packet.AddByte(position, byte)
        except Exception as e:
            print("\033[1;31mAdding byte to time value failed with error %s!\033[0m"%(str(e)))
            raise

    return packet


def _packet_to_line(packet):
    """Return the output file line for ``packet``.

    The line holds the decimal value, or a comma separated list of the set
    error flags if the header reports any.
    """
    header = packet.header
    if not header.hasErrorFlags():
        return str(packet.data) + "\n"

    flags = (
        ("overflow", header.overflow),
        ("tmberror", header.tmberror),
        ("tmcerror", header.tmcerror),
        )
    return ",".join(name for name, flag in flags if flag) + "\n"


def _record_measurements(datauart, outfile, count, silent):
    """Read packets from ``datauart`` and write one line per packet to ``outfile``.

    Returns:
        0 when all packets were processed, 2 when more than 100 consecutive
        reads failed or carried error flags.
    """
    # NOTE(review): the loop runs count + 1 times, exactly as the original
    # script did - confirm whether the extra packet is intended.
    total = count + 1
    fails = 0

    for i in range(total):
        # Check fails-state
        if fails > 100:
            print("\033[1;33mUnexpected high error rate of %i times in a row!\033[0m"%(fails))
            print("\033[1;31mExiting...\033[0m")
            return 2

        # Show progress
        if not silent:
            print("\033[1;30mGetTime: \033[1;34m[\033[0;36m%2i%%\033[1;30m %7i\033[1;34m]\033[0m"%((i * 100) // total, i), end=" ")

        # Read Packet
        try:
            packet = ReadPacket(datauart)
        except Exception as e:
            fails += 1
            if not silent:
                print("\033[1;31m(%s)\033[0m"%(str(e)))
            continue

        # Create file entry
        header = packet.header
        if header.hasErrorFlags():
            fails += 1
        else:
            fails = 0
        line = _packet_to_line(packet)

        if not silent:
            value = packet.data
            print("\033[1;30m0x%X\033[1;35m -> \033[1;37m%i\033[0m"%(value, value), end="")
            if header.first:
                print(" \033[1;33mFirst Packet", end="")
            sys.stdout.flush()

        outfile.write(line)

        if not silent:
            print("\033[0m")

    # NOTE(review): short pause before the devices get closed, kept from the
    # original script; its purpose is not documented - confirm placement.
    time.sleep(0.1)
    return 0


def main(argv=None):
    """Run the reader and return the process exit status.

    Args:
        argv: argument list for the command line parser; ``None`` makes
            argparse use ``sys.argv[1:]``.

    Returns:
        0 on success, 1 if pyserial is missing or a device/file cannot be
        opened, 2 if too many consecutive packets failed.
    """
    if Serial is None:
        print("\033[1;31mModule \033[1;37mpyserial \033[1;31mmissing!")
        print("\033[1;34m Try \033[1;36mpip\033[1;30m3\033[1;36m install pyserial\033[1;34m as root to install it.\033[1;30m")
        return 1

    # handle command line arguments
    args = cli.parse_args(argv)
    datadevice = args.datadevice
    outpath    = args.outfile
    count      = int(args.n)
    silent     = bool(args.silent)

    # Open terminal device
    try:
        datauart = Serial(
            port     = datadevice,
            baudrate = 115200,
            xonxoff  = 0,
            rtscts   = 0,
            interCharTimeout = None
            )
    except Exception as e:
        print("\033[1;31mAccessing \033[1;37m" + datadevice + " \033[1;31mfailed with the following excpetion:\033[0m")
        print(e)
        return 1

    # The finally-clauses make sure both handles get closed on every path,
    # including errors and Ctrl-C.
    try:
        try:
            outfile = open(outpath, "w")
        except OSError as e:
            print("\033[1;31mAccessing \033[1;37m" + outpath + " \033[1;31mfailed with the following excpetion:\033[0m")
            print(e)
            return 1

        try:
            return _record_measurements(datauart, outfile, count, silent)
        finally:
            outfile.close()
    finally:
        datauart.close()


if __name__ == '__main__':
    sys.exit(main())

# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4