# load_stems — robust stem loader; wipes 'td_stems_seen' after each successful load
import os, re, time, json, statistics
from collections import defaultdict
# --- CONFIG --------------------------------------------------------
STEM_DIR = r'H:\My Drive\02_Projects\00_Personal Projects\Synesthesia\Stems\TD_Stem_Auto_Setup_Folder'
STEM_DIR = STEM_DIR.replace('\\', '/')  # TouchDesigner paths use forward slashes
FULL_SONG_OP = 'full_song1'             # CHOP that receives the untagged full mix
VALID_EXTS = ('.wav', '.mp3', '.aiff', '.aif', '.flac', '.ogg', '.m4a')
MIN_STEMS_REQUIRED = 2  # treat as a valid set only if >= this many tagged stems
STATUS_DAT_NAME = 'stem_status'
DEBUG_DAT_NAME = 'stem_debug'

# support either filename (you had both in screenshots/code)
SEEN_CANDIDATES = [
    os.path.join(STEM_DIR, '.td_stems_seen'),
    os.path.join(STEM_DIR, '.td_stems_seen.json'),
]

# NOTE(review): stem tag -> target CHOP name. Reconstructed — the original
# TARGETS mapping was lost from this copy of the file; confirm the tag list
# and operator names against the actual TouchDesigner network.
TARGETS = {
    'drums': 'drums1',
    'bass': 'bass1',
    'vocals': 'vocals1',
    'guitar': 'guitar1',
    'other': 'other1',
}

# --- Flexible tag regexes (e.g., -guitar, - guitar, _guitar, (guitar), .guitar)
# Parens added to the character classes so '(guitar)' matches, as the
# comment above promises.
TAG_PAT = {k: re.compile(r'(^|[\s._\-(]){}($|[\s._\-)])'.format(re.escape(k)), re.IGNORECASE)
           for k in TARGETS}
# --- Helpers -------------------------------------------------------
def _is_audio(fname): return fname.lower().endswith(VALID_EXTS)
def _normalize_base(base_no_ext):
base = re.sub(r'\s*-\s*copy(?:\s*$\d+$)?$', '', base_no_ext, flags=re.IGNORECASE)
base = re.sub(r'\s*$\d+$$', '', base)
return base.strip(' ._-')
def _song_key_for(fname_lower):
    """Return the normalized song key for a (lowercased) filename.

    If a stem tag (drums/bass/...) is found in the basename, the key is the
    normalized text before the tag; otherwise the whole normalized basename.

    NOTE(review): the 'm = pat.search(base); if m:' lines were lost in the
    corrupted source — without them the loop unconditionally returned on the
    first tag. Reconstructed from the use of m.start() at the return site.
    """
    base = os.path.splitext(fname_lower)[0]
    for k, pat in TAG_PAT.items():
        m = pat.search(base)
        if m:
            return _normalize_base(base[:m.start()])
    return _normalize_base(base)
def _reload_if_available(o):
if hasattr(o.par, 'reload'): o.par.reload.pulse()
elif hasattr(o.par, 'cue'): o.par.cue.pulse()
def _get_or_create_text_dat(name):
    """Return the Text DAT called *name* in this script's parent, creating it if absent.

    As corrupted, this created a brand-new DAT on every call and returned
    None; reconstructed to look up the existing operator first and to
    return it, which is what both callers (_status/_debug) require.
    """
    container = me.parent() if hasattr(me, 'parent') else op('/project1')
    d = container.op(name)
    if d is None:
        d = container.create(textDAT, name)
    return d
def _status(line): _get_or_create_text_dat(STATUS_DAT_NAME).text = line
def _debug(lines): _get_or_create_text_dat(DEBUG_DAT_NAME).text = '\n'.join(lines)
def _load_seen():
    """Return (seen_dict, path) from the first readable seen-cache file.

    Tries each SEEN_CANDIDATES path in order; a missing, unreadable, or
    corrupt file is skipped. Falls back to an empty dict plus the first
    candidate path (used later as the save destination).

    NOTE(review): the def line, existence check, and json.load call were
    lost in the corrupted source; reconstructed from the with-open line,
    the fallback return, and _save_seen's symmetric json.dump.
    """
    # Read whichever seen file exists first; else empty
    for p in SEEN_CANDIDATES:
        if os.path.isfile(p):
            try:
                with open(p, 'r', encoding='utf-8') as f:
                    return json.load(f), p
            except (OSError, json.JSONDecodeError):
                pass  # treat a broken cache as empty rather than crash the load
    return {}, SEEN_CANDIDATES[0]
def _save_seen(seen, path_hint):
with open(path_hint, 'w', encoding='utf-8') as f:
json.dump(seen, f, ensure_ascii=False, indent=0)
for p in SEEN_CANDIDATES:
return (st.st_size, int(st.st_mtime))
def _stem_count(file_list):
    """Count how many distinct stem tags are present among *file_list* basenames.

    Each tag counts at most once no matter how many files carry it, so the
    result is comparable against MIN_STEMS_REQUIRED.

    NOTE(review): the accumulator and return were lost in the corrupted
    source; reconstructed from how the value is compared at the call sites.
    """
    bases = [os.path.splitext(f)[0].lower() for f in file_list]
    return sum(1 for pat in TAG_PAT.values()
               if any(pat.search(b) for b in bases))
def _choose_latest_set_by_first_seen(stem_dir, files, seen):
    """Pick the most recently pasted stem set.

    Stamps a first_seen time (and size/mtime fingerprint) for every file,
    groups files by normalized song key, then chooses the group with the
    newest median first_seen — preferring groups that have at least
    MIN_STEMS_REQUIRED tagged stems.

    Returns (song_key, set_files, groups, dbg); (None, [], groups, {}) when
    there are no files at all. *seen* is mutated in place with new stamps.

    NOTE(review): several scaffold lines (now/fp/entry, the rank() def, the
    unpack of the winner, the dbg assembly) were lost in the corrupted
    source and are reconstructed from the surviving expressions.
    """
    now = time.time()
    # Stamp unseen or changed files (same-name paste with new content)
    for f in files:
        path = os.path.join(stem_dir, f)
        fp = _fingerprint(path)
        entry = seen.get(f)
        if entry is None or tuple(entry.get('fp', (None, None))) != fp:
            seen[f] = {"first_seen": now, "fp": fp}

    # Group by normalized key
    groups = defaultdict(list)
    for f in files:
        groups[_song_key_for(f.lower())].append(f)

    valid_groups, full_only_groups = [], []
    for key, flist in groups.items():
        sc = _stem_count(flist)
        (valid_groups if sc >= MIN_STEMS_REQUIRED else full_only_groups).append((key, flist, sc))

    def cohort_median(flist):
        # Median paste-time of a group; robust to one stray old file.
        return statistics.median([seen.get(f, {}).get("first_seen", 0) for f in flist]) if flist else 0

    def rank(item):
        key, flist, sc = item
        # Newest cohort wins; more stems breaks ties; shorter key breaks those.
        return (cohort_median(flist), sc, -len(key))

    pool = valid_groups if valid_groups else full_only_groups
    if not pool:
        return None, [], groups, {}
    best = max(pool, key=rank)
    key, flist, sc = best

    # Per-group debug snapshot for the debug DAT.
    dbg = {}
    for k, fl in groups.items():
        dbg[k] = {
            'stems': _stem_count(fl),
            'median_first_seen': cohort_median(fl),
            'first_seen_each': {f: seen.get(f, {}).get('first_seen', 0) for f in fl}
        }
    return key, flist, groups, dbg
# --- Main ----------------------------------------------------------
def load_stems(stem_dir=STEM_DIR):
    """Resolve the newest stem set in *stem_dir* and load it into the CHOPs.

    Flow: list audio files -> choose the freshest group by first_seen ->
    assign each tagged stem to its TARGETS operator (clearing targets with
    no match) -> assign the best untagged file to FULL_SONG_OP -> publish
    status/debug DATs -> wipe the seen cache for the next paste.

    NOTE(review): the original body lost many scaffold lines (early returns,
    the match loop, full_candidates init, the score() def header, the debug
    loop, the wipe call, the auto-run); reconstructed around the surviving
    expressions — verify against the live network before relying on it.
    """
    if not os.path.isdir(stem_dir):
        _status("Set: (folder not found)")
        _debug(["ERROR: folder not found", stem_dir])
        return
    files = [f for f in os.listdir(stem_dir) if _is_audio(f)]
    if not files:
        _status("Set: (no audio files)")
        _debug(["No audio files in:", stem_dir])
        return

    seen, seen_path = _load_seen()
    song_key, set_files, groups, dbg = _choose_latest_set_by_first_seen(stem_dir, files, seen)
    _save_seen(seen, seen_path)
    if not set_files or not song_key:
        _status("Set: (none)")
        _debug(["No set resolved.", "Groups:"] + [f"- {k}: {groups[k]}" for k in groups])
        return

    # Assign stems to CHOPs; clear missing ones
    for tag, op_name in TARGETS.items():
        o = op(op_name)
        if o is None:
            continue
        match = None
        for f in set_files:
            base = os.path.splitext(f)[0].lower()
            if TAG_PAT[tag].search(base):
                match = f
                break
        if match:
            o.par.file = f"{stem_dir}/{match}"
            _reload_if_available(o)
        else:
            o.par.file = ''  # clear stale stem so old audio doesn't linger

    # Assign full mix: any file in set with NO tag
    full_candidates = []
    for f in set_files:
        base = os.path.splitext(f)[0].lower()
        if not any(p.search(base) for p in TAG_PAT.values()):
            full_candidates.append(f)
    if full_candidates:
        def score(f):
            base = os.path.splitext(f.lower())[0]
            # Prefer a file whose key matches the chosen set exactly,
            # then the most recently seen one.
            exact_prefix = 1 if _song_key_for(base) == song_key else 0
            # within this run, first_seen exists in `seen`
            fs = seen.get(f, {}).get("first_seen", 0)
            return (exact_prefix, fs)
        full_file = max(full_candidates, key=score)
        o_full = op(FULL_SONG_OP)
        if o_full is not None:
            o_full.par.file = f"{stem_dir}/{full_file}"
            _reload_if_available(o_full)

    _status(f"Set: {song_key}")
    lines = [f"Chosen set: {song_key}", "-"*60]
    for k in sorted(dbg.keys()):
        info = dbg[k]
        lines.append(f"{k} | stems:{info['stems']} | median_seen:{info['median_first_seen']:.0f}")
        for f in groups[k]:
            fs = info['first_seen_each'].get(f, 0)
            lines.append(f" - {f} (seen:{fs:.0f})")
    _debug(lines)

    # 💥 Wipe cache so the next paste starts clean
    _wipe_seen()


# auto-run (or trigger from your Panel Execute button)
load_stems()