Python SDK

The Python SDK provides the AgentMemory class — the primary interface for reading, writing, and managing agent memory.

Table of Contents

  1. Installation
  2. Creating an Instance
    1. With Custom Configuration
    2. With a Pre-configured Adapter
    3. With an Importance Evaluator
  3. Core Operations
    1. Write
    2. Read
    3. List
    4. Search
    5. Stats
  4. Outcomes
    1. Recording Outcomes
    2. Outcome Types
  5. Memory Types
  6. History (Temporal Queries)
  7. Explainability
  8. Decision Traces
    1. Getting the trace from an outcome
    2. Browsing past traces
    3. Filtering traces
  9. Tool Context
    1. Record in the causal chain (lightweight)
    2. Persist for other agents (durable)
  10. Watch
  11. Briefing (Memory Cortex)
    1. Basic Usage
    2. Parameters
    3. Digest Types
    4. Workflow Integration
  12. Snapshots
  13. Knowledge Graph
  14. Semantic Search
  15. Context Manager
  16. Connecting to AMFS SaaS
    1. Environment Variables
    2. Explicit HttpAdapter
  17. Conflict Handling

Installation

pip install amfs

Creating an Instance

from amfs import AgentMemory

mem = AgentMemory(agent_id="my-agent")

With Custom Configuration

from pathlib import Path

mem = AgentMemory(
    agent_id="my-agent",
    config_path=Path("./custom-amfs.yaml"),
    ttl_sweep_interval=60.0,
    decay_half_life_days=30.0,
)

With a Pre-configured Adapter

from amfs_filesystem import FilesystemAdapter

adapter = FilesystemAdapter(root=Path(".amfs"), namespace="staging")
mem = AgentMemory(agent_id="my-agent", adapter=adapter)

With an Importance Evaluator

from amfs_core.importance import ImportanceEvaluator

class MyEvaluator(ImportanceEvaluator):
    def evaluate(self, entity_path, key, value):
        score = 0.8 if "critical" in str(value).lower() else 0.3
        return score, {"criticality": score}

mem = AgentMemory(agent_id="my-agent", importance_evaluator=MyEvaluator())

Every write() call will automatically score the entry and set importance_score and importance_dimensions. If the evaluator raises, the write proceeds without scoring.


Core Operations

Write

entry = mem.write(
    "checkout-service",          # entity_path
    "retry-pattern",             # key
    {"max_retries": 3},          # value (any JSON-serializable data)
    confidence=0.85,             # optional, default 1.0
    pattern_refs=["retry"],      # optional cross-references
    memory_type=MemoryType.FACT, # optional: fact (default), belief, or experience
)

Write with a TTL (time-to-live):

from datetime import datetime, timedelta, timezone

mem.write(
    "svc", "temp-flag", {"active": True},
    ttl_at=datetime.now(timezone.utc) + timedelta(hours=24),
)

Read

entry = mem.read("checkout-service", "retry-pattern")

if entry:
    print(entry.value)
    print(entry.version)
    print(entry.confidence)

With minimum confidence filter:

entry = mem.read("svc", "pattern", min_confidence=0.5)

List

# All entries
entries = mem.list()

# Entries for a specific entity
entries = mem.list("checkout-service")

# Include superseded versions
entries = mem.list("checkout-service", include_superseded=True)
results = mem.search(entity_path="checkout-service", min_confidence=0.5)

Progressive retrieval with depth — search only high-priority tiers for fast, high-signal results:

hot_only = mem.search(query="retry strategy", depth=1)   # Hot tier
hot_warm = mem.search(query="retry strategy", depth=2)   # Hot + Warm
all_tiers = mem.search(query="retry strategy")            # All (default)

Composite recall scoring (blends semantic similarity, recency, and confidence):

from amfs_core.models import RecallConfig

scored = mem.search(
    query="how do we handle retries?",
    recall_config=RecallConfig(semantic_weight=0.5, recency_weight=0.3, confidence_weight=0.2),
)
for item in scored:
    print(f"{item.entry.key} — score={item.score:.3f}")

Semantic scoring requires an embedder. Without one, the semantic component is 0.0.

Stats

stats = mem.stats()
print(f"Total entries: {stats.total_entries}")
print(f"Total outcomes: {stats.total_outcomes}")

Outcomes

Recording Outcomes

from amfs import OutcomeType

# With explicit causal keys
updated = mem.commit_outcome(
    outcome_ref="INC-1042",
    outcome_type=OutcomeType.CRITICAL_FAILURE,
    causal_entry_keys=["checkout-service/retry-pattern"],
)

# With auto-causal linking (uses everything read in this session)
updated = mem.commit_outcome(
    outcome_ref="DEP-300",
    outcome_type=OutcomeType.SUCCESS,
)

Outcome Types

OutcomeType.CRITICAL_FAILURE  # × 1.15
OutcomeType.FAILURE           # × 1.10
OutcomeType.MINOR_FAILURE     # × 1.08
OutcomeType.SUCCESS           # × 0.97

Memory Types

Classify entries to control decay behavior:

from amfs import MemoryType

# Facts (default) — objective knowledge, standard decay
mem.write("svc", "config", {"pool_size": 10}, memory_type=MemoryType.FACT)

# Beliefs — subjective inferences, decay 2× faster
mem.write("svc", "hypothesis", "Likely an N+1 query issue", memory_type=MemoryType.BELIEF)

# Experiences — action logs, decay 1.5× slower
mem.write("svc", "action-log", "Added index on user_id", memory_type=MemoryType.EXPERIENCE)

History (Temporal Queries)

Retrieve the full version history of an entry with optional time filtering:

from datetime import datetime, timedelta, timezone

# All versions
versions = mem.history("checkout-service", "retry-pattern")
for v in versions:
    print(f"v{v.version} — confidence: {v.confidence}{v.provenance.written_at}")

# Versions from the last 7 days
since = datetime.now(timezone.utc) - timedelta(days=7)
recent = mem.history("checkout-service", "retry-pattern", since=since)

Explainability

Inspect the causal chain — which entries were read during the current session and how they connect to outcomes:

chain = mem.explain()
print(chain["session_id"])
print(chain["causal_keys"])   # list of entity_path/key pairs that were read
print(chain["entries"])       # full entry details for each causal key

Filter by outcome reference:

chain = mem.explain(outcome_ref="INC-1042")

Decision Traces

When you call commit_outcome(), AMFS snapshots the full decision trace — every entry that was read, every context that was recorded, and every query that was made. The resulting trace is persisted and can be retrieved later.

Getting the trace from an outcome

mem.record_context("ci-check", "All tests green", source="GitHub Actions")
entry = mem.read("checkout-service", "retry-pattern")

updated = mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)

# The trace is attached to the outcome
trace = mem._last_trace
print(f"Trace ID: {trace.id}")
print(f"Causal entries: {len(trace.causal_entries)}")
print(f"External contexts: {len(trace.external_contexts)}")

Browsing past traces

# List recent traces
traces = mem._adapter.list_traces(limit=10)
for t in traces:
    print(f"{t['id']}{t['agent_id']}{t['outcome_ref']} ({t['outcome_type']})")

# Get a specific trace
trace = mem._adapter.get_trace("ddbcefff-901a-4fa6-...")
print(trace.decision_summary)
print(f"Session duration: {trace.session_duration_ms}ms")
for entry in trace.causal_entries:
    print(f"  Read: {entry.entity_path}/{entry.key} (v{entry.version})")

Filtering traces

traces = mem._adapter.list_traces(
    entity_path="checkout-service",
    agent_id="deploy-agent",
    outcome_type="success",
    limit=5,
)

Tool Context

When agents call external tools or APIs, there are two ways to capture that context in AMFS depending on your needs.

Record in the causal chain (lightweight)

Use record_context() to add external inputs to the causal chain without writing to storage. This makes explain() return a complete decision trace:

entry = mem.read("checkout-service", "retry-pattern")

mem.record_context(
    "pagerduty-incidents",
    "3 SEV-1 incidents in the last 24h for checkout-service",
    source="PagerDuty API",
)
mem.record_context(
    "git-log",
    "15 commits since last deploy, 3 touching retry logic",
    source="git",
)

mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)

chain = mem.explain()
print(chain["causal_entries"])     # AMFS entries that were read
print(chain["external_contexts"])  # tool/API inputs that informed the decision

Persist for other agents (durable)

Use MemoryType.EXPERIENCE with a TTL to store tool results so downstream agents can retrieve them:

from datetime import datetime, timedelta, timezone

mem.write(
    "checkout-service",
    "tool-result-pagerduty",
    {"incidents": 3, "sev1": True, "last_24h": True},
    memory_type=MemoryType.EXPERIENCE,
    ttl_at=datetime.now(timezone.utc) + timedelta(hours=1),
)

The next agent reads it with mem.read("checkout-service", "tool-result-pagerduty") instead of re-calling the API.


Watch

Get real-time notifications when entries change:

def on_change(entry):
    print(f"{entry.key} updated to v{entry.version}")

handle = mem.watch("checkout-service", on_change)

# Stop watching
handle.cancel()

Briefing (Memory Cortex)

When the Memory Cortex is running, agents can retrieve pre-compiled knowledge digests ranked by relevance. This is how agents consume the “brain brief” — compiled summaries of entities, other agents, and external sources — without having to search through raw memory entries.

Basic Usage

mem = AgentMemory(agent_id="deploy-agent", adapter=adapter)

# Get a ranked briefing of compiled knowledge
briefs = mem.briefing(entity_path="myapp/checkout-service", limit=5)

for digest in briefs:
    print(digest.digest_type)   # "entity", "agent_brief", "source", or "connection_map"
    print(digest.scope)         # the entity path, agent ID, or source ID
    print(digest.summary)       # structured summary (varies by digest type)
    print(digest.entry_count)   # number of source entries compiled
    print(digest.compiled_at)   # when the digest was last compiled

Parameters

Parameter Type Description
entity_path str \| None Focus on digests relevant to this entity
agent_id str \| None Focus on digests relevant to this agent
limit int Maximum number of digests to return (default: 10)

Digest Types

Type Scope What It Contains
entity Entity path (e.g. myapp/checkout-service) Summary of all knowledge about an entity — key count, average confidence, top keys, narrative
agent_brief Agent ID (e.g. deploy-agent) Summary of an agent’s knowledge and activity — entries written, entities touched, outcomes
source Source ID (e.g. github) Summary of external data from a connector — events ingested, entities touched
connection_map Cross-entity scope Cross-entity relationships (Pro)

Workflow Integration

Call briefing() at the start of a task to get context before making decisions:

with AgentMemory(agent_id="deploy-agent", adapter=adapter) as mem:
    # Step 1: Get a briefing on what you need to know
    briefs = mem.briefing(entity_path="myapp/checkout-service", limit=5)
    for digest in briefs:
        print(f"[{digest.digest_type}] {digest.scope}: {digest.summary.get('narrative', '')}")

    # Step 2: Read specific entries based on the briefing
    entry = mem.read("myapp/checkout-service", "retry-pattern")

    # Step 3: Do your work, record context
    mem.record_context("ci-pipeline", "All tests passing, deploy ready", source="GitHub Actions")

    # Step 4: Commit the outcome
    mem.commit_outcome("DEP-500", OutcomeType.SUCCESS)

If the Cortex is not running, briefing() returns an empty list — your agent code can safely call it without checking.


Snapshots

Export and import the full state of your memory:

from amfs_core.snapshot import SnapshotExporter, SnapshotImporter

# Export
exporter = SnapshotExporter(mem.adapter)
exporter.export("backup.json")

# Import into a different adapter
from amfs_filesystem import FilesystemAdapter
target = FilesystemAdapter(root=Path("/new/.amfs"), namespace="restored")
importer = SnapshotImporter(target)
importer.restore("backup.json")

Knowledge Graph

The knowledge graph builds automatically as agents write, commit outcomes, and learn from each other. You can also traverse it directly:

edges = mem.graph_neighbors(
    "checkout-service/retry-pattern",
    direction="both",
    depth=2,
    min_confidence=0.5,
)
for edge in edges:
    print(f"{edge.source_entity} --{edge.relation}--> {edge.target_entity}")
Parameter Description
entity Starting entity to explore
relation Filter by relation type (e.g. "references", "informed")
direction "outgoing", "incoming", or "both"
depth Traversal depth (1 = direct neighbors, >1 for multi-hop)

Multi-hop traversal (depth > 1) requires the Postgres adapter. The Filesystem and S3 adapters return an empty list for graph methods.


If you configure an embedder, you can search by meaning:

results = mem.semantic_search("how do we handle retries?", top_k=5)
for entry, score in results:
    print(f"{entry.key} (similarity: {score:.3f})")

Context Manager

Use AgentMemory as a context manager for automatic cleanup:

with AgentMemory(agent_id="my-agent") as mem:
    mem.write("svc", "key", "value")
    entry = mem.read("svc", "key")
# Watchers, TTL sweepers, and background threads are cleaned up

Connecting to AMFS SaaS

When using AMFS as a hosted service (SaaS), connect through the HTTP API with your API key instead of a direct database connection.

Environment Variables

export AMFS_HTTP_URL="https://amfs-login.sense-lab.ai"
export AMFS_API_KEY="amfs_sk_your_key_here"

With these set, the SDK auto-detects the HTTP adapter — no code changes needed:

from amfs import AgentMemory

mem = AgentMemory(agent_id="my-agent")
mem.write("checkout-service", "retry-pattern", {"max_retries": 3})

Explicit HttpAdapter

You can also configure the adapter directly:

from amfs import AgentMemory
from amfs_adapter_http import HttpAdapter

adapter = HttpAdapter(
    base_url="https://amfs-login.sense-lab.ai",
    api_key="amfs_sk_your_key_here",
)
mem = AgentMemory(agent_id="my-agent", adapter=adapter)

Install the HTTP adapter with pip install amfs-adapter-http.

Never use AMFS_POSTGRES_DSN for external agents in multi-tenant mode. Always use AMFS_HTTP_URL + AMFS_API_KEY to ensure tenant isolation, scope enforcement, and audit logging.

See the SaaS Connection Guide and Environment Variables for details.


Conflict Handling

Handle concurrent writes to the same key:

from amfs_core.models import ConflictPolicy

# Raise an error on conflict
mem = AgentMemory(
    agent_id="my-agent",
    conflict_policy=ConflictPolicy.RAISE,
)

# Custom conflict resolution
def merge(existing, incoming, value):
    return {**existing.value, **value}

mem = AgentMemory(
    agent_id="my-agent",
    on_conflict=merge,
)