```python
from semantica.pipeline import PipelineValidator

validator = PipelineValidator()
result = validator.validate_pipeline(pipeline)

if not result.valid:
    for error in result.errors:  # errors is List[str]
        print(f"Error: {error}")
    for warning in result.warnings:
        print(f"Warning: {warning}")
```
Validate pipelines with PipelineValidator before running them in production. It catches dependency cycles, missing step names, and misconfigured connections that would otherwise only surface as errors mid-run. Validation finishes almost immediately; discovering the same problem 30 minutes into an extraction job is far more costly.
Inspect result.metrics after each run. result.metrics['steps_executed'] and result.metrics['execution_time'] give a quick read on overall pipeline health. For per-step detail, check step.status, step.result, and step.error on each PipelineStep after the run.
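The metrics above are totals, so they do not point at a single slow step on their own. One way to find bottlenecks is to wrap each handler with a timer before passing it to builder.add_step(..., handler=...). The sketch below is a hypothetical helper (timed is not part of Semantica) built only on the standard library; it records each step's wall-clock duration into a shared dict.

```python
import time
from functools import wraps
from typing import Any, Callable, Dict


def timed(name: str, timings: Dict[str, float], handler: Callable[..., Any]) -> Callable[..., Any]:
    """Hypothetical helper: wrap a step handler so its duration is stored in timings[name]."""

    @wraps(handler)
    def wrapper(*args: Any, **kwargs: Any) -> Any:
        start = time.perf_counter()
        try:
            return handler(*args, **kwargs)
        finally:
            # Record the duration even when the handler raises
            timings[name] = time.perf_counter() - start

    return wrapper


def slowest_first(timings: Dict[str, float]) -> list:
    """Return (step_name, seconds) pairs sorted from slowest to fastest."""
    return sorted(timings.items(), key=lambda kv: kv[1], reverse=True)
```

For example, handler=timed("extract", timings, extractor.extract) in place of handler=extractor.extract; after the run, slowest_first(timings) lists the steps worth optimizing first.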
Choose the worker pool by workload type. Use thread workers (use_processes=False) for I/O-bound steps such as web fetching and DB queries, and process workers (use_processes=True) for CPU-bound steps such as embedding, OCR, and large NER batches. A pool type mismatched to the step's workload adds overhead without making the step faster.
Configure retry policies to contain failures in production. Use handler.set_retry_policy("step_type", RetryPolicy(max_retries=3)) with a limited retry count so transient errors are retried without a single failing step stopping the whole run. After the run, inspect result.errors to find and reprocess any documents that exhausted their retries.
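To make the "retry a bounded number of times, then record and move on" behavior concrete, here is a standard-library sketch of the same idea. It is not Semantica's RetryPolicy: run_with_retries is hypothetical, it treats max_retries as retries after the first attempt (an assumption), and it adds exponential backoff between attempts.

```python
import time
from typing import Any, Callable, Dict, List, Tuple


def run_with_retries(
    handler: Callable[[Any], Any],
    docs: List[Any],
    max_retries: int = 3,
    base_delay: float = 0.1,
) -> Tuple[List[Any], Dict[int, str]]:
    """Hypothetical sketch: process each document, retrying failures up to max_retries times.

    Returns (outputs, errors): outputs holds successful results in input order,
    errors maps a document's index to the last error message. A failing
    document never stops the rest of the run.
    """
    outputs: List[Any] = []
    errors: Dict[int, str] = {}
    for i, doc in enumerate(docs):
        for attempt in range(max_retries + 1):  # first try plus max_retries retries
            try:
                outputs.append(handler(doc))
                break
            except Exception as exc:
                if attempt == max_retries:
                    # Retries exhausted: record the failure for later reprocessing
                    errors[i] = f"{type(exc).__name__}: {exc}"
                else:
                    time.sleep(base_delay * (2 ** attempt))  # exponential backoff
    return outputs, errors
```

The returned errors dict plays the same role as result.errors: it identifies exactly which inputs need to be reprocessed.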
```python
from semantica.pipeline import ExecutionEngine

engine = ExecutionEngine()
result = engine.execute_pipeline(pipeline, data="data/")
# The progress tracker outputs tqdm bars to the console during execution
```
Displays a live progress bar in the terminal via Semantica’s built-in progress tracker. Best for scripts and CLI tools.
```python
import threading
import time

from semantica.pipeline import ExecutionEngine

engine = ExecutionEngine()

# Run in a background thread, poll progress from the main thread
def run():
    engine.execute_pipeline(pipeline, data="data/")

t = threading.Thread(target=run, daemon=True)
t.start()

while t.is_alive():
    progress = engine.get_progress(pipeline.name)
    if progress:
        print(f"  {progress['completed_steps']}/{progress['total_steps']} steps: {progress['status']}")
    time.sleep(2)
```
Poll get_progress() for live status during execution.
PipelineSerializer converts a pipeline to JSON or dict for storage and reloads it later:
```python
from semantica.pipeline import ExecutionEngine, PipelineSerializer

serializer = PipelineSerializer()

# Serialize to a JSON string
json_str = serializer.serialize_pipeline(pipeline, format="json")

# Save to file
with open("pipeline_config.json", "w") as f:
    f.write(json_str)

# Restore on any machine
with open("pipeline_config.json") as f:
    restored = serializer.deserialize_pipeline(f.read())

# Handlers are not serialized: re-register them on the restored steps before executing
result = ExecutionEngine().execute_pipeline(restored, data="data/")
```
Serialized pipelines capture step names, types, and config, but not handler functions (callables can’t be serialized). Re-register handlers on the restored steps before executing.
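A common way to handle the missing callables is to keep a registry in application code that maps each step type to its handler, and to re-attach handlers by step type after loading. The sketch below models steps as plain dicts with the name, step_type, config, and handler fields from PipelineStep; HANDLERS, save_config, and load_config are hypothetical names, not Semantica APIs.

```python
import json
from typing import Any, Callable, Dict, List

# Hypothetical registry kept in your application code: step_type -> handler
HANDLERS: Dict[str, Callable[[Any], Any]] = {
    "normalize": lambda text: text.strip().lower(),
    "tokenize": lambda text: text.split(),
}


def save_config(steps: List[Dict[str, Any]]) -> str:
    """Serialize only the data fields of each step; callables are dropped."""
    return json.dumps([{k: v for k, v in step.items() if k != "handler"} for step in steps])


def load_config(payload: str) -> List[Dict[str, Any]]:
    """Deserialize steps and re-attach each handler by its step_type."""
    steps = json.loads(payload)
    for step in steps:
        step_type = step["step_type"]
        if step_type not in HANDLERS:
            # Fail at load time rather than mid-run
            raise KeyError(f"No handler registered for step type {step_type!r}")
        step["handler"] = HANDLERS[step_type]
    return steps
```

Raising on an unregistered step type at load time mirrors the advice to validate before running: a missing handler is reported immediately instead of after earlier steps have already done work.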
Use templates from PipelineTemplateManager for common patterns. create_pipeline_from_template("kg_construction") wires normalization, deduplication, conflict detection, and graph construction in the correct order, which prevents common mistakes such as deduplicating before normalizing.
ExecutionEngine gives fine-grained control over a running pipeline, letting you pause, resume, stop, and inspect live progress:
```python
import threading

from semantica.pipeline import ExecutionEngine

engine = ExecutionEngine(max_workers=4)
pipeline_id = pipeline.name  # pipeline.name is the pipeline ID used for all control operations

# execute_pipeline() blocks until the run ends, so issue control calls while it runs in another thread
runner = threading.Thread(target=engine.execute_pipeline, args=(pipeline,), kwargs={"data": "data/"}, daemon=True)
runner.start()

# Pause after the current step finishes
engine.pause_pipeline(pipeline_id)

progress = engine.get_progress(pipeline_id)
if progress:
    print(f"Completed: {progress['completed_steps']}/{progress['total_steps']}")
    print(f"Status: {progress['status']}")

engine.resume_pipeline(pipeline_id)

# Stop the run entirely
engine.stop_pipeline(pipeline_id)
runner.join()
```
| Method | Returns | Description |
|---|---|---|
| execute_pipeline(pipeline, data) | ExecutionResult | Execute the pipeline from start to finish |
| get_pipeline_status(pipeline_id) | PipelineStatus | Current state (RUNNING, PAUSED, STOPPED) |
| get_progress(pipeline_id) | Dict | completed_steps, total_steps, progress_percentage, status |
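"Pause after the current step finishes" implies cooperative control: a step that has started always runs to completion, and pause or stop requests are honored only at step boundaries. The sketch below models that behavior with threading.Event. StepRunner is hypothetical and only illustrates the semantics; it is not how Semantica's ExecutionEngine is implemented.

```python
import threading
from typing import Any, Callable, List


class StepRunner:
    """Hypothetical runner: executes steps in order, honoring pause/stop between steps."""

    def __init__(self, steps: List[Callable[[Any], Any]]) -> None:
        self.steps = steps
        self.completed = 0
        self._running = threading.Event()
        self._running.set()  # set means "not paused"
        self._stopped = threading.Event()

    def pause(self) -> None:
        self._running.clear()

    def resume(self) -> None:
        self._running.set()

    def stop(self) -> None:
        self._stopped.set()
        self._running.set()  # wake a paused runner so it can exit

    def run(self, data: Any) -> Any:
        for step in self.steps:
            self._running.wait()  # blocks here while paused
            if self._stopped.is_set():
                break
            data = step(data)  # a step in progress always finishes
            self.completed += 1
        return data
```

Because the flags are checked only between steps, pausing never leaves a step half-applied, which is why control requests can take up to one full step to take effect.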
PipelineValidator catches problems before they surface as mid-run failures:
```python
from semantica.pipeline import PipelineValidator

validator = PipelineValidator()
result = validator.validate_pipeline(pipeline)

if result.valid:
    print("Pipeline is valid: safe to run")
else:
    for error in result.errors:  # errors is List[str]
        print(f"Error: {error}")
    for warning in result.warnings:  # warnings is List[str]
        print(f"Warning: {warning}")
```
Checks performed:

- Dependency cycle detection: e.g., A depends on B while B depends on A
- Step type validation: each step type must be registered
- Connection integrity: referenced step names must exist
- Configuration completeness: required parameters must be present
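Two of these checks, connection integrity and cycle detection, can be sketched over a plain dependency map using Kahn's algorithm (repeatedly removing steps with no unfinished dependencies; anything left over is on or downstream of a cycle). validate is a hypothetical function operating on a dict, not PipelineValidator's internals, which may use a different algorithm.

```python
from typing import Dict, List


def validate(dependencies: Dict[str, List[str]]) -> List[str]:
    """Hypothetical sketch: report unknown step references and dependency cycles.

    dependencies maps each step name to the names of steps it waits for.
    """
    errors: List[str] = []

    # Connection integrity: every referenced step must exist
    for step, deps in dependencies.items():
        for dep in deps:
            if dep not in dependencies:
                errors.append(f"Step {step!r} depends on unknown step {dep!r}")

    # Cycle detection (Kahn's algorithm), ignoring references already reported as unknown
    known = {s: [d for d in deps if d in dependencies] for s, deps in dependencies.items()}
    remaining = {s: len(deps) for s, deps in known.items()}
    dependents: Dict[str, List[str]] = {s: [] for s in known}
    for step, deps in known.items():
        for dep in deps:
            dependents[dep].append(step)

    ready = [s for s, n in remaining.items() if n == 0]
    while ready:
        done = ready.pop()
        for step in dependents[done]:
            remaining[step] -= 1
            if remaining[step] == 0:
                ready.append(step)

    # Steps that never became ready are on, or blocked behind, a cycle
    stuck = sorted(s for s, n in remaining.items() if n > 0)
    if stuck:
        errors.append(f"Steps blocked by a dependency cycle: {', '.join(stuck)}")
    return errors
```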
```python
from semantica.pipeline import ParallelismManager, Task

# use_processes=False (default) → thread pool for I/O-bound tasks
manager = ParallelismManager(max_workers=8, use_processes=False)

tasks = [
    Task(task_id=f"t{i}", handler=ner.extract, args=(text,))
    for i, text in enumerate(texts)
]
results = manager.execute_parallel(tasks)  # returns List[ParallelExecutionResult]

successes = [r for r in results if r.success]
failures = [r for r in results if not r.success]
```
Use thread pools for I/O-bound steps: web fetching, database queries, API calls.
```python
from semantica.pipeline import ParallelismManager, Task

# use_processes=True → process pool, bypasses the Python GIL
manager = ParallelismManager(max_workers=4, use_processes=True)

tasks = [
    Task(task_id=f"t{i}", handler=embedder.generate_embeddings, args=(chunk,))
    for i, chunk in enumerate(chunks)
]
results = manager.execute_parallel(tasks)
```
Use process pools for CPU-bound steps: embedding, OCR, large NER batches.
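The use_processes switch corresponds to the standard library's choice between ThreadPoolExecutor and ProcessPoolExecutor. The sketch below reproduces the pattern with concurrent.futures, including a per-task success flag like the one on ParallelExecutionResult. TaskResult and run_parallel are hypothetical stand-ins, not ParallelismManager's implementation.

```python
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from dataclasses import dataclass
from typing import Any, Callable, List, Optional


@dataclass
class TaskResult:
    task_id: str
    success: bool
    value: Any = None
    error: Optional[str] = None


def run_parallel(
    handler: Callable[[Any], Any],
    items: List[Any],
    max_workers: int = 4,
    use_processes: bool = False,
) -> List[TaskResult]:
    """Hypothetical sketch: thread pool for I/O-bound work, process pool for CPU-bound work."""
    pool_cls = ProcessPoolExecutor if use_processes else ThreadPoolExecutor
    results: List[TaskResult] = []
    with pool_cls(max_workers=max_workers) as pool:
        futures = [pool.submit(handler, item) for item in items]
        # Iterate in submission order so results line up with inputs
        for i, future in enumerate(futures):
            try:
                results.append(TaskResult(f"t{i}", True, value=future.result()))
            except Exception as exc:
                results.append(TaskResult(f"t{i}", False, error=str(exc)))
    return results
```

With use_processes=True, the handler and its arguments must be picklable (a module-level function, not a lambda), and scripts should start the pool under an `if __name__ == "__main__":` guard on platforms that spawn worker processes.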
Re-process only data that has changed since the last run:
```python
from semantica.pipeline import ExecutionEngine, PipelineBuilder

builder = PipelineBuilder()

# delta_mode=True tells ExecutionEngine to compute the diff between two snapshots
# and pass only changed triples to this step's handler
builder.add_step(
    "ingest", "file_ingest",
    handler=ingestor.ingest_file,
    delta_mode=True, base_version_id="v1", target_version_id="v2",
)
builder.add_step(
    "extract", "ner_extract",
    handler=extractor.extract,
    delta_mode=True, base_version_id="v1", target_version_id="v2",
)
builder.add_step(
    "build", "graph_build",
    handler=kg_builder.build,
    delta_mode=False,  # always rebuild the merged graph
)

builder.connect_steps("ingest", "extract")
builder.connect_steps("extract", "build")
pipeline = builder.build("delta_pipeline")

engine = ExecutionEngine()
result = engine.execute_pipeline(
    pipeline,
    data="data/",
    version_manager=version_manager,  # required for delta mode
    triplet_store=triplet_store,      # required for delta mode
)
```
Delta detection uses SHA-256 checksums of source content. Only sources whose checksum differs from the one recorded in the base_version_id snapshot are passed to downstream steps. For pipelines that run hourly or daily against a growing corpus, delta mode avoids redundant re-embedding and re-extraction.
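The checksum comparison can be illustrated with hashlib. In this sketch a snapshot is simply a dict from source ID to SHA-256 hex digest; checksums and changed_sources are hypothetical helpers, and how Semantica's version_manager actually stores snapshots is not shown here.

```python
import hashlib
from typing import Dict, List


def checksums(sources: Dict[str, bytes]) -> Dict[str, str]:
    """Map each source ID to the SHA-256 hex digest of its content."""
    return {sid: hashlib.sha256(content).hexdigest() for sid, content in sources.items()}


def changed_sources(base: Dict[str, str], target: Dict[str, bytes]) -> List[str]:
    """Return IDs of sources that are new or whose checksum differs from the base snapshot.

    Sources deleted since the base snapshot are not reported by this sketch.
    """
    current = checksums(target)
    return sorted(sid for sid, digest in current.items() if base.get(sid) != digest)
```

Only the returned IDs would need re-extraction and re-embedding; unchanged sources are skipped entirely, which is where the savings on a growing corpus come from.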
```python
from dataclasses import dataclass
from typing import Any, Dict, List

@dataclass
class ExecutionResult:
    success: bool             # True if all steps completed without failure
    output: Any               # output from the final pipeline step
    metadata: Dict[str, Any]  # {"pipeline_id": "...", "execution_time": 1.23}
    metrics: Dict[str, Any]   # {"steps_executed": 4, "steps_failed": 0, "execution_time": 1.23}
    errors: List[str]         # error messages from failed steps (empty on full success)

# Access pattern
result.success                      # bool
result.output                       # final step output
result.metadata["pipeline_id"]      # pipeline name used as ID
result.metadata["execution_time"]   # total wall-clock seconds
result.metrics["steps_executed"]    # count of successfully completed steps
result.metrics["steps_failed"]      # count of failed steps
result.errors                       # List[str] of error messages
```
PipelineStep schema
```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

from semantica.pipeline import StepStatus

@dataclass
class PipelineStep:
    name: str
    step_type: str
    config: Dict[str, Any]
    dependencies: List[str]            # names of steps this step waits for
    handler: Optional[Callable]
    status: StepStatus
    result: Any
    error: Optional[Exception]
    delta_mode: bool                   # True = process only changed data
    base_version_id: Optional[str]     # snapshot ID to diff against
    target_version_id: Optional[str]   # snapshot ID being produced
```
StepStatus enum
```python
from semantica.pipeline import StepStatus

StepStatus.PENDING    # Not yet started
StepStatus.RUNNING    # Currently executing
StepStatus.COMPLETED  # Finished successfully
StepStatus.FAILED     # Error occurred: check step.error
StepStatus.SKIPPED    # Skipped due to the FailureHandler "skip" strategy
```