ContextGraph is a thread-safe, in-memory property graph with temporal validity windows on every node and edge and built-in BFS traversal. Paired with AgentContext, it also supports FAISS-backed semantic search and proximity-blended retrieval. Use it when multiple agents or threads write to a shared knowledge base while analysts query it in real time.
For analytical operations on top of a populated graph — centrality rankings, community detection, node embeddings, link prediction — see the Graph Analytics guide. For recording and querying decisions stored as nodes, see the Decision Intelligence guide.

Constructing the Graph

The simplest possible graph needs no arguments:
from semantica.context import ContextGraph

graph = ContextGraph()
For a threat intelligence workload that will also run analytics, enable the sub-components at construction time — they initialize lazily but must be declared upfront:
graph = ContextGraph(
    advanced_analytics  = True,
    centrality_analysis = True,
    community_detection = True,
    node_embeddings     = True,
)
The graph is backed entirely by Python dicts and a re-entrant lock (threading.RLock). No external service, no database connection, no network call. You can stand up a fully functional intelligence graph in a unit test with a single import.

Adding Your First Entities

Every entity goes in as a node with a type, optional content string, and any number of metadata kwargs:
# add_node(node_id, node_type, content=None, **properties) -> None
# All extra kwargs land in ContextNode.metadata

graph.add_node(
    "APT29",
    "ThreatActor",
    "Russian state-sponsored group, also known as COZY BEAR",
    origin="Russia",
    motivation="espionage",
    first_seen="2008",
)

graph.add_node(
    "SUNBURST",
    "Malware",
    "Supply-chain backdoor embedded in SolarWinds Orion updates",
    family="backdoor",
    first_seen="2019-10",
    platforms=["Windows"],
)

graph.add_node(
    "CVE-2020-10148",
    "Vulnerability",
    "SolarWinds Orion API authentication bypass",
    cvss=10.0,
    affected_product="SolarWinds Orion",
)

graph.add_node(
    "45.142.212.100",
    "C2Domain",
    "Command-and-control server observed in SUNBURST campaign",
    asn="AS29550",
    country="Netherlands",
)

graph.add_node(
    "SolarWinds",
    "Victim",
    "SolarWinds Corporation — software supply chain victim",
    sector="Technology",
)
There is no properties={} parameter. Pass all metadata fields as direct keyword arguments. Calling add_node("x", "t", properties={"k": "v"}) would store the dict under a key literally named properties in metadata — not what you want.
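This behavior follows from how Python collects keyword arguments. The stand-in below is hypothetical (`add_node_sketch` is not part of semantica); it only mirrors the documented signature to show where each argument lands and how to unpack an existing dict at the call site.

```python
# Hypothetical stand-in that mirrors the documented add_node signature.
# It is NOT the library implementation; it only shows how **properties behaves.
def add_node_sketch(node_id, node_type, content=None, **properties):
    return {
        "id": node_id,
        "type": node_type,
        "content": content,
        "metadata": dict(properties),
    }

# Passing a dict as properties= nests it under a "properties" key
wrong = add_node_sketch("x", "t", properties={"k": "v"})

# Passing fields as direct keyword arguments stores them flat
right = add_node_sketch("x", "t", k="v")

# If the metadata already lives in a dict, unpack it with ** at the call site
fields = {"origin": "Russia", "motivation": "espionage"}
unpacked = add_node_sketch("APT29", "ThreatActor", **fields)

print(wrong["metadata"])     # {'properties': {'k': 'v'}}
print(right["metadata"])     # {'k': 'v'}
print(unpacked["metadata"])  # {'origin': 'Russia', 'motivation': 'espionage'}
```

The `**fields` form is the practical fix when metadata arrives as a dict from a feed or parser.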
Now connect them with typed, weighted edges:
# add_edge(source_id, target_id, edge_type="related_to", weight=1.0, **properties) -> None

graph.add_edge("APT29",          "SUNBURST",         "uses",       weight=1.0)
graph.add_edge("SUNBURST",       "CVE-2020-10148",   "exploits",   weight=0.95)
graph.add_edge("SUNBURST",       "SolarWinds",       "targets",    weight=1.0)
graph.add_edge("APT29",          "45.142.212.100",   "operates",   weight=0.9)
graph.add_edge("SUNBURST",       "45.142.212.100",   "beacons_to", weight=0.85)
graph.add_edge("CVE-2020-10148", "SolarWinds",       "affects",    weight=1.0)
Check what you have:
s = graph.stats()
print(f"Nodes: {s['node_count']}, Edges: {s['edge_count']}, Density: {s['density']:.4f}")
# Nodes: 5, Edges: 6, Density: 0.3000

print("Node types:", s["node_types"])   # {"ThreatActor": 1, "Malware": 1, ...}
print("Edge types:", s["edge_types"])   # {"uses": 1, "exploits": 1, ...}
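The density of 0.3000 shown above matches the usual directed-graph definition: edges divided by n × (n − 1). That stats() computes it this way is an assumption inferred from the numbers in the example; the helper name `directed_density` is hypothetical.

```python
def directed_density(node_count: int, edge_count: int) -> float:
    """Fraction of possible directed edges (no self-loops) that are present."""
    if node_count < 2:
        return 0.0  # density is undefined for fewer than two nodes; report 0
    return edge_count / (node_count * (node_count - 1))

# 5 nodes allow 5 * 4 = 20 directed edges; 6 of them exist
density = directed_density(5, 6)
print(f"Density: {density:.4f}")  # Density: 0.3000
```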

Temporal Validity — Intel Has an Expiry Date

Use valid_from and valid_until to mark nodes and edges with activity windows so temporal queries exclude stale data:
# The C2 domain was only active during the campaign window
graph.add_node(
    "45.142.212.100",
    "C2Domain",
    "SUNBURST C2 — active during campaign",
    asn="AS29550",
    valid_from="2019-10-01T00:00:00",
    valid_until="2020-12-17T00:00:00",   # DarkHalo C2 shutdown date
)

# A detection rule with a limited effectiveness window
graph.add_node(
    "SIGMA-SUNBURST-001",
    "DetectionRule",
    "Sigma rule: SUNBURST beacon pattern",
    rule_type="sigma",
    valid_from="2020-12-13T00:00:00",
    valid_until="2021-06-30T23:59:59",   # deprecated after updated TTPs observed
)

# Temporal edges work the same way
graph.add_edge(
    "APT29", "45.142.212.100", "operates",
    weight=0.9,
    valid_from="2019-10-01T00:00:00",
    valid_until="2020-12-17T00:00:00",
)
Now ask which C2 domains were active on December 1, 2020, during the campaign:
from datetime import datetime

# at_time must be a datetime object — not an ISO string
active = graph.find_active_nodes(
    node_type="C2Domain",
    at_time=datetime(2020, 12, 1, 0, 0, 0),
)
print(f"Active C2 domains on 2020-12-01: {len(active)}")
# Active C2 domains on 2020-12-01: 1  (45.142.212.100 is still in its window)

# Compare to today — the C2 is expired
active_now = graph.find_active_nodes(node_type="C2Domain")  # defaults to datetime.now()
print(f"Active C2 domains today: {len(active_now)}")
# Active C2 domains today: 0

# Full temporal snapshot — only nodes and edges valid at a given moment
snapshot = graph.state_at(datetime(2020, 12, 1, 0, 0, 0))
print(f"Active nodes: {len(snapshot['nodes'])}")
print(f"Active edges: {len(snapshot['edges'])}")
This is how you prevent a query today from returning “APT29 currently operates 45.142.212.100” — the edge is outside its validity window and won’t appear in temporal queries.
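The window check behind such queries can be sketched with the standard library alone. This is a minimal illustration, not ContextGraph's implementation: the helper name `is_active`, the inclusive bounds, and treating a missing bound as open-ended are all assumptions.

```python
from datetime import datetime
from typing import Optional

def is_active(valid_from: Optional[str],
              valid_until: Optional[str],
              at_time: datetime) -> bool:
    """True if at_time lies inside [valid_from, valid_until].

    Bounds are ISO-8601 strings; None means the window is open on that side.
    """
    if valid_from is not None and at_time < datetime.fromisoformat(valid_from):
        return False
    if valid_until is not None and at_time > datetime.fromisoformat(valid_until):
        return False
    return True

# Validity window used for the C2 node in the example above
c2_window = ("2019-10-01T00:00:00", "2020-12-17T00:00:00")

during = is_active(*c2_window, datetime(2020, 12, 1))   # inside the window
after = is_active(*c2_window, datetime(2021, 1, 1))     # past valid_until
unbounded = is_active(None, None, datetime(2021, 1, 1)) # no window at all

print(during, after, unbounded)  # True False True
```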

Finding Nodes

find_node() retrieves by ID, and find_nodes() filters by type or metadata:
# find_node(node_id) -> Optional[Dict]
# Returns keys: "id", "type", "content", "metadata"  — NOT "node_id" or "node_type"

actor = graph.find_node("APT29")
if actor:
    print(actor["id"])       # "APT29"
    print(actor["type"])     # "ThreatActor"
    print(actor["content"])  # "Russian state-sponsored group..."
    print(actor["metadata"]) # {"origin": "Russia", "motivation": "espionage", ...}

# find_nodes(node_type=None, skip=0, limit=None) -> List[Dict]
all_actors = graph.find_nodes(node_type="ThreatActor")
all_vulns  = graph.find_nodes(node_type="Vulnerability")

Traversing the Graph

BFS traversal answers reachability questions directly:
# get_neighbors(node_id, hops=1, relationship_types=None,
#               min_weight=0.0, include_distance_metadata=False) -> List[Dict]
# Each result: {"id", "type", "content", "relationship", "weight", "hop"}

neighbors = graph.get_neighbors("APT29", hops=2)
for n in neighbors:
    print(f"  hop={n['hop']}  [{n['relationship']}]  {n['id']}  ({n['type']})")

# hop=1  [uses]       SUNBURST         (Malware)
# hop=1  [operates]   45.142.212.100   (C2Domain)
# hop=2  [exploits]   CVE-2020-10148   (Vulnerability)
# hop=2  [targets]    SolarWinds       (Victim)
# hop=2  [beacons_to] 45.142.212.100   (C2Domain)  — also reachable via hop-1
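For intuition, hop-limited BFS can be sketched over the same six edges using only the standard library. This is not the library's code: `neighbors_within` is a hypothetical helper, it follows outgoing edges only (an assumption), and it records each node once at its smallest hop, whereas the output above lists a node again when it is reached over a different edge.

```python
from collections import defaultdict, deque

EDGES = [
    ("APT29", "SUNBURST", "uses"),
    ("SUNBURST", "CVE-2020-10148", "exploits"),
    ("SUNBURST", "SolarWinds", "targets"),
    ("APT29", "45.142.212.100", "operates"),
    ("SUNBURST", "45.142.212.100", "beacons_to"),
    ("CVE-2020-10148", "SolarWinds", "affects"),
]

def neighbors_within(edges, start, hops):
    """Return {node_id: (hop, relationship)} for nodes reachable in <= hops steps."""
    adjacency = defaultdict(list)
    for source, target, edge_type in edges:
        adjacency[source].append((target, edge_type))  # outgoing edges only

    found = {}
    queue = deque([(start, 0)])
    while queue:
        node, hop = queue.popleft()
        if hop == hops:
            continue  # hop budget exhausted; do not expand further
        for target, edge_type in adjacency[node]:
            if target != start and target not in found:
                found[target] = (hop + 1, edge_type)
                queue.append((target, hop + 1))
    return found

result = neighbors_within(EDGES, "APT29", hops=2)
for node_id, (hop, relationship) in result.items():
    print(f"hop={hop}  [{relationship}]  {node_id}")

# hop=1  [uses]  SUNBURST
# hop=1  [operates]  45.142.212.100
# hop=2  [exploits]  CVE-2020-10148
# hop=2  [targets]  SolarWinds
```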
Filter to only follow specific edge types — useful when you want to trace just the exploitation chain without noise from other relationship types:
exploit_chain = graph.get_neighbors(
    "APT29",
    hops=3,
    relationship_types=["uses", "exploits", "affects"],
)
To see how much confidence a connection deserves given its graph distance, enable distance metadata. Each result gains a distance_band label and a confidence_decay multiplier, so nodes further away are weighted down:
neighbors = graph.get_neighbors(
    "APT29",
    hops=3,
    include_distance_metadata=True,
)
for n in neighbors:
    print(f"  {n['id']:30s}  band={n['distance_band']:8s}  decay={n['confidence_decay']:.3f}")

# APT29's direct SUNBURST edge:   band=direct   decay=1.000
# CVE reached via SUNBURST:       band=near     decay=0.850
# SolarWinds reached via CVE:     band=mid      decay=0.700
For point-to-point routing, use the shortest path finder:
# shortest_path(source_id, target_id, edge_types=None) -> Optional[List[str]]
path = graph.shortest_path("APT29", "SolarWinds")
if path:
    print(" → ".join(path))
# APT29 → SUNBURST → SolarWinds
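The result above is what an unweighted breadth-first search with parent pointers produces on the sample edges. The sketch below assumes directed edges and ignores weights; `shortest_path_sketch` is a hypothetical name, and the library's own algorithm may differ.

```python
from collections import deque

EDGES = [
    ("APT29", "SUNBURST"),
    ("SUNBURST", "CVE-2020-10148"),
    ("SUNBURST", "SolarWinds"),
    ("APT29", "45.142.212.100"),
    ("SUNBURST", "45.142.212.100"),
    ("CVE-2020-10148", "SolarWinds"),
]

def shortest_path_sketch(edges, source, target):
    """Fewest-hop path from source to target, or None if unreachable."""
    adjacency = {}
    for src, dst in edges:
        adjacency.setdefault(src, []).append(dst)

    parents = {source: None}  # doubles as the visited set
    queue = deque([source])
    while queue:
        node = queue.popleft()
        if node == target:
            # Walk the parent pointers back to the source
            path = []
            while node is not None:
                path.append(node)
                node = parents[node]
            return path[::-1]
        for nxt in adjacency.get(node, []):
            if nxt not in parents:
                parents[nxt] = node
                queue.append(nxt)
    return None

forward = shortest_path_sketch(EDGES, "APT29", "SolarWinds")
backward = shortest_path_sketch(EDGES, "SolarWinds", "APT29")
print(forward)   # ['APT29', 'SUNBURST', 'SolarWinds']
print(backward)  # None (no outgoing edges lead back under the directed assumption)
```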
When you need to analyze a sub-cluster in isolation, extract_subgraph() gives you a new independent ContextGraph instance:
# extract_subgraph(node_ids, include_edges=True) -> ContextGraph
campaign_nodes = ["APT29", "SUNBURST", "CVE-2020-10148", "45.142.212.100", "SolarWinds"]
subgraph = graph.extract_subgraph(campaign_nodes)

s = subgraph.stats()
print(f"Subgraph: {s['node_count']} nodes, {s['edge_count']} edges")
# Subgraph: 5 nodes, 6 edges
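Conceptually, extracting a subgraph keeps the chosen nodes plus every edge whose two endpoints both survive. The sketch below shows that rule on plain dicts and tuples; `induced_subgraph` is a hypothetical helper and the data layout is an assumption, not ContextGraph's internal storage.

```python
NODES = {
    "APT29": {"type": "ThreatActor"},
    "SUNBURST": {"type": "Malware"},
    "CVE-2020-10148": {"type": "Vulnerability"},
    "45.142.212.100": {"type": "C2Domain"},
    "SolarWinds": {"type": "Victim"},
}
EDGES = [
    ("APT29", "SUNBURST", "uses"),
    ("SUNBURST", "CVE-2020-10148", "exploits"),
    ("SUNBURST", "SolarWinds", "targets"),
    ("APT29", "45.142.212.100", "operates"),
    ("SUNBURST", "45.142.212.100", "beacons_to"),
    ("CVE-2020-10148", "SolarWinds", "affects"),
]

def induced_subgraph(nodes, edges, keep_ids):
    """Copy the kept nodes and the edges that connect two kept nodes."""
    keep = set(keep_ids)
    sub_nodes = {nid: dict(data) for nid, data in nodes.items() if nid in keep}
    sub_edges = [e for e in edges if e[0] in keep and e[1] in keep]
    return sub_nodes, sub_edges

sub_nodes, sub_edges = induced_subgraph(NODES, EDGES, ["APT29", "SUNBURST", "SolarWinds"])
print(len(sub_nodes), len(sub_edges))  # 3 2

# The copy is independent: editing it leaves the original untouched
sub_nodes["APT29"]["type"] = "Changed"
print(NODES["APT29"]["type"])  # ThreatActor
```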

Handling Concurrent Writes

ContextGraph handles concurrent writes with a re-entrant lock (threading.RLock) that wraps every mutation — you do not need to add your own synchronization:
import threading
from semantica.context import ContextGraph

graph = ContextGraph()

def misp_ingest_worker(events):
    for event in events:
        graph.add_node(event["id"], event["type"], event["value"])
        for attr in event.get("attributes", []):
            graph.add_edge(event["id"], attr["value"], "has_attribute")

def nvd_ingest_worker(cves):
    for cve in cves:
        graph.add_node(cve["id"], "Vulnerability", cve["description"], cvss=cve["cvss"])
        graph.add_edge(cve["id"], cve["product"], "affects")

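# Hypothetical sample batches so the example runs end to end;
# in production these records would come from the MISP and NVD feeds
misp_events = [
    {"id": "evt-1", "type": "Indicator", "value": "Suspicious beacon traffic",
     "attributes": [{"value": "45.142.212.100"}]},
]
nvd_batch = [
    {"id": "CVE-2020-10148", "description": "SolarWinds Orion API authentication bypass",
     "cvss": 10.0, "product": "SolarWinds Orion"},
]

# Both threads write safely to the same graph
t1 = threading.Thread(target=misp_ingest_worker, args=(misp_events,))
t2 = threading.Thread(target=nvd_ingest_worker, args=(nvd_batch,))
t1.start()
t2.start()
t1.join()
t2.join()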

print(graph.stats())
The lock is re-entrant, so internal calls that themselves acquire the lock (for example, add_edge() calling find_node() internally) won’t deadlock.
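The re-entrancy point can be demonstrated with a toy store built on the standard library. `TinyGraph` is purely illustrative and is not how ContextGraph is implemented; it only shows a locked method calling another locked method on the same thread, plus several writers sharing one instance.

```python
import threading

class TinyGraph:
    """Illustrative only: a dict-backed store guarded by one re-entrant lock."""

    def __init__(self):
        self._lock = threading.RLock()
        self.nodes = {}
        self.edges = []

    def has_node(self, node_id):
        with self._lock:
            return node_id in self.nodes

    def add_node(self, node_id, node_type):
        with self._lock:
            self.nodes[node_id] = node_type

    def add_edge(self, source_id, target_id, edge_type):
        with self._lock:  # first acquisition
            # has_node() acquires the same lock again on this thread.
            # A plain threading.Lock would block forever here; RLock allows it.
            if self.has_node(source_id) and self.has_node(target_id):
                self.edges.append((source_id, target_id, edge_type))

def worker(graph, prefix, count):
    for i in range(count):
        graph.add_node(f"{prefix}-{i}", "Indicator")
        graph.add_edge(f"{prefix}-{i}", "hub", "related_to")

graph = TinyGraph()
graph.add_node("hub", "Hub")

threads = [threading.Thread(target=worker, args=(graph, f"w{n}", 250)) for n in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(graph.nodes), len(graph.edges))  # 1001 1000
```

Swapping `threading.RLock()` for `threading.Lock()` in this sketch makes the first `add_edge()` call hang, which is the deadlock the re-entrant lock avoids.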

Semantic Search via AgentContext

AgentContext wraps the graph with a FAISS vector index and lets you retrieve by semantic similarity, with optional blending of graph proximity:
from semantica.context import AgentContext, ContextGraph
from semantica.vector_store import VectorStore

graph = ContextGraph()
# ... (populated with CTI nodes as above)

context = AgentContext(
    vector_store    = VectorStore(backend="faiss", dimension=768),
    knowledge_graph = graph,
    hybrid_alpha    = 0.5,       # 50% semantic / 50% structural weighting
    decision_tracking = True,
)

# Store intel summaries — these become searchable
context.store("APT29 operated SUNBURST backdoor via SolarWinds supply chain compromise")
context.store("45.142.212.100 is a C2 server associated with the SUNBURST campaign")
context.store("CVE-2020-10148 allows unauthenticated API access in SolarWinds Orion")

# Retrieve with graph proximity blending
# anchor_node="APT29" means nodes close to APT29 in the graph score higher
results = context.retrieve(
    "APT29 infrastructure and C2 servers",
    max_results      = 10,
    anchor_node      = "APT29",
    max_hops         = 2,
    proximity_weight = 0.3,    # 30% graph proximity, 70% semantic score
    use_graph        = True,
)

for r in results:
    # "score"          — base semantic similarity (always present)
    # "combined_score" — blended score (present when proximity_weight > 0)
    # "distance_band"  — "direct" / "near" / "mid" / "far"
    score = r.get("combined_score", r.get("score", 0))
    print(f"[{score:.3f}]  {r.get('content', '')[:70]}")
proximity_weight is a per-call parameter on retrieve(), not a constructor setting. This means different queries can use different blending ratios on the same context object — a broad semantic search uses proximity_weight=0.0, while a neighborhood-focused traversal uses proximity_weight=0.5.
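The effect of the ratio is easiest to see with a linear blend, which is what the "30% graph proximity, 70% semantic score" comment suggests. The sketch below assumes that formula and uses made-up scores; `blend`, `rank`, and the candidate data are hypothetical, and the way AgentContext derives a proximity score from hop distance is not shown.

```python
def blend(semantic_score, proximity_score, proximity_weight):
    """Linear mix: (1 - w) * semantic + w * proximity, with w in [0, 1]."""
    if not 0.0 <= proximity_weight <= 1.0:
        raise ValueError("proximity_weight must be between 0 and 1")
    return (1.0 - proximity_weight) * semantic_score + proximity_weight * proximity_score

# (label, semantic score, proximity score); values are invented for illustration
CANDIDATES = [
    ("generic C2 report", 0.90, 0.0),        # strong text match, far from the anchor
    ("note on APT29 C2 server", 0.80, 1.0),  # weaker text match, adjacent to the anchor
]

def rank(proximity_weight):
    ordered = sorted(
        CANDIDATES,
        key=lambda c: blend(c[1], c[2], proximity_weight),
        reverse=True,
    )
    return [label for label, _, _ in ordered]

print(rank(0.0))  # ['generic C2 report', 'note on APT29 C2 server']
print(rank(0.3))  # ['note on APT29 C2 server', 'generic C2 report']
```

With a weight of 0.0 the ranking is purely semantic; at 0.3 the adjacent note scores 0.86 against 0.63 and moves to the top.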

Cross-Graph Navigation

link_graph() connects two separate graphs, and cross_graph_path() finds paths that span the boundary:
from semantica.context import ContextGraph

actor_graph  = ContextGraph()
victim_graph = ContextGraph()

actor_graph.add_node("APT29",    "ThreatActor", "APT29")
actor_graph.add_node("SUNBURST", "Malware",     "SUNBURST backdoor")
actor_graph.add_edge("APT29", "SUNBURST", "uses")

victim_graph.add_node("SolarWinds", "Victim", "SolarWinds Corporation")
victim_graph.add_node("Treasury",   "Victim", "US Department of Treasury")
victim_graph.add_edge("SolarWinds", "Treasury", "supply_chain_compromised")

link_id = actor_graph.link_graph(
    victim_graph,
    "APT29",
    "SolarWinds",
    link_type="targets",
)

other_graph, target_node_id = actor_graph.navigate_to(link_id)

sw = other_graph.find_node(target_node_id)
if sw:
    print("Reached:", sw["id"])

result = actor_graph.cross_graph_path(
    "APT29",
    victim_graph,
    "Treasury",
)

if result.get("reachable"):
    print(f"Reached in {result['hop_count']} hops")
# APT29 → SolarWinds → Treasury  (the link joins APT29 directly to SolarWinds)

Serialization and Persistence

After each ingest cycle, save the graph to disk. On restart, restore it — the entire node and edge set is preserved:
# Save
graph.save_to_file("cti_graph.json")

# Restore
restored = ContextGraph(advanced_analytics=True)
restored.load_from_file("cti_graph.json")

print(restored.stats())

# to_dict() gives you the raw serializable dict
d = graph.to_dict()
# d["nodes"]      → list of node dicts
# d["edges"]      → list of edge dicts
# d["statistics"] → {"node_count": int, "edge_count": int}
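A dict of this shape survives a JSON round trip unchanged, which a standard-library sketch can confirm. The node keys follow the find_node() output described earlier; the edge keys (`source`, `target`, `type`, `weight`) and the sample values are assumptions, and this does not call or reproduce save_to_file().

```python
import json
import os
import tempfile

# Hand-built dict in the documented top-level shape (edge keys are assumed)
graph_dict = {
    "nodes": [
        {"id": "APT29", "type": "ThreatActor",
         "content": "Russian state-sponsored group", "metadata": {"origin": "Russia"}},
        {"id": "SUNBURST", "type": "Malware",
         "content": "Supply-chain backdoor", "metadata": {"family": "backdoor"}},
    ],
    "edges": [
        {"source": "APT29", "target": "SUNBURST", "type": "uses", "weight": 1.0},
    ],
    "statistics": {"node_count": 2, "edge_count": 1},
}

with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "cti_graph.json")
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(graph_dict, fh, indent=2)
    with open(path, encoding="utf-8") as fh:
        restored = json.load(fh)

# The restored structure equals the original, and the counts agree with the lists
print(restored == graph_dict)  # True
print(restored["statistics"]["node_count"] == len(restored["nodes"]))  # True
```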
For full session persistence (graph + FAISS vector index + memory), use AgentContext.save() / AgentContext.load():
context.save("agent_state/")

# Later, on restart:
context2 = AgentContext(
    vector_store    = VectorStore(backend="faiss", dimension=768),
    knowledge_graph = ContextGraph(),
)
context2.load("agent_state/")

Domain Examples

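Cyber threat intelligence scenario: three separate ingest workers (MISP, NVD, classified STIX) write to a shared ContextGraph simultaneously, and temporal validity prevents stale campaign data from appearing in current-threat queries. The example below focuses on the temporal queries and traversal; the threaded ingest follows the pattern shown under Handling Concurrent Writes.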
from semantica.context import ContextGraph, AgentContext
from semantica.vector_store import VectorStore
from datetime import datetime

graph = ContextGraph(advanced_analytics=True, community_detection=True)

# Core CTI entities
graph.add_node("APT29", "ThreatActor", "Russian SVR-attributed group, COZY BEAR",
               origin="Russia", motivation="espionage")
graph.add_node("SUNBURST", "Malware", "SolarWinds supply chain backdoor",
               family="backdoor", platforms=["Windows"])
graph.add_node("CVE-2020-10148", "Vulnerability",
               "SolarWinds Orion API auth bypass", cvss=10.0)

# Time-bound C2 infrastructure
graph.add_node("avsvmcloud.com", "C2Domain",
               "SUNBURST DNS C2 domain",
               valid_from="2019-10-01T00:00:00",
               valid_until="2020-12-18T00:00:00")

graph.add_edge("APT29",    "SUNBURST",        "deploys",    weight=1.0)
graph.add_edge("SUNBURST", "CVE-2020-10148",  "exploits",   weight=0.95)
graph.add_edge("SUNBURST", "avsvmcloud.com",  "beacons_to", weight=0.9,
               valid_from="2019-10-01T00:00:00",
               valid_until="2020-12-18T00:00:00")

# What C2 infrastructure is active right NOW?
active_c2 = graph.find_active_nodes(node_type="C2Domain")
print(f"Currently active C2 domains: {len(active_c2)}")
# Currently active C2 domains: 0  — avsvmcloud.com expired in 2020

# Historical query: what was active during the campaign?
campaign_c2 = graph.find_active_nodes(
    node_type="C2Domain",
    at_time=datetime(2020, 6, 1),
)
print(f"C2 domains active June 2020: {len(campaign_c2)}")
# C2 domains active June 2020: 1  — avsvmcloud.com was active

# Traversal: full blast radius from APT29
blast_radius = graph.get_neighbors("APT29", hops=3,
                                   include_distance_metadata=True)
for n in blast_radius:
    print(f"  hop={n['hop']}  decay={n['confidence_decay']:.2f}  {n['id']}")

Related Guides

  • Graph Analytics — centrality rankings, community detection, node embeddings, and link prediction on a populated ContextGraph
  • Decision Intelligence — recording decisions as typed nodes, causal chain analysis, precedent search, and policy enforcement
  • Ingest — loading data from PDFs, APIs, databases, STIX bundles, and RSS feeds into the graph
  • Deduplication — detecting and merging near-duplicate nodes before insertion to prevent graph fragmentation
  • Reasoning — temporal interval algebra (Allen relations), forward/backward chaining, and SPARQL over the knowledge graph
  • Ontology Management — deriving formal OWL ontologies from graph.to_dict() for downstream reasoning engines
  • Context Module Reference — full API for AgentContext, ContextGraph, ContextNode, ContextEdge