The DataLoaderSource class provides batch data loading capabilities for processing data from files and external sources. It supports multiple data formats and is designed for ETL pipelines and large-scale data ingestion scenarios.
DataFormat
DataFormat(*args, **kwds)
Enumeration of supported data formats for batch loading:
CSV: Comma-separated values format for tabular data.
JSON: JavaScript Object Notation format for structured data.
PARQUET: Apache Parquet columnar storage format for efficient data processing.
ORC: Optimized Row Columnar format for big data processing.
XML: eXtensible Markup Language format for structured documents.
FWF: Fixed-width format for structured text data with fixed column positions.
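Enum members are passed directly to DataLoaderConfig, as the examples further below show. As a small illustration, a hypothetical helper (not part of the library) could map file extensions to formats:

from superlinked import DataFormat

# Hypothetical helper: pick a DataFormat from a file extension (extend as needed).
EXTENSION_TO_FORMAT = {
    ".csv": DataFormat.CSV,
    ".json": DataFormat.JSON,
    ".parquet": DataFormat.PARQUET,
}

def infer_format(path: str) -> DataFormat:
    for extension, data_format in EXTENSION_TO_FORMAT.items():
        if path.endswith(extension):
            return data_format
    raise ValueError(f"No DataFormat mapping for: {path}")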
DataLoaderConfig
Configuration object for specifying data loading parameters.
DataLoaderConfig(path, format, name=None, pandas_read_kwargs=None)
Parameters
path: The file path or URL of the data source. Accepts local file paths, URLs, and cloud storage paths.
format: The format of the data source, specified using the DataFormat enum.
name (default: None): Optional name for the data loader configuration. Useful for identifying multiple data sources.
pandas_read_kwargs (dict[str, Any] | None, default: None): Additional keyword arguments passed to the underlying pandas read function (e.g., read_csv, read_json). Allows fine-tuning of data loading behavior.
DataLoaderSource
Main class for batch data loading from configured sources.
DataLoaderSource(schema, data_loader_config, parser=None)
Parameters
schema: The schema object that defines the structure of data this source will handle. All loaded data must conform to this schema.
data_loader_config: Configuration object specifying the data source, format, and loading parameters.
parser (DataParser | None, default: None): Optional data parser for processing loaded data. If None, an appropriate parser is selected based on the data format.
Properties
config: The configuration object used for this data loader.
name: The name identifier for this data loader source.
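When the format-based default parser is not sufficient, a parser can be passed explicitly. The sketch below is illustrative only: it assumes the DataFrameParser class exported by superlinked (verify the exact import path for your version) and a hypothetical product_schema whose CSV columns use different names than the schema fields.

from superlinked import DataFrameParser, DataLoaderConfig, DataLoaderSource, DataFormat

# Assumption: product_schema is defined elsewhere with `id` and `description` fields,
# while the CSV columns are named "product_id" and "product_text".
custom_parser = DataFrameParser(
    schema=product_schema,
    mapping={
        product_schema.id: "product_id",            # CSV column -> schema field
        product_schema.description: "product_text",
    },
)

catalog_config = DataLoaderConfig(path="data/products.csv", format=DataFormat.CSV)
catalog_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=catalog_config,
    parser=custom_parser,  # overrides the parser chosen from the data format
)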
Inheritance
The DataLoaderSource class extends several classes for online data processing:
Inheritance Chain: DataLoaderSource → OnlineSource → TransformerPublisher → Source → Generic
Data Loading Examples
CSV File Loading
from superlinked import DataLoaderSource, DataLoaderConfig, DataFormat

# Configure CSV loading
csv_config = DataLoaderConfig(
    path="data/products.csv",
    format=DataFormat.CSV,
    name="product_data",
    pandas_read_kwargs={
        "delimiter": ",",
        "header": 0,
        "encoding": "utf-8"
    }
)

# Create data loader source
csv_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=csv_config
)
JSON File Loading
# Configure JSON loading
json_config = DataLoaderConfig(
    path="data/articles.json",
    format=DataFormat.JSON,
    name="article_data",
    pandas_read_kwargs={
        "orient": "records",
        "lines": True  # For JSONL format
    }
)

json_source = DataLoaderSource(
    schema=article_schema,
    data_loader_config=json_config
)
Parquet File Loading
# Configure Parquet loading
parquet_config = DataLoaderConfig(
    path="data/large_dataset.parquet",
    format=DataFormat.PARQUET,
    name="large_data",
    pandas_read_kwargs={
        "columns": ["id", "title", "content", "category"],
        "filters": [("category", "in", ["tech", "science"])]
    }
)

parquet_source = DataLoaderSource(
    schema=document_schema,
    data_loader_config=parquet_config
)
Cloud Storage Loading
# Load from cloud storage (S3, GCS, etc.)
cloud_config = DataLoaderConfig(
    path="s3://my-bucket/data/products.csv",
    format=DataFormat.CSV,
    name="cloud_products",
    pandas_read_kwargs={
        "storage_options": {  # forwarded to fsspec/s3fs
            "key": "your_key",
            "secret": "your_secret"
        }
    }
)

cloud_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=cloud_config
)
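Hardcoding credentials as above is only for illustration. Because pandas forwards storage_options to fsspec (s3fs for s3:// paths), the standard AWS credential chain is usually a better fit; the variant below is a sketch under that assumption.

# Sketch: rely on the standard AWS credential chain instead of embedding secrets.
# s3fs reads AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY from the environment or
# from ~/.aws/credentials, so storage_options can simply be omitted.
cloud_config = DataLoaderConfig(
    path="s3://my-bucket/data/products.csv",
    format=DataFormat.CSV,
    name="cloud_products"
)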
Use Cases
ETL Pipelines
Process large datasets from data warehouses:
# Load from data warehouse export
warehouse_config = DataLoaderConfig(
    path="exports/daily_transactions.parquet",
    format=DataFormat.PARQUET,
    name="daily_transactions"
)

transaction_source = DataLoaderSource(
    schema=transaction_schema,
    data_loader_config=warehouse_config
)
Batch Processing
Regular batch updates from external systems:
# Daily batch processing: collect one source per daily feed
daily_sources = []
for date in date_range:
    daily_config = DataLoaderConfig(
        path=f"data/daily_feeds/{date}.json",
        format=DataFormat.JSON,
        name=f"feed_{date}"
    )
    daily_sources.append(
        DataLoaderSource(
            schema=feed_schema,
            data_loader_config=daily_config
        )
    )
Data Migration
Migrate data from existing systems:
# Migrate from legacy CSV exports
migration_configs = [
    DataLoaderConfig(f"legacy_data/users_{i}.csv", DataFormat.CSV)
    for i in range(1, 11)  # 10 CSV files
]

migration_sources = [
    DataLoaderSource(user_schema, config)
    for config in migration_configs
]
Research and Analytics
Load research datasets for analysis:
# Research dataset loading
research_config = DataLoaderConfig(
    path="research_data/scientific_papers.json",
    format=DataFormat.JSON,
    name="research_papers",
    pandas_read_kwargs={
        "lines": True,     # pandas only supports chunked JSON reads for JSON Lines input
        "chunksize": 1000  # Process in chunks for large files
    }
)

research_source = DataLoaderSource(
    schema=paper_schema,
    data_loader_config=research_config
)
Chunked Processing
For large files, use pandas chunking:
large_file_config = DataLoaderConfig(
    path="very_large_dataset.csv",
    format=DataFormat.CSV,
    pandas_read_kwargs={
        "chunksize": 10000,  # Process 10k rows at a time
        "low_memory": False
    }
)
Column Selection
Load only needed columns to reduce memory usage:
optimized_config = DataLoaderConfig(
    path="wide_dataset.parquet",
    format=DataFormat.PARQUET,
    pandas_read_kwargs={
        "columns": ["id", "title", "content"]  # Only load needed columns
    }
)
Data Filtering
Apply filters during loading to reduce data volume:
filtered_config = DataLoaderConfig(
    path="large_dataset.parquet",
    format=DataFormat.PARQUET,
    pandas_read_kwargs={
        "filters": [
            ("date", ">=", "2024-01-01"),
            ("category", "in", ["important", "urgent"])
        ]
    }
)
Best Practices
Parquet for analytics: Use Parquet format for large analytical datasets due to its efficient compression and columnar storage.
JSON for APIs: Use JSON format when loading data from API responses or when preserving the data structure is important.
Memory management: For files larger than available RAM, use chunked processing or streaming approaches to prevent memory issues.
Data validation: Always validate loaded data against your schema before processing to catch format mismatches early (a lightweight pre-flight check is sketched below).
Column selection: Only load the columns you need to reduce memory usage and improve loading performance.
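As one way to catch column mismatches before ingestion, you can read only the file header and compare it with the columns you expect. The snippet below is an illustrative sketch; the expected column set is an assumption for this example rather than something derived from the schema API.

import pandas as pd

# Pre-flight check: read only the header row and verify the expected columns exist.
EXPECTED_COLUMNS = {"id", "title", "content", "category"}  # assumption for this example

header = pd.read_csv("data/products.csv", nrows=0)
missing = EXPECTED_COLUMNS - set(header.columns)
if missing:
    raise ValueError(f"Input file is missing expected columns: {sorted(missing)}")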
Integration with Applications
DataLoaderSource integrates with Superlinked applications for batch processing:
from superlinked import DataLoaderConfig, DataLoaderSource, DataFormat, RestApp

# Create data loader for batch ingestion
batch_config = DataLoaderConfig(
    path="data/product_catalog.csv",
    format=DataFormat.CSV
)

batch_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=batch_config
)

# Create application with both batch and real-time sources
# (product_schema, real_time_source, product_index, search_query and vector_db
# are assumed to be defined elsewhere in the application)
app = RestApp(
    sources=[batch_source, real_time_source],  # Batch + real-time
    indices=[product_index],
    queries=[search_query],
    vector_database=vector_db
)
The DataLoaderSource provides efficient batch data loading capabilities essential for production data processing pipelines and large-scale vector search applications.