Question: How to connect to a Milvus database locally; "connection refused" error when connecting to Milvus
Background:
I am trying to run a RAG (retrieval-augmented generation) pipeline using haystack and Milvus.
It's my first time using Milvus, and I seem to have an issue with it.
I'm following this tutorial, with some basic changes: Retrieval-Augmented Generation (RAG) with Milvus and Haystack | Milvus Documentation
Here is my code:
import os
import urllib.request

from haystack import Pipeline
from haystack.components.converters import MarkdownToDocument
from haystack_integrations.components.embedders.ollama import OllamaDocumentEmbedder, OllamaTextEmbedder
from haystack.components.preprocessors import DocumentSplitter
from haystack.components.writers import DocumentWriter

from milvus_haystack import MilvusDocumentStore
from milvus_haystack.milvus_embedding_retriever import MilvusEmbeddingRetriever

url = "https://www.gutenberg.org/cache/epub/7785/pg7785.txt"
file_path = "./davinci.txt"

if not os.path.exists(file_path):
    urllib.request.urlretrieve(url, file_path)

document_store = MilvusDocumentStore(
    connection_args={"uri": "./milvus.db"},
    drop_old=True,
)

indexing_pipeline = Pipeline()
indexing_pipeline.add_component("converter", MarkdownToDocument())
indexing_pipeline.add_component(
    "splitter", DocumentSplitter(split_by="sentence", split_length=2)
)
indexing_pipeline.add_component("embedder", OllamaDocumentEmbedder())
indexing_pipeline.add_component("writer", DocumentWriter(document_store))
indexing_pipeline.connect("converter", "splitter")
indexing_pipeline.connect("splitter", "embedder")
indexing_pipeline.connect("embedder", "writer")

indexing_pipeline.draw('./pipeline_diagram.png')

indexing_pipeline.run({"converter": {"sources": [file_path]}})
It all works well until the last line, where I get a ConnectionRefusedError. The conversion step (from Markdown to document) runs fine, but then the code fails.
I am not sure why it happens, as I see the milvus.db and milvus.db.lock files created as expected.
The full error is:
---------------------------------------------------------------------------
ConnectionRefusedError                    Traceback (most recent call last)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:203, in HTTPConnection._new_conn(self)
    202 try:
--> 203     sock = connection.create_connection(
    204         (self._dns_host, self.port),
    205         self.timeout,
    206         source_address=self.source_address,
    207         socket_options=self.socket_options,
    208     )
    209 except socket.gaierror as e:

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/util/connection.py:85, in create_connection(address, timeout, source_address, socket_options)
     84 try:
---> 85     raise err
     86 finally:
     87     # Break explicitly a reference cycle

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/util/connection.py:73, in create_connection(address, timeout, source_address, socket_options)
     72     sock.bind(source_address)
---> 73 sock.connect(sa)
     74 # Break explicitly a reference cycle

ConnectionRefusedError: [Errno 61] Connection refused

The above exception was the direct cause of the following exception:

NewConnectionError                        Traceback (most recent call last)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connectionpool.py:791, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
    790 # Make the request on the HTTPConnection object
--> 791 response = self._make_request(
    792     conn,
    793     method,
    794     url,
    795     timeout=timeout_obj,
    796     body=body,
    797     headers=headers,
    798     chunked=chunked,
    799     retries=retries,
    800     response_conn=response_conn,
    801     preload_content=preload_content,
    802     decode_content=decode_content,
    803     **response_kw,
    804 )
    806 # Everything went great!

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connectionpool.py:497, in HTTPConnectionPool._make_request(self, conn, method, url, body, headers, retries, timeout, chunked, response_conn, preload_content, decode_content, enforce_content_length)
    496 try:
--> 497     conn.request(
    498         method,
    499         url,
    500         body=body,
    501         headers=headers,
    502         chunked=chunked,
    503         preload_content=preload_content,
    504         decode_content=decode_content,
    505         enforce_content_length=enforce_content_length,
    506     )
    508 # We are swallowing BrokenPipeError (errno.EPIPE) since the server is
    509 # legitimately able to close the connection after sending a valid response.
    510 # With this behaviour, the received response is still readable.

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:395, in HTTPConnection.request(self, method, url, body, headers, chunked, preload_content, decode_content, enforce_content_length)
    394     self.putheader(header, value)
--> 395 self.endheaders()
    397 # If we're given a body we start sending that in chunks.

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/http/client.py:1289, in HTTPConnection.endheaders(self, message_body, encode_chunked)
   1288     raise CannotSendHeader()
-> 1289 self._send_output(message_body, encode_chunked=encode_chunked)

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/http/client.py:1048, in HTTPConnection._send_output(self, message_body, encode_chunked)
   1047 del self._buffer[:]
-> 1048 self.send(msg)
   1050 if message_body is not None:
   1051 
   1052     # create a consistent interface to message_body

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/http/client.py:986, in HTTPConnection.send(self, data)
    985 if self.auto_open:
--> 986     self.connect()
    987 else:

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:243, in HTTPConnection.connect(self)
    242 def connect(self) -> None:
--> 243     self.sock = self._new_conn()
    244     if self._tunnel_host:
    245         # If we're tunneling it means we're connected to our proxy.

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connection.py:218, in HTTPConnection._new_conn(self)
    217 except OSError as e:
--> 218     raise NewConnectionError(
    219         self, f"Failed to establish a new connection: {e}"
    220     ) from e
    222 # Audit hooks are only available in Python 3.8+

NewConnectionError: <urllib3.connection.HTTPConnection object at 0x30ca49690>: Failed to establish a new connection: [Errno 61] Connection refused

The above exception was the direct cause of the following exception:

MaxRetryError                             Traceback (most recent call last)
File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/adapters.py:486, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
    485 try:
--> 486     resp = conn.urlopen(
    487         method=request.method,
    488         url=url,
    489         body=request.body,
    490         headers=request.headers,
    491         redirect=False,
    492         assert_same_host=False,
    493         preload_content=False,
    494         decode_content=False,
    495         retries=self.max_retries,
    496         timeout=timeout,
    497         chunked=chunked,
    498     )
    500 except (ProtocolError, OSError) as err:

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/connectionpool.py:845, in HTTPConnectionPool.urlopen(self, method, url, body, headers, retries, redirect, assert_same_host, timeout, pool_timeout, release_conn, chunked, body_pos, preload_content, decode_content, **response_kw)
    843     new_e = ProtocolError("Connection aborted.", new_e)
--> 845 retries = retries.increment(
    846     method, url, error=new_e, _pool=self, _stacktrace=sys.exc_info()[2]
    847 )
    848 retries.sleep()

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/urllib3/util/retry.py:515, in Retry.increment(self, method, url, response, error, _pool, _stacktrace)
    514     reason = error or ResponseError(cause)
--> 515     raise MaxRetryError(_pool, url, reason) from reason  # type: ignore[arg-type]
    517 log.debug("Incremented Retry for (url='%s'): %r", url, new_retry)

MaxRetryError: HTTPConnectionPool(host='localhost', port=11434): Max retries exceeded with url: /api/embeddings (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x30ca49690>: Failed to establish a new connection: [Errno 61] Connection refused'))

During handling of the above exception, another exception occurred:

ConnectionError                           Traceback (most recent call last)
Cell In[15], line 1
----> 1 indexing_pipeline.run({"converter": {"sources": [file_path]}})
      3 print("Number of documents:", document_store.count_documents())

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/haystack/core/pipeline/pipeline.py:197, in Pipeline.run(self, data, debug, include_outputs_from)
    195 span.set_content_tag("haystack.component.input", last_inputs[name])
    196 logger.info("Running component {component_name}", component_name=name)
--> 197 res = comp.run(**last_inputs[name])
    198 self.graph.nodes[name]["visits"] += 1
    200 if not isinstance(res, Mapping):

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/haystack_integrations/components/embedders/ollama/document_embedder.py:139, in OllamaDocumentEmbedder.run(self, documents, generation_kwargs)
    136     raise TypeError(msg)
    138 texts_to_embed = self._prepare_texts_to_embed(documents=documents)
--> 139 embeddings, meta = self._embed_batch(
    140     texts_to_embed=texts_to_embed, batch_size=self.batch_size, generation_kwargs=generation_kwargs
    141 )
    143 for doc, emb in zip(documents, embeddings):
    144     doc.embedding = emb

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/haystack_integrations/components/embedders/ollama/document_embedder.py:107, in OllamaDocumentEmbedder._embed_batch(self, texts_to_embed, batch_size, generation_kwargs)
    105 batch = texts_to_embed[i]  # Single batch only
    106 payload = self._create_json_payload(batch, generation_kwargs)
--> 107 response = requests.post(url=self.url, json=payload, timeout=self.timeout)
    108 response.raise_for_status()
    109 result = response.json()

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/api.py:115, in post(url, data, json, **kwargs)
    103 def post(url, data=None, json=None, **kwargs):
    104     r"""Sends a POST request.
    105 
    106     :param url: URL for the new :class:`Request` object.
   (...)
    112     :rtype: requests.Response
    113     """
--> 115     return request("post", url, data=data, json=json, **kwargs)

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/api.py:59, in request(method, url, **kwargs)
     55 # By using the 'with' statement we are sure the session is closed, thus we
     56 # avoid leaving sockets open which can trigger a ResourceWarning in some
     57 # cases, and look like a memory leak in others.
     58 with sessions.Session() as session:
---> 59     return session.request(method=method, url=url, **kwargs)

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/sessions.py:589, in Session.request(self, method, url, params, data, headers, cookies, files, auth, timeout, allow_redirects, proxies, hooks, stream, verify, cert, json)
    584 send_kwargs = {
    585     "timeout": timeout,
    586     "allow_redirects": allow_redirects,
    587 }
    588 send_kwargs.update(settings)
--> 589 resp = self.send(prep, **send_kwargs)
    591 return resp

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/sessions.py:703, in Session.send(self, request, **kwargs)
    700 start = preferred_clock()
    702 # Send the request
--> 703 r = adapter.send(request, **kwargs)
    705 # Total elapsed time of the request (approximately)
    706 elapsed = preferred_clock() - start

File /opt/anaconda3/envs/haystack_milvus_playground/lib/python3.11/site-packages/requests/adapters.py:519, in HTTPAdapter.send(self, request, stream, timeout, verify, cert, proxies)
    515     if isinstance(e.reason, _SSLError):
    516         # This branch is for urllib3 v1.22 and later.
    517         raise SSLError(e, request=request)
--> 519     raise ConnectionError(e, request=request)
    521 except ClosedPoolError as e:
    522     raise ConnectionError(e, request=request)

ConnectionError: HTTPConnectionPool(host='localhost', port=11434): Max retries exceeded with url: /api/embeddings (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x30ca49690>: Failed to establish a new connection: [Errno 61] Connection refused'))
Any help resolving this would be appreciated. My assumption is that it is something very simple in creating the local Milvus database connection, but I don't know where the problem is.
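Solution:
As suggested in earlier replies, Ollama was not running locally. The final error in the traceback names localhost:11434 and the path /api/embeddings, which is Ollama's default port and embeddings endpoint, so the refused connection comes from the embedder step and not from Milvus.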
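As a rough illustration of that reading of the traceback, the host and port can be pulled out of the last error line and compared against well-known defaults. This is only a sketch: the helper name refused_endpoint and the KNOWN_PORTS table are made up for this example, and the port numbers assume stock local installs (11434 for Ollama, 19530 for a standalone Milvus server).

```python
import re

# Default ports for stock local installs (an assumption for this sketch).
KNOWN_PORTS = {11434: "Ollama", 19530: "Milvus (standalone server)"}


def refused_endpoint(error_text):
    """Return (host, port) from a urllib3 'HTTPConnectionPool(...)' message, or None."""
    match = re.search(r"HTTPConnectionPool\(host='([^']+)', port=(\d+)\)", error_text)
    if match is None:
        return None
    return match.group(1), int(match.group(2))


# Shortened copy of the last line of the traceback in the question.
message = (
    "HTTPConnectionPool(host='localhost', port=11434): Max retries exceeded "
    "with url: /api/embeddings"
)
host, port = refused_endpoint(message)
print(host, port, KNOWN_PORTS.get(port, "unknown service"))
```

A local file URI such as ./milvus.db does not involve a network port at all, which is another hint that the refused connection is not a Milvus problem.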
To resolve this I needed to:
1 - Download and install Ollama.
2 - Pull (or run) the embedding model in the terminal with ollama pull nomic-embed-text, which the embedders need.
This is not clear enough in the haystack documentation for the ollama integration, but once it is done the pipeline should run.
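Before rerunning the pipeline, it may help to confirm that something is actually listening on the Ollama port. The following is a minimal sketch using only the standard library; the function name is_listening is hypothetical, and the defaults assume Ollama on localhost:11434 as shown in the traceback.

```python
import socket


def is_listening(host="localhost", port=11434, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers ConnectionRefusedError, timeouts, and name resolution failures.
        return False


if not is_listening():
    print("Nothing is listening on localhost:11434; start Ollama first.")
```

If the check fails, starting the Ollama application (or running ollama serve in a terminal) should bring the server up. A successful TCP connection only shows that a server is there, not that the nomic-embed-text model has been pulled.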
I would also suggest changing:
indexing_pipeline.add_component("embedder", OllamaDocumentEmbedder())
to:
indexing_pipeline.add_component("embedder", OllamaDocumentEmbedder(model="nomic-embed-text", url="http://localhost:11434/api/embeddings"))
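To check the embedder's endpoint independently of haystack, one option is to build the same kind of request by hand. The sketch below assumes Ollama's /api/embeddings endpoint accepts a JSON body with "model" and "prompt" fields; the helper name build_embedding_request is made up for this example, and the request is only constructed here, not sent.

```python
import json
import urllib.request


def build_embedding_request(
    text,
    model="nomic-embed-text",
    url="http://localhost:11434/api/embeddings",
):
    """Build (but do not send) a POST request for Ollama's embeddings endpoint."""
    payload = json.dumps({"model": model, "prompt": text}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=payload,
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_embedding_request("hello world")
print(request.get_method(), request.full_url)

# To actually send it (needs a running Ollama server with the model pulled):
# with urllib.request.urlopen(request, timeout=30) as response:
#     embedding = json.loads(response.read())["embedding"]
```

If sending this request by hand also fails with a refused connection, the problem is on the Ollama side and the haystack and Milvus parts of the pipeline can be ruled out.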