Semantica coordinates multiple agents through a shared ContextGraph — agents read and write to the same graph, or hand off serialized state via save() and load(), with no message broker required. Use this pattern when splitting work across ingestion, enrichment, reasoning, and reporting roles that must share a single evidence base.
This guide covers multi-agent coordination. For the memory layer each agent uses internally, see Agent Memory. For graph traversal and entity linking, see Context Graphs. For decision recording and precedent matching, see Decision Intelligence.

The Three Coordination Patterns

Before writing any code, choose the coordination pattern that fits your pipeline:
  • Shared graph works when all agents run in the same process. They hold references to the same ContextGraph object (thread-safe by default), so every store() from one agent is immediately visible to every retrieve() from another. This is the lowest-latency option and the right default for in-process pipelines.
  • Save / load handoff works when agents run in different processes, on different machines, or at different times. Agent A finishes its work and calls context.save(path); Agent B calls context.load(path) to pick up exactly where A left off: full memory, full graph, full vector index. This is how you implement shift handoffs, async pipelines, and cross-service orchestration.
  • Namespaced memories works when a single AgentContext instance serves multiple logical agents, each scoping its reads and writes with a conversation_id. Agents are isolated by tag, not by instance, which gives lightweight role separation without the overhead of multiple contexts.
The pipeline in this guide uses all three.
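The selection rules above can be written down as a small decision helper. This is only an illustrative sketch: the Pattern enum and choose_pattern() are hypothetical names, not part of Semantica, and the two boolean inputs are a simplification of the criteria described in the text.

```python
from enum import Enum


class Pattern(Enum):
    SHARED_GRAPH = "shared graph"
    SAVE_LOAD = "save / load handoff"
    NAMESPACED = "namespaced memories"


def choose_pattern(same_process: bool, one_context: bool) -> Pattern:
    """Hypothetical helper encoding the selection rules from the text."""
    if not same_process:
        # Different processes, machines, or run times: state must be serialized.
        return Pattern.SAVE_LOAD
    if one_context:
        # One AgentContext serving several logical agents: isolate by conversation_id.
        return Pattern.NAMESPACED
    # Several contexts in one process, all wrapping the same ContextGraph.
    return Pattern.SHARED_GRAPH


ingestion_choice = choose_pattern(same_process=True, one_context=False)
handoff_choice = choose_pattern(same_process=False, one_context=False)
reporting_choice = choose_pattern(same_process=True, one_context=True)
```

A real pipeline will often combine the patterns per stage, as this guide does, so the helper is best read as a checklist rather than a dispatcher.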

Pattern 1 — Shared Graph for Concurrent Ingestion

The OSINT collector and the enrichment agent run concurrently. They share a single ContextGraph and a single VectorStore — the graph’s internal RLock makes concurrent writes safe.
import threading
from semantica.context import AgentContext, ContextGraph
from semantica.vector_store import VectorStore

# One graph, one vector store — both agents write to these
shared_graph = ContextGraph(advanced_analytics=True)
shared_vs    = VectorStore(backend="faiss", dimension=768)

def make_agent() -> AgentContext:
    """Factory: each agent gets its own AgentContext wrapping the shared backing stores."""
    return AgentContext(
        vector_store=shared_vs,            # same VectorStore instance
        knowledge_graph=shared_graph,      # same ContextGraph instance
        graph_expansion=True,
        max_expansion_hops=2,
        decision_tracking=True,
    )

osint_agent      = make_agent()
enrichment_agent = make_agent()
reasoning_agent  = make_agent()
The OSINT collector’s job is to ingest raw feeds and extract entities. It does not reason — it just ingests and lets the graph accumulate structure.
def osint_collection():
    """Agent 1: ingest raw threat feeds and CVE data."""
    osint_agent.store(
        [
            {
                "content": "APT29 exploits CVE-2024-3400 in PAN-OS GlobalProtect — unauthenticated RCE, CVSS 10.0",
                "metadata": {"source": "nvd_feed", "cve": "CVE-2024-3400", "actor": "APT29"},
            },
            {
                "content": "Volexity confirms active exploitation of CVE-2024-3400 against NATO member networks",
                "metadata": {"source": "volexity_blog", "actor": "APT29", "target": "NATO"},
            },
            {
                "content": "PAN-OS GlobalProtect affected versions: < 10.2.9-h1, < 11.0.4-h1, < 11.1.2-h3",
                "metadata": {"source": "paloalto_advisory", "cve": "CVE-2024-3400", "product": "GlobalProtect"},
            },
        ],
        extract_entities=True,
        extract_relationships=True,
        conversation_id="osint-pipeline",
    )
While the OSINT collector is running, the enrichment agent is independently pulling actor-profile data and linking it into the same graph.
def enrichment():
    """Agent 2: enrich the graph with actor profile and TTP context."""
    enrichment_agent.store(
        [
            {
                "content": "APT29 TTP profile: T1190 (Exploit Public-Facing Application), T1071.001 (Web Protocols C2), T1078 (Valid Accounts)",
                "metadata": {"source": "mitre_attck", "actor": "APT29", "type": "ttp_profile"},
            },
            {
                "content": "APT29 infrastructure fingerprint: use of Cloudflare Workers for C2 relay, certificate reuse across campaigns",
                "metadata": {"source": "recorded_future", "actor": "APT29", "type": "infrastructure"},
            },
        ],
        extract_entities=True,
        extract_relationships=True,
        conversation_id="enrichment-pipeline",
    )
Run both concurrently — the graph handles the locking.
t1 = threading.Thread(target=osint_collection)
t2 = threading.Thread(target=enrichment)

# Start both agents, then wait for both to finish
t1.start()
t2.start()
t1.join()
t2.join()

# The shared graph now contains entities and relationships from both agents.
# The reasoning agent can query across everything both agents stored.
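
To see why a reentrant lock is enough to make concurrent writes safe, here is a minimal standard-library sketch. ToyGraph is a hypothetical stand-in and not Semantica's ContextGraph; it only illustrates the locking idea, including why an RLock (rather than a plain Lock) is convenient when one locked method calls another.

```python
import threading


class ToyGraph:
    """Stand-in for a shared graph; NOT Semantica's ContextGraph."""

    def __init__(self):
        self._lock = threading.RLock()
        self._nodes = {}  # node name -> set of neighbour names

    def add_edge(self, src, dst):
        with self._lock:
            self._add_node(src)
            self._add_node(dst)
            self._nodes[src].add(dst)

    def _add_node(self, name):
        # Re-acquires the lock already held by add_edge; an RLock allows this,
        # whereas a plain Lock would deadlock here.
        with self._lock:
            self._nodes.setdefault(name, set())

    def edge_count(self):
        with self._lock:
            return sum(len(targets) for targets in self._nodes.values())


def writer(graph, agent, n):
    """Simulate one agent writing n distinct edges into the shared graph."""
    for i in range(n):
        graph.add_edge(agent, f"{agent}-entity-{i}")


graph = ToyGraph()
threads = [
    threading.Thread(target=writer, args=(graph, name, 1000))
    for name in ("osint", "enrichment")
]
for t in threads:
    t.start()
for t in threads:
    t.join()

total_edges = graph.edge_count()
```

Each writer adds 1000 distinct edges, so the shared structure ends up with every edge from both threads and no lost updates.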

Pattern 2 — Save / Load Handoff to a Reasoning Agent

The reasoning agent runs after ingestion completes. In a production pipeline this might be a separate process, a different container, or a scheduled job. The ingestion agents save their shared state; the reasoning agent loads it.
# After ingestion: save the combined graph and vector index
osint_agent.save("./pipeline/enriched_intel/")
# Writes:
#   pipeline/enriched_intel/agent_memory.json     — all MemoryItems
#   pipeline/enriched_intel/vector_store/         — FAISS index
#   pipeline/enriched_intel/knowledge_graph.json  — all nodes and edges

print("Ingestion complete. State saved for reasoning agent.")
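
Because the consumer may run in another process or on another machine, it can help to confirm that a checkpoint directory is complete before loading it. The sketch below assumes the directory layout listed in the comments above; missing_checkpoint_entries() is a hypothetical helper, not a Semantica API, and the demo runs against a throwaway directory.

```python
import tempfile
from pathlib import Path

# Entry names taken from the save() layout listed in the comments above.
EXPECTED_ENTRIES = ("agent_memory.json", "vector_store", "knowledge_graph.json")


def missing_checkpoint_entries(checkpoint_dir):
    """Return the expected entries that are absent from checkpoint_dir."""
    root = Path(checkpoint_dir)
    return [name for name in EXPECTED_ENTRIES if not (root / name).exists()]


# Demo: a temporary directory that mimics a partially written checkpoint.
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / "agent_memory.json").write_text("[]")
    (root / "vector_store").mkdir()
    partial = missing_checkpoint_entries(root)   # graph file not written yet
    (root / "knowledge_graph.json").write_text("{}")
    complete = missing_checkpoint_entries(root)  # nothing missing now
```

A consumer could refuse to call load() while the returned list is non-empty, which guards against reading a checkpoint that is still being written.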
The reasoning agent starts fresh, loads the state, and has full access to everything the ingestion agents built.
from semantica.context import AgentContext, ContextGraph
from semantica.vector_store import VectorStore
from semantica.llms import LiteLLM

# Create a fresh context before loading — load() merges into the existing context
reasoning_vs    = VectorStore(backend="faiss", dimension=768)
reasoning_graph = ContextGraph(advanced_analytics=True)
reasoning_agent = AgentContext(
    vector_store=reasoning_vs,
    knowledge_graph=reasoning_graph,
    graph_expansion=True,
    max_expansion_hops=3,
    decision_tracking=True,
)

reasoning_agent.load("./pipeline/enriched_intel/")
# All memories, graph nodes, and vector embeddings from both ingestion agents are now available.

# Use a high-capability model for the synthesis step
llm = LiteLLM(model="anthropic/claude-sonnet-4-20250514")

synthesis = reasoning_agent.query_with_reasoning(
    "Summarize the APT29 exploitation of CVE-2024-3400: affected products, "
    "observed TTPs, targeted sectors, and recommended mitigations.",
    llm_provider=llm,
    max_results=15,
    max_hops=3,
)

print(synthesis["response"])
print("Confidence: {:.0%}".format(synthesis["confidence"]))

# Store the synthesis back into the graph — the reporting agent will retrieve it
reasoning_agent.store(
    "SYNTHESIS: " + synthesis["response"],
    metadata={"type": "synthesis", "agent": "reasoning", "confidence": synthesis["confidence"]},
    conversation_id="synthesis-output",
)

# Record the analytical judgment as a traceable decision
reasoning_agent.record_decision(
    category="threat_assessment",
    scenario="APT29 active exploitation of CVE-2024-3400 in PAN-OS",
    reasoning=synthesis["reasoning_path"],
    outcome="high_priority_patch_advisory",
    confidence=synthesis["confidence"],
    entities=["APT29", "CVE-2024-3400", "GlobalProtect", "NATO"],
    decision_maker="reasoning_agent_v2",
)

# Hand off to the reporting agent
reasoning_agent.save("./pipeline/synthesis_output/")
load() merges into the existing context — it does not wipe it first. Always create a fresh AgentContext before calling load() if you want a clean restore from a handoff checkpoint.
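
The difference between merging into a reused context and restoring into a fresh one can be illustrated with a toy model. ToyContext below is a hypothetical dict-backed stand-in, not Semantica's AgentContext; it only mirrors the merge-on-load behaviour described above.

```python
class ToyContext:
    """Toy stand-in with merge-on-load semantics; NOT Semantica's AgentContext."""

    def __init__(self):
        self.memories = {}

    def store(self, key, content):
        self.memories[key] = content

    def load(self, checkpoint):
        # Merge: existing entries survive, checkpoint entries are added on top.
        self.memories.update(checkpoint)


checkpoint = {"m1": "APT29 exploits CVE-2024-3400"}

# Reused context: leftovers from an earlier run remain after load().
reused = ToyContext()
reused.store("stale", "leftover from an earlier run")
reused.load(checkpoint)

# Fresh context: after load() it holds exactly the checkpoint contents.
fresh = ToyContext()
fresh.load(checkpoint)
```

After this runs, the reused context holds both the stale entry and the checkpoint entry, while the fresh context matches the checkpoint exactly.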

Pattern 3 — Namespaced Memories for Role Separation

The reporting agent does not need a separate evidence base. It loads the reasoning agent's saved context and scopes its writes to its own namespace; the conversation_id acts as an agent identifier.
# The reporting agent loads the synthesis output
reporting_vs    = VectorStore(backend="faiss", dimension=768)
reporting_graph = ContextGraph()
reporting_agent = AgentContext(
    vector_store=reporting_vs,
    knowledge_graph=reporting_graph,
    graph_expansion=True,
)
reporting_agent.load("./pipeline/synthesis_output/")

# Retrieve everything the reasoning agent produced
synthesis_items = reporting_agent.retrieve(
    "APT29 CVE-2024-3400 threat assessment synthesis",
    max_results=10,
    conversation_id="synthesis-output",   # scoped to reasoning agent's output
)

# Build the finished brief
brief_sections = [item["content"] for item in synthesis_items]

# Store the final report under the reporting agent's own namespace
reporting_agent.store(
    "\n\n".join(brief_sections),
    metadata={"type": "finished_report", "classification": "TLP:GREEN"},
    conversation_id="reporting-output",    # reporting agent's namespace
    user_id="reporting_agent",
)

# The full pipeline audit trail: retrieve across all namespaces
full_trail = reporting_agent.retrieve("APT29 CVE-2024-3400", max_results=25)
print("Pipeline produced {} traceable context items".format(len(full_trail)))
Each agent’s contributions are retrievable individually by filtering on conversation_id, or collectively by querying without a filter.
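
The filtering behaviour can be modelled in a few lines of plain Python. This is a conceptual sketch only: the module-level store() and retrieve() functions are hypothetical, and a substring match stands in for vector search. The conversation_id values are the ones used in this guide.

```python
from typing import Optional

memories = []


def store(content: str, conversation_id: str) -> None:
    """Append a memory tagged with the namespace of the agent that wrote it."""
    memories.append({"content": content, "conversation_id": conversation_id})


def retrieve(term: str, conversation_id: Optional[str] = None) -> list:
    """Substring match standing in for vector search, with an optional namespace filter."""
    return [
        m for m in memories
        if term.lower() in m["content"].lower()
        and (conversation_id is None or m["conversation_id"] == conversation_id)
    ]


store("APT29 exploits CVE-2024-3400", "osint-pipeline")
store("APT29 TTP profile: T1190", "enrichment-pipeline")
store("SYNTHESIS: APT29 campaign summary", "synthesis-output")

scoped = retrieve("APT29", conversation_id="synthesis-output")  # one namespace
everything = retrieve("APT29")                                  # all namespaces
```

The scoped call returns only the synthesis item, while the unfiltered call returns the items from all three namespaces.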

Domain Examples

A three-agent intelligence fusion cell: an OSINT collector ingests public feeds, a HUMINT analyst loads classified summaries, and a fusion officer synthesizes both streams into a Priority Intelligence Requirement answer. The OSINT and HUMINT agents run concurrently on a shared graph; the fusion officer loads the combined state in a separate process on an air-gapped network segment.
import threading
from semantica.context import AgentContext, ContextGraph
from semantica.vector_store import VectorStore
from semantica.llms import HuggingFaceLLM

# Shared graph for concurrent multi-INT collection
shared_graph = ContextGraph(advanced_analytics=True)
shared_vs    = VectorStore(backend="faiss", dimension=768)

osint_agent  = AgentContext(vector_store=shared_vs, knowledge_graph=shared_graph, graph_expansion=True)
humint_agent = AgentContext(vector_store=shared_vs, knowledge_graph=shared_graph, graph_expansion=True)

def osint_collection():
    osint_agent.store(
        [
            {"content": "CVE-2024-3400 confirmed exploited by APT29 against NATO member VPN gateways",
             "metadata": {"source": "NVD", "classification": "UNCLASSIFIED", "actor": "APT29"}},
            {"content": "Palo Alto PSIRT: GlobalProtect OS command injection via crafted SESSID cookie",
             "metadata": {"source": "PAN-SA-2024-0006", "classification": "UNCLASSIFIED"}},
        ],
        extract_entities=True,
        extract_relationships=True,
        conversation_id="osint-collector",
    )

def humint_analysis():
    # In a cleared environment, HUMINT docs come from a local classified store
    humint_agent.store(
        [
            {"content": "[S//NF] APT29 operator tradecraft: deploy WARPWIRE credential harvester post-exploitation of perimeter VPNs",
             "metadata": {"source": "HUMINT_Q4_2024", "classification": "SECRET//NOFORN", "actor": "APT29"}},
            {"content": "[S//NF] Target selection pattern: APT29 prioritizes Foreign Ministry and Defense Attache networks within NATO",
             "metadata": {"source": "HUMINT_Q4_2024", "classification": "SECRET//NOFORN", "actor": "APT29"}},
        ],
        extract_entities=True,
        extract_relationships=True,
        conversation_id="humint-analyst",
    )

# Concurrent multi-INT collection — graph handles thread safety
t1 = threading.Thread(target=osint_collection)
t2 = threading.Thread(target=humint_analysis)
t1.start()
t2.start()
t1.join()
t2.join()

# Save combined intelligence base for the air-gapped fusion officer
osint_agent.save("./fusion/combined_intel/")

# --- Fusion Officer (air-gapped segment, separate process) ---
fusion_vs    = VectorStore(backend="faiss", dimension=768)
fusion_graph = ContextGraph(advanced_analytics=True)
fusion_officer = AgentContext(
    vector_store=fusion_vs,
    knowledge_graph=fusion_graph,
    graph_expansion=True,
    decision_tracking=True,
)
fusion_officer.load("./fusion/combined_intel/")

# Air-gapped inference: local model on NFS share
llm = HuggingFaceLLM(model="/opt/models/llama-3.1-70b-instruct")

pir_answer = fusion_officer.query_with_reasoning(
    "PIR: What is APT29's current exploitation methodology against NATO perimeter VPNs "
    "and what post-exploitation capabilities have they deployed in Q4 2024?",
    llm_provider=llm,
    max_results=20,
    max_hops=3,
)
print(pir_answer["response"])
fusion_officer.save("./fusion/pir_report/")

Memory Isolation Reference

When multiple agents write to a shared context, use conversation_id to isolate their streams and retrieve them individually.
# Tag a memory to an agent's namespace
context.store("Finding: lateral movement confirmed", conversation_id="tier2-ir")

# Retrieve only that agent's memories
tier2_history = context.retrieve("lateral movement", conversation_id="tier2-ir")

# Get the full ordered history for a namespace
full_history = context.conversation("tier2-ir", max_items=100)

# Delete an agent's entire namespace
context.forget(conversation_id="tier2-ir")
The same pattern works for user-scoped isolation — replace conversation_id with user_id:
context.store("...", user_id="analyst-jsmith")
context.retrieve("...", user_id="analyst-jsmith")
  • Agent Memory — memory storage, retrieval, persistence, and the working memory window each agent uses internally
  • Context Graphs — build and traverse the shared ContextGraph directly; temporal interval reasoning; entity deduplication before node insertion
  • Decision Intelligence — record and trace decisions across agent handoffs with causal chain analysis
  • LLM Integrations — configure the LLM provider passed to query_with_reasoning() in each agent