"""Send 32 random-walk positions via OSC (UDP), with live debug drawing.

Default message format (one message per person, all sent in one bundle):
    /person <int id> <float x> <float y>

You can change the OSC address with --address.

Example:
    python send_walk_osc.py --ip 127.0.0.1 --port 8000 --fps 30 --radius 5
"""
import argparse
import math
import random
import time
from typing import List, Dict, Tuple

import matplotlib.pyplot as plt
from pythonosc.osc_bundle_builder import OscBundleBuilder, IMMEDIATELY
from pythonosc.osc_message_builder import OscMessageBuilder
from pythonosc.udp_client import SimpleUDPClient
def clamp_to_radius(x: float, y: float, radius: float) -> Tuple[float, float]:
    """Constrain the point (x, y) to the disc of ``radius`` around the origin.

    Args:
        x: X coordinate of the point.
        y: Y coordinate of the point.
        radius: Radius of the allowed disc, centred on (0, 0).

    Returns:
        ``(x, y)`` unchanged when the point lies inside or on the circle;
        otherwise the point scaled radially so that it lands on the circle.
    """
    # Compare squared lengths so the common "already inside" case needs no sqrt.
    r2 = x * x + y * y
    if r2 <= radius * radius:
        return x, y
    # Here r2 > radius**2 >= 0, so the square root is strictly positive and
    # the division is safe.
    scale = radius / math.sqrt(r2)
    return x * scale, y * scale
def init_positions(n: int, radius: float) -> List[Dict[str, float]]:
    """Create ``n`` random starting positions inside a disc.

    Args:
        n: Number of positions to generate.
        radius: Radius of the disc, centred on the origin.

    Returns:
        A list of ``n`` dicts, each with float entries ``"x"`` and ``"y"``.
        The list is empty when ``n`` is 0 or negative.
    """
    pos: List[Dict[str, float]] = []
    for _ in range(n):
        theta = random.random() * 2.0 * math.pi
        u = random.random()
        # Taking sqrt(u) spreads points uniformly over the disc's area;
        # using u directly would cluster them near the centre.
        r = math.sqrt(u) * radius
        pos.append({"x": r * math.cos(theta), "y": r * math.sin(theta)})
    return pos
def step_random_walk(pos: List[Dict[str, float]], radius: float, step_sigma: float) -> None:
    """Advance every position by one Gaussian random-walk step, in place.

    Args:
        pos: Positions to update; each dict's ``"x"`` and ``"y"`` entries are
            modified in place.
        radius: Radius passed to ``clamp_to_radius`` after each step.
        step_sigma: Standard deviation of the per-axis Gaussian step.
    """
    for p in pos:
        p["x"] += random.gauss(0.0, step_sigma)
        p["y"] += random.gauss(0.0, step_sigma)
        # Keep the walker inside the disc after the step.
        p["x"], p["y"] = clamp_to_radius(p["x"], p["y"], radius)
def setup_plot(radius: float):
    """Create the debug figure and return the scatter artist for the walkers.

    The axes use an equal aspect ratio, span ``[-radius, radius]`` on both
    axes, and show a dashed circle of the given radius around the origin.

    Args:
        radius: Radius of the circle to draw; also sets the axis limits.

    Returns:
        The (initially empty) scatter collection, to be passed to
        ``update_plot``.
    """
    # Interactive mode so that drawing does not block the send loop.
    plt.ion()
    _, ax = plt.subplots()
    ax.set_aspect("equal", adjustable="box")
    ax.set_xlim(-radius, radius)
    ax.set_ylim(-radius, radius)
    ax.set_title("OSC Random Walk Debug")
    circle = plt.Circle((0, 0), radius, fill=False, linestyle="--")
    # A patch is only rendered once it has been attached to an axes.
    ax.add_patch(circle)
    scatter = ax.scatter([], [])
    return scatter
def update_plot(scatter, positions):
    """Move the scatter points to the current positions and refresh the figure.

    Args:
        scatter: Scatter artist whose offsets are replaced (the object
            returned by ``setup_plot``).
        positions: Iterable of dicts with ``"x"`` and ``"y"`` entries.
    """
    xs = [p["x"] for p in positions]
    ys = [p["y"] for p in positions]
    scatter.set_offsets(list(zip(xs, ys)))
    # Give the GUI event loop a moment to run; without this the interactive
    # window is not redrawn.
    plt.pause(0.001)
def send_bundle_per_person(client: SimpleUDPClient, address: str, positions, tstamp: float):
    """Send one OSC bundle containing one message per position.

    Each message has the form:
        <address> <int id> <float x> <float y>
    where ``id`` is the index of the position in ``positions``.

    Args:
        client: UDP client used to send the bundle.
        address: OSC address used for every message in the bundle.
        positions: Iterable of dicts with ``"x"`` and ``"y"`` entries.
        tstamp: Timestamp for this frame. Currently not included in the
            messages; see the note in the loop body.
    """
    bundle = OscBundleBuilder(IMMEDIATELY)
    for i, p in enumerate(positions):
        msg = OscMessageBuilder(address=address)
        msg.add_arg(int(i))  # id
        msg.add_arg(float(p["x"]))  # x
        msg.add_arg(float(p["y"]))  # y
        # NOTE: to include the timestamp as well, append float(tstamp) as a
        # fourth argument here (the receiver must expect it).
        bundle.add_content(msg.build())
    client.send(bundle.build())
def main() -> None:
    """Parse the command line, then simulate, draw and send until interrupted.

    Each frame advances the random walk by one step, refreshes the debug
    plot and sends the positions as one OSC bundle. Frames are paced to
    roughly ``--fps`` per second. Press Ctrl+C to stop.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--ip", default="127.0.0.1", help="Destination IP (UE machine IP).")
    ap.add_argument("--port", type=int, default=8000, help="Destination port (UE OSC receive port).")
    ap.add_argument("--address", default="/person", help="OSC address pattern.")
    ap.add_argument("--n", type=int, default=32)
    ap.add_argument("--radius", type=float, default=5.0)
    ap.add_argument("--step", type=float, default=0.05)
    ap.add_argument("--fps", type=float, default=30.0)
    ap.add_argument("--seed", type=int, default=None)
    args = ap.parse_args()

    # A fixed seed makes the walk reproducible between runs.
    if args.seed is not None:
        random.seed(args.seed)

    positions = init_positions(args.n, args.radius)
    scatter = setup_plot(args.radius)
    client = SimpleUDPClient(args.ip, args.port)

    # Guard against zero or negative --fps, which would otherwise divide by
    # zero or give a negative frame period.
    dt = 1.0 / max(args.fps, 0.1)

    try:
        while True:
            frame_start = time.perf_counter()
            t = time.time()

            step_random_walk(positions, args.radius, args.step)
            update_plot(scatter, positions)
            send_bundle_per_person(client, args.address, positions, t)

            # Sleep only for whatever is left of this frame's time budget.
            remaining = dt - (time.perf_counter() - frame_start)
            if remaining > 0:
                time.sleep(remaining)
    except KeyboardInterrupt:
        # Ctrl+C is the intended way to stop the sender; exit quietly.
        pass


if __name__ == "__main__":
    main()