import csv
import logging
import os
import tempfile

import pandas as pd

from medicine_manager import MedicineManager
from pathology_manager import PathologyManager


class DataManager:
    """Handle all data operations for the application.

    Entries live in a single CSV file whose columns are ``date``, one column
    per pathology key, two columns per medicine key (``<key>`` and
    ``<key>_doses``) and a trailing ``note`` column.  Loaded data is cached in
    memory and reloaded only when the file's modification time is newer than
    the cached snapshot.
    """

    # Format in which the "date" column is stored on disk.
    _DATE_FORMAT = "%m/%d/%Y"
    # pandas.read_csv decodes as UTF-8 by default, so every write must use the
    # same encoding instead of the platform's locale encoding.
    _CSV_ENCODING = "utf-8"

    def __init__(
        self,
        filename: str,
        logger: logging.Logger,
        medicine_manager: MedicineManager,
        pathology_manager: PathologyManager,
    ) -> None:
        """Create the manager and make sure the CSV file exists with headers.

        Args:
            filename: Path of the CSV file backing the data.
            logger: Logger used for warnings and errors.
            medicine_manager: Provides the configured medicine keys.
            pathology_manager: Provides the configured pathology keys.
        """
        self._init_internal(
            filename,
            logger,
            medicine_manager,
            pathology_manager,
        )

    def _init_internal(
        self,
        filename: str,
        logger: logging.Logger,
        medicine_manager: MedicineManager,
        pathology_manager: PathologyManager,
    ) -> None:
        """Store collaborators, reset every cache and initialize the CSV file."""
        self.filename = filename
        self.logger = logger
        self.medicine_manager = medicine_manager
        self.pathology_manager = pathology_manager

        # Last successfully loaded DataFrame and the file mtime it reflects.
        self._data_cache = None
        self._cache_timestamp = 0
        # Derived values that depend only on the medicine/pathology config.
        self._headers_cache = None
        self._dtype_cache = None
        # Datetime-indexed copy of the data used by the graph layer.
        self._graph_cache = None
        # Incremented by invalidate_structure() on every structural change.
        self._config_version = 0

        self._initialize_csv_file()

    def _get_csv_headers(self) -> tuple[str, ...]:
        """Return the CSV headers for the current configuration.

        The result is cached until ``invalidate_structure`` is called.
        """
        if self._headers_cache is not None:
            return self._headers_cache

        headers = ["date", *self.pathology_manager.get_pathology_keys()]
        for medicine_key in self.medicine_manager.get_medicine_keys():
            headers.extend((medicine_key, f"{medicine_key}_doses"))
        headers.append("note")

        self._headers_cache = tuple(headers)
        return self._headers_cache

    def _initialize_csv_file(self) -> None:
        """Create the CSV file with headers if it doesn't exist or is empty.

        Failures are logged, not raised.
        """
        try:
            creating = not os.path.exists(self.filename)
            if creating or os.path.getsize(self.filename) == 0:
                with open(
                    self.filename,
                    mode="w",
                    newline="",
                    encoding=self._CSV_ENCODING,
                ) as file:
                    csv.writer(file).writerow(self._get_csv_headers())
                if creating:
                    # Emit warning so tests detect creation of missing file
                    self.logger.warning(
                        "CSV file did not exist and was created with headers."
                    )
        except Exception as e:
            self.logger.error(f"Failed to initialize CSV file: {e}")

    def _invalidate_cache(self) -> None:
        """Drop cached data (and the graph data derived from it)."""
        self._data_cache = None
        self._cache_timestamp = 0
        self._graph_cache = None

    def invalidate_structure(self) -> None:
        """Invalidate caches due to structural changes (medicines/pathologies).

        Public method for other managers / UI to call instead of reaching into
        private attributes.  It clears the header, dtype, graph and data
        caches and bumps the config version.
        """
        self._headers_cache = None
        self._dtype_cache = None
        self._graph_cache = None
        self._config_version += 1
        # Data remains valid but columns may differ; safest is full invalidation
        self._invalidate_cache()

    def _should_reload_data(self) -> bool:
        """Return True when there is no cache or the file is newer than it."""
        if self._data_cache is None:
            return True
        try:
            return os.path.getmtime(self.filename) > self._cache_timestamp
        except OSError:
            # File vanished or is unreadable: let the caller go through the
            # regular load path, which reports the problem.
            return True

    def _get_dtype_dict(self) -> dict[str, type]:
        """Return the column -> dtype mapping passed to ``pd.read_csv``.

        The result is cached until ``invalidate_structure`` is called.
        """
        if self._dtype_cache is not None:
            return self._dtype_cache

        dtype_dict: dict[str, type] = {"date": str, "note": str}
        for pathology_key in self.pathology_manager.get_pathology_keys():
            dtype_dict[pathology_key] = int
        for medicine_key in self.medicine_manager.get_medicine_keys():
            dtype_dict[medicine_key] = int
            dtype_dict[f"{medicine_key}_doses"] = str

        self._dtype_cache = dtype_dict
        return dtype_dict

    def _sort_by_date(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return ``df`` ordered chronologically by its ``date`` column.

        Dates are stored as ``mm/dd/YYYY`` strings, so comparing them as text
        puts January 2025 before December 2024.  The strings are therefore
        parsed before ordering.  If any value does not match the expected
        format the previous plain string ordering is kept, so such files
        behave exactly as before.
        """
        if len(df) < 2:
            return df

        parsed = pd.to_datetime(
            df["date"], format=self._DATE_FORMAT, errors="coerce"
        )
        if parsed.isna().any():
            if df["date"].is_monotonic_increasing:
                return df
            return df.sort_values(by="date").reset_index(drop=True)

        if parsed.is_monotonic_increasing:
            return df
        # Stable sort keeps the file order of rows sharing the same date.
        order = parsed.sort_values(kind="stable").index
        return df.loc[order].reset_index(drop=True)

    def load_data(self) -> pd.DataFrame:
        """Load data from the CSV file, using the cache when it is current.

        Returns:
            A copy of the data sorted by date, or an empty ``DataFrame`` when
            the file is missing, empty, contains only headers, or cannot be
            read (the reason is logged).
        """
        if not os.path.exists(self.filename):
            self.logger.warning("CSV file does not exist. No data to load.")
            return pd.DataFrame()
        if os.path.getsize(self.filename) == 0:
            self.logger.warning("CSV file is empty. No data to load.")
            return pd.DataFrame()

        # Use cached data if available and file hasn't changed
        if not self._should_reload_data():
            return self._data_cache.copy()

        try:
            df: pd.DataFrame = pd.read_csv(
                self.filename,
                dtype=self._get_dtype_dict(),
                na_filter=False,  # Don't convert to NaN, keep as empty strings
                engine="c",  # Use faster C engine
            )

            # If file has only headers (no rows), treat as empty with warning
            if df.empty:
                self.logger.warning(
                    "CSV file contains only headers. No data to load."
                )
                return pd.DataFrame()

            df = self._sort_by_date(df)

            # Cache the data and timestamp
            self._data_cache = df.copy()
            self._cache_timestamp = os.path.getmtime(self.filename)
            # Invalidate graph cache because underlying data changed
            self._graph_cache = None

            return df.copy()
        except pd.errors.EmptyDataError:
            self.logger.warning("CSV file is empty. No data to load.")
            return pd.DataFrame()
        except Exception as e:
            self.logger.error(f"Error loading data: {e}")
            return pd.DataFrame()

    def add_entry(self, entry_data: list[str | int]) -> bool:
        """Append a new entry to the CSV file.

        Args:
            entry_data: Row values; the first element is the entry's date.

        Returns:
            True if the row was written, False if an entry with the same date
            already exists or an error occurred (both are logged).
        """
        try:
            date_to_add: str = str(entry_data[0])

            # Read the cache directly (no copy) only while it still matches
            # the file; otherwise reload so the duplicate check never runs
            # against stale data.
            if self._should_reload_data():
                existing = self.load_data()
            else:
                existing = self._data_cache

            if (
                existing is not None
                and "date" in existing.columns
                and date_to_add in existing["date"].values
            ):
                self.logger.warning(
                    f"Entry with date {date_to_add} already exists."
                )
                return False

            with open(
                self.filename,
                mode="a",
                newline="",
                encoding=self._CSV_ENCODING,
            ) as file:
                csv.writer(file).writerow(entry_data)

            # Invalidate cache since data changed
            self._invalidate_cache()
            return True
        except Exception as e:
            self.logger.error(f"Error adding entry: {e}")
            return False

    def update_entry(self, original_date: str, values: list[str | int]) -> bool:
        """Update the entry identified by ``original_date``.

        Args:
            original_date: Date of the existing entry.
            values: New row values aligned with the current CSV headers; the
                first element is the (possibly changed) date.  A shorter list
                is padded with ``""`` for ``note`` / ``*_doses`` columns and
                ``0`` for all others.  The list itself is never modified.

        Returns:
            True if the entry was updated and written back, False if the new
            date collides with another entry, the entry was not found, or an
            error occurred (all are logged).
        """
        try:
            df: pd.DataFrame = self.load_data()
            new_date: str = str(values[0])

            if original_date != new_date and (df["date"] == new_date).any():
                self.logger.warning(
                    f"Cannot update: entry with date {new_date} already exists."
                )
                return False

            headers = list(self._get_csv_headers())

            # Work on a copy so the caller's list is left untouched, and pad
            # from a slice of the headers so every missing column gets the
            # default that belongs to its own header.
            row = list(values)
            for header in headers[len(row):]:
                if header == "note" or header.endswith("_doses"):
                    row.append("")
                else:
                    row.append(0)

            mask = df["date"] == original_date
            if not mask.any():
                self.logger.warning(
                    f"Entry with date {original_date} not found for update."
                )
                return False

            df.loc[mask, headers] = row
            # Atomic write back to CSV to avoid partial writes
            self._atomic_write_csv(df)
            self._invalidate_cache()
            return True
        except Exception as e:
            self.logger.error(f"Error updating entry: {e}")
            return False

    def delete_entry(self, date: str) -> bool:
        """Delete the entry identified by ``date``.

        Returns:
            True when the data could be processed (whether or not a matching
            row existed), False if an error occurred (logged).
        """
        try:
            df: pd.DataFrame = self.load_data()
            remaining = df[df["date"] != date]

            # Only write if something was actually deleted
            if len(remaining) < len(df):
                self._atomic_write_csv(remaining)
                self._invalidate_cache()

            return True
        except Exception as e:
            self.logger.error(f"Error deleting entry: {e}")
            return False

    # ------------------------------------------------------------------
    # File write helpers
    # ------------------------------------------------------------------
    def _atomic_write_csv(self, df: pd.DataFrame) -> None:
        """Write ``df`` to the CSV file via a temp file and ``os.replace``.

        This prevents corrupted files if the app crashes mid-write.  The temp
        file is created in the target's directory.  Errors propagate to the
        caller.
        """
        directory = os.path.dirname(os.path.abspath(self.filename)) or "."
        os.makedirs(directory, exist_ok=True)
        fd, tmp_path = tempfile.mkstemp(
            prefix="thechart_", suffix=".csv", dir=directory
        )
        try:
            # newline="" lets pandas control line endings itself; without it
            # text mode would translate them a second time on Windows.
            with os.fdopen(
                fd, "w", newline="", encoding=self._CSV_ENCODING
            ) as tmp_file:
                df.to_csv(tmp_file, index=False)
            os.replace(tmp_path, self.filename)
        finally:
            # If replace succeeded tmp_path no longer exists.  Cleanup is
            # best-effort and must not mask an error raised above.
            try:
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
            except OSError:
                pass

    def get_today_medicine_doses(
        self, date: str, medicine_name: str
    ) -> list[tuple[str, str]]:
        """Return the recorded doses of a medicine on a given date.

        The ``<medicine_name>_doses`` cell is split on ``|`` into entries, and
        each entry on its first ``:`` into ``(timestamp, dose)``.  Entries
        without a ``:`` are skipped.

        Returns:
            List of ``(timestamp, dose)`` tuples; empty when there is no data,
            no row for ``date``, no dose column, no doses, or on error
            (logged).
        """
        try:
            df: pd.DataFrame = self.load_data()
            if df.empty:
                return []

            date_mask = df["date"] == date
            if not date_mask.any():
                return []

            dose_column = f"{medicine_name}_doses"
            if dose_column not in df.columns:
                return []

            doses_str = df.loc[date_mask, dose_column].iloc[0]
            if not doses_str:
                return []

            doses: list[tuple[str, str]] = []
            for dose_entry in doses_str.split("|"):
                timestamp, separator, dose = dose_entry.partition(":")
                if separator:
                    doses.append((timestamp, dose))
            return doses
        except Exception as e:
            self.logger.error(f"Error getting medicine doses: {e}")
            return []

    # ------------------------------------------------------------------
    # Retrieval helpers
    # ------------------------------------------------------------------
    def get_row(self, date: str) -> list[str | int] | None:
        """Return a row (as list aligned with current headers) for a date.

        Args:
            date: Date string identifying the row

        Returns:
            List of values aligned with current CSV headers, or None if the
            row is not found or an error occurred (logged).
        """
        try:
            df = self.load_data()
            if df.empty or "date" not in df.columns:
                return None

            mask = df["date"] == date
            if not mask.any():
                return None

            headers = list(self._get_csv_headers())
            row_series = df.loc[mask, headers].iloc[0]
            return [row_series[h] for h in headers]
        except Exception as e:
            # Report the failure instead of hiding it behind a bare None.
            self.logger.error(f"Error getting row: {e}")
            return None

    # ------------------------------------------------------------------
    # Graph Data Handling
    # ------------------------------------------------------------------
    def get_graph_ready_data(self) -> pd.DataFrame:
        """Return a dataframe ready for graphing (datetime index, cached).

        Dates are parsed as ``mm/dd/YYYY``; rows whose date cannot be parsed
        are dropped and the rest are sorted and indexed by date.  This avoids
        repeatedly parsing dates & re-sorting in the graph layer.  If the
        conversion fails, the unindexed data is returned and a warning is
        logged.
        """
        # Load first: a reload clears the graph cache when the file changed.
        base_df = self.load_data()
        if base_df.empty:
            return base_df
        if self._graph_cache is not None:
            return self._graph_cache.copy()

        try:
            graph_df = base_df.copy()
            graph_df["date"] = pd.to_datetime(
                graph_df["date"], format=self._DATE_FORMAT, errors="coerce"
            )
            graph_df = graph_df.dropna(subset=["date"]).sort_values("date")
            graph_df.set_index("date", inplace=True)
            self._graph_cache = graph_df.copy()
            return graph_df
        except Exception as e:
            # Fallback: return original (unindexed) data
            self.logger.warning(f"Error preparing graph data: {e}")
            return base_df