summaryrefslogtreecommitdiffhomepage
path: root/python/pi_send.py
blob: 114676c481f4cdca563d85bd77d72ad06ace898d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
""" 
    file: pi_send.py

    Copyright (C) 2025 Private Island Networks Inc.

"""
import sys
import argparse
from socket import socket, AF_INET, SOCK_DGRAM
from time import sleep

VERSION = '0.1'  # appended to the command-line help epilog

# Message Types: one-byte codes; send_msg places the type first in a datagram
MSG_TYPE_NULL = bytes([0x00])
MSG_TYPE_WRITE = bytes([0x01])
MSG_TYPE_READ = bytes([0x02])
MSG_TYPE_NOTIFY_SUCCESS = bytes([0x03])
MSG_TYPE_NOTIFY_ERROR = bytes([0x04])


# Preset Messages
TEST_MSG = bytes([0x07, 0x00, 0x01, 0x03, 0x00, 0x04, 0x00])

def send_msg(msg_type, msg_token, src_ipaddress, dest_ipaddress, port, address, data, interval, count):
    """Send one UDP datagram and report the result on stdout.

    The payload is the concatenation ``type + msg_token + address + data``.
    ``MSG_TYPE_WRITE`` and ``MSG_TYPE_READ`` are sent as given; any other
    ``msg_type`` is replaced by ``MSG_TYPE_NULL``.

    Args:
        msg_type: Message type code as bytes.
        msg_token: Token bytes placed directly after the type.
        src_ipaddress: Local address to bind to; the source port is fixed
            at 0x3000.
        dest_ipaddress: Host the datagram is sent to.
        port: Destination UDP port.
        address: Address field, already encoded as bytes.
        data: Data field, already encoded as bytes.
        interval: Not used by this function; kept so existing callers that
            pass it continue to work.
        count: Zero-based index of this message; ``count + 1`` is printed.

    Raises:
        SystemExit: With status 1 when creating, binding or sending on the
            socket (or assembling the message) fails.
    """
    try:
        # The context manager closes the socket on success and on failure,
        # and the handler below never touches a socket that was not created.
        with socket(AF_INET, SOCK_DGRAM) as sko:
            sko.bind((src_ipaddress, 0x3000))
            if msg_type in (MSG_TYPE_WRITE, MSG_TYPE_READ):
                type_field = msg_type
            else:   # send something that creates an error
                type_field = MSG_TYPE_NULL
            msg = type_field + msg_token + address + data

            sko.sendto(msg, (dest_ipaddress, port))

        print("success: message sent: ", count+1)

    except Exception as exc:
        # Exception (not a bare except) lets Ctrl-C and SystemExit through.
        print("Controller Command failed in Exception: {0!r}".format(exc))
        # Non-zero status so shells and scripts can detect the failure.
        sys.exit(1)
    
if __name__ == '__main__':
    # Command-line entry point: parse the options, encode the message fields
    # once, then send the message `--number` times.
    parser = argparse.ArgumentParser(description='Private Island Python Utility Interface', epilog='Version: ' + str(VERSION))
    # Numeric options use int(x, 0) so decimal, 0x.., 0o.. and 0b.. are accepted.
    parser.add_argument('-a', '--address', type=lambda x: int(x, 0), default=0x0000, help='message address')
    parser.add_argument('-c', '--command', type=str, default="read", help='message type to send')
    parser.add_argument('-d', '--data', type=lambda x: int(x, 0), default=0x0000, help='message data')
    parser.add_argument('-i', '--ipaddress', default="192.168.5.100", help='Destination IP address')
    parser.add_argument('-n', '--number', type=int, default=1, help='number of times to send message')
    parser.add_argument('-p', '--port', type=lambda x: int(x, 0), default=0x9020, help='destination UDP port')
    parser.add_argument('-s', '--srcipaddress', default="192.168.5.40", help='Source IP address')
    parser.add_argument('--token', type=lambda x: int(x, 0), default=0x77, help='message token')
    parser.add_argument('-t', '--interval', type=float, default=0.100, help='time interval between each message')
    args = parser.parse_args()

    # Any command other than "write" is sent as a read.
    msg_type = MSG_TYPE_WRITE if args.command == "write" else MSG_TYPE_READ

    # Encode the fixed-width fields once, outside the send loop.  The byte
    # order is given explicitly for every field: int.to_bytes() only gained
    # default arguments in Python 3.11.
    try:
        token = args.token.to_bytes(1, 'big')
        address = args.address.to_bytes(2, 'big')
        data = args.data.to_bytes(4, 'big')
    except OverflowError:
        # Negative or too-large values: report a usage error, not a traceback.
        parser.error('token, address and data must be non-negative and fit in 1, 2 and 4 bytes respectively')

    for count in range(args.number):
        if count:
            # Wait between messages only, not after the final one.
            sleep(args.interval)
        send_msg(msg_type, token, args.srcipaddress, args.ipaddress, args.port, address,
                 data, args.interval, count)

    print("exit")

Highly Recommended Verilog Books