diff --git a/src/data_manager.py b/src/data_manager.py
index 8425362..788acc2 100644
--- a/src/data_manager.py
+++ b/src/data_manager.py
@@ -2,6 +2,8 @@ import csv
 import logging
 import os
 import tempfile
+from datetime import datetime
+from typing import Any
 
 import pandas as pd
 
@@ -312,6 +314,139 @@ class DataManager:
             except Exception:
                 pass
 
+    # ------------------------------------------------------------------
+    # Archiving / Rotation
+    # ------------------------------------------------------------------
+    def _get_archive_dir(self) -> str:
+        """Return path to the archives directory next to the main CSV."""
+        base_dir = os.path.dirname(os.path.abspath(self.filename)) or "."
+        archive_dir = os.path.join(base_dir, "archives")
+        os.makedirs(archive_dir, exist_ok=True)
+        return archive_dir
+
+    def _ensure_headers(self, df: pd.DataFrame) -> pd.DataFrame:
+        """Ensure dataframe has all expected headers in correct order.
+
+        Missing numeric fields default to 0; dose/note string fields to ''.
+        Columns are ordered per _get_csv_headers().
+        """
+        headers = list(self._get_csv_headers())
+        out = df.copy()
+        for col in headers:
+            if col not in out.columns:
+                if col == "note" or col.endswith("_doses"):
+                    out[col] = ""
+                else:
+                    out[col] = 0
+        # Drop unknown columns to keep files tidy
+        out = out[headers]
+        return out
+
+    def _write_archive_file(self, year: int, df: pd.DataFrame) -> str:
+        """Append archived rows to a per-year CSV with full headers.
+
+        Returns the archive file path.
+        """
+        archive_dir = self._get_archive_dir()
+        base = os.path.splitext(os.path.basename(self.filename))[0]
+        archive_path = os.path.join(archive_dir, f"{base}_{year}.csv")
+        df_to_write = self._ensure_headers(df)
+        # If file doesn't exist, write with header; else append without header
+        write_header = (
+            not os.path.exists(archive_path) or os.path.getsize(archive_path) == 0
+        )
+        try:
+            df_to_write.to_csv(archive_path, mode="a", index=False, header=write_header)
+        except Exception as e:
+            self.logger.error(f"Failed to write archive file {archive_path}: {e}")
+            raise
+        return archive_path
+
+    def archive_old_data(self, keep_years: int = 1) -> dict[str, Any]:
+        """Archive rows older than the most recent N years into per-year files.
+
+        Rows whose date cannot be parsed are never archived; they stay in the
+        main CSV so rewriting the file cannot silently drop them.
+
+        Args:
+            keep_years: Number of most recent calendar years (the current year
+                included) to keep in the main CSV (minimum 1). Rows dated before
+                the earliest kept year are moved to archives/BASE_YYYY.csv.
+
+        Returns:
+            Summary dict: { 'archived_rows': int, 'archive_files': set[str],
+            'kept_rows': int }
+        """
+        try:
+            keep_years = max(1, int(keep_years))
+        except (TypeError, ValueError, OverflowError):
+            keep_years = 1
+
+        df = self.load_data()
+        if df.empty or "date" not in df.columns:
+            # Nothing is rewritten here, so every loaded row is still kept
+            return {
+                "archived_rows": 0,
+                "archive_files": set(),
+                "kept_rows": int(len(df)),
+            }
+
+        # Parse dates (stored as mm/dd/YYYY normally)
+        dates = pd.to_datetime(df["date"], format="%m/%d/%Y", errors="coerce")
+        df = df.copy()
+        df["__dt"] = dates
+        # If we couldn't parse dates, nothing to archive safely
+        if df["__dt"].isna().all():
+            df.drop(columns=["__dt"], inplace=True)
+            return {
+                "archived_rows": 0,
+                "archive_files": set(),
+                "kept_rows": int(len(df)),
+            }
+
+        current_year = datetime.now().year
+        earliest_kept_year = current_year - keep_years + 1
+
+        # Unparseable dates yield NaN years, which compare False: such rows
+        # are never archived. Keep the complement of the archive mask rather
+        # than filtering on ">=" so those rows are retained in the main CSV.
+        archive_mask = df["__dt"].dt.year < earliest_kept_year
+        to_archive = df[archive_mask]
+        to_keep = df[~archive_mask]
+
+        if to_archive.empty:
+            df.drop(columns=["__dt"], inplace=True)
+            return {
+                "archived_rows": 0,
+                "archive_files": set(),
+                "kept_rows": int(len(df)),
+            }
+
+        archive_files: set[str] = set()
+        try:
+            # Group by year and append to each year's archive file
+            for year, group in to_archive.groupby(to_archive["__dt"].dt.year):
+                group = group.drop(columns=["__dt"])  # remove helper
+                path = self._write_archive_file(int(year), group)
+                archive_files.add(path)
+
+            # Write the kept rows back to main CSV atomically
+            kept_df = to_keep.drop(columns=["__dt"]).copy()
+            # Ensure columns and order
+            kept_df = self._ensure_headers(kept_df)
+            self._atomic_write_csv(kept_df)
+            self._invalidate_cache()
+        except Exception as e:
+            # If archiving failed mid-way, log and propagate minimal info
+            self.logger.error(f"Archiving failed: {e}")
+            raise
+
+        return {
+            "archived_rows": int(len(to_archive)),
+            "archive_files": archive_files,
+            "kept_rows": int(len(to_keep)),
+        }
+
     def get_today_medicine_doses(
         self, date: str, medicine_name: str
     ) -> list[tuple[str, str]]:
diff --git a/src/input_validator.py b/src/input_validator.py
index c19b052..ca6d891 100644
--- a/src/input_validator.py
+++ b/src/input_validator.py
@@ -233,34 +233,59 @@ class InputValidator:
         entry_data: dict[str, Any],
     ) -> tuple[bool, list[str]]:
         """
-        Validate that an entry has the minimum required data.
+        Backward-compat entry completeness check.
+
+        Delegates to validate_entry_completeness_with_keys.
+        """
+        # Heuristic split: treat keys ending with _doses and note/date as
+        # non-core and assume the rest are a mix of pathologies and medicines;
+        # callers should prefer the explicit API below.
+        keys = [
+            k
+            for k in entry_data
+            if k not in {"date", "note"} and not str(k).endswith("_doses")
+        ]
+        # Even split guess is unreliable; use value patterns instead:
+        path_keys = [k for k in keys if isinstance(entry_data.get(k), int | float)]
+        med_keys = [k for k in keys if k not in path_keys]
+        return InputValidator.validate_entry_completeness_with_keys(
+            entry_data, path_keys, med_keys
+        )
+
+    @staticmethod
+    def validate_entry_completeness_with_keys(
+        entry_data: dict[str, Any],
+        pathology_keys: list[str],
+        medicine_keys: list[str],
+    ) -> tuple[bool, list[str]]:
+        """
+        Validate that an entry has the minimum required data using explicit keys.
 
         Args:
             entry_data: Dictionary containing entry data
+            pathology_keys: Keys representing pathology scores (numeric, >0 meaningful)
+            medicine_keys: Keys representing medicine taken flags (0/1 boolean)
 
         Returns:
             Tuple of (is_complete, list_of_missing_fields)
         """
-        missing_fields = []
-
-        # Check required fields
+        missing_fields: list[str] = []
         if not entry_data.get("date"):
             missing_fields.append("Date")
 
-        # Check that at least one pathology or medicine is recorded
-        has_pathology_data = any(
-            entry_data.get(key, 0) > 0
-            for key in entry_data
-            if not key.endswith("_doses") and key not in ["date", "note"]
-        )
+        def _as_int(v: Any) -> int:
+            try:
+                return int(v)
+            except (TypeError, ValueError, OverflowError):
+                try:
+                    return int(float(v))
+                except (TypeError, ValueError, OverflowError):
+                    return 0
 
-        has_medicine_data = any(
-            entry_data.get(key, 0) > 0
-            for key in entry_data
-            if not key.endswith("_doses") and key not in ["date", "note"]
-        )
+        has_pathology = any(_as_int(entry_data.get(k, 0)) > 0 for k in pathology_keys)
+        has_medicine = any(_as_int(entry_data.get(k, 0)) == 1 for k in medicine_keys)
 
-        if not (has_pathology_data or has_medicine_data):
+        if not (has_pathology or has_medicine):
             missing_fields.append("At least one pathology score or medicine entry")
 
         return len(missing_fields) == 0, missing_fields
diff --git a/src/main.py b/src/main.py
index 0ac6e29..3f97b39 100644
--- a/src/main.py
+++ b/src/main.py
@@ -509,6 +509,10 @@ class MedTrackerApp:
             command=self._restore_from_backup,
             accelerator="Ctrl+Shift+R",
         )
+        tools_menu.add_command(
+            label="Archive Old Data...",
+            command=self._archive_old_data,
+        )
         tools_menu.add_separator()
         tools_menu.add_command(
             label="Open Config Folder (Ctrl+Shift+C)",
@@ -843,6 +847,48 @@ Use Ctrl+S to save entries and Ctrl+Q to quit."""
             logger.error(f"Failed to open backups folder: {e}")
             self.ui_manager.update_status("Failed to open backups folder", "error")
 
+    def _archive_old_data(self) -> None:
+        """Archive rows older than configured years and shrink main CSV."""
+        try:
+            # Clamp to >= 1 so the dialog states what the archiver will apply
+            keep_years = max(1, int(get_pref("archive_keep_years", 1) or 1))
+        except Exception:
+            keep_years = 1
+        # Confirm with user
+        if not messagebox.askyesno(
+            "Archive Old Data",
+            (
+                "This will move entries older than the last "
+                f"{keep_years} year(s) to per-year archive files and shrink the "
+                "main CSV.\n\nProceed?"
+            ),
+            parent=self.root,
+        ):
+            return
+
+        try:
+            self.ui_manager.update_status("Archiving old data...", "info")
+            summary = self.data_manager.archive_old_data(keep_years=keep_years)
+            archived = int(summary.get("archived_rows", 0))
+            kept = int(summary.get("kept_rows", 0))
+            files = summary.get("archive_files", set()) or set()
+            file_list = "\n".join(
+                [f"\u2022 {os.path.basename(str(p))}" for p in sorted(files)]
+            )
+            msg = f"Archived {archived} row(s). Kept {kept}."
+            if file_list:
+                msg += f"\n\n{file_list}"
+            self.ui_manager.update_status("Archiving complete", "success")
+            if hasattr(self.ui_manager, "show_toast"):
+                self.ui_manager.show_toast("Archiving complete", 1500)
+            messagebox.showinfo("Archive Complete", msg, parent=self.root)
+            # Refresh view since data file changed
+            self.refresh_data_display()
+        except Exception as e:
+            logger.error(f"Archiving failed: {e}")
+            self.ui_manager.update_status("Archiving failed", "error")
+            messagebox.showerror("Archive Failed", str(e), parent=self.root)
+
     def _refresh_ui_after_config_change(self) -> None:
         """Refresh UI components after pathology or medicine configuration changes."""
         self.ui_manager.update_status(
@@ -1291,21 +1337,14 @@ Use Ctrl+S to save entries and Ctrl+Q to quit."""
             return
         entry_data["note"] = validated_note
 
-        # Check entry completeness: require date and at least one of
-        # (any pathology score > 0) or (any medicine taken == 1)
-        missing_fields: list[str] = []
-        if not entry_data.get("date"):
-            missing_fields.append("Date")
-
-        has_pathology = any(
-            entry_data.get(k, 0) > 0
-            for k in self.pathology_manager.get_pathology_keys()
+        # Check entry completeness using explicit keys
+        is_complete, missing_fields = (
+            InputValidator.validate_entry_completeness_with_keys(
+                entry_data,
+                self.pathology_manager.get_pathology_keys(),
+                self.medicine_manager.get_medicine_keys(),
+            )
         )
-        has_medicine = any(
-            entry_data.get(k, 0) == 1 for k in self.medicine_manager.get_medicine_keys()
-        )
-        if not (has_pathology or has_medicine):
-            missing_fields.append("At least one pathology score or medicine entry")
 
         if missing_fields:
             missing_msg = "Missing required data:\n" + "\n".join(
diff --git a/src/preferences.py b/src/preferences.py
index 60c5e47..d196c20 100644
--- a/src/preferences.py
+++ b/src/preferences.py
@@ -25,6 +25,8 @@ _DEFAULTS: dict[str, Any] = {
     # Table column UX
     "column_widths": {},
     "last_sort": {"column": None, "ascending": True},
+    # Data: archiving/rotation
+    "archive_keep_years": 1,
 }
 
 _PREFERENCES: dict[str, Any] = dict(_DEFAULTS)