diff options
Diffstat (limited to 'python/pi_shark.py')
| -rwxr-xr-x | python/pi_shark.py | 85 |
1 files changed, 85 insertions, 0 deletions
diff --git a/python/pi_shark.py b/python/pi_shark.py new file mode 100755 index 0000000..82b7024 --- /dev/null +++ b/python/pi_shark.py @@ -0,0 +1,85 @@ +""" + file: pi_shark.py + + Copyright (C) 2025 Private Island Networks Inc. + +""" +import sys +import argparse +import platform +import pyshark +from socket import socket, AF_INET, SOCK_DGRAM +from time import sleep + +DEBUG = True +VERSION = 0.1 + +msg_src ="02bc000000000000" +msg_dst = "03bc000000000001" +failures = 0 +packet = None + +def play(file, dest_ipaddress, src_ipaddress): + + try: + sko=socket(AF_INET,SOCK_DGRAM) + sko.bind((src_ipaddress,0x3000)) + except: + print("bind failed in Exception: {0}", sys.exc_info()) + sko.close() + sys.exit() + + time_m1 = 0 + num_pkts = 0 + cap=pyshark.FileCapture(input_file=file, display_filter="ip.addr==192.168.3.101") + for pkt in cap: + udp_dst_port = pkt.udp.dstport.hex_value + num_pkts = num_pkts + 1 + #data = pkt.data.data.__str__() + time = float(pkt.frame_info.get_field("frame.time_relative").__str__()[0:6]) + if time_m1 != 0: + sleep(time-time_m1) + try: + sko.sendto(pkt.data.data.binary_value,(dest_ipaddress,udp_dst_port)) + except: + print("send failed in Exception: {0}", sys.exc_info()) + sko.close() + sys.exit() + + time_m1 = time + + print('finished. Number of packets transmitted:', num_pkts) + +def test(file, src_addr, dst_addr): + print("Start Automated Testing\n") + cap=pyshark.FileCapture(input_file=file, display_filter="ip.addr==192.168.3.101") + failures = 0 + print("length of capture: ",len([packet for packet in cap])) + for pkt in cap: + if (pkt.ip.src == src_addr and pkt.data.data != msg_src): + print("testing packet #", pkt.number,': ', pkt.data.data, "failed") + failures = failures + 1 + elif (pkt.ip.src == dst_addr and pkt.data.data != msg_dst): + print("testing packet #", pkt.number,': ', pkt.data.data, "failed") + failures = failures + 1 + + if (failures): + print("\nfailures: ", failures) + else: + print("\nNo Failures!") + print("\nfinished") + +if __name__ == '__main__': + parser = argparse.ArgumentParser(description='PyShark Script', epilog='Version: ' + str(VERSION)) + parser.add_argument('-d','--dstipaddress',default="192.168.3.100", help='Destination ip address',action='store', required=False) + parser.add_argument('-f','--file',default="wireshark/test.pcapng", help='Wireshark log file',action='store', required=False) + parser.add_argument('-m','--mode',default="play", help='mode live or file',action='store', required=False) + parser.add_argument('-s','--srcipaddress',default="192.168.5.40", help='Source IP address',action='store', required=False) + args = parser.parse_args() + + if args.mode == "play": + play(args.file, args.dstipaddress, args.srcipaddress) + elif args.mode == "test": + test(args.file, args.srcipaddress, args.dstipaddress) + else: + print("nothing to do, exit") |



