PipelineBuilder solves the glue problem between processing steps. Declare your steps, register handler functions, wire the connections, and hand control to ExecutionEngine — it handles topological ordering, passes output between steps, retries on failure with configurable backoff, and returns a structured ExecutionResult you can log or alert on.
PipelineBuilder and ExecutionEngine are in semantica.pipeline. Failure handling, retry policies, and parallelism management are separate classes you can import individually for fine-grained control. Custom step handlers are plain Python functions; no subclassing is required.
Your First Pipeline
The minimum viable pipeline has three steps: ingest, extract, store. Define them, connect them, build, and execute.
ExecutionEngine performs a topological sort of the step graph before executing, so even if you declare steps out of order, the execution sequence still respects every dependency. Each step receives the previous step’s return value as its data argument.
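The ordering behaviour described above can be illustrated without Semantica at all. The following is a minimal sketch that uses the standard library's graphlib as a stand-in for the engine's sort; the handler names, the `handlers` and `dependencies` dicts, and the sample data are hypothetical and are not part of the Semantica API.

```python
from graphlib import TopologicalSorter

# Hypothetical handlers: each receives the previous step's return value as `data`.
def ingest(data):
    return ["doc-1", "doc-2"]

def extract(data):
    return [f"entities({doc})" for doc in data]

def store(data):
    return {"stored": len(data)}

# Steps are deliberately declared in the "wrong" order.
handlers = {"store": store, "extract": extract, "ingest": ingest}

# Each step maps to the set of steps it depends on.
dependencies = {"store": {"extract"}, "extract": {"ingest"}, "ingest": set()}

# Dependencies always come before the steps that need them.
order = list(TopologicalSorter(dependencies).static_order())

data = None
for name in order:
    data = handlers[name](data)  # output of one step becomes input of the next

print(order)  # ['ingest', 'extract', 'store']
print(data)   # {'stored': 2}
```

Because the graph is a single chain, only one valid order exists, regardless of how the dicts were written.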
Reading the ExecutionResult
Every engine.execute_pipeline() call returns an ExecutionResult dataclass. Check it before assuming success.
result.errors is a List[str] — one entry per failed step, each containing the exception message. A pipeline with retry_on_failure=True attempts each failed step up to max_retries times (default: 3) before recording it as a failure and moving on.
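To make the retry-then-record behaviour concrete, here is a minimal sketch of the pattern. `ResultSketch` and `run_with_retries` are hypothetical stand-ins, not Semantica's ExecutionResult or engine, and the sketch assumes that max_retries counts retries after the first attempt (so a step is tried at most 1 + max_retries times).

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List

@dataclass
class ResultSketch:
    """Hypothetical stand-in for a structured result; not Semantica's class."""
    success: bool = True
    errors: List[str] = field(default_factory=list)
    outputs: Dict[str, Any] = field(default_factory=dict)

def run_with_retries(steps: Dict[str, Callable[[], Any]], max_retries: int = 3) -> ResultSketch:
    result = ResultSketch()
    for name, handler in steps.items():
        last_error = None
        # Assumption: one initial attempt plus up to `max_retries` retries.
        for _attempt in range(1 + max_retries):
            try:
                result.outputs[name] = handler()
                last_error = None
                break
            except Exception as exc:
                last_error = exc
        if last_error is not None:
            # Record one message for the failed step, then move on.
            result.success = False
            result.errors.append(f"{name}: {last_error}")
    return result

calls = {"flaky": 0, "broken": 0}

def flaky():
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise ConnectionError("temporary outage")
    return "ok"

def broken():
    calls["broken"] += 1
    raise ValueError("bad input")

result = run_with_retries({"flaky": flaky, "broken": broken})
if not result.success:
    for message in result.errors:
        print(message)  # broken: bad input
```

The flaky step recovers on its third attempt and never appears in the error list; the broken step exhausts its attempts and contributes exactly one entry.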
Handling Failures and Configuring Retry Policy
By default, ExecutionEngine(retry_on_failure=True) uses an exponential backoff policy: three retries, starting at 1 second, doubling each time, capped at 60 seconds. For steps that call external APIs or databases, where transient failures are expected, you can set per-step-type policies via FailureHandler.
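The default schedule works out to waits of 1, 2, and 4 seconds. The sketch below computes that schedule from the four numbers given above. The function `backoff_delays`, its parameter names, and the `policies` dict are assumptions made for illustration; they do not reflect FailureHandler's actual signature.

```python
from typing import Dict, List

def backoff_delays(max_retries: int = 3, initial: float = 1.0,
                   factor: float = 2.0, cap: float = 60.0) -> List[float]:
    """Seconds to wait before each retry: initial * factor**n, never above cap."""
    return [min(initial * factor ** n, cap) for n in range(max_retries)]

default_delays = backoff_delays()
print(default_delays)  # [1.0, 2.0, 4.0]

# With more retries the cap becomes visible: 64 s and 128 s are clamped to 60 s.
long_delays = backoff_delays(max_retries=8)
print(long_delays)  # [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 60.0, 60.0]

# Hypothetical per-step-type policies, keyed by a step type label.
policies: Dict[str, dict] = {
    "api_call": {"max_retries": 5, "initial": 0.5},
    "local_io": {"max_retries": 1},
}
api_delays = backoff_delays(**policies["api_call"])
print(api_delays)  # [0.5, 1.0, 2.0, 4.0, 8.0]
```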
handler.classify_error() distinguishes ValidationError (low severity, usually not retried), ProcessingError (high severity), and timeout/connection errors (medium severity, always retried). You can inspect the classification.
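A minimal sketch of this kind of severity mapping follows. The two exception classes are local stand-ins defined only for this example, and `classify` and `should_retry` are hypothetical helpers, not the return format of classify_error(). The text does not say whether high-severity errors are retried, so the sketch leaves that to a caller-supplied default.

```python
class ValidationError(Exception):
    """Local stand-in for a validation failure."""

class ProcessingError(Exception):
    """Local stand-in for a processing failure."""

def classify(exc: Exception) -> str:
    """Map an exception to a severity label."""
    if isinstance(exc, ValidationError):
        return "low"
    if isinstance(exc, (TimeoutError, ConnectionError)):  # Python built-ins
        return "medium"
    if isinstance(exc, ProcessingError):
        return "high"
    return "unknown"

def should_retry(exc: Exception, default: bool = True) -> bool:
    severity = classify(exc)
    if severity == "low":
        return False   # bad input will not fix itself
    if severity == "medium":
        return True    # transient failures are worth another attempt
    return default     # assumption: unspecified cases defer to the caller

for error in (ValidationError("missing field"), TimeoutError("read timed out"),
              ProcessingError("parser crashed")):
    print(type(error).__name__, classify(error), should_retry(error))
```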
Running Steps in Parallel
When two steps don’t depend on each other (for example, NER extraction and triplet extraction both reading from the same ingest output), declare them as parallel branches by connecting both to the same upstream step.
set_parallelism(n) tells the engine how many steps it may run simultaneously. The topological sort guarantees that only steps whose dependencies have all completed are eligible for concurrent execution, so you cannot accidentally run a step before its inputs are ready.
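The eligibility rule can be sketched with the standard library alone: graphlib hands out only the steps whose dependencies are done, and a thread pool bounds how many run at once. `run_parallel`, the handlers, and the convention of passing a dict of upstream outputs to each handler are assumptions for this example, not Semantica's engine.

```python
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait
from graphlib import TopologicalSorter

def run_parallel(dependencies, handlers, max_workers=2):
    sorter = TopologicalSorter(dependencies)
    sorter.prepare()  # raises CycleError if the graph has a cycle
    outputs, started, running = {}, [], {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            # get_ready() yields only steps whose dependencies are all done.
            for name in sorter.get_ready():
                inputs = {dep: outputs[dep] for dep in dependencies.get(name, ())}
                running[pool.submit(handlers[name], inputs)] = name
                started.append(name)
            done, _ = wait(list(running), return_when=FIRST_COMPLETED)
            for future in done:
                name = running.pop(future)
                outputs[name] = future.result()
                sorter.done(name)  # may unlock downstream steps
    return outputs, started

def ingest(inputs):
    return "APT29 targeted Acme Corp"

def ner(inputs):
    return [word for word in inputs["ingest"].split() if word[0].isupper()]

def triplets(inputs):
    words = inputs["ingest"].split()
    return [(words[0], words[1], " ".join(words[2:]))]

def merge(inputs):
    return {"entities": inputs["ner"], "triplets": inputs["triplets"]}

dependencies = {"ingest": set(), "ner": {"ingest"},
                "triplets": {"ingest"}, "merge": {"ner", "triplets"}}
handlers = {"ingest": ingest, "ner": ner, "triplets": triplets, "merge": merge}

outputs, started = run_parallel(dependencies, handlers, max_workers=2)
print(started[0], started[-1])  # ingest merge
print(outputs["merge"])
```

The two middle steps may start in either order, but the merge step is never submitted until both have finished.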
Delta / Incremental Processing
Your STIX bundle directory grows by 20–30 new files each night. Re-processing all 4,000 historical files every morning wastes time and compute. Setting delta_mode=True on the ingest step tells the pipeline to process only files that have changed since the last version snapshot.
base_version_id and target_version_id are stored on the PipelineStep dataclass and passed through to your handler via config — your handler is responsible for using them to filter its input. A typical pattern is to check file modification timestamps against the base version date.
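The timestamp pattern mentioned above might look like the following sketch. `files_changed_since` is a hypothetical helper; how a base_version_id maps to a cutoff date depends on your versioning setup and is not shown, so the example takes the cutoff directly as an epoch timestamp.

```python
import os
import tempfile
from pathlib import Path

def files_changed_since(directory, cutoff_epoch, pattern="*.json"):
    """Return files in `directory` modified after `cutoff_epoch` (seconds since epoch)."""
    return sorted(
        path for path in Path(directory).glob(pattern)
        if path.stat().st_mtime > cutoff_epoch
    )

# Demonstration with two files whose modification times are set explicitly.
with tempfile.TemporaryDirectory() as tmp:
    old_file = Path(tmp, "old.json")
    old_file.write_text("{}")
    os.utime(old_file, (1_000, 1_000))   # modified before the cutoff

    new_file = Path(tmp, "new.json")
    new_file.write_text("{}")
    os.utime(new_file, (3_000, 3_000))   # modified after the cutoff

    changed = [path.name for path in files_changed_since(tmp, cutoff_epoch=2_000)]

print(changed)  # ['new.json']
```

Modification times are a cheap filter but can be reset by copies or restores; comparing content hashes is a stricter alternative when that matters.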
Building from a Config Dict
For pipelines defined in config files, which is useful when different environments (dev, staging, prod) run the same pipeline with different paths and thresholds, pass a dict to build_pipeline() instead of calling add_step() manually.
build_pipeline() reads step connections from the "dependencies" key inside each step’s config dict (not from connect_steps() calls). Add dependencies explicitly if you use this path.
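To show how a "dependencies" key inside each step's config can drive ordering, here is a minimal sketch. Only the "dependencies" key comes from the text above; the overall dict layout ("name", "steps", "config"), the example path, and the `dependency_graph` helper are assumptions for illustration and may not match what build_pipeline() expects.

```python
from graphlib import TopologicalSorter

# Assumed layout; only the "dependencies" key is taken from the guide.
pipeline_config = {
    "name": "nightly-stix",
    "steps": [
        {"name": "store", "config": {"dependencies": ["extract"]}},
        {"name": "extract", "config": {"dependencies": ["ingest"]}},
        {"name": "ingest", "config": {"path": "/data/stix"}},  # hypothetical path
    ],
}

def dependency_graph(config):
    """Build {step: set of upstream steps}, rejecting references to unknown steps."""
    names = {step["name"] for step in config["steps"]}
    graph = {}
    for step in config["steps"]:
        deps = set(step.get("config", {}).get("dependencies", []))
        missing = deps - names
        if missing:
            raise ValueError(
                f"{step['name']} depends on undefined steps: {sorted(missing)}"
            )
        graph[step["name"]] = deps
    return graph

order = list(TopologicalSorter(dependency_graph(pipeline_config)).static_order())
print(order)  # ['ingest', 'extract', 'store']
```

Validating dependency names up front catches a typo in a config file before any step runs.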
Monitoring Progress
ExecutionEngine integrates with Semantica’s progress tracker automatically: every step start, update, and completion is recorded. To review how a long-running pipeline progressed, inspect step status on the Pipeline object after execution.
result.metrics gives the aggregate view.
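As an illustration of the difference between per-step status and an aggregate view, the sketch below rolls a list of step records up into summary numbers. The record fields, status labels, and `summarize` function are all hypothetical; they do not describe the actual contents of result.metrics or of the Pipeline object.

```python
from collections import Counter

# Hypothetical per-step records, one per step in the run.
step_records = [
    {"name": "ingest", "status": "completed", "seconds": 2.5},
    {"name": "extract", "status": "completed", "seconds": 7.0},
    {"name": "store", "status": "failed", "seconds": 0.5},
]

def summarize(records):
    """Aggregate per-step records into run-level counts and total duration."""
    counts = Counter(record["status"] for record in records)
    return {
        "total_steps": len(records),
        "completed": counts["completed"],
        "failed": counts["failed"],
        "total_seconds": sum(record["seconds"] for record in records),
    }

summary = summarize(step_records)
print(summary)

# Per-step detail is still needed to find out which step failed.
failed_steps = [r["name"] for r in step_records if r["status"] == "failed"]
print(failed_steps)  # ['store']
```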
Domain Examples
- Defense — CTI/Threat
- Security — SOC/Incident
- Life Science — Clinical/Pharma
- Banking — Risk/Compliance
A SOC threat intelligence team needs an end-to-end pipeline that ingests STIX bundles from a classified directory, runs entity extraction with custom threat-actor labels, and builds a ContextGraph ready for analyst queries. The pipeline runs every six hours; failed steps retry automatically so a transient filesystem error doesn’t drop an ingestion cycle.
Related Guides
- Ingest — all source types for the ingest step: PDFs, APIs, databases, RSS feeds, STIX directories, and streams
- Semantic Extraction — NER, relation extraction, triplet extraction, and event detection for the extract step
- Context Graphs — building and querying the ContextGraph that the store step populates
- Provenance — tracking the origin document, confidence score, and pipeline run ID for every extracted entity
