python multithreading tkinter asynchronous thread-safety

How do I return the response when working with multiple threads in Python?


How do I get a response when I call

response = gemini_pdf(message)? I want response to contain the final output, but I’m stuck here.

Here is my code:

def read_pdf(file_path):
    """Extract the text of a PDF and index it in a FAISS vector store.

    The extracted text is split into overlapping chunks, embedded with
    ``GoogleGenerativeAIEmbeddings`` and the resulting index is stored in the
    module-level ``file_content`` global.

    Args:
        file_path: Path of the PDF file to read.

    Returns:
        None. On success the index is published through ``file_content``;
        on failure the error is reported and ``file_content`` is left
        unchanged.
    """
    global file_content

    try:
        # Opening the file happens inside the ``try`` so that a missing or
        # unreadable file actually reaches the handlers below.
        pdfreader = PdfReader(file_path)

        # Skip pages whose extracted text is empty or None.
        page_texts = (page.extract_text() for page in pdfreader.pages)
        raw_text = "".join(text for text in page_texts if text)

        print(raw_text)
        text_splitter = CharacterTextSplitter(
            separator="\n",
            chunk_size=800,
            chunk_overlap=200,
            length_function=len,
        )
        new_content = text_splitter.split_text(raw_text)
        embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")
        file_content = FAISS.from_texts(new_content, embeddings)
    except (TypeError, FileNotFoundError) as e:
        # These errors are only logged to the console; no dialog is shown.
        print("Sorry, an error occurred while processing your request.", f"{e}")
    except Exception as e:
        print("Sorry, an error occurred while processing your request.", f"{e}")
        # NOTE(review): gemini_pdf starts this function in a worker thread;
        # Tkinter calls from a non-main thread may be unsafe -- confirm and
        # consider handing the message to the GUI thread instead.
        tk.messagebox.showwarning("An error occurred", f"{e}")
    
def answer_ftom_pdf(query):
    """Answer *query* from the PDF index stored in ``file_content``.

    Args:
        query: The question to ask about the loaded PDF.

    Returns:
        The stripped answer text produced by the QA chain, or ``None`` when
        no PDF has been indexed yet (``file_content`` is ``None``).
    """
    # Check for a missing index first: calling ``similarity_search`` on
    # ``None`` would raise AttributeError before the message could be printed.
    if file_content is None:
        print("Content is none")
        return None

    model = ChatGoogleGenerativeAI(model="gemini-1.5-flash-latest",
                                   temperature=0.3)
    docs = file_content.similarity_search(query)
    chain = load_qa_chain(model, chain_type="stuff")
    response = chain.run(input_documents=docs, question=query)
    print(response)
    response = response.strip()
    print("Here")
    return response


def gemini_pdf(query=None):
    """Load a PDF on first use, otherwise answer *query* from the loaded PDF.

    When ``file_content`` is ``None`` a file dialog asks for a PDF and, if a
    file was chosen, ``read_pdf`` is started in a background thread. When
    ``file_content`` is set and truthy, ``answer_ftom_pdf`` is started in a
    background thread with *query*.

    Args:
        query: The question forwarded to ``answer_ftom_pdf``.

    Returns:
        None. The work runs in a separate thread and its result is not
        returned to the caller.
    """
    global file_content
    global file_name

    if file_content is not None:
        if file_content:
            print("Same PDF")
            answer_thread = threading.Thread(target=answer_ftom_pdf, args=(query,))
            answer_thread.start()
        return

    # No PDF loaded yet: open a file dialog to select the PDF file.
    file_path = filedialog.askopenfilename(filetypes=[("PDF files", "*.pdf")])
    file_name = os.path.basename(file_path)
    if not file_name:
        # No file name was obtained from the dialog; nothing to load.
        return

    loader_thread = threading.Thread(target=read_pdf, args=(file_path,))
    loader_thread.start()

How can I ensure response contains the final output when calling gemini_pdf(message)?


Solution

  • You could store the thread's result in a global variable and periodically check that variable. However, the way I would get a result back from a worker function running in a different thread is to pass the function a queue instance (in your case a queue.SimpleQueue instance is enough) on which it puts the result. The main (i.e. calling) thread then simply blocks on a get call made on the queue to await the result, as follows:

    A Blocking Approach

    from threading import Thread
    from queue import SimpleQueue
    
    def worker(x, response_queue):
        """Square *x* and put the result on *response_queue*."""
        response_queue.put(x * x)
    
    def main():
        """Run ``worker`` in a thread and block until its result arrives."""
        results = SimpleQueue()
        background = Thread(target=worker, args=(7, results))
        background.start()
        # get() blocks until the worker has put its result on the queue.
        print(results.get())
    
    # Run the demo only when this file is executed as a script.
    if __name__ == '__main__':
        main()
    

    Prints:

    49
    

    So your gemini_pdf function should create a queue.SimpleQueue instance, which it will pass to the thread worker functions read_pdf and answer_ftom_pdf (sic) modified to expect this additional argument. The main thread will then call get on the queue instance to retrieve the result.

    Non-blocking Approaches

    If you do not want to block indefinitely while waiting for the result and would rather periodically test for completion, one way is to specify a timeout value on the call to get, e.g. response_queue.get(timeout=.1). You will then still block, but only for up to 0.1 seconds, waiting for the result to be returned. If nothing has been placed on the queue by the time the timeout expires, a queue.Empty exception is raised, which you need to catch. The above code rewritten so that it does not block indefinitely would be:

    from threading import Thread
    from queue import SimpleQueue, Empty
    
    def worker(x, response_queue):
        """Square *x* and put the result on *response_queue*."""
        response_queue.put(x * x)
    
    def main():
        """Run ``worker`` in a thread and poll the queue with a short timeout."""
        results = SimpleQueue()
        background = Thread(target=worker, args=(7, results))
        background.start()
        while True:
            try:
                # Wait at most 0.1 seconds per attempt for the result.
                result = results.get(timeout=.1)
                break
            except Empty:
                # Nothing on the queue yet; try again.
                continue
        print(result)
    
    # Run the demo only when this file is executed as a script.
    if __name__ == '__main__':
        main()
    

    If you do not wish to block at all, you can periodically check whether a result is on the queue by checking its size:

    from threading import Thread
    from queue import SimpleQueue
    
    def worker(x, response_queue):
        """Square *x* and put the result on *response_queue*."""
        response_queue.put(x * x)
    
    def main():
        """Run ``worker`` in a thread and wait by checking the queue's size."""
        results = SimpleQueue()
        background = Thread(target=worker, args=(7, results))
        background.start()
        # Real code would not call qsize in a tight loop like this one; it
        # would check the queue's size periodically, as its requirements
        # dictate.
        while not results.qsize():
            pass
        print(results.get())
    
    # Run the demo only when this file is executed as a script.
    if __name__ == '__main__':
        main()