diff options
| -rw-r--r-- | python/.gitignore | 1 | ||||
| -rwxr-xr-x | python/pi_rcv.py | 37 | ||||
| -rwxr-xr-x | python/pi_send.py | 74 | ||||
| -rwxr-xr-x | python/pi_shark.py | 85 |
4 files changed, 197 insertions, 0 deletions
diff --git a/python/.gitignore b/python/.gitignore new file mode 100644 index 0000000..0d20b64 --- /dev/null +++ b/python/.gitignore @@ -0,0 +1 @@ +*.pyc diff --git a/python/pi_rcv.py b/python/pi_rcv.py new file mode 100755 index 0000000..eb9a4d9 --- /dev/null +++ b/python/pi_rcv.py @@ -0,0 +1,37 @@ +""" + file: pi_rcv.py + + Copyright (C) 2025 Private Island Networks Inc. + +""" + +import sys +import argparse +import asyncio +import asyncio_dgram + +VERSION = '0.1' + +async def udp_rcv(args): + """ controller response """ + stream = await asyncio_dgram.bind((args.ipaddr, 0x8000)) + print(f"Serving on {stream.sockname}") + while(True): + data, remote_addr = await stream.recv() + if args.verbose: + print(f"received: {data[0], data[1], data[2], data[3], data[4], data[5], data[6], data[7]}") + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description='UDP (datagram) test receiver', epilog='Version: ' + str(VERSION)) + parser.add_argument('-i','--ipaddr',default="192.168.5.40", help='receive IP address',action='store', required=False) + parser.add_argument('-p','--port',default="0x8020", help='receive port',action='store', required=False) + parser.add_argument('-v','--verbose',action='store_true', help='be verbose') + + args = parser.parse_args() + + asyncio.run(udp_rcv(args)) + + stream.close() + + print('exit') + diff --git a/python/pi_send.py b/python/pi_send.py new file mode 100755 index 0000000..114676c --- /dev/null +++ b/python/pi_send.py @@ -0,0 +1,74 @@ +""" + file: pi_send.py + + Copyright (C) 2025 Private Island Networks Inc. + +""" +import sys +import argparse +from socket import socket, AF_INET, SOCK_DGRAM +from time import sleep + +VERSION = '0.1' # + +# Message Types +MSG_TYPE_NULL = b'\x00' +MSG_TYPE_WRITE = b'\x01' +MSG_TYPE_READ = b'\x02' +MSG_TYPE_NOTIFY_SUCCESS = b'\x03' +MSG_TYPE_NOTIFY_ERROR = b'\x04' + + +# Preset Messages +TEST_MSG = b'\x07\x00\x01\x03\x00\x04\x00' + +def send_msg(msg_type, msg_token, src_ipaddress, dest_ipaddress, port, address, data, interval, count): + try: + sko=socket(AF_INET,SOCK_DGRAM) + sko.bind((src_ipaddress,0x3000)) + if msg_type == MSG_TYPE_WRITE: + msg = msg_type + msg_token + address + data + elif msg_type == MSG_TYPE_READ: + msg = msg_type + msg_token + address + data + else: # send something that creates an error + msg = MSG_TYPE_NULL + msg_token + address + data + + sko.sendto(msg,(dest_ipaddress,port)) + + print("success: message sent: ", count+1) + + except: + + print("Controller Command failed in Exception: {0}", sys.exc_info()) + sko.close() + sys.exit() + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='Private Island Python Utility Interface', epilog='Version: ' + str(VERSION)) + parser.add_argument('-a','--address',type=lambda x: int(x,0), default=0x0000, help='message address',action='store',required=False) + parser.add_argument('-c','--command',type=str, default="read", help='message type to send',action='store',required=False) + parser.add_argument('-d','--data',type=lambda x: int(x,0), default=0x0000, help='message data',action='store',required=False) + parser.add_argument('-i','--ipaddress',default="192.168.5.100", help='Destination IP address',action='store', required=False) + parser.add_argument('-n','--number',type=int, default=1, help='number of times to send message',action='store',required=False) + parser.add_argument('-p','--port',type=lambda x: int(x,0), default=0x9020, help='destination UDP port',action='store',required=False) + parser.add_argument('-s','--srcipaddress',default="192.168.5.40", help='Source IP address',action='store', required=False) + parser.add_argument('--token', type=lambda x: int(x,0), default=0x77, help='message token',action='store', required=False) + parser.add_argument('-t','--interval',type=float, default=0.100, help='time interval between each message',action='store',required=False) + parser.set_defaults() + args = parser.parse_args() + + count = 0 + + if args.command == "write": + msg_type = MSG_TYPE_WRITE + else: + msg_type = MSG_TYPE_READ + + while count < args.number: + send_msg(msg_type, args.token.to_bytes(1), args.srcipaddress, args.ipaddress, args.port, args.address.to_bytes(2,'big'), \ + args.data.to_bytes(4,'big'), args.interval, count) + count = count + 1 + sleep(args.interval) + + print("exit") + diff --git a/python/pi_shark.py b/python/pi_shark.py new file mode 100755 index 0000000..82b7024 --- /dev/null +++ b/python/pi_shark.py @@ -0,0 +1,85 @@ +""" + file: pi_shark.py + + Copyright (C) 2025 Private Island Networks Inc. + +""" +import sys +import argparse +import platform +import pyshark +from socket import socket, AF_INET, SOCK_DGRAM +from time import sleep + +DEBUG = True +VERSION = 0.1 + +msg_src ="02bc000000000000" +msg_dst = "03bc000000000001" +failures = 0 +packet = None + +def play(file, dest_ipaddress, src_ipaddress): + + try: + sko=socket(AF_INET,SOCK_DGRAM) + sko.bind((src_ipaddress,0x3000)) + except: + print("bind failed in Exception: {0}", sys.exc_info()) + sko.close() + sys.exit() + + time_m1 = 0 + num_pkts = 0 + cap=pyshark.FileCapture(input_file=file, display_filter="ip.addr==192.168.3.101") + for pkt in cap: + udp_dst_port = pkt.udp.dstport.hex_value + num_pkts = num_pkts + 1 + #data = pkt.data.data.__str__() + time = float(pkt.frame_info.get_field("frame.time_relative").__str__()[0:6]) + if time_m1 != 0: + sleep(time-time_m1) + try: + sko.sendto(pkt.data.data.binary_value,(dest_ipaddress,udp_dst_port)) + except: + print("send failed in Exception: {0}", sys.exc_info()) + sko.close() + sys.exit() + + time_m1 = time + + print('finished. Number of packets transmitted:', num_pkts) + +def test(file, src_addr, dst_addr): + print("Start Automated Testing\n") + cap=pyshark.FileCapture(input_file=file, display_filter="ip.addr==192.168.3.101") + failures = 0 + print("length of capture: ",len([packet for packet in cap])) + for pkt in cap: + if (pkt.ip.src == src_addr and pkt.data.data != msg_src): + print("testing packet #", pkt.number,': ', pkt.data.data, "failed") + failures = failures + 1 + elif (pkt.ip.src == dst_addr and pkt.data.data != msg_dst): + print("testing packet #", pkt.number,': ', pkt.data.data, "failed") + failures = failures + 1 + + if (failures): + print("\nfailures: ", failures) + else: + print("\nNo Failures!") + print("\nfinished") + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='PyShark Script', epilog='Version: ' + str(VERSION)) + parser.add_argument('-d','--dstipaddress',default="192.168.3.100", help='Destination ip address',action='store', required=False) + parser.add_argument('-f','--file',default="wireshark/test.pcapng", help='Wireshark log file',action='store', required=False) + parser.add_argument('-m','--mode',default="play", help='mode live or file',action='store', required=False) + parser.add_argument('-s','--srcipaddress',default="192.168.5.40", help='Source IP address',action='store', required=False) + args = parser.parse_args() + + if args.mode == "play": + play(args.file, args.dstipaddress, args.srcipaddress) + elif args.mode == "test": + test(args.file, args.srcipaddress, args.dstipaddress) + else: + print("nothing to do, exit") |



