"""
UDP JSON-RPC 2.0 tester (newline-delimited) for Unreal / Pixera-style control APIs.

- Sends newline-delimited JSON-RPC 2.0 messages over UDP.
- Supports requests (with id) and waits for their responses.
- Supports notifications (no id), for which no response is expected.
- Optional params.targets filtering.
- Presets for the spec methods: ic.prepareChangeMode, ic.changeMode, ic.ready,
  ic.svmPrepareStart, ic.svmStart, ic.svmPrepareStop, ic.svmStop, tp.ping.
"""
from __future__ import annotations

import argparse
import json
import socket
import threading
import time
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Tuple
def now_ms() -> int:
    """Return the current wall-clock time in whole milliseconds since the epoch."""
    return int(time.time() * 1000)
@dataclass
class PendingRequest:
    """Bookkeeping for one JSON-RPC request that is still waiting for its response."""

    # JSON-RPC id the request was sent with; responses are matched on it.
    req_id: int
    # Creation time of this record, in milliseconds (see now_ms).
    sent_at_ms: int = field(default_factory=now_ms)
    # Decoded response message once one arrives; None until then.
    response: Optional[Dict[str, Any]] = None
    # Set when `response` has been filled in, to wake the waiting sender.
    event: threading.Event = field(default_factory=threading.Event)
class UdpJsonRpcClient:
    """JSON-RPC 2.0 client speaking newline-delimited JSON over a UDP socket.

    A daemon thread running ``_rx_loop`` prints every received message and
    hands JSON-RPC responses to the ``send_request`` call that is waiting on
    the same integer id.

    NOTE(review): the constructor and ``send_request`` signatures (parameter
    order, the ``bind_port`` and ``timeout_s`` defaults) and the ``start`` /
    ``close`` methods are inferred from how the class is used -- confirm them
    against the callers.
    """

    # Receive timeout in seconds, so the rx thread re-checks the stop flag
    # regularly instead of blocking in recvfrom() forever.
    _RX_POLL_S = 0.25

    def __init__(
        self,
        remote_host: str,
        remote_port: int,
        bind_host: str = "0.0.0.0",
        bind_port: int = 0,
        recv_buf: int = 2 * 1024 * 1024,
    ) -> None:
        """Create and bind the UDP socket and prepare (but do not start) the rx thread.

        Args:
            remote_host: Host or IP every message is sent to.
            remote_port: UDP port every message is sent to.
            bind_host: Local address to bind to.
            bind_port: Local port to bind to; 0 lets the OS pick one.
            recv_buf: Requested SO_RCVBUF size in bytes.

        Raises:
            OSError: If the socket cannot be configured or bound.
        """
        self.remote_addr = (remote_host, remote_port)

        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, recv_buf)
            self.sock.bind((bind_host, bind_port))
            self.sock.settimeout(self._RX_POLL_S)
        except OSError:
            # Do not leak the descriptor when configuration or bind fails.
            self.sock.close()
            raise
        self.local_addr = self.sock.getsockname()

        self._pending: Dict[int, PendingRequest] = {}
        self._pending_lock = threading.Lock()
        self._stop = threading.Event()
        self._rx_thread = threading.Thread(target=self._rx_loop, daemon=True)

        print(f"[info] UDP bound to {self.local_addr[0]}:{self.local_addr[1]}")
        print(f"[info] Remote is {self.remote_addr[0]}:{self.remote_addr[1]}")

    def start(self) -> None:
        """Start the receive thread; calling it again is a no-op."""
        # Thread.ident stays None until the thread has been started once.
        if self._rx_thread.ident is None:
            self._rx_thread.start()

    def close(self) -> None:
        """Ask the receive thread to stop, wait briefly for it, and close the socket."""
        self._stop.set()
        rx = self._rx_thread
        if rx.is_alive() and rx is not threading.current_thread():
            rx.join(timeout=2 * self._RX_POLL_S)
        self.sock.close()

    def _rx_loop(self) -> None:
        """
        Receives datagrams and splits on newline to support newline-delimited JSON.
        In practice, most senders put exactly one JSON per datagram, but we handle both.
        """
        while not self._stop.is_set():
            try:
                data, addr = self.sock.recvfrom(65535)
            except socket.timeout:
                # Nothing arrived within the poll interval; re-check the stop flag.
                continue
            except OSError:
                # The socket was closed or became unusable; receiving is over.
                break

            text = data.decode("utf-8", errors="replace")
            # A datagram may contain multiple newline-delimited JSON objects
            chunks = [c for c in text.splitlines() if c.strip()]
            for chunk in chunks:
                try:
                    msg = json.loads(chunk)
                except json.JSONDecodeError:
                    print(f"[rx] Non-JSON from {addr}: {chunk!r}")
                    continue

                print(f"[rx] from {addr}: {json.dumps(msg, ensure_ascii=False)}")

                # JSON-RPC response matching: {"jsonrpc":"2.0","id":X,"result":...} or {"error":...}
                if not (isinstance(msg, dict) and "id" in msg and ("result" in msg or "error" in msg)):
                    continue
                req_id = msg["id"]
                # bool is a subclass of int; a JSON true/false id must not match id 1/0.
                if not isinstance(req_id, int) or isinstance(req_id, bool):
                    continue
                with self._pending_lock:
                    pending = self._pending.get(req_id)
                if pending is not None:
                    pending.response = msg
                    pending.event.set()

    def _send(self, payload: Dict[str, Any]) -> str:
        """Send *payload* as one newline-terminated JSON line and return that line."""
        wire = json.dumps(payload, ensure_ascii=False) + "\n"
        self.sock.sendto(wire.encode("utf-8"), self.remote_addr)
        return wire

    def send_notification(self, method: str, params: Optional[Dict[str, Any]] = None) -> None:
        """Send a JSON-RPC notification (no id, so no response is expected).

        Args:
            method: JSON-RPC method name.
            params: Optional params; the ``params`` member is left out entirely
                when this is None rather than being sent as ``null``.
        """
        payload: Dict[str, Any] = {"jsonrpc": "2.0", "method": method}
        if params is not None:
            payload["params"] = params
        wire = self._send(payload)
        print(f"[tx] notif: {wire.strip()}")

    def send_request(
        self,
        method: str,
        params: Optional[Dict[str, Any]] = None,
        req_id: Optional[int] = None,
        timeout_s: float = 1.5,
    ) -> Tuple[int, Optional[Dict[str, Any]]]:
        """Send a JSON-RPC request and wait for the response with the same id.

        Args:
            method: JSON-RPC method name.
            params: Optional params; omitted from the message when None.
            req_id: Id to send; a time-derived id is generated when None.
            timeout_s: Maximum time in seconds to wait for the response.

        Returns:
            ``(req_id, response)`` where ``response`` is the decoded response
            message, or None if nothing arrived within ``timeout_s``.
        """
        if req_id is None:
            # simplistic increasing-ish id
            req_id = int(time.time() * 1000) % 2_000_000_000

        payload: Dict[str, Any] = {"jsonrpc": "2.0", "id": req_id, "method": method}
        if params is not None:
            payload["params"] = params

        pending = PendingRequest(req_id=req_id)
        with self._pending_lock:
            self._pending[req_id] = pending

        # A response can only be matched while the receive thread is running.
        self.start()
        try:
            wire = self._send(payload)
            print(f"[tx] req: {wire.strip()}")
            ok = pending.event.wait(timeout_s)
        finally:
            # Always drop the bookkeeping entry, even if sending failed.
            with self._pending_lock:
                self._pending.pop(req_id, None)

        return req_id, (pending.response if ok else None)
def add_targets(params: Optional[Dict[str, Any]], targets: Optional[str]) -> Optional[Dict[str, Any]]:
    """Attach a ``targets`` list parsed from a comma-separated string to *params*.

    Args:
        params: Existing params dict, or None when the message has no params.
            When a dict is given it is updated in place.
        targets: Comma-separated appIds; surrounding whitespace and empty
            items are dropped.

    Returns:
        *params* unchanged when *targets* is None, empty, or contains no
        non-blank item; otherwise the params dict (created if *params* was
        None) with ``params["targets"]`` set to the list of appIds.
    """
    if not targets:
        return params

    targets_list = [t.strip() for t in targets.split(",") if t.strip()]
    if not targets_list:
        # Only separators/whitespace were given: treat it as "no filter".
        return params

    if params is None:
        params = {}
    # spec: params.targets is an array of appIds
    params["targets"] = targets_list
    return params
# ---- Presets per your spec ----
# Mode ids accepted by the prepare/change presets and offered as their argparse choices.
VALID_MODES = ["theme", "brand", "slot1", "slot2", "slot3", "slot4"]
def preset_prepare_change_mode(mode: str, targets: Optional[str]) -> Tuple[str, Dict[str, Any], bool]:
    """Build the ``ic.prepareChangeMode`` message.

    Args:
        mode: Mode to announce as ``nextMode``; must be one of VALID_MODES.
        targets: Optional comma-separated appIds passed to add_targets.

    Returns:
        ``(method, params, is_request)``; ``is_request`` is False, i.e. the
        message is sent as a notification.

    Raises:
        ValueError: If *mode* is not in VALID_MODES.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}")
    method = "ic.prepareChangeMode"
    params = add_targets({"nextMode": mode}, targets)
    return method, params or {"nextMode": mode}, False  # notification by default
def preset_change_mode(mode: str, targets: Optional[str]) -> Tuple[str, Dict[str, Any], bool]:
    """Build the ``ic.changeMode`` message.

    Args:
        mode: Mode to switch to; must be one of VALID_MODES.
        targets: Optional comma-separated appIds passed to add_targets.

    Returns:
        ``(method, params, is_request)``; ``is_request`` is False, i.e. the
        message is sent as a notification.

    Raises:
        ValueError: If *mode* is not in VALID_MODES.
    """
    if mode not in VALID_MODES:
        raise ValueError(f"mode must be one of {VALID_MODES}")
    method = "ic.changeMode"
    params = add_targets({"mode": mode}, targets)
    return method, params or {"mode": mode}, False  # notification by default
def preset_ready(mode: str, targets: Optional[str]) -> Tuple[str, Dict[str, Any], bool]:
    """Build the ``ic.ready`` message.

    Args:
        mode: Mode id to report; not validated here.
        targets: Optional comma-separated appIds passed to add_targets.

    Returns:
        ``(method, params, is_request)``; ``is_request`` is False, i.e. the
        message is sent as a notification.
    """
    # mode-id could be anything per spec; we allow any string but suggest VALID_MODES
    method = "ic.ready"
    params = add_targets({"mode": mode}, targets)
    return method, params or {"mode": mode}, False  # notification by default
def preset_simple(method: str, targets: Optional[str]) -> Tuple[str, Optional[Dict[str, Any]], bool]:
    """Build a message for *method* that carries no params besides optional targets.

    Args:
        method: JSON-RPC method name to send.
        targets: Optional comma-separated appIds passed to add_targets.

    Returns:
        ``(method, params, is_request)`` where ``params`` is whatever
        add_targets returns for an initially absent params dict, and
        ``is_request`` is False (notification).
    """
    # SVM messages have no params
    params = add_targets(None, targets)
    return method, params, False  # notification by default
def preset_ping(targets: Optional[str]) -> Tuple[str, Optional[Dict[str, Any]], bool]:
    """Build the ``tp.ping`` message.

    Args:
        targets: Optional comma-separated appIds passed to add_targets.

    Returns:
        ``(method, params, is_request)``; ``is_request`` is True because a
        response is expected.
    """
    method = "tp.ping"
    params = add_targets(None, targets)
    return method, params, True  # request (expects response)


# CLI sub-command name -> builder returning (method, params, is_request).
PRESETS = {
    "prepare": preset_prepare_change_mode,  # needs mode
    "change": preset_change_mode,  # needs mode
    "ready": preset_ready,  # needs mode
    "svm_prepare_start": lambda targets: preset_simple("ic.svmPrepareStart", targets),
    "svm_start": lambda targets: preset_simple("ic.svmStart", targets),
    "svm_prepare_stop": lambda targets: preset_simple("ic.svmPrepareStop", targets),
    "svm_stop": lambda targets: preset_simple("ic.svmStop", targets),
    "ping": preset_ping,
}
def _print_json(obj: Dict[str, Any]) -> None:
    """Pretty-print *obj* to stdout as indented JSON, keeping non-ASCII text as is."""
    print(json.dumps(obj, ensure_ascii=False, indent=2))


def _report_response(req_id: int, resp: Optional[Dict[str, Any]], timeout_s: float) -> None:
    """Print the response received for *req_id*, or a timeout error if there was none."""
    if resp is None:
        _print_json({"sent_id": req_id, "error": f"timeout after {timeout_s}s"})
    else:
        _print_json({"sent_id": req_id, "response": resp})


def main() -> None:
    """Parse the command line, send one JSON-RPC message over UDP and print the outcome.

    Raises:
        SystemExit: On invalid ``--params`` JSON, on ``--targets`` combined with
            non-object params, or on an unknown sub-command (argparse itself
            also exits on bad arguments).
    """
    ap = argparse.ArgumentParser(description="UDP newline-delimited JSON-RPC 2.0 tester for Unreal.")
    ap.add_argument("--host", required=True, help="Unreal listen host/IP")
    ap.add_argument("--port", required=True, type=int, help="Unreal listen UDP port")
    ap.add_argument("--bind-host", default="0.0.0.0", help="Local bind host (default 0.0.0.0)")
    ap.add_argument("--bind-port", default=0, type=int, help="Local bind port (0 = ephemeral)")
    ap.add_argument("--timeout", default=1.5, type=float, help="Request timeout seconds (default 1.5)")
    # NOTE(review): --verbose is parsed but nothing visible in this file reads it.
    ap.add_argument("--verbose", action="store_true", help="Verbose rx/tx logging")

    sub = ap.add_subparsers(dest="cmd", required=True)

    sp = sub.add_parser("prepare", help="ic.prepareChangeMode (notification)")
    sp.add_argument("mode", choices=VALID_MODES)
    sp.add_argument("--targets", help="Comma-separated appIds to put in params.targets")

    sp = sub.add_parser("change", help="ic.changeMode (notification)")
    sp.add_argument("mode", choices=VALID_MODES)
    sp.add_argument("--targets", help="Comma-separated appIds to put in params.targets")

    sp = sub.add_parser("ready", help="ic.ready (notification)")
    sp.add_argument("mode", help="Mode id (e.g. theme/brand/slot1...)")
    sp.add_argument("--targets", help="Comma-separated appIds to put in params.targets")

    for name, help_text in [
        ("svm_prepare_start", "ic.svmPrepareStart (notification)"),
        ("svm_start", "ic.svmStart (notification)"),
        ("svm_prepare_stop", "ic.svmPrepareStop (notification)"),
        ("svm_stop", "ic.svmStop (notification)"),
    ]:
        sp = sub.add_parser(name, help=help_text)
        sp.add_argument("--targets", help="Comma-separated appIds to put in params.targets")

    sp = sub.add_parser("ping", help="tp.ping (request; expects result pong)")
    sp.add_argument("--targets", help="Comma-separated appIds to put in params.targets")
    sp.add_argument("--id", type=int, help="Explicit request id (optional)")

    sp = sub.add_parser("raw", help="Send an arbitrary JSON-RPC method (request or notification).")
    sp.add_argument("method", help="JSON-RPC method name")
    sp.add_argument("--params", default=None, help='Params as JSON string, e.g. \'{"a":1}\'')
    sp.add_argument("--targets", help="Comma-separated appIds to put in params.targets")
    sp.add_argument("--id", type=int, default=None, help="If provided, send as request and wait for response")
    sp.add_argument("--no-wait", action="store_true", help="If set, do not wait even if id is provided")

    args = ap.parse_args()

    client = UdpJsonRpcClient(
        remote_host=args.host,
        remote_port=args.port,
        bind_host=args.bind_host,
        bind_port=args.bind_port,
    )

    def run_preset(method: str, params: Optional[Dict[str, Any]], is_request: bool) -> None:
        # Presets say whether they expect a response; send accordingly.
        if is_request:
            req_id, resp = client.send_request(method, params=params, timeout_s=args.timeout)
            _print_json({"sent_id": req_id, "response": resp})
        else:
            client.send_notification(method, params=params)
            _print_json({"sent": {"method": method, "params": params}})

    try:
        if args.cmd in ("prepare", "change", "ready"):
            method, params, is_request = PRESETS[args.cmd](args.mode, getattr(args, "targets", None))
            run_preset(method, params, is_request)

        elif args.cmd == "ping":
            method, params, _is_request = PRESETS["ping"](getattr(args, "targets", None))
            req_id, resp = client.send_request(
                method, params=params, req_id=getattr(args, "id", None), timeout_s=args.timeout
            )
            _report_response(req_id, resp, args.timeout)

        elif args.cmd in PRESETS:
            method, params, is_request = PRESETS[args.cmd](getattr(args, "targets", None))
            run_preset(method, params, is_request)

        elif args.cmd == "raw":
            params = None
            if args.params is not None:
                try:
                    params = json.loads(args.params)
                except json.JSONDecodeError as e:
                    raise SystemExit(f"--params must be valid JSON. Error: {e}")
            if args.targets and params is not None and not isinstance(params, dict):
                # targets are stored as a key of the params object.
                raise SystemExit("--targets requires --params to be a JSON object.")
            params = add_targets(params, args.targets)

            if args.id is not None and not args.no_wait:
                req_id, resp = client.send_request(
                    args.method, params=params, req_id=args.id, timeout_s=args.timeout
                )
                _report_response(req_id, resp, args.timeout)
            elif args.id is not None:
                # If id provided but --no-wait, still send as request; just don't wait
                payload = {"jsonrpc": "2.0", "id": args.id, "method": args.method}
                if params is not None:
                    payload["params"] = params
                wire = json.dumps(payload, ensure_ascii=False) + "\n"
                client.sock.sendto(wire.encode("utf-8"), client.remote_addr)
                _print_json({"sent": payload, "note": "sent request without waiting"})
            else:
                client.send_notification(args.method, params=params)
                _print_json({"sent": {"method": args.method, "params": params}})

        else:
            raise SystemExit("Unknown command.")
    finally:
        # Release the UDP socket on every exit path.
        client.sock.close()


if __name__ == "__main__":
    main()