summaryrefslogtreecommitdiffhomepage
path: root/python/pi_shark.py
blob: 82b7024ee54c0fa872e016a7c540f33306ae5439 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
""" 
    file: pi_shark.py

    Copyright (C) 2025 Private Island Networks Inc.

"""
import sys
import argparse
import platform
import pyshark
from socket import socket, AF_INET, SOCK_DGRAM
from time import sleep

# Debug switch; not read by any code visible in this file.
DEBUG = True 
# Script version; shown in the argparse epilog of the command-line interface.
VERSION = 0.1

# Expected payload strings that test() compares against pkt.data.data:
# msg_src for packets whose IP source is the source address, msg_dst for
# packets whose IP source is the destination address.
# NOTE(review): presumably hex-encoded payload bytes as reported by tshark - confirm.
msg_src ="02bc000000000000"
msg_dst = "03bc000000000001"
# Module-level placeholders. test() assigns its own local `failures`, so this
# global is never updated by it; `packet` is not read by any visible code.
failures = 0
packet = None

def play(file, dest_ipaddress, src_ipaddress, display_filter="ip.addr==192.168.3.101"):
    """Replay the payloads of a capture file over UDP, keeping packet spacing.

    Every packet of ``file`` that matches ``display_filter`` is re-sent as a
    UDP datagram from ``src_ipaddress`` (local port 0x3000) to
    ``dest_ipaddress`` on the packet's original UDP destination port.  Between
    two packets the function sleeps for the difference of their
    ``frame.time_relative`` values.

    Args:
        file: Path of the capture file to read.
        dest_ipaddress: IP address the datagrams are sent to.
        src_ipaddress: Local IP address the sending socket is bound to.
        display_filter: Wireshark display filter selecting the packets to
            replay.  Defaults to the filter that used to be hard-coded.

    Raises:
        SystemExit: With status 1 when the socket cannot be created or bound,
            or when sending a packet fails.
    """
    try:
        sko = socket(AF_INET, SOCK_DGRAM)
    except OSError as err:
        # No socket exists yet, so there is nothing to close here.
        print("socket failed in Exception: {0}".format(err))
        sys.exit(1)

    # The with-block closes the socket on every exit path, including sys.exit().
    with sko:
        try:
            sko.bind((src_ipaddress, 0x3000))
        except OSError as err:
            print("bind failed in Exception: {0}".format(err))
            sys.exit(1)

        # None (not 0) marks "no previous packet": the first packet of a
        # capture legitimately has a relative time of 0.0.
        prev_time = None
        num_pkts = 0
        cap = pyshark.FileCapture(input_file=file, display_filter=display_filter)
        try:
            for pkt in cap:
                # hex_value supplies the port as an int, as sendto() requires.
                udp_dst_port = pkt.udp.dstport.hex_value
                # Use the full timestamp; slicing the string to six characters
                # corrupts relative times once they grow past that width.
                rel_time = float(str(pkt.frame_info.get_field("frame.time_relative")))
                if prev_time is not None:
                    # sleep() rejects negative values (out-of-order timestamps).
                    sleep(max(0.0, rel_time - prev_time))
                try:
                    sko.sendto(pkt.data.data.binary_value, (dest_ipaddress, udp_dst_port))
                except (OSError, AttributeError) as err:
                    # AttributeError: the packet carries no raw "data" layer.
                    print("send failed in Exception: {0}".format(err))
                    sys.exit(1)

                num_pkts += 1
                prev_time = rel_time
        finally:
            # Release the capture (and its tshark resources) in all cases.
            cap.close()

    print('finished. Number of packets transmitted:', num_pkts)
                 
def test(file, src_addr, dst_addr, display_filter="ip.addr==192.168.3.101"):
    """Check the payload of every captured packet against the expected message.

    Packets of ``file`` matching ``display_filter`` are inspected: a packet
    whose IP source is ``src_addr`` must carry ``msg_src`` and a packet whose
    IP source is ``dst_addr`` must carry ``msg_dst``.  Each mismatch is
    printed and counted; a summary is printed at the end.  Packets from any
    other source address are ignored.

    Args:
        file: Path of the capture file to read.
        src_addr: IP source address whose packets must carry ``msg_src``.
        dst_addr: IP source address whose packets must carry ``msg_dst``.
        display_filter: Wireshark display filter selecting the packets to
            check.  Defaults to the filter that used to be hard-coded.
    """
    print("Start Automated Testing\n")
    cap = pyshark.FileCapture(input_file=file, display_filter=display_filter)
    try:
        # Read the capture once; counting and checking in two separate
        # iterations over `cap` would parse the file twice.
        packets = list(cap)
        print("length of capture: ", len(packets))

        failures = 0
        for pkt in packets:
            if (pkt.ip.src == src_addr and pkt.data.data != msg_src):
                print("testing packet #", pkt.number,': ', pkt.data.data, "failed")
                failures = failures + 1
            elif (pkt.ip.src == dst_addr and pkt.data.data != msg_dst):
                print("testing packet #", pkt.number,': ', pkt.data.data, "failed")
                failures = failures + 1
    finally:
        # Release the capture (and its tshark resources) in all cases.
        cap.close()

    if failures:
        print("\nfailures: ", failures)
    else:
        print("\nNo Failures!")
    print("\nfinished")

if __name__ == '__main__':
    # Command-line entry point: parse the options, then dispatch on --mode.
    parser = argparse.ArgumentParser(description='PyShark Script', epilog='Version: ' + str(VERSION))
    parser.add_argument('-d','--dstipaddress',default="192.168.3.100", help='Destination ip address',action='store', required=False)
    parser.add_argument('-f','--file',default="wireshark/test.pcapng", help='Wireshark log file',action='store', required=False)
    # The help text names the values that are actually handled below.
    parser.add_argument('-m','--mode',default="play", help='mode: "play" to replay the capture, "test" to check its payloads',action='store', required=False)
    parser.add_argument('-s','--srcipaddress',default="192.168.5.40", help='Source IP address',action='store', required=False)
    args = parser.parse_args()
    
    if args.mode == "play":
        play(args.file, args.dstipaddress, args.srcipaddress)
    elif args.mode == "test":
        test(args.file, args.srcipaddress, args.dstipaddress)
    else:
        # Any other mode value is accepted by the parser but does nothing.
        print("nothing to do, exit")

Highly Recommended Verilog Books