# TouchDesigner -> AxiDraw live stroke streamer (Windows / TD 2023)
# Uses pyaxidraw interactive mode + a worker thread to stream segments.
# Updated goals achieved in this version:
# - Uses PaperSpace (letterboxed paper + margin-aware ACTIVE plot region) for mapping.
# - Pointer outside ACTIVE region is ignored (not drawing) or ends stroke (if drawing).
# - Jitter reduction via:
# * Low-pass smoothing (SMOOTH_ALPHA)
# * Fixed-distance resampling (STEP_IN)
# * Rate limiting (MIN_DT)
# - Have paper_space.py available as a Module DAT or on TD's Python path:
# from paper_space import PaperSpace, PaperSpaceConfig
# - TouchDesigner Panel u/v conventions vary by setup.
# This file assumes your incoming v is "top-down" (v=0 at top, v=1 at bottom),
# matching your previous code that inverted v.
# If your v is already bottom-up (v=0 bottom, v=1 top), set INPUT_V_IS_TOPDOWN=False.
from pyaxidraw import axidraw
from paper_space import PaperSpace, PaperSpaceConfig
# ---------- Physical paper + safe margins (inches) ----------
# These MUST match your overlay / PaperSpace config.
# ---------- Input coordinate convention ----------
# True if incoming v is "top-down" (v=0 at top, v=1 at bottom).
# This matches your previous implementation where you did y = (1-v)*H.
self.INPUT_V_IS_TOPDOWN = True
# ---------- Jitter control / streaming ----------
# Low-pass smoothing for pointer -> inches
# 0.0 = no smoothing, 0.85..0.97 typical. Higher = smoother but more lag.
# Resample to fixed step distance (inches). Increase to reduce chatter.
# 0.02 in ~ 0.5mm. 0.03..0.06 can be very stable.
# Minimum time between enqueues (seconds). Increase to reduce command rate.
self.MIN_DT = 0.010 # 10 ms
# Safety cap on how many segments can be generated from a single input event.
# Helps avoid massive bursts if the pointer teleports.
self.MAX_SEGS_PER_INPUT = 64
# ---------- Pen calibration (servo positions) ----------
# ---------- Pen inversion (command semantics) ----------
# ---------- Paper space mapper (authoritative mapping) ----------
self.paper_cfg = PaperSpaceConfig(
MARGIN_IN=self.MARGIN_IN,
self.paper = PaperSpace(self.paper_cfg)
# ---------- Worker thread ----------
self.q = queue.Queue(maxsize=self.QMAX)
self.stop_evt = threading.Event()
# ---------- Stroke state ----------
# Smoothed point (inches) used for generating path
# Last point actually SENT to hardware as a segment endpoint (inches)
# Timing for rate limiting
# ---------- Connection ----------
print("AxiDraw: connect() called")
# Best-effort cleanup if a stale object exists
if self.ad is not None and not self.connected:
self.ad = axidraw.AxiDraw()
# Apply pen servo calibration BEFORE connect()
self._apply_pen_options()
print("AxiDraw: connect() returned:", ok)
print("AxiDraw: connect failed (busy? unplugged? another app open?)")
# Start worker thread for hardware calls
self.worker = threading.Thread(target=self._worker_loop, daemon=True)
# Start with PHYSICAL pen up
print("AxiDraw: pen up error:", e)
print("AxiDraw: connected and ready.")
self._end_local_stroke_state()
self.worker.join(timeout=1.0)
if self.ad and self.connected:
print("AxiDraw: disconnect error:", e)
print("AxiDraw: disconnected.")
# ---------- Pen options / inversion ----------
def _apply_pen_options(self):
"""Push servo settings into axidraw options (safe if ad exists)."""
self.ad.options.pen_pos_up = self.PEN_UP
self.ad.options.pen_pos_down = self.PEN_DOWN
self.ad.options.pen_rate_raise = self.PEN_RATE_RAISE
self.ad.options.pen_rate_lower = self.PEN_RATE_LOWER
"""PHYSICAL pen up (raises pen)."""
"""PHYSICAL pen down (lowers pen)."""
def _print_pen_state(self):
f"Pen state -> INVERT_PEN:{self.INVERT_PEN} | "
f"UP:{self.PEN_UP} DOWN:{self.PEN_DOWN} "
f"RAISE_RATE:{self.PEN_RATE_RAISE} LOWER_RATE:{self.PEN_RATE_LOWER}"
# ---------- Pen calibration helpers (servo endpoints) ----------
def set_pen(self, up=None, down=None, rate_raise=None, rate_lower=None, verbose=True):
Update stored calibration values (servo endpoints / rates) and push into options.
Use this to tune lift height and pressure.
self.PEN_DOWN = float(down)
if rate_raise is not None:
self.PEN_RATE_RAISE = float(rate_raise)
if rate_lower is not None:
self.PEN_RATE_LOWER = float(rate_lower)
self._apply_pen_options()
# ---------- Pen test helpers (for buttons) ----------
"""Instant test: PHYSICALLY raise pen."""
self._apply_pen_options()
self._enqueue(("PEN_UP",))
print("Tap: PEN UP (physical)")
"""Instant test: PHYSICALLY lower pen."""
self._apply_pen_options()
self._enqueue(("PEN_DOWN",))
print("Tap: PEN DOWN (physical)")
def swap_pen_direction(self):
"""Toggle inversion of pen commands."""
self.INVERT_PEN = not self.INVERT_PEN
print("Swapped pen direction. INVERT_PEN =", self.INVERT_PEN)
self._enqueue(("PEN_UP",))
# ---------- Mapping (PaperSpace authoritative) ----------
def set_container_pixels(self, w_px, h_px):
Call this if you know your container's pixel size.
PaperSpace will letterbox correctly only if it knows the container size.
If you don't call this, PaperSpace still works logically, but any letterboxing
assumptions won't match your overlay if your overlay is letterboxing.
self.paper.update_container_pixels(int(w_px), int(h_px))
def _panel_uv_to_inches_active(self, u, v):
Convert incoming panel u,v to inches INSIDE ACTIVE plot region.
Returns (x_in, y_in) or None if outside ACTIVE.
# Normalize V convention: PaperSpace expects bottom-up v (0 bottom, 1 top)
if self.INPUT_V_IS_TOPDOWN:
# PaperSpace returns None if outside ACTIVE by default
return self.paper.container_uv_to_inches(uu, vv, clamp_to_active=False)
# ---------- Input handling ----------
def on_input(self, u, v, left_down):
# Map to ACTIVE region inches; may be None if outside.
xy = self._panel_uv_to_inches_active(u, v)
print("input u,v:", round(float(u), 3), round(float(v), 3), "-> outside ACTIVE", "left:", left_down)
print("input u,v:", round(float(u), 3), round(float(v), 3), "-> x,y:", round(x, 3), round(y, 3), "left:", left_down)
# If mouse is up, end stroke (if any) regardless of position
if (not left_down) and self.drawing:
self._end_local_stroke_state()
# If mouse is down but we're outside active region:
if left_down and xy is None:
# If currently drawing, end stroke cleanly at boundary (no clamping surprises)
self._end_local_stroke_state()
# If mouse is down and we have a valid point:
if left_down and xy is not None:
self._last_emit_t = 0.0 # allow immediate start
self._enqueue(("START", x, y))
if (now - self._last_emit_t) < self.MIN_DT:
# Still update smoothing state, but don't emit yet
self._smooth_xy = self._ema_xy(self._smooth_xy, (x, y), self.SMOOTH_ALPHA)
self._smooth_xy = self._ema_xy(self._smooth_xy, (x, y), self.SMOOTH_ALPHA)
# Resample from last-sent to smoothed target by STEP_IN
if self._sent_xy is None:
self._sent_xy = self._smooth_xy
segs = self._resample_points(self._sent_xy, self._smooth_xy, self.STEP_IN, self.MAX_SEGS_PER_INPUT)
self._enqueue(("SEG", sx, sy))
# ---------- Smoothing / resampling ----------
def _ema_xy(self, prev, cur, alpha):
"""Exponential moving average in 2D."""
# new = a*prev + (1-a)*cur
nx = ax * px + (1.0 - ax) * cx
ny = ax * py + (1.0 - ax) * cy
def _resample_points(self, a, b, step_in, max_points):
Generate intermediate points from a->b spaced at ~step_in, excluding 'a' and including 'b' only if far enough.
Returns a list of points to enqueue as SEG endpoints.
if a is None or b is None:
dist = math.hypot(dx, dy)
# degenerate: just return b
# Number of steps; cap for safety
inv = 1.0 / dist if dist > 0 else 0.0
# produce points at step, 2*step, ... n*step from a
for i in range(1, n + 1):
# ---------- Safety / stroke state ----------
def _end_local_stroke_state(self):
def emergency_penup(self):
"""Queue a physical pen-up command immediately."""
self._enqueue(("PEN_UP",))
while not self.q.empty():
# ---------- Queue / Worker ----------
def _enqueue(self, item):
print("Queue full: dropping", item[0])
print("AxiDraw worker: started")
while not self.stop_evt.is_set():
item = self.q.get(timeout=0.05)
print("HW start:", round(x, 3), round(y, 3))
print("HW pen up (physical)")
print("HW pen down (physical)")
print("AxiDraw worker error:", e)
print("AxiDraw worker: stopped")
# Singleton instance used by the Execute DAT (and button callbacks)
streamer = AxiDrawStreamer()