diff --git a/pfbudget/database.py b/pfbudget/database.py index f5d18ca..844d358 100644 --- a/pfbudget/database.py +++ b/pfbudget/database.py @@ -63,13 +63,14 @@ HAVING COUNT(*) > 1 SORTED_TRANSACTIONS = """ SELECT * FROM transactions -ORDER BY (?) +ORDER BY date ASC """ SELECT_TRANSACTIONS_BETWEEN_DATES = """ SELECT * FROM transactions WHERE date BETWEEN (?) AND (?) +ORDER BY date ASC """ SELECT_TRANSACTIONS_BY_CATEGORY = """ @@ -191,9 +192,9 @@ class DBManager: return [Transaction(t) for t in transactions] return None - def get_sorted_transactions(self, key: str) -> list[Transaction] | None: - logger.info(f"Get transactions sorted by {key}") - transactions = self.__execute(SORTED_TRANSACTIONS, key) + def get_sorted_transactions(self) -> list[Transaction] | None: + logger.info("Get transactions sorted by date") + transactions = self.__execute(SORTED_TRANSACTIONS) if transactions: return [Transaction(t) for t in transactions] return None diff --git a/pfbudget/graph.py b/pfbudget/graph.py index caab394..1ac2df9 100644 --- a/pfbudget/graph.py +++ b/pfbudget/graph.py @@ -1,242 +1,98 @@ +from __future__ import annotations +from calendar import monthrange +from dateutil.rrule import rrule, MONTHLY +from typing import TYPE_CHECKING +import datetime as dt import matplotlib.pyplot as plt -from .categories import ( - get_income_categories, - get_fixed_expenses, - get_required_expenses, - get_health_expenses, - get_discretionary_expenses, -) -from .transactions import load_transactions, daterange, by_month_and_category +import pfbudget.categories as categories + +if TYPE_CHECKING: + from pfbudget.database import DBManager -def monthly(state, start, end): - transactions = load_transactions(state.data_dir) - if not start: - start = transactions[0].date - if not end: - end = transactions[-1].date - - income, fixed, required, health, discretionary = [], [], [], [], [] - monthly_transactions_by_categories = by_month_and_category(transactions, start, end) - - for _, transactions_by_category in monthly_transactions_by_categories.items(): - income.append( - sum( - float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_income_categories() - for t in transactions - ) +def monthly(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date.max): + transactions = db.get_daterange(start, end) + start, end = transactions[0].date, transactions[-1].date + monthly_transactions = tuple( + ( + month, + { + group: sum( + transaction.value + for transaction in transactions + if transaction.category in categories + and month + <= transaction.date + <= month + + dt.timedelta(days=monthrange(month.year, month.month)[1] - 1) + ) + for group, categories in categories.groups.items() + }, ) - fixed.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_fixed_expenses() - for t in transactions + for month in [ + month.date() + for month in rrule( + MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1) ) - ) - required.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_required_expenses() - for t in transactions - ) - ) - health.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_health_expenses() - for t in transactions - ) - ) - discretionary.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_discretionary_expenses() - for t in transactions - ) - ) - - plt.figure(figsize=(30, 10)) - plt.plot(daterange(start, end, "month"), income, label="Income") - plt.stackplot( - daterange(start, end, "month"), - fixed, - required, - health, - discretionary, - labels=["Fixed", "Required", "Health", "Discretionary"], + ] ) - plt.legend(loc="upper left") - plt.tight_layout() - plt.savefig("graph.png") - - -def discrete(state, start, end): - transactions = load_transactions(state.data_dir) - if not start: - start = transactions[0].date - if not end: - end = transactions[-1].date - - income, fixed, required, health, discretionary = [], [], [], [], [] - monthly_transactions_by_categories = by_month_and_category(transactions, start, end) - - for _, transactions_by_category in monthly_transactions_by_categories.items(): - income.append( - sum( - float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_income_categories() - for t in transactions - ) - ) - fixed.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_fixed_expenses() - for t in transactions - ) - ) - required.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_required_expenses() - for t in transactions - ) - ) - health.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_health_expenses() - for t in transactions - ) - ) - d = [] - for category, transactions in transactions_by_category.items(): - if category in get_discretionary_expenses(): - try: - d.append(sum(-float(t.value) for t in transactions)) - except TypeError: - d.append(0) - - discretionary.append(d) - - # transposing discretionary - discretionary = list(map(list, zip(*discretionary))) plt.figure(figsize=(30, 10)) - plt.plot(daterange(start, end, "month"), income, label="Income") - plt.stackplot( - daterange(start, end, "month"), - fixed, - required, - health, - *discretionary, - labels=["Fixed", "Required", "Health", *get_discretionary_expenses()], + plt.plot( + list(rrule(MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1))), + [groups["income"] for _, groups in monthly_transactions], ) - plt.legend(loc="upper left") - plt.grid() - plt.tight_layout() - plt.savefig("graph.png") - - -def average(state, start, end): - transactions = load_transactions(state.data_dir) - - income, fixed, required, health, discretionary = [], [], [], [], [] - monthly_transactions_by_categories = by_month_and_category(transactions, start, end) - - for _, transactions_by_category in monthly_transactions_by_categories.items(): - income.append( - sum( - float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_income_categories() - for t in transactions - ) - ) - fixed.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_fixed_expenses() - for t in transactions - ) - ) - required.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_required_expenses() - for t in transactions - ) - ) - health.append( - sum( - -float(t.value) - for category, transactions in transactions_by_category.items() - if transactions and category in get_health_expenses() - for t in transactions - ) - ) - d = [] - for category, transactions in transactions_by_category.items(): - if category in get_discretionary_expenses(): - try: - d.append(sum(-float(t.value) for t in transactions)) - except TypeError: - d.append(0) - - discretionary.append(d) - - # transposing discretionary - discretionary = list(map(list, zip(*discretionary))) - - print(discretionary) - - n = len(daterange(start, end, "month")) - - avg_income = sum(income) / n - - l_avg_income = [avg_income] * n - - avg_fixed = [sum(fixed) / n] * n - avg_required = [sum(required) / n] * n - avg_health = [sum(health) / n] * n - avg_discretionary = [[sum(d) / n] * n for d in discretionary] - - print(avg_discretionary) - - plt.figure(figsize=(30, 10)) - plt.plot(daterange(start, end, "month"), l_avg_income, label=f"Income {avg_income}") plt.stackplot( - daterange(start, end, "month"), - avg_fixed, - avg_required, - avg_health, - *avg_discretionary, - labels=[ - f"Fixed {avg_fixed[0]/avg_income * 100}%", - f"Required {avg_required[0]/avg_income * 100}%", - f"Health {avg_health[0]/avg_income * 100}%", - *[ - f"{e} {avg_discretionary[i][0]/avg_income * 100}%" - for i, e in enumerate(get_discretionary_expenses()) - ], + list(rrule(MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1))), + [ + [-groups[group] for _, groups in monthly_transactions] + for group in categories.groups.keys() + if group != "income" ], + labels=[group for group in categories.groups.keys() if group != "income"], ) - plt.legend(bbox_to_anchor=(1, 1), loc="upper left") + plt.legend(loc="upper left") plt.tight_layout() - plt.savefig( - "graph.png", - dpi=600, + plt.savefig("graph.png") + + +def discrete(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date.max): + transactions = db.get_daterange(start, end) + start, end = transactions[0].date, transactions[-1].date + monthly_transactions = tuple( + ( + month, + { + category: sum( + transaction.value + for transaction in transactions + if transaction.category == category + and month + <= transaction.date + <= month + + dt.timedelta(days=monthrange(month.year, month.month)[1] - 1) + ) + for category in categories.categories.keys() + }, + ) + for month in [ + month.date() + for month in rrule( + MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1) + ) + ] ) + + plt.figure(figsize=(30, 10)) + plt.stackplot( + list(rrule(MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1))), + [ + [-categories[category] for _, categories in monthly_transactions] + for category in categories.categories.keys() + ], + labels=[category for category in categories.categories.keys()], + ) + plt.legend(loc="upper left") + plt.tight_layout() + plt.savefig("graph.png") diff --git a/pfbudget/runnable.py b/pfbudget/runnable.py index 3abb975..9ce7256 100644 --- a/pfbudget/runnable.py +++ b/pfbudget/runnable.py @@ -67,10 +67,10 @@ def argparser() -> argparse.ArgumentParser: func=lambda args: categorize_data(DBManager(args.database)) ) - p_graph = subparsers.add_parser("graph", help="graph help") - p_report = subparsers.add_parser("report", help="report help") - p_status = subparsers.add_parser("status", help="status help") - + """ + Graph + """ + p_graph = subparsers.add_parser("graph", parents=[help]) p_graph.add_argument( "option", type=str, @@ -86,9 +86,12 @@ def argparser() -> argparse.ArgumentParser: p_graph_interval.add_argument("--start", type=str, nargs=1, help="graph start date") p_graph_interval.add_argument("--end", type=str, nargs=1, help="graph end date") p_graph_interval.add_argument("--year", type=str, nargs=1, help="graph year") + p_graph.set_defaults(func=graph) + + p_report = subparsers.add_parser("report", help="report help") + p_status = subparsers.add_parser("status", help="status help") p_status.set_defaults(func=status) - p_graph.set_defaults(func=graph) p_report.set_defaults(func=f_report) return parser @@ -123,16 +126,14 @@ def status(state, args): print(state) -def graph(state, args): - """Graph - - Plots the transactions over a period of time. +def graph(args): + """Plots the transactions over a period of time. Args: state (PFState): Internal state of the program args (dict): argparse variables """ - start, end = None, None + start, end = dt.date.min, dt.date.max if args.start or args.interval: start = dt.datetime.strptime(args.start[0], "%Y/%m/%d").date() @@ -150,9 +151,9 @@ def graph(state, args): ).date() - dt.timedelta(days=1) if args.option == "monthly": - monthly(state, start, end) + monthly(DBManager(args.database), start, end) elif args.option == "discrete": - discrete(state, start, end) + discrete(DBManager(args.database), start, end) def f_report(state, args):