Milvus

Milvus is a high-performance, highly scalable vector database. It is one of the most popular vector databases as of this writing. Milvus is a great choice for managing large collections of documents in applications ranging from knowledge-base-oriented information retrieval to image similarity search.

The Milvus deployment on Shakudo also comes with Attu enabled for a web-based management interface.

Connections

A Milvus connection must be established before further operations can be performed. The alias names the connection for future reference. Functions that use the connection typically have a using parameter with a default value of 'default', so opening a connection with alias='default' lets us use other pymilvus facilities without passing the connection name.

Creating a connection

Connections are created with the pymilvus connections module.

import os

from pymilvus import connections

# MILVUS_HOST and MILVUS_PORT are read from the environment
connections.connect(
    alias="default",
    host=os.environ['MILVUS_HOST'],
    port=os.environ['MILVUS_PORT']
)

For more details on the connection parameters, see the official pymilvus documentation.

Closing a connection

Since there is no management object for Milvus connections, they must be released explicitly through the connections manager:

connections.disconnect("default")
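Because the connection has to be released explicitly, it can be convenient to wrap the connect and disconnect calls in a context manager. The following is a minimal sketch, not part of pymilvus: milvus_connection and its manager parameter are hypothetical names introduced here, and the MILVUS_HOST and MILVUS_PORT environment variables are assumed to be set as in the example above.

```python
import os
from contextlib import contextmanager


@contextmanager
def milvus_connection(alias="default", manager=None):
    """Open a Milvus connection and always disconnect on exit.

    `manager` is a hypothetical hook for passing a stand-in object in tests;
    by default the real pymilvus connections manager is used.
    """
    if manager is None:
        # Imported lazily so the helper can be defined without pymilvus installed
        from pymilvus import connections as manager

    manager.connect(
        alias=alias,
        host=os.environ["MILVUS_HOST"],
        port=os.environ["MILVUS_PORT"],
    )
    try:
        yield alias
    finally:
        # Runs even if the body of the with-block raises
        manager.disconnect(alias)
```

With this helper, code placed inside `with milvus_connection():` runs on the 'default' connection, and the connection is released when the block exits, including when it exits with an exception.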

Databases

Databases can optionally be created, which allows user permissions to be set over a whole set of collections. Details on database management are available in the Milvus documentation.

Collections

Milvus data is stored in collections, which have to be loaded in memory before they can be searched against. Loading is not necessary when filling the collection, however.

Creating a Collection

Milvus can operate in schema or schemaless mode depending on how the collection is created. Set enable_dynamic_field=True on the collection schema to enable schemaless mode.

from pymilvus import Collection, FieldSchema, CollectionSchema, DataType

MAX_TITLE = 512
MAX_TEXT = 1024
MAX_VEC = 384  # Dimension of the embedding vectors

NAME = "WikiHow"

whschema = CollectionSchema(
    fields=[
        # auto_id=True lets Milvus generate the primary key
        FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True),
        FieldSchema(name="title", dtype=DataType.VARCHAR, max_length=65535, default_value=""),
        FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535, default_value=""),
        FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=MAX_VEC, description="embedding vector")
    ],
    enable_dynamic_field=False,  # Fixed schema; set to True for schemaless mode
    description="WikiHow collection"
)
whcollection = Collection(
    name=NAME,
    schema=whschema,
    consistency_level="Session"
)

As for connections, the official pymilvus documentation provides more extensive details. Note that the size limit of a VARCHAR field is expressed in bytes and depends on the encoding used in Milvus; it is not a character count. The available data types are listed in the Milvus documentation.
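To make the difference between bytes and characters concrete, the sketch below measures a string both ways and trims it to a byte budget. It assumes UTF-8 encoding, which is an assumption made for illustration and not something stated above; utf8_len and truncate_utf8 are hypothetical helpers, not pymilvus functions.

```python
def utf8_len(text):
    """Size of the string in bytes once encoded as UTF-8 (assumed encoding)."""
    return len(text.encode("utf-8"))


def truncate_utf8(text, max_bytes):
    """Trim text so its UTF-8 encoding fits in max_bytes.

    Cutting the byte string may split a multi-byte character; decoding with
    errors="ignore" drops such an incomplete trailing character.
    """
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


title = "Crème brûlée"
# The character count and the byte count differ for non-ASCII text
print(len(title), utf8_len(title))
```

A helper of this kind could be applied to the title and text values before insertion so that they stay within the max_length declared in the schema.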

Importantly, the primary key may be either INT64 or VARCHAR and vectors can be either FLOAT_VECTOR or BINARY_VECTOR.

The consistency level of the collection is discussed further in the Consistency article at milvus.io. Briefly, consistency_level="Session" is a good default: queries always happen after the writes made in our current session, even though they could happen before writes from other sessions have been applied. By comparison, Strong consistency ensures queries always happen after all writes are completed. Eventually is the weakest consistency level and processes reads immediately, against whatever values are available in the replica at the time.

Inserting data in a collection

Given a collection, its insert function can be used to insert a batch of data. If using an auto_id key, the key field should be elided from the input. The argument to insert is a list of lists of field values, positionally ordered as in the schema, such as the following example:

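def insert_data(data):
    # embed_documents is assumed to be defined elsewhere and to return one vector per title
    vecs = embed_documents([d['title'] for d in data])

    # One list per field, in schema order (title, text, vector); the auto_id key is omitted
    entries = [[], [], []]

    for i in range(len(data)):
        entries[0].append(data[i]['title'])
        entries[1].append(data[i]['text'])
        entries[2].append(vecs[i])

    # Insert the whole batch in a single call
    whcollection.insert(entries)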
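The row-to-column reshaping used above can also be written as a small standalone function, which makes the expected layout easy to check without a Milvus server. This is only a sketch; rows_to_columns is a hypothetical helper name, not a pymilvus function.

```python
def rows_to_columns(rows, field_names):
    """Turn a list of row dicts into one list of values per field.

    The output order follows field_names, which should match the order of the
    non-auto_id fields in the collection schema.
    """
    return [[row[name] for row in rows] for name in field_names]


rows = [
    {"title": "How to tie a knot", "text": "Step 1 ..."},
    {"title": "How to bake bread", "text": "Step 1 ..."},
]
columns = rows_to_columns(rows, ["title", "text"])
# columns[0] holds every title and columns[1] every text; the list of
# embedding vectors would be appended as a third column before inserting.
```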

Note that a Milvus worker may crash during operations (e.g. by running out of memory). Milvus features redundancy, so a second node will come online to keep operations running smoothly, but the default timeout value of the insert function may be too low to cover that recovery, in which case the call fails. Increasing the timeout to a much larger value allows the process to keep running across a worker crash.
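As an illustration, the timeout can be passed explicitly on each insert call. The sketch below also splits the data into smaller batches, which is a choice made here and not something prescribed above. insert_in_batches is a hypothetical helper, and the batch size and the 300 second timeout are arbitrary values picked for the example; the collection argument is expected to behave like a pymilvus Collection.

```python
def insert_in_batches(collection, columns, batch_size=1000, timeout=300.0):
    """Insert column-ordered data in slices, with a generous per-call timeout.

    `columns` is a list of equally long lists, one per field, as expected by
    Collection.insert. Returns the number of rows sent.
    """
    if not columns:
        return 0

    total = len(columns[0])
    sent = 0
    for start in range(0, total, batch_size):
        # Take the same slice from every column so rows stay aligned
        batch = [column[start:start + batch_size] for column in columns]
        collection.insert(batch, timeout=timeout)
        sent += len(batch[0])
    return sent
```

It could be called as `insert_in_batches(whcollection, entries)` in place of the single insert call shown earlier.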

Milvus will not finalize an insertion (i.e. "seal a segment") unless enough data has been inserted since the last sealed segment. To force Milvus to seal a segment, it is important to flush the collection:

whcollection.flush()

Creating an index

Brute-force searches against vectors in the database can be very slow. Setting up an index can drastically speed up the search.

index_params = {"metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 1024}}
whcollection.create_index(field_name="vector", index_params=index_params)

In the above example, we have created an index on the field named vector using an inverted file index that stores the vectors uncompressed (IVF_FLAT), with 1024 clusters (nlist) and an L2 metric. More details about how to parameterize index creation can be found in the Milvus index documentation.
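As a rough illustration of what the number of clusters controls, here is a toy inverted-file search in plain Python. It is not how Milvus implements IVF_FLAT: the centroids are hand-picked instead of being learned from the data, and build_ivf, ivf_search and l2 are names made up for this sketch. The nprobe argument plays the role of the nprobe search parameter, i.e. how many clusters are scanned per query.

```python
import math


def l2(a, b):
    """Euclidean (L2) distance between two equal-length vectors."""
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))


def build_ivf(vectors, centroids):
    """Assign each vector index to the list of its nearest centroid."""
    lists = [[] for _ in centroids]
    for idx, vec in enumerate(vectors):
        nearest = min(range(len(centroids)), key=lambda c: l2(vec, centroids[c]))
        lists[nearest].append(idx)
    return lists


def ivf_search(query, vectors, centroids, lists, nprobe=1, limit=1):
    """Scan only the nprobe clusters whose centroids are closest to the query."""
    by_distance = sorted(range(len(centroids)), key=lambda c: l2(query, centroids[c]))
    candidates = [idx for c in by_distance[:nprobe] for idx in lists[c]]
    ranked = sorted(candidates, key=lambda idx: l2(query, vectors[idx]))
    return [(idx, l2(query, vectors[idx])) for idx in ranked[:limit]]


vectors = [[0.0, 0.0], [0.1, 0.2], [5.0, 5.0], [5.2, 4.9]]
centroids = [[0.0, 0.0], [5.0, 5.0]]  # Hand-picked stand-ins; here nlist is 2
lists = build_ivf(vectors, centroids)

# With nprobe=1 only the cluster around [5.0, 5.0] is scanned
top = ivf_search([4.9, 5.1], vectors, centroids, lists, nprobe=1, limit=1)
```

Scanning fewer clusters is faster but can miss the true nearest neighbour when it sits in a cluster that was not probed, which is the speed versus recall trade-off behind these two parameters.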

Milvus also supports creating indexes on (and searching against) scalar data (possibly in combination with the vector search).

Referring to an existing collection

An existing collection on the 'default' connection can be referenced by name:

NAME = "WikiHow"
whcollection = Collection(NAME)

Loading a collection

Collections cannot be queried against unless they are loaded first. This is simply achieved as follows:

whcollection.load()

Releasing a collection

The collection will stay loaded until it is released, either programmatically or through Attu.

whcollection.release()

Search

Search is conducted on a loaded collection as follows:

hits = whcollection.search(
    [embed_query(what)],              # Vector for the query
    anns_field="vector",              # Name of the field to search against
    param={                           # Search params...
        'metric_type': 'L2',
        'offset': 0,
        'params': {'nprobe': 1}
    },
    limit=1,
    output_fields=['text', 'title'])  # Fields to retrieve

The full documentation for the search parameters can be found in the pymilvus documentation. In the above example, we obtain the top search result for the embedding of the text we are searching for. Milvus also supports filter expressions (described in the documentation). The param field relates to the index(es) defined on the collection. A consistency_level can also be specified for the query.
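For illustration, a filter expression and a per-search consistency level can be passed as extra arguments to search. The following is a sketch under assumptions: search_titles_with_prefix is a hypothetical helper, the prefix filter on the title field and the nprobe value of 16 are arbitrary choices for the example, and the prefix is assumed not to contain double quotes.

```python
def search_titles_with_prefix(collection, query_vector, prefix, limit=5):
    """Vector search restricted to entities whose title starts with `prefix`.

    `collection` is expected to behave like a loaded pymilvus Collection with
    the title, text and vector fields defined earlier.
    """
    return collection.search(
        [query_vector],                                   # One query vector
        anns_field="vector",
        param={"metric_type": "L2", "params": {"nprobe": 16}},
        limit=limit,
        expr=f'title like "{prefix}%"',                   # Scalar filter expression
        output_fields=["title", "text"],
        consistency_level="Strong",                       # Overrides the collection default
    )
```

A call such as `search_titles_with_prefix(whcollection, embed_query(what), "How to")` would then return up to five hits whose titles begin with "How to".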

The object returned by a Milvus search contains, for each input vector, a list of hits as specified by the search parameters. Since we provided a single input vector in this case, we can obtain more details about the hits corresponding to this vector as follows:

query_hits = hits[0]
top_query_hit = query_hits[0]
print(f"Title: {top_query_hit.entity.get('title')}")
print(f"Text: {top_query_hit.entity.get('text')}")
print(f"Distance between query embedding and document embedding: {top_query_hit.distance}")

Since we specified that we only wanted the top hit, we only need to care about the first (i.e. only) hit returned for the first (once again, only) input vector in our search. In the above, we print out the fields retrieved from the search as specified in output_fields in our call, and the distance between the embedding we used to search the database and the document's embedding.
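When several query vectors are searched at once, or limit is greater than 1, the same structure can be walked with two nested loops. The sketch below flattens the results into plain dictionaries; hits_to_rows is a hypothetical helper that only relies on the distance attribute and entity.get accessor used above.

```python
def hits_to_rows(hits, fields):
    """Flatten search results into one dict per hit.

    The outer loop runs over query vectors, the inner loop over the hits
    returned for that query, in the order they were returned.
    """
    rows = []
    for query_index, query_hits in enumerate(hits):
        for rank, hit in enumerate(query_hits):
            row = {"query": query_index, "rank": rank, "distance": hit.distance}
            for field in fields:
                row[field] = hit.entity.get(field)
            rows.append(row)
    return rows
```

For the search above, `hits_to_rows(hits, ['title', 'text'])` would return a single row, since there is one query vector and limit=1.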

Query

Milvus can also do scalar searches, termed "query". For details, see the Milvus documentation on Query.
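As a small example, entities can be fetched by primary key with the query function of a loaded collection. This is a sketch under assumptions: fetch_by_pk is a hypothetical helper, and the field names pk, title and text follow the schema defined earlier.

```python
def fetch_by_pk(collection, ids, output_fields=("title", "text")):
    """Return the requested fields for the entities whose pk is in `ids`.

    `collection` is expected to behave like a loaded pymilvus Collection, and
    `ids` to be an iterable of integer primary keys.
    """
    # Builds a boolean expression such as: pk in [1, 2, 3]
    expr = f"pk in {list(ids)}"
    return collection.query(expr=expr, output_fields=list(output_fields))
```

Unlike search, a query involves no vector comparison: it returns the entities matching the scalar expression.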