Architectural patterns, security considerations, and best practices for enterprise integrations with Rememberizer
This guide provides comprehensive information for organizations looking to integrate Rememberizer's knowledge management and semantic search capabilities into enterprise environments. It covers architectural patterns, security considerations, scalability, and best practices.
Enterprise Integration Overview
Rememberizer offers robust enterprise integration capabilities that extend beyond basic API usage, allowing organizations to build sophisticated knowledge management systems that:
Scale to meet organizational needs across departments and teams
Maintain security and compliance with enterprise requirements
Integrate with existing systems and workflow tools
Enable team-based access control and knowledge sharing
Support high-volume batch operations for document processing
Architectural Patterns for Enterprise Integration
1. Multi-Tenant Knowledge Management
Organizations can implement a multi-tenant architecture to organize knowledge by teams, departments, or functions:
import requests
import time
from concurrent.futures import ThreadPoolExecutor
def batch_upload_documents(files, api_key, batch_size=5):
    """
    Upload documents in batches to avoid rate limits.

    Args:
        files: List of file paths to upload
        api_key: Rememberizer API key
        batch_size: Number of concurrent uploads per batch

    Returns:
        List of parsed JSON responses, one per uploaded file, in
        submission order within each batch.
    """
    headers = {
        'X-API-Key': api_key
    }

    def _upload_one(file_path):
        # Open the file inside the worker thread so the handle is still
        # open while requests streams it. (Previously the file was opened
        # in a `with` block that exited right after submit(), so the
        # worker could read from an already-closed handle.)
        with open(file_path, 'rb') as f:
            return requests.post(
                'https://api.rememberizer.ai/api/v1/documents/upload/',
                headers=headers,
                files={'file': f}
            )

    results = []
    # Process file paths in batches of `batch_size`.
    # NOTE: the multipart dict is built inside _upload_one under its own
    # name — the original code rebound the `files` parameter here, which
    # broke len(files) in the loop bounds and the rate-limit check below.
    with ThreadPoolExecutor(max_workers=batch_size) as executor:
        for i in range(0, len(files), batch_size):
            batch = files[i:i + batch_size]
            futures = [executor.submit(_upload_one, path) for path in batch]
            # Collect results for this batch before starting the next one.
            for future in futures:
                response = future.result()
                results.append(response.json())
            # Rate limiting - pause between batches (skip after the last).
            if i + batch_size < len(files):
                time.sleep(1)
    return results
/**
 * Run search queries in rate-limited batches.
 *
 * @param {string[]} queries - Search queries to execute.
 * @param {string} apiKey - Rememberizer API key.
 * @param {object} [options] - batchSize, delayBetweenBatches (ms),
 *   maxRetries, retryDelay (ms).
 * @returns {Promise<object[]>} Search results in query order.
 */
async function batchSearchWithRateLimit(queries, apiKey, options = {}) {
  const {
    batchSize = 5,
    delayBetweenBatches = 1000,
    maxRetries = 3,
    retryDelay = 2000
  } = options;

  const results = [];
  let offset = 0;

  while (offset < queries.length) {
    // Fire off one batch of concurrent searches.
    const batch = queries.slice(offset, offset + batchSize);
    const batchResults = await Promise.all(
      batch.map(query => searchWithRetry(query, apiKey, maxRetries, retryDelay))
    );
    results.push(...batchResults);

    offset += batchSize;
    // Pause between batches to stay under the rate limit (skip after last).
    if (offset < queries.length) {
      await new Promise(resolve => setTimeout(resolve, delayBetweenBatches));
    }
  }

  return results;
}
/**
 * Execute a single search with retry and rate-limit handling.
 *
 * Retries on HTTP 429 (honoring Retry-After when present) and on
 * transient errors, up to maxRetries attempts.
 *
 * @param {string} query - Search query text.
 * @param {string} apiKey - Rememberizer API key.
 * @param {number} maxRetries - Maximum number of attempts.
 * @param {number} retryDelay - Fallback delay between retries, in ms.
 * @returns {Promise<object>} Parsed JSON search result.
 * @throws {Error} When all retries are exhausted.
 */
async function searchWithRetry(query, apiKey, maxRetries, retryDelay) {
  let retries = 0;
  while (retries < maxRetries) {
    try {
      const response = await fetch('https://api.rememberizer.ai/api/v1/search/', {
        method: 'POST',
        headers: {
          'X-API-Key': apiKey,
          'Content-Type': 'application/json'
        },
        body: JSON.stringify({ query })
      });
      if (response.ok) {
        return response.json();
      }
      // Handle rate limiting specifically: wait the server-suggested
      // interval (Retry-After is in seconds) or fall back to retryDelay.
      if (response.status === 429) {
        const retryAfter = response.headers.get('Retry-After') || retryDelay / 1000;
        await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
        retries++;
        continue;
      }
      // Other errors
      throw new Error(`Search failed with status: ${response.status}`);
    } catch (error) {
      retries++;
      if (retries >= maxRetries) {
        throw error;
      }
      await new Promise(resolve => setTimeout(resolve, retryDelay));
    }
  }
  // BUG FIX: previously the function fell through here and resolved to
  // undefined when retries were exhausted via the 429 path. Fail loudly
  // instead so callers can distinguish "no result" from "gave up".
  throw new Error(`Search failed after ${maxRetries} retries (rate limited)`);
}
import requests
def create_team_knowledge_base(team_id, name, description, api_key):
    """Create a knowledge base owned by a specific team.

    Args:
        team_id: Identifier of the team that will own the knowledge base.
        name: Display name for the new knowledge base.
        description: Human-readable description.
        api_key: Rememberizer API key.

    Returns:
        Parsed JSON response from the API.
    """
    response = requests.post(
        'https://api.rememberizer.ai/api/v1/teams/knowledge/',
        headers={
            'X-API-Key': api_key,
            'Content-Type': 'application/json',
        },
        json={
            'team_id': team_id,
            'name': name,
            'description': description,
        },
    )
    return response.json()
def grant_team_access(knowledge_id, team_id, permission_level, api_key):
    """Grant a team access to a knowledge base.

    Args:
        knowledge_id: ID of the knowledge base.
        team_id: ID of the team to grant access.
        permission_level: 'read', 'write', or 'admin'.
        api_key: Rememberizer API key.

    Returns:
        Parsed JSON response from the API.
    """
    response = requests.post(
        'https://api.rememberizer.ai/api/v1/knowledge/permissions/',
        headers={
            'X-API-Key': api_key,
            'Content-Type': 'application/json',
        },
        json={
            'team_id': team_id,
            'knowledge_id': knowledge_id,
            'permission': permission_level,
        },
    )
    return response.json()
/**
 * Make an API call with consistent error handling.
 *
 * @param {string} endpoint - Path relative to the API root (no leading slash).
 * @param {string} method - HTTP method ('GET', 'POST', ...).
 * @param {object} payload - JSON body; ignored for GET requests.
 * @param {string} apiKey - Rememberizer API key.
 * @returns {Promise<object>} Parsed JSON response, or { success: true } for 204.
 * @throws {Error} With a contextual message on any failure.
 */
async function robustApiCall(endpoint, method, payload, apiKey) {
  const url = `https://api.rememberizer.ai/api/v1/${endpoint}`;
  const requestInit = {
    method,
    headers: {
      'X-API-Key': apiKey,
      'Content-Type': 'application/json'
    }
  };
  // GET requests must not carry a body.
  if (method !== 'GET') {
    requestInit.body = JSON.stringify(payload);
  }

  try {
    const response = await fetch(url, requestInit);

    // 204 No Content has no body to parse.
    if (response.status === 204) {
      return { success: true };
    }

    if (!response.ok) {
      const error = await response.json();
      throw new Error(error.message || `API call failed with status: ${response.status}`);
    }

    return await response.json();
  } catch (error) {
    // Log error details for troubleshooting
    console.error(`API call to ${endpoint} failed:`, error);
    // Provide meaningful error to calling code
    throw new Error(`Failed to ${method} ${endpoint}: ${error.message}`);
  }
}
import requests
import time
from functools import lru_cache
# Cache frequently accessed documents for roughly 10 minutes: the time
# bucket is part of the cache key, so entries go stale when it rolls over.
def get_document_with_cache(document_id, api_key, timestamp=None):
    """
    Get a document with caching.

    Args:
        document_id: ID of the document to retrieve
        api_key: Rememberizer API key
        timestamp: Explicit cache-invalidation bucket; defaults to the
            current 10-minute window.

    Returns:
        Parsed JSON response for the document.
    """
    # BUG FIX: the bucket must be computed BEFORE the cache lookup.
    # Previously @lru_cache wrapped this function directly, so callers
    # omitting `timestamp` always hit the key (document_id, api_key, None)
    # and the 10-minute invalidation inside the body never took effect —
    # documents were cached forever.
    if timestamp is None:
        timestamp = int(time.time() / 600)
    return _fetch_document_cached(document_id, api_key, timestamp)


@lru_cache(maxsize=100)
def _fetch_document_cached(document_id, api_key, timestamp):
    # `timestamp` is intentionally unused in the request itself: it exists
    # only as a cache-key component for time-based invalidation.
    headers = {
        'X-API-Key': api_key
    }
    response = requests.get(
        f'https://api.rememberizer.ai/api/v1/documents/{document_id}/',
        headers=headers
    )
    return response.json()