I'm building an agent for semantic query using LlamaIndex and Postgres (with the pgvector extension) as the vector store. I'm using LlamaIndex to create the vector embeddings and vector search indexes with `VectorStoreIndex.from_documents()`. I've defined the `PGVectorStore` as follows:
```python
self.vector_store = PGVectorStore.from_params(
    database=request.tenant,
    host=host,
    password=password,
    port=port,
    user=user,
    table_name="vectorstore",
    embed_dim=1536,
    hnsw_kwargs={
        "hnsw_m": 16,
        "hnsw_ef_construction": 64,
        "hnsw_ef_search": 40,
        "hnsw_dist_method": "vector_cosine_ops",
    },
)
```
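For context, the store is wired into the index in the standard LlamaIndex way through a `StorageContext` (`documents` here stands in for whatever documents are being loaded):

```python
from llama_index.core import StorageContext, VectorStoreIndex

storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
```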
With this, LlamaIndex creates the vector embeddings and indexes in a `data_vectorstore` table. Currently I'm storing some of the columns that I want this table to have, such as `external_id`, as metadata (in the `metadata_` JSON column). I want to create custom columns when creating the embeddings. Is this possible using LlamaIndex, and if so, how?

I tried adding those ids to the metadata, but I strictly require the schema to have those columns in the table rather than inside the metadata JSON.
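For reference, this is roughly what the metadata approach looks like (the id values here are placeholders):

```python
from llama_index.core import Document

# Ids attached as document metadata end up in the metadata_ JSON column,
# not in dedicated table columns.
doc = Document(
    text="some document text",
    metadata={"external_id": "ext-123", "embeddingqueue_id": "queue-456"},
)
```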
I found the solution by writing a custom `PGVectorStore`, overriding the `add` method, and populating those custom columns in it.
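Since `PGVectorStore` only creates its default schema (`id`, `node_id`, `text`, `metadata_`, `embedding`), the custom columns have to be added to the table before inserting. A minimal one-time migration sketch, assuming psycopg2 and TEXT-typed ids (adjust the column types to whatever your ids actually are):

```python
import psycopg2

conn = psycopg2.connect(connection_string)  # plain libpq-style DSN
with conn, conn.cursor() as cursor:
    # Add the dedicated columns to the table LlamaIndex created.
    cursor.execute(
        """
        ALTER TABLE data_vectorstore
            ADD COLUMN IF NOT EXISTS external_id TEXT,
            ADD COLUMN IF NOT EXISTS embeddingqueue_id TEXT
        """
    )
conn.close()
```

With the columns in place, the overridden store looks like this: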
```python
from llama_index.vector_stores.postgres import PGVectorStore
import psycopg2
import json
import time


class CustomPGVectorStore(PGVectorStore):
    """PGVectorStore that writes external_id and embeddingqueue_id into
    dedicated columns instead of the metadata_ JSON."""

    def add(self, nodes, **add_kwargs):
        """Override to populate the custom columns during insert."""
        # PGVectorStore already keeps connection_string and table_name as
        # fields; the physical table it creates is prefixed with "data_".
        table_name = f"data_{self.table_name}"
        # The stored connection string may carry an SQLAlchemy driver suffix
        # ("postgresql+psycopg2://..."), which psycopg2.connect() rejects.
        dsn = self.connection_string.replace("postgresql+psycopg2://", "postgresql://")
        conn = psycopg2.connect(dsn)
        cursor = conn.cursor()
        node_ids = []  # collect node IDs to return, as the base class does
        try:
            for idx, node in enumerate(nodes):
                node_ids.append(node.node_id)
                # Pull the custom column values out of the metadata and
                # drop them so they aren't duplicated in metadata_.
                external_id = node.metadata.pop("external_id", None)
                embeddingqueue_id = node.metadata.pop("embeddingqueue_id", None)
                # Generate a numeric ID from the current timestamp and the
                # batch index: unique enough for a single writer while being
                # compatible with the bigint id column.
                numeric_id = int(time.time() * 1000) + idx
                # Custom insert using psycopg2's %s placeholders. node_id is
                # included so LlamaIndex can map rows back to nodes at query
                # time; the embedding is passed in pgvector's text format
                # ("[x1, x2, ...]"), which str() on a list of floats produces.
                query = f"""
                    INSERT INTO {table_name}
                        (id, node_id, text, metadata_, embedding,
                         external_id, embeddingqueue_id)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                """
                cursor.execute(
                    query,
                    (
                        numeric_id,
                        node.node_id,
                        node.get_content(),
                        json.dumps(node.metadata),
                        str(node.embedding),
                        external_id,
                        embeddingqueue_id,
                    ),
                )
            conn.commit()
            return node_ids
        except Exception:
            conn.rollback()
            raise
        finally:
            cursor.close()
            conn.close()
```
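Usage is the same as with the stock store, since `from_params` instantiates the class it is called on; a minimal sketch reusing the connection parameters from the question:

```python
from llama_index.core import StorageContext, VectorStoreIndex

vector_store = CustomPGVectorStore.from_params(
    database=request.tenant,
    host=host,
    password=password,
    port=port,
    user=user,
    table_name="vectorstore",
    embed_dim=1536,
)
storage_context = StorageContext.from_defaults(vector_store=vector_store)
index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)
```

Query-side behaviour is untouched: the override still writes `node_id`, `text`, `metadata_`, and `embedding` where the base class expects them, so the default retrieval path keeps working.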