Python SDK
v0.11.54 · Python 3.10+pip install dakera # core (sync client)
pip install dakera[async] # include async client
pip show dakera # verify installed version
Client setup
Point the client at your Dakera server. Use env vars to keep the URL and key out of source code:
import os
from dakera import DakeraClient
# Recommended: configure via env vars
# export DAKERA_URL=http://<YOUR_SERVER_IP>:3300
# export DAKERA_API_KEY=your-api-key
client = DakeraClient(
base_url=os.getenv("DAKERA_URL", "http://localhost:3300"),
api_key=os.getenv("DAKERA_API_KEY")
)
Memory operations
client.memories.store(agent_id="my-agent", content="User is learning Rust", importance=0.8, tags=["learning"])
memories = client.memories.recall(agent_id="my-agent", query="programming languages", top_k=10)
memories = client.memories.batch_recall(agent_id="my-agent", tags=["learning"], min_importance=0.6)
client.memories.forget(memory_id="mem-abc123")
client.memories.set_importance(memory_id="mem-abc123", importance=0.9)
memory = client.memories.get(memory_id="mem-abc123")
Sessions
session = client.sessions.start(agent_id="my-agent", metadata={"task": "review"})
client.memories.store(agent_id="my-agent", content="Found 2 issues", session_id=session.id)
client.sessions.end(session_id=session.id, summary="Review complete")
sessions = client.sessions.list(agent_id="my-agent")
Vector operations
# Upsert a text document (auto-embedded server-side)
client.vectors.upsert_text(namespace="docs", documents=[
{"id": "doc-001", "text": "Quarterly revenue report Q1 2026", "metadata": {"source": "finance"}}
])
# Query by natural language
results = client.vectors.query_text(namespace="docs", text="revenue trends", top_k=5)
# Hybrid search (vector + BM25)
results = client.vectors.hybrid_search(namespace="docs", text="revenue", top_k=10, vector_weight=0.7)
# Full-text search (BM25 only)
results = client.vectors.fulltext_search(namespace="docs", query="quarterly report", top_k=5)
Knowledge graph
# Build knowledge graph from a seed memory
graph = client.knowledge.graph(agent_id="my-agent", memory_id="mem-abc", depth=2)
# Find path between two memories
path = client.knowledge.path(agent_id="my-agent", from_id="mem-a", to_id="mem-b")
# Cross-agent knowledge network
network = client.knowledge.cross_agent_network(agent_ids=["agent-a", "agent-b"])
# Deduplicate memories (preview first)
dupes = client.knowledge.deduplicate(agent_id="my-agent", threshold=0.93, dry_run=True)
Namespace & API key management
# Create a namespace
client.namespaces.create(name="production", dimension=384, distance="cosine")
# Create a scoped API key
key = client.namespaces.create_key(namespace="production", name="reader", scope="read", expires_in_days=90)
# Set memory lifecycle policy
client.namespaces.set_memory_policy(namespace="production", episodic_ttl_seconds=604800)
Entity extraction
# Extract entities from text
entities = client.extract_entities(text="Alice met Bob at Google HQ on Tuesday")
# Configure namespace entity extraction
client.namespaces.configure_ner(namespace="chat", extract_entities=True,
entity_types=["person", "organization", "location"])
Import & export
# Export all memories
data = client.export_memories(agent_id="my-agent", format="jsonl")
# Import from Mem0 format
client.import_memories(agent_id="my-agent", data=mem0_json, format="mem0")
Feedback loop
# Upvote a useful memory
client.memories.feedback(memory_id="mem-abc", signal="upvote")
# Get feedback summary for an agent
summary = client.agents.feedback_summary(agent_id="my-agent")
Async client
from dakera import AsyncDakeraClient
async with AsyncDakeraClient(base_url="http://localhost:3300", api_key="key") as client:
await client.memories.store(agent_id="my-agent", content="async memory")
memories = await client.memories.recall(agent_id="my-agent", query="async")
Error handling
from dakera.exceptions import DakeraError, NotFoundError, RateLimitError, AuthorizationError
try:
memory = client.memories.get(memory_id="mem-abc")
except NotFoundError:
print("Memory not found")
except RateLimitError as e:
print(f"Rate limited — retry after {e.retry_after}s")
except DakeraError as e:
print(f"Error {e.error_code}: {e.message}")
Error types: DakeraError (base), ConnectionError, NotFoundError, ValidationError, RateLimitError, ServerError, AuthenticationError, AuthorizationError, TimeoutError.
Connection configuration
from dakera import DakeraClient, RetryConfig
client = DakeraClient(
base_url="http://localhost:3300",
api_key="your-key",
timeout=30, # seconds
retry_config=RetryConfig(
max_retries=3,
base_delay=0.5,
max_delay=10.0,
jitter=True
)
)