"""Processing module for embedding analysis.
This module contains the Processor class that handles the core data processing
logic for embedding generation, similarity calculation, and metric evaluation.
It coordinates specialized services to execute individual test runs.
Example:
Process a test run::
from src.core.processing import Processor
processor = Processor(db, config)
result = processor.run_test(rows, model_config, ...)
"""
import hashlib
import logging
from typing import Any
import numpy as np
from langdetect import detect
from langdetect.lang_detect_exception import LangDetectException
from tqdm import tqdm
from ..metrics.evaluation_metrics import calculate_all_metrics
from ..services.embedding_service import EmbeddingService
from ..services.similarity_service import SimilarityService
from ..services.visualization_service import VisualizationService
from ..utils.database import EmbeddingDatabase
from ..utils.utils import chunk_text
from .config import AppConfig, ModelConfig
class Processor:
    """Handle core data processing logic for embedding analysis.

    This class orchestrates the processing pipeline for a single test run,
    delegating specific tasks to specialized services for embedding
    generation, similarity calculation, and visualization.

    Attributes:
        db: The embedding database instance.
        config: The application configuration.
        embedding_service: Service for embedding generation and caching.
        similarity_service: Service for similarity calculations.
        visualization_service: Service for t-SNE visualization.
    """

    def __init__(self, db: EmbeddingDatabase, config: AppConfig) -> None:
        """Initialize the Processor and its collaborating services.

        Args:
            db: The embedding database instance for storing results.
            config: The application configuration.
        """
        self.db = db
        self.config = config
        self.embedding_service = EmbeddingService(db, config)
        self.similarity_service = SimilarityService()
        self.visualization_service = VisualizationService(db)

    def _safe_convert_to_python_types(self, data: Any) -> Any:
        """Recursively convert NumPy types to native Python types.

        Ensures data can be serialized to JSON by converting numpy arrays,
        booleans, floats, and integers to their Python equivalents.

        Args:
            data: The data to convert, which may contain NumPy types.

        Returns:
            The data with all NumPy types converted to Python native types.
        """
        if isinstance(data, np.ndarray):
            return data.astype(float).tolist()
        # Booleans must be checked BEFORE the int branch: Python bool is a
        # subclass of int (so True would otherwise become 1), and np.bool_ is
        # neither np.integer nor int (so it would otherwise pass through
        # unconverted and break JSON serialization).
        elif isinstance(data, (np.bool_, bool)):
            return bool(data)
        elif isinstance(data, (np.floating, float)):
            return float(data)
        elif isinstance(data, (np.integer, int)):
            return int(data)
        elif isinstance(data, dict):
            return {
                key: self._safe_convert_to_python_types(value)
                for key, value in data.items()
            }
        elif isinstance(data, (list, tuple)):
            return [self._safe_convert_to_python_types(item) for item in data]
        else:
            return data

    def run_test(
        self,
        rows: list[tuple[str, str]],
        model_config: ModelConfig,
        chunk_size: int,
        chunk_overlap: int,
        themes: list[str],
        theme_name: str,
        chunking_strategy: str,
        similarity_metric: str,
        processed_files: list[str],
        pbar: tqdm,
    ) -> dict[str, Any]:
        """Process a test run for a specific parameter combination.

        Handles the complete workflow for processing files including embedding
        generation, similarity calculation, and metric evaluation.

        Args:
            rows: List of (name, content) tuples for files to process.
            model_config: The model configuration to use.
            chunk_size: Size of text chunks in characters.
            chunk_overlap: Overlap between chunks in characters.
            themes: List of theme keywords to compare against.
            theme_name: Name identifier for the theme set.
            chunking_strategy: The chunking strategy to use.
            similarity_metric: The similarity metric to use.
            processed_files: List of file names already processed.
            pbar: Progress bar object for status updates.

        Returns:
            Dictionary containing processing results with file data and metrics.
        """
        model_name = model_config.name
        results: dict[str, Any] = {"files": {}}
        embedding_function = self.embedding_service.get_embedding_function(model_config)
        # Use the base model name for caching, as embeddings are model-specific.
        base_model_name = model_config.name
        themes_embeddings_map, themes_time = self.embedding_service.get_or_create_embeddings(
            embedding_function, base_model_name, themes
        )
        unprocessed_rows = [row for row in rows if row[0] not in processed_files]
        if not themes_embeddings_map:
            logging.error(
                "Failed to embed themes for model '%s'. "
                "Skipping %d files for this combination.",
                model_name,
                len(unprocessed_rows),
            )
            pbar.update(len(unprocessed_rows))
            # NOTE(review): this early return yields {"results": {}} (no "files"
            # key) unlike the success path; callers may rely on the empty dict
            # being falsy — confirm before unifying the two shapes.
            return {"results": {}}
        theme_hashes = [self.embedding_service.get_text_hash(theme) for theme in themes]
        embed_themes_list = [
            themes_embeddings_map[h] for h in theme_hashes if h in themes_embeddings_map
        ]
        embed_themes = np.array(embed_themes_list)
        # Guard: if every theme hash missed the map, or the embeddings are
        # ragged, embed_themes is empty/1-D and embed_themes.shape[1] below
        # would raise IndexError. Skip this combination like the empty-map case.
        if embed_themes.ndim != 2 or embed_themes.size == 0:
            logging.error(
                "Theme embeddings for model '%s' are empty or inconsistent. "
                "Skipping %d files for this combination.",
                model_name,
                len(unprocessed_rows),
            )
            pbar.update(len(unprocessed_rows))
            return {"results": {}}
        for name, texte in unprocessed_rows:
            # Compact parameter summary for the progress-bar description.
            grid_params = (
                f"cs{chunk_size} co{chunk_overlap} {similarity_metric[:3]} "
                f"{chunking_strategy} {theme_name[-13:]}"
            )
            description = f"{name} | {model_name[-20:]} | {grid_params}"
            pbar.set_description(description[:150])
            if not texte or not texte.strip():
                pbar.update(1)
                continue
            try:
                language = detect(texte)
            except LangDetectException:
                language = "en"  # Default to English if detection fails.
            item_phrases = chunk_text(
                texte, chunk_size, chunk_overlap, chunking_strategy, language=language
            )
            if not item_phrases:
                pbar.update(1)
                continue
            all_embeddings_map, chunks_time = self.embedding_service.get_or_create_embeddings(
                embedding_function, base_model_name, item_phrases
            )
            phrase_hashes = {p: self.embedding_service.get_text_hash(p) for p in item_phrases}
            item_embed_phrases = np.array(
                [
                    all_embeddings_map[h]
                    for p in item_phrases
                    if (h := phrase_hashes[p]) in all_embeddings_map
                ]
            )
            if item_embed_phrases.size == 0:
                pbar.update(1)
                continue
            # ndim check protects against ragged phrase embeddings (1-D object
            # array) before comparing embedding dimensions.
            if item_embed_phrases.ndim != 2 or embed_themes.shape[1] != item_embed_phrases.shape[1]:
                logging.error("Dimension mismatch for file '%s'.", name)
                pbar.update(1)
                continue
            # Calculate similarities and labels with validation.
            similarites = self.similarity_service.calculate_similarity(
                embed_themes, item_embed_phrases, similarity_metric
            )
            similarites = self.similarity_service.validate_similarities(
                similarites, similarity_metric
            )
            # Each phrase is labeled with the index of its best-matching theme.
            labels = np.argmax(similarites, axis=0)
            # Calculate clustering/evaluation metrics.
            all_metrics = calculate_all_metrics(
                embed_themes,
                item_embed_phrases,
                labels,
            )
            # Add timing metrics (theme embedding time + chunk embedding time).
            total_embedding_time = themes_time + chunks_time
            all_metrics["embedding_computation_time"] = float(total_embedding_time)
            # Convert metrics to JSON-safe Python types.
            all_metrics = self._safe_convert_to_python_types(all_metrics)
            # Generate t-SNE coordinates, keyed by the parameter combination so
            # identical runs reuse cached projections.
            tsne_key = f"{model_config.name}_cs{chunk_size}_co{chunk_overlap}_{chunking_strategy}"
            scatter_plot_data = self.visualization_service.get_or_create_tsne_data(
                item_embed_phrases,
                tsne_key,
                name,
                similarites,
                self.config.similarity_threshold,
            )
            # Keep only the best similarity per phrase, converted to plain floats.
            max_similarities = similarites.max(axis=0)
            safe_similarities = self._safe_convert_to_python_types(max_similarities)
            results["files"][name] = {
                "file_name": name,
                "phrases": item_phrases,
                "similarities": safe_similarities,
                "metrics": all_metrics,
                "scatter_plot_data": scatter_plot_data,
            }
            pbar.update(1)
        return {"results": results}