7bb06fabdd
- Added DataFilter class for managing filtering and searching of medical data. - Introduced SearchFilterWidget for UI controls related to search and filters. - Integrated search and filter features into MedTrackerApp, allowing users to filter data by date range, medicine status, and pathology scores. - Implemented quick filters for common use cases (last week, last month, high symptoms). - Enhanced data loading and display logic to accommodate filtered data. - Added error handling for data loading issues. - Updated UIManager to reflect filter status in the application. - Improved entry validation in add_new_entry method to ensure data integrity.
419 lines
14 KiB
Python
419 lines
14 KiB
Python
"""Search and filter functionality for TheChart application."""
|
|
|
|
import re
|
|
from typing import Any
|
|
|
|
import pandas as pd
|
|
|
|
|
|
class DataFilter:
|
|
"""Handles filtering and searching of medical data."""
|
|
|
|
def __init__(self, logger=None):
|
|
"""
|
|
Initialize data filter.
|
|
|
|
Args:
|
|
logger: Logger instance for debugging
|
|
"""
|
|
self.logger = logger
|
|
self.active_filters = {}
|
|
self.search_term = ""
|
|
|
|
def set_date_range_filter(
|
|
self, start_date: str | None = None, end_date: str | None = None
|
|
) -> None:
|
|
"""
|
|
Set date range filter.
|
|
|
|
Args:
|
|
start_date: Start date string (inclusive)
|
|
end_date: End date string (inclusive)
|
|
"""
|
|
if start_date or end_date:
|
|
self.active_filters["date_range"] = {"start": start_date, "end": end_date}
|
|
elif "date_range" in self.active_filters:
|
|
del self.active_filters["date_range"]
|
|
|
|
def set_medicine_filter(self, medicine_key: str, taken: bool) -> None:
|
|
"""
|
|
Filter by medicine taken status.
|
|
|
|
Args:
|
|
medicine_key: Medicine identifier
|
|
taken: Whether medicine was taken (True) or not taken (False)
|
|
"""
|
|
if "medicines" not in self.active_filters:
|
|
self.active_filters["medicines"] = {}
|
|
|
|
self.active_filters["medicines"][medicine_key] = taken
|
|
|
|
def set_pathology_range_filter(
|
|
self,
|
|
pathology_key: str,
|
|
min_score: int | None = None,
|
|
max_score: int | None = None,
|
|
) -> None:
|
|
"""
|
|
Filter by pathology score range.
|
|
|
|
Args:
|
|
pathology_key: Pathology identifier
|
|
min_score: Minimum score (inclusive)
|
|
max_score: Maximum score (inclusive)
|
|
"""
|
|
if min_score is not None or max_score is not None:
|
|
if "pathologies" not in self.active_filters:
|
|
self.active_filters["pathologies"] = {}
|
|
|
|
self.active_filters["pathologies"][pathology_key] = {
|
|
"min": min_score,
|
|
"max": max_score,
|
|
}
|
|
|
|
def set_search_term(self, search_term: str) -> None:
|
|
"""
|
|
Set text search term for notes and other text fields.
|
|
|
|
Args:
|
|
search_term: Text to search for
|
|
"""
|
|
self.search_term = search_term.strip()
|
|
|
|
def clear_all_filters(self) -> None:
|
|
"""Clear all active filters and search terms."""
|
|
self.active_filters.clear()
|
|
self.search_term = ""
|
|
|
|
def clear_filter(self, filter_type: str, filter_key: str | None = None) -> None:
|
|
"""
|
|
Clear specific filter.
|
|
|
|
Args:
|
|
filter_type: Type of filter ("date_range", "medicines", "pathologies")
|
|
filter_key: Specific key within filter type (optional)
|
|
"""
|
|
if filter_type in self.active_filters:
|
|
if filter_key and isinstance(self.active_filters[filter_type], dict):
|
|
if filter_key in self.active_filters[filter_type]:
|
|
del self.active_filters[filter_type][filter_key]
|
|
# Remove parent filter if empty
|
|
if not self.active_filters[filter_type]:
|
|
del self.active_filters[filter_type]
|
|
else:
|
|
del self.active_filters[filter_type]
|
|
|
|
def apply_filters(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""
|
|
Apply all active filters to the dataframe.
|
|
|
|
Args:
|
|
df: Input dataframe
|
|
|
|
Returns:
|
|
Filtered dataframe
|
|
"""
|
|
if df.empty:
|
|
return df
|
|
|
|
filtered_df = df.copy()
|
|
|
|
try:
|
|
# Apply date range filter
|
|
filtered_df = self._apply_date_filter(filtered_df)
|
|
|
|
# Apply medicine filters
|
|
filtered_df = self._apply_medicine_filters(filtered_df)
|
|
|
|
# Apply pathology filters
|
|
filtered_df = self._apply_pathology_filters(filtered_df)
|
|
|
|
# Apply text search
|
|
filtered_df = self._apply_text_search(filtered_df)
|
|
|
|
if self.logger:
|
|
original_count = len(df)
|
|
filtered_count = len(filtered_df)
|
|
self.logger.debug(
|
|
f"Applied filters: {original_count} -> {filtered_count} entries"
|
|
)
|
|
|
|
return filtered_df
|
|
|
|
except Exception as e:
|
|
if self.logger:
|
|
self.logger.error(f"Error applying filters: {e}")
|
|
return df # Return original data if filtering fails
|
|
|
|
def _apply_date_filter(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Apply date range filter."""
|
|
if "date_range" not in self.active_filters:
|
|
return df
|
|
|
|
date_filter = self.active_filters["date_range"]
|
|
start_date = date_filter.get("start")
|
|
end_date = date_filter.get("end")
|
|
|
|
if not start_date and not end_date:
|
|
return df
|
|
|
|
try:
|
|
# Convert date column to datetime for comparison
|
|
df_dates = pd.to_datetime(df["date"], format="%m/%d/%Y", errors="coerce")
|
|
|
|
mask = pd.Series(True, index=df.index)
|
|
|
|
if start_date:
|
|
start_dt = pd.to_datetime(start_date, format="%m/%d/%Y")
|
|
mask &= df_dates >= start_dt
|
|
|
|
if end_date:
|
|
end_dt = pd.to_datetime(end_date, format="%m/%d/%Y")
|
|
mask &= df_dates <= end_dt
|
|
|
|
return df[mask]
|
|
|
|
except Exception as e:
|
|
if self.logger:
|
|
self.logger.warning(f"Date filter failed: {e}")
|
|
return df
|
|
|
|
def _apply_medicine_filters(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Apply medicine filters."""
|
|
if "medicines" not in self.active_filters:
|
|
return df
|
|
|
|
medicine_filters = self.active_filters["medicines"]
|
|
mask = pd.Series(True, index=df.index)
|
|
|
|
for medicine_key, should_be_taken in medicine_filters.items():
|
|
if medicine_key in df.columns:
|
|
if should_be_taken:
|
|
# Filter for entries where medicine was taken (value > 0)
|
|
mask &= df[medicine_key] > 0
|
|
else:
|
|
# Filter for entries where medicine was not taken (value == 0)
|
|
mask &= df[medicine_key] == 0
|
|
|
|
return df[mask]
|
|
|
|
def _apply_pathology_filters(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Apply pathology score range filters."""
|
|
if "pathologies" not in self.active_filters:
|
|
return df
|
|
|
|
pathology_filters = self.active_filters["pathologies"]
|
|
mask = pd.Series(True, index=df.index)
|
|
|
|
for pathology_key, score_range in pathology_filters.items():
|
|
if pathology_key in df.columns:
|
|
min_score = score_range.get("min")
|
|
max_score = score_range.get("max")
|
|
|
|
if min_score is not None:
|
|
mask &= df[pathology_key] >= min_score
|
|
|
|
if max_score is not None:
|
|
mask &= df[pathology_key] <= max_score
|
|
|
|
return df[mask]
|
|
|
|
def _apply_text_search(self, df: pd.DataFrame) -> pd.DataFrame:
|
|
"""Apply text search to notes and other text fields."""
|
|
if not self.search_term:
|
|
return df
|
|
|
|
# Create regex pattern for case-insensitive search
|
|
try:
|
|
pattern = re.compile(re.escape(self.search_term), re.IGNORECASE)
|
|
except re.error:
|
|
# If regex fails, fall back to simple string search
|
|
pattern = self.search_term.lower()
|
|
|
|
mask = pd.Series(False, index=df.index)
|
|
|
|
# Search in notes column
|
|
if "note" in df.columns:
|
|
if isinstance(pattern, re.Pattern):
|
|
mask |= df["note"].astype(str).str.contains(pattern, na=False)
|
|
else:
|
|
mask |= (
|
|
df["note"].astype(str).str.lower().str.contains(pattern, na=False)
|
|
)
|
|
|
|
# Search in date column
|
|
if "date" in df.columns:
|
|
if isinstance(pattern, re.Pattern):
|
|
mask |= df["date"].astype(str).str.contains(pattern, na=False)
|
|
else:
|
|
mask |= (
|
|
df["date"].astype(str).str.lower().str.contains(pattern, na=False)
|
|
)
|
|
|
|
return df[mask]
|
|
|
|
def get_filter_summary(self) -> dict[str, Any]:
|
|
"""
|
|
Get summary of active filters.
|
|
|
|
Returns:
|
|
Dictionary describing active filters
|
|
"""
|
|
summary = {
|
|
"has_filters": bool(self.active_filters or self.search_term),
|
|
"filter_count": len(self.active_filters),
|
|
"search_term": self.search_term,
|
|
"filters": {},
|
|
}
|
|
|
|
# Date range summary
|
|
if "date_range" in self.active_filters:
|
|
date_range = self.active_filters["date_range"]
|
|
summary["filters"]["date_range"] = {
|
|
"start": date_range.get("start", "Any"),
|
|
"end": date_range.get("end", "Any"),
|
|
}
|
|
|
|
# Medicine filters summary
|
|
if "medicines" in self.active_filters:
|
|
medicine_filters = self.active_filters["medicines"]
|
|
summary["filters"]["medicines"] = {
|
|
"taken": [k for k, v in medicine_filters.items() if v],
|
|
"not_taken": [k for k, v in medicine_filters.items() if not v],
|
|
}
|
|
|
|
# Pathology filters summary
|
|
if "pathologies" in self.active_filters:
|
|
pathology_filters = self.active_filters["pathologies"]
|
|
summary["filters"]["pathologies"] = {}
|
|
for key, range_filter in pathology_filters.items():
|
|
min_val = range_filter.get("min", "Any")
|
|
max_val = range_filter.get("max", "Any")
|
|
summary["filters"]["pathologies"][key] = f"{min_val} - {max_val}"
|
|
|
|
return summary
|
|
|
|
|
|
class QuickFilters:
    """Predefined quick filters for common use cases.

    Every method configures the ``data_filter`` passed in (by calling its
    ``set_*`` methods) and returns ``None``.
    """

    @staticmethod
    def _set_trailing_days(data_filter: "DataFilter", days: int) -> None:
        """Set a date range from ``days`` days ago through today (MM/DD/YYYY)."""
        from datetime import datetime, timedelta

        end_date = datetime.now()
        start_date = end_date - timedelta(days=days)

        data_filter.set_date_range_filter(
            start_date.strftime("%m/%d/%Y"), end_date.strftime("%m/%d/%Y")
        )

    @staticmethod
    def last_week(data_filter: "DataFilter") -> None:
        """Filter for entries dated from 7 days ago through today."""
        QuickFilters._set_trailing_days(data_filter, 7)

    @staticmethod
    def last_month(data_filter: "DataFilter") -> None:
        """Filter for entries dated from 30 days ago through today."""
        QuickFilters._set_trailing_days(data_filter, 30)

    @staticmethod
    def this_month(data_filter: "DataFilter") -> None:
        """Filter for entries from the first of the current month through today."""
        from datetime import datetime

        now = datetime.now()
        start_date = now.replace(day=1)

        data_filter.set_date_range_filter(
            start_date.strftime("%m/%d/%Y"), now.strftime("%m/%d/%Y")
        )

    @staticmethod
    def high_symptoms(
        data_filter: "DataFilter", pathology_keys: list[str], min_score: int = 7
    ) -> None:
        """Filter for entries with high symptom scores (7+ by default).

        Args:
            data_filter: Filter object to configure
            pathology_keys: Pathologies that must all reach ``min_score``
            min_score: Inclusive lower bound applied to each pathology
        """
        for pathology_key in pathology_keys:
            data_filter.set_pathology_range_filter(pathology_key, min_score=min_score)

    @staticmethod
    def low_symptoms(
        data_filter: "DataFilter", pathology_keys: list[str], max_score: int = 3
    ) -> None:
        """Filter for entries with low symptom scores (at most 3 by default).

        Args:
            data_filter: Filter object to configure
            pathology_keys: Pathologies that must all stay at or below ``max_score``
            max_score: Inclusive upper bound applied to each pathology
        """
        for pathology_key in pathology_keys:
            data_filter.set_pathology_range_filter(pathology_key, max_score=max_score)

    @staticmethod
    def no_medication(data_filter: "DataFilter", medicine_keys: list[str]) -> None:
        """Filter for entries where no medications were taken."""
        for medicine_key in medicine_keys:
            data_filter.set_medicine_filter(medicine_key, taken=False)
|
|
|
|
|
|
class SearchHistory:
    """Manages search history for quick access to previous searches.

    Terms are kept most-recent-first in ``history`` without duplicates
    (duplicate detection is exact, i.e. case-sensitive).
    """

    def __init__(self, max_history: int = 20):
        """
        Initialize search history.

        Args:
            max_history: Maximum number of search terms to remember
        """
        self.max_history = max_history
        self.history: list[str] = []

    def add_search(self, search_term: str) -> None:
        """
        Add a search term to history.

        The term is stripped first; blank terms are ignored. A term already
        present is moved to the front rather than stored twice.

        Args:
            search_term: Search term to add
        """
        search_term = search_term.strip()
        if not search_term:
            return

        # Remove if already exists
        if search_term in self.history:
            self.history.remove(search_term)

        # Add to beginning
        self.history.insert(0, search_term)

        # Trim to max size; clamp so a negative max_history cannot turn the
        # slice bound into an index counted from the end.
        del self.history[max(self.max_history, 0):]

    def get_history(self) -> list[str]:
        """Get search history (a copy, most recent first)."""
        return list(self.history)

    def clear_history(self) -> None:
        """Clear all search history."""
        self.history.clear()

    def get_suggestions(self, partial_term: str, limit: int = 5) -> list[str]:
        """
        Get search suggestions based on partial input.

        Matching is a case-insensitive prefix test. The partial input is
        stripped first, because stored terms are stripped too; blank input
        yields the most recent searches.

        Args:
            partial_term: Partial search term
            limit: Maximum number of suggestions to return

        Returns:
            List of matching suggestions from history, most recent first
        """
        limit = max(limit, 0)
        prefix = partial_term.strip().lower()
        if not prefix:
            return self.history[:limit]  # Return recent searches

        matches = [term for term in self.history if term.lower().startswith(prefix)]
        return matches[:limit]
|