I am trying to run a RAG pipeline using haystack & Milvus.
Its my first time using Milvus, and I seem to have an issue with it.
I'm following this tutorial, with some basic changes: https://milvus.io/docs/integrate_with_haystack.md
Here is my code:
import os
import urllib.request
from haystack import Pipeline
from haystack.components.converters import MarkdownToDocument
from haystack_integrations.components.embedders.ollama import OllamaDocumentEmbedder, OllamaTextEmbedder
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter
from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever
url = "https://www.gutenberg.org/cache/epub/7785/pg7785.txt"
file_path = "./davinci.txt"
if not os.path.exists(file_path):
urllib.request.urlretrieve(url, file_path)
document_store = MilvusDocumentStore(
connection_args={"uri": "./milvus.db"},
drop_old=True,
)
indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", MarkdownToDocument())
indexing_pipeline.add_component(
"splitter", DocumentSplitter(split_by="sentence", split_length=2)
)
indexing_pipeline.add_component("embedder", OllamaDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")
indexing_pipeline.draw('./pipeline_diagram.png')
indexing_pipeline.run({"converter": {"sources": [file_path]}})
It all works well until the last line, where I get a ConnectionRefusedError. First the conversion (from markdown to document) runs well, but then the code fails.
I am not sure why it happens, as I see the milvus.db
and milvus.db.lock
files created as expected.
The full error is:
---------------------------------------------------------------------------
ConnectionRefusedError Traceback (most recent call last)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:203, in HTTPConnection._new_conn(self)
202 try:
--> 203 sock = connection.create_connection(
204 (self._dns_host, self.port),
205 self.timeout,
206 source_address=self.source_address,
207 socket_options=self.socket_options,
208 )
209 except socket.gaierror as e:
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/util/connection.py:85, in create_connection(address, timeout, source_address, socket_options)
84 try:
---> 85 raise err
86 finally:
87 # Break explicitly a reference cycle
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/util/connection.py:73, in create_connection(address, timeout, source_address, socket_options)
72 sock.bind(source_address)
---> 73 sock.connect(sa)
74 # Break explicitly a reference cycle
ConnectionRefusedError: [Errno 61] Connection refused
The above exception was the direct cause of the following exception:
NewConnectionError Traceback (most recent call last)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connectionpool.py:791, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
790 # Make the request on the HTTPConnection object
--> 791 response = self._make_request(
792 conn,
793 method,
794 url,
795 timeout=timeout_obj,
796 body=body,
797 headers=headers,
798 chunked=chunked,
799 retries=retries,
800 response_conn=response_conn,
801 preload_content=preload_content,
802 decode_content=decode_content,
803 **response_kw,
804 )
806 # Everything went great!
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connectionpool.py:497, in HTTPConnectionPool._make_request(self, conn, method, url, body, headers, retries, timeout, chunked, response_conn, preload_content, decode_content, enforce_content_length)
496 try:
--> 497 conn.request(
498 method,
499 url,
500 body=body,
501 headers=headers,
502 chunked=chunked,
503 preload_content=preload_content,
504 decode_content=decode_content,
505 enforce_content_length=enforce_content_length,
506 )
508 # We are swallowing BrokenPipeError (errno.EPIPE) since the server is
509 # legitimately able to close the connection after sending a valid response.
510 # With this behaviour, the received response is still readable.
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:395, in HTTPConnection.request(self, method, url, body, headers, chunked, preload_content, decode_content, enforce_content_length)
394 self.putheader(header, value)
--> 395 self.endheaders()
397 # If we're given a body we start sending that in chunks.
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/http/client.py:1289, in HTTPConnection.endheaders(self, message_body, encode_chunked)
1288 raise CannotSendHeader()
-> 1289 self._send_output(message_body, encode_chunked=encode_chunked)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/http/client.py:1048, in HTTPConnection._send_output(self, message_body, encode_chunked)
1047 del self._buffer[:]
-> 1048 self.send(msg)
1050 if message_body is not None:
1051
1052 # create a consistent interface to message_body
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/http/client.py:986, in HTTPConnection.send(self, data)
985 if self.auto_open:
--> 986 self.connect()
987 else:
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:243, in HTTPConnection.connect(self)
242 def connect(self) -> None:
--> 243 self.sock = self._new_conn()
244 if self._tunnel_host:
245 # If we're tunneling it means we're connected to our proxy.
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:218, in HTTPConnection._new_conn(self)
217 except OSError as e:
--> 218 raise NewConnectionError(
219 self, f"Failed to establish a new connection: {e}"
220 ) from e
222 # Audit hooks are only available in Python 3.8+
NewConnectionError: <urllib3.connection.HTTPConnection object at 0x30ca49690>: Failed to establish a new connection: [Errno 61] Connection refused
The above exception was the direct cause of the following exception:
MaxRetryError Traceback (most recent call last)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/adapters.py:486, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
485 try:
--> 486 resp = conn.urlopen(
487 method=request.method,
488 url=url,
489 body=request.body,
490 headers=request.headers,
491 redirect=False,
492 assert_same_host=False,
493 preload_content=False,
494 decode_content=False,
495 retries=self.max_retries,
496 timeout=timeout,
497 chunked=chunked,
498 )
500 except (ProtocolError, OSError) as err:
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connectionpool.py:845, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
843 new_e = ProtocolError("Connection aborted.", new_e)
--> 845 retries = retries.increment(
846 method, url, error=new_e, _pool=self, _stacktrace=sys.exc_info()[2]
847 )
848 retries.sleep()
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/util/retry.py:515, in Retry.increment(self, method, url, response, error, _pool, _stacktrace)
514 reason = error or ResponseError(cause)
--> 515 raise MaxRetryError(_pool, url, reason) from reason # type: ignore[arg-type]
517 log.debug("Incremented Retry for (url='%s'): %r", url, new_retry)
MaxRetryError: HTTPConnectionPool(host='localhost', port=11434): Max retries exceeded with url: /api/embeddings (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x30ca49690>: Failed to establish a new connection: [Errno 61] Connection refused'))
During handling of the above exception, another exception occurred:
ConnectionError Traceback (most recent call last)
Cell In[15], line 1
----> 1 indexing_pipeline.run({"converter": {"sources": [file_path]}})
3 print("Number of documents:", document_store.count_documents())
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/haystack/core/pipeline/pipeline.py:197, in Pipeline.run(self, data, debug, include_outputs_from)
195 span.set_content_tag("haystack.component.input", last_inputs[name])
196 logger.info("Running component {component_name}", component_name=name)
--> 197 res = comp.run(**last_inputs[name])
198 self.graph.nodes[name]["visits"] += 1
200 if not isinstance(res, Mapping):
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/haystack_integrations/components/embedders/ollama/document_embedder.py:139, in OllamaDocumentEmbedder.run(self, documents, generation_kwargs)
136 raise TypeError(msg)
138 texts_to_embed = self._prepare_texts_to_embed(documents=documents)
--> 139 embeddings, meta = self._embed_batch(
140 texts_to_embed=texts_to_embed, batch_size=self.batch_size, generation_kwargs=generation_kwargs
141 )
143 for doc, emb in zip(documents, embeddings):
144 doc.embedding = emb
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/haystack_integrations/components/embedders/ollama/document_embedder.py:107, in OllamaDocumentEmbedder._embed_batch(self, texts_to_embed, batch_size, generation_kwargs)
105 batch = texts_to_embed[i] # Single batch only
106 payload = self._create_json_payload(batch, generation_kwargs)
--> 107 response = requests.post(url=self.url, json=payload, timeout=self.timeout)
108 response.raise_for_status()
109 result = response.json()
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/api.py:115, in post(url, data, json, **kwargs)
103 def post(url, data=None, json=None, **kwargs):
104 r"""Sends a POST request.
105
106 :param url: URL for the new :class:`Request` object.
(...)
112 :rtype: requests.Response
113 """
--> 115 return request("post", url, data=data, json=json, **kwargs)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/api.py:59, in request(method, url, **kwargs)
55 # By using the 'with' statement we are sure the session is closed, thus we
56 # avoid leaving sockets open which can trigger a ResourceWarning in some
57 # cases, and look like a memory leak in others.
58 with sessions.Session() as session:
---> 59 return session.request(method=method, url=url, **kwargs)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/sessions.py:589, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
584 send_kwargs = {
585 "timeout": timeout,
586 "allow_redirects": allow_redirects,
587 }
588 send_kwargs.update(settings)
--> 589 resp = self.send(prep, **send_kwargs)
591 return resp
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/sessions.py:703, in Session.send(self, request, **kwargs)
700 start = preferred_clock()
702 # Send the request
--> 703 r = adapter.send(request, **kwargs)
705 # Total elapsed time of the request (approximately)
706 elapsed = preferred_clock() - start
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/adapters.py:519, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
515 if isinstance(e.reason, _SSLError):
516 # This branch is for urllib3 v1.22 and later.
517 raise SSLError(e, request=request)
--> 519 raise ConnectionError(e, request=request)
521 except ClosedPoolError as e:
522 raise ConnectionError(e, request=request)
ConnectionError: HTTPConnectionPool(host='localhost', port=11434): Max retries exceeded with url: /api/embeddings (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x30ca49690>: Failed to establish a new connection: [Errno 61] Connection refused'))
any help resolving this would be appreciated. My assumption is that it is something very simple in creating the milvus database local connection, but I dont know where it is.
As suggested in earlier replies, Ollama was not running locally.
To resolve this I needed to:
1 - download and install Ollama
2 - pull (or run) in the terminal ollama pull nomic-embed-text
for the embedders. This is not clear enough in the haystack documentation for the ollama integration but once this is done it should run.
Also I would suggest to change from:
indexing_pipeline.add_component("embedder", OllamaDocumentEmbedder())
to:
indexing_pipeline.add_component("embedder", OllamaDocumentEmbedder(model="nomic-embed-text", url="http://localhost:11434/api/embeddings"))