Milvus
Milvus is a high-performance, highly scalable vector database. It is one of the most popular vector databases as of this writing. Milvus is a good choice for managing large collections of documents in applications ranging from knowledge-base information retrieval to image similarity search.
The Milvus deployment on Shakudo also comes with Attu enabled for a web-based management interface.
Connections
A Milvus connection must be established before any other operation can be performed. The alias names the connection for future reference. Functions that use the connection typically take a using parameter with a default value of 'default', so opening a connection with alias='default' lets us use other pymilvus facilities without passing the connection name.
Creating a connection
Connections are created with the pymilvus connections module.
import os

from pymilvus import connections

# Host and port are read from the environment
connections.connect(
    alias="default",
    host=os.environ['MILVUS_HOST'],
    port=os.environ['MILVUS_PORT']
)
For more details on the connection parameters, see the official pymilvus documentation.
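When the environment variables may be missing (for example during local testing), it can help to build the connection arguments separately. The following is a minimal sketch: the helper name milvus_connection_args and the "localhost" fallback are assumptions for illustration, while 19530 is the port Milvus listens on by default.

```python
import os


def milvus_connection_args(env=None, alias="default"):
    """Build keyword arguments for pymilvus connections.connect()."""
    env = os.environ if env is None else env
    return {
        "alias": alias,
        # "localhost" is an assumed fallback for local testing
        "host": env.get("MILVUS_HOST", "localhost"),
        # 19530 is the default Milvus port
        "port": str(env.get("MILVUS_PORT", "19530")),
    }


# Usage (requires pymilvus and a reachable Milvus server):
# from pymilvus import connections
# connections.connect(**milvus_connection_args())
```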
Closing a connection
Since there is no management object for Milvus connections, they must be released explicitly through the connections manager:
connections.disconnect("default")
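Because the connection has to be released explicitly, one option is to wrap the connect and disconnect calls in a context manager so the release also happens when an exception is raised. This is only a sketch: milvus_connection is a hypothetical helper, and it takes the pymilvus connections manager as an argument rather than importing it.

```python
from contextlib import contextmanager


@contextmanager
def milvus_connection(connections, alias="default", **connect_kwargs):
    """Open a connection on entry and always disconnect on exit.

    `connections` is expected to be the pymilvus connections manager.
    """
    connections.connect(alias=alias, **connect_kwargs)
    try:
        yield alias
    finally:
        # Runs on normal exit and when the body raises
        connections.disconnect(alias)


# Usage (requires pymilvus and a reachable Milvus server):
# from pymilvus import connections
# with milvus_connection(connections, host="...", port="..."):
#     ...  # work with collections here
```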
Databases
Databases can optionally be created to set user permissions across a set of collections. Details on database management are available in the Milvus documentation.
Collections
Milvus data is stored in collections, which have to be loaded in memory before they can be searched against. Loading is not necessary when filling the collection, however.
Creating a Collection
Milvus can operate in schema or schemaless mode depending on how the collection is created. Set enable_dynamic_field=True on the schema to enable schemaless mode.
from pymilvus import Collection, FieldSchema, CollectionSchema, DataType

MAX_TITLE = 512
MAX_TEXT = 1024
MAX_VEC = 384  # Dimension of the embedding vectors
NAME = "WikiHow"

whschema = CollectionSchema(
    fields=[
        # Primary key generated by Milvus
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True),
        # max_length is expressed in bytes, not characters
        FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=65535, default_value=""),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535, default_value=""),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=MAX_VEC, description="embedding vector")
    ],
    enable_dynamic_field=False,  # Fixed schema, no dynamic fields
    description="WikiHow collection"
)

whcollection = Collection(
    name=NAME,
    schema=whschema,
    consistency_level="Session"
)
As with connections, the official pymilvus documentation provides more extensive details. Note that the size limit of a VARCHAR field is expressed in bytes and depends on the encoding used in Milvus; it is not a character count. The list of available datatypes is available here.
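Since the limit is in bytes, a string with non-ASCII characters can exceed max_length even when its character count looks safe. The sketch below trims a string to a byte budget without cutting a character in half. It assumes the text is stored as UTF-8, and truncate_utf8 is a hypothetical helper, not part of pymilvus.

```python
def truncate_utf8(text, max_bytes):
    """Return the longest prefix of `text` whose UTF-8 encoding fits in max_bytes."""
    if max_bytes < 0:
        raise ValueError("max_bytes must be non-negative")
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    # Cutting the byte string may split a multi-byte character;
    # errors="ignore" drops the incomplete trailing bytes.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")
```

For example, truncate_utf8(document_text, 65535) could be applied to each value before insertion, where document_text stands for any string destined for a VARCHAR field.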
Importantly, the primary key may be either INT64 or VARCHAR, and vectors can be either FLOAT_VECTOR or BINARY_VECTOR.
The consistency level of the collection is discussed further in the Consistency article at milvus.io. Briefly, consistency_level="Session" is a good default: queries always see the writes made in our current session, even though they may run before writes from other sessions have been applied. By comparison, Strong consistency ensures queries always run after all writes are completed. Eventually is the weakest consistency level and processes reads immediately, against whatever values are available in the replica at the time.
Inserting data in a collection
Given a collection, its insert function can be used to insert a batch of data. If using an auto_id key, the key field should be omitted from the input. The argument to insert is a list of lists of field values, one list per field, ordered as in the schema, as in the following example:
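def insert_data(data):
    # embed_documents is assumed to return one embedding per title
    vecs = embed_documents([d['title'] for d in data])
    # One list per field, in schema order (pk is omitted because auto_id=True)
    entries = [[], [], []]
    for i in range(len(data)):
        entries[0].append(data[i]['title'])
        entries[1].append(data[i]['text'])
        entries[2].append(vecs[i])
    whcollection.insert(entries)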
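For a large dataset, it may be preferable to call insert_data on several smaller batches rather than on the whole dataset at once. A minimal sketch of such batching follows. The helper name batched and the batch size in the usage comment are assumptions for illustration, not recommendations from Milvus.

```python
def batched(rows, batch_size):
    """Yield consecutive slices of `rows` containing at most batch_size items."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


# Usage with the insert_data function above (500 is an arbitrary size):
# for batch in batched(data, 500):
#     insert_data(batch)
```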
Note that if a Milvus worker crashes (e.g. from running out of memory) during an operation, Milvus features redundancy and a second node will come online to keep operations running smoothly. However, the default timeout value (in the insert function) may be too low and may cause the call to fail. Increasing it to a much larger value allows the process to keep running across a worker crash.
Milvus will not finalize an insertion (i.e. "seal a segment") unless enough data has been inserted since the last sealed segment. To force Milvus to seal a segment, flush the collection:
whcollection.flush()
Creating an index
Brute-force searches against vectors in the database can be very slow. Setting up an index can drastically speed up the search.
whcollection.create_index(
    field_name="vector",
    # Index build parameters such as nlist go under "params"
    index_params={"metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 1024}}
)
In the above example, we have created an index on the field named vector using an inverted-file index with flat storage (IVF_FLAT), 1024 clusters (nlist), and an L2 metric. More details about how to parameterize index creation can be found at this link.
Milvus also supports creating indexes on (and searching against) scalar data (possibly in combination with the vector search).
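To illustrate what a vector index avoids, a brute-force search compares the query with every stored vector, so its cost grows linearly with the size of the collection. The following plain-Python sketch shows that linear scan using Euclidean (L2) distance. It is illustrative only and is not how Milvus implements search; brute_force_search is a hypothetical name.

```python
import math


def brute_force_search(query, vectors, limit=1):
    """Return (index, distance) pairs for the `limit` vectors closest to `query`."""
    # Every stored vector is visited once: O(n) distance computations
    scored = [(math.dist(query, vec), idx) for idx, vec in enumerate(vectors)]
    scored.sort()
    return [(idx, dist) for dist, idx in scored[:limit]]
```

An IVF index reduces this work by grouping vectors into clusters and scanning only the clusters nearest to the query.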
Referring to an existing collection
An existing collection on the 'default' connection can be referenced by name:
NAME = "WikiHow"
whcollection = Collection(NAME)
Loading a collection
Collections cannot be queried unless they are loaded first. This is achieved as follows:
whcollection.load()
Releasing a collection
The collection will stay loaded until it is released, either programmatically or through Attu.
whcollection.release()
Search
Search is conducted on a loaded collection as follows:
hits = whcollection.search(
    [embed_query(what)],              # Vector for the query
    anns_field="vector",              # Name of the field to search against
    param={'metric_type': 'L2',       # Search params...
           'offset': 0,
           'params': {'nprobe': 1}
    },
    limit=1,
    output_fields=['text', 'title'])  # Fields to retrieve
The full documentation for the search parameters can be found here. In the above example, we obtain the top search result given the embedding of the query. Milvus also supports filter expressions (described in the documentation). The param field relates to the index(es) defined on the collection. A consistency_level can also be specified for the query.
The hits returned by a Milvus search contain, for each input vector, a list of hits as specified by the search parameters. Since we provided a single input vector in this case, we can obtain more details about the hits corresponding to this vector as follows:
query_hits = hits[0]
top_query_hit = query_hits[0]
print(f"Title: {top_query_hit.entity.get('title')}")
print(f"Text: {top_query_hit.entity.get('text')}")
print(f"Distance between query embedding and document embedding: {top_query_hit.distance}")
Since we specified that we only wanted the top hit, we only need to care about the first (i.e. only) hit returned for the first (once again, only) input vector in our search. In the above, we print out the fields retrieved from the search as specified in output_fields in our call, and the distance between the embedding we used to search the database and the document's embedding.
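When limit is greater than 1, it can be convenient to turn the hits for one query vector into plain dictionaries. The sketch below relies only on the entity.get and distance accessors used above; hits_to_dicts is a hypothetical helper name.

```python
def hits_to_dicts(query_hits, output_fields):
    """Convert the hits for one query vector into a list of plain dicts."""
    results = []
    for hit in query_hits:
        # Copy each requested field, then attach the distance to the query
        row = {field: hit.entity.get(field) for field in output_fields}
        row["distance"] = hit.distance
        results.append(row)
    return results


# Usage with the search above:
# for row in hits_to_dicts(hits[0], ['title', 'text']):
#     print(row['title'], row['distance'])
```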
Query
Milvus can also perform scalar searches, termed "queries". For details, see the Milvus documentation on Query.
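As a small illustration, the sketch below retrieves entities by primary key through the collection's query function, using a boolean expression of the form "pk in [...]". The helper name fetch_by_pks is hypothetical, and the field names are taken from the WikiHow schema above. The collection must be loaded first.

```python
def fetch_by_pks(collection, pks, output_fields=("title", "text")):
    """Fetch entities whose primary key is in `pks` using a scalar query."""
    pk_list = [int(pk) for pk in pks]
    if not pk_list:
        return []  # Nothing to look up, skip the round trip
    # Builds an expression such as "pk in [1, 2, 3]"
    expr = f"pk in {pk_list}"
    return collection.query(expr=expr, output_fields=list(output_fields))


# Usage (requires a loaded collection):
# rows = fetch_by_pks(whcollection, [1, 2, 3])
```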