1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
|
"""
file: pi_shark.py
Copyright (C) 2025 Private Island Networks Inc.
"""
import sys
import argparse
import platform
import pyshark
from socket import socket, AF_INET, SOCK_DGRAM
from time import sleep
DEBUG = True
VERSION = 0.1
msg_src ="02bc000000000000"
msg_dst = "03bc000000000001"
failures = 0
packet = None
def play(file, dest_ipaddress, src_ipaddress,
         display_filter="ip.addr==192.168.3.101", src_port=0x3000):
    """Replay the UDP payloads of a capture file over a UDP socket.

    Each packet selected by ``display_filter`` is re-sent from
    ``src_ipaddress`` to ``dest_ipaddress`` on the packet's original UDP
    destination port. The delay between consecutive transmissions follows
    the capture's relative frame timestamps.

    Args:
        file: Path of the capture file handed to ``pyshark.FileCapture``.
        dest_ipaddress: IP address the payloads are sent to.
        src_ipaddress: Local IP address the sending socket is bound to.
        display_filter: Wireshark display filter selecting the packets to
            replay. Defaults to the filter that used to be hard-coded.
        src_port: Local UDP port to bind. Defaults to 0x3000.

    Raises:
        SystemExit: With status 1 when the socket cannot be created/bound
            or when a transmission fails.
    """
    sko = None
    try:
        sko = socket(AF_INET, SOCK_DGRAM)
        sko.bind((src_ipaddress, src_port))
    except OSError as exc:
        print("bind failed in Exception: {0}".format(exc))
        # socket() itself may have failed, in which case there is
        # nothing to close.
        if sko is not None:
            sko.close()
        sys.exit(1)

    num_pkts = 0
    # None (not 0) marks "no packet sent yet", so a first frame stamped at
    # exactly 0.0 does not suppress the delay before the second one.
    prev_time = None
    try:
        cap = pyshark.FileCapture(input_file=file,
                                  display_filter=display_filter)
        try:
            for pkt in cap:
                # The filter matches on IP only; packets without a UDP
                # header or payload cannot be replayed, so skip them.
                if not (hasattr(pkt, "udp") and hasattr(pkt, "data")):
                    continue
                udp_dst_port = pkt.udp.dstport.hex_value
                # Use the full timestamp; slicing it to a fixed width
                # corrupts long captures and throws away precision.
                rel_time = float(
                    str(pkt.frame_info.get_field("frame.time_relative")))
                if prev_time is not None:
                    # sleep() rejects negative values; clamp in case the
                    # capture's timestamps are not monotonic.
                    sleep(max(0.0, rel_time - prev_time))
                try:
                    sko.sendto(pkt.data.data.binary_value,
                               (dest_ipaddress, udp_dst_port))
                except OSError as exc:
                    print("send failed in Exception: {0}".format(exc))
                    sys.exit(1)
                num_pkts += 1
                prev_time = rel_time
        finally:
            cap.close()
    finally:
        # Runs on success, on failure and on sys.exit() alike.
        sko.close()
    print('finished. Number of packets transmitted:', num_pkts)
def test(file, src_addr, dst_addr, display_filter="ip.addr==192.168.3.101"):
    """Check the payload of every captured packet against the expected one.

    Packets whose IP source equals ``src_addr`` must carry ``msg_src``;
    packets whose IP source equals ``dst_addr`` must carry ``msg_dst``.
    Packets from any other source are ignored. Every mismatch is printed
    and counted, and a summary is printed at the end.

    Args:
        file: Path of the capture file handed to ``pyshark.FileCapture``.
        src_addr: IP source address whose packets must carry ``msg_src``.
        dst_addr: IP source address whose packets must carry ``msg_dst``.
        display_filter: Wireshark display filter selecting the packets to
            check. Defaults to the filter that used to be hard-coded.
    """
    print("Start Automated Testing\n")
    cap = pyshark.FileCapture(input_file=file, display_filter=display_filter)
    try:
        # Read the capture once; it is needed both for the count and for
        # the checks below.
        packets = list(cap)
    finally:
        cap.close()
    print("length of capture: ", len(packets))

    failures = 0
    for pkt in packets:
        source = pkt.ip.src
        # A packet without a data layer cannot carry the expected message;
        # treat its payload as None so it is reported instead of crashing.
        payload = getattr(getattr(pkt, "data", None), "data", None)
        mismatch = ((source == src_addr and payload != msg_src)
                    or (source == dst_addr and payload != msg_dst))
        if mismatch:
            print("testing packet #", pkt.number, ': ', payload, "failed")
            failures += 1

    if failures:
        print("\nfailures: ", failures)
    else:
        print("\nNo Failures!")
    print("\nfinished")
if __name__ == '__main__':
    # Command-line entry point: parse the options and dispatch on --mode.
    parser = argparse.ArgumentParser(
        description='PyShark Script',
        epilog='Version: ' + str(VERSION),
    )
    parser.add_argument('-d', '--dstipaddress', default="192.168.3.100",
                        help='Destination ip address')
    parser.add_argument('-f', '--file', default="wireshark/test.pcapng",
                        help='Wireshark capture file')
    # The help text now names the values that are actually dispatched on.
    parser.add_argument('-m', '--mode', default="play",
                        help="mode: 'play' to replay the capture, "
                             "'test' to verify its payloads")
    parser.add_argument('-s', '--srcipaddress', default="192.168.5.40",
                        help='Source IP address')
    args = parser.parse_args()

    if args.mode == "play":
        play(args.file, args.dstipaddress, args.srcipaddress)
    elif args.mode == "test":
        test(args.file, args.srcipaddress, args.dstipaddress)
    else:
        # Unknown modes are deliberately tolerated rather than rejected.
        print("nothing to do, exit")
|