""" file: pi_send.py Copyright (C) 2025 Private Island Networks Inc. """ import sys import argparse from socket import socket, AF_INET, SOCK_DGRAM from time import sleep VERSION = '0.1' # # Message Types MSG_TYPE_NULL = b'\x00' MSG_TYPE_WRITE = b'\x01' MSG_TYPE_READ = b'\x02' MSG_TYPE_NOTIFY_SUCCESS = b'\x03' MSG_TYPE_NOTIFY_ERROR = b'\x04' # Preset Messages TEST_MSG = b'\x07\x00\x01\x03\x00\x04\x00' def send_msg(msg_type, msg_token, src_ipaddress, dest_ipaddress, port, address, data, interval, count): try: sko=socket(AF_INET,SOCK_DGRAM) sko.bind((src_ipaddress,0x3000)) if msg_type == MSG_TYPE_WRITE: msg = msg_type + msg_token + address + data elif msg_type == MSG_TYPE_READ: msg = msg_type + msg_token + address + data else: # send something that creates an error msg = MSG_TYPE_NULL + msg_token + address + data sko.sendto(msg,(dest_ipaddress,port)) print("success: message sent: ", count+1) except: print("Controller Command failed in Exception: {0}", sys.exc_info()) sko.close() sys.exit() if __name__ == '__main__': parser = argparse.ArgumentParser(description='Private Island Python Utility Interface', epilog='Version: ' + str(VERSION)) parser.add_argument('-a','--address',type=lambda x: int(x,0), default=0x0000, help='message address',action='store',required=False) parser.add_argument('-c','--command',type=str, default="read", help='message type to send',action='store',required=False) parser.add_argument('-d','--data',type=lambda x: int(x,0), default=0x0000, help='message data',action='store',required=False) parser.add_argument('-i','--ipaddress',default="192.168.5.100", help='Destination IP address',action='store', required=False) parser.add_argument('-n','--number',type=int, default=1, help='number of times to send message',action='store',required=False) parser.add_argument('-p','--port',type=lambda x: int(x,0), default=0x9020, help='destination UDP port',action='store',required=False) parser.add_argument('-s','--srcipaddress',default="192.168.5.40", help='Source IP address',action='store', required=False) parser.add_argument('--token', type=lambda x: int(x,0), default=0x77, help='message token',action='store', required=False) parser.add_argument('-t','--interval',type=float, default=0.100, help='time interval between each message',action='store',required=False) parser.set_defaults() args = parser.parse_args() count = 0 if args.command == "write": msg_type = MSG_TYPE_WRITE else: msg_type = MSG_TYPE_READ while count < args.number: send_msg(msg_type, args.token.to_bytes(1), args.srcipaddress, args.ipaddress, args.port, args.address.to_bytes(2,'big'), \ args.data.to_bytes(4,'big'), args.interval, count) count = count + 1 sleep(args.interval) print("exit")