# How can this be set up so that it can accept up to 69 x 30 floats?
import random
import numpy as np
import pacmap
import tensorflow as tf
import time
from pythonosc import dispatcher
from pythonosc import osc_server
from pythonosc import udp_client
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model
import lstm
def categorical_crossentropy_cpu(y_true, y_pred):
    """Compute categorical cross-entropy with the op placed on the CPU.

    Registered as a custom object when the saved model is loaded, so the
    name must match the one stored in the model file.

    Args:
        y_true: Ground-truth labels, passed straight to Keras.
        y_pred: Model predictions, passed straight to Keras.

    Returns:
        The result of ``K.categorical_crossentropy(y_true, y_pred)``.
    """
    with tf.device('/CPU:0'):
        return K.categorical_crossentropy(y_true, y_pred)
# Locations of the trained model and of the data it was trained on.
model_loc = 'Data/LSTMWeights/2024-10-25/lstm_model_W30_O5.h5'
train_data_loc = "Data/Training/24-10-24/mocapdata_2024-10-24_W30_O5.csv"
train_label_loc = "Data/Training/24-10-24/mocapdata_2024-10-24_W30_O5_labels.csv"

# One model input is WINDOW_SIZE x FEATURES floats (30 x 69 = 2070 values).
WINDOW_SIZE = 30
FEATURES = 69

model = tf.keras.models.load_model(
    model_loc,
    custom_objects={'categorical_crossentropy_cpu': categorical_crossentropy_cpu},
)
X_train = lstm.LSTMNeuralNet.load_data(
    train_data_loc, train_label_loc, "Sequence_ID", WINDOW_SIZE
)[0]

# Sub-model that exposes the output of the second-to-last layer.
lstm_layer_output = model.layers[-2].output  # Adjust index as needed
activation_model = Model(inputs=model.input, outputs=lstm_layer_output)

# Activations of the training set, flattened to one row per sample; passed
# as the `basis` argument whenever new points are transformed with PaCMAP.
basis = activation_model.predict(X_train)
basis_flat = basis.reshape(basis.shape[0], -1)
pacmap_instance = pacmap.load("Data/Exports/2024-10-25/DR_W30_O5")

new_data_buffer = None
latent_space_coordinates = []
neighbors_list = []

osc_client = udp_client.SimpleUDPClient("172.27.144.1", 5006)
# UNITY ON THE OTHER COMPUTER
osc_client2 = udp_client.SimpleUDPClient("169.254.202.190", 5007)
def osc_data_handler(address, *args):
    """Project one incoming OSC payload into the latent space and send it on.

    Args:
        address: OSC address the message arrived on (only used in the
            rejection message).
        *args: Flat numeric payload of 1 to WINDOW_SIZE * FEATURES values.
            Shorter payloads are zero-padded at the end up to a full window.
            NOTE(review): trailing zero-padding is an assumption about how
            partial windows should be presented to the model -- confirm.

    Payloads that are empty or longer than one window are reported and
    ignored instead of raising inside the server's handler thread.
    """
    expected = WINDOW_SIZE * FEATURES
    if not args or len(args) > expected:
        print(
            f"Ignoring OSC message on {address}: expected 1 to {expected} "
            f"floats, got {len(args)}"
        )
        return

    # Keep the window in a local variable: main() uses a threading OSC
    # server, so a shared module-level buffer could be reset by another
    # handler call while this one is still using it.
    window = np.zeros(expected, dtype=np.float32)
    window[:len(args)] = args
    window = window.reshape(1, WINDOW_SIZE, FEATURES)

    # Run the activation sub-model and flatten to one row per sample,
    # matching the layout of basis_flat.
    prediction = activation_model.predict(window)
    prediction_flat = prediction.reshape(prediction.shape[0], -1)

    latent_projection = pacmap_instance.transform(X=prediction_flat, basis=basis_flat)
    print(latent_projection[0])

    # Send the latent projection to Unity
    send_latent_projection_to_unity(latent_projection[0])
def send_latent_projection_to_unity(latent_projection):
    """Send one latent-space point to both Unity OSC clients.

    Args:
        latent_projection: Array-like of coordinates (any shape); it is
            flattened to a plain list of Python numbers before sending.
    """
    projection_values = np.asarray(latent_projection).flatten().tolist()

    # Same message goes to both configured receivers.
    osc_client.send_message("/unity/latent_projection", projection_values)
    osc_client2.send_message("/unity/latent_projection", projection_values)
    print(f"Sent latent projection to Unity: {projection_values}")
def generate_fake_data(window_size=None, features=None):
    """Generate random data to simulate incoming data.

    Args:
        window_size: Size of the second axis; defaults to WINDOW_SIZE.
        features: Size of the third axis; defaults to FEATURES.

    Returns:
        Array of shape (1, window_size, features) with uniform random
        values in [0, 1).
    """
    if window_size is None:
        window_size = WINDOW_SIZE
    if features is None:
        features = FEATURES
    return np.random.rand(1, window_size, features)
def main():
    """Start the OSC server and handle incoming data until interrupted.

    Maps "/unity/data" to osc_data_handler, serves until Ctrl+C, and always
    closes the server socket on the way out.
    """
    print("LSTM model is loaded and ready for real-time inference.")
    print("Waiting for incoming data...")

    # Setting up the OSC server
    dispatcher_map = dispatcher.Dispatcher()
    dispatcher_map.map("/unity/data", osc_data_handler)
    server = osc_server.ThreadingOSCUDPServer(("172.27.144.1", 5010), dispatcher_map)
    print("OSC server is running on port 5010.")

    try:
        # Blocks here; without this call the server never receives anything.
        server.serve_forever()
    except KeyboardInterrupt:
        print("Real-time inference terminated by user.")
    finally:
        server.server_close()
# Run the server only when this file is executed as a script.
if __name__ == "__main__":
    main()