python · langchain · large-language-model · python-embedding · rag

How to develop a Generalized RAG Pipeline for Text, Images, and Structured Data


I'm trying to find a general RAG solution for problems involving text, images, charts, tables, etc., spread across many different formats such as .docx, .xlsx, and .pdf.

The requirement for the answer:

The features of the documents:

Below are some documents I have read but feel I don't fully understand, I'm not sure how it can help me.

I want to be able to outline a pipeline to answer questions according to the requirements of my system. Any help would be greatly appreciated!

System:


Solution

  • Here is a sample of the code you will need to implement RAG-Fusion. You would have to adapt it to your own requirements: it serves as a guide for JSON files, and you can handle other formats such as PDFs and images by following the same procedure.

    def determine_extension(file):
        """Route *file* to the handler that matches its extension.

        Args:
            file: Path string whose suffix decides the handler.
        """
        # str.endswith takes a *tuple* of suffixes; passing two positional
        # strings treats the second as a start index and raises TypeError.
        if file.endswith((".jpg", ".png")):
            send_image_to_rag_classifier(file)
        elif file.endswith(".pdf"):
            ...  # TODO: route to a PDF handler
        else:
            ...  # TODO: fallback for the remaining formats (.docx, .xlsx, ...)
    
    
    """ Implement the RAG fusion using the langchain library"""
    
    import asyncio
    import json
    import logging
    import os
    import pathlib as path
    from operator import itemgetter
    from typing import Any
    
    from dotenv import find_dotenv, load_dotenv
    from langchain.prompts import ChatPromptTemplate
    from langchain_community.vectorstores import chroma
    from langchain_core.documents import Document
    from langchain_core.output_parsers import StrOutputParser
    from langchain_core.vectorstores import VectorStoreRetriever
    from langchain_openai import ChatOpenAI, OpenAIEmbeddings
    from langchain_text_splitters import RecursiveJsonSplitter
    
    # Module-level logger; the application is expected to configure handlers.
    logger = logging.getLogger(__name__)


    # read OPENAI_API_KEY from the environment (.env file found via find_dotenv)
    load_dotenv(find_dotenv())

    # System prompt for the RAG-Fusion query-generation step.
    # NOTE(review): placeholder text — replace with a prompt that asks the LLM
    # to produce multiple rephrasings of the user's question, one per line.
    SYSTEM_PROMPT = """ 
                      Your prompt
                    """
    
    
    # Recursively walk the data directory and gather the retriever's input files
    def collect_data_files(filepath) -> list:
        """Walk through the file path and collect the files.

        Args:
            filepath: Root directory (str or os.PathLike) to walk.

        Returns:
            list: Path of every regular file found under *filepath*
            (empty list if the directory is empty or does not exist).
        """
        # The original returned an undefined name (`store_file`); build the
        # list by walking the tree. The original annotation `filepath: path`
        # annotated with the pathlib module alias, not a type, so it was dropped.
        store_file = []
        for root, _dirs, files in os.walk(filepath):
            for name in files:
                store_file.append(os.path.join(root, name))
        return store_file
    
    
    # Create a recursive json splitter to split the data into chunks
    def retrieve_data(data) -> list[Document]:
        """
        Load every JSON file under *data* and split it into chunked Documents.

        Args:
            data: Root directory containing the .json source files.

        Returns:
            list[Document]: Chunks from *all* files. The original body
            overwrote ``documents`` on every loop iteration, so only the last
            file's chunks were returned (and an empty directory raised
            UnboundLocalError). The return annotation also referenced
            ``chroma.Document``, which does not exist — ``Document`` is
            imported from langchain_core.
        """
        files = collect_data_files(data)

        # One splitter is enough for every file; hoisted out of the loop.
        splitter = RecursiveJsonSplitter(max_chunk_size=300)

        documents: list[Document] = []
        for file in files:
            with open(file, "r") as f:
                # Use a name distinct from the `data` parameter (the original
                # shadowed it).
                payload = json.loads(f.read())
            # Accumulate chunks instead of replacing them.
            documents.extend(splitter.create_documents(texts=payload, convert_lists=True))

        return documents
    
    
    # vectorstore database from chroma
    def vectorstore_db(data) -> VectorStoreRetriever:
        """
        Create a vectorstore database from the data.

        Args:
            data: Root directory whose JSON files are chunked and indexed.

        Returns:
            VectorStoreRetriever: Retriever over the embedded chunks.
            (The original returned an undefined name, `vector_retriever`.)
        """
        documents = retrieve_data(data)
        # Embed the chunks with OpenAI embeddings and index them in Chroma;
        # the file imports the `chroma` *module*, so the class is accessed
        # as an attribute. OPENAI_API_KEY must be set (see load_dotenv above).
        db = chroma.Chroma.from_documents(documents=documents, embedding=OpenAIEmbeddings())
        vector_retriever = db.as_retriever()
        return vector_retriever
    
    
    # Deduplicate the document lists retrieved for each generated query
    def get_unique_union_of_documents(docs: list[list]) -> list[Any]:
        """
        Get the unique union of the documents.

        Args:
            docs: One list of JSON-serialisable documents per generated query
                (the shape produced by ``retriever.map()``).

        Returns:
            list: De-duplicated union, first-seen order preserved.
            (The original referenced an undefined name, `unique_union`.)
        """
        # Serialise each doc so it becomes hashable; sort_keys makes equal
        # dicts compare equal regardless of key insertion order.
        flattened = [
            json.dumps(doc, sort_keys=True) for sublist in docs for doc in sublist
        ]
        # dict.fromkeys dedupes while keeping order (set() would not).
        unique_union = list(dict.fromkeys(flattened))
        return [json.loads(doc) for doc in unique_union]
    
    
    # RAG FUSION
    class RAGFusion:
        """
        Implement the RAG fusion pipeline end to end.

        Args:
            data: Root directory of the source files to index.
        """

        def __init__(self, data) -> None:
            # Only the path is stored; indexing is deferred to __call__.
            self.data = data

        def __call__(self, question: str) -> str:
            """
            Answer *question* with RAG-Fusion.

            Args:
                question: The question to be answered.

            Returns:
                str: The answer to the question.

            Raises:
                Exception: Re-raised after logging. The original handler
                    swallowed every error and implicitly returned None,
                    violating the declared ``-> str`` contract.
            """

            try:
                # Prompt that asks the LLM for several query variants.
                prompt_for_rag_fusion = ChatPromptTemplate.from_template(SYSTEM_PROMPT)

                # The LLM output is split on newlines: one query per line.
                generate_query = (
                    prompt_for_rag_fusion
                    | ChatOpenAI(temperature=0.5, max_tokens=4096)
                    | StrOutputParser()
                    | (lambda x: x.split("\n"))
                )

                vb = vectorstore_db(self.data)

                # Fan out: retrieve for every generated query, then dedupe.
                retrieval_chain = generate_query | vb.map() | get_unique_union_of_documents

                chat_template = """
                            Answer the following questions{question} \n
                            Based on the data and context provided {context} \n
                            Question: {question} \n
                        """

                # Answer prompt fed with the deduped context.
                prompt = ChatPromptTemplate.from_template(chat_template)

                # LLM used for the final answer.
                llm = ChatOpenAI(temperature=0.5, max_tokens=4096)

                # Assemble the final RAG-Fusion chain.
                final_rag_fusion = (
                    {"context": retrieval_chain, "question": itemgetter("question")}
                    | prompt
                    | llm
                    | StrOutputParser()
                )

                return final_rag_fusion.invoke({"question": question})
            except Exception:
                # logger.exception records the traceback; re-raise so callers
                # see the failure instead of a silent None.
                logger.exception("RAG fusion failed for question: %s", question)
                raise