Extract data from Snowflake into Semantica with password, key-pair, OAuth, and SSO authentication.
Installation
# Install with Snowflake support
pip install "semantica[db-snowflake]"
# Or install the connector separately
pip install snowflake-connector-python
Basic Usage
import os

from semantica.ingest import SnowflakeIngestor

# Read connection settings from the environment instead of hard-coding them
ingestor = SnowflakeIngestor(
    account=os.getenv("SNOWFLAKE_ACCOUNT"),
    user=os.getenv("SNOWFLAKE_USER"),
    password=os.getenv("SNOWFLAKE_PASSWORD"),
    warehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    database=os.getenv("SNOWFLAKE_DATABASE"),
    schema=os.getenv("SNOWFLAKE_SCHEMA"),
)
data = ingestor.ingest_table("CUSTOMERS")
print(f"Retrieved {data.row_count} rows; columns: {data.columns}")
Use environment variables (or a .env file with python-dotenv) to keep credentials out of source code. SnowflakeIngestor() with no arguments reads from SNOWFLAKE_* environment variables automatically.
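When the settings are passed explicitly, it can help to fail fast if a variable is missing rather than let the connection attempt fail later. The sketch below is a hypothetical helper, not part of Semantica: the function name and the split into required and optional settings are assumptions, and only the SNOWFLAKE_* variable names come from the example above.

```python
import os

# Assumption: account and user are always needed; the rest depend on the setup.
REQUIRED = ("account", "user")
OPTIONAL = ("password", "warehouse", "database", "schema")


def snowflake_kwargs_from_env(env=None):
    """Collect SNOWFLAKE_* variables into keyword arguments (hypothetical helper)."""
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED if not env.get(f"SNOWFLAKE_{name.upper()}")]
    if missing:
        names = ", ".join(f"SNOWFLAKE_{name.upper()}" for name in missing)
        raise RuntimeError(f"Missing environment variables: {names}")
    kwargs = {}
    for name in REQUIRED + OPTIONAL:
        value = env.get(f"SNOWFLAKE_{name.upper()}")
        if value:  # skip unset or empty variables
            kwargs[name] = value
    return kwargs
```

The result could then be passed on as SnowflakeIngestor(**snowflake_kwargs_from_env()). With python-dotenv, calling load_dotenv() first makes the values in a .env file visible through os.environ.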
Authentication Methods
Password
ingestor = SnowflakeIngestor(
    account="myaccount",
    user="myuser",
    password="mypassword",
    warehouse="COMPUTE_WH",
)
Key-Pair (Recommended)
ingestor = SnowflakeIngestor(
    account="myaccount",
    user="myuser",
    private_key_path="/path/to/rsa_key.p8",
    warehouse="COMPUTE_WH",
)
Preferred for production: no password stored in config.
OAuth
ingestor = SnowflakeIngestor(
    account="myaccount",
    user="myuser",
    authenticator="oauth",
    token="your_oauth_token",
    warehouse="COMPUTE_WH",
)
SSO
ingestor = SnowflakeIngestor(
    account="myaccount",
    user="myuser",
    authenticator="externalbrowser",
    warehouse="COMPUTE_WH",
)
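If one codebase runs in several environments, the choice between the four methods above can be made from configuration. The following is a minimal sketch under stated assumptions: auth_kwargs is a hypothetical helper, and the variable names SNOWFLAKE_PRIVATE_KEY_PATH and SNOWFLAKE_OAUTH_TOKEN are invented for illustration. Only the keyword names (private_key_path, authenticator, token, password) are taken from the examples above.

```python
def auth_kwargs(env):
    """Pick one authentication method from a mapping of settings (hypothetical helper).

    Order of preference: key-pair, OAuth token, password, then browser-based SSO.
    """
    if env.get("SNOWFLAKE_PRIVATE_KEY_PATH"):  # assumed variable name
        return {"private_key_path": env["SNOWFLAKE_PRIVATE_KEY_PATH"]}
    if env.get("SNOWFLAKE_OAUTH_TOKEN"):  # assumed variable name
        return {"authenticator": "oauth", "token": env["SNOWFLAKE_OAUTH_TOKEN"]}
    if env.get("SNOWFLAKE_PASSWORD"):
        return {"password": env["SNOWFLAKE_PASSWORD"]}
    # Nothing configured: fall back to interactive SSO in the browser
    return {"authenticator": "externalbrowser"}
```

The returned dictionary could be merged into the constructor call, for example SnowflakeIngestor(account="myaccount", user="myuser", warehouse="COMPUTE_WH", **auth_kwargs(os.environ)). Falling back to externalbrowser only makes sense on a machine with a browser, so a headless job would likely want to raise an error instead.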
Querying
Ingest a table with filters
data = ingestor.ingest_table(
    "CUSTOMERS",
    where="COUNTRY = 'USA' AND CREATED_DATE > '2024-01-01'",
    order_by="CREATED_DATE DESC",
    limit=10000,
)
Custom SQL
data = ingestor.ingest_query("""
    SELECT CUSTOMER_ID, SUM(AMOUNT) AS TOTAL_AMOUNT
    FROM SALES
    WHERE DATE >= '2024-01-01'
    GROUP BY CUSTOMER_ID
""")
Schema introspection
schema = ingestor.get_table_schema("CUSTOMERS")
for column in schema["columns"]:
    print(f"{column['name']}: {column['type']}")
Export as Semantica Documents
documents = ingestor.export_as_documents(
    data,
    id_field="CUSTOMER_ID",
    text_fields=["NAME", "EMAIL", "NOTES"],
)
print(f"Created {len(documents)} documents for processing")
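Rather than listing text_fields by hand, the list could be derived from the output of get_table_schema. This is a sketch only: text_columns is a hypothetical helper, and it assumes the schema has the {"columns": [{"name": ..., "type": ...}]} shape used in the introspection example and that types are reported as Snowflake type names such as VARCHAR(255).

```python
# Assumption: these are the type names the schema reports for text columns.
TEXT_TYPES = ("VARCHAR", "CHAR", "STRING", "TEXT")


def text_columns(schema):
    """Return the names of columns whose type looks like text (hypothetical helper)."""
    names = []
    for column in schema["columns"]:
        # "VARCHAR(255)" -> "VARCHAR"
        base_type = str(column["type"]).upper().split("(")[0].strip()
        if base_type in TEXT_TYPES:
            names.append(column["name"])
    return names


example_schema = {
    "columns": [
        {"name": "CUSTOMER_ID", "type": "NUMBER(38,0)"},
        {"name": "NAME", "type": "VARCHAR(255)"},
        {"name": "NOTES", "type": "TEXT"},
    ]
}
print(text_columns(example_schema))  # ['NAME', 'NOTES']
```

The result can be passed as text_fields, ideally after a manual check, since not every text column is worth indexing.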
Batch Processing Large Tables
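PAGE_SIZE = 5000
page = 0
while True:
    data = ingestor.ingest_table(
        "LARGE_TABLE",
        order_by="ID",  # placeholder: sort by a stable key so pages do not overlap
        limit=PAGE_SIZE,
        offset=page * PAGE_SIZE,
    )
    if data.row_count == 0:
        break  # nothing left to read
    process_batch(data)  # process_batch is your own handler
    if data.row_count < PAGE_SIZE:
        break  # a short page is the last page
    page += 1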
Or use the built-in batch_size parameter:
data = ingestor.ingest_query(
    "SELECT * FROM LARGE_TABLE",
    batch_size=5000,
)
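The manual limit/offset paging shown at the start of this section can be wrapped in a small generator so that the stop condition lives in one place. This is a generic sketch, not a Semantica API: iter_pages, fetch_page, and size_of are hypothetical names, and a plain list stands in for the table so the example runs on its own.

```python
def iter_pages(fetch_page, page_size, size_of=len):
    """Yield pages from fetch_page(limit, offset) until a short or empty page.

    fetch_page is supplied by the caller; size_of returns the number of rows
    in a page (len by default).
    """
    offset = 0
    while True:
        page = fetch_page(page_size, offset)
        count = size_of(page)
        if count == 0:
            return  # the previous page was exactly full and was the last one
        yield page
        if count < page_size:
            return  # a short page is the last page
        offset += page_size


# Stand-in data source: a list of 12 rows read in pages of 5
rows = list(range(12))
pages = list(iter_pages(lambda limit, offset: rows[offset:offset + limit], page_size=5))
print([len(page) for page in pages])  # [5, 5, 2]
```

With the ingestor, fetch_page would wrap ingest_table with the limit and offset arguments, and size_of could be lambda data: data.row_count, assuming the returned object exposes row_count as in Basic Usage.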
Troubleshooting
from semantica.ingest import SnowflakeConnector

connector = SnowflakeConnector(account="myaccount", user="myuser", password="mypassword")
if not connector.test_connection():
    print("Connection failed: check credentials and account identifier")
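A common cause of a bad account identifier is pasting the full Snowflake URL: the Snowflake Python connector expects the identifier without the snowflakecomputing.com domain. Assuming Semantica passes account through unchanged, a hypothetical helper like the one below could clean the value before connecting.

```python
def normalize_account(value):
    """Reduce a pasted Snowflake URL or hostname to the bare account identifier.

    Hypothetical helper; it strips the scheme, any path, and the
    .snowflakecomputing.com suffix, and leaves other input unchanged.
    """
    account = value.strip()
    for prefix in ("https://", "http://"):
        if account.lower().startswith(prefix):
            account = account[len(prefix):]
    account = account.split("/")[0]  # drop any trailing path
    suffix = ".snowflakecomputing.com"
    if account.lower().endswith(suffix):
        account = account[:-len(suffix)]
    return account


print(normalize_account("https://myorg-myaccount.snowflakecomputing.com/"))  # myorg-myaccount
```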
See Also
Ingest Module: full reference for SnowflakeIngestor and all other ingestors.
Pipeline: use Snowflake ingestion as a pipeline step.
Installation: all optional dependency extras.
Knowledge Graph: build a KG from ingested Snowflake data.