Batch Ingestion

Category: Operations

Problem

When deploying a new agent, the memory store starts empty. The agent has no context about existing customers, past conversations, or organizational knowledge. Bootstrapping from existing data sources — CRM records, support tickets, documentation, chat logs — is essential for day-one usefulness. Doing this one record at a time is slow and error-prone.

Architecture

This pattern implements a batch import pipeline that reads from external data sources, applies chunking for long documents, tags each memory with source metadata, and stores with rate limiting to avoid overwhelming the Dakera instance. Progress tracking and error handling ensure reliability for large imports.

Flow

Implementation

from dakera import Dakera
import json
import time
import csv

# Shared Dakera client used by every import function below.
# NOTE(review): api_key here is a placeholder — load it from an env var or
# secrets store in production rather than hard-coding it.
client = Dakera(base_url="http://localhost:3300", api_key="dk-...")

def chunk_text(text: str, max_chars: int = 800) -> list:
    """Split long text into chunks at sentence boundaries.

    Newlines are flattened to spaces before splitting on ". ".
    Sentences are never split, so a single sentence longer than
    ``max_chars`` still yields one oversized chunk.

    Args:
        text: Source text to split. May be empty.
        max_chars: Soft upper bound on the length of each chunk.

    Returns:
        List of non-empty chunk strings; ``[]`` for blank input.
    """
    chunks, current = [], ""
    for sentence in text.replace("\n", " ").split(". "):
        sentence = sentence.strip()
        if not sentence:
            # Skip empty fragments (blank input, consecutive separators);
            # the original turned "" into a spurious ["."] chunk.
            continue
        # Re-attach the period that split(". ") removed — but only when the
        # fragment doesn't already end with one, so the final sentence
        # (which keeps its own trailing ".") isn't doubled to "..".
        piece = (sentence if sentence.endswith(".") else sentence + ".") + " "
        if current and len(current) + len(piece) > max_chars:
            chunks.append(current.strip())
            current = piece
        else:
            current += piece
    if current.strip():
        chunks.append(current.strip())
    return chunks

def batch_import_json(filepath: str, namespace: str, source_tag: str,
                      rate_limit_delay: float = 0.05):
    """Import records from a JSON file with rate limiting.

    Each record's content is taken from its "content" key (falling back
    to "text"), chunked when longer than 800 chars, and stored with
    provenance metadata. Failures on individual chunks are logged and
    counted; the import continues.

    Args:
        filepath: Path to a JSON file containing a list of record dicts.
        namespace: Dakera namespace to store memories under.
        source_tag: Value written to each memory's "source" metadata.
        rate_limit_delay: Seconds to sleep between store calls
            (default 0.05 ≈ 20 requests/second).

    Returns:
        Tuple ``(success, errors)`` — counts of stored chunks and failures.
    """
    # Explicit encoding so the import doesn't depend on the platform default.
    with open(filepath, encoding="utf-8") as f:
        records = json.load(f)

    total = len(records)
    success, errors = 0, 0

    for i, record in enumerate(records):
        # Prefer "content"; fall back to "text". Empty records are skipped.
        content = record.get("content", record.get("text", ""))
        if not content:
            continue

        # Long documents are chunked so each memory stays retrievable.
        chunks = chunk_text(content) if len(content) > 800 else [content]

        for j, chunk in enumerate(chunks):
            try:
                client.memory.store(
                    content=chunk,
                    namespace=namespace,
                    metadata={
                        "source": source_tag,
                        # Fall back to the list index when the record has no id.
                        "record_id": record.get("id", str(i)),
                        "chunk_index": j,
                        "imported_at": time.time(),
                        "importance": record.get("importance", 0.6)
                    }
                )
                success += 1
            except Exception as e:
                # Log and continue — one bad record must not abort the batch.
                errors += 1
                print(f"Error on record {i}, chunk {j}: {e}")

            # Rate limit: 20 requests per second
            time.sleep(rate_limit_delay)

        if (i + 1) % 100 == 0:
            print(f"Progress: {i+1}/{total} records processed")

    print(f"Import complete: {success} stored, {errors} errors")
    return success, errors

def batch_import_csv(filepath: str, namespace: str, content_col: str,
                     rate_limit_delay: float = 0.05):
    """Import from CSV with column mapping.

    One memory is stored per row; the remaining columns are carried over
    as metadata. Failures on individual rows are logged and counted so a
    single bad row cannot abort the import (matching batch_import_json).

    Args:
        filepath: Path to the CSV file (first row must be a header).
        namespace: Dakera namespace to store memories under.
        content_col: Name of the column holding the memory text.
        rate_limit_delay: Seconds to sleep between store calls
            (default 0.05 ≈ 20 requests/second).

    Returns:
        Tuple ``(success, errors)`` — counts of stored rows and failures.
    """
    success, errors = 0, 0
    # newline="" is required by the csv module so embedded newlines inside
    # quoted fields are parsed correctly.
    with open(filepath, newline="", encoding="utf-8") as f:
        reader = csv.DictReader(f)
        for i, row in enumerate(reader):
            content = row.get(content_col, "")
            if not content:
                continue
            try:
                client.memory.store(
                    content=content,
                    namespace=namespace,
                    metadata={
                        "source": "csv_import",
                        "row": i,
                        "imported_at": time.time(),
                        # Carry every other column through as metadata.
                        **{k: v for k, v in row.items() if k != content_col}
                    }
                )
                success += 1
            except Exception as e:
                errors += 1
                print(f"Error on row {i}: {e}")
            time.sleep(rate_limit_delay)
    return success, errors

# Usage: bootstrap customer knowledge from a CRM JSON export
# (expects a JSON list of record dicts with "content"/"text" fields).
batch_import_json("crm_export.json", "customer-knowledge", "crm")

# Usage: import support ticket history from CSV — one memory per row,
# using the "description" column as the memory content.
batch_import_csv("tickets.csv", "support-history", content_col="description")

When to Use This Pattern

Key Considerations