Re: Python 2 on Raspberry Pi Linux 7
Posted: Wed Apr 01, 2026 1:35 am
Two-way traffic monitor
(th)
Code: Select all
import serial
import time
import threading
import os
import sys
# ------------------------------------------------------------
# Configuration
# ------------------------------------------------------------
# Each bridged port has a display name (used only in console/log text),
# a device path and a baud rate. main() opens both devices and forwards
# traffic between them in both directions.
PORT_A_NAME = "FLOWARM"
PORT_A_DEV = "/dev/ttyUSB0"
PORT_A_BAUD = 9600
PORT_B_NAME = "LYNXARM"
PORT_B_DEV = "/dev/ttyUSB1"
PORT_B_BAUD = 57600
# The log file name embeds the time at which this module is loaded, so each
# run writes to its own file. LOG_DIR "." is relative to the current working
# directory.
LOG_DIR = "."
LOG_FILENAME = "serial_bridge_%s.log" % time.strftime("%Y%m%d_%H%M%S")
LOG_PATH = os.path.join(LOG_DIR, LOG_FILENAME)
# ------------------------------------------------------------
# Shared state
# ------------------------------------------------------------
# Held by log_message() and log_packet() while an entry is printed and
# written, so output from the two bridge threads does not interleave.
print_lock = threading.Lock()
# Polled by BridgeThread.run(); main() sets it to True to ask the threads
# to leave their forwarding loops.
stop_flag = False
def to_hex(data):
    """Return *data* as space-separated, two-digit uppercase hex codes.

    Each character of *data* is converted with ord(), so the input is
    expected to be a character string (a Python 2 ``str`` of raw bytes as
    read from the serial port). An empty input yields an empty string.
    """
    hex_codes = (format(ord(char), "02X") for char in data)
    return " ".join(hex_codes)
def to_printable(data):
    """Return *data* with every non-printable character replaced by ".".

    Characters whose code is in the printable ASCII range 32..126 (space
    through "~") are kept as-is; everything else becomes a single dot, so
    the result always has the same length as the input.
    """
    return "".join(
        char if 32 <= ord(char) <= 126 else "."
        for char in data
    )
def log_message(logfile, text):
    """Print *text* to the console and append it to *logfile* as one line.

    The module-level print_lock is held for the whole operation so that an
    entry written by one thread is not interleaved with another thread's
    output. The file is flushed after every message.

    logfile -- an open, writable file object
    text    -- the message string, without a trailing newline
    """
    print_lock.acquire()
    try:
        # Single-argument print(...) is the same statement as `print text`
        # under Python 2, so the console output is unchanged.
        print(text)
        logfile.write(text + "\n")
        logfile.flush()
    finally:
        print_lock.release()
def log_packet(logfile, direction, packet):
    """Print and log one captured packet as a timestamped multi-line entry.

    The entry shows the direction label, the packet with non-printable
    characters masked (to_printable), its hex dump (to_hex) and its length,
    followed by a blank separator line. print_lock is held while the entry
    is printed and written so entries from different threads stay intact.

    logfile   -- an open, writable file object
    direction -- label such as "A -> B" identifying the traffic direction
    packet    -- the packet contents as a character string
    """
    stamp = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = "\n".join((
        "[%s] %s" % (stamp, direction),
        "ASCII: %s" % to_printable(packet),
        "HEX : %s" % to_hex(packet),
        "LEN : %d" % len(packet),
        "",  # trailing empty item produces the blank separator line
    ))
    with print_lock:
        # Single-argument print(...) behaves exactly like `print entry`
        # under Python 2.
        print(entry)
        logfile.write(entry + "\n")
        logfile.flush()
class BridgeThread(threading.Thread):
    """Forward bytes one way between two serial ports and log them as packets.

    Every byte read from the source port is written to the destination port
    immediately. For logging, bytes are collected into packets that end at
    a CR, an LF, or a CRLF pair; the terminators themselves and NUL bytes
    are forwarded but left out of the logged packet, and empty packets are
    not logged.

    NOTE(review): the comparisons against "\\r", "\\n" and "\\x00" assume
    read() returns a one-character str, as it does under Python 2 --
    confirm before running this on Python 3.
    """

    def __init__(self, src_ser, dst_ser, direction_label, logfile):
        """Store the two ports, the log label and the log file.

        src_ser         -- port object to read from (needs read())
        dst_ser         -- port object to write to (needs write(), flush())
        direction_label -- text identifying this direction in the log
        logfile         -- open file object passed on to the log helpers
        """
        super(BridgeThread, self).__init__()
        self.src_ser = src_ser
        self.dst_ser = dst_ser
        self.direction_label = direction_label
        self.logfile = logfile
        self.buffer = []           # characters of the packet being built
        self.last_was_cr = False   # True right after a CR terminator

    def flush_packet(self):
        """Log the buffered packet, if any, and start a new empty one."""
        if not self.buffer:
            return
        log_packet(self.logfile, self.direction_label, "".join(self.buffer))
        self.buffer = []

    def _frame_byte(self, byte):
        """Update the packet buffer and CR/LF state for one forwarded byte."""
        # NULs are not shown in the log and do not disturb the CR state.
        if byte == "\x00":
            return
        # The LF of a CRLF pair belongs to the terminator already handled.
        if byte == "\n" and self.last_was_cr:
            self.last_was_cr = False
            return
        # A lone CR or LF closes the current packet.
        if byte == "\r" or byte == "\n":
            self.flush_packet()
            self.last_was_cr = (byte == "\r")
            return
        self.last_was_cr = False
        self.buffer.append(byte)

    def run(self):
        """Forward and log bytes until stop_flag is set or an error occurs.

        Any exception in the loop is reported through log_message() and
        ends the thread. Whatever partial packet is buffered when the loop
        ends is logged before returning.
        """
        # stop_flag is the module-level shutdown flag; it is only read here.
        while not stop_flag:
            try:
                byte = self.src_ser.read(1)
                if byte:  # an empty result means nothing was read; poll again
                    # Forward first so logging never delays the traffic.
                    self.dst_ser.write(byte)
                    self.dst_ser.flush()
                    self._frame_byte(byte)
            except Exception as err:
                log_message(self.logfile,
                            "ERROR in %s: %s" % (self.direction_label, str(err)))
                break
        # On shutdown, flush any partial packet
        self.flush_packet()
def main():
    """Run the two-way serial bridge until interrupted, then shut down cleanly.

    Opens the log file and both serial ports, starts one BridgeThread per
    direction, and idles until Ctrl+C is pressed, a fatal error occurs, or
    both bridge threads have stopped on their own.

    On the way out the bridge threads are asked to stop and are joined
    *before* the ports and the log file are closed. This lets each thread
    flush its last partial packet to a log file that is still open, rather
    than racing against the close.
    """
    global stop_flag
    # Read timeout (seconds) for both ports; it also bounds how long a
    # bridge thread can take to notice stop_flag.
    read_timeout = 1
    ser_a = None
    ser_b = None
    logfile = None
    workers = []
    try:
        logfile = open(LOG_PATH, "w")
        header_text = "\n".join([
            "Serial bridge log started: %s" % time.strftime("%Y-%m-%d %H:%M:%S"),
            "Log file: %s" % LOG_PATH,
            "Bridge mode active",
            "%s: %s @ %d" % (PORT_A_NAME, PORT_A_DEV, PORT_A_BAUD),
            "%s: %s @ %d" % (PORT_B_NAME, PORT_B_DEV, PORT_B_BAUD),
            "",
        ])
        # Single-argument print(...) is equivalent to the Python 2 print
        # statement, so console output is unchanged.
        print(header_text)
        logfile.write(header_text + "\n")
        logfile.flush()

        ser_a = serial.Serial(PORT_A_DEV, PORT_A_BAUD, timeout=read_timeout)
        ser_b = serial.Serial(PORT_B_DEV, PORT_B_BAUD, timeout=read_timeout)
        log_message(logfile, "Opened %s on %s @ %d" % (PORT_A_NAME, PORT_A_DEV, PORT_A_BAUD))
        log_message(logfile, "Opened %s on %s @ %d" % (PORT_B_NAME, PORT_B_DEV, PORT_B_BAUD))
        log_message(logfile, "Forwarding traffic both directions...")
        log_message(logfile, "")

        workers = [
            BridgeThread(
                ser_a, ser_b,
                "%s -> %s" % (PORT_A_NAME, PORT_B_NAME),
                logfile
            ),
            BridgeThread(
                ser_b, ser_a,
                "%s -> %s" % (PORT_B_NAME, PORT_A_NAME),
                logfile
            ),
        ]
        for worker in workers:
            # Daemon threads cannot keep the process alive if a join below
            # times out.
            worker.daemon = True
            worker.start()

        # Idle while at least one direction is still being forwarded. If
        # both threads have died there is nothing left to do, so fall
        # through to the cleanup instead of sleeping forever.
        while any(worker.is_alive() for worker in workers):
            time.sleep(0.2)
        log_message(logfile, "All bridge threads have stopped. Shutting down...")
    except KeyboardInterrupt:
        # Take the lock so this notice is not interleaved with a packet a
        # bridge thread is logging at the same moment.
        with print_lock:
            print("")
            print("Ctrl+C received. Shutting down cleanly...")
            if logfile is not None:
                logfile.write("\nCtrl+C received. Shutting down cleanly...\n")
                logfile.flush()
    except Exception as e:
        with print_lock:
            print("Fatal error: %s" % str(e))
            if logfile is not None:
                logfile.write("Fatal error: %s\n" % str(e))
                logfile.flush()
    finally:
        stop_flag = True
        # A thread may be blocked in read() for up to read_timeout seconds
        # before it sees stop_flag, so wait a little longer than that.
        # Threads that were never started report is_alive() == False and
        # are skipped (joining them would raise RuntimeError).
        for worker in workers:
            if worker.is_alive():
                worker.join(read_timeout + 1.0)
        if ser_a is not None and ser_a.isOpen():
            ser_a.close()
        if ser_b is not None and ser_b.isOpen():
            ser_b.close()
        if logfile is not None:
            logfile.write("Log closed: %s\n" % time.strftime("%Y-%m-%d %H:%M:%S"))
            logfile.close()
        print("Serial ports closed.")
        print("Log saved to: %s" % LOG_PATH)
# Start the bridge only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    main()