Tags: python, postgresql, llama-index, pgvector

How to create custom columns when creating embeddings using LlamaIndex in Postgres (with the pgvector extension)


I'm building an agent for semantic querying using LlamaIndex and Postgres (with the pgvector extension) as the vector store. I'm using LlamaIndex to create vector embeddings and vector search indexes with VectorStoreIndex.from_documents(). I've defined the PGVectorStore as follows:

self.vector_store = PGVectorStore.from_params(
    database=request.tenant,
    host=host,
    password=password,
    port=port,
    user=user,
    table_name="vectorstore",
    embed_dim=1536,
    hnsw_kwargs={
        "hnsw_m": 16,
        "hnsw_ef_construction": 64,
        "hnsw_ef_search": 40,
        "hnsw_dist_method": "vector_cosine_ops",
    },
)
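
The store is then wired into the index roughly like this (a simplified sketch; documents is built elsewhere in the pipeline):

    from llama_index.core import VectorStoreIndex, StorageContext

    # Sketch: documents is assumed to be a list of LlamaIndex Document objects
    storage_context = StorageContext.from_defaults(vector_store=self.vector_store)
    index = VectorStoreIndex.from_documents(documents, storage_context=storage_context)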

With this, LlamaIndex creates the vector embeddings and indexes in a data_vectorstore table. Currently I'm storing some of the columns that I want this table to have, such as external_id, as metadata (in the metadata column). I want to create custom columns when creating the embeddings. Is this possible using LlamaIndex?

Is there a way to do it, and how?

I tried adding those IDs to the metadata, but I strictly require the schema to have those columns in the table itself, not in the metadata JSON.
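
For reference, the documents are currently built roughly like this, with the IDs riding along in the metadata (a simplified sketch; the text and ID values are placeholders):

    from llama_index.core import Document

    # Sketch: the custom IDs currently travel inside the metadata JSON
    doc = Document(
        text="Some tenant document text...",
        metadata={"external_id": "ext-42", "embeddingqueue_id": 7},
    )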


Solution

  • I found the solution by writing a custom PGVectorStore, overriding the add method, and populating the custom columns there.

    import json
    import time

    import psycopg2
    from llama_index.vector_stores.postgres import PGVectorStore

    class CustomPGVectorStore(PGVectorStore):
        def __init__(self, connection_string=None, table_name="vectorstore", **kwargs):
            super().__init__(connection_string=connection_string, table_name=table_name, **kwargs)
            # Store connection parameters for later use; LlamaIndex prefixes
            # the physical table name with "data_"
            self.connection_string = connection_string
            self.table_name = f"data_{table_name}"

        def add(self, nodes, **add_kwargs):
            """Override to populate the custom columns during insert."""
            conn = psycopg2.connect(self.connection_string)
            cursor = conn.cursor()

            node_ids = []  # Collect node IDs to return

            try:
                for idx, node in enumerate(nodes):
                    # Store the node ID for the return value
                    node_ids.append(node.id_)

                    # Extract values for the custom columns from the node metadata,
                    # removing them there to avoid duplication
                    external_id = node.metadata.pop("external_id", None)
                    embeddingqueue_id = node.metadata.pop("embeddingqueue_id", None)

                    # Generate a numeric ID from the current timestamp and index.
                    # This keeps it unique enough while staying compatible with bigint.
                    numeric_id = int(time.time() * 1000) + idx

                    # Custom insert query using %s placeholders instead of $n.
                    # node_id is part of LlamaIndex's default schema and is used
                    # to map query results back to nodes, so populate it too.
                    query = f"""
                    INSERT INTO {self.table_name}
                    (id, node_id, embedding, text, metadata_, external_id, embeddingqueue_id)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)
                    """

                    cursor.execute(
                        query,
                        (
                            numeric_id,
                            node.id_,
                            # pgvector's assignment casts accept a plain Python
                            # list (sent as a Postgres array) for the vector column
                            node.embedding,
                            node.text,
                            json.dumps(node.metadata),
                            external_id,
                            embeddingqueue_id,
                        ),
                    )
                conn.commit()
                return node_ids
            except Exception:
                conn.rollback()
                raise
            finally:
                cursor.close()
                conn.close()
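
One thing to note: super().__init__() still creates the data_vectorstore table with only the default columns, so the custom columns have to exist before add() runs. A minimal sketch of that one-time schema step, assuming TEXT and BIGINT are appropriate types for these IDs:

    import psycopg2

    # One-time setup sketch: make sure the custom columns exist before inserting.
    # The column types here are assumptions; adjust them to match your IDs.
    conn = psycopg2.connect(connection_string)
    with conn, conn.cursor() as cur:
        cur.execute(
            """
            ALTER TABLE data_vectorstore
                ADD COLUMN IF NOT EXISTS external_id TEXT,
                ADD COLUMN IF NOT EXISTS embeddingqueue_id BIGINT
            """
        )
    conn.close()

After that, CustomPGVectorStore drops in anywhere PGVectorStore was used, for example via StorageContext.from_defaults(vector_store=...) as in the setup sketch above.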