feat: Enhance DataManager and GraphManager with performance optimizations and caching
This commit is contained in:
+161
-55
@@ -9,7 +9,7 @@ from pathology_manager import PathologyManager
|
||||
|
||||
|
||||
class DataManager:
|
||||
"""Handle all data operations for the application."""
|
||||
"""Handle all data operations for the application with performance optimizations."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -22,10 +22,21 @@ class DataManager:
|
||||
self.logger: logging.Logger = logger
|
||||
self.medicine_manager = medicine_manager
|
||||
self.pathology_manager = pathology_manager
|
||||
|
||||
# Cache for loaded data to avoid repeated file I/O
|
||||
self._data_cache: pd.DataFrame | None = None
|
||||
self._cache_timestamp: float = 0
|
||||
self._headers_cache: tuple[str, ...] | None = None
|
||||
self._dtype_cache: dict[str, type] | None = None
|
||||
|
||||
self._initialize_csv_file()
|
||||
|
||||
def _get_csv_headers(self) -> list[str]:
|
||||
"""Get CSV headers based on current pathology and medicine configuration."""
|
||||
def _get_csv_headers(self) -> tuple[str, ...]:
|
||||
"""Get CSV headers based on current pathology and medicine configuration.
|
||||
Cached to avoid repeated computation."""
|
||||
if self._headers_cache is not None:
|
||||
return self._headers_cache
|
||||
|
||||
# Start with date
|
||||
headers = ["date"]
|
||||
|
||||
@@ -37,7 +48,9 @@ class DataManager:
|
||||
for medicine_key in self.medicine_manager.get_medicine_keys():
|
||||
headers.extend([medicine_key, f"{medicine_key}_doses"])
|
||||
|
||||
return headers + ["note"]
|
||||
result = tuple(headers + ["note"])
|
||||
self._headers_cache = result
|
||||
return result
|
||||
|
||||
def _initialize_csv_file(self) -> None:
|
||||
"""Create CSV file with headers if it doesn't exist or is empty."""
|
||||
@@ -46,27 +59,74 @@ class DataManager:
|
||||
writer = csv.writer(file)
|
||||
writer.writerow(self._get_csv_headers())
|
||||
|
||||
def _invalidate_cache(self) -> None:
|
||||
"""Invalidate the data cache when data changes."""
|
||||
self._data_cache = None
|
||||
self._cache_timestamp = 0
|
||||
|
||||
def _should_reload_data(self) -> bool:
|
||||
"""Check if data should be reloaded based on file modification time."""
|
||||
if self._data_cache is None:
|
||||
return True
|
||||
|
||||
try:
|
||||
file_mtime = os.path.getmtime(self.filename)
|
||||
return file_mtime > self._cache_timestamp
|
||||
except OSError:
|
||||
return True
|
||||
|
||||
def _get_dtype_dict(self) -> dict[str, type]:
|
||||
"""Get pandas dtype dictionary for efficient reading.
|
||||
Cached to avoid recreation."""
|
||||
if self._dtype_cache is not None:
|
||||
return self._dtype_cache
|
||||
|
||||
dtype_dict = {"date": str, "note": str}
|
||||
|
||||
# Add pathology types
|
||||
for pathology_key in self.pathology_manager.get_pathology_keys():
|
||||
dtype_dict[pathology_key] = int
|
||||
|
||||
# Add medicine types
|
||||
for medicine_key in self.medicine_manager.get_medicine_keys():
|
||||
dtype_dict[medicine_key] = int
|
||||
dtype_dict[f"{medicine_key}_doses"] = str
|
||||
|
||||
self._dtype_cache = dtype_dict
|
||||
return dtype_dict
|
||||
|
||||
def load_data(self) -> pd.DataFrame:
|
||||
"""Load data from CSV file."""
|
||||
"""Load data from CSV file with caching for better performance."""
|
||||
if not os.path.exists(self.filename) or os.path.getsize(self.filename) == 0:
|
||||
self.logger.warning("CSV file is empty or doesn't exist. No data to load.")
|
||||
return pd.DataFrame()
|
||||
|
||||
# Use cached data if available and file hasn't changed
|
||||
if not self._should_reload_data():
|
||||
return self._data_cache.copy()
|
||||
|
||||
try:
|
||||
# Build dtype dictionary dynamically
|
||||
dtype_dict = {"date": str, "note": str}
|
||||
# Use pre-built dtype dictionary for faster parsing
|
||||
dtype_dict = self._get_dtype_dict()
|
||||
|
||||
# Add pathology types
|
||||
for pathology_key in self.pathology_manager.get_pathology_keys():
|
||||
dtype_dict[pathology_key] = int
|
||||
# Read with optimized settings
|
||||
df: pd.DataFrame = pd.read_csv(
|
||||
self.filename,
|
||||
dtype=dtype_dict,
|
||||
na_filter=False, # Don't convert to NaN, keep as empty strings
|
||||
engine="c", # Use faster C engine
|
||||
)
|
||||
|
||||
# Add medicine types
|
||||
for medicine_key in self.medicine_manager.get_medicine_keys():
|
||||
dtype_dict[medicine_key] = int
|
||||
dtype_dict[f"{medicine_key}_doses"] = str
|
||||
# Sort only if needed (check if already sorted)
|
||||
if len(df) > 1 and not df["date"].is_monotonic_increasing:
|
||||
df = df.sort_values(by="date").reset_index(drop=True)
|
||||
|
||||
# Cache the data and timestamp
|
||||
self._data_cache = df.copy()
|
||||
self._cache_timestamp = os.path.getmtime(self.filename)
|
||||
|
||||
return df.copy()
|
||||
|
||||
df: pd.DataFrame = pd.read_csv(self.filename, dtype=dtype_dict).fillna("")
|
||||
return df.sort_values(by="date").reset_index(drop=True)
|
||||
except pd.errors.EmptyDataError:
|
||||
self.logger.warning("CSV file is empty. No data to load.")
|
||||
return pd.DataFrame()
|
||||
@@ -75,69 +135,104 @@ class DataManager:
|
||||
return pd.DataFrame()
|
||||
|
||||
def add_entry(self, entry_data: list[str | int]) -> bool:
|
||||
"""Add a new entry to the CSV file."""
|
||||
"""Add a new entry to the CSV file with optimized duplicate checking."""
|
||||
try:
|
||||
# Check if date already exists
|
||||
df: pd.DataFrame = self.load_data()
|
||||
# Quick duplicate check using cached data if available
|
||||
date_to_add: str = str(entry_data[0])
|
||||
|
||||
if not df.empty and date_to_add in df["date"].values:
|
||||
self.logger.warning(f"Entry with date {date_to_add} already exists.")
|
||||
return False
|
||||
if self._data_cache is not None:
|
||||
# Use cached data for duplicate check
|
||||
if date_to_add in self._data_cache["date"].values:
|
||||
self.logger.warning(
|
||||
f"Entry with date {date_to_add} already exists."
|
||||
)
|
||||
return False
|
||||
else:
|
||||
# Fallback to loading data if no cache
|
||||
df: pd.DataFrame = self.load_data()
|
||||
if not df.empty and date_to_add in df["date"].values:
|
||||
self.logger.warning(
|
||||
f"Entry with date {date_to_add} already exists."
|
||||
)
|
||||
return False
|
||||
|
||||
# Write to file
|
||||
with open(self.filename, mode="a", newline="") as file:
|
||||
writer = csv.writer(file)
|
||||
writer.writerow(entry_data)
|
||||
|
||||
# Invalidate cache since data changed
|
||||
self._invalidate_cache()
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error adding entry: {str(e)}")
|
||||
return False
|
||||
|
||||
def update_entry(self, original_date: str, values: list[str | int]) -> bool:
|
||||
"""Update an existing entry identified by original_date."""
|
||||
"""Update an existing entry identified by original_date
|
||||
with optimized processing."""
|
||||
try:
|
||||
df: pd.DataFrame = self.load_data()
|
||||
new_date: str = str(values[0])
|
||||
|
||||
# If the date is being changed, check if the new date already exists
|
||||
if original_date != new_date and new_date in df["date"].values:
|
||||
# Optimized duplicate check
|
||||
if original_date != new_date:
|
||||
date_exists = (df["date"] == new_date).any()
|
||||
if date_exists:
|
||||
self.logger.warning(
|
||||
f"Cannot update: entry with date {new_date} already exists."
|
||||
)
|
||||
return False
|
||||
|
||||
# Get current CSV headers to match with values
|
||||
headers = list(self._get_csv_headers())
|
||||
|
||||
# Ensure we have the right number of values with optimized padding
|
||||
if len(values) < len(headers):
|
||||
# Pad with defaults efficiently
|
||||
padding_needed = len(headers) - len(values)
|
||||
for i in range(padding_needed):
|
||||
header_idx = len(values) + i
|
||||
if header_idx < len(headers):
|
||||
header = headers[header_idx]
|
||||
if header == "note" or header.endswith("_doses"):
|
||||
values.append("")
|
||||
else:
|
||||
values.append(0)
|
||||
|
||||
# Use vectorized update for better performance
|
||||
mask = df["date"] == original_date
|
||||
if mask.any():
|
||||
df.loc[mask, headers] = values
|
||||
# Write back to CSV with optimized method
|
||||
df.to_csv(self.filename, index=False, mode="w")
|
||||
self._invalidate_cache()
|
||||
return True
|
||||
else:
|
||||
self.logger.warning(
|
||||
f"Cannot update: entry with date {new_date} already exists."
|
||||
f"Entry with date {original_date} not found for update."
|
||||
)
|
||||
return False
|
||||
|
||||
# Get current CSV headers to match with values
|
||||
headers = self._get_csv_headers()
|
||||
|
||||
# Ensure we have the right number of values
|
||||
if len(values) != len(headers):
|
||||
self.logger.warning(
|
||||
f"Value count mismatch: expected {len(headers)}, got {len(values)}"
|
||||
)
|
||||
# Pad with defaults if too few values
|
||||
while len(values) < len(headers):
|
||||
header = headers[len(values)]
|
||||
if header == "note" or header.endswith("_doses"):
|
||||
values.append("")
|
||||
else:
|
||||
values.append(0)
|
||||
|
||||
# Update the row using column names
|
||||
df.loc[df["date"] == original_date, headers] = values
|
||||
df.to_csv(self.filename, index=False)
|
||||
return True
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error updating entry: {str(e)}")
|
||||
return False
|
||||
|
||||
def delete_entry(self, date: str) -> bool:
|
||||
"""Delete an entry identified by date."""
|
||||
"""Delete an entry identified by date with optimized processing."""
|
||||
try:
|
||||
df: pd.DataFrame = self.load_data()
|
||||
# Remove the row with the matching date
|
||||
original_len = len(df)
|
||||
|
||||
# Use vectorized filtering for better performance
|
||||
df = df[df["date"] != date]
|
||||
# Write the updated dataframe back to the CSV
|
||||
df.to_csv(self.filename, index=False)
|
||||
|
||||
# Only write if something was actually deleted
|
||||
if len(df) < original_len:
|
||||
df.to_csv(self.filename, index=False, mode="w")
|
||||
self._invalidate_cache()
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error deleting entry: {str(e)}")
|
||||
@@ -146,23 +241,34 @@ class DataManager:
|
||||
def get_today_medicine_doses(
|
||||
self, date: str, medicine_name: str
|
||||
) -> list[tuple[str, str]]:
|
||||
"""Get list of (timestamp, dose) tuples for a medicine on a given date."""
|
||||
"""Get list of (timestamp, dose) tuples for a medicine on a given date
|
||||
with caching."""
|
||||
try:
|
||||
df: pd.DataFrame = self.load_data()
|
||||
if df.empty or date not in df["date"].values:
|
||||
if df.empty:
|
||||
return []
|
||||
|
||||
# Use vectorized filtering for better performance
|
||||
date_mask = df["date"] == date
|
||||
if not date_mask.any():
|
||||
return []
|
||||
|
||||
dose_column = f"{medicine_name}_doses"
|
||||
doses_str = df.loc[df["date"] == date, dose_column].iloc[0]
|
||||
if dose_column not in df.columns:
|
||||
return []
|
||||
|
||||
doses_str = df.loc[date_mask, dose_column].iloc[0]
|
||||
|
||||
if not doses_str:
|
||||
return []
|
||||
|
||||
# Optimized dose parsing
|
||||
doses = []
|
||||
for dose_entry in doses_str.split("|"):
|
||||
if ":" in dose_entry:
|
||||
timestamp, dose = dose_entry.split(":", 1)
|
||||
doses.append((timestamp, dose))
|
||||
parts = dose_entry.split(":", 1)
|
||||
if len(parts) == 2:
|
||||
doses.append((parts[0], parts[1]))
|
||||
|
||||
return doses
|
||||
except Exception as e:
|
||||
|
||||
Reference in New Issue
Block a user