向量儲存

本指南將幫助您了解如何作為開發者使用 Rememberizer 向量儲存。

向量儲存

Rememberizer 向量儲存簡化了處理向量數據的過程,讓您能夠專注於文本輸入,並利用向量的力量進行各種應用,如搜索和數據分析。

介紹

Rememberizer 向量存儲提供了一個易於使用的介面,用於處理向量數據,同時抽象掉向量嵌入的複雜性。由 PostgreSQL 和 pgvector 擴展提供支持,Rememberizer 向量存儲允許您直接處理文本。該服務處理文本數據的分塊、向量化和存儲,使您能夠更專注於核心應用邏輯。

要深入了解向量嵌入和向量數據庫背後的理論概念,請參閱 什麼是向量嵌入和向量數據庫?

技術概述

向量儲存的運作方式

Rememberizer 向量儲存將文本轉換為高維向量表示(嵌入),以捕捉語義意義。這使得:

  1. 語義搜尋:根據意義而非僅僅是關鍵字來查找文件

  2. 相似性匹配:識別概念上相關的內容

  3. 高效檢索:快速從大型數據集中定位相關信息

主要組件

  • 文件處理:文本被拆分為最佳大小的片段,並具有重疊邊界以保留上下文

  • 向量化:片段使用最先進的模型轉換為嵌入

  • 索引:專門的算法組織向量以進行高效的相似性搜索

  • 查詢處理:搜索查詢被向量化並與存儲的嵌入進行比較

架構

Rememberizer 使用以下方式實現向量存儲:

  • PostgreSQL 與 pgvector 擴展:用於高效的向量存儲和搜索

  • 基於集合的組織:每個向量存儲都有其獨立的集合

  • API 驅動的訪問:所有操作的簡單 RESTful 端點

開始使用

創建向量存儲

  1. 在您的儀表板中導航到向量存儲部分

  2. 點擊「創建新的向量存儲」:

    • 將出現一個表單,提示您輸入詳細信息。

  3. 填寫詳細信息:

    • 名稱:為您的向量存儲提供一個唯一的名稱。

    • 描述:寫一個簡短的向量存儲描述。

    • 嵌入模型:選擇將文本轉換為向量的模型。

    • 索引算法:選擇如何組織向量以便搜索。

    • 搜索度量:定義如何計算向量之間的相似性。

    • 向量維度:向量嵌入的大小(通常為 768-1536)。

  4. 提交表單:

    • 點擊「創建」按鈕。您將收到成功通知,新的存儲將出現在您的向量存儲列表中。

創建新的向量存儲
創建新的向量存儲

配置選項

嵌入模型

模型
維度
描述
最適合

openai/text-embedding-3-large

1536

來自 OpenAI 的高精度嵌入模型

需要最大精度的生產應用

openai/text-embedding-3-small

1536

來自 OpenAI 的較小、更快的嵌入模型

具有更高吞吐量需求的應用

索引演算法

演算法
描述
取捨

IVFFLAT (預設)

反向文件與平坦壓縮

速度與準確性的良好平衡;適用於大多數數據集

HNSW

分層可導航小世界

對於大型數據集更好的準確性;更高的記憶體需求

搜尋指標

指標
描述
最適合

余弦相似度 (預設)

測量向量之間的角度

一般用途的相似性匹配

內積 (ip)

向量之間的點積

當向量的大小很重要時

L2 (歐幾里得)

向量之間的直線距離

當空間關係重要時

管理向量儲存

  1. 查看和編輯向量儲存:

    • 訪問管理儀表板以查看、編輯或刪除向量儲存。

  2. 查看文件:

    • 瀏覽特定向量儲存中的單個文件及其相關元數據。

  3. 統計數據:

    • 查看詳細統計數據,例如儲存的向量數量、查詢性能和操作指標。

查看向量儲存的詳細資訊
查看向量儲存的詳細資訊

API 金鑰管理

API 金鑰用於驗證和授權訪問 Rememberizer 向量儲存的 API 端點。妥善管理 API 金鑰對於維護您的向量儲存的安全性和完整性至關重要。

創建 API 金鑰

  1. 前往您的向量儲存詳細頁面

  2. 導航至 API 金鑰管理區域:

    • 它可以在「配置」標籤內找到

  3. 點擊 「新增 API 金鑰」

    • 將出現一個表單,提示您輸入詳細信息。

  4. 填寫詳細信息:

    • 名稱:提供一個名稱以幫助您識別其使用案例。

  5. 提交表單:

    • 點擊「創建」按鈕。新的 API 金鑰將被生成並顯示。請確保複製並安全存儲。此金鑰用於驗證對該特定向量儲存的請求。

創建新的 API 金鑰
創建新的 API 金鑰

撤銷 API 金鑰

如果不再需要 API 金鑰,您可以刪除它以防止任何潛在的濫用。

出於安全原因,您可能希望定期更換您的 API 金鑰。這涉及生成一個新的金鑰並撤銷舊的金鑰。

使用向量存儲 API

在創建向量存儲並生成 API 密鑰後,您可以使用 REST API 與其互動。

代碼範例

import requests
import json

API_KEY = "your_api_key_here"
VECTOR_STORE_ID = "vs_abc123"  # 替換為您的向量儲存 ID
BASE_URL = "https://api.rememberizer.ai/api/v1"

上傳文件到向量存儲

def upload_document(file_path, document_name=None): if document_name is None: document_name = file_path.split("/")[-1]

with open(file_path, "rb") as f:
    files = {"file": (document_name, f)}
    headers = {"x-api-key": API_KEY}
    
    response = requests.post(
        f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents",
        headers=headers,
        files=files
    )
    
    if response.status_code == 201:
        print(f"文件 '{document_name}' 上傳成功!")
        return response.json()
    else:
        print(f"上傳文件時出錯:{response.text}")
        return None

上傳文本內容到向量儲存

def upload_text(content, document_name): headers = { "x-api-key": API_KEY, "Content-Type": "application/json" }

data = {
    "name": document_name,
    "content": content
}

response = requests.post(
    f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/text",
    headers=headers,
    json=data
)

if response.status_code == 201:
    print(f"文本文件 '{document_name}' 上傳成功!")
    return response.json()
else:
    print(f"上傳文本時出錯: {response.text}")
    return None

搜尋向量儲存

def search_vector_store(query, num_results=5, prev_chunks=1, next_chunks=1): headers = {"x-api-key": API_KEY}

params = {
    "q": query,
    "n": num_results,
    "prev_chunks": prev_chunks,
    "next_chunks": next_chunks
}

response = requests.get(
    f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/search",
    headers=headers,
    params=params
)

if response.status_code == 200:
    results = response.json()
    print(f"找到 {len(results['matched_chunks'])} 個與 '{query}' 匹配的結果")
    
    # 列印最佳結果
    if results['matched_chunks']:
        top_match = results['matched_chunks'][0]
        print(f"最佳匹配 (距離: {top_match['distance']}):")
        print(f"文件: {top_match['document']['name']}")
        print(f"內容: {top_match['matched_content']}")
    
    return results
else:
    print(f"搜尋錯誤: {response.text}")
    return None

範例用法

上傳文件("path/to/document.pdf")

upload_text("這是一段範例文本,將被向量化", "sample-document.txt")

search_vector_store("向量相似度是如何工作的?")

{% endtab %}

{% tab title="JavaScript" %}
```javascript
// 向量存儲 API 客戶端
class VectorStoreClient {
  constructor(apiKey, vectorStoreId) {
    this.apiKey = apiKey;
    this.vectorStoreId = vectorStoreId;
    this.baseUrl = 'https://api.rememberizer.ai/api/v1';
  }

  // 獲取向量存儲信息
  async getVectorStoreInfo() {
    const response = await fetch(`${this.baseUrl}/vector-stores/${this.vectorStoreId}`, {
      method: 'GET',
      headers: {
        'x-api-key': this.apiKey
      }
    });
    
    if (!response.ok) {
      throw new Error(`獲取向量存儲信息失敗: ${response.statusText}`);
    }
    
    return response.json();
  }

  // 上傳文本文件
  async uploadTextDocument(name, content) {
    const response = await fetch(`${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents/text`, {
      method: 'POST',
      headers: {
        'x-api-key': this.apiKey,
        'Content-Type': 'application/json'
      },
      body: JSON.stringify({
        name,
        content
      })
    });
    
    if (!response.ok) {
      throw new Error(`上傳文本文件失敗: ${response.statusText}`);
    }
    
    return response.json();
  }

  // 上傳文件
  async uploadFile(file, onProgress) {
    const formData = new FormData();
    formData.append('file', file);
    
    const xhr = new XMLHttpRequest();
    
    return new Promise((resolve, reject) => {
      xhr.open('POST', `${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents`);
      xhr.setRequestHeader('x-api-key', this.apiKey);
      
      xhr.upload.onprogress = (event) => {
        if (event.lengthComputable && onProgress) {
          const percentComplete = (event.loaded / event.total) * 100;
          onProgress(percentComplete);
        }
      };
      
      xhr.onload = () => {
        if (xhr.status === 201) {
          resolve(JSON.parse(xhr.responseText));
        } else {
          reject(new Error(`上傳文件失敗: ${xhr.statusText}`));
        }
      };
      
      xhr.onerror = () => {
        reject(new Error('文件上傳期間的網絡錯誤'));
      };
      
      xhr.send(formData);
    });
  }

  // 在向量存儲中搜索文件
  async searchDocuments(query, options = {}) {
    const params = new URLSearchParams({
      q: query,
      n: options.numResults || 10,
      prev_chunks: options.prevChunks || 1,
      next_chunks: options.nextChunks || 1
    });
    
    if (options.threshold) {
      params.append('t', options.threshold);
    }
    
    const response = await fetch(
      `${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents/search?${params}`,
      {
        method: 'GET',
        headers: {
          'x-api-key': this.apiKey
        }
      }
    );
    
    if (!response.ok) {
      throw new Error(`搜索失敗: ${response.statusText}`);
    }
    
    return response.json();
  }

  // 列出向量存儲中的所有文件
  async listDocuments() {
    const response = await fetch(
      `${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents`,
      {
        method: 'GET',
        headers: {
          'x-api-key': this.apiKey
        }
      }
    );
    
    if (!response.ok) {
      throw new Error(`列出文件失敗: ${response.statusText}`);
    }
    
    return response.json();
  }

  // 刪除文件
  async deleteDocument(documentId) {
    const response = await fetch(
      `${this.baseUrl}/vector-stores/${this.vectorStoreId}/documents/${documentId}`,
      {
        method: 'DELETE',
        headers: {
          'x-api-key': this.apiKey
        }
      }
    );
    
    if (!response.ok) {
      throw new Error(`刪除文件失敗: ${response.statusText}`);
    }
    
    return true;
  }
}

// 示例用法
/*
const client = new VectorStoreClient('your_api_key', 'vs_abc123');

// 搜索文件
client.searchDocuments('語義搜索是如何工作的?')
  .then(results => {
    console.log(`找到 ${results.matched_chunks.length} 個匹配`);
    results.matched_chunks.forEach(match => {
      console.log(`文件: ${match.document.name}`);
      console.log(`分數: ${match.distance}`);
      console.log(`內容: ${match.matched_content}`);
      console.log('---');
    });
  })
  .catch(error => console.error(error));
*/

{% endtab %}

{% tab title="Ruby" %}


ruby
require 'net/http'
require 'uri'
require 'json'

class VectorStoreClient
  def initialize(api_key, vector_store_id)
    @api_key = api_key
    @vector_store_id = vector_store_id
    @base_url = 'https://api.rememberizer.ai/api/v1'
  end

  # 獲取向量存儲詳細信息
  def get_vector_store_info
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}")
    request = Net::HTTP::Get.new(uri)
    request['x-api-key'] = @api_key
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  # 上傳文本內容
  def upload_text(name, content)
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}/documents/text")
    request = Net::HTTP::Post.new(uri)
    request['Content-Type'] = 'application/json'
    request['x-api-key'] = @api_key
    
    request.body = {
      name: name,
      content: content
    }.to_json
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  # 搜索文件
  def search(query, num_results: 5, prev_chunks: 1, next_chunks: 1, threshold: nil)
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}/documents/search")
    params = {
      q: query,
      n: num_results,
      prev_chunks: prev_chunks,
      next_chunks: next_chunks
    }
    
    params[:t] = threshold if threshold
    
    uri.query = URI.encode_www_form(params)
    request = Net::HTTP::Get.new(uri)
    request['x-api-key'] = @api_key
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  # 列出文件
  def list_documents
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}/documents")
    request = Net::HTTP::Get.new(uri)
    request['x-api-key'] = @api_key
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  # 上傳文件(多部分表單)
  def upload_file(file_path)
    uri = URI("#{@base_url}/vector-stores/#{@vector_store_id}/documents")
    
    file_name = File.basename(file_path)
    file_content = File.binread(file_path)
    
    boundary = "RememberizerBoundary#{rand(1000000)}"
    
    request = Net::HTTP::Post.new(uri)
    request['Content-Type'] = "multipart/form-data; boundary=#{boundary}"
    request['x-api-key'] = @api_key
    
    post_body = []
    post_body << "--#{boundary}\r\n"
    post_body << "Content-Disposition: form-data; name=\"file\"; filename=\"#{file_name}\"\r\n"
    post_body << "Content-Type: application/octet-stream\r\n\r\n"
    post_body << file_content
    post_body << "\r\n--#{boundary}--\r\n"
    
    request.body = post_body.join
    
    response = send_request(uri, request)
    JSON.parse(response.body)
  end

  private

  def send_request(uri, request)
    http = Net::HTTP.new(uri.host, uri.port)
    http.use_ssl = (uri.scheme == 'https')
    
    response = http.request(request)
    
    unless response.is_a?(Net::HTTPSuccess)
      raise "API 請求失敗: #{response.code} #{response.message}\n#{response.body}"
    end
    
    response
  end
end

示例用法

=begin client = VectorStoreClient.new('your_api_key', 'vs_abc123')

搜尋文件

results = client.search('什麼是資料安全的最佳實踐?') puts "找到 #{results['matched_chunks'].length} 個結果"

顯示最佳結果

if results['matched_chunks'].any? top_match = results['matched_chunks'].first puts "最佳匹配 (距離: #{top_match['distance']}):" puts "文件: #{top_match['document']['name']}" puts "內容: #{top_match['matched_content']}" end =end


</div>

<div data-gb-custom-block data-tag="tab" data-title='cURL'>

```bash
# 設定您的 API 金鑰和向量儲存 ID
API_KEY="your_api_key_here"
VECTOR_STORE_ID="vs_abc123"
BASE_URL="https://api.rememberizer.ai/api/v1"

# 獲取向量存儲資訊
curl -X GET "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}" \
  -H "x-api-key: ${API_KEY}"

# 上傳文本文件
curl -X POST "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents/text" \
  -H "x-api-key: ${API_KEY}" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "example-document.txt",
    "content": "這是一個範例文件,將被向量化並存儲在向量數據庫中以進行語義搜索。"
  }'

# 上傳檔案
curl -X POST "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents" \
  -H "x-api-key: ${API_KEY}" \
  -F "file=@/path/to/your/document.pdf"

# 搜尋文件
curl -X GET "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents/search?q=semantic%20search&n=5&prev_chunks=1&next_chunks=1" \
  -H "x-api-key: ${API_KEY}"

# 列出所有文件
curl -X GET "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents" \
  -H "x-api-key: ${API_KEY}"

# 刪除文件
curl -X DELETE "${BASE_URL}/vector-stores/${VECTOR_STORE_ID}/documents/123" \
  -H "x-api-key: ${API_KEY}"

性能考量

即將推出:向量存儲架構圖

這個技術架構圖將說明:

  • PostgreSQL + pgvector 基礎架構

  • 索引算法結構 (IVFFLAT 與 HNSW)

  • 向量空間中的搜索指標如何運作 (視覺比較)

  • 文檔分塊過程及重疊視覺化

  • 在不同規模下的性能考量視覺化

為不同數據量進行優化

數據量
推薦配置
備註

小型 (<10k 文件)

IVFFLAT, 余弦相似度

簡單配置提供良好性能

中型 (10k-100k 文件)

IVFFLAT, 確保定期重新索引

在搜索速度和索引維護之間取得平衡

大型 (>100k 文件)

HNSW, 考慮增加向量維度

更高的內存使用,但在規模上保持性能

分塊策略

分塊過程對搜索質量有重大影響:

  • 分塊大小:Rememberizer 使用默認的分塊大小為 1024 字節,並有 200 字節的重疊

  • 較小的分塊(512-1024 字節):更精確的匹配,更適合特定問題

  • 較大的分塊(1500-2048 字節):每個匹配中有更多上下文,更適合廣泛主題

  • 重疊:確保在分塊邊界不會丟失上下文

查詢優化

  • 上下文窗口:使用 prev_chunksnext_chunks 來檢索周圍內容

  • 結果數量:從 3-5 個結果開始(n 參數),根據精確度需求進行調整

  • 閾值:調整 t 參數以根據相似度分數過濾結果

高級用法

重新索引

Rememberizer 在向量數量超過預定閾值時自動觸發重新索引,但在以下情況下考慮手動重新索引:

  • 上傳大量文檔

  • 更改嵌入模型

  • 修改索引算法

查詢增強

為了獲得更好的搜索結果:

  1. 具體 在搜索查詢中

  2. 包含上下文 當可能時

  3. 使用自然語言 而不是關鍵字

  4. 根據結果質量 調整參數

從其他向量資料庫遷移

如果您目前正在使用其他向量資料庫解決方案並希望遷移到 Rememberizer 向量儲存,以下指南將幫助您有效地轉移數據。

遷移概述

遷移向量數據涉及:

  1. 從您的源向量數據庫導出數據

  2. 將數據轉換為與 Rememberizer 兼容的格式

  3. 將數據導入您的 Rememberizer 向量存儲

  4. 驗證遷移是否成功

遷移到 Rememberizer 的好處

  • PostgreSQL 基礎:建立在成熟的資料庫技術上,具備內建的備份和恢復功能

  • 整合生態系統:與其他 Rememberizer 組件無縫連接

  • 簡化管理:統一介面進行向量操作

  • 先進安全性:行級安全性和細粒度訪問控制

  • 可擴展架構:隨著數據增長進行性能優化

從 Pinecone 遷移

import os
import pinecone
import requests
import json
import time

# 設定 Pinecone 客戶端
pinecone.init(api_key="PINECONE_API_KEY", environment="PINECONE_ENV")
source_index = pinecone.Index("your-pinecone-index")

# 設定 Rememberizer 向量儲存客戶端
REMEMBERIZER_API_KEY = "your_rememberizer_api_key"
VECTOR_STORE_ID = "vs_abc123"  # 您的 Rememberizer 向量儲存 ID
BASE_URL = "https://api.rememberizer.ai/api/v1"

# 1. 設定遷移的批次大小(根據您的數據大小進行調整)
BATCH_SIZE = 100

# 2. 從 Pinecone 獲取向量的函數
def fetch_vectors_from_pinecone(index_name, batch_size, cursor=None):
    # 如果您的 Pinecone 版本支持,請使用列表操作
    try:
        result = source_index.list(limit=batch_size, cursor=cursor)
        vectors = result.get("vectors", {})
        next_cursor = result.get("cursor")
        return vectors, next_cursor
    except AttributeError:
        # 對於不支持列表操作的舊版 Pinecone
        # 這是一種簡化的方法;實際實現取決於您的數據訪問模式
        query_response = source_index.query(
            vector=[0] * source_index.describe_index_stats()["dimension"],
            top_k=batch_size,
            include_metadata=True,
            include_values=True
        )
        return {item.id: {"id": item.id, "values": item.values, "metadata": item.metadata} 
                for item in query_response.matches}, None

# 3. 上傳向量到 Rememberizer 的函數
def upload_to_rememberizer(vectors):
    headers = {
        "x-api-key": REMEMBERIZER_API_KEY,
        "Content-Type": "application/json"
    }
    
    for vector_id, vector_data in vectors.items():
        # 將 Pinecone 向量數據轉換為 Rememberizer 格式
        document_name = vector_data.get("metadata", {}).get("filename", f"pinecone_doc_{vector_id}")
        content = vector_data.get("metadata", {}).get("text", "")
        
        if not content:
            print(f"跳過 {vector_id} - 在元數據中未找到文本內容")
            continue
            
        data = {
            "name": document_name,
            "content": content,
            # 可選:包括其他元數據
            "metadata": vector_data.get("metadata", {})
        }
        
        response = requests.post(
            f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/text",
            headers=headers,
            json=data
        )
        
        if response.status_code == 201:
            print(f"文檔 '{document_name}' 上傳成功!")
        else:
            print(f"上傳文檔 {document_name} 時出錯:{response.text}")
        
        # 添加小延遲以防止速率限制
        time.sleep(0.1)

# 4. 主要遷移功能
def migrate_pinecone_to_rememberizer():
    cursor = None
    total_migrated = 0
    
    print("開始從 Pinecone 遷移到 Rememberizer...")
    
    while True:
        vectors, cursor = fetch_vectors_from_pinecone("your-pinecone-index", BATCH_SIZE, cursor)
        
        if not vectors:
            break
            
        print(f"從 Pinecone 獲取了 {len(vectors)} 個向量")
        upload_to_rememberizer(vectors)
        
        total_migrated += len(vectors)
        print(f"進度: {total_migrated} 個向量已遷移")
        
        if not cursor:
            break
    
    print(f"遷移完成!總共遷移了 {total_migrated} 個向量到 Rememberizer")

# 執行遷移
# migrate_pinecone_to_rememberizer()

從 Qdrant 遷移

import requests
import json
import time
from qdrant_client import QdrantClient
from qdrant_client.http import model

# 設定 Qdrant 客戶端
QDRANT_URL = "http://localhost:6333"  # 或者你的 Qdrant 雲端 URL
QDRANT_API_KEY = "your_qdrant_api_key"  # 如果使用 Qdrant 雲端
QDRANT_COLLECTION_NAME = "your_collection"

qdrant_client = QdrantClient(
    url=QDRANT_URL,
    api_key=QDRANT_API_KEY  # 僅限於 Qdrant 雲端
)

# 設定 Rememberizer 向量儲存客戶端
REMEMBERIZER_API_KEY = "your_rememberizer_api_key"
VECTOR_STORE_ID = "vs_abc123"  # 您的 Rememberizer 向量儲存 ID
BASE_URL = "https://api.rememberizer.ai/api/v1"

# 批次大小處理
BATCH_SIZE = 100

# 從 Qdrant 獲取點的函數
def fetch_points_from_qdrant(collection_name, batch_size, offset=0):
    try:
        # 獲取集合信息以確定向量維度
        collection_info = qdrant_client.get_collection(collection_name=collection_name)
        
        # 滾動查詢點
        scroll_result = qdrant_client.scroll(
            collection_name=collection_name,
            limit=batch_size,
            offset=offset,
            with_payload=True,
            with_vectors=True
        )
        
        points = scroll_result[0]  # 元組 (points, next_offset)
        next_offset = scroll_result[1]
        
        return points, next_offset
    except Exception as e:
        print(f"從 Qdrant 獲取點時出錯: {e}")
        return [], None

# 將向 Rememberizer 上傳向量的函數
def upload_to_rememberizer(points):
    headers = {
        "x-api-key": REMEMBERIZER_API_KEY,
        "Content-Type": "application/json"
    }
    
    results = []
    
    for point in points:
        # 從 Qdrant 點提取數據
        point_id = point.id
        metadata = point.payload
        text_content = metadata.get("text", "")
        document_name = metadata.get("filename", f"qdrant_doc_{point_id}")
        
        if not text_content:
            print(f"跳過 {point_id} - 在有效負載中未找到文本內容")
            continue
            
        data = {
            "name": document_name,
            "content": text_content,
            # 可選:包括其他元數據
            "metadata": metadata
        }
        
        try:
            response = requests.post(
                f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/text",
                headers=headers,
                json=data
            )
            
            if response.status_code == 201:
                print(f"文檔 '{document_name}' 上傳成功!")
                results.append({"id": point_id, "success": True})
            else:
                print(f"上傳文檔 {document_name} 時出錯:{response.text}")
                results.append({"id": point_id, "success": False, "error": response.text})
        except Exception as e:
            print(f"上傳文檔 {document_name} 時發生異常:{str(e)}")
            results.append({"id": point_id, "success": False, "error": str(e)})
        
        # 添加小延遲以防止速率限制
        time.sleep(0.1)
    
    return results

# 主要遷移功能
def migrate_qdrant_to_rememberizer():
    offset = None
    total_migrated = 0
    
    print("開始從 Qdrant 遷移到 Rememberizer...")
    
    while True:
        points, next_offset = fetch_points_from_qdrant(
            QDRANT_COLLECTION_NAME, 
            BATCH_SIZE,
            offset
        )
        
        if not points:
            break
            
        print(f"從 Qdrant 獲取了 {len(points)} 個點")
        
        results = upload_to_rememberizer(points)
        success_count = sum(1 for r in results if r.get("success", False))
        
        total_migrated += success_count
        print(f"進度: {total_migrated} 個點成功遷移")
        
        if next_offset is None:
            break
            
        offset = next_offset
    
    print(f"遷移完成!總共遷移了 {total_migrated} 個點到 Rememberizer")

# 執行遷移
# migrate_qdrant_to_rememberizer()

從 Supabase pgvector 遷移

如果您已經在使用 Supabase 和 pgvector,遷移到 Rememberizer 特別簡單,因為兩者都使用帶有 pgvector 擴展的 PostgreSQL。

import psycopg2
import requests
import json
import time
import os
from dotenv import load_dotenv

# 載入環境變數
load_dotenv()

# Supabase PostgreSQL 配置
SUPABASE_DB_HOST = os.getenv("SUPABASE_DB_HOST")
SUPABASE_DB_PORT = os.getenv("SUPABASE_DB_PORT", "5432")
SUPABASE_DB_NAME = os.getenv("SUPABASE_DB_NAME")
SUPABASE_DB_USER = os.getenv("SUPABASE_DB_USER")
SUPABASE_DB_PASSWORD = os.getenv("SUPABASE_DB_PASSWORD")
SUPABASE_VECTOR_TABLE = os.getenv("SUPABASE_VECTOR_TABLE", "documents")

# Rememberizer 配置
REMEMBERIZER_API_KEY = os.getenv("REMEMBERIZER_API_KEY")
VECTOR_STORE_ID = os.getenv("VECTOR_STORE_ID")  # 例如,"vs_abc123"
BASE_URL = "https://api.rememberizer.ai/api/v1"

# 批次大小處理
BATCH_SIZE = 100

# 連接到 Supabase PostgreSQL
def connect_to_supabase():
    try:
        conn = psycopg2.connect(
            host=SUPABASE_DB_HOST,
            port=SUPABASE_DB_PORT,
            dbname=SUPABASE_DB_NAME,
            user=SUPABASE_DB_USER,
            password=SUPABASE_DB_PASSWORD
        )
        return conn
    except Exception as e:
        print(f"連接到 Supabase PostgreSQL 時出錯: {e}")
        return None

# 從 Supabase pgvector 獲取文件
def fetch_documents_from_supabase(conn, batch_size, offset=0):
    try:
        cursor = conn.cursor()
        
        # 根據您的表結構調整此查詢
        query = f"""
        SELECT id, content, metadata, embedding
        FROM {SUPABASE_VECTOR_TABLE}
        ORDER BY id
        LIMIT %s OFFSET %s
        """
        
        cursor.execute(query, (batch_size, offset))
        documents = cursor.fetchall()
        cursor.close()
        
        return documents
    except Exception as e:
        print(f"從 Supabase 獲取文件時出錯: {e}")
        return []

# 上傳文件到 Rememberizer
def upload_to_rememberizer(documents):
    headers = {
        "x-api-key": REMEMBERIZER_API_KEY,
        "Content-Type": "application/json"
    }
    
    results = []
    
    for doc in documents:
        doc_id, content, metadata, embedding = doc
        
        # 如果元數據以 JSON 字串形式存儲,則解析元數據
        if isinstance(metadata, str):
            try:
                metadata = json.loads(metadata)
            except:
                metadata = {}
        elif metadata is None:
            metadata = {}
        
        document_name = metadata.get("filename", f"supabase_doc_{doc_id}")
        
        if not content:
            print(f"跳過 {doc_id} - 找不到內容")
            continue
            
        data = {
            "name": document_name,
            "content": content,
            "metadata": metadata
        }
        
        try:
            response = requests.post(
                f"{BASE_URL}/vector-stores/{VECTOR_STORE_ID}/documents/text",
                headers=headers,
                json=data
            )
            
            if response.status_code == 201:
                print(f"文件 '{document_name}' 上傳成功!")
                results.append({"id": doc_id, "success": True})
            else:
                print(f"上傳文件 {document_name} 時出錯: {response.text}")
                results.append({"id": doc_id, "success": False, "error": response.text})
        except Exception as e:
            print(f"上傳文件 {document_name} 時發生異常: {str(e)}")
            results.append({"id": doc_id, "success": False, "error": str(e)})
        
        # 添加小延遲以防止速率限制
        time.sleep(0.1)
    
    return results

# 主要遷移功能
def migrate_supabase_to_rememberizer():
    conn = connect_to_supabase()
    if not conn:
        print("無法連接到 Supabase。中止遷移。")
        return
    
    offset = 0
    total_migrated = 0
    
    print("開始從 Supabase pgvector 遷移到 Rememberizer...")
    
    try:
        while True:
            documents = fetch_documents_from_supabase(conn, BATCH_SIZE, offset)
            
            if not documents:
                break
                
            print(f"從 Supabase 獲取了 {len(documents)} 份文件")
            
            results = upload_to_rememberizer(documents)
            success_count = sum(1 for r in results if r.get("success", False))
            
            total_migrated += success_count
            print(f"進度:成功遷移 {total_migrated} 份文件")
            
            offset += BATCH_SIZE
            
    finally:
        conn.close()
    
    print(f"遷移完成!共遷移 {total_migrated} 份文件到 Rememberizer")

# 執行遷移
# migrate_supabase_to_rememberizer()

遷移最佳實踐

遵循這些建議以確保成功的遷移:

  1. 提前規劃

    • 估算遷移所需的數據量和時間

    • 在低流量時段安排遷移

    • 在開始大型遷移之前增加磁碟空間

  2. 先測試

    • 在 Rememberizer 中創建測試向量存儲

    • 遷移一小部分數據(100-1000 個向量)

    • 使用關鍵查詢驗證搜索功能

  3. 數據驗證

    • 比較遷移前後的文檔數量

    • 執行基準查詢以確保結果相似

    • 驗證元數據是否正確保留

  4. 優化性能

    • 使用批量操作以提高效率

    • 考慮源數據庫和目標數據庫的地理位置共置

    • 監控 API 速率限制並相應調整批量大小

  5. 遷移後步驟

    • 驗證在 Rememberizer 中創建索引

    • 更新應用程序配置以指向新的向量存儲

    • 在遷移驗證之前保留源數據庫作為備份

有關詳細的 API 參考和端點文檔,請訪問 https://github.com/skydeckai/rememberizer-docs/blob/production/zh-hk/developer/api-docs/vector-store/README.md 頁面。


確保安全處理 API 密鑰並遵循 API 密鑰管理的最佳實踐。

Last updated