向量儲存

本指南將幫助您了解如何作為開發者使用 Rememberizer 向量儲存。

向量儲存

Rememberizer 向量儲存簡化了處理向量數據的過程,讓您能夠專注於文本輸入,並利用向量的力量進行各種應用,如搜索和數據分析。

介紹

Rememberizer 向量存儲提供了一個易於使用的介面,用於處理向量數據,同時抽象掉向量嵌入的複雜性。由 PostgreSQL 和 pgvector 擴展提供支持,Rememberizer 向量存儲允許您直接處理文本。該服務處理文本數據的分塊、向量化和存儲,使您能夠更專注於核心應用邏輯。

要深入了解向量嵌入和向量數據庫背後的理論概念,請參閱 什麼是向量嵌入和向量數據庫?

技術概述

向量儲存的運作方式

Rememberizer 向量儲存將文本轉換為高維向量表示(嵌入),以捕捉語義意義。這使得:

  1. 語義搜尋:根據意義而非僅僅是關鍵字來查找文件

  2. 相似性匹配:識別概念上相關的內容

  3. 高效檢索:快速從大型數據集中定位相關信息

主要組件

  • 文件處理:文本被拆分為最佳大小的片段,並具有重疊邊界以保留上下文

  • 向量化:片段使用最先進的模型轉換為嵌入

  • 索引:專門的算法組織向量以進行高效的相似性搜索

  • 查詢處理:搜索查詢被向量化並與存儲的嵌入進行比較

架構

Rememberizer 使用以下方式實現向量存儲:

  • PostgreSQL 與 pgvector 擴展:用於高效的向量存儲和搜索

  • 基於集合的組織:每個向量存儲都有其獨立的集合

  • API 驅動的訪問:所有操作的簡單 RESTful 端點

開始使用

創建向量存儲

  1. 在您的儀表板中導航到向量存儲部分

  2. 點擊「創建新的向量存儲」:

    • 將出現一個表單,提示您輸入詳細信息。

  3. 填寫詳細信息:

    • 名稱:為您的向量存儲提供一個唯一的名稱。

    • 描述:寫一個簡短的向量存儲描述。

    • 嵌入模型:選擇將文本轉換為向量的模型。

    • 索引算法:選擇如何組織向量以便搜索。

    • 搜索度量:定義如何計算向量之間的相似性。

    • 向量維度:向量嵌入的大小(通常為 768-1536)。

  4. 提交表單:

    • 點擊「創建」按鈕。您將收到成功通知,新的存儲將出現在您的向量存儲列表中。

配置選項

嵌入模型

模型
維度
描述
最適合

openai/text-embedding-3-large

1536

來自 OpenAI 的高精度嵌入模型

需要最大精度的生產應用

openai/text-embedding-3-small

1536

來自 OpenAI 的較小、更快的嵌入模型

具有更高吞吐量需求的應用

索引演算法

演算法
描述
取捨

IVFFLAT (預設)

反向文件與平坦壓縮

速度與準確性的良好平衡;適用於大多數數據集

HNSW

分層可導航小世界

對於大型數據集更好的準確性;更高的記憶體需求

搜尋指標

指標
描述
最適合

余弦相似度 (預設)

測量向量之間的角度

一般用途的相似性匹配

內積 (ip)

向量之間的點積

當向量的大小很重要時

L2 (歐幾里得)

向量之間的直線距離

當空間關係重要時

管理向量儲存

  1. 查看和編輯向量儲存:

    • 訪問管理儀表板以查看、編輯或刪除向量儲存。

  2. 查看文件:

    • 瀏覽特定向量儲存中的單個文件及其相關元數據。

  3. 統計數據:

    • 查看詳細統計數據,例如儲存的向量數量、查詢性能和操作指標。

API 金鑰管理

API 金鑰用於驗證和授權訪問 Rememberizer 向量儲存的 API 端點。妥善管理 API 金鑰對於維護您的向量儲存的安全性和完整性至關重要。

創建 API 金鑰

  1. 前往您的向量儲存詳細頁面

  2. 導航至 API 金鑰管理區域:

    • 它可以在「配置」標籤內找到

  3. 點擊 「新增 API 金鑰」

    • 將出現一個表單,提示您輸入詳細信息。

  4. 填寫詳細信息:

    • 名稱:提供一個名稱以幫助您識別其使用案例。

  5. 提交表單:

    • 點擊「創建」按鈕。新的 API 金鑰將被生成並顯示。請確保複製並安全存儲。此金鑰用於驗證對該特定向量儲存的請求。

撤銷 API 金鑰

如果不再需要 API 金鑰,您可以刪除它以防止任何潛在的濫用。

出於安全原因,您可能希望定期更換您的 API 金鑰。這涉及生成一個新的金鑰並撤銷舊的金鑰。

使用向量存儲 API

在創建向量存儲並生成 API 密鑰後,您可以使用 REST API 與其互動。

代碼範例

import requests
import json

API_KEY = "your_api_key_here"
VECTOR_STORE_ID = "vs_abc123"  # 替換為您的向量儲存 ID
BASE_URL = "https://api.rememberizer.ai/api/v1"

上傳文件到向量存儲

def upload_document(file_path, document_name=None): if document_name is None: document_name = file_path.split("/")[-1]

with open(file_path, "rb") as f:
    files = {"file": (document_name, f)}
    headers = {"x-api-key": API_KEY}
    
    response = requests.post(
        f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents",
        headers=headers,
        files=files
    )
    
    if response.status_code == 201:
        print(f"文件 '{document_name}' 上傳成功!")
        return response.json()
    else:
        print(f"上傳文件時出錯:{response.text}")
        return None

上傳文本內容到向量儲存

def upload_text(content, document_name): headers = { "x-api-key": API_KEY, "Content-Type": "application/json" }

data = {
    "name": document_name,
    "content": content
}

response = requests.post(
    f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/text",
    headers=headers,
    json=data
)

if response.status_code == 201:
    print(f"文本文件 '{document_name}' 上傳成功!")
    return response.json()
else:
    print(f"上傳文本時出錯: {response.text}")
    return None

搜尋向量儲存

def search_vector_store(query, num_results=5, prev_chunks=1, next_chunks=1): headers = {"x-api-key": API_KEY}

params = {
    "q": query,
    "n": num_results,
    "prev_chunks": prev_chunks,
    "next_chunks": next_chunks
}

response = requests.get(
    f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/search",
    headers=headers,
    params=params
)

if response.status_code == 200:
    results = response.json()
    print(f"找到 {len(results['matched_chunks'])} 個與 '{query}' 匹配的結果")
    
    # 列印最佳結果
    if results['matched_chunks']:
        top_match = results['matched_chunks'][0]
        print(f"最佳匹配 (距離: {top_match['distance']}):")
        print(f"文件: {top_match['document']['name']}")
        print(f"內容: {top_match['matched_content']}")
    
    return results
else:
    print(f"搜尋錯誤: {response.text}")
    return None

範例用法

上傳文件("path/to/document.pdf")

upload_text("這是一段範例文本,將被向量化", "sample-document.txt")

search_vector_store("向量相似度是如何工作的?")

{% endtab %}

{% tab title="JavaScript" %}
```javascript
// 向量存儲 API 客戶端
class VectorStoreClient {
  constructor(apiKey, vectorStoreId) {
    this.apiKey = apiKey;
    this.vectorStoreId = vectorStoreId;
    this.baseUrl = 'https://api.rememberizer.ai/api/v1';
  }

  // 獲取向量存儲信息
  async getVectorStoreInfo() {
    const response = await fetch(`${this.baseUrl}/vector-stores/${this.vectorStoreId}`, {
      method: 'GET',
      headers: {
        'x-api-key': this.apiKey
      }
    });
    
    if (!response.ok) {
      throw new Error(`獲取向量存儲信息失敗: ${response.statusText}`);
    }
    
    return response.json();
  }

  // 上傳文本文件
  async uploadTextDocument(name, content) {
    const response = await fetch(`${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents/text`, {
      method: 'POST',
      headers: {
        'x-api-key': this.apiKey,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        name,
        content
      })
    });
    
    if (!response.ok) {
      throw new Error(`上傳文本文件失敗: ${response.statusText}`);
    }
    
    return response.json();
  }

  // 上傳文件
  async uploadFile(file, onProgress) {
    const formData = new FormData();
    formData.append('file', file);
    
    const xhr = new XMLHttpRequest();
    
    return new Promise((resolve, reject) => {
      xhr.open('POST', `${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents`);
      xhr.setRequestHeader('x-api-key', this.apiKey);
      
      xhr.upload.onprogress = (event) => {
        if (event.lengthComputable && onProgress) {
          const percentComplete = (event.loaded / event.total) * 100;
          onProgress(percentComplete);
        }
      };
      
      xhr.onload = () => {
        if (xhr.status === 201) {
          resolve(JSON.parse(xhr.responseText));
        } else {
          reject(new Error(`上傳文件失敗: ${xhr.statusText}`));
        }
      };
      
      xhr.onerror = () => {
        reject(new Error('文件上傳期間的網絡錯誤'));
      };
      
      xhr.send(formData);
    });
  }

  // 在向量存儲中搜索文件
  async searchDocuments(query, options = {}) {
    const params = new URLSearchParams({
      q: query,
      n: options.numResults || 10,
      prev_chunks: options.prevChunks || 1,
      next_chunks: options.nextChunks || 1
    });
    
    if (options.threshold) {
      params.append('t', options.threshold);
    }
    
    const response = await fetch(
      `${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents/search?${params}`,
      {
        method: 'GET',
        headers: {
          'x-api-key': this.apiKey
        }
      }
    );
    
    if (!response.ok) {
      throw new Error(`搜索失敗: ${response.statusText}`);
    }
    
    return response.json();
  }

  // 列出向量存儲中的所有文件
  async listDocuments() {
    const response = await fetch(
      `${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents`,
      {
        method: 'GET',
        headers: {
          'x-api-key': this.apiKey
        }
      }
    );
    
    if (!response.ok) {
      throw new Error(`列出文件失敗: ${response.statusText}`);
    }
    
    return response.json();
  }

  // 刪除文件
  async deleteDocument(documentId) {
    const response = await fetch(
      `${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents/${documentId}`,
      {
        method: 'DELETE',
        headers: {
          'x-api-key': this.apiKey
        }
      }
    );
    
    if (!response.ok) {
      throw new Error(`刪除文件失敗: ${response.statusText}`);
    }
    
    return true;
  }
}

// 示例用法
/*
const client = new VectorStoreClient('your_api_key', 'vs_abc123');

// 搜索文件
client.searchDocuments('語義搜索是如何工作的?')
  .then(results => {
    console.log(`找到 ${results.matched_chunks.length} 個匹配`);
    results.matched_chunks.forEach(match => {
      console.log(`文件: ${match.document.name}`);
      console.log(`分數: ${match.distance}`);
      console.log(`內容: ${match.matched_content}`);
      console.log('---');
    });
  })
  .catch(error => console.error(error));
*/

{% endtab %}

{% tab title="Ruby" %}


ruby
require 'net/http'
require 'uri'
require 'json'

class VectorStoreClient
  def initialize(api_key, vector_store_id)
    @api_key = api_key
    @vector_store_id = vector_store_id
    @base_url = 'https://api.rememberizer.ai/api/v1'
  end

  # 獲取向量存儲詳細信息
  def get_vector_store_info
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}")
    request = Net::HTTP::Get.new(uri)
    request['x-api-key'] = @api_key
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  # 上傳文本內容
  def upload_text(name, content)
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}/documents/text")
    request = Net::HTTP::Post.new(uri)
    request['Content-Type'] = 'application/json'
    request['x-api-key'] = @api_key
    
    request.body = {
      name: name,
      content: content
    }.to_json
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  # 搜索文件
  def search(query, num_results: 5, prev_chunks: 1, next_chunks: 1, threshold: nil)
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}/documents/search")
    params = {
      q: query,
      n: num_results,
      prev_chunks: prev_chunks,
      next_chunks: next_chunks
    }
    
    params[:t] = threshold if threshold
    
    uri.query = URI.encode_www_form(params)
    request = Net::HTTP::Get.new(uri)
    request['x-api-key'] = @api_key
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  # 列出文件
  def list_documents
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}/documents")
    request = Net::HTTP::Get.new(uri)
    request['x-api-key'] = @api_key
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  # 上傳文件(多部分表單)
  def upload_file(file_path)
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}/documents")
    
    file_name = File.basename(file_path)
    file_content = File.binread(file_path)
    
    boundary = "RememberizerBoundary#{rand(1000000)}"
    
    request = Net::HTTP::Post.new(uri)
    request['Content-Type'] = "multipart/form-data; boundary=#{boundary}"
    request['x-api-key'] = @api_key
    
    post_body = []
    post_body << "--#{boundary}\r\n"
    post_body << "Content-Disposition: form-data; name=\"file\"; filename=\"#{file_name}\"\r\n"
    post_body << "Content-Type: application/octet-stream\r\n\r\n"
    post_body << file_content
    post_body << "\r\n--#{boundary}--\r\n"
    
    request.body = post_body.join
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  private

  def send_request(uri, request)
    http = Net::HTTP.new(uri.host, uri.port)
    http.use_ssl = (uri.scheme == 'https')
    
    response = http.request(request)
    
    unless response.is_a?(Net::HTTPSuccess)
      raise "API 請求失敗: #{response.code} #{response.message}\n#{response.body}"
    end
    
    response
  end
end

示例用法

=begin client = VectorStoreClient.new('your_api_key', 'vs_abc123')

搜尋文件

results = client.search('什麼是資料安全的最佳實踐?') puts "找到 #{results['matched_chunks'].length} 個結果"

顯示最佳結果

if results['matched_chunks'].any? top_match = results['matched_chunks'].first puts "最佳匹配 (距離: #{top_match['distance']}):" puts "文件: #{top_match['document']['name']}" puts "內容: #{top_match['matched_content']}" end =end


</div>

<div data-gb-custom-block data-tag="tab" data-title='cURL'>

```bash
# 設定您的 API 金鑰和向量儲存 ID
API_KEY="your_api_key_here"
VECTOR_STORE_ID="vs_abc123"
BASE_URL="https://api.rememberizer.ai/api/v1"

# 獲取向量存儲資訊
curl -X GET "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}" \
  -H "x-api-key: ${API_KEY}"

# 上傳文本文件
curl -X POST "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents/text" \
  -H "x-api-key: ${API_KEY}" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "example-document.txt",
    "content": "這是一個範例文件,將被向量化並存儲在向量數據庫中以進行語義搜索。"
  }'

# 上傳檔案
curl -X POST "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents" \
  -H "x-api-key: ${API_KEY}" \
  -F "file=@/path/to/your/document.pdf"

# 搜尋文件
curl -X GET "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents/search?q=semantic%20search&n=5&prev_chunks=1&next_chunks=1" \
  -H "x-api-key: ${API_KEY}"

# 列出所有文件
curl -X GET "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents" \
  -H "x-api-key: ${API_KEY}"

# 刪除文件
curl -X DELETE "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents/123" \
  -H "x-api-key: ${API_KEY}"

性能考量

即將推出:向量存儲架構圖

這個技術架構圖將說明:

  • PostgreSQL + pgvector 基礎架構

  • 索引算法結構 (IVFFLAT 與 HNSW)

  • 向量空間中的搜索指標如何運作 (視覺比較)

  • 文檔分塊過程及重疊視覺化

  • 在不同規模下的性能考量視覺化

為不同數據量進行優化

數據量
推薦配置
備註

小型 (<10k 文件)

IVFFLAT, 余弦相似度

簡單配置提供良好性能

中型 (10k-100k 文件)

IVFFLAT, 確保定期重新索引

在搜索速度和索引維護之間取得平衡

大型 (>100k 文件)

HNSW, 考慮增加向量維度

更高的內存使用,但在規模上保持性能

分塊策略

分塊過程對搜索質量有重大影響:

  • 分塊大小:Rememberizer 使用默認的分塊大小為 1024 字節,並有 200 字節的重疊

  • 較小的分塊(512-1024 字節):更精確的匹配,更適合特定問題

  • 較大的分塊(1500-2048 字節):每個匹配中有更多上下文,更適合廣泛主題

  • 重疊:確保在分塊邊界不會丟失上下文

查詢優化

  • 上下文窗口:使用 prev_chunksnext_chunks 來檢索周圍內容

  • 結果數量:從 3-5 個結果開始(n 參數),根據精確度需求進行調整

  • 閾值:調整 t 參數以根據相似度分數過濾結果

高級用法

重新索引

Rememberizer 在向量數量超過預定閾值時自動觸發重新索引,但在以下情況下考慮手動重新索引:

  • 上傳大量文檔

  • 更改嵌入模型

  • 修改索引算法

查詢增強

為了獲得更好的搜索結果:

  1. 具體 在搜索查詢中

  2. 包含上下文 當可能時

  3. 使用自然語言 而不是關鍵字

  4. 根據結果質量 調整參數

從其他向量資料庫遷移

如果您目前正在使用其他向量資料庫解決方案並希望遷移到 Rememberizer 向量儲存,以下指南將幫助您有效地轉移數據。

遷移概述

遷移向量數據涉及:

  1. 從您的源向量數據庫導出數據

  2. 將數據轉換為與 Rememberizer 兼容的格式

  3. 將數據導入您的 Rememberizer 向量存儲

  4. 驗證遷移是否成功

遷移到 Rememberizer 的好處

  • PostgreSQL 基礎:建立在成熟的資料庫技術上,具備內建的備份和恢復功能

  • 整合生態系統:與其他 Rememberizer 組件無縫連接

  • 簡化管理:統一介面進行向量操作

  • 先進安全性:行級安全性和細粒度訪問控制

  • 可擴展架構:隨著數據增長進行性能優化

從 Pinecone 遷移

import os
import pinecone
import requests
import json
import time

# 設定 Pinecone 客戶端
pinecone.init(api_key="PINECONE_API_KEY", environment="PINECONE_ENV")
source_index = pinecone.Index("your-pinecone-index")

# 設定 Rememberizer 向量儲存客戶端
REMEMBERIZER_API_KEY = "your_rememberizer_api_key"
VECTOR_STORE_ID = "vs_abc123"  # 您的 Rememberizer 向量儲存 ID
BASE_URL = "https://api.rememberizer.ai/api/v1"

# 1. 設定遷移的批次大小(根據您的數據大小進行調整)
BATCH_SIZE = 100

# 2. 從 Pinecone 獲取向量的函數
def fetch_vectors_from_pinecone(index_name, batch_size, cursor=None):
    # 如果您的 Pinecone 版本支持,請使用列表操作
    try:
        result = source_index.list(limit=batch_size, cursor=cursor)
        vectors = result.get("vectors", {})
        next_cursor = result.get("cursor")
        return vectors, next_cursor
    except AttributeError:
        # 對於不支持列表操作的舊版 Pinecone
        # 這是一種簡化的方法;實際實現取決於您的數據訪問模式
        query_response = source_index.query(
            vector=[0] * source_index.describe_index_stats()["dimension"],
            top_k=batch_size,
            include_metadata=True,
            include_values=True
        )
        return {item.id: {"id": item.id, "values": item.values, "metadata": item.metadata} 
                for item in query_response.matches}, None

# 3. 上傳向量到 Rememberizer 的函數
def upload_to_rememberizer(vectors):
    headers = {
        "x-api-key": REMEMBERIZER_API_KEY,
        "Content-Type": "application/json"
    }
    
    for vector_id, vector_data in vectors.items():
        # 將 Pinecone 向量數據轉換為 Rememberizer 格式
        document_name = vector_data.get("metadata", {}).get("filename", f"pinecone_doc_{vector_id}")
        content = vector_data.get("metadata", {}).get("text", "")
        
        if not content:
            print(f"跳過 {vector_id} - 在元數據中未找到文本內容")
            continue
            
        data = {
            "name": document_name,
            "content": content,
            # 可選:包括其他元數據
            "metadata": vector_data.get("metadata", {})
        }
        
        response = requests.post(
            f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/text",
            headers=headers,
            json=data
        )
        
        if response.status_code == 201:
            print(f"文檔 '{document_name}' 上傳成功!")
        else:
            print(f"上傳文檔 {document_name} 時出錯:{response.text}")
        
        # 添加小延遲以防止速率限制
        time.sleep(0.1)

# 4. 主要遷移功能
def migrate_pinecone_to_rememberizer():
    cursor = None
    total_migrated = 0
    
    print("開始從 Pinecone 遷移到 Rememberizer...")
    
    while True:
        vectors, cursor = fetch_vectors_from_pinecone("your-pinecone-index", BATCH_SIZE, cursor)
        
        if not vectors:
            break
            
        print(f"從 Pinecone 獲取了 {len(vectors)} 個向量")
        upload_to_rememberizer(vectors)
        
        total_migrated += len(vectors)
        print(f"進度: {total_migrated} 個向量已遷移")
        
        if not cursor:
            break
    
    print(f"遷移完成!總共遷移了 {total_migrated} 個向量到 Rememberizer")

# 執行遷移
# migrate_pinecone_to_rememberizer()

從 Qdrant 遷移

import requests
import json
import time
from qdrant_client import QdrantClient
from qdrant_client.http import model

# 設定 Qdrant 客戶端
QDRANT_URL = "http://localhost:6333"  # 或者你的 Qdrant 雲端 URL
QDRANT_API_KEY = "your_qdrant_api_key"  # 如果使用 Qdrant 雲端
QDRANT_COLLECTION_NAME = "your_collection"

qdrant_client = QdrantClient(
    url=QDRANT_URL,
    api_key=QDRANT_API_KEY  # 僅限於 Qdrant 雲端
)

# 設定 Rememberizer 向量儲存客戶端
REMEMBERIZER_API_KEY = "your_rememberizer_api_key"
VECTOR_STORE_ID = "vs_abc123"  # 您的 Rememberizer 向量儲存 ID
BASE_URL = "https://api.rememberizer.ai/api/v1"

# 批次大小處理
BATCH_SIZE = 100

# 從 Qdrant 獲取點的函數
def fetch_points_from_qdrant(collection_name, batch_size, offset=0):
    try:
        # 獲取集合信息以確定向量維度
        collection_info = qdrant_client.get_collection(collection_name=collection_name)
        
        # 滾動查詢點
        scroll_result = qdrant_client.scroll(
            collection_name=collection_name,
            limit=batch_size,
            offset=offset,
            with_payload=True,
            with_vectors=True
        )
        
        points = scroll_result[0]  # 元組 (points, next_offset)
        next_offset = scroll_result[1]
        
        return points, next_offset
    except Exception as e:
        print(f"從 Qdrant 獲取點時出錯: {e}")
        return [], None

# 將向 Rememberizer 上傳向量的函數
def upload_to_rememberizer(points):
    headers = {
        "x-api-key": REMEMBERIZER_API_KEY,
        "Content-Type": "application/json"
    }
    
    results = []
    
    for point in points:
        # 從 Qdrant 點提取數據
        point_id = point.id
        metadata = point.payload
        text_content = metadata.get("text", "")
        document_name = metadata.get("filename", f"qdrant_doc_{point_id}")
        
        if not text_content:
            print(f"跳過 {point_id} - 在有效負載中未找到文本內容")
            continue
            
        data = {
            "name": document_name,
            "content": text_content,
            # 可選:包括其他元數據
            "metadata": metadata
        }
        
        try:
            response = requests.post(
                f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/text",
                headers=headers,
                json=data
            )
            
            if response.status_code == 201:
                print(f"文檔 '{document_name}' 上傳成功!")
                results.append({"id": point_id, "success": True})
            else:
                print(f"上傳文檔 {document_name} 時出錯:{response.text}")
                results.append({"id": point_id, "success": False, "error": response.text})
        except Exception as e:
            print(f"上傳文檔 {document_name} 時發生異常:{str(e)}")
            results.append({"id": point_id, "success": False, "error": str(e)})
        
        # 添加小延遲以防止速率限制
        time.sleep(0.1)
    
    return results

# 主要遷移功能
def migrate_qdrant_to_rememberizer():
    offset = None
    total_migrated = 0
    
    print("開始從 Qdrant 遷移到 Rememberizer...")
    
    while True:
        points, next_offset = fetch_points_from_qdrant(
            QDRANT_COLLECTION_NAME, 
            BATCH_SIZE,
            offset
        )
        
        if not points:
            break
            
        print(f"從 Qdrant 獲取了 {len(points)} 個點")
        
        results = upload_to_rememberizer(points)
        success_count = sum(1 for r in results if r.get("success", False))
        
        total_migrated += success_count
        print(f"進度: {total_migrated} 個點成功遷移")
        
        if next_offset is None:
            break
            
        offset = next_offset
    
    print(f"遷移完成!總共遷移了 {total_migrated} 個點到 Rememberizer")

# 執行遷移
# migrate_qdrant_to_rememberizer()

從 Supabase pgvector 遷移

如果您已經在使用 Supabase 和 pgvector,遷移到 Rememberizer 特別簡單,因為兩者都使用帶有 pgvector 擴展的 PostgreSQL。

import psycopg2
import requests
import json
import time
import os
from dotenv import load_dotenv

# 載入環境變數
load_dotenv()

# Supabase PostgreSQL 配置
SUPABASE_DB_HOST = os.getenv("SUPABASE_DB_HOST")
SUPABASE_DB_PORT = os.getenv("SUPABASE_DB_PORT", "5432")
SUPABASE_DB_NAME = os.getenv("SUPABASE_DB_NAME")
SUPABASE_DB_USER = os.getenv("SUPABASE_DB_USER")
SUPABASE_DB_PASSWORD = os.getenv("SUPABASE_DB_PASSWORD")
SUPABASE_VECTOR_TABLE = os.getenv("SUPABASE_VECTOR_TABLE", "documents")

# Rememberizer 配置
REMEMBERIZER_API_KEY = os.getenv("REMEMBERIZER_API_KEY")
VECTOR_STORE_ID = os.getenv("VECTOR_STORE_ID")  # 例如,"vs_abc123"
BASE_URL = "https://api.rememberizer.ai/api/v1"

# 批次大小處理
BATCH_SIZE = 100

# 連接到 Supabase PostgreSQL
def connect_to_supabase():
    try:
        conn = psycopg2.connect(
            host=SUPABASE_DB_HOST,
            port=SUPABASE_DB_PORT,
            dbname=SUPABASE_DB_NAME,
            user=SUPABASE_DB_USER,
            password=SUPABASE_DB_PASSWORD
        )
        return conn
    except Exception as e:
        print(f"連接到 Supabase PostgreSQL 時出錯: {e}")
        return None

# 從 Supabase pgvector 獲取文件
def fetch_documents_from_supabase(conn, batch_size, offset=0):
    try:
        cursor = conn.cursor()
        
        # 根據您的表結構調整此查詢
        query = f"""
        SELECT id, content, metadata, embedding
        FROM {SUPABASE_VECTOR_TABLE}
        ORDER BY id
        LIMIT %s OFFSET %s
        """
        
        cursor.execute(query, (batch_size, offset))
        documents = cursor.fetchall()
        cursor.close()
        
        return documents
    except Exception as e:
        print(f"從 Supabase 獲取文件時出錯: {e}")
        return []

# 上傳文件到 Rememberizer
def upload_to_rememberizer(documents):
    headers = {
        "x-api-key": REMEMBERIZER_API_KEY,
        "Content-Type": "application/json"
    }
    
    results = []
    
    for doc in documents:
        doc_id, content, metadata, embedding = doc
        
        # 如果元數據以 JSON 字串形式存儲,則解析元數據
        if isinstance(metadata, str):
            try:
                metadata = json.loads(metadata)
            except:
                metadata = {}
        elif metadata is None:
            metadata = {}
        
        document_name = metadata.get("filename", f"supabase_doc_{doc_id}")
        
        if not content:
            print(f"跳過 {doc_id} - 找不到內容")
            continue
            
        data = {
            "name": document_name,
            "content": content,
            "metadata": metadata
        }
        
        try:
            response = requests.post(
                f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/text",
                headers=headers,
                json=data
            )
            
            if response.status_code == 201:
                print(f"文件 '{document_name}' 上傳成功!")
                results.append({"id": doc_id, "success": True})
            else:
                print(f"上傳文件 {document_name} 時出錯: {response.text}")
                results.append({"id": doc_id, "success": False, "error": response.text})
        except Exception as e:
            print(f"上傳文件 {document_name} 時發生異常: {str(e)}")
            results.append({"id": doc_id, "success": False, "error": str(e)})
        
        # 添加小延遲以防止速率限制
        time.sleep(0.1)
    
    return results

# 主要遷移功能
def migrate_supabase_to_rememberizer():
    conn = connect_to_supabase()
    if not conn:
        print("無法連接到 Supabase。中止遷移。")
        return
    
    offset = 0
    total_migrated = 0
    
    print("開始從 Supabase pgvector 遷移到 Rememberizer...")
    
    try:
        while True:
            documents = fetch_documents_from_supabase(conn, BATCH_SIZE, offset)
            
            if not documents:
                break
                
            print(f"從 Supabase 獲取了 {len(documents)} 份文件")
            
            results = upload_to_rememberizer(documents)
            success_count = sum(1 for r in results if r.get("success", False))
            
            total_migrated += success_count
            print(f"進度:成功遷移 {total_migrated} 份文件")
            
            offset += BATCH_SIZE
            
    finally:
        conn.close()
    
    print(f"遷移完成!共遷移 {total_migrated} 份文件到 Rememberizer")

# 執行遷移
# migrate_supabase_to_rememberizer()

遷移最佳實踐

遵循這些建議以確保成功的遷移:

  1. 提前規劃

    • 估算遷移所需的數據量和時間

    • 在低流量時段安排遷移

    • 在開始大型遷移之前增加磁碟空間

  2. 先測試

    • 在 Rememberizer 中創建測試向量存儲

    • 遷移一小部分數據(100-1000 個向量)

    • 使用關鍵查詢驗證搜索功能

  3. 數據驗證

    • 比較遷移前後的文檔數量

    • 執行基準查詢以確保結果相似

    • 驗證元數據是否正確保留

  4. 優化性能

    • 使用批量操作以提高效率

    • 考慮源數據庫和目標數據庫的地理位置共置

    • 監控 API 速率限制並相應調整批量大小

  5. 遷移後步驟

    • 驗證在 Rememberizer 中創建索引

    • 更新應用程序配置以指向新的向量存儲

    • 在遷移驗證之前保留源數據庫作為備份

有關詳細的 API 參考和端點文檔,請訪問 向量儲存 API 頁面。


確保安全處理 API 密鑰並遵循 API 密鑰管理的最佳實踐。

Last updated