Batch Ingestion

Category: Operations

Problem

When deploying a new agent, the memory store starts empty. The agent has no context about existing customers, past conversations, or organizational knowledge. Bootstrapping from existing data sources — CRM records, support tickets, documentation, chat logs — is essential for day-one usefulness. Doing this one record at a time is slow and error-prone.

Architecture

This pattern implements a batch import pipeline that reads records from external data sources, chunks long documents, tags each memory with source metadata, and stores each chunk with rate limiting to avoid overwhelming the Dakera instance. Progress tracking and error handling keep large imports reliable.

Flow

  • Read records from source (JSON, CSV, database export)
  • Chunk long documents into memory-sized segments (500-1000 chars)
  • Tag each chunk with source, timestamp, and category metadata
  • Store in batches with rate limiting and retry logic

Implementation

from dakera import Dakera
import json
import time
import csv

client = Dakera(base_url="http://localhost:3300", api_key="dk-...")

def chunk_text(text: str, max_chars: int = 800) -> list:
    """Split long text into chunks at sentence boundaries."""
    sentences = text.replace("\n", " ").split(". ")
    chunks, current = [], ""
    for s in sentences:
        if len(current) + len(s) > max_chars and current:
            chunks.append(current.strip())
            current = s + ". "
        else:
            current += s + ". "
    if current.strip():
        chunks.append(current.strip())
    return chunks

def batch_import_json(filepath: str, namespace: str, source_tag: str):
    """Import records from a JSON file with rate limiting."""
    with open(filepath) as f:
        records = json.load(f)

    total = len(records)
    success, errors = 0, 0

    for i, record in enumerate(records):
        content = record.get("content", record.get("text", ""))
        if not content:
            continue

        chunks = chunk_text(content) if len(content) > 800 else [content]

        for j, chunk in enumerate(chunks):
            try:
                client.memory.store(
                    content=chunk,
                    namespace=namespace,
                    metadata={
                        "source": source_tag,
                        "record_id": record.get("id", str(i)),
                        "chunk_index": j,
                        "imported_at": time.time(),
                        "importance": record.get("importance", 0.6)
                    }
                )
                success += 1
            except Exception as e:
                errors += 1
                print(f"Error on record {i}, chunk {j}: {e}")

            # Rate limit: 20 requests per second
            time.sleep(0.05)

        if (i + 1) % 100 == 0:
            print(f"Progress: {i+1}/{total} records processed")

    print(f"Import complete: {success} stored, {errors} errors")

def batch_import_csv(filepath: str, namespace: str, content_col: str):
    """Import from CSV with column mapping."""
    with open(filepath) as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            content = row.get(content_col, "")
            if not content:
                continue
            client.memory.store(
                content=content,
                namespace=namespace,
                metadata={
                    "source": "csv_import",
                    "row": i,
                    "imported_at": time.time(),
                    **{k: v for k, v in row.items() if k != content_col}
                }
            )
            time.sleep(0.05)

# Usage: import CRM contacts
batch_import_json("crm_export.json", "customer-knowledge", "crm")

# Usage: import support ticket history
batch_import_csv("tickets.csv", "support-history", content_col="description")
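
The importers above log failures and move on; the flow also calls for retry logic. A minimal sketch of a retry wrapper around the same client.memory.store call, with exponential backoff (the retry count and delays are illustrative defaults, not Dakera requirements):

def store_with_retry(content: str, namespace: str, metadata: dict,
                     max_retries: int = 3, backoff: float = 1.0):
    """Store one memory, retrying transient failures with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return client.memory.store(
                content=content,
                namespace=namespace,
                metadata=metadata
            )
        except Exception:
            if attempt == max_retries - 1:
                raise  # give up after the final attempt
            # Wait 1s, 2s, 4s, ... before trying again
            time.sleep(backoff * (2 ** attempt))

Swapping this in for the direct client.memory.store calls in batch_import_json keeps the rest of the pipeline unchanged.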

When to Use This Pattern

  • Onboarding a new agent that needs existing organizational knowledge
  • Migrating from another memory system to Dakera
  • Periodic sync from CRM, ticketing, or documentation systems (see the incremental import sketch after this list)
  • Bootstrapping demo environments with realistic data
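
For the periodic-sync case, it is usually enough to re-run the JSON importer over only the records that changed since the last run. A minimal sketch, assuming each exported record carries a numeric updated_at timestamp (an assumed field, not guaranteed by any particular export format) and using a local bookmark file to remember the last sync time:

import os

SYNC_STATE_FILE = "last_sync.txt"  # hypothetical local bookmark for the last sync time

def incremental_import(filepath: str, namespace: str, source_tag: str):
    """Import only records updated since the previous run."""
    last_sync = 0.0
    if os.path.exists(SYNC_STATE_FILE):
        with open(SYNC_STATE_FILE) as f:
            last_sync = float(f.read().strip() or 0)

    with open(filepath) as f:
        records = json.load(f)

    # 'updated_at' is an assumed epoch timestamp on each exported record
    fresh = [r for r in records if r.get("updated_at", 0) > last_sync]

    for i, record in enumerate(fresh):
        content = record.get("content", record.get("text", ""))
        if not content:
            continue
        chunks = chunk_text(content) if len(content) > 800 else [content]
        for j, chunk in enumerate(chunks):
            client.memory.store(
                content=chunk,
                namespace=namespace,
                metadata={
                    "source": source_tag,
                    "record_id": record.get("id", str(i)),
                    "chunk_index": j,
                    "imported_at": time.time()
                }
            )
            time.sleep(0.05)  # same 20 req/s rate limit as the full import

    with open(SYNC_STATE_FILE, "w") as f:
        f.write(str(time.time()))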

Key Considerations

  • Respect rate limits — 20 req/s is a safe default for self-hosted Dakera
  • Chunk long documents to keep each memory under 1000 characters for better recall
  • Always tag with source metadata so you can trace or delete imported data later
  • Run deduplication after large imports to avoid redundant memories (see the hash-based sketch after this list)
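
A store-wide deduplication pass would need a way to list or search existing memories, which this pattern does not rely on. A lighter alternative is to skip exact duplicates at import time by hashing chunk content before calling client.memory.store; recording the hash in metadata also leaves room for a later cleanup pass. The hashing scheme and function name below are illustrative:

import hashlib

def import_json_with_dedup(filepath: str, namespace: str, source_tag: str):
    """Import a JSON export, skipping chunks whose content was already stored this run."""
    seen_hashes = set()

    with open(filepath) as f:
        records = json.load(f)

    for i, record in enumerate(records):
        content = record.get("content", record.get("text", ""))
        if not content:
            continue
        chunks = chunk_text(content) if len(content) > 800 else [content]
        for j, chunk in enumerate(chunks):
            # Normalize whitespace so trivially different copies hash the same
            digest = hashlib.sha256(" ".join(chunk.split()).encode()).hexdigest()
            if digest in seen_hashes:
                continue
            seen_hashes.add(digest)
            client.memory.store(
                content=chunk,
                namespace=namespace,
                metadata={
                    "source": source_tag,
                    "record_id": record.get("id", str(i)),
                    "chunk_index": j,
                    "content_hash": digest,
                    "imported_at": time.time()
                }
            )
            time.sleep(0.05)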