
How to Set Up a Private GPT: Step by Step

    By: Husam Yaghi

    A local GPT model refers to having an AI model (a large language model) such as GPT-Neo installed and running directly on your own personal computer (Mac or Windows) or a local server. As stated in a previous post, installing a local GPT offers numerous benefits, including enhanced privacy, independence from the internet, cost efficiency, and customization options.

    Here are all the steps we took to get a functioning GPT installed locally, operating on local files in a secure, private environment.

    Install Python:

    Download and install the latest version of Python from the official website:

    https://www.python.org/downloads/windows/

    Make sure to select the option to add Python to your system’s PATH during the installation.

    Install Git:

    Download and install Git for Windows: https://git-scm.com/download/win

    Install Anaconda:

    Because we will be running a lot of Python scripts, the Anaconda Prompt is a great replacement for the default Windows Command Prompt.  https://docs.anaconda.com/anaconda/install/windows/
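    Before going further, it is worth confirming that Python and Git ended up on your PATH. From the Anaconda Prompt, both version checks should print without errors:

              c:\> python --version
              c:\> git --version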

    Prepare for the model:

    Do a quick internet search to help you decide which GPT model to use based on your available hardware resources. Also, you may want to visit https://huggingface.co/models to browse the long list of available models. Here is a sample:

    • GPT-2 (a small pretrained model by OpenAI)
    • GPT-Neo (a pretrained model by EleutherAI, larger than GPT-2)
    • Llama (a much larger pretrained model by Meta AI)

    For this post, we will use GPT-Neo, as it is neither too small nor too large, and it can run comfortably on a single GPU. (There are models that can run on PCs without a GPU, though keep in mind that the more compute resources you have, the better the performance.)

    Installation:

    Let’s start the installation process of our chosen model (GPT-Neo):

    • Click on the Windows icon and run the Anaconda Prompt as an Administrator
    • A command prompt window will open
    • Create the directory where the local GPT will be installed:

              mkdir c:\yaghiGPT
              cd c:\yaghiGPT

    Install the Prerequisites:

    The scripts in this post also rely on several file-processing, retrieval, and voice libraries, so install those along with the core packages (PyAudio is needed for the microphone input used later):

              c:\yaghiGPT > pip install transformers datasets torch sentencepiece
              c:\yaghiGPT > pip install fastapi uvicorn
              c:\yaghiGPT > pip install tensorflow tensorboard
              c:\yaghiGPT > pip install PyPDF2 python-docx python-pptx watchdog
              c:\yaghiGPT > pip install sentence-transformers faiss-cpu numpy
              c:\yaghiGPT > pip install SpeechRecognition pyttsx3 PyAudio

    If you hit a huggingface_hub version conflict, uninstalling it and then reinstalling transformers and datasets pulls in a compatible version:

              c:\yaghiGPT > pip uninstall huggingface_hub
              c:\yaghiGPT > pip install transformers datasets

    Clone the GPT-Neo repository:

              c:\yaghiGPT > git clone https://github.com/EleutherAI/gpt-neo.git

    Create a virtual environment and activate it (on Windows the activation script lives under Scripts; the bin/activate path applies only to Linux/macOS):

              c:\yaghiGPT > python -m venv gpt-neo-env
              c:\yaghiGPT > gpt-neo-env\Scripts\activate

    Note that packages installed before the environment is activated live outside it; if you want everything contained in the virtual environment, activate it first and re-run the pip installs above.
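    With the prerequisites in place, it is worth a quick sanity check that transformers and torch can load and run a model at all. Here is a minimal sketch using the small GPT-Neo 125M checkpoint so the download stays quick (the model name and prompt are just examples; the 2.7B model used later needs far more memory):

    # sanity_check.py -- verify that transformers and torch can load and run a model
    from transformers import AutoModelForCausalLM, AutoTokenizer

    model_name = "EleutherAI/gpt-neo-125m"  # small checkpoint for a quick test
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)

    inputs = tokenizer("Hello, local GPT!", return_tensors="pt")
    outputs = model.generate(**inputs, max_new_tokens=20)
    print(tokenizer.decode(outputs[0], skip_special_tokens=True))

    If this prints a short continuation of the prompt, the core stack is working.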

    Create Dataset:

    Gather all the files you plan to train the model on and use later, and put them in one folder; let's call it gptDataSet (the scripts below refer to it as c:/yaghiGPT/gptDataSet, so keep the path consistent):

              c:\yaghiGPT > mkdir gptDataSet

    The dataset consists of PDF, Word, and PowerPoint files.

    Process the dataset:

    Before fine-tuning, we need to extract the text content from those files. Let's create the following script in a file called auto_process_files.py; it will automatically process any file that gets added to the dataset folder:

    # auto_process_files.py
    import os
    import time
    import PyPDF2
    import docx
    from pptx import Presentation
    from watchdog.observers import Observer
    from watchdog.events import FileSystemEventHandler

    # Directory containing the dataset files
    directory = "c:/yaghiGPT/gptDataSet"

    # Extract the text of every page in a PDF file
    def process_pdf(file_path):
        with open(file_path, 'rb') as file:
            reader = PyPDF2.PdfReader(file)
            text = []
            for page in reader.pages:
                text.append(page.extract_text())
            return "\n".join(text)

    # Extract the text of every paragraph in a Word document
    def process_word(file_path):
        doc = docx.Document(file_path)
        return "\n".join(paragraph.text for paragraph in doc.paragraphs)

    # Extract the text of every shape on every slide of a PowerPoint file
    def process_pptx(file_path):
        prs = Presentation(file_path)
        text = []
        for slide in prs.slides:
            for shape in slide.shapes:
                if hasattr(shape, "text"):
                    text.append(shape.text)
        return "\n".join(text)

    # Dispatch a single file to the right extractor based on its extension
    def process_file(file_path):
        filename = os.path.basename(file_path)
        if filename.endswith(".pdf"):
            print(f"Processing PDF: {filename}")
            pdf_text = process_pdf(file_path)
            print(f"Extracted text length: {len(pdf_text)} characters")
        elif filename.endswith(".docx"):
            print(f"Processing Word Document: {filename}")
            word_text = process_word(file_path)
            print(f"Extracted text length: {len(word_text)} characters")
        elif filename.endswith(".pptx"):
            print(f"Processing PowerPoint Presentation: {filename}")
            pptx_text = process_pptx(file_path)
            print(f"Extracted text length: {len(pptx_text)} characters")
        else:
            print(f"Unsupported file type: {filename}")

    # Custom event handler: process any new file dropped into the folder
    class FileHandler(FileSystemEventHandler):
        def on_created(self, event):
            if event.is_directory:
                return
            print(f"New file detected: {event.src_path}")
            process_file(event.src_path)

    # Process the files already present when the script starts
    def process_existing_files():
        print(f"Checking existing files in {directory}")
        for filename in os.listdir(directory):
            file_path = os.path.join(directory, filename)
            if os.path.isfile(file_path):
                process_file(file_path)

    # Set up the observer
    event_handler = FileHandler()
    observer = Observer()
    observer.schedule(event_handler, directory, recursive=False)

    print("Starting file processing script...")
    print(f"Monitoring directory: {directory}")
    try:
        observer.start()
        process_existing_files()
        print("Waiting for new files...")
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        print("Script interrupted by user.")
        observer.stop()
    finally:
        observer.join()

    Run the script; depending on how many files your dataset contains, it may take minutes or hours:

              c:\yaghiGPT > python auto_process_files.py
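    As written, the script only prints the length of each extraction. If you want to keep the text for the training step, a small addition will persist it; the save_text helper below is a sketch, not part of the original script, and you would call it from process_file after each extraction:

    # Sketch (not in the original script): persist extracted text for later training
    def save_text(file_path, text):
        txt_path = os.path.splitext(file_path)[0] + ".txt"
        with open(txt_path, "w", encoding="utf-8") as out:
            out.write(text)
        print(f"Saved extracted text to {txt_path}")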

    Train the model:

    Now we need to train the model on the local dataset, fine-tune it to improve its quality, and then continuously monitor its performance. Create train_finetune_monitor.py containing the following script:

    # train_finetune_monitor.py
    import os
    import numpy as np
    import pickle
    import torch
    from PyPDF2 import PdfReader
    from sentence_transformers import SentenceTransformer
    from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer
    import tkinter as tk
    from tkinter import ttk, filedialog, scrolledtext
    import threading
    import logging

    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

    # Run on the GPU if one is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    class QASystem:
        def __init__(self, directory, index_file='index.pkl', embeddings_file='embeddings.pkl'):
            # ... (existing QASystem code) ...
            pass

        def finetune_model(self, train_dataset, eval_dataset, output_dir, num_epochs=3, batch_size=4, learning_rate=5e-5):
            model = AutoModelForCausalLM.from_pretrained("EleutherAI/gpt-neo-2.7B").to(device)
            tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-2.7B")
            optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
            for epoch in range(num_epochs):
                # Training pass
                model.train()
                train_loss = 0
                for batch in train_dataset:
                    optimizer.zero_grad()
                    input_ids = batch['input_ids'].to(device)
                    attention_mask = batch['attention_mask'].to(device)
                    labels = batch['input_ids'].to(device)
                    outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                    loss = outputs.loss
                    loss.backward()
                    optimizer.step()
                    train_loss += loss.item()
                # Evaluation pass
                model.eval()
                eval_loss = 0
                for batch in eval_dataset:
                    input_ids = batch['input_ids'].to(device)
                    attention_mask = batch['attention_mask'].to(device)
                    labels = batch['input_ids'].to(device)
                    with torch.no_grad():
                        outputs = model(input_ids, attention_mask=attention_mask, labels=labels)
                        eval_loss += outputs.loss.item()
                eval_loss /= len(eval_dataset)
                logging.info(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/len(train_dataset)}, Eval Loss: {eval_loss}")
            os.makedirs(output_dir, exist_ok=True)
            model.save_pretrained(output_dir)
            tokenizer.save_pretrained(output_dir)

    class QAApp:
        def __init__(self, root, qa_system):
            # ... (existing QAApp code) ...
            pass

        def monitor_performance(self):
            # Add code to monitor the performance of the QA system.
            # This could include metrics like accuracy, F1 score, etc.,
            # displayed in the GUI or logged to a file.
            pass

    if __name__ == "__main__":
        root = tk.Tk()
        qa_system = QASystem("c:/yaghiGPT/gptDataSet")
        app = QAApp(root, qa_system)
        # Fine-tune the model (load_datasets is sketched after this script)
        train_dataset, eval_dataset = load_datasets()
        qa_system.finetune_model(train_dataset, eval_dataset, "output")
        # Monitor the performance
        app.monitor_performance()
        root.mainloop()
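    The script calls load_datasets(), which is not defined above. A minimal sketch, assuming the extracted text has been saved as .txt files in the dataset folder (as in the save_text addition earlier), tokenizes each file and splits the batches into train and eval sets; the path, block size, and 90/10 split are assumptions, not part of the original post:

    # Sketch of load_datasets(): tokenize .txt files into train/eval batches
    def load_datasets(directory="c:/yaghiGPT/gptDataSet", block_size=512):
        tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-2.7B")
        tokenizer.pad_token = tokenizer.eos_token  # GPT-Neo has no pad token by default
        batches = []
        for filename in os.listdir(directory):
            if filename.endswith(".txt"):
                with open(os.path.join(directory, filename), encoding="utf-8") as f:
                    # Each file becomes one fixed-length batch; pad tokens are
                    # counted in the loss in this simple sketch
                    enc = tokenizer(f.read(), return_tensors="pt", truncation=True,
                                    max_length=block_size, padding="max_length")
                    batches.append(enc)
        split = int(0.9 * len(batches))  # 90% train, 10% eval
        return batches[:split], batches[split:]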

    Execute the script:

              c:\yaghiGPT > python train_finetune_monitor.py

    Load & Run:

    Now we’re ready to start interacting with the model. Paste the following script into a file called load_gpt-neo.py:

    # load_gpt-neo.py
    #
    # This script adds a follow_up method in the QAApp class to handle follow-up
    # questions, stores the context of the previous answer so it can be used to
    # generate more detailed responses to follow-ups, and adds a "Follow-Up"
    # button in the GUI connected to the follow_up method.

    import os
    import numpy as np
    import torch
    import re
    import pickle
    from PyPDF2 import PdfReader
    from docx import Document
    from pptx import Presentation
    from sentence_transformers import SentenceTransformer
    from transformers import AutoTokenizer, pipeline
    from faiss import IndexFlatL2
    import tkinter as tk
    from tkinter import ttk, filedialog, scrolledtext
    import speech_recognition as sr
    import pyttsx3

    class QASystem:
        def __init__(self, directory, index_file='index.pkl', embeddings_file='embeddings.pkl'):
            self.directory = directory
            self.index_file = index_file
            self.embeddings_file = embeddings_file
            # Reuse a saved index if one exists; otherwise build it from the documents
            if os.path.exists(self.index_file) and os.path.exists(self.embeddings_file):
                self.index, self.embedding_model, self.document_embeddings = self.load_index_and_embeddings()
            else:
                self.documents = self.load_documents()
                self.index, self.embedding_model, self.document_embeddings = self.create_index()
                self.save_index_and_embeddings()
            self.qa_pipeline = pipeline("question-answering", model="EleutherAI/gpt-neo-2.7B", tokenizer="EleutherAI/gpt-neo-2.7B")
            self.summarizer_pipeline = pipeline("summarization", model="EleutherAI/gpt-neo-2.7B", tokenizer="EleutherAI/gpt-neo-2.7B")

        def save_index_and_embeddings(self):
            with open(self.index_file, 'wb') as f:
                pickle.dump((self.index, self.embedding_model, self.document_embeddings), f)
            with open(self.embeddings_file, 'wb') as f:
                pickle.dump(self.document_embeddings, f)

        def load_index_and_embeddings(self):
            try:
                with open(self.index_file, 'rb') as f:
                    index, embedding_model, document_embeddings = pickle.load(f)
                with open(self.embeddings_file, 'rb') as f:
                    document_embeddings = pickle.load(f)
                return index, embedding_model, document_embeddings
            except AttributeError as e:
                print(f"Error loading index and embeddings: {e}")
                print("Recreating index and embeddings...")
                self.documents = self.load_documents()
                self.index, self.embedding_model, self.document_embeddings = self.create_index()
                self.save_index_and_embeddings()
                return self.index, self.embedding_model, self.document_embeddings

        def load_documents(self):
            documents = []
            files = os.listdir(self.directory)
            for filename in files:
                if filename.endswith('.pdf'):
                    filepath = os.path.join(self.directory, filename)
                    try:
                        with open(filepath, 'rb') as file:
                            pdf = PdfReader(file)
                            text = ''.join(page.extract_text() for page in pdf.pages)
                            documents.append((filename, text))
                    except Exception as e:
                        print(f"Error reading {filename}: {str(e)}")
                elif filename.endswith('.docx'):
                    filepath = os.path.join(self.directory, filename)
                    try:
                        doc = Document(filepath)
                        text = ' '.join(para.text for para in doc.paragraphs)
                        documents.append((filename, text))
                    except Exception as e:
                        print(f"Error reading {filename}: {str(e)}")
                elif filename.endswith('.pptx'):
                    filepath = os.path.join(self.directory, filename)
                    try:
                        ppt = Presentation(filepath)
                        text = ' '.join(shape.text for slide in ppt.slides for shape in slide.shapes if hasattr(shape, 'text'))
                        documents.append((filename, text))
                    except Exception as e:
                        print(f"Error reading {filename}: {str(e)}")
            return documents

        def create_index(self):
            embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
            document_embeddings = []
            for doc_id, text in self.documents:
                chunks = self.chunk_text(text)
                for chunk in chunks:
                    embedding = embedding_model.encode(chunk)
                    document_embeddings.append((embedding, doc_id, chunk))
            embeddings = np.array([embed for embed, _, _ in document_embeddings])
            index = IndexFlatL2(embeddings.shape[1])
            index.add(embeddings)
            return index, embedding_model, document_embeddings

        def chunk_text(self, text, max_length=512):
            words = text.split()
            return [" ".join(words[i:i + max_length]) for i in range(0, len(words), max_length)]

        def extract_relevant_text(self, context, query, max_length=1000):
            words = context.split()
            if len(words) <= max_length:
                return context
            # Slide a window over the context and keep the chunk sharing the most words with the query
            query_words = set(query.lower().split())
            scores = []
            for i in range(len(words) - max_length + 1):
                chunk = ' '.join(words[i:i + max_length])
                score = sum(1 for word in query_words if word in chunk.lower())
                scores.append((score, i))
            best_start = max(scores, key=lambda x: x[0])[1]
            return ' '.join(words[best_start:best_start + max_length])

        def answer_question(self, question, contexts, max_length=2048, max_new_tokens=50):
            combined_context = " ".join(contexts)
            # Truncate the combined context to the maximum allowed length
            combined_context = combined_context[:max_length]
            result = self.qa_pipeline(question=question, context=combined_context, max_new_tokens=max_new_tokens)
            answer = result['answer']
            return answer

        def summarize_text(self, text, max_length=2048, max_new_tokens=50):
            text = text[:max_length]  # Ensure the text does not exceed the maximum length
            summary = self.summarizer_pipeline(text, max_new_tokens=max_new_tokens)
            return summary[0]['summary_text']

        def post_process_answer(self, answer):
            answer = answer.strip('.,;: \t\n').capitalize()
            if not answer.endswith(('.', '!', '?')):
                answer += '.'
            return answer

        def clean_answer(self, answer):
            # Add your cleaning logic here
            # For now, just return the answer as is
            return answer

        def get_answer(self, question, num_sources=3):
            # Embed the question and retrieve the closest document chunks from the index
            question_embedding = self.embedding_model.encode(question)
            distances, indices = self.index.search(np.array([question_embedding]), num_sources)
            best_chunks = [self.document_embeddings[idx][2] for idx in indices[0]]
            relevant_docs = [self.document_embeddings[idx][1] for idx in indices[0]]
            relevant_texts = [self.extract_relevant_text(chunk, question) for chunk in best_chunks]
            answer = self.answer_question(question, relevant_texts)
            answer = self.clean_answer(answer)
            answer = self.post_process_answer(answer)
            # If the answer is very short, fall back to a summary of the retrieved text
            if len(answer.split()) < 20:
                combined_text = " ".join(relevant_texts)
                summary = self.summarize_text(combined_text)
                if len(summary.split()) > len(answer.split()):
                    answer = summary
            return answer, relevant_docs, relevant_texts  # Return relevant_texts for follow-up

    class QAApp:
        def __init__(self, root, qa_system):
            self.qa_system = qa_system
            self.history = []  # Initialize history
            self.previous_context = []  # Store the context of the previous answer
            self.root = root
            self.root.title("Yaghi's QA System")
            self.root.geometry("800x600")
            self.style = ttk.Style()
            self.style.theme_use('clam')
            self.frame = ttk.Frame(root, padding="10 10 10 10")
            self.frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))
            self.input_label = ttk.Label(self.frame, text="Welcome to Yaghi's GPT. Please enter your question:", background="", foreground="#00695c", font=("Helvetica", 12, "bold"))
            self.input_label.grid(row=0, column=0, sticky=tk.W, pady=5)
            self.input_text = ttk.Entry(self.frame, width=70, font=("Helvetica", 10))
            self.input_text.grid(row=1, column=0, pady=5, padx=5)
            self.submit_button = ttk.Button(self.frame, text="Submit", command=self.get_response, style="TButton")
            self.submit_button.grid(row=1, column=1, padx=5)
            self.output_label = ttk.Label(self.frame, text="Response:", background="", foreground="#00695c", font=("Helvetica", 12, "bold"))
            self.output_label.grid(row=2, column=0, sticky=tk.W, pady=5)
            self.output_text = scrolledtext.ScrolledText(self.frame, wrap=tk.WORD, width=70, height=20, font=("Helvetica", 10), background="#ffffff")
            self.output_text.grid(row=3, column=0, pady=5, columnspan=2)
            # Follow-Up button
            self.follow_up_button = ttk.Button(self.frame, text="Follow-Up", command=self.follow_up, style="TButton")
            self.follow_up_button.grid(row=2, column=1, pady=5)
            for child in self.frame.winfo_children():
                child.grid_configure(padx=5, pady=5)
            self.root.columnconfigure(0, weight=1)
            self.root.rowconfigure(0, weight=1)
            # Add styles
            self.style.configure("TButton", foreground="#ffffff", background="#00796b", font=("Helvetica", 10, "bold"))
            self.style.map("TButton", background=[("active", "#004d40")])
            # Export button
            self.export_button = ttk.Button(self.frame, text="Export History", command=self.export_history, style="TButton")
            self.export_button.grid(row=4, column=0, pady=5)
            # Voice input button
            self.voice_button = ttk.Button(self.frame, text="Voice Input", command=self.voice_input, style="TButton")
            self.voice_button.grid(row=4, column=1, pady=5)

        def get_response(self):
            question = self.input_text.get()
            answer, sources, contexts = self.qa_system.get_answer(question)  # Get contexts for follow-up
            # Store the contexts for follow-up questions
            self.previous_context = contexts
            # Clear the current text
            self.output_text.delete(1.0, tk.END)
            # Insert the question in a different color
            self.output_text.insert(tk.END, "Question: ", "question")
            self.output_text.insert(tk.END, question + "\n", "question_text")
            # Insert the answer in a different color
            self.output_text.insert(tk.END, "Answer: ", "answer")
            self.output_text.insert(tk.END, answer + "\n", "answer_text")
            # Insert the sources
            self.output_text.insert(tk.END, "Sources: " + ", ".join(sources) + "\n\n")
            # Define tags for custom colors
            self.output_text.tag_config("question", foreground="#1e88e5", font=("Helvetica", 10, "bold"))
            self.output_text.tag_config("question_text", foreground="#0d47a1")
            self.output_text.tag_config("answer", foreground="#43a047", font=("Helvetica", 10, "bold"))
            self.output_text.tag_config("answer_text", foreground="#1b5e20")
            # Store the interaction in history
            self.history.append({
                "question": question,
                "answer": answer,
                "sources": sources
            })

        def follow_up(self):
            follow_up_question = self.input_text.get()
            if not self.previous_context:
                self.output_text.insert(tk.END, "No previous context available for follow-up.\n", "error")
                return
            # Debug: Log follow-up question and previous context
            print(f"Follow-up question: {follow_up_question}")
            print(f"Previous context: {self.previous_context}")
            # Use the previous context to generate more relevant context for the follow-up question
            combined_context = " ".join(self.previous_context)
            relevant_text = self.qa_system.extract_relevant_text(combined_context, follow_up_question)
            print(f"Relevant text for follow-up: {relevant_text}")
            # Get the answer using the relevant context
            answer = self.qa_system.answer_question(follow_up_question, [relevant_text])
            print(f"Answer for follow-up: {answer}")
            # Retrieve the sources once and reuse them below
            sources = self.qa_system.get_answer(follow_up_question)[1]
            # Clear the current text
            self.output_text.delete(1.0, tk.END)
            # Insert the follow-up question in a different color
            self.output_text.insert(tk.END, "Follow-Up Question: ", "question")
            self.output_text.insert(tk.END, follow_up_question + "\n", "question_text")
            # Insert the answer in a different color
            self.output_text.insert(tk.END, "Answer: ", "answer")
            self.output_text.insert(tk.END, answer + "\n", "answer_text")
            # Insert the sources
            self.output_text.insert(tk.END, "Sources: " + ", ".join(sources) + "\n\n")
            # Define tags for custom colors
            self.output_text.tag_config("question", foreground="#1e88e5", font=("Helvetica", 10, "bold"))
            self.output_text.tag_config("question_text", foreground="#0d47a1")
            self.output_text.tag_config("answer", foreground="#43a047", font=("Helvetica", 10, "bold"))
            self.output_text.tag_config("answer_text", foreground="#1b5e20")
            # Store the interaction in history
            self.history.append({
                "question": follow_up_question,
                "answer": answer,
                "sources": sources
            })

        def export_history(self):
            file_path = filedialog.asksaveasfilename(defaultextension=".txt", filetypes=[("Text files", "*.txt")])
            if file_path:
                with open(file_path, 'w') as file:
                    for entry in self.history:
                        file.write(f"Question: {entry['question']}\n")
                        file.write(f"Answer: {entry['answer']}\n")
                        file.write(f"Sources: {', '.join(entry['sources'])}\n\n")

        def voice_input(self):
            recognizer = sr.Recognizer()
            with sr.Microphone() as source:
                self.output_text.insert(tk.END, "Listening...\n")
                audio = recognizer.listen(source)
                try:
                    question = recognizer.recognize_google(audio)
                    self.input_text.delete(0, tk.END)
                    self.input_text.insert(0, question)
                    self.get_response()
                except sr.UnknownValueError:
                    self.output_text.insert(tk.END, "Could not understand audio\n")
                except sr.RequestError as e:
                    self.output_text.insert(tk.END, f"Could not request results; {e}\n")

    if __name__ == "__main__":
        root = tk.Tk()
        qa_system = QASystem(directory="c:/yaghiGPT/gptDataSet")
        app = QAApp(root, qa_system)
        root.mainloop()

    Execute the code:

              c:\yaghiGPT > python load_gpt-neo.py

     

    Maintenance:

    If all the libraries are in place, the model should now run and open a window for interacting with it. Ask the model questions and it will answer from the dataset files. Keep tuning the parameters to improve the training and retrieval processes.
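    Which parameters matter most will vary with your dataset, but the retrieval and generation knobs in the scripts above are the natural starting points. As an illustration (the values here are examples, not recommendations), you might vary the chunk size, the number of retrieved sources, and the generation length:

    # Illustrative tuning sketch using the methods defined in load_gpt-neo.py;
    # the specific values are examples, not recommendations.
    qa_system = QASystem(directory="c:/yaghiGPT/gptDataSet")
    # Smaller chunks give sharper matches; larger chunks give more context per hit
    chunks = qa_system.chunk_text("some long document text", max_length=256)
    # Retrieve more sources for broad questions, fewer for precise ones
    answer, sources, contexts = qa_system.get_answer("What is in the dataset?", num_sources=5)
    # Allow longer answers by raising max_new_tokens
    detailed = qa_system.answer_question("Summarize the key points", contexts, max_new_tokens=150)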

    Also, I made good use of my poe.com subscription, which gave me access to Claude-3.5-Sonnet and GPT-4o (in addition to many other models). I used these two amazing models to get help with error messages and to double-check my scripts. I can’t say which one was better; their responses and performance were inconsistent, so I flipped between them. Further, to avoid getting undesired replies, I would start each prompt with a background phrase: “I am installing a local private GPT on my Windows PC. Please …….”. These models can rewrite or fix a script, explain what a script is doing, and guide you along the way.

     

    Disclaimer: “This blog post was researched and written with the assistance of artificial intelligence tools.”