How to Set Up a Private GPT: Step by Step

By:  Husam Yaghi

A local GPT refers to having a Large Language Model, such as GPT-Neo or Llama, installed and running directly on your own personal computer (Mac or Windows) or a local server.  As stated in a previous post, installing a local GPT offers numerous benefits, including enhanced privacy, independence from the internet, cost efficiency, and customization options.

Here we walk through all the steps we took to get a functioning GPT installed locally and operating on local files in a secure, private environment.

Install Python:

Download and install the latest version of Python from the official website

https://www.python.org/downloads/windows/

Make sure to select the option to add Python to your system’s PATH during the installation.
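
You can quickly verify the installation (and that PATH was set correctly) from a new command prompt; the version numbers printed will of course differ on your machine:

          c:\> python --version

          c:\> pip --version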

Install Git:

Download and install Git for Windows https://git-scm.com/download/win

Install Anaconda:

Because we will be running a lot of Python scripts, the Anaconda Prompt is a great replacement for the default Windows Command Prompt.   https://docs.anaconda.com/anaconda/install/windows/
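
Optionally, if you would rather keep everything in a dedicated conda environment instead of the venv created later in this post, a minimal sketch from the Anaconda Prompt (the environment name yaghiGPT is just an example):

          conda create -n yaghiGPT python=3.10

          conda activate yaghiGPT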

Prepare for the model:

Do a quick internet search to help you decide on the GPT model you want to use, based on the hardware resources you have available.  Also, you may want to visit https://huggingface.co/models to browse the long list of available models. Here is a sample:

  • GPT-2 (a small pretrained model by OpenAI)
  • GPT-Neo (a larger pretrained model than GPT-2, by EleutherAI)
  • Llama (a much larger pretrained model by Meta AI)

For this post, we will use GPT-Neo as it is neither too small nor too large, and it can run comfortably on a single GPU (some models can run on PCs without a GPU, though keep in mind that the more compute resources you have, the better the performance).
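
Once PyTorch is installed (see the prerequisites below), a quick way to check whether a usable GPU is visible to it is a one-liner like the following (nvidia-smi from the GPU driver gives similar information):

          c:\> python -c "import torch; print(torch.cuda.is_available(), torch.cuda.device_count())"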

Installation:

Let’s start the installation process of our chosen model (GPT-Neo):

  • Click on the Windows icon and run Anaconda as an Administrator
  • A command prompt screen will be opened
  • Create the directory where the local GPT will be installed:

          mkdir c:\yaghiGPT

          cd c:\yaghiGPT

Install the Prerequisites:

          c:\yaghiGPT > pip install transformers torch sentencepiece

          c:\yaghiGPT > pip install fastapi

          c:\yaghiGPT > pip install uvicorn

          c:\yaghiGPT > pip install torch

          c:\yaghiGPT > pip install tensorflow

          c:\yaghiGPT > pip install tensorboard

          c:\yaghiGPT > pip uninstall huggingface_hub

          c:\yaghiGPT > pip install transformers datasets
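
Before building the full application, it can help to confirm that transformers and torch can load and run a GPT-Neo checkpoint at all. Here is a minimal smoke test, assuming the small EleutherAI/gpt-neo-125M checkpoint (the weights are downloaded from Hugging Face on first run):

# smoke_test.py - quick check that the transformers/torch stack works end to end
from transformers import pipeline

# The 125M checkpoint is small enough to run on CPU; swap in EleutherAI/gpt-neo-2.7B later.
generator = pipeline("text-generation", model="EleutherAI/gpt-neo-125M")

print(generator("A private GPT is useful because", max_new_tokens=30)[0]["generated_text"])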

Clone the GPT-Neo repository:

          c:\yaghiGPT > git clone https://github.com/EleutherAI/gpt-neo.git

Create a virtual environment and activate it:

          c:\yaghiGPT > python -m venv gpt-neo-env

          c:\yaghiGPT > gpt-neo-env\Scripts\activate
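
Note that the pip installs above went into whichever Python environment was active (typically the base one); if you prefer to keep the packages isolated inside the new virtual environment, re-run the pip commands after activating it, for example:

          (gpt-neo-env) c:\yaghiGPT > pip install transformers torch sentencepiece fastapi uvicorn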

Create Dataset:

Gather all the files that you plan to train the model on and use later.  Put all those files in a folder; let’s call it gptDataSet:

          c:\yaghiGPT > mkdir gptDataSet

The dataset consists of PDF, Word, and PowerPoint files.

Process the dataset:

The text content must be extracted from those files before it can be used for fine-tuning.  Let’s create the following script in a file called auto_process_files.py. This script will automatically process any file that gets added to the dataset folder:

# auto_process_files.py

import os

import time

import PyPDF2

import docx

from pptx import Presentation

from watchdog.observers import Observer

from watchdog.events import FileSystemEventHandler

# Directory containing the files

directory = "c:/yaghiGPT/gptDataSet"  # adjust this path if your dataset folder lives elsewhere

# Function to process PDF files

def process_pdf(file_path):

    with open(file_path, 'rb') as file:

        reader = PyPDF2.PdfReader(file)

        num_pages = len(reader.pages)

        text = []

        for page_num in range(num_pages):

            page = reader.pages[page_num]

            text.append(page.extract_text())

        return "\n".join(text)

# Function to process Word files

def process_word(file_path):

    doc = docx.Document(file_path)

    text = []

    for paragraph in doc.paragraphs:

        text.append(paragraph.text)

    return "\n".join(text)

# Function to process PowerPoint files

def process_pptx(file_path):

    prs = Presentation(file_path)

    text = []

    for slide in prs.slides:

        for shape in slide.shapes:

            if hasattr(shape, "text"):

                text.append(shape.text)

    return "\n".join(text)

# Function to process a single file

def process_file(file_path):

    filename = os.path.basename(file_path)

    if filename.endswith(".pdf"):

        print(f"Processing PDF: {filename}")

        pdf_text = process_pdf(file_path)

        print(f"Extracted text length: {len(pdf_text)} characters")

    elif filename.endswith(".docx"):

        print(f"Processing Word Document: {filename}")

        word_text = process_word(file_path)

        print(f"Extracted text length: {len(word_text)} characters")

    elif filename.endswith(".pptx"):

        print(f"Processing PowerPoint Presentation: {filename}")

        pptx_text = process_pptx(file_path)

        print(f"Extracted text length: {len(pptx_text)} characters")

    else:

        print(f"Unsupported file type: {filename}")

# Custom event handler

class FileHandler(FileSystemEventHandler):

    def on_created(self, event):

        if event.is_directory:

            return

        print(f"New file detected: {event.src_path}")

        process_file(event.src_path)

# Process existing files

def process_existing_files():

    print(f"Checking existing files in {directory}")

    for filename in os.listdir(directory):

        file_path = os.path.join(directory, filename)

        if os.path.isfile(file_path):

            process_file(file_path)

# Set up the observer

event_handler = FileHandler()

observer = Observer()

observer.schedule(event_handler, directory, recursive=False)

print("Starting file processing script...")

print(f"Monitoring directory: {directory}")

try:

    observer.start()

    process_existing_files()

    print("Waiting for new files...")

    while True:

        time.sleep(1)

except KeyboardInterrupt:

    print("Script interrupted by user.")

    observer.stop()

finally:

    observer.join()
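
Before running it, note that this script relies on a few packages that are not in the prerequisite list above (PyPDF2, python-docx, python-pptx, and watchdog); they can be installed with something like:

          c:\yaghiGPT > pip install PyPDF2 python-docx python-pptx watchdog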

Run the script; it might take minutes or hours depending on the number of files your dataset contains:

          c:\yaghiGPT > python auto_process_files.py

Train the model:

Now we need to train the model on the local dataset, fine-tune it to improve its quality, then continuously monitor the performance.  Create train_finetune_monitor.py to contain the following script:

# train_finetune_monitor.py

import os

import numpy as np

import pickle

from PyPDF2 import PdfReader

from sentence_transformers import SentenceTransformer

import torch

from transformers import pipeline, AutoModelForCausalLM, AutoTokenizer

import tkinter as tk

from tkinter import ttk, filedialog, scrolledtext

import threading

import logging

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class QASystem:

    def __init__(self, directory, index_file='index.pkl', embeddings_file='embeddings.pkl'):

        # ... (existing QASystem code) ...

        pass

    def finetune_model(self, train_dataset, eval_dataset, output_dir, num_epochs=3, batch_size=4, learning_rate=5e-5):

        model = AutoModelForCausalLM.from_pretrained("EleutherAI/gpt-neo-2.7B")

        tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-2.7B")

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        model.to(device)

        optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)

        for epoch in range(num_epochs):

            model.train()  # switch back to training mode at the start of each epoch

            train_loss = 0

            for batch in train_dataset:

                optimizer.zero_grad()

                input_ids = batch['input_ids'].to(device)

                attention_mask = batch['attention_mask'].to(device)

                labels = batch['input_ids'].to(device)

                outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

                loss = outputs.loss

                loss.backward()

                optimizer.step()

                train_loss += loss.item()

            eval_loss = 0

            model.eval()

            for batch in eval_dataset:

                input_ids = batch['input_ids'].to(device)

                attention_mask = batch['attention_mask'].to(device)

                labels = batch['input_ids'].to(device)

                with torch.no_grad():

                    outputs = model(input_ids, attention_mask=attention_mask, labels=labels)

                    eval_loss += outputs.loss.item()

            eval_loss /= len(eval_dataset)

            logging.info(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {train_loss/len(train_dataset)}, Eval Loss: {eval_loss}")

        os.makedirs(output_dir, exist_ok=True)

        model.save_pretrained(output_dir)

        tokenizer.save_pretrained(output_dir)

class QAApp:

    def __init__(self, root, qa_system):

        # ... (existing QAApp code) ...

        pass

    def monitor_performance(self):

        # Add code to monitor the performance of the QA system

        # This could include metrics like accuracy, F1 score, etc.

        # and display them in the GUI or log them to a file

        pass

if __name__ == "__main__":

    root = tk.Tk()

    qa_system = QASystem("data")

    app = QAApp(root, qa_system)

    # Finetune the model

    train_dataset, eval_dataset = load_datasets()  # not defined in this post; see the sketch after this script

    qa_system.finetune_model(train_dataset, eval_dataset, "output")

    # Monitor the performance

    app.monitor_performance()

    root.mainloop()
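
The script above calls load_datasets(), which is not defined in this post. Below is a minimal sketch of what such a helper might look like, assuming the text extracted earlier has been saved to a plain-text file (dataset.txt is just a placeholder name) and is tokenized into fixed-length blocks:

# load_datasets.py - illustrative only; adjust the path, block size, and batch size to your data
import torch
from torch.utils.data import DataLoader, random_split
from transformers import AutoTokenizer

def load_datasets(text_file="dataset.txt", block_size=512, batch_size=4):
    tokenizer = AutoTokenizer.from_pretrained("EleutherAI/gpt-neo-2.7B")
    with open(text_file, encoding="utf-8") as f:
        tokens = tokenizer(f.read(), return_tensors="pt")["input_ids"][0]
    # Split the token stream into fixed-length training blocks.
    blocks = [tokens[i:i + block_size] for i in range(0, len(tokens) - block_size, block_size)]
    examples = [{"input_ids": b, "attention_mask": torch.ones_like(b)} for b in blocks]
    # Hold out roughly 10% of the blocks for evaluation.
    eval_size = max(1, len(examples) // 10)
    train_examples, eval_examples = random_split(examples, [len(examples) - eval_size, eval_size])
    collate = lambda batch: {k: torch.stack([ex[k] for ex in batch]) for k in batch[0]}
    return (DataLoader(train_examples, batch_size=batch_size, shuffle=True, collate_fn=collate),
            DataLoader(eval_examples, batch_size=batch_size, collate_fn=collate))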

Execute the script:

          c:\yaghiGPT > python train_finetune_monitor.py

Load & Run:

Now we’re ready to start interacting with the model.  Paste the following script into a file called load_gpt-neo.py:

# Add a follow_up method in the QAApp class to handle follow-up questions.

# Store the context of the previous answer so that it can be used to generate more detailed responses for follow-up questions.

# Add a "Follow-Up" button in the GUI and connect it to the follow_up method.

import os

import numpy as np

import torch

import re

import pickle

from PyPDF2 import PdfReader

from docx import Document

from pptx import Presentation

from sentence_transformers import SentenceTransformer

from transformers import AutoTokenizer, pipeline

from faiss import IndexFlatL2

import tkinter as tk

from tkinter import ttk, filedialog, scrolledtext

import speech_recognition as sr

import pyttsx3

class QASystem:

    def __init__(self, directory, index_file='index.pkl', embeddings_file='embeddings.pkl'):

        self.directory = directory

        self.index_file = index_file

        self.embeddings_file = embeddings_file

        if os.path.exists(self.index_file) and os.path.exists(self.embeddings_file):

            self.index, self.embedding_model, self.document_embeddings = self.load_index_and_embeddings()

        else:

            self.documents = self.load_documents()

            self.index, self.embedding_model, self.document_embeddings = self.create_index()

            self.save_index_and_embeddings()

        self.qa_pipeline = pipeline("question-answering", model="EleutherAI/gpt-neo-2.7B", tokenizer="EleutherAI/gpt-neo-2.7B")

        self.summarizer_pipeline = pipeline("summarization", model="EleutherAI/gpt-neo-2.7B", tokenizer="EleutherAI/gpt-neo-2.7B")

    def save_index_and_embeddings(self):

        with open(self.index_file, 'wb') as f:

            pickle.dump((self.index, self.embedding_model, self.document_embeddings), f)

        with open(self.embeddings_file, 'wb') as f:

            pickle.dump(self.document_embeddings, f)

    def load_index_and_embeddings(self):

        try:

            with open(self.index_file, 'rb') as f:

                index, embedding_model, document_embeddings = pickle.load(f)

            with open(self.embeddings_file, 'rb') as f:

                document_embeddings = pickle.load(f)

            return index, embedding_model, document_embeddings

        except AttributeError as e:

            print(f"Error loading index and embeddings: {e}")

            print("Recreating index and embeddings...")

            self.documents = self.load_documents()

            self.index, self.embedding_model, self.document_embeddings = self.create_index()

            self.save_index_and_embeddings()

            return self.index, self.embedding_model, self.document_embeddings

    def load_documents(self):

        documents = []

        files = os.listdir(self.directory)

        for filename in files:

            if filename.endswith('.pdf'):

                filepath = os.path.join(self.directory, filename)

                try:

                    with open(filepath, 'rb') as file:

                        pdf = PdfReader(file)

                        text = ''.join(page.extract_text() for page in pdf.pages)

                        documents.append((filename, text))

                except Exception as e:

                    print(f"Error reading {filename}: {str(e)}")

            elif filename.endswith('.docx'):

                filepath = os.path.join(self.directory, filename)

                try:

                    doc = Document(filepath)

                    text = ' '.join(para.text for para in doc.paragraphs)

                    documents.append((filename, text))

                except Exception as e:

                    print(f"Error reading {filename}: {str(e)}")

            elif filename.endswith('.pptx'):

                filepath = os.path.join(self.directory, filename)

                try:

                    ppt = Presentation(filepath)

                    text = ' '.join(shape.text for slide in ppt.slides for shape in slide.shapes if hasattr(shape, 'text'))

                    documents.append((filename, text))

                except Exception as e:

                    print(f"Error reading {filename}: {str(e)}")

        return documents

    def create_index(self):

        embedding_model = SentenceTransformer('all-MiniLM-L6-v2')

        document_embeddings = []

        for doc_id, text in self.documents:

            chunks = self.chunk_text(text)

            for chunk in chunks:

                embedding = embedding_model.encode(chunk)

                document_embeddings.append((embedding, doc_id, chunk))

        embeddings = np.array([embed[0] for embed in document_embeddings])

        index = IndexFlatL2(embeddings.shape[1])

        index.add(embeddings)

        return index, embedding_model, document_embeddings

    def chunk_text(self, text, max_length=512):

        words = text.split()

        return [" ".join(words[i:i + max_length]) for i in range(0, len(words), max_length)]

    def extract_relevant_text(self, context, query, max_length=1000):

        words = context.split()

        if (len(words) <= max_length):

            return context

        query_words = set(query.lower().split())

        scores = []

        for i in range(len(words) - max_length + 1):

            chunk = ' '.join(words[i:i + max_length])

            score = sum(1 for word in query_words if word in chunk.lower())

            scores.append((score, i))

        best_start = max(scores, key=lambda x: x[0])[1]

        return ' '.join(words[best_start:best_start + max_length])

    def answer_question(self, question, contexts, max_length=2048, max_new_tokens=50):

        combined_context = " ".join(contexts)

        # Truncate the combined context to the maximum allowed length

        combined_context = combined_context[:max_length]

        result = self.qa_pipeline(question=question, context=combined_context, max_new_tokens=max_new_tokens)

        answer = result['answer']

        return answer

    def summarize_text(self, text, max_length=2048, max_new_tokens=50):

        text = text[:max_length]  # Ensure the text does not exceed the maximum length

        summary = self.summarizer_pipeline(text, max_new_tokens=max_new_tokens)

        return summary[0]['summary_text']

    def post_process_answer(self, answer):

        answer = answer.strip('.,;: \t\n').capitalize()

        if not answer.endswith(('.', '!', '?')):

            answer += '.'

        return answer

    def clean_answer(self, answer):

        # Add your cleaning logic here

        # For now, let's just return the answer as is

        return answer

    def get_answer(self, question, num_sources=3):

        question_embedding = self.embedding_model.encode(question)

        distances, indices = self.index.search(np.array([question_embedding]), num_sources)

        best_chunks = [self.document_embeddings[idx][2] for idx in indices[0]]

        relevant_docs = [self.document_embeddings[idx][1] for idx in indices[0]]

        relevant_texts = [self.extract_relevant_text(chunk, question) for chunk in best_chunks]

        answer = self.answer_question(question, relevant_texts)

        answer = self.clean_answer(answer)

        answer = self.post_process_answer(answer)

        if len(answer.split()) < 20:

            combined_text = " ".join(relevant_texts)

            summary = self.summarize_text(combined_text)

            if len(summary.split()) > len(answer.split()):

                answer = summary

        return answer, relevant_docs, relevant_texts  # Return relevant_texts for follow-up

class QAApp:

    def __init__(self, root, qa_system):

        self.qa_system = qa_system

        self.history = []  # Initialize history

        self.previous_context = []  # Store the context of the previous answer

        self.root = root

        self.root.title("Yaghi's QA System")

        self.root.geometry("800x600")

        self.style = ttk.Style()

        self.style.theme_use('clam')

        self.frame = ttk.Frame(root, padding="10 10 10 10")

        self.frame.grid(row=0, column=0, sticky=(tk.W, tk.E, tk.N, tk.S))

        self.input_label = ttk.Label(self.frame, text="Welcome to Yaghi's GPT. Please enter your question:", background="", foreground="#00695c", font=("Helvetica", 12, "bold"))

        self.input_label.grid(row=0, column=0, sticky=tk.W, pady=5)

        self.input_text = ttk.Entry(self.frame, width=70, font=("Helvetica", 10))

        self.input_text.grid(row=1, column=0, pady=5, padx=5)

        self.submit_button = ttk.Button(self.frame, text="Submit", command=self.get_response, style="TButton")

        self.submit_button.grid(row=1, column=1, padx=5)

        self.output_label = ttk.Label(self.frame, text="Response:", background="", foreground="#00695c", font=("Helvetica", 12, "bold"))

        self.output_label.grid(row=2, column=0, sticky=tk.W, pady=5)

        self.output_text = scrolledtext.ScrolledText(self.frame, wrap=tk.WORD, width=70, height=20, font=("Helvetica", 10), background="#ffffff")

        self.output_text.grid(row=3, column=0, pady=5, columnspan=2)

        # Follow-Up button

        self.follow_up_button = ttk.Button(self.frame, text="Follow-Up", command=self.follow_up, style="TButton")

        self.follow_up_button.grid(row=2, column=1, pady=5)

        for child in self.frame.winfo_children():

            child.grid_configure(padx=5, pady=5)

        self.root.columnconfigure(0, weight=1)

        self.root.rowconfigure(0, weight=1)

        # Add styles

        self.style.configure("TButton", foreground="#ffffff", background="#00796b", font=("Helvetica", 10, "bold"))

        self.style.map("TButton", background=[("active", "#004d40")])

        # Export button

        self.export_button = ttk.Button(self.frame, text="Export History", command=self.export_history, style="TButton")

        self.export_button.grid(row=4, column=0, pady=5)

        # Voice input button

        self.voice_button = ttk.Button(self.frame, text="Voice Input", command=self.voice_input, style="TButton")

        self.voice_button.grid(row=4, column=1, pady=5)

    def get_response(self):

        question = self.input_text.get()

        answer, sources, contexts = self.qa_system.get_answer(question)  # Get contexts for follow-up

# Store the contexts for follow-up questions

        self.previous_context = contexts

        # Clear the current text

        self.output_text.delete(1.0, tk.END)

        # Insert the question in a different color

        self.output_text.insert(tk.END, "Question: ", "question")

        self.output_text.insert(tk.END, question + "\n", "question_text")

       # Insert the answer in a different color

        self.output_text.insert(tk.END, "Answer: ", "answer")

        self.output_text.insert(tk.END, answer + "\n", "answer_text")

       # Insert the sources

        self.output_text.insert(tk.END, "Sources: " + ", ".join(sources) + "\n\n")

        # Define tags for custom colors

        self.output_text.tag_config("question", foreground="#1e88e5", font=("Helvetica", 10, "bold"))

        self.output_text.tag_config("question_text", foreground="#0d47a1")

        self.output_text.tag_config("answer", foreground="#43a047", font=("Helvetica", 10, "bold"))

        self.output_text.tag_config("answer_text", foreground="#1b5e20")

        # Store the interaction in history

        self.history.append({

            "question": question,

            "answer": answer,

            "sources": sources

        })

    def follow_up(self):

        follow_up_question = self.input_text.get()

        if not self.previous_context:

            self.output_text.insert(tk.END, "No previous context available for follow-up.\n", "error")

            return

        # Debug: Log follow-up question and previous context

        print(f"Follow-up question: {follow_up_question}")

        print(f"Previous context: {self.previous_context}")

        # Use the previous context to generate more relevant context for the follow-up question

        combined_context = " ".join(self.previous_context)

        relevant_text = self.qa_system.extract_relevant_text(combined_context, follow_up_question)

        print(f"Relevant text for follow-up: {relevant_text}")

        # Get the answer using the relevant context

        answer = self.qa_system.answer_question(follow_up_question, [relevant_text])

        print(f"Answer for follow-up: {answer}")

        # Clear the current text

        self.output_text.delete(1.0, tk.END)

        # Insert the follow-up question in a different color

        self.output_text.insert(tk.END, "Follow-Up Question: ", "question")

        self.output_text.insert(tk.END, follow_up_question + "\n", "question_text")

       # Insert the answer in a different color

        self.output_text.insert(tk.END, "Answer: ", "answer")

        self.output_text.insert(tk.END, answer + "\n", "answer_text")

       # Insert the sources

        self.output_text.insert(tk.END, "Sources: " + ", ".join(self.qa_system.get_answer(follow_up_question)[1]) + "\n\n")

        # Define tags for custom colors

        self.output_text.tag_config("question", foreground="#1e88e5", font=("Helvetica", 10, "bold"))

        self.output_text.tag_config("question_text", foreground="#0d47a1")

        self.output_text.tag_config("answer", foreground="#43a047", font=("Helvetica", 10, "bold"))

        self.output_text.tag_config("answer_text", foreground="#1b5e20")

        # Store the interaction in history

        self.history.append({

            "question": follow_up_question,

            "answer": answer,

            "sources": self.qa_system.get_answer(follow_up_question)[1]

        })

    def export_history(self):

        file_path = filedialog.asksaveasfilename(defaultextension=".txt", filetypes=[("Text files", "*.txt")])

        if file_path:

            with open(file_path, 'w') as file:

                for entry in self.history:

                    file.write(f"Question: {entry['question']}\n")

                    file.write(f"Answer: {entry['answer']}\n")

                    file.write(f"Sources: {', '.join(entry['sources'])}\n\n")

    def voice_input(self):

        recognizer = sr.Recognizer()

        with sr.Microphone() as source:

            self.output_text.insert(tk.END, "Listening...\n")

            audio = recognizer.listen(source)

            try:

                question = recognizer.recognize_google(audio)

                self.input_text.delete(0, tk.END)

                self.input_text.insert(0, question)

                self.get_response()

            except sr.UnknownValueError:

                self.output_text.insert(tk.END, "Could not understand audio\n")

            except sr.RequestError as e:

                self.output_text.insert(tk.END, f"Could not request results; {e}\n")

if __name__ == "__main__":

    root = tk.Tk()

    qa_system = QASystem(directory="c:/yaghiGPT/gptDataSet")  # point at the dataset folder created earlier

    app = QAApp(root, qa_system)

    root.mainloop()
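
This script also imports several packages that the prerequisite list does not cover (sentence-transformers, faiss, python-docx, python-pptx, SpeechRecognition plus PyAudio for the microphone, and pyttsx3); a likely install command is:

          c:\yaghiGPT > pip install sentence-transformers faiss-cpu python-docx python-pptx SpeechRecognition PyAudio pyttsx3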

Execute the code:

            c:\yaghiGPT > python load_gpt-neo.py

 

Maintenance:

Hopefully the model now runs without any missing libraries.  It should open a window for interacting with the model.  Ask the model questions whose answers come from the dataset files.  Keep tuning the parameters to improve the training and retrieval processes.
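
Which parameters are worth retouching depends on your data, but the ones already exposed in the scripts above are a natural starting point. For example (the values here are purely illustrative, not recommendations):

# Fine-tuning knobs exposed by finetune_model() in train_finetune_monitor.py
qa_system.finetune_model(train_dataset, eval_dataset, "output",
                         num_epochs=5,        # more passes over a small dataset
                         batch_size=2,        # reduce if the GPU runs out of memory
                         learning_rate=2e-5)  # smaller steps for gentler fine-tuning

# Retrieval knob exposed by get_answer() in load_gpt-neo.py
answer, sources, contexts = qa_system.get_answer(question, num_sources=5)  # retrieve more chunks per question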

Also, I made good use of my poe.com subscription. It gave me access to Claude-3.5-Sonnet and GPT-4o (in addition to many other models). I used these two models to get help with error messages and to double-check my scripts. I can’t say which one was better, because their responses and performance were inconsistent, so I flipped between them. Further, to avoid getting undesired replies, when writing a prompt I would start with a background phrase: “I am installing a local private GPT on my Windows PC. Please …….”. These models could rewrite or fix a script, explain what a script is doing, and guide you as well.

 

Disclaimer: “This blog post was researched and written with the assistance of artificial intelligence tools.”