"""
 Copyright (C) 2026 Private Island Networks Inc.

 Licensed under the Apache License, Version 2.0 (the "License");
 you may not use this file except in compliance with the License.
 You may obtain a copy of the License at

 http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing, software
 distributed under the License is distributed on an "AS IS" BASIS,
 WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 See the License for the specific language governing permissions and
 limitations under the License.

 file: main.py

"""

import argparse
import asyncio
import json
import logging
import os
import platform
import sys
from multiprocessing import Process, Manager
from queue import Empty
from socket import socket, AF_INET, SOCK_DGRAM
from time import sleep

import aiohttp
import aiohttp_jinja2
import asyncio_dgram
import jinja2
import pyshark
from aiohttp import web
from websockets.asyncio.server import serve

from constants import *

DEBUG = True
VERSION = 0.1
PROJECT_ROOT = os.path.dirname(os.path.realpath(__file__))
STATIC_PATH = PROJECT_ROOT + '/static'
TEMPLATE_PATH = PROJECT_ROOT + '/templates'
LOG_PATH = PROJECT_ROOT + '/logs'

ws_key = web.AppKey("ws_key", dict[str, web.WebSocketResponse])

# UDP port on the Betsy board that controller messages are sent to.
_BETSY_UDP_PORT = 0x9020
# Local UDP port the PC binds to when sending controller messages.
_PC_UDP_PORT = 0x3000
# Longest time (seconds) a request handler waits for one captured packet.
_QUEUE_TIMEOUT_S = 5.0
# Pause (seconds) between polls of the shared mode while the capture is idle.
_IDLE_POLL_S = 0.1
# Pause (seconds) between consecutive register queries.
_QUERY_GAP_S = 0.01


def _type_byte(msg_type):
    """Return *msg_type* as a single message byte.

    Accepts either an int or an already-encoded bytes value, because the
    callers in this file historically used the same constant both ways.
    """
    if isinstance(msg_type, (bytes, bytearray)):
        return bytes(msg_type)
    return int(msg_type).to_bytes(1, byteorder='big')


def _capture_path(filename):
    """Map a capture file name onto a path below LOG_PATH.

    Backslashes are treated as path separators, a leading separator and a
    leading ``logs/`` component are ignored (so the default
    ``logs\\test.pcapng`` resolves to ``LOG_PATH/test.pcapng``), and a name
    carrying a drive specification is returned unchanged.
    """
    name = filename.replace('\\', '/')
    if os.path.splitdrive(name)[0]:
        return name
    name = name.lstrip('/')
    if name.startswith('logs/'):
        name = name[len('logs/'):]
    return LOG_PATH + '/' + name


async def _next_pkt(pkt_queue, timeout=_QUEUE_TIMEOUT_S):
    """Fetch the next packet from *pkt_queue* without blocking the event loop.

    The blocking ``get`` runs in a worker thread.

    Raises:
        queue.Empty: if nothing arrives within *timeout* seconds.
    """
    return await asyncio.to_thread(pkt_queue.get, True, timeout)


async def _read_json(request):
    """Return the request's JSON object, or raise HTTP 400 if it has none."""
    if not request.can_read_body:
        raise web.HTTPBadRequest()
    try:
        payload = await request.json()
    except ValueError as err:  # json.JSONDecodeError is a ValueError
        raise web.HTTPBadRequest(text="invalid JSON body") from err
    if not isinstance(payload, dict):
        raise web.HTTPBadRequest(text="JSON object expected")
    return payload


def _render_page(template, request, ctx):
    """Render *template* with *ctx* and tag the response as en-US."""
    response = aiohttp_jinja2.render_template(template, request, ctx)
    response.headers['Content-Language'] = 'en-US'
    return response


def _is_ajax(request):
    """Return True when the request was issued via XMLHttpRequest."""
    return request.headers.get('X-Requested-With') == 'XMLHttpRequest'


async def _parse_pkt(app, pkt):
    """Parse a Wireshark received packet.

    Args:
        app: the aiohttp application; supplies 'ipaddr_betsy' / 'ipaddr_pc'.
        pkt: a packet captured by pyshark.

    Returns:
        A dict with the keys format, time, port, type, token, address and
        data, or None when the packet is not from one of the two configured
        addresses, carries no data layer, or has an unknown message type.
    """
    try:
        src = pkt.ip.src
    except AttributeError:
        return None

    # Drop unwanted packets
    if src not in (app['ipaddr_betsy'], app['ipaddr_pc']):
        return None

    try:
        data = str(pkt.data.data)
    except AttributeError:
        if DEBUG:
            print('non msg received')
        return None

    try:
        msg_type = MSG_TYPE_MAP[int(data[0:2], 16)]
    except (LookupError, ValueError, TypeError) as e:
        # Without a message type the packet cannot be reported; skip it
        # instead of continuing with half-initialised fields.
        print("mapping error:", str(e))
        return None

    msg_token = '0x' + data[2:4]
    try:
        msg_addr = BETSY_ADDR_MAP[data[4:8]]
    except (LookupError, TypeError):
        # Unnamed address: fall back to the raw hex value.
        msg_addr = '0x' + data[4:8]
    msg_data = '0x' + data[8:]

    if src == app['ipaddr_betsy']:
        port = PORT_PHY0
    elif src == app['ipaddr_pc'] and pkt.udp.dstport.hex_value == _BETSY_UDP_PORT:
        port = PORT_PC
    else:
        port = PORT_UNDEFINED

    return {
        "format": MSG_FORMAT_CONTROLLER,
        "time": str(pkt.frame_info.get_field("frame.time_relative"))[0:6],
        'port': port,
        'type': msg_type,
        'token': msg_token,
        'address': msg_addr,
        'data': msg_data,
    }


async def home(request):
    """Serve the main page."""
    ctx = _default_context('main')
    ctx.update({
        'request': request,
        'data': request.match_info.get('data', ""),
    })
    return _render_page('index.html', request, ctx)


async def mdio(request):
    """Serve the MDIO page, or on an AJAX request read 16 MDIO registers.

    The AJAX branch sends the register queries and then reads two captured
    packets per register (the query and its response), returning the parsed
    response values as JSON. Responds with HTTP 504 when the capture does not
    deliver a packet in time.
    """
    if not _is_ajax(request):
        ctx = _default_context('mdio')
        ctx['ss'] = True
        ctx['regs'] = DP83867_MAP
        return _render_page('mdio.html', request, ctx)

    q_cont = request.app['q_cont']
    values = {}
    # The sender sleeps between datagrams, so keep it off the event loop.
    await asyncio.to_thread(_query_mdio, request.app)
    try:
        for i in range(16):
            await _next_pkt(q_cont)           # Query
            reply = await _next_pkt(q_cont)   # Response
            msg = str(reply.data.data)
            values[i] = int(msg[10:], 16)
    except Empty:
        raise web.HTTPGatewayTimeout(text="no MDIO response captured") from None
    return web.Response(text=json.dumps({'values': values}), status=200)


async def _send_msg(app, msg_type, token, address, data):
    """Send one controller message to Betsy over UDP.

    Args:
        app: the aiohttp application; supplies the local and Betsy addresses.
        msg_type, token, address, data: bytes fields, concatenated in this
            order to form the datagram.

    Failures are reported on stdout and otherwise ignored (best effort).
    """
    try:
        with socket(AF_INET, SOCK_DGRAM) as sko:
            sko.bind((app['ipaddr_pc'], _PC_UDP_PORT))
            msg = msg_type + token + address + data
            sko.sendto(msg, (app['ipaddr_betsy'], _BETSY_UDP_PORT))
    except Exception as e:
        print(f"Controller Command failed in Exception: {e!r}")


async def _udp_rcv(host="192.168.5.40", port=0x8000):
    """Receive one controller response datagram and return its payload.

    Args:
        host: local address to bind to.
        port: local UDP port to bind to.
    """
    stream = await asyncio_dgram.bind((host, port))
    try:
        data, _remote_addr = await stream.recv()
        return data
    finally:
        stream.close()


async def controller(request):
    """Serve the controller page, or on an AJAX POST send one command.

    The AJAX branch expects the form fields type, token, address and data as
    decimal integers, sends them to Betsy and replies with JSON
    ``{'r': status, 'd': [sent_packet, received_packet]}``.
    """
    if not _is_ajax(request):
        ctx = _default_context('controller')
        ctx['regs'] = CONTROLLER_MAP
        return _render_page('controller.html', request, ctx)

    send_pkt = rcv_pkt = None
    result = STATUS.OK
    try:
        data = await request.post()
        q_cont = request.app['q_cont']
        await _send_msg(
            request.app,
            int(data.get('type')).to_bytes(1, byteorder='big'),
            int(data.get('token')).to_bytes(1, byteorder='big'),
            int(data.get('address')).to_bytes(2, byteorder='big'),
            int(data.get('data')).to_bytes(4, byteorder='big'))
        send_pkt = await _parse_pkt(request.app, await _next_pkt(q_cont))
        if send_pkt['type'] == "READ":
            rcv_pkt = await _parse_pkt(request.app, await _next_pkt(q_cont))
    except Exception as e:
        # Any failure (bad form field, capture timeout, unparsable packet)
        # is reported to the browser through the status code.
        print(repr(e))
        result = STATUS.ERROR_SERVER

    resp_d = json.dumps({'r': result, 'd': [send_pkt, rcv_pkt]})
    return web.Response(text=resp_d)


async def mle(request):
    """Serve the ML engine page."""
    ctx = _default_context('mle')
    ctx.update({
        'request': request,
        'data': request.match_info.get('data', ""),
        'regs': ML_ENGINE_MAP,
        'ws': True,
    })
    return _render_page('mle.html', request, ctx)


async def fabric(request):
    """Serve the fabric page."""
    ctx = _default_context('fabric')
    ctx.update({
        'request': request,
        'data': request.match_info.get('data', ""),
        'regs': {},
    })
    return _render_page('fabric.html', request, ctx)


async def support(request):
    """Serve the help page."""
    return _render_page('help.html', request, _default_context('support'))


async def lmmi(request):
    """Serve the LMMI page, or on an AJAX request read the LMMI registers.

    The AJAX branch sends the register queries, reads 28 captured packets and
    returns the parsed values as JSON. Responds with HTTP 504 when the
    capture does not deliver a packet in time.
    """
    if not _is_ajax(request):
        context = {
            'data': request.match_info.get('data', ""),
            'regs0': SGMII_CDR_MAP,
            'regs1': SGMII_CDR_MAP,
        }
        return _render_page('lmmi.html', request, context)

    q_cont = request.app['q_cont']
    values = {}
    # The sender sleeps between datagrams, so keep it off the event loop.
    await asyncio.to_thread(_query_lmmi, request.app)
    try:
        # NOTE(review): 16 queries are sent but 28 packets are read, and the
        # register index relies on round()'s half-to-even behaviour; both are
        # kept as found - confirm against the firmware's reply pattern.
        for i in range(28):
            pkt = await _next_pkt(q_cont)
            msg = str(pkt.data.data)
            # These two 3-byte payloads carry no register value and are skipped.
            if msg in ('03e0fe', '03e0ff'):
                continue
            values[round((i - 1) / 2)] = int(msg[12:], 16)
    except Empty:
        raise web.HTTPGatewayTimeout(text="no LMMI response captured") from None
    return web.Response(text=json.dumps({'values': values}), status=200)


async def set_filename(request):
    """Store the capture file name from the JSON body for file replay.

    Responds with HTTP 400 when the body is not a JSON object with a string
    'filename'.
    """
    data = await _read_json(request)
    filename = data.get('filename')
    if not isinstance(filename, str):
        raise web.HTTPBadRequest(text="'filename' must be a string")
    request.app['ns'].filename = filename
    return web.Response(text="ok", status=200)


async def start(request):
    """Set the capture mode to the 'mode' value of the JSON body.

    Responds with HTTP 400 when the body is not a JSON object containing
    'mode'.
    """
    data = await _read_json(request)
    if 'mode' not in data:
        raise web.HTTPBadRequest(text="'mode' is required")
    request.app['mode'].value = data['mode']
    return web.Response(text="ok", status=200)


async def stop(request):
    """Switch the capture back to idle; any request body is ignored."""
    request.app['mode'].value = MODE_IDLE
    return web.Response(text="ok", status=200)


async def get_icon(request):
    """No favicon is served."""
    raise aiohttp.web.HTTPNotFound()


# WebSocket handle
async def wslogger(request):
    """Stream parsed MLE packets to the browser over a WebSocket.

    After the first text message from the browser, packets are taken from the
    MLE queue and forwarded as JSON until the socket is closed.
    """
    ws = web.WebSocketResponse()
    await ws.prepare(request)  # onopen is called in the browser
    request.app[ws_key]['primary'] = ws
    num_pkts = 0
    q_mle = request.app['q_mle']

    try:
        async for msg in ws:
            if msg.type == web.WSMsgType.TEXT:
                while not ws.closed:
                    try:
                        pkt = q_mle.get(block=False)
                    except Empty:
                        await asyncio.sleep(1)  # let other tasks run
                        continue
                    num_pkts = num_pkts + 1
                    if DEBUG:
                        print('get from Q:', num_pkts)
                    message = await _parse_pkt(request.app, pkt)
                    if message:
                        try:
                            await ws.send_json(message)
                        except ConnectionResetError:
                            # The browser went away; stop streaming.
                            break
                break
            elif msg.type == web.WSMsgType.CLOSE:
                break
    finally:
        # Drop the registry entry unless a newer connection replaced it.
        if request.app[ws_key].get('primary') is ws:
            del request.app[ws_key]['primary']
    return ws


def _send_queries(app, build_payloads):
    """Send a burst of query datagrams from the PC to Betsy (best effort).

    Args:
        app: the aiohttp application; supplies the local and Betsy addresses.
        build_payloads: zero-argument callable returning the datagram
            payloads; it is called inside the error handler so that a
            malformed constant is reported rather than raised.
    """
    try:
        payloads = build_payloads()
        with socket(AF_INET, SOCK_DGRAM) as sko:
            sko.bind((app['ipaddr_pc'], _PC_UDP_PORT))
            for payload in payloads:
                sko.sendto(payload, (app['ipaddr_betsy'], _BETSY_UDP_PORT))
                sleep(_QUERY_GAP_S)
    except Exception as e:
        print(f"Controller Command failed in Exception: {e!r}")


def _query_lmmi(app):
    """ Lattice Memory Mapped Interface

    Send a read query for each of the 16 LMMI registers. Blocking: call it
    from a worker thread when inside the event loop.
    """
    def build():
        # Query the LMMI Registers
        return [
            _type_byte(MSG_TYPE_READ) + b'\x06'
            + i.to_bytes(1, byteorder='big') + b'\x00\x00'
            for i in range(16)
        ]

    _send_queries(app, build)
    if DEBUG:
        print("query_lmmi finished")


def _query_mdio(app):
    """Send a read query for each of the 16 MDIO registers.

    Blocking: call it from a worker thread when inside the event loop.
    """
    def build():
        # Query the MDIO Registers
        return [
            _type_byte(MSG_TYPE_READ)
            + i.to_bytes(1, byteorder='big')
            + (MSG_MDIO_ADDR + i).to_bytes(2, byteorder='big')
            + b'\x00\x00\x00\x00'
            for i in range(16)
        ]

    _send_queries(app, build)
    if DEBUG:
        print("query_mdio finished")


async def _shutdown(app):
    """Close open WebSockets and stop the capture process on shutdown."""
    # Copy first: handlers may remove their entry while we await close().
    for ws in list(app[ws_key].values()):
        await ws.close()
    app[ws_key].clear()

    capture = app.get('p1')
    if capture is not None and capture.is_alive():
        # The capture loop never returns on its own.
        capture.terminate()
        capture.join(timeout=5)


def _default_context(app_layer):
    """Return the template context shared by the pages."""
    return {
        "version": VERSION,
        "app_layer": app_layer
    }


def _run_shark(q_cont, q_mle, ife, df, mode, ns, time):
    """Capture process: feed packets into the controller and MLE queues.

    Args:
        q_cont: queue receiving controller packets and replayed packets.
        q_mle: queue receiving MLE notification packets.
        ife: tshark interface name for live capture.
        df: tshark display filter.
        mode: shared value selecting MODE_IDLE, MODE_LIVE or MODE_FILE.
        ns: shared namespace; ``ns.filename`` names the replay file.
        time: (start, end) relative-time window in seconds for file replay.

    Runs until the process is terminated.
    """
    num_pkts = 0
    ts, tf = time[0], time[1]
    cap_live = pyshark.LiveCapture(interface=ife, display_filter=df)

    while True:
        # mode is a global set during initialization by the browser
        current = mode.value
        if current == MODE_LIVE:
            cap_live.reset()
            for pkt in cap_live.sniff_continuously():
                num_pkts = num_pkts + 1
                try:
                    msg_type = int(pkt.data.data[0:2], 16)
                    if msg_type < MSG_TYPE_NOTIFY_MLE:
                        q_cont.put(pkt)
                        print('put in Cont Q: ', num_pkts)
                    elif msg_type == MSG_TYPE_NOTIFY_MLE:
                        q_mle.put(pkt)
                        print('put in MLE Q: ', num_pkts)
                except Exception as e:
                    # Packets without a usable data layer are just reported.
                    print(str(e))
                if mode.value == MODE_IDLE:
                    break
        elif current == MODE_FILE:
            cap_file = None
            try:
                cap_file = pyshark.FileCapture(
                    input_file=_capture_path(ns.filename), display_filter=df)
                for pkt in cap_file:
                    num_pkts = num_pkts + 1
                    rel_time = float(
                        str(pkt.frame_info.get_field("frame.time_relative"))[0:9])
                    if ts <= rel_time < tf:
                        q_cont.put(pkt)
                        if DEBUG:
                            print('put in Q: ', num_pkts)
                if DEBUG:
                    print("file replay finished")
            except Exception as e:
                # A missing or unreadable file must not kill the capture process.
                print("file replay failed:", str(e))
            finally:
                if cap_file is not None:
                    cap_file.close()
                mode.value = MODE_IDLE
        else:
            # Idle (or unknown) mode: poll gently instead of spinning.
            sleep(_IDLE_POLL_S)


async def make_app(args, manager):
    """Create the web application and start the capture process.

    Args:
        args: parsed command-line arguments.
        manager: multiprocessing manager providing the shared queues, mode
            value and namespace.

    Returns:
        The configured aiohttp application.
    """
    ns = manager.Namespace()
    q_cont = manager.Queue()
    q_mle = manager.Queue()

    ife = args.ife or 'WiFi'
    initial_mode = {"live": MODE_LIVE, "file": MODE_FILE}.get(args.mode, MODE_IDLE)
    mode = manager.Value('i', initial_mode)
    ns.filename = args.file
    df = args.df
    time = (float(args.ts), float(args.tf))

    p1 = Process(target=_run_shark,
                 args=(q_cont, q_mle, ife, df, mode, ns, time))
    p1.start()

    app = web.Application()
    app[ws_key] = {}
    app['ipaddr_pc'] = args.ipaddr_pc
    app['ipaddr_betsy'] = args.ipaddr_betsy
    app['i_query'] = args.iq
    app['mode'] = mode
    app['ns'] = ns
    app['q_cont'] = q_cont
    app['q_mle'] = q_mle
    app['p1'] = p1
    app.on_shutdown.append(_shutdown)
    app.add_routes([
        web.get('/', home),
        web.get('/mdio', mdio),
        web.get('/controller', controller),
        web.post('/controller', controller),
        web.get('/mle', mle),
        web.get('/fabric', fabric),
        web.get('/help', support),
        web.get('/lmmi', lmmi),
        web.post('/start', start),
        web.post('/stop', stop),
        web.post('/filename', set_filename),
        web.get('/favicon.ico', get_icon),
        web.get('/logger', wslogger),
        web.static('/static', STATIC_PATH)])
    aiohttp_jinja2.setup(app, loader=jinja2.FileSystemLoader(TEMPLATE_PATH))
    return app


def _build_arg_parser():
    """Return the command-line parser for the server."""
    parser = argparse.ArgumentParser(description='Private Island Mind Net',
                                     epilog='Version: ' + str(VERSION))
    parser.add_argument('--ipaddr_pc', default="192.168.5.40",
                        help="IP Addr of this PC")
    parser.add_argument('--ipaddr_betsy', default="192.168.5.100",
                        help="IP Addr of Betsy")
    parser.add_argument('--port', type=int, default=8010,
                        help="Web server port number")
    parser.add_argument('-d', '--df', default="ip.addr==192.168.5.100",
                        help='display filter', action='store', required=False)
    parser.add_argument('--ife', default='WiFi', help='tshark interface',
                        action='store', required=False)
    parser.add_argument('--iq', type=int, default=0,
                        help='enable initial query on startup',
                        action='store', required=False)
    parser.add_argument('-m', '--mode', default="live",
                        help='mode live or file replay',
                        action='store', required=False)
    parser.add_argument('-f', '--file', default="logs\\test.pcapng",
                        help='Wireshark log file',
                        action='store', required=False)
    parser.add_argument('--ts', default="0",
                        help='time start for file replay',
                        action='store', required=False)
    parser.add_argument('--tf', default="100000.00",
                        help='time last for file replay',
                        action='store', required=False)
    return parser


def main():
    """Parse the command line and run the web server until interrupted."""
    args = _build_arg_parser().parse_args()
    logging.basicConfig(level=logging.DEBUG)
    manager = Manager()
    # run_app() blocks and returns None; the capture process is stopped by
    # the _shutdown hook registered in make_app().
    web.run_app(make_app(args, manager), port=args.port)


if __name__ == '__main__':
    main()