summaryrefslogtreecommitdiffhomepage
diff options
context:
space:
mode:
authorPrivate Island Networks Inc <opensource@privateisland.tech>2025-12-21 20:35:27 -0500
committerPrivate Island Networks Inc <opensource@privateisland.tech>2025-12-21 20:35:27 -0500
commit48d05dbc26fb9bfb7814f15be7eb1565487bcedf (patch)
tree81d3f729e5e1a1a47d8b81842f716cd6f34ae23f
parent8363d90c7e3fe116d3a4ce12c3037d875f133a47 (diff)
python: add python unit test scripts
-rw-r--r--python/.gitignore1
-rwxr-xr-xpython/pi_rcv.py37
-rwxr-xr-xpython/pi_send.py74
-rwxr-xr-xpython/pi_shark.py85
4 files changed, 197 insertions, 0 deletions
diff --git a/python/.gitignore b/python/.gitignore
new file mode 100644
index 0000000..0d20b64
--- /dev/null
+++ b/python/.gitignore
@@ -0,0 +1 @@
+*.pyc
diff --git a/python/pi_rcv.py b/python/pi_rcv.py
new file mode 100755
index 0000000..eb9a4d9
--- /dev/null
+++ b/python/pi_rcv.py
@@ -0,0 +1,37 @@
+"""
+ file: pi_rcv.py
+
+ Copyright (C) 2025 Private Island Networks Inc.
+
+"""
+
+import sys
+import argparse
+import asyncio
+import asyncio_dgram
+
+VERSION = '0.1'
+
+async def udp_rcv(args):
+ """ controller response """
+ stream = await asyncio_dgram.bind((args.ipaddr, 0x8000))
+ print(f"Serving on {stream.sockname}")
+ while(True):
+ data, remote_addr = await stream.recv()
+ if args.verbose:
+ print(f"received: {data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}")
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser(description='UDP (datagram) test receiver', epilog='Version: ' + str(VERSION))
+ parser.add_argument('-i','--ipaddr',default="192.168.5.40", help='receive IP address',action='store', required=False)
+ parser.add_argument('-p','--port',default="0x8020", help='receive port',action='store', required=False)
+ parser.add_argument('-v','--verbose',action='store_true', help='be verbose')
+
+ args = parser.parse_args()
+
+ asyncio.run(udp_rcv(args))
+
+ stream.close()
+
+ print('exit')
+
diff --git a/python/pi_send.py b/python/pi_send.py
new file mode 100755
index 0000000..114676c
--- /dev/null
+++ b/python/pi_send.py
@@ -0,0 +1,74 @@
+"""
+ file: pi_send.py
+
+ Copyright (C) 2025 Private Island Networks Inc.
+
+"""
+import sys
+import argparse
+from socket import socket, AF_INET, SOCK_DGRAM
+from time import sleep
+
+VERSION = '0.1' #
+
+# Message Types
+MSG_TYPE_NULL = b'\x00'
+MSG_TYPE_WRITE = b'\x01'
+MSG_TYPE_READ = b'\x02'
+MSG_TYPE_NOTIFY_SUCCESS = b'\x03'
+MSG_TYPE_NOTIFY_ERROR = b'\x04'
+
+
+# Preset Messages
+TEST_MSG = b'\x07\x00\x01\x03\x00\x04\x00'
+
+def send_msg(msg_type, msg_token, src_ipaddress, dest_ipaddress, port, address, data, interval, count):
+ try:
+ sko=socket(AF_INET,SOCK_DGRAM)
+ sko.bind((src_ipaddress,0x3000))
+ if msg_type == MSG_TYPE_WRITE:
+ msg = msg_type + msg_token + address + data
+ elif msg_type == MSG_TYPE_READ:
+ msg = msg_type + msg_token + address + data
+ else: # send something that creates an error
+ msg = MSG_TYPE_NULL + msg_token + address + data
+
+ sko.sendto(msg,(dest_ipaddress,port))
+
+ print("success: message sent: ", count+1)
+
+ except:
+
+ print("Controller Command failed in Exception: {0}", sys.exc_info())
+ sko.close()
+ sys.exit()
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(description='Private Island Python Utility Interface', epilog='Version: ' + str(VERSION))
+ parser.add_argument('-a','--address',type=lambda x: int(x,0), default=0x0000, help='message address',action='store',required=False)
+ parser.add_argument('-c','--command',type=str, default="read", help='message type to send',action='store',required=False)
+ parser.add_argument('-d','--data',type=lambda x: int(x,0), default=0x0000, help='message data',action='store',required=False)
+ parser.add_argument('-i','--ipaddress',default="192.168.5.100", help='Destination IP address',action='store', required=False)
+ parser.add_argument('-n','--number',type=int, default=1, help='number of times to send message',action='store',required=False)
+ parser.add_argument('-p','--port',type=lambda x: int(x,0), default=0x9020, help='destination UDP port',action='store',required=False)
+ parser.add_argument('-s','--srcipaddress',default="192.168.5.40", help='Source IP address',action='store', required=False)
+ parser.add_argument('--token', type=lambda x: int(x,0), default=0x77, help='message token',action='store', required=False)
+ parser.add_argument('-t','--interval',type=float, default=0.100, help='time interval between each message',action='store',required=False)
+ parser.set_defaults()
+ args = parser.parse_args()
+
+ count = 0
+
+ if args.command == "write":
+ msg_type = MSG_TYPE_WRITE
+ else:
+ msg_type = MSG_TYPE_READ
+
+ while count < args.number:
+ send_msg(msg_type, args.token.to_bytes(1), args.srcipaddress, args.ipaddress, args.port, args.address.to_bytes(2,'big'), \
+ args.data.to_bytes(4,'big'), args.interval, count)
+ count = count + 1
+ sleep(args.interval)
+
+ print("exit")
+
diff --git a/python/pi_shark.py b/python/pi_shark.py
new file mode 100755
index 0000000..82b7024
--- /dev/null
+++ b/python/pi_shark.py
@@ -0,0 +1,85 @@
+"""
+ file: pi_shark.py
+
+ Copyright (C) 2025 Private Island Networks Inc.
+
+"""
+import sys
+import argparse
+import platform
+import pyshark
+from socket import socket, AF_INET, SOCK_DGRAM
+from time import sleep
+
+DEBUG = True
+VERSION = 0.1
+
+msg_src ="02bc000000000000"
+msg_dst = "03bc000000000001"
+failures = 0
+packet = None
+
+def play(file, dest_ipaddress, src_ipaddress):
+
+ try:
+ sko=socket(AF_INET,SOCK_DGRAM)
+ sko.bind((src_ipaddress,0x3000))
+ except:
+ print("bind failed in Exception: {0}", sys.exc_info())
+ sko.close()
+ sys.exit()
+
+ time_m1 = 0
+ num_pkts = 0
+ cap=pyshark.FileCapture(input_file=file, display_filter="ip.addr==192.168.3.101")
+ for pkt in cap:
+ udp_dst_port = pkt.udp.dstport.hex_value
+ num_pkts = num_pkts + 1
+ #data = pkt.data.data.__str__()
+ time = float(pkt.frame_info.get_field("frame.time_relative").__str__()[0:6])
+ if time_m1 != 0:
+ sleep(time-time_m1)
+ try:
+ sko.sendto(pkt.data.data.binary_value,(dest_ipaddress,udp_dst_port))
+ except:
+ print("send failed in Exception: {0}", sys.exc_info())
+ sko.close()
+ sys.exit()
+
+ time_m1 = time
+
+ print('finished. Number of packets transmitted:', num_pkts)
+
+def test(file, src_addr, dst_addr):
+ print("Start Automated Testing\n")
+ cap=pyshark.FileCapture(input_file=file, display_filter="ip.addr==192.168.3.101")
+ failures = 0
+ print("length of capture: ",len([packet for packet in cap]))
+ for pkt in cap:
+ if (pkt.ip.src == src_addr and pkt.data.data != msg_src):
+ print("testing packet #", pkt.number,': ', pkt.data.data, "failed")
+ failures = failures + 1
+ elif (pkt.ip.src == dst_addr and pkt.data.data != msg_dst):
+ print("testing packet #", pkt.number,': ', pkt.data.data, "failed")
+ failures = failures + 1
+
+ if (failures):
+ print("\nfailures: ", failures)
+ else:
+ print("\nNo Failures!")
+ print("\nfinished")
+
+if __name__ == '__main__':
+ parser = argparse.ArgumentParser(description='PyShark Script', epilog='Version: ' + str(VERSION))
+ parser.add_argument('-d','--dstipaddress',default="192.168.3.100", help='Destination ip address',action='store', required=False)
+ parser.add_argument('-f','--file',default="wireshark/test.pcapng", help='Wireshark log file',action='store', required=False)
+ parser.add_argument('-m','--mode',default="play", help='mode live or file',action='store', required=False)
+ parser.add_argument('-s','--srcipaddress',default="192.168.5.40", help='Source IP address',action='store', required=False)
+ args = parser.parse_args()
+
+ if args.mode == "play":
+ play(args.file, args.dstipaddress, args.srcipaddress)
+ elif args.mode == "test":
+ test(args.file, args.srcipaddress, args.dstipaddress)
+ else:
+ print("nothing to do, exit")

Highly Recommended Verilog Books