""" file: pi_shark.py Copyright (C) 2025 Private Island Networks Inc. """ import sys import argparse import platform import pyshark from socket import socket, AF_INET, SOCK_DGRAM from time import sleep DEBUG = True VERSION = 0.1 msg_src ="02bc000000000000" msg_dst = "03bc000000000001" failures = 0 packet = None def play(file, dest_ipaddress, src_ipaddress): try: sko=socket(AF_INET,SOCK_DGRAM) sko.bind((src_ipaddress,0x3000)) except: print("bind failed in Exception: {0}", sys.exc_info()) sko.close() sys.exit() time_m1 = 0 num_pkts = 0 cap=pyshark.FileCapture(input_file=file, display_filter="ip.addr==192.168.3.101") for pkt in cap: udp_dst_port = pkt.udp.dstport.hex_value num_pkts = num_pkts + 1 #data = pkt.data.data.__str__() time = float(pkt.frame_info.get_field("frame.time_relative").__str__()[0:6]) if time_m1 != 0: sleep(time-time_m1) try: sko.sendto(pkt.data.data.binary_value,(dest_ipaddress,udp_dst_port)) except: print("send failed in Exception: {0}", sys.exc_info()) sko.close() sys.exit() time_m1 = time print('finished. Number of packets transmitted:', num_pkts) def test(file, src_addr, dst_addr): print("Start Automated Testing\n") cap=pyshark.FileCapture(input_file=file, display_filter="ip.addr==192.168.3.101") failures = 0 print("length of capture: ",len([packet for packet in cap])) for pkt in cap: if (pkt.ip.src == src_addr and pkt.data.data != msg_src): print("testing packet #", pkt.number,': ', pkt.data.data, "failed") failures = failures + 1 elif (pkt.ip.src == dst_addr and pkt.data.data != msg_dst): print("testing packet #", pkt.number,': ', pkt.data.data, "failed") failures = failures + 1 if (failures): print("\nfailures: ", failures) else: print("\nNo Failures!") print("\nfinished") if __name__ == '__main__': parser = argparse.ArgumentParser(description='PyShark Script', epilog='Version: ' + str(VERSION)) parser.add_argument('-d','--dstipaddress',default="192.168.3.100", help='Destination ip address',action='store', required=False) parser.add_argument('-f','--file',default="wireshark/test.pcapng", help='Wireshark log file',action='store', required=False) parser.add_argument('-m','--mode',default="play", help='mode live or file',action='store', required=False) parser.add_argument('-s','--srcipaddress',default="192.168.5.40", help='Source IP address',action='store', required=False) args = parser.parse_args() if args.mode == "play": play(args.file, args.dstipaddress, args.srcipaddress) elif args.mode == "test": test(args.file, args.srcipaddress, args.dstipaddress) else: print("nothing to do, exit")