The DataLoaderSource class provides batch data loading from files and external data sources. It supports multiple data formats and is designed for ETL pipelines and large-scale data ingestion scenarios.

DataFormat Enum

Enumeration of supported data formats for batch loading.
DataFormat(*args, **kwds)

Available Formats

Each member is a value of the DataFormat enum:

CSV: Comma-separated values format for tabular data.
JSON: JavaScript Object Notation format for structured data.
PARQUET: Apache Parquet columnar storage format for efficient data processing.
ORC: Optimized Row Columnar format for big data processing.
XML: eXtensible Markup Language format for structured documents.
FWF: Fixed-width format for structured text data with fixed column positions.

DataLoaderConfig

Configuration object for specifying data loading parameters.
DataLoaderConfig(path, format, name=None, pandas_read_kwargs=None)

Parameters

path (str, required)
The file path or URL to the data source. Can be a local file path, a URL, or a cloud storage path.

format (DataFormat, required)
The format of the data source, specified using the DataFormat enum.

name (str | None, default: None)
Optional name for the data loader configuration. Useful for identifying multiple data sources.

pandas_read_kwargs (dict[str, Any] | None, default: None)
Additional keyword arguments passed to the underlying pandas read function (e.g., read_csv, read_json). Allows fine-tuning of data loading behavior.
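
A minimal configuration needs only the two required arguments; the file path below is an illustrative placeholder.

from superlinked import DataLoaderConfig, DataFormat

# Minimal configuration: only path and format are required.
minimal_config = DataLoaderConfig(
    path="data/events.json",
    format=DataFormat.JSON,
)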

DataLoaderSource

Main class for batch data loading from configured sources.
DataLoaderSource(schema, data_loader_config, parser=None)

Parameters

schema (IdSchemaObjectT, required)
The schema object that defines the structure of data this source will handle. All loaded data must conform to this schema.

data_loader_config (DataLoaderConfig, required)
Configuration object specifying the data source, format, and loading parameters.

parser (DataParser | None, default: None)
Optional data parser for processing loaded data. If None, an appropriate parser is selected based on the data format.

Properties

config (DataLoaderConfig)
The configuration object used for this data loader.

name (str)
The name identifier for this data loader source.
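
Assuming a source constructed as in the CSV example further below (csv_source), both properties can be read back directly; the printed values in the comments are illustrative.

# Read back the name and underlying configuration of a source
# (csv_source is defined in the CSV example below).
print(csv_source.name)           # e.g. "product_data"
print(csv_source.config.path)    # e.g. "data/products.csv"
print(csv_source.config.format)  # e.g. DataFormat.CSV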

Inheritance

DataLoaderSource builds on the online data processing hierarchy. Its inheritance chain is:
  • DataLoaderSource
  • OnlineSource
  • TransformerPublisher
  • Source
  • Generic

Data Loading Examples

CSV File Loading

from superlinked import DataLoaderSource, DataLoaderConfig, DataFormat

# Configure CSV loading
csv_config = DataLoaderConfig(
    path="data/products.csv",
    format=DataFormat.CSV,
    name="product_data",
    pandas_read_kwargs={
        "delimiter": ",",
        "header": 0,
        "encoding": "utf-8"
    }
)

# Create data loader source
csv_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=csv_config
)

JSON File Loading

# Configure JSON loading
json_config = DataLoaderConfig(
    path="data/articles.json",
    format=DataFormat.JSON,
    name="article_data",
    pandas_read_kwargs={
        "orient": "records",
        "lines": True  # For JSONL format
    }
)

json_source = DataLoaderSource(
    schema=article_schema,
    data_loader_config=json_config
)

Parquet File Loading

# Configure Parquet loading
parquet_config = DataLoaderConfig(
    path="data/large_dataset.parquet",
    format=DataFormat.PARQUET,
    name="large_data",
    pandas_read_kwargs={
        "columns": ["id", "title", "content", "category"],
        "filters": [("category", "in", ["tech", "science"])]
    }
)

parquet_source = DataLoaderSource(
    schema=document_schema,
    data_loader_config=parquet_config
)

Cloud Storage Loading

# Load from cloud storage (S3, GCS, etc.)
cloud_config = DataLoaderConfig(
    path="s3://my-bucket/data/products.csv",
    format=DataFormat.CSV,
    name="cloud_products",
    pandas_read_kwargs={
        "storage_options": {
            "aws_access_key_id": "your_key",
            "aws_secret_access_key": "your_secret"
        }
    }
)

cloud_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=cloud_config
)

Use Cases

ETL Pipelines

Process large datasets from data warehouses:
# Load from data warehouse export
warehouse_config = DataLoaderConfig(
    path="exports/daily_transactions.parquet",
    format=DataFormat.PARQUET,
    name="daily_transactions"
)

transaction_source = DataLoaderSource(
    schema=transaction_schema,
    data_loader_config=warehouse_config
)

Batch Processing

Regular batch updates from external systems:
# Daily batch processing
# Example date list; in practice this would come from your scheduler.
date_range = ["2024-01-01", "2024-01-02", "2024-01-03"]

daily_sources = []
for date in date_range:
    daily_config = DataLoaderConfig(
        path=f"data/daily_feeds/{date}.json",
        format=DataFormat.JSON,
        name=f"feed_{date}"
    )

    daily_sources.append(
        DataLoaderSource(
            schema=feed_schema,
            data_loader_config=daily_config
        )
    )

Data Migration

Migrate data from existing systems:
# Migrate from legacy CSV exports
migration_configs = [
    DataLoaderConfig(f"legacy_data/users_{i}.csv", DataFormat.CSV)
    for i in range(1, 11)  # 10 CSV files
]

migration_sources = [
    DataLoaderSource(user_schema, config)
    for config in migration_configs
]

Research and Analytics

Load research datasets for analysis:
# Research dataset loading
research_config = DataLoaderConfig(
    path="research_data/scientific_papers.json",
    format=DataFormat.JSON,
    name="research_papers",
    pandas_read_kwargs={
        "lines": True,     # pandas only allows chunksize with lines=True (JSON Lines input)
        "chunksize": 1000  # Process in chunks for large files
    }
)

research_source = DataLoaderSource(
    schema=paper_schema,
    data_loader_config=research_config
)

Performance Optimization

Chunked Processing

For large files, use pandas chunking:
large_file_config = DataLoaderConfig(
    path="very_large_dataset.csv",
    format=DataFormat.CSV,
    pandas_read_kwargs={
        "chunksize": 10000,  # Process 10k rows at a time
        "low_memory": False
    }
)

Column Selection

Load only needed columns to reduce memory usage:
optimized_config = DataLoaderConfig(
    path="wide_dataset.parquet",
    format=DataFormat.PARQUET,
    pandas_read_kwargs={
        "columns": ["id", "title", "content"]  # Only load needed columns
    }
)

Data Filtering

Apply filters during loading to reduce data volume:
filtered_config = DataLoaderConfig(
    path="large_dataset.parquet",
    format=DataFormat.PARQUET,
    pandas_read_kwargs={
        "filters": [
            ("date", ">=", "2024-01-01"),
            ("category", "in", ["important", "urgent"])
        ]
    }
)

Best Practices

File Format Selection

Parquet for Analytics: Use Parquet format for large analytical datasets due to efficient compression and columnar storage.
JSON for APIs: Use JSON format when loading data from API responses or when maintaining data structure is important.

Memory Management

Large Files: For files larger than available RAM, use chunked processing or streaming approaches to prevent memory issues.

Error Handling

Data Validation: Always validate loaded data against your schema before processing to catch format mismatches early.
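
One lightweight way to catch format mismatches early is to inspect a small sample of the file with pandas before wiring it into a DataLoaderSource. The required column names below are illustrative assumptions about the schema.

import pandas as pd

# Peek at the first few rows to verify the file matches expectations
# before handing it to a DataLoaderSource (illustrative column names).
REQUIRED_COLUMNS = {"id", "title", "content"}

sample = pd.read_csv("data/products.csv", nrows=5)
missing = REQUIRED_COLUMNS - set(sample.columns)
if missing:
    raise ValueError(f"Input file is missing expected columns: {sorted(missing)}")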

Performance Optimization

Column Selection: Only load columns you need to reduce memory usage and improve loading performance.

Integration with Applications

DataLoaderSource integrates with Superlinked applications for batch processing:
from superlinked import RestApp, Index, TextSimilaritySpace

# Create data loader for batch ingestion
batch_config = DataLoaderConfig(
    path="data/product_catalog.csv",
    format=DataFormat.CSV
)

batch_source = DataLoaderSource(
    schema=product_schema,
    data_loader_config=batch_config
)

# Create application with both batch and real-time sources
app = RestApp(
    sources=[batch_source, real_time_source],  # Batch + real-time
    indices=[product_index],
    queries=[search_query],
    vector_database=vector_db
)
The DataLoaderSource provides efficient batch data loading capabilities essential for production data processing pipelines and large-scale vector search applications.