Python SDK

v0.11.54 · Python 3.10+

PyPI · GitHub · Changelog

pip install dakera              # core (sync client)
pip install dakera[async]       # include async client
pip show dakera                 # verify installed version

Client setup

Point the client at your Dakera server. Use env vars to keep the URL and key out of source code:

import os
from dakera import DakeraClient

# Recommended: configure via env vars
# export DAKERA_URL=http://<YOUR_SERVER_IP>:3300
# export DAKERA_API_KEY=your-api-key

client = DakeraClient(
    base_url=os.getenv("DAKERA_URL", "http://localhost:3300"),
    api_key=os.getenv("DAKERA_API_KEY")
)

Memory operations

client.memories.store(agent_id="my-agent", content="User is learning Rust", importance=0.8, tags=["learning"])
memories = client.memories.recall(agent_id="my-agent", query="programming languages", top_k=10)
memories = client.memories.batch_recall(agent_id="my-agent", tags=["learning"], min_importance=0.6)
client.memories.forget(memory_id="mem-abc123")
client.memories.set_importance(memory_id="mem-abc123", importance=0.9)
memory = client.memories.get(memory_id="mem-abc123")

Sessions

session = client.sessions.start(agent_id="my-agent", metadata={"task": "review"})
client.memories.store(agent_id="my-agent", content="Found 2 issues", session_id=session.id)
client.sessions.end(session_id=session.id, summary="Review complete")
sessions = client.sessions.list(agent_id="my-agent")

Vector operations

# Upsert a text document (auto-embedded server-side)
client.vectors.upsert_text(namespace="docs", documents=[
    {"id": "doc-001", "text": "Quarterly revenue report Q1 2026", "metadata": {"source": "finance"}}
])

# Query by natural language
results = client.vectors.query_text(namespace="docs", text="revenue trends", top_k=5)

# Hybrid search (vector + BM25)
results = client.vectors.hybrid_search(namespace="docs", text="revenue", top_k=10, vector_weight=0.7)

# Full-text search (BM25 only)
results = client.vectors.fulltext_search(namespace="docs", query="quarterly report", top_k=5)

Knowledge graph

# Build knowledge graph from a seed memory
graph = client.knowledge.graph(agent_id="my-agent", memory_id="mem-abc", depth=2)

# Find path between two memories
path = client.knowledge.path(agent_id="my-agent", from_id="mem-a", to_id="mem-b")

# Cross-agent knowledge network
network = client.knowledge.cross_agent_network(agent_ids=["agent-a", "agent-b"])

# Deduplicate memories (preview first)
dupes = client.knowledge.deduplicate(agent_id="my-agent", threshold=0.93, dry_run=True)

Namespace & API key management

# Create a namespace
client.namespaces.create(name="production", dimension=384, distance="cosine")

# Create a scoped API key
key = client.namespaces.create_key(namespace="production", name="reader", scope="read", expires_in_days=90)

# Set memory lifecycle policy
client.namespaces.set_memory_policy(namespace="production", episodic_ttl_seconds=604800)

Entity extraction

# Extract entities from text
entities = client.extract_entities(text="Alice met Bob at Google HQ on Tuesday")

# Configure namespace entity extraction
client.namespaces.configure_ner(namespace="chat", extract_entities=True,
    entity_types=["person", "organization", "location"])

Import & export

# Export all memories
data = client.export_memories(agent_id="my-agent", format="jsonl")

# Import from Mem0 format
client.import_memories(agent_id="my-agent", data=mem0_json, format="mem0")

Feedback loop

# Upvote a useful memory
client.memories.feedback(memory_id="mem-abc", signal="upvote")

# Get feedback summary for an agent
summary = client.agents.feedback_summary(agent_id="my-agent")

Async client

from dakera import AsyncDakeraClient

async with AsyncDakeraClient(base_url="http://localhost:3300", api_key="key") as client:
    await client.memories.store(agent_id="my-agent", content="async memory")
    memories = await client.memories.recall(agent_id="my-agent", query="async")

Error handling

from dakera.exceptions import DakeraError, NotFoundError, RateLimitError, AuthorizationError

try:
    memory = client.memories.get(memory_id="mem-abc")
except NotFoundError:
    print("Memory not found")
except RateLimitError as e:
    print(f"Rate limited — retry after {e.retry_after}s")
except DakeraError as e:
    print(f"Error {e.error_code}: {e.message}")

Error types: DakeraError (base), ConnectionError, NotFoundError, ValidationError, RateLimitError, ServerError, AuthenticationError, AuthorizationError, TimeoutError.

Connection configuration

from dakera import DakeraClient, RetryConfig

client = DakeraClient(
    base_url="http://localhost:3300",
    api_key="your-key",
    timeout=30,                          # seconds
    retry_config=RetryConfig(
        max_retries=3,
        base_delay=0.5,
        max_delay=10.0,
        jitter=True
    )
)