Use FileIngestor for local files. It auto-detects the format from the file extension and handles archives:
    from semantica.ingest import FileIngestor

    ingestor = FileIngestor()

    # Single file -> FileObject
    file_obj = ingestor.ingest_file("data/report.pdf")
    print(file_obj.name)       # "report.pdf"
    print(file_obj.file_type)  # "pdf"
    print(file_obj.text)       # decoded text content (property on FileObject)
    print(file_obj.size)       # bytes

    # Directory scan -> List[FileObject]
    files = ingestor.ingest_directory("data/", recursive=True)
    for f in files:
        print(f.name, f.file_type, f.size)
FileIngestor is the most direct path for local files. It auto-detects the format from the extension, handles ZIP/TAR archives automatically, and reads content into the .content bytes or the .text property. Use read_content=False when you only need file metadata.
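To make the extension-based detection and the metadata-only mode concrete, here is a minimal standard-library sketch. It is not Semantica's implementation: `detect_file_type`, `describe`, and `ARCHIVE_TYPES` are hypothetical names chosen for illustration, and the archive list is an assumption.

```python
from pathlib import Path
import tempfile

# Hypothetical set of archive types, for illustration only.
ARCHIVE_TYPES = {"zip", "tar", "gz", "tgz"}


def detect_file_type(path: str) -> str:
    """Return the lowercase extension without the dot, or "" if there is none."""
    return Path(path).suffix.lower().lstrip(".")


def describe(path: str, read_content: bool = True) -> dict:
    """Collect metadata for one file; read the bytes only when asked to."""
    p = Path(path)
    file_type = detect_file_type(path)
    info = {
        "name": p.name,
        "file_type": file_type,
        "size": p.stat().st_size,
        "is_archive": file_type in ARCHIVE_TYPES,
    }
    if read_content:
        info["content"] = p.read_bytes()
    return info


# Demonstrate both modes on a throwaway file.
with tempfile.TemporaryDirectory() as tmp:
    sample = Path(tmp) / "report.TXT"
    sample.write_bytes(b"hello")
    meta_only = describe(str(sample), read_content=False)
    full = describe(str(sample))

print(meta_only)
```

Skipping the read keeps a large directory scan cheap, since only a `stat` call is needed per file.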
For web, database, or stream sources, each ingestor exposes its own typed method:
    from semantica.ingest import FileIngestor

    ingestor = FileIngestor()

    # Single file: type auto-detected from extension
    file_obj = ingestor.ingest_file("data/report.pdf")

    # Recursive directory scan
    files = ingestor.ingest_directory("data/", recursive=True)

    # ingest() also works: routes to file or directory automatically
    from semantica.ingest import ingest

    result = ingest("data/report.pdf")  # {"files": [FileObject]}
Connect to a database
    from semantica.ingest import DBIngestor

    ingestor = DBIngestor()

    # Ingest the database, limited here to the "documents" table
    result = ingestor.ingest_database(
        "postgresql://user:pass@localhost/db",
        include_tables=["documents"],
    )
    # result["tables"]["documents"]["rows"] contains the row dicts

    # Run a custom query
    rows = ingestor.execute_query(
        "postgresql://user:pass@localhost/db",
        "SELECT id, content, created_at FROM documents WHERE status = :s",
        s="active",
    )
Glob patterns (e.g. "data/**/*.docx") are not supported. ingest() accepts a file path or a directory path only. To filter by extension inside a directory, use ingest_directory() with the pattern= filter option.
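If you already have a list of scanned files and want to narrow it afterwards, shell-style matching can be done by hand. The sketch below uses only the standard library's `fnmatch`; `filter_by_pattern` is a hypothetical helper, and plain file names stand in for `FileObject.name` values.

```python
from fnmatch import fnmatch
from typing import Iterable, List


def filter_by_pattern(names: Iterable[str], pattern: str) -> List[str]:
    """Keep the names that match a shell-style pattern such as "*.docx"."""
    # fnmatch() case sensitivity depends on the OS, so normalise both sides.
    return [n for n in names if fnmatch(n.lower(), pattern.lower())]


names = ["notes.docx", "report.pdf", "Summary.DOCX", "data.csv"]
docx_only = filter_by_pattern(names, "*.docx")
print(docx_only)
```

Lowercasing both sides makes the match case-insensitive on every platform, which is usually what you want for file extensions.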
PyArrow-based ingestion for Apache Parquet files, including Hive-style partitioned datasets:
    from semantica.ingest import ParquetIngestor

    ingestor = ParquetIngestor()

    # Single Parquet file -> ParquetData
    data = ingestor.ingest_file("data/events.parquet")

    # Partitioned directory (year=2024/month=01/...)
    data = ingestor.ingest_directory("data/partitioned/")

    # Load only specific columns: pass as kwarg
    from semantica.ingest import ingest_parquet

    data = ingest_parquet("data/events.parquet", columns=["id", "text", "timestamp"])

    # Extract schema without loading data
    schema = ingest_parquet("data/events.parquet", method="schema")
Requires pyarrow: pip install pyarrow.
Use ParquetIngestor instead of FileIngestor for structured analytical data. Parquet ingestion preserves column types (int, float, datetime) that CSV reading loses. Use columns=["id", "text"] to avoid loading unused columns, which is critical for wide tables with hundreds of columns.
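Hive-style partitioning encodes column values in directory names as `key=value` segments. The following standard-library sketch shows what a partition-aware reader can recover from such a layout; `parse_hive_partitions` is a hypothetical helper and is not part of Semantica or PyArrow.

```python
from pathlib import PurePosixPath
from typing import Dict


def parse_hive_partitions(path: str) -> Dict[str, str]:
    """Extract key=value directory segments from a Hive-style path."""
    partitions = {}
    # Only directory components can be partition segments; skip the file name.
    for part in PurePosixPath(path).parts[:-1]:
        key, sep, value = part.partition("=")
        if sep and key:
            partitions[key] = value
    return partitions


parts = parse_hive_partitions("data/partitioned/year=2024/month=01/part-0.parquet")
print(parts)
```

The values come back as strings in this sketch; a real reader would typically infer or be told the partition column types.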
XMLIngestor uses lxml with resolve_entities=False to prevent XML External Entity (XXE) injection attacks.
XMLIngestor is XXE-safe by default. Do not pre-parse untrusted XML with the standard xml.etree.ElementTree before passing it to Semantica; the standard-library XML modules are not hardened against maliciously constructed input. XMLIngestor uses lxml with resolve_entities=False to parse untrusted XML safely.
Rate-limit web crawling. WebIngestor(delay=1.0, respect_robots=True) is the responsible default. Without rate limiting you risk getting blocked by the target server or violating its terms of service.
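To illustrate what a fixed delay plus robots.txt compliance amounts to, here is a minimal standard-library sketch. It is an assumption about the general technique, not WebIngestor's internals; `PoliteFetchGate` is a hypothetical class, and the robots.txt rules are passed in as text lines so that the example needs no network access.

```python
import time
from urllib.robotparser import RobotFileParser


class PoliteFetchGate:
    """Enforce a minimum delay between requests and honour robots.txt rules."""

    def __init__(self, robots_lines, delay=1.0):
        self._robots = RobotFileParser()
        self._robots.parse(robots_lines)  # rules supplied as already-fetched lines
        self._delay = delay
        self._last_request = None

    def allowed(self, url, user_agent="*"):
        """Return True if robots.txt permits fetching this URL."""
        return self._robots.can_fetch(user_agent, url)

    def wait(self):
        """Sleep just long enough to keep `delay` seconds between requests."""
        waited = 0.0
        if self._last_request is not None:
            elapsed = time.monotonic() - self._last_request
            waited = max(0.0, self._delay - elapsed)
            if waited:
                time.sleep(waited)
        self._last_request = time.monotonic()
        return waited


robots = ["User-agent: *", "Disallow: /private/"]
gate = PoliteFetchGate(robots, delay=0.05)  # short delay to keep the demo quick
first = gate.wait()    # nothing to wait for before the first request
second = gate.wait()   # pauses for the remainder of the delay
print(gate.allowed("https://example.com/private/page"))
```

Calling `wait()` before each request and skipping any URL where `allowed()` is false covers both halves of the recommendation.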
Use this for public REST-style APIs that do not require keys or tokens:
    from semantica.ingest import PublicAPIIngestor, PublicAPIExamples, ingest_public_api

    ingestor = PublicAPIIngestor(rate_limit_delay=1.0)

    # Ingest any public endpoint
    data = ingestor.ingest_public_api("https://jsonplaceholder.typicode.com/posts")

    # Use a pre-configured example by name
    data = ingestor.ingest_example("rest_countries_all")

    # Check if endpoint is accessible without auth
    detection = ingestor.detect_public_api("https://jsonplaceholder.typicode.com/posts")

    # List available pre-configured examples
    examples = PublicAPIExamples.list_examples()

    # Convenience function
    data = ingest_public_api("https://jsonplaceholder.typicode.com/posts")
Public API ingestion rejects common auth headers and query parameters by default. Use RESTIngestor for authenticated APIs.
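The rejection of credentials can be pictured as a check on header names and query-parameter names before a request is sent. The sketch below is a guess at the general shape, not PublicAPIIngestor's actual logic: `AUTH_HEADERS`, `AUTH_PARAMS`, `find_auth_markers`, and `ensure_public` are hypothetical, and the real deny-lists may differ.

```python
from urllib.parse import parse_qsl, urlsplit

# Hypothetical deny-lists, for illustration only.
AUTH_HEADERS = {"authorization", "x-api-key", "cookie"}
AUTH_PARAMS = {"api_key", "apikey", "access_token", "token"}


def find_auth_markers(url, headers=None):
    """Return auth-looking header and query-parameter names found in a request."""
    found = [h for h in (headers or {}) if h.lower() in AUTH_HEADERS]
    query = urlsplit(url).query
    found += [
        key
        for key, _ in parse_qsl(query, keep_blank_values=True)
        if key.lower() in AUTH_PARAMS
    ]
    return found


def ensure_public(url, headers=None):
    """Raise ValueError if the request appears to carry credentials."""
    markers = find_auth_markers(url, headers)
    if markers:
        raise ValueError(f"not a public request, found: {', '.join(markers)}")
    return url


print(find_auth_markers("https://example.com/v1?api_key=abc&page=2",
                        {"Authorization": "Bearer x"}))
```

Failing early in this way keeps secrets from leaking into a code path designed for unauthenticated endpoints.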
DBIngestor takes no required constructor args. Pass the connection string to each method:
    from semantica.ingest import DBIngestor

    ingestor = DBIngestor()

    # Ingest entire database (all tables, or filtered)
    result = ingestor.ingest_database(
        "postgresql://user:pass@localhost/db",
        include_tables=["documents"],
    )
    # result["schema"], result["tables"], result["total_tables"]

    # Run a custom query -> List[Dict]
    rows = ingestor.execute_query(
        "postgresql://user:pass@localhost/db",
        "SELECT id, content FROM documents WHERE status = :s",
        s="active",
    )

    # Export a single table -> TableData
    table = ingestor.export_table(
        "postgresql://user:pass@localhost/db",
        table_name="documents",
        limit=1000,
    )
Requires sqlalchemy: pip install sqlalchemy plus your database driver.
DBIngestor() takes no connection string in its constructor. Pass the connection string to ingest_database(), execute_query(), or export_table() as the first positional argument, not to DBIngestor() itself.
StreamIngestor methods require the target broker’s client library to be installed. ingest_kafka needs kafka-python, ingest_rabbitmq needs pika, ingest_kinesis needs boto3, and ingest_pulsar needs pulsar-client. Missing dependencies raise ImportError at call time, not at import time.
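Raising ImportError at call time rather than at import time is usually achieved by importing the optional dependency inside the method that needs it. A minimal sketch of that pattern follows; `LazyClient` is a hypothetical class for illustration and says nothing about how StreamIngestor is actually written.

```python
import importlib


class LazyClient:
    """Defer importing an optional dependency until a method needs it."""

    def __init__(self, module_name, install_hint):
        # Nothing is imported here, so construction cannot fail.
        self.module_name = module_name
        self.install_hint = install_hint

    def load(self):
        """Import the module now, or raise ImportError with an install hint."""
        try:
            return importlib.import_module(self.module_name)
        except ImportError as exc:
            raise ImportError(
                f"{self.module_name!r} is required for this method; "
                f"install it with: pip install {self.install_hint}"
            ) from exc


# A made-up module name stands in for a broker client that is not installed.
missing = LazyClient("semantica_docs_missing_module", "some-package")
# A stdlib module stands in for a client library that is installed.
present = LazyClient("json", "(bundled with Python)")

print(present.load().dumps({"ok": True}))
```

With this pattern, users who only need one broker do not have to install the client libraries for the others.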
ingest() auto-detects the source type from the path or URL and routes to the appropriate ingestor. It returns a Dict[str, Any] whose key depends on the source type; for a file path, for example, the result is {"files": [FileObject]}.
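As a rough picture of the routing step only, source detection can be sketched as a check on the URL scheme followed by a file-system check. This is an assumption about the general approach, not the logic inside ingest(); `detect_source_type` and the two scheme sets are hypothetical, and the labels it returns are not the result keys.

```python
from pathlib import Path
from urllib.parse import urlsplit

# Hypothetical scheme groups, for illustration only.
WEB_SCHEMES = {"http", "https"}
DB_SCHEMES = {"postgresql", "mysql", "sqlite"}


def detect_source_type(source: str) -> str:
    """Classify a source string as "web", "database", "directory", or "file"."""
    scheme = urlsplit(source).scheme.lower()
    if scheme in WEB_SCHEMES:
        return "web"
    if scheme in DB_SCHEMES:
        return "database"
    # Anything else is treated as a local path.
    return "directory" if Path(source).is_dir() else "file"


for source in ("https://example.com/page",
               "postgresql://user:pass@localhost/db",
               "data/report.pdf"):
    print(source, "->", detect_source_type(source))
```

When the source type is known in advance, calling the specific ingestor directly avoids any ambiguity in detection.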
Ingest existing OWL or RDF ontology files as structured knowledge sources:
    from semantica.ingest import OntologyIngestor

    ingestor = OntologyIngestor()
    data = ingestor.ingest_ontology("domain_ontology.owl", format="turtle")

    # Or using the convenience function
    from semantica.ingest import ingest_ontology

    data = ingest_ontology("domain_ontology.ttl")