# QAApp.follow_up handles follow-up questions.
# The context of the previous answer is stored so that it can be used to generate more detailed responses to follow-up questions.
# The GUI has a "Follow-Up" button that is connected to the follow_up method.
import os
import numpy as np
import torch
import re
import pickle
from PyPDF2 import PdfReader
from docx import Document
from pptx import Presentation
from sentence_transformers import SentenceTransformer
from transformers import AutoTokenizer, pipeline
from faiss import IndexFlatL2
import tkinter as tk
from tkinter import ttk, filedialog, scrolledtext
import speech_recognition as sr
import pyttsx3
class QASystem:
    """Retrieval-based question answering over a directory of documents.

    The ``.pdf``, ``.docx`` and ``.pptx`` files found in ``directory`` are
    split into word chunks, embedded with a SentenceTransformer model and
    stored in a FAISS ``IndexFlatL2``.  The index, the embedding model and
    the ``(embedding, doc_id, chunk)`` records are cached with pickle so that
    later runs can skip the indexing step.
    """

    def __init__(self, directory, index_file='index.pkl', embeddings_file='embeddings.pkl'):
        """Load the cached index if both cache files exist, else build it.

        Args:
            directory: Folder that is scanned for documents.
            index_file: Pickle file holding (index, model, embeddings).
            embeddings_file: Pickle file holding the embeddings records.
        """
        self.directory = directory
        self.index_file = index_file
        self.embeddings_file = embeddings_file
        if os.path.exists(self.index_file) and os.path.exists(self.embeddings_file):
            self.index, self.embedding_model, self.document_embeddings = self.load_index_and_embeddings()
        else:
            self.documents = self.load_documents()
            self.index, self.embedding_model, self.document_embeddings = self.create_index()
            self.save_index_and_embeddings()
        # The pipelines are created on both paths above; get_answer needs
        # them whether or not the index came from the cache.
        # NOTE(review): GPT-Neo is a causal language model -- confirm that the
        # installed transformers version can load it for the
        # "question-answering" and "summarization" tasks.
        self.qa_pipeline = pipeline("question-answering", model="EleutherAI/gpt-neo-2.7B", tokenizer="EleutherAI/gpt-neo-2.7B")
        self.summarizer_pipeline = pipeline("summarization", model="EleutherAI/gpt-neo-2.7B", tokenizer="EleutherAI/gpt-neo-2.7B")

    def save_index_and_embeddings(self):
        """Write the index, model and embeddings records to the cache files."""
        with open(self.index_file, 'wb') as f:
            pickle.dump((self.index, self.embedding_model, self.document_embeddings), f)
        # The records are stored a second time on their own; the loader reads
        # both files, so the on-disk layout is kept as it is.
        with open(self.embeddings_file, 'wb') as f:
            pickle.dump(self.document_embeddings, f)

    def load_index_and_embeddings(self):
        """Return ``(index, embedding_model, document_embeddings)`` from cache.

        If the cache files cannot be unpickled (truncated, corrupt, or written
        by incompatible library versions) the index is rebuilt from the
        documents and the cache files are rewritten.
        """
        # SECURITY: pickle.load executes code embedded in the file.  Only
        # point index_file / embeddings_file at files this program wrote.
        try:
            with open(self.index_file, 'rb') as f:
                index, embedding_model, document_embeddings = pickle.load(f)
            with open(self.embeddings_file, 'rb') as f:
                document_embeddings = pickle.load(f)
            return index, embedding_model, document_embeddings
        except (AttributeError, EOFError, ImportError, TypeError, ValueError,
                pickle.UnpicklingError) as e:
            print(f"Error loading index and embeddings: {e}")
            print("Recreating index and embeddings...")
            self.documents = self.load_documents()
            self.index, self.embedding_model, self.document_embeddings = self.create_index()
            self.save_index_and_embeddings()
            return self.index, self.embedding_model, self.document_embeddings

    @staticmethod
    def _read_pdf(filepath):
        """Return the concatenated text of every page of a PDF file."""
        with open(filepath, 'rb') as file:
            pdf = PdfReader(file)
            # A page without extractable text may yield None instead of ''.
            return ''.join(page.extract_text() or '' for page in pdf.pages)

    @staticmethod
    def _read_docx(filepath):
        """Return the space-joined paragraph texts of a Word document."""
        doc = Document(filepath)
        return ' '.join(para.text for para in doc.paragraphs)

    @staticmethod
    def _read_pptx(filepath):
        """Return the space-joined text of every text-bearing slide shape."""
        ppt = Presentation(filepath)
        return ' '.join(shape.text for slide in ppt.slides for shape in slide.shapes if hasattr(shape, 'text'))

    def load_documents(self):
        """Read every supported file in ``self.directory``.

        Returns:
            A list of ``(filename, text)`` tuples.  Files that cannot be read
            are reported on stdout and skipped.
        """
        readers = (
            ('.pdf', self._read_pdf),
            ('.docx', self._read_docx),
            ('.pptx', self._read_pptx),
        )
        documents = []
        # Sorted so that chunk order in the index does not depend on the
        # arbitrary order returned by os.listdir.
        for filename in sorted(os.listdir(self.directory)):
            lowered = filename.lower()  # accept '.PDF' as well as '.pdf'
            reader = next((read for ext, read in readers if lowered.endswith(ext)), None)
            if reader is None:
                continue
            filepath = os.path.join(self.directory, filename)
            try:
                documents.append((filename, reader(filepath)))
            except Exception as e:
                # Best effort: one unreadable file must not abort indexing.
                print(f"Error reading {filename}: {str(e)}")
        return documents

    def create_index(self):
        """Embed every chunk of ``self.documents`` and index the vectors.

        Returns:
            ``(index, embedding_model, document_embeddings)`` where
            ``document_embeddings`` is a list of
            ``(embedding, doc_id, chunk)`` tuples in index order.

        Raises:
            ValueError: If the documents contain no text to index.
        """
        chunked = [
            (doc_id, chunk)
            for doc_id, text in self.documents
            for chunk in self.chunk_text(text)
        ]
        if not chunked:
            # Checked before the model is loaded so the failure is cheap and
            # the message names the real cause instead of a shape error.
            raise ValueError(f"No indexable text found in {self.directory!r}")
        embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
        document_embeddings = []
        for doc_id, chunk in chunked:
            embedding = embedding_model.encode(chunk)
            document_embeddings.append((embedding, doc_id, chunk))
        # Element 0 of each record is the vector; FAISS works on float32.
        embeddings = np.array([record[0] for record in document_embeddings], dtype=np.float32)
        index = IndexFlatL2(embeddings.shape[1])
        index.add(embeddings)
        return index, embedding_model, document_embeddings

    def chunk_text(self, text, max_length=512):
        """Split ``text`` into chunks of at most ``max_length`` words."""
        words = text.split()
        return [" ".join(words[i:i + max_length]) for i in range(0, len(words), max_length)]

    def extract_relevant_text(self, context, query, max_length=1000):
        """Return the ``max_length``-word window of ``context`` best matching ``query``.

        A window's score is the number of distinct lower-cased query words
        that occur as substrings of the lower-cased window; the earliest
        window with the highest score wins.  Contexts that already fit are
        returned unchanged.
        """
        words = context.split()
        if len(words) <= max_length:
            return context
        query_words = set(query.lower().split())
        scores = []
        for i in range(len(words) - max_length + 1):
            chunk = ' '.join(words[i:i + max_length]).lower()
            score = sum(1 for word in query_words if word in chunk)
            scores.append((score, i))
        best_start = max(scores, key=lambda x: x[0])[1]
        return ' '.join(words[best_start:best_start + max_length])

    def answer_question(self, question, contexts, max_length=2048, max_new_tokens=50):
        """Run the QA pipeline on the joined ``contexts`` and return its answer.

        Args:
            question: The question text.
            contexts: Iterable of context strings; they are joined by spaces.
            max_length: Maximum number of *characters* of context passed on.
            max_new_tokens: Forwarded to the pipeline call.
        """
        combined_context = " ".join(contexts)
        # Truncate the combined context to the maximum allowed length
        # (a character count, not a token count).
        combined_context = combined_context[:max_length]
        result = self.qa_pipeline(question=question, context=combined_context, max_new_tokens=max_new_tokens)
        return result['answer']

    def summarize_text(self, text, max_length=2048, max_new_tokens=50):
        """Summarize the first ``max_length`` characters of ``text``."""
        text = text[:max_length]  # Ensure the text does not exceed the maximum length
        summary = self.summarizer_pipeline(text, max_new_tokens=max_new_tokens)
        return summary[0]['summary_text']

    def post_process_answer(self, answer):
        """Trim stray punctuation, upper-case the first letter, end with a period.

        Only the first character is changed; the rest of the answer keeps its
        casing so acronyms and proper nouns survive.
        """
        answer = answer.strip('.,;: \t\n')
        answer = answer[:1].upper() + answer[1:]
        if not answer.endswith(('.', '!', '?')):
            answer += '.'
        return answer

    def clean_answer(self, answer):
        """Hook for answer cleaning; currently returns ``answer`` unchanged."""
        # Add your cleaning logic here
        # For now, let's just return the answer as is
        return answer

    def get_answer(self, question, num_sources=3):
        """Answer ``question`` from the ``num_sources`` nearest chunks.

        Returns:
            ``(answer, relevant_docs, relevant_texts)``: the post-processed
            answer, the document ids of the retrieved chunks and the context
            texts that were used (kept by callers for follow-up questions).
        """
        question_embedding = self.embedding_model.encode(question)
        query = np.array([question_embedding], dtype=np.float32)
        distances, indices = self.index.search(query, num_sources)
        # FAISS pads the result with -1 when fewer than num_sources vectors
        # exist; as a list index -1 would silently select the last chunk.
        hits = [int(idx) for idx in indices[0] if idx >= 0]
        best_chunks = [self.document_embeddings[idx][2] for idx in hits]
        relevant_docs = [self.document_embeddings[idx][1] for idx in hits]
        relevant_texts = [self.extract_relevant_text(chunk, question) for chunk in best_chunks]
        answer = self.answer_question(question, relevant_texts)
        answer = self.clean_answer(answer)
        answer = self.post_process_answer(answer)
        # Very short answers are replaced by a summary of the retrieved text
        # when that summary is longer.
        if len(answer.split()) < 20:
            combined_text = " ".join(relevant_texts)
            summary = self.summarize_text(combined_text)
            if len(summary.split()) > len(answer.split()):
                answer = summary
        return answer, relevant_docs, relevant_texts  # Return relevant_texts for follow-up
class QAApp:
    """Tkinter front end for a QA system.

    The window offers a question entry, a response pane and buttons for
    submitting a question, asking a follow-up against the previous answer's
    context, exporting the session history and dictating a question.
    """

    def __init__(self, root, qa_system):
        """Build the widgets inside ``root`` and bind them to ``qa_system``.

        Args:
            root: The Tk root window.
            qa_system: Object providing ``get_answer``, ``answer_question``,
                ``extract_relevant_text``, ``clean_answer`` and
                ``post_process_answer``.
        """
        self.qa_system = qa_system
        self.history = []  # Initialize history
        self.previous_context = []  # Store the context of the previous answer
        self.previous_sources = []  # Documents the previous context came from
        self.root = root
        self.root.title("Yaghi's QA System")
        self.root.geometry("800x600")
        self.style = ttk.Style()
        self.style.theme_use('clam')
        self.frame = ttk.Frame(root, padding="10 10 10 10")
        self.frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
        self.input_label = ttk.Label(self.frame, text="Welcome to Yaghi's GPT. Please enter your question:", background="", foreground="#00695c", font=("Helvetica", 12, "bold"))
        self.input_label.grid(row=0, column=0, sticky=tk.W, pady=5)
        self.input_text = ttk.Entry(self.frame, width=70, font=("Helvetica", 10))
        self.input_text.grid(row=1, column=0, pady=5, padx=5)
        self.submit_button = ttk.Button(self.frame, text="Submit", command=self.get_response, style="TButton")
        self.submit_button.grid(row=1, column=1, padx=5)
        self.output_label = ttk.Label(self.frame, text="Response:", background="", foreground="#00695c", font=("Helvetica", 12, "bold"))
        self.output_label.grid(row=2, column=0, sticky=tk.W, pady=5)
        self.output_text = scrolledtext.ScrolledText(self.frame, wrap=tk.WORD, width=70, height=20, font=("Helvetica", 10), background="#ffffff")
        self.output_text.grid(row=3, column=0, pady=5, columnspan=2)
        # Define tags for custom colors once; they stay valid when the text
        # is cleared.
        self.output_text.tag_config("question", foreground="#1e88e5", font=("Helvetica", 10, "bold"))
        self.output_text.tag_config("question_text", foreground="#0d47a1")
        self.output_text.tag_config("answer", foreground="#43a047", font=("Helvetica", 10, "bold"))
        self.output_text.tag_config("answer_text", foreground="#1b5e20")
        self.output_text.tag_config("error", foreground="#c62828")
        # Follow-Up button
        self.follow_up_button = ttk.Button(self.frame, text="Follow-Up", command=self.follow_up, style="TButton")
        self.follow_up_button.grid(row=2, column=1, pady=5)
        for child in self.frame.winfo_children():
            child.grid_configure(padx=5, pady=5)
        self.root.columnconfigure(0, weight=1)
        self.root.rowconfigure(0, weight=1)
        # Add styles
        self.style.configure("TButton", foreground="#ffffff", background="#00796b", font=("Helvetica", 10, "bold"))
        self.style.map("TButton", background=[("active", "#004d40")])
        # Export button
        self.export_button = ttk.Button(self.frame, text="Export History", command=self.export_history, style="TButton")
        self.export_button.grid(row=4, column=0, pady=5)
        # Voice input button
        self.voice_button = ttk.Button(self.frame, text="Voice Input", command=self.voice_input, style="TButton")
        self.voice_button.grid(row=4, column=1, pady=5)

    def _show_interaction(self, question_label, question, answer, sources):
        """Replace the response pane with one question/answer/sources entry."""
        self.output_text.delete(1.0, tk.END)
        self.output_text.insert(tk.END, question_label, "question")
        self.output_text.insert(tk.END, question + "\n", "question_text")
        self.output_text.insert(tk.END, "Answer: ", "answer")
        self.output_text.insert(tk.END, answer + "\n", "answer_text")
        self.output_text.insert(tk.END, "Sources: " + ", ".join(sources) + "\n\n")

    def _remember(self, question, answer, sources):
        """Append one interaction to the exportable session history."""
        self.history.append({
            "question": question,
            "answer": answer,
            "sources": sources
        })

    def get_response(self):
        """Answer the question in the entry field and display the result."""
        question = self.input_text.get()
        if not question.strip():
            # Nothing to ask; do not hand an empty question to the model.
            self.output_text.insert(tk.END, "Please enter a question.\n", "error")
            return
        answer, sources, contexts = self.qa_system.get_answer(question)  # Get contexts for follow-up
        # Store the contexts, and the documents they came from, for
        # follow-up questions.
        self.previous_context = contexts
        self.previous_sources = list(sources)
        self._show_interaction("Question: ", question, answer, sources)
        self._remember(question, answer, sources)

    def follow_up(self):
        """Answer the entry-field text against the previous answer's context."""
        follow_up_question = self.input_text.get()
        if not self.previous_context:
            self.output_text.insert(tk.END, "No previous context available for follow-up.\n", "error")
            return
        if not follow_up_question.strip():
            self.output_text.insert(tk.END, "Please enter a question.\n", "error")
            return
        # Debug: Log follow-up question and previous context
        print(f"Follow-up question: {follow_up_question}")
        print(f"Previous context: {self.previous_context}")
        # Use the previous context to generate more relevant context for the follow-up question
        combined_context = " ".join(self.previous_context)
        relevant_text = self.qa_system.extract_relevant_text(combined_context, follow_up_question)
        print(f"Relevant text for follow-up: {relevant_text}")
        # Get the answer using the relevant context, then tidy it the same
        # way first answers are tidied.
        answer = self.qa_system.answer_question(follow_up_question, [relevant_text])
        answer = self.qa_system.clean_answer(answer)
        answer = self.qa_system.post_process_answer(answer)
        print(f"Answer for follow-up: {answer}")
        # The answer was produced from the previous context, so its sources
        # are the previous sources; no second retrieval/model run is needed.
        sources = list(self.previous_sources)
        self._show_interaction("Follow-Up Question: ", follow_up_question, answer, sources)
        self._remember(follow_up_question, answer, sources)

    def export_history(self):
        """Ask for a file name and write the session history to it as text."""
        file_path = filedialog.asksaveasfilename(defaultextension=".txt", filetypes=[("Text files", "*.txt")])
        if file_path:
            # Explicit UTF-8 so answers with non-ASCII characters can be
            # written regardless of the platform's default encoding.
            with open(file_path, 'w', encoding='utf-8') as file:
                for entry in self.history:
                    file.write(f"Question: {entry['question']}\n")
                    file.write(f"Answer: {entry['answer']}\n")
                    file.write(f"Sources: {', '.join(entry['sources'])}\n\n")

    def voice_input(self):
        """Record a spoken question, transcribe it and submit it."""
        recognizer = sr.Recognizer()
        try:
            with sr.Microphone() as source:
                self.output_text.insert(tk.END, "Listening...\n")
                # listen() blocks the event loop; repaint first so the
                # message is visible while recording.
                self.output_text.update_idletasks()
                audio = recognizer.listen(source)
        except OSError as e:
            # NOTE(review): presumably raised when no input device is
            # available -- confirm against the SpeechRecognition docs.
            self.output_text.insert(tk.END, f"Microphone unavailable; {e}\n", "error")
            return
        try:
            question = recognizer.recognize_google(audio)
        except sr.UnknownValueError:
            self.output_text.insert(tk.END, "Could not understand audio\n")
        except sr.RequestError as e:
            self.output_text.insert(tk.END, f"Could not request results; {e}\n")
        else:
            # Kept outside the try so that only recognition errors are
            # handled by the handlers above.
            self.input_text.delete(0, tk.END)
            self.input_text.insert(0, question)
            self.get_response()
if __name__ == "__main__":
    import sys

    # The document directory may be passed as the first command-line
    # argument; without one the default location is used.
    dataset_directory = sys.argv[1] if len(sys.argv) > 1 else "d:/gptDataSet"
    root = tk.Tk()
    qa_system = QASystem(directory=dataset_directory)
    app = QAApp(root, qa_system)
    root.mainloop()