I'm trying to find a general solution for RAG that can handle problems involving text, images, charts, tables, etc., where the source documents come in many different formats such as .docx, .xlsx, and .pdf.
The requirement for the answer:
The features of the documents:
Below are some documents I have read but feel I don't fully understand; I'm not sure how they can help me.
I want to be able to outline a pipeline to answer questions according to the requirements of my system. Any help would be greatly appreciated!
System:
Here is a sample of the code you will need to implement RAG-Fusion. You would have to adapt it to your own requirements: it serves as a guide for JSON files, and you can support other formats such as PDFs and images by following the same procedure.
def determine_extension(file):
    """Route *file* to the appropriate ingestion path based on its extension.

    Args:
        file: Path (str) of the document to classify.
    """
    # str.endswith accepts a TUPLE of suffixes; the original passed two
    # separate string arguments, which makes Python treat ".png" as a
    # start index and raise TypeError at runtime.
    if file.endswith((".jpg", ".png")):
        send_image_to_rag_classifier(file)
    elif file.endswith(".pdf"):
        # TODO: route PDFs to their own loader, mirroring the image path.
        pass
    else:
        # TODO: handle the remaining formats (.docx, .xlsx, JSON, ...).
        pass
""" Implement the RAG fusion using the langchain library"""
import asyncio
import json
import logging
import os
import pathlib as path
from operator import itemgetter
from typing import Any
from dotenv import find_dotenv, load_dotenv
from langchain.prompts import ChatPromptTemplate
from langchain_community.vectorstores import chroma
from langchain_core.documents import Document
from langchain_core.output_parsers import StrOutputParser
from langchain_core.vectorstores import VectorStoreRetriever
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_text_splitters import RecursiveJsonSplitter
# Module-level logger; handlers and level are configured by the host application.
logger = logging.getLogger(__name__)
# read OPENAI_API_KEY from the environment (find_dotenv locates the nearest .env)
load_dotenv(find_dotenv())
# Define a prompt for the RAG model.
# NOTE(review): placeholder text — replace with the actual prompt instructing the
# LLM to emit multiple query variants, one per line (RAGFusion splits on "\n").
SYSTEM_PROMPT = """
Your prompt
"""
# Recursively walk the data directory and collect the files to feed the retriever
def collect_data_files(filepath) -> list:
    """Walk through the file path and collect the files.

    Args:
        filepath: Root directory (str or os.PathLike) to walk recursively.

    Returns:
        list: Paths (rooted at *filepath*) of every file found.
    """
    # The original body returned the undefined name `store_file`, so every
    # call raised NameError; the walk described in the docstring was missing.
    # (The `filepath: path` annotation also referenced the pathlib MODULE,
    # which is not a type, so it has been dropped.)
    store_file = []
    for root, _dirs, files in os.walk(filepath):
        for name in files:
            store_file.append(os.path.join(root, name))
    return store_file
# Create a recursive json splitter to split the data into chunks
def retrieve_data(data) -> list[Document]:
    """Load every JSON file under *data* and split it into document chunks.

    Args:
        data: Root directory containing the JSON files to ingest.

    Returns:
        list[Document]: Chunked documents from ALL files found.

    Raises:
        json.JSONDecodeError: If a file does not contain valid JSON.
    """
    files = collect_data_files(data)
    # Build the splitter once; the original re-created it on every iteration.
    splitter = RecursiveJsonSplitter(max_chunk_size=300)
    documents: list[Document] = []
    for file in files:
        with open(file, "r") as f:
            payload = json.load(f)
        # BUG FIX: the original overwrote `data` each iteration and only
        # created documents AFTER the loop, so every file except the last
        # was silently discarded. Accumulate chunks per file instead.
        # create_documents expects a list of JSON dicts, so wrap a bare dict.
        texts = payload if isinstance(payload, list) else [payload]
        documents.extend(splitter.create_documents(texts=texts, convert_lists=True))
    return documents
# vectorstore database from chroma
def vectorstore_db(data) -> VectorStoreRetriever:
    """Create a vectorstore database from the data.

    Args:
        data: The documents (list[Document], e.g. from retrieve_data)
            to index.

    Returns:
        VectorStoreRetriever: Retriever backed by a Chroma index of *data*.
    """
    # The original body returned the undefined name `vector_retriever`, so
    # every call raised NameError. Build the Chroma index and expose it as
    # a retriever. (`chroma` is imported as a module at the top of the file,
    # hence the `chroma.Chroma` attribute access.)
    vector_store = chroma.Chroma.from_documents(
        documents=data,
        embedding=OpenAIEmbeddings(),
    )
    vector_retriever = vector_store.as_retriever()
    return vector_retriever
# Merge the per-query retrieval results into a single de-duplicated list
def get_unique_union_of_documents(docs: list[list]) -> list[Any]:
    """Get the unique union of the documents.

    Flattens one result list per generated query into a single list,
    dropping duplicates that several queries retrieved.

    Args:
        docs: One list of JSON-serializable documents per query.

    Returns:
        list: The unique union of the documents, first-seen order preserved.
    """
    # The original body referenced the undefined name `unique_union`, so
    # every call raised NameError; the flatten/dedupe step was missing.
    # Serialize each document so equal documents compare equal as strings
    # (sort_keys makes key order irrelevant; default=str covers non-JSON types).
    flattened = [
        json.dumps(doc, sort_keys=True, default=str)
        for per_query in docs
        for doc in per_query
    ]
    # dict.fromkeys de-duplicates while preserving first-seen order.
    unique_union = list(dict.fromkeys(flattened))
    return [json.loads(doc) for doc in unique_union]
# RAG FUSION
class RAGFusion:
    """Implement the RAG fusion pipeline.

    Generates several query variants for a question, retrieves documents
    for each variant, merges the unique results, and asks an LLM to answer
    from that merged context.

    Args:
        data: The data (directory of source files) used to build the index.
    """

    def __init__(self, data) -> None:
        self.data = data

    def __call__(self, question: str) -> str:
        """Answer *question* with the full RAG-Fusion pipeline.

        Args:
            question: The question to be answered.

        Returns:
            str: The answer to the question.

        Raises:
            Exception: Re-raised after logging if any pipeline stage fails.
        """
        try:
            # Chain that turns the question into several query variants,
            # one per line of the LLM output.
            prompt_for_rag_fusion = ChatPromptTemplate.from_template(SYSTEM_PROMPT)
            generate_query = (
                prompt_for_rag_fusion
                | ChatOpenAI(temperature=0.5, max_tokens=4096)
                | StrOutputParser()
                | (lambda x: x.split("\n"))
            )
            vb = vectorstore_db(self.data)
            # Retrieve for every generated query, then keep the unique union
            # of all the per-query result lists.
            retrieval_chain = generate_query | vb.map() | get_unique_union_of_documents
            # BUG FIX: the original template interpolated {question} twice
            # ("Answer the following questions{question} ... Question: {question}").
            chat_template = """
Answer the following question based on the data and context provided.
Context: {context}
Question: {question}
"""
            prompt = ChatPromptTemplate.from_template(chat_template)
            llm = ChatOpenAI(temperature=0.5, max_tokens=4096)
            # Final chain: build the context in parallel with passing the
            # question through, then prompt the LLM and parse plain text.
            final_rag_fusion = (
                {"context": retrieval_chain, "question": itemgetter("question")}
                | prompt
                | llm
                | StrOutputParser()
            )
            return final_rag_fusion.invoke({"question": question})
        except Exception:
            # BUG FIX: the original logged and fell through, implicitly
            # returning None and violating the declared -> str contract.
            # Log the full traceback and propagate to the caller.
            logger.exception("RAG fusion failed for question: %s", question)
            raise