# ==== chain execution =====
self.persona_workflow = None
self.full_workflow = None
self.user_inputs = {} # Stores user-modified prompts
#self.execution_history = [] # Stores all results
self.current_persona = "" # Stores the latest persona
self.persona_history = []
#self._initialize_config()
def _initialize_config(self):
    """Private method to load configurations.

    Reads the Hugging Face token from the environment, loads the YAML config,
    and initializes the LLM client and the multi-index RAG retriever.

    Raises:
        ValueError: if HUGGINGFACE_TOKEN is not set in the environment.
    """
    # Load environment variables
    HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
    if not HUGGINGFACE_TOKEN:
        raise ValueError("HUGGINGFACE_TOKEN is not set! Make sure to define it in .env.")
    with open("config_article_silver_fir_story_1_OBEY.yml", "r", encoding="utf-8") as file:
        config = yaml.safe_load(file)
    self.parameters = config["parameters"]
    self.workflow_steps = config["workflow"]
    self.persona_steps = config["persona_workflow"]
    self.prompts = config["prompts"]
    self.rag_queries = config["rag"]
    # Initialize Hugging Face LLM
    self.llm = HuggingFaceAPI(api_url=config["API"]["model_url"], api_token=HUGGINGFACE_TOKEN)
    # Initialize RAG Retriever (one FAISS index + embeddings file per domain).
    # NOTE(review): `embedding_model` is created but never passed anywhere visible —
    # confirm whether RagRetrieveWithMeta expects it.
    embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
    self.rag_retriever = RagRetrieveWithMeta(
        config['rag']['scientific']['faiss'],
        config['rag']['scientific']['embeddings'],
        config['rag']['diary']['faiss'],
        config['rag']['diary']['embeddings'],
        config['rag']['weather']['faiss'],
        config['rag']['weather']['embeddings'],
        config['rag']['insights']['faiss'],
        config['rag']['insights']['embeddings'],
    )  # FIX: call was missing its closing parenthesis
    self.persona_history = []
def update_chain(self, config_path):
    """Update specific parts of the chain from a new YAML file.

    Only keys present in the new config are applied; everything else is kept.
    Errors are reported to stdout rather than raised (best-effort update).
    """
    try:
        with open(config_path, "r", encoding="utf-8") as file:
            new_config = yaml.safe_load(file)
        # Only update the necessary parts
        if "parameters" in new_config:
            self.parameters.update(new_config["parameters"])
            print(new_config["parameters"])
        if "workflow" in new_config:
            self.workflow_steps = new_config["workflow"]  # Replace full workflow if needed
        if "persona_workflow" in new_config:
            self.persona_steps = new_config["persona_workflow"]
        if "prompts" in new_config:
            self.prompts.update(new_config["prompts"])
        # FIX: was applied unconditionally -> KeyError when the file has no "rag" key
        if "rag" in new_config:
            self.rag_queries.update(new_config["rag"])
        # Rebuild the chain based on updated config
        print("✅ Chain updated successfully!")
    except Exception as e:  # FIX: `e` was referenced without any enclosing try/except
        print(f"❌ Error updating chain: {str(e)}")
# ===== Function to Dynamically Generate Dictionaries from parameters =====
def get_step_inputs(self, step_inputs):
    """
    Fetches and formats the required inputs for a step.

    - Uses dictionary-style input definitions from `parameters` in `config.yaml`.
    - If an input isn't found, it checks `generated_inputs` for dynamically created values.

    Args:
        step_inputs: list of input key names required by the step.

    Returns:
        dict mapping each found key to its value (missing keys are omitted).
    """
    input_data = {}  # FIX: accumulator, membership checks and return were missing
    for item in step_inputs:
        # Check in parameters first
        for category in self.parameters.values():  # `parameters` has grouped dictionaries
            if item in category:
                input_data[item] = category[item]
                break
        else:
            # If not found, check in dynamically generated inputs
            if item in self.generated_inputs:
                input_data[item] = self.generated_inputs[item]
    return input_data
# ======== FUNCTION to store and return llm outputs ===============
# =================================================================
def store_result_and_return(self, response, _output=None):
    """Store `response` in `generated_inputs` under `_output` and return {_output: response}.

    Raises:
        ValueError: if `_output` was not provided.
    """
    # 🛑 Debug: Check if step_output is set correctly
    if _output is None:  # FIX: the raise was unconditional, making the method unusable
        raise ValueError("⚠️ ERROR: `step_output` is missing in store_result_and_return()!")
    # ✅ Store response in generated_inputs dictionary
    self.generated_inputs[_output] = response
    print(f"\n📝 Stored Result -> `{_output}`:\n{response}\n")
    return {_output: response}
# ======== FUNCTION to format llm response ========================
# =================================================================
def extract_response(self, response: str):
    """Strip a leading chain-of-thought section from an LLM reply.

    Returns only the text after the first '</think>' marker, whitespace-trimmed.
    If the marker is absent, the reply is returned unchanged.
    """
    tail = re.search(r"</think>\s*(.*)", response, re.DOTALL)
    if tail is None:
        return response
    return tail.group(1).strip()
# ======== FUNCTION to call LLM chain step ========================
# =================================================================
def execute_llm_call(self, _inputs, _output, _prompt):
    """Render the step prompt with its inputs, invoke the LLM, and store the cleaned reply.

    Returns the {output_key: cleaned_response} dict produced by store_result_and_return.
    """
    values = self.get_step_inputs(_inputs)
    rendered = _prompt.format(**values)
    print(f"PROMPT --- {rendered}")
    raw_reply = self.llm.invoke(rendered)
    cleaned = self.extract_response(raw_reply)
    return self.store_result_and_return(cleaned, _output=_output)
def create_persona_chain(self):
    """Dynamically builds the persona pipeline from YAML-defined `persona_workflow` steps.

    LLM steps become deferred RunnableLambdas; RAG steps are executed immediately
    at build time and wrapped as constant-returning steps.

    Raises:
        ValueError: for an unknown RAG query name or step type.
    """
    # FIX(reconstruction): accumulator inits and the step-type branching were
    # missing from the mangled source; mirrored from create_chain.
    p_chain_steps = []
    persona_order = []  # step names seen so far, to warn on duplicates
    for step in self.persona_steps:
        step_name = step["name"]
        step_type = step["type"]
        step_inputs = step["inputs"]
        step_output = step["output"]
        print(f"\n🔄 Processing Step: {step_name} (Type: {step_type})")
        print(f" 🔹 Expected Inputs: {step_inputs}")
        print(f" 🔹 Output Key: {step_output}")
        if step_name in persona_order:
            print(f"⚠️ WARNING: Step `{step_name}` is being added TWICE!")
        persona_order.append(step_name)
        if step_type == "LLM":
            # ✅ Handle LLM Steps without unnecessary `RunnableLambda`
            prompt_template = self.prompts[step["prompt"]]["template"]
            prompt = PromptTemplate.from_template(prompt_template)
            # Defaults bind this iteration's values (avoids late-binding closures).
            current_step = RunnableLambda(
                lambda _, _inputs=step_inputs, _output=step_output, _prompt=prompt:
                    self.execute_llm_call(_inputs, _output, _prompt),
                name=f"LLM_{step_output}",
            )
        elif step_type == "RAG":
            query = self.rag_queries.get(step["query"])
            mode = step.get("mode", "scientific")
            if query is None:
                raise ValueError(f"RAG query `{step['query']}` not found in `rag_queries`.")
            # ✅ Execute RAG retrieval immediately (at chain-build time)
            rag_result = self.rag_retriever.rag_wrapper(query=query, mode=mode)
            if not rag_result:
                rag_result = "No relevant context found."
            # ✅ Store the retrieved result for future steps
            self.generated_inputs[step_output] = rag_result
            print(f"\n✅ RAG Step Executed: {step_name}\n🔎 Query: {query}\n📚 Retrieved Context: {rag_result}\n")
            # FIX: bind via defaults — the original lambda captured loop variables late,
            # so every RAG step would have returned the LAST step's result.
            current_step = RunnableLambda(
                lambda _, _output=step_output, _result=rag_result: {_output: _result},
                name=f"RAG_{step_output}",
            )
        else:
            raise ValueError(f"Unknown step type: {step['type']}")
        p_chain_steps.append(current_step)
    return RunnableSequence(*p_chain_steps)
# ===== Function to Dynamically Build the Chain =====
"""Dynamically builds a LangChain pipeline from YAML-defined steps."""
for step in self.workflow_steps:
step_inputs = step["inputs"]
step_output = step["output"]
print(f"\n🔄 Processing Step: {step_name} (Type: {step_type})")
print(f" 🔹 Expected Inputs: {step_inputs}")
print(f" 🔹 Output Key: {step_output}")
if step_name in execution_order:
print(f"⚠️ WARNING: Step `{step_name}` is being added TWICE!")
execution_order.append(step_name)
# ✅ Handle LLM Steps without unnecessary `RunnableLambda`
prompt_template = self.prompts[step["prompt"]]["template"]
prompt = PromptTemplate.from_template(prompt_template)
current_step = RunnableLambda(
lambda _, _inputs=step_inputs, _output=step_output, _prompt=prompt:
self.execute_llm_call(_inputs, _output, _prompt),
# ✅ Handle RAG Steps (Fixed!)
query = self.rag_queries.get(step["query"])
mode = step.get("mode", "scientific")
raise ValueError(f"RAG query `{step['query']}` not found in `rag_queries`.")
# ✅ Execute RAG retrieval immediately
rag_result = self.rag_retriever.rag_wrapper(query=query, mode=mode)
rag_result = "No relevant context found."
# Store RAG result with metadata
self.rag_results[step_name] = rag_result
# Parse and save result and meta
retrieved_info, retrieved_meta = self.parse_retrieved_data(rag_result, mode)
rag_log['retrieved_data'] = retrieved_info
rag_log['retrieved_metadata'] = retrieved_meta
plain_result = [doc['content'] for doc in rag_result]
#metadata = [doc['metadata'] for doc in rag_result]
# ✅ Store the retrieved result for future steps
self.generated_inputs[step_output] = plain_result
print(f"\n✅ RAG Step Executed: {step_name}\n🔎 Query: {query}\n📚 Retrieved Context: {plain_result}\n")
f"_RAG_STEP_{step_name}": rag_log
current_step = RunnableLambda(lambda _: {step_output: plain_result}, name=f"RAG_{step_output}")
raise ValueError(f"Unknown step type: {step['type']}")
chain_steps.append(current_step)
return RunnableSequence(*chain_steps)
def parse_retrieved_data(self, retrieved_texts, mode):
    """Split retrieved docs into plain texts and metadata, each keyed by `mode`.

    Args:
        retrieved_texts: iterable of dicts with 'content' and 'metadata' keys.
        mode: retrieval category name used as the grouping key.

    Returns:
        (docs_by_mode, metadata_by_mode) tuple of dicts.
    """
    retrieved_docs = {}      # ✅ Store results by category
    retrieved_metadata = {}  # FIX: was never initialized before assignment
    plain_texts = [doc['content'] for doc in retrieved_texts]
    metadata = [doc['metadata'] for doc in retrieved_texts]
    retrieved_docs[mode] = plain_texts  # ✅ Organized by mode
    retrieved_metadata[mode] = metadata
    return retrieved_docs, retrieved_metadata
def export_workflow_to_json(self, chain):
    """Exports the workflow structure to a JSON file for debugging.

    Writes one entry per chain step (raw type, name, inferred step kind)
    to `workflow_structure.json`.
    """
    steps_info = []  # FIX: accumulator was never initialized
    for i, step in enumerate(chain.steps):
        step_data = {  # FIX: dict-literal opener was missing
            "raw_type": str(type(step)),  # Logs exact type
            "name": getattr(step, "name", f"Step_{i + 1}"),  # Get step name or auto-name it
        }
        # str() guard: a RunnableLambda's name attribute may be None.
        # ✅ Check if it's an LLM Step (even inside RunnableLambda)
        if isinstance(step, RunnableLambda) and str(step_data["name"]).startswith("LLM_"):
            step_data["type"] = "LLM Call"
        # ✅ Check if it's a RAG Retrieval Step
        elif isinstance(step, RunnableLambda) and str(step_data["name"]).startswith("RAG_"):
            step_data["type"] = "RAG Retrieval"
        # ✅ Check if it's a Processing Step
        elif isinstance(step, RunnableLambda):
            step_data["type"] = "Processing Step"
        # ✅ Otherwise, mark it as unknown
        else:  # FIX: without this `else`, every step was overwritten as Unknown
            step_data["type"] = "Unknown Step"
        steps_info.append(step_data)
    # ✅ Save workflow structure to a JSON file
    with open("workflow_structure.json", "w") as f:
        json.dump(steps_info, f, indent=4)
    print("✅ Workflow structure exported to `workflow_structure.json`")
# ============= GENERATE CHAINS =================================
# ===============================================================
self.persona_workflow = self.create_persona_chain()
self.full_workflow = self.create_chain()
self.user_inputs = {} # Stores user-modified prompts
#self.execution_history = [] # Stores all results
self.current_persona = "" # Stores the latest persona
self.persona_history = []
# ==========================================================================
# ====================== GRADIO FUNCTIONALITY ==============================
# ==========================================================================
# ---------------- SAVE CHAT HISTORY ----------------------------------------
def save_history(self, history):
    """Save persona history, chat history and RAG logs to a timestamped JSON file.

    Args:
        history: list of (user_message, bot_message) pairs.

    Returns:
        The path of the written file (for Gradio download).
    """
    file_path = f"chat_history_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
    # Convert history to structured JSON format
    # FIX: the dict construction around these entries was missing
    history_json = {
        "persona_history": self.persona_history,  # 🔹 Includes both persona and the refinement prompt used
        "chat_history": [{"--HUMAN--": msg[0], "--TREE--": msg[1]} for msg in history],
        "rag_logs": self.rag_logs,
    }
    # Save the JSON data to a file
    with open(file_path, "w", encoding="utf-8") as file:
        json.dump(history_json, file, indent=4)
    return file_path  # Returns the file path for download
# ---------------- HANDLE CHAT LOGIC ----------------------------------------
def respond(self, user_input, history):
    """Append a canned echo response for `user_input` to the chat history.

    NOTE(review): the original built `updated_history` and discarded it (no
    return); returning it, as Gradio chat callbacks expect — confirm UI wiring.
    """
    updated_history = history + [(user_input, f"Response to: {user_input}")]  # Append response
    return updated_history
# ---------------- INITIALIZE PERSONA ---------------------------------------
def initialize_persona(self):
    """Run the first persona step, record the base persona, and prep the refinement UI.

    Returns:
        (persona_text, gr.update for the persona box, gr.update for the refinement prompt box).
    """
    # 🔹 Run Only First Step (Prepares Data, No Output)
    persona_output = self.persona_workflow.steps[0].invoke({})
    current_persona = persona_output["persona"]
    # FIX: comment said "Store it globally" but only a local was assigned.
    self.current_persona = current_persona
    self.persona_history.append({  # FIX: this call was missing its closing brace/paren
        "parameters": self.parameters,
        "base_persona": current_persona,
    })
    print(f"🔍 Debug: Generated Persona → {current_persona}")  # Debugging Output
    self.step_index = 1  # Move to Third Chain Step (Refinement)
    # 🔹 Get Third Step (Persona Refinement)
    refinement_step = self.persona_steps[self.step_index]
    refinement_template = self.prompts[refinement_step["prompt"]]["template"]
    refinement_display = refinement_template
    return current_persona, gr.update(visible=True), gr.update(value=refinement_display, visible=True)  # ✅ Updates Persona Box
# ✅ Function to Refine Persona
def refine_persona(self, user_input):
    """Refine the current persona with the user's prompt, then prep the first workflow step.

    Returns:
        (persona_text, three gr.update objects driving the Gradio widgets).
    """
    # NOTE(review): indexing `self.prompts` by the int step_index looks suspect —
    # elsewhere prompts are keyed by name; confirm intended fallback.
    refined_prompt = user_input if user_input else self.prompts[self.step_index]["prompt"]
    refined_persona = self.persona_workflow.steps[self.step_index].invoke({}, _prompt=refined_prompt)
    current_persona = refined_persona["refined_persona"]
    # FIX: comment said "Update Persona Globally" but only a local was assigned.
    self.current_persona = current_persona
    # 🔹 Store the persona alongside the refinement prompt
    self.persona_history.append({  # FIX: this call was missing its closing brace/paren
        "refinement_prompt": refined_prompt,  # Store the exact prompt used for refinement
        "refined_persona": current_persona,   # Store the newly generated persona
    })
    # restart step index for workflow chain
    step_index = 0  # Move to Next Step (Skip RAG if Needed)
    adjusted_step_index = self.define_step_index(step_index)
    print(adjusted_step_index)
    self.step_index = adjusted_step_index
    print(f"🔍 Debug: Updated Persona → {current_persona}")  # Debugging Output
    # 🔹 Fetch Next Step (First Actual Prompt)
    first_actual_prompt_step = self.workflow_steps[self.step_index]
    print(first_actual_prompt_step)
    first_prompt_template = self.prompts[first_actual_prompt_step["prompt"]]["template"]
    display_prompt = first_prompt_template
    return current_persona, gr.update(visible=True), gr.update(value=display_prompt, visible=False), gr.update(visible=False)
def define_step_index(self, current_index):
    """Skip over RAG steps (they run at chain-build time) and return the next runnable index.

    Recurses forward while the step at the index is a RAG step; returns
    len(workflow_steps) when it runs off the end.
    """
    # FIX(reconstruction): base case, bounds guard and `index` definition were missing.
    if current_index >= len(self.workflow_steps):
        return current_index  # past the end — nothing left to skip
    step_type = self.workflow_steps[current_index]["type"]
    if step_type != "RAG":
        return current_index
    index = current_index + 1
    print(f"testing current_index: {index}")
    print(f"------FOUND RAG at {index-1} ----- updated index: {index}")
    return self.define_step_index(index)
# ✅ Function to Execute Steps (PAUSES Until User Clicks Next)
def process_step(self, user_input, history):
    """Execute the current workflow step, then pause until the user clicks Next.

    Returns:
        (updated chat history, prompt template for the next step or None).
    """
    # 🔹 If All Steps Are Completed
    if self.step_index >= len(self.workflow_steps):
        return history + [("✅ Generation Process Complete!", "")], None
    step_name = self.workflow_steps[self.step_index]["name"]
    step_index = self.define_step_index(self.step_index)  # skip RAG steps
    self.prev_step_index = step_index
    # 🔹 Handle LLM Steps (User Can Modify Prompt)
    config_template = self.prompts[self.workflow_steps[step_index]["prompt"]]["template"]
    prompt_template = user_input if user_input else config_template
    self.latest_prompt = prompt_template
    inputs = {key: self.generated_inputs.get(key, self.parameters.get(key, "")) for key in self.workflow_steps[step_index]["inputs"]}
    filled_prompt = prompt_template.format(**inputs)
    self.user_inputs[step_name] = prompt_template  # Store user-modified prompt
    response = self.full_workflow.steps[step_index].invoke({}, _prompt=filled_prompt)
    response = response[f"{self.workflow_steps[step_index]['output']}"]
    # 🔹 Prepare Next Step Prompt
    # FIX: original called define_step_index(step_index), which returns the SAME
    # index for an LLM step — the workflow could never advance. Advance past the
    # step just executed instead.
    if step_index + 1 < len(self.workflow_steps):
        self.step_index = self.define_step_index(step_index + 1)
        self.next_step_index = self.step_index
        if self.step_index < len(self.workflow_steps):
            next_step = self.workflow_steps[self.step_index]
            if next_step["type"] == "LLM":
                next_prompt_template = self.prompts[next_step["prompt"]]["template"]
                next_filled_prompt = next_prompt_template
                self.next_prompt = next_filled_prompt
            else:
                next_filled_prompt = None
        else:
            next_filled_prompt = None
    else:
        # FIX: mark the workflow finished so the completion branch above fires.
        self.step_index = len(self.workflow_steps)
        self.next_step_index = self.step_index
        next_filled_prompt = None  # No more steps
    return history + [(prompt_template, response)], next_filled_prompt
def get_prev_step_prompt(self):
    """Rewind the step pointer to the previously executed step and return its prompt."""
    prompt = self.latest_prompt
    self.step_index = self.prev_step_index
    return prompt
def get_next_step_prompt(self):
    """Advance the step pointer to the prepared next step and return its prompt.

    NOTE(review): original returned None implicitly; returning self.next_prompt
    (set by process_step) for symmetry with get_prev_step_prompt — confirm
    against the Gradio wiring.
    """
    self.step_index = self.next_step_index
    return self.next_prompt
def update_chain_with_yaml(self, file):
    """Gradio callback: update the chain from an uploaded YAML file.

    Args:
        file: Gradio file object (has `.name`) or None when nothing was uploaded.

    Returns:
        A status string for the UI.
    """
    if file:  # FIX: guard was missing; file=None would crash on file.name
        self.update_chain(file.name)  # Call the update function in ChainManager
        return "✅ Chain updated successfully!"
    return "⚠️ No file provided!"