feat: Implement data archiving functionality in DataManager, enhance input validation, and add UI option for archiving old data

This commit is contained in:
William Valentin
2025-08-08 17:33:02 -07:00
parent 117e489072
commit 9e107f6125
4 changed files with 218 additions and 30 deletions
+123
View File
@@ -2,6 +2,8 @@ import csv
import logging
import os
import tempfile
from datetime import datetime
from typing import Any
import pandas as pd
@@ -312,6 +314,127 @@ class DataManager:
except Exception:
pass
# ------------------------------------------------------------------
# Archiving / Rotation
# ------------------------------------------------------------------
def _get_archive_dir(self) -> str:
    """Return the ``archives`` directory beside the main CSV, creating it if absent.

    The location is derived from the absolute path of ``self.filename``.
    """
    parent = os.path.dirname(os.path.abspath(self.filename))
    target = os.path.join(parent or ".", "archives")
    # exist_ok=True makes repeated calls harmless.
    os.makedirs(target, exist_ok=True)
    return target
def _ensure_headers(self, df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` restricted to the expected CSV columns.

    The column list comes from ``_get_csv_headers()`` and fixes the output
    order. A column missing from ``df`` is created and filled with ``""``
    when it is ``note`` or ends in ``_doses``, and with ``0`` otherwise.
    Columns that are not in the expected list are dropped. ``df`` itself is
    left untouched.
    """
    expected = list(self._get_csv_headers())
    result = df.copy()
    for name in expected:
        if name in result.columns:
            continue
        is_text = name == "note" or name.endswith("_doses")
        result[name] = "" if is_text else 0
    # Selecting by the expected list both reorders and discards extras.
    return result[expected]
def _write_archive_file(self, year: int, df: pd.DataFrame) -> str:
    """Append ``df`` to the archive CSV for ``year`` and return that file's path.

    The file is named ``<main CSV stem>_<year>.csv`` inside the archive
    directory. Rows are passed through ``_ensure_headers`` before writing.
    A header row is emitted only when the target is missing or empty. Any
    write failure is logged and re-raised.
    """
    folder = self._get_archive_dir()
    stem, _ext = os.path.splitext(os.path.basename(self.filename))
    archive_path = os.path.join(folder, f"{stem}_{year}.csv")
    rows = self._ensure_headers(df)

    # An existing, non-empty file already carries its header line.
    already_has_content = (
        os.path.exists(archive_path) and os.path.getsize(archive_path) > 0
    )
    try:
        rows.to_csv(
            archive_path,
            mode="a",
            index=False,
            header=not already_has_content,
        )
    except Exception as e:
        self.logger.error(f"Failed to write archive file {archive_path}: {e}")
        raise
    return archive_path
def archive_old_data(self, keep_years: int = 1) -> dict[str, Any]:
    """Move rows older than the retention window into per-year archive files.

    The window is counted in calendar years and includes the current year:
    ``keep_years=1`` keeps only rows dated in the current year,
    ``keep_years=2`` keeps the current and the previous year, and so on.
    Older rows are handed to ``_write_archive_file`` grouped by year, then
    the main CSV is rewritten with the remaining rows and the cache is
    invalidated.

    Rows whose date cannot be parsed as ``mm/dd/YYYY`` are never archived
    and are never dropped; they stay in the main CSV.

    Args:
        keep_years: Number of calendar years to keep, counting the current
            one. Values below 1, or values that cannot be converted to
            ``int``, fall back to 1.

    Returns:
        Summary dict ``{'archived_rows': int, 'archive_files': set[str],
        'kept_rows': int}``. When nothing is archived the main CSV is not
        rewritten and ``kept_rows`` is the number of loaded rows.

    Raises:
        Exception: Any error raised while writing an archive file or the
            main CSV is logged and re-raised unchanged.
    """
    try:
        keep_years = max(1, int(keep_years))
    except (TypeError, ValueError, OverflowError):
        keep_years = 1

    df = self.load_data()
    if df.empty or "date" not in df.columns:
        return {
            "archived_rows": 0,
            "archive_files": set(),
            "kept_rows": int(len(df)),
        }

    # Unparseable dates become NaT, whose year is NaN.
    parsed = pd.to_datetime(df["date"], format="%m/%d/%Y", errors="coerce")
    years = parsed.dt.year.rename("year")

    earliest_kept_year = datetime.now().year - keep_years + 1
    # NaN < x is False, so rows with unparseable dates are never selected
    # for archiving. Keeping the complement of this mask (instead of a
    # second ">=" comparison) guarantees those rows are retained rather
    # than silently lost when the main CSV is rewritten.
    archive_mask = years < earliest_kept_year

    if not archive_mask.any():
        return {
            "archived_rows": 0,
            "archive_files": set(),
            "kept_rows": int(len(df)),
        }

    to_archive = df[archive_mask]
    to_keep = df[~archive_mask]

    archive_files: set[str] = set()
    try:
        # Archive first, rewrite the main CSV last: a failure part-way
        # through leaves the main CSV untouched.
        for year, group in to_archive.groupby(years[archive_mask]):
            archive_files.add(self._write_archive_file(int(year), group))

        self._atomic_write_csv(self._ensure_headers(to_keep))
        self._invalidate_cache()
    except Exception as e:
        self.logger.error(f"Archiving failed: {e}")
        raise

    return {
        "archived_rows": int(len(to_archive)),
        "archive_files": archive_files,
        "kept_rows": int(len(to_keep)),
    }
def get_today_medicine_doses(
self, date: str, medicine_name: str
) -> list[tuple[str, str]]:
+41 -16
View File
@@ -233,34 +233,59 @@ class InputValidator:
entry_data: dict[str, Any],
) -> tuple[bool, list[str]]:
"""
Validate that an entry has the minimum required data.
Backward-compat entry completeness check.
Delegates to validate_entry_completeness_with_keys when possible.
"""
# Heuristic split: treat keys ending with _doses and note/date as
# non-core and assume the rest are a mix of pathologies and medicines;
# callers should prefer the explicit API below.
keys = [
k
for k in entry_data
if k not in {"date", "note"} and not str(k).endswith("_doses")
]
# Even split guess is unreliable; use value patterns instead:
path_keys = [k for k in keys if isinstance(entry_data.get(k), int | float)]
med_keys = [k for k in keys if k not in path_keys]
return InputValidator.validate_entry_completeness_with_keys(
entry_data, path_keys, med_keys
)
@staticmethod
def validate_entry_completeness_with_keys(
    entry_data: dict[str, Any],
    pathology_keys: list[str],
    medicine_keys: list[str],
) -> tuple[bool, list[str]]:
    """
    Validate that an entry has the minimum required data using explicit keys.

    An entry is complete when it has a truthy ``date`` and at least one of:
    a pathology value that coerces to an integer greater than 0, or a
    medicine value that coerces to exactly 1. Only the keys passed in are
    inspected; other keys in ``entry_data`` are ignored.

    Args:
        entry_data: Dictionary containing entry data
        pathology_keys: Keys representing pathology scores (numeric, >0 meaningful)
        medicine_keys: Keys representing medicine taken flags (0/1 boolean)

    Returns:
        Tuple of (is_complete, list_of_missing_fields)
    """

    def _as_int(value: Any) -> int:
        """Coerce ``value`` to int; values that cannot be parsed count as 0."""
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            pass
        # Second attempt via float handles strings such as "2.0".
        try:
            return int(float(value))
        except (TypeError, ValueError, OverflowError):
            return 0

    missing_fields: list[str] = []
    if not entry_data.get("date"):
        missing_fields.append("Date")

    has_pathology = any(_as_int(entry_data.get(k, 0)) > 0 for k in pathology_keys)
    has_medicine = any(_as_int(entry_data.get(k, 0)) == 1 for k in medicine_keys)
    if not (has_pathology or has_medicine):
        missing_fields.append("At least one pathology score or medicine entry")

    return not missing_fields, missing_fields
+52 -14
View File
@@ -509,6 +509,10 @@ class MedTrackerApp:
command=self._restore_from_backup,
accelerator="Ctrl+Shift+R",
)
tools_menu.add_command(
label="Archive Old Data...",
command=self._archive_old_data,
)
tools_menu.add_separator()
tools_menu.add_command(
label="Open Config Folder (Ctrl+Shift+C)",
@@ -843,6 +847,47 @@ Use Ctrl+S to save entries and Ctrl+Q to quit."""
logger.error(f"Failed to open backups folder: {e}")
self.ui_manager.update_status("Failed to open backups folder", "error")
def _archive_old_data(self) -> None:
    """Confirm with the user, archive old rows, and report the outcome.

    The retention period is read from the ``archive_keep_years`` preference
    and clamped to at least 1, so the number shown in the confirmation
    dialog is the number that is passed to
    ``data_manager.archive_old_data``. On success a summary dialog is shown
    and the data view is refreshed. On failure the error is logged and
    shown in an error dialog.
    """
    try:
        # Clamp here so the dialog never shows 0 or a negative year count.
        keep_years = max(1, int(get_pref("archive_keep_years", 1) or 1))
    except Exception:
        # Best effort: any problem reading the preference falls back to 1.
        keep_years = 1

    # Confirm with user
    confirmed = messagebox.askyesno(
        "Archive Old Data",
        (
            "This will move entries older than the last "
            f"{keep_years} year(s) to per-year archive files and shrink the "
            "main CSV.\n\nProceed?"
        ),
        parent=self.root,
    )
    if not confirmed:
        return

    try:
        self.ui_manager.update_status("Archiving old data...", "info")
        summary = self.data_manager.archive_old_data(keep_years=keep_years)
        archived = int(summary.get("archived_rows", 0))
        kept = int(summary.get("kept_rows", 0))
        files = summary.get("archive_files", set()) or set()

        # One bullet per archive file, basename only.
        file_list = "\n".join(
            f"\u2022 {os.path.basename(str(p))}" for p in sorted(files)
        )
        msg = f"Archived {archived} row(s). Kept {kept}."
        if file_list:
            msg += f"\n\n{file_list}"

        self.ui_manager.update_status("Archiving complete", "success")
        if hasattr(self.ui_manager, "show_toast"):
            self.ui_manager.show_toast("Archiving complete", 1500)
        messagebox.showinfo("Archive Complete", msg, parent=self.root)
        # Refresh view since data file changed
        self.refresh_data_display()
    except Exception as e:
        logger.error(f"Archiving failed: {e}")
        self.ui_manager.update_status("Archiving failed", "error")
        messagebox.showerror("Archive Failed", str(e), parent=self.root)
def _refresh_ui_after_config_change(self) -> None:
"""Refresh UI components after pathology or medicine configuration changes."""
self.ui_manager.update_status(
@@ -1291,21 +1336,14 @@ Use Ctrl+S to save entries and Ctrl+Q to quit."""
return
entry_data["note"] = validated_note
# Check entry completeness: require date and at least one of
# (any pathology score > 0) or (any medicine taken == 1)
missing_fields: list[str] = []
if not entry_data.get("date"):
missing_fields.append("Date")
has_pathology = any(
entry_data.get(k, 0) > 0
for k in self.pathology_manager.get_pathology_keys()
# Check entry completeness using explicit keys
is_complete, missing_fields = (
InputValidator.validate_entry_completeness_with_keys(
entry_data,
self.pathology_manager.get_pathology_keys(),
self.medicine_manager.get_medicine_keys(),
)
)
has_medicine = any(
entry_data.get(k, 0) == 1 for k in self.medicine_manager.get_medicine_keys()
)
if not (has_pathology or has_medicine):
missing_fields.append("At least one pathology score or medicine entry")
if missing_fields:
missing_msg = "Missing required data:\n" + "\n".join(
+2
View File
@@ -25,6 +25,8 @@ _DEFAULTS: dict[str, Any] = {
# Table column UX
"column_widths": {},
"last_sort": {"column": None, "ascending": True},
# Data: archiving/rotation
"archive_keep_years": 1,
}
_PREFERENCES: dict[str, Any] = dict(_DEFAULTS)