I was using the upsert function to insert around 7k rows with unique primary key values multiple times to see how upsert works. After insertion I tried to find the number of rows in the collection using the following methods:
collection.num_entities
num_entities = client.get_collection_stats(collection_name)["row_count"]
print(f"Number of entities in the collection: {num_entities}")
collection.query(output_fields=["count(*)"])
Each of them returned a count of 46k, which included the rows that should have been deleted, while this returned exactly the 7k unique ids:
query_result = collection.query(
expr="id != ''",
output_fields=["id"]
)
I also couldn't see the duplicate rows when searching or querying. What causes this behavior, and how can I get the actual number of rows in the collection, excluding the deleted ones?
collection.num_entities internally calls client.get_collection_stats().
As we know, Milvus manages data in segments. Each sealed segment's metadata (including its initial row count and S3 path) is recorded in etcd. A delete operation doesn't directly modify sealed segments: the deleted ids are stored in a "delta_log" in S3, while the initial row count recorded in etcd is left unchanged.
client.get_collection_stats() quickly iterates over the segments recorded in etcd, adds up the initial row count of each segment, and returns the sum. So its result does not account for deleted items. This interface is sometimes useful when users don't want to load the collection and just need a raw number quickly.
collection.query(output_fields=["count(*)"])
This is a real query operation that requires the collection to be loaded. It scans all the segments, both sealed and growing, and deleted items are not counted.
Usually, if you need a precise number, set consistency_level to "Strong" to make sure all the data in Pulsar has been consumed by the query nodes and is therefore queryable:
results = collection.query(expr="", output_fields=["count(*)"], consistency_level="Strong")
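If you only want the number itself, the count(*) query can be wrapped in a small helper. This is a minimal sketch: live_row_count is a hypothetical name of my own, and it assumes `collection` is a loaded pymilvus Collection whose query() returns a list-like result with one dict keyed by "count(*)", as in the output shown at the end of this answer.

```python
def live_row_count(collection) -> int:
    """Return the number of non-deleted rows using a count(*) query.

    Assumption: `collection` is a loaded pymilvus Collection (or any object
    exposing a compatible query() method).
    """
    results = collection.query(
        expr="",
        output_fields=["count(*)"],
        # "Strong" waits until all pending writes are visible to the query.
        consistency_level="Strong",
    )
    # The result holds a single row whose only field is the count.
    return results[0]["count(*)"]
```

You would call it as live_row_count(collection) after collection.load().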
query_result = collection.query(expr="id != ''", output_fields=["id"])
is also a real query operation that returns all the unique ids in the collection.
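The difference between the two counting paths can be sketched with a toy model in plain Python. Everything here (ToySegment, stats_row_count, query_row_count, toy_upsert) is a hypothetical name made up for illustration, not Milvus internals, and the sketch assumes that an upsert acts as a delete of the old rows followed by an insert of the new ones.

```python
from dataclasses import dataclass, field


@dataclass
class ToySegment:
    """Hypothetical stand-in for a sealed segment (not a Milvus class)."""
    ids: set                                   # primary keys written to the segment
    deleted: set = field(default_factory=set)  # plays the role of the delta log

    @property
    def initial_row_count(self) -> int:
        # Fixed once the segment exists, like the row count stored in metadata.
        return len(self.ids)


def stats_row_count(segments) -> int:
    # Mimics the stats path: sum the recorded counts, ignore the delta logs.
    return sum(seg.initial_row_count for seg in segments)


def query_row_count(segments) -> int:
    # Mimics the count(*) path: scan rows and skip those marked as deleted.
    return sum(len(seg.ids - seg.deleted) for seg in segments)


def toy_upsert(segments, ids) -> None:
    # Assumed semantics: mark matching old rows as deleted, then write the
    # new rows into a fresh segment.
    ids = set(ids)
    for seg in segments:
        seg.deleted |= seg.ids & ids
    segments.append(ToySegment(ids=ids))


segments = [ToySegment(ids=set(range(7000)))]  # the initial insert
for _ in range(6):                             # six upserts of the same ids
    toy_upsert(segments, range(7000))

print(stats_row_count(segments))  # 49000
print(query_row_count(segments))  # 7000
```

The stats-style sum keeps growing with every upsert, while the query-style scan stays at the number of live rows.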
To make this clearer, here is an example:
import random
import time
from pymilvus import (
    connections, utility, Collection, FieldSchema, CollectionSchema, DataType)
connections.connect(host="localhost", port=19530)
dim = 128
metric_type = "COSINE"
collection_name = "test"
schema = CollectionSchema(
    fields=[
        FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=dim),
    ])
utility.drop_collection(collection_name)
collection = Collection(collection_name, schema)
print(collection_name, "created")
index_params = {
    'metric_type': metric_type,
    'index_type': "FLAT",
    'params': {},
}
collection.create_index(field_name="vector", index_params=index_params)
collection.load()
# insert 7k items with unique ids
batch = 7000
data = [
    [k for k in range(batch)],
    [[random.random() for _ in range(dim)] for _ in range(batch)],
]
collection.insert(data=data)
print(collection_name, "data inserted")
# call upsert repeatedly to update the same 7k items
for i in range(6):
    collection.upsert(data=data)
print(collection_name, "data upserted")
collection.flush() # call flush() here to force the buffer to be flushed to sealed segments. In practice, we don't recommend calling this method manually
print("num_entities =", collection.num_entities) # expect 49k
# consistency_level = Strong ensures all the data has been consumed by the query node and is queryable
# see https://milvus.io/docs/consistency.md#Consistency-levels
results = collection.query(expr="", output_fields=["count(*)"], consistency_level="Strong")
print("query((count(*)) =", results) # expect 7k
query_result = collection.query(expr="id >= 0", output_fields=["id"], consistency_level="Strong")
print("query(id >= 0) =", len(query_result)) # expect 7k
The result of this script:
test created
test data inserted
test data upserted
num_entities = 49000
query((count(*)) = data: ["{'count(*)': 7000}"]
query(id >= 0) = 7000