From fe840b622c5d5e11a59871a88db4f394ae00d183 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Murta?= Date: Fri, 29 Jan 2021 22:48:55 +0000 Subject: [PATCH] Adds graph subcommand Moves all graph and matplotlib dependencies to graph.py. Moves Transactions and Categories methods outside the class for better visibility. Variable expenses renamed to required. transactions.py now has by_year, by_month, by_categories in global scope and slightly improved perfomance. --- main.py | 132 ++++++++------------------------------- pfbudget/categories.py | 59 +++++++++-------- pfbudget/graph.py | 72 +++++++++++++++++++++ pfbudget/tools.py | 4 +- pfbudget/transactions.py | 85 +++++++++++++++---------- 5 files changed, 178 insertions(+), 174 deletions(-) create mode 100644 pfbudget/graph.py diff --git a/main.py b/main.py index 110222e..99cef34 100644 --- a/main.py +++ b/main.py @@ -1,12 +1,9 @@ from pathlib import Path import argparse import datetime as dt -import matplotlib.pyplot as plt -import pickle -import sys -from pfbudget.categories import Categories + +from pfbudget.graph import monthly from pfbudget.transactions import load_transactions, save_transactions -from pfbudget.parsers import Parser import pfbudget.tools as tools @@ -21,11 +18,15 @@ class PfBudgetNotInitialized(Exception): pass +class DataFileMissing(Exception): + pass + + def init(state, args): """init function - Creates .pfbudget.pickle which stores the internal state of the program for later use. Parses all raw directory - into data directory. + Creates state file which stores the internal state of the program for later use. + Calls parse, that parses all raw directory into data directory. args.raw -- raw dir args.data -- data dir @@ -50,8 +51,8 @@ def init(state, args): def restart(state, args): """restart function - Deletes .pfbudget.pickle and creates new one. Parses all raw directory into data directory. New dirs can be passed - as arguments, otherwise uses previous values + Deletes state and creates new one. Parses all raw directory into data directory. + New dirs can be passed as arguments, otherwise uses previous values. args.raw -- raw dir args.data -- data dir @@ -61,8 +62,7 @@ def restart(state, args): try: (Path(state.data_dir) / fn).unlink() except FileNotFoundError: - print("missing {}".format(Path(state.data_dir) / fn)) - sys.exit(-1) + raise DataFileMissing("missing {}".format(Path(state.data_dir) / fn)) if args.raw: state.raw_dir = args.raw @@ -91,8 +91,8 @@ def backup(state, args): def parse(state, args): """parse function - Extracts from .pfbudget.pickle the already read files and parses the remaining. args will be None if called from - command line and gathered from the pickle. + Extracts from .pfbudget.pickle the already read files and parses the remaining. + args will be None if called from command line and gathered from the pickle. args.raw -- raw dir args.data -- data dir @@ -141,7 +141,10 @@ def vacation(state, args): def status(state, args): print(state) - sys.exit(0) + + +def graph(state, args): + monthly(state, start=dt.date(2020, 1, 1), end=dt.date(2020, 12, 31)) if __name__ == "__main__": @@ -197,105 +200,20 @@ if __name__ == "__main__": p_backup.set_defaults(func=backup) p_parse.set_defaults(func=parse) p_vacation.set_defaults(func=vacation) - p_report.set_defaults(func=categorize) p_status.set_defaults(func=status) + p_graph.set_defaults(func=graph) state = tools.pfstate(p) state.filename = p args = parser.parse_args() args.func(state, args) - transactions = load_transactions(state.data_dir) + # income = [sum(t.value for cat, transactions in months.items() for t in transactions + # if cat in get_income_categories()) for months in monthly_transactions_by_cat] - # reprocess = [Education().name] - # for i, transaction in enumerate(transactions): - # for category in Categories.get_categories(): - # if transaction.category in reprocess: - # transaction.category = '' + # expenses = [] + # for category in expense_categories: + # expense_value = [-sum(t.value for t in month[category]) for month in monthly_transactions_by_cat] + # expenses.extend(expense_value) - monthly_transactions = transactions.get_transactions_by_month( - start=dt.date(2020, 1, 1), end=dt.date(2020, 12, 31) - ) - monthly_transactions_by_cat = [] - for month_transactions in monthly_transactions.values(): - cat = month_transactions.get_transactions_by_category() - monthly_transactions_by_cat.append(cat) - - for month, month_transactions in zip( - monthly_transactions.keys(), monthly_transactions_by_cat - ): - nulls = sum(t.value for t in month_transactions["Null"]) - if nulls != 0: - print(f"{month} {nulls}") - - expense_categories = [ - *Categories.get_fixed_expenses(), - *Categories.get_variable_expenses(), - *Categories.get_discretionary_expenses(), - ] - - if True: - t = list(monthly_transactions.keys()) - income = [ - float( - sum( - t.value - for cat, transactions in months.items() - for t in transactions - if cat in Categories.get_income_categories() - ) - ) - for months in monthly_transactions_by_cat - ] - # income = [] - # for months in monthly_transactions_by_cat: - # for cat, transactions in months.items(): - # if cat in Categories.get_income_categories(): - # income.append(sum(transactions)) - - expenses = [] - for category in expense_categories: - expense_value = [ - -float(sum(t.value for t in month[category])) - for month in monthly_transactions_by_cat - ] - expenses.append(expense_value) - # expenses = [transactions for months in monthly_transactions_by_cat for cat, transactions in months.items() - # if cat not in Categories.get_income_categories() and transactions] - for expense in expenses: - for i, month in reversed(list(enumerate(t))): - if expense[i] < 0: - if i - 1 < 0: - break - else: - expense[i - 1] += expense[i] - expense[i] = 0 - - plt.plot(t, income, label="Income") - plt.stackplot(t, expenses, labels=expense_categories) - plt.legend(bbox_to_anchor=(1, 1), loc="upper left") - plt.show() - - income = [ - sum( - t.value - for cat, transactions in months.items() - for t in transactions - if cat in Categories.get_income_categories() - ) - for months in monthly_transactions_by_cat - ] - - expenses = [] - for category in expense_categories: - expense_value = [ - -sum(t.value for t in month[category]) - for month in monthly_transactions_by_cat - ] - expenses.extend(expense_value) - - print( - "Income: {}, Expenses: {}, Net = {}".format( - sum(income), sum(expenses), sum(income) - sum(expenses) - ) - ) + # print("Income: {}, Expenses: {}, Net = {}"".format(sum(income), sum(expenses), sum(income) - sum(expenses))) diff --git a/pfbudget/categories.py b/pfbudget/categories.py index 8232c23..05acb18 100644 --- a/pfbudget/categories.py +++ b/pfbudget/categories.py @@ -38,39 +38,36 @@ class Categories: def get_categories(cls): return cls.__subclasses__() - @classmethod - def get_categories_names(cls): - return [cat.name for cat in cls.get_categories()] - @classmethod - def get_income_categories(cls): - return [cat.name for cat in cls.get_categories() if "Income" in cat.name] +def get_categories(): + return [cat.name for cat in Categories.get_categories()] - @classmethod - def get_fixed_expenses(cls): - return [ - Utilities.name, - Commute.name, - ] - - @classmethod - def get_variable_expenses(cls): - return [Groceries.name] - - @classmethod - def get_discretionary_expenses(cls): - return [ - cat.name - for cat in cls.get_categories() - if cat.name - not in [ - *cls.get_income_categories(), - *cls.get_fixed_expenses(), - *cls.get_variable_expenses(), - Investment.name, - Null.name, - ] + +def get_income_categories(): + return [cat for cat in get_categories() if "Income" in cat] + + +def get_fixed_expenses(): + return [Utilities.name, Commute.name] + + +def get_required_expenses(): + return [Groceries.name] + + +def get_discretionary_expenses(): + return [ + cat + for cat in get_categories() + if cat + not in [ + *get_income_categories(), + *get_fixed_expenses(), + *get_required_expenses(), + Investment.name, + Null.name, ] + ] class Income1(Categories): @@ -193,7 +190,7 @@ class Travel(Categories): name = "Travel" regex = [c("ryanair"), c("easyjet"), c("airbnb")] not_in_travel = [ - *Categories.get_income_categories(), + *get_income_categories(), Utilities.name, ] diff --git a/pfbudget/graph.py b/pfbudget/graph.py new file mode 100644 index 0000000..311ae40 --- /dev/null +++ b/pfbudget/graph.py @@ -0,0 +1,72 @@ +from dateutil.rrule import MONTHLY, YEARLY +import matplotlib.pyplot as plt + +from .categories import ( + get_income_categories, + get_fixed_expenses, + get_required_expenses, + get_discretionary_expenses, +) +from .transactions import load_transactions, daterange, by_category, by_month + + +def monthly(state, start, end): + transactions = load_transactions(state.data_dir) + + monthly_transactions = by_month(transactions, start, end) + monthly_transactions_by_categories = {} + income, fixed, required, discretionary = [], [], [], [] + + for month, transactions in monthly_transactions.items(): + monthly_transactions_by_categories[month] = by_category(transactions) + income.append( + sum( + float(t.value) + for category, transactions in monthly_transactions_by_categories[ + month + ].items() + if transactions and category in get_income_categories() + for t in transactions + ) + ) + fixed.append( + sum( + -float(t.value) + for category, transactions in monthly_transactions_by_categories[ + month + ].items() + if transactions and category in get_fixed_expenses() + for t in transactions + ) + ) + required.append( + sum( + -float(t.value) + for category, transactions in monthly_transactions_by_categories[ + month + ].items() + if transactions and category in get_required_expenses() + for t in transactions + ) + ) + discretionary.append( + sum( + -float(t.value) + for category, transactions in monthly_transactions_by_categories[ + month + ].items() + if transactions and category in get_discretionary_expenses() + for t in transactions + ) + ) + + plt.plot(daterange(start, end, "month"), income, label="Income") + plt.stackplot( + daterange(start, end, "month"), + fixed, + required, + discretionary, + labels=["Fixed", "Required", "Discretionary"], + ) + plt.legend(loc="upper left") + plt.show() diff --git a/pfbudget/tools.py b/pfbudget/tools.py index 3058cc4..bdedfeb 100644 --- a/pfbudget/tools.py +++ b/pfbudget/tools.py @@ -4,7 +4,7 @@ import datetime as dt import pickle import shutil -from .categories import Categories, Null, Travel +from .categories import Categories, Null, Travel, get_categories from .transactions import ( Transaction, load_transactions, @@ -255,7 +255,7 @@ def manual_categorization(state: PFState, transactions: list): category = input(f"{transaction.desc()} category: ") if category == "quit": return - if category not in Categories.get_categories_names(): + if category not in get_categories(): print("category doesn't exist") continue else: diff --git a/pfbudget/transactions.py b/pfbudget/transactions.py index 35c8aeb..7e28a4b 100644 --- a/pfbudget/transactions.py +++ b/pfbudget/transactions.py @@ -1,9 +1,10 @@ from csv import reader, writer from datetime import date +from dateutil.rrule import rrule, MONTHLY, YEARLY from decimal import Decimal, InvalidOperation from pathlib import Path -from .categories import Categories +from .categories import get_categories COMMENT_TOKEN = "#" @@ -124,42 +125,58 @@ class Transactions(list): return years - def get_transactions_by_month(self, start=None, end=None): - if not start: - start = self[0].date - if not end: - end = self[-1].date - months = dict() - for year, year_transactions in self.get_transactions_by_year( - start, end - ).items(): - for month in range(1, 13): - key = "_".join([str(year), str(month)]) - months[key] = Transactions( - t for t in year_transactions if t.date.month == month - ) +def daterange(start, end, period): + if period == "year": + r = [d.strftime("%Y") for d in rrule(YEARLY, dtstart=start, until=end)] + elif period == "month": + r = [d.strftime("%b %Y") for d in rrule(MONTHLY, dtstart=start, until=end)] + else: + raise TransactionError("wrong time period") + return r - # trims last unused months - trim = 1 - for transactions in reversed(months.values()): - if transactions: - break - else: - trim += 1 - while trim := trim - 1: - months.popitem() - return months +def by_year(transactions, start=None, end=None) -> dict: + start = start if start else transactions[0].date + end = end if end else transactions[-1].date - def get_transactions_by_category(self): - categories = {cat: [] for cat in Categories.get_categories_names()} - for transaction in self: - try: - categories[transaction.category].append(transaction) - except AttributeError: - categories[transaction.category] = [transaction] - return categories + yearly_transactions = dict.fromkeys(daterange(start, end, "year"), None) + for t in [t for t in transactions if t.date >= start and t.date <= end]: + try: + yearly_transactions[t.date.strftime("%Y")].append(t) + except AttributeError: + yearly_transactions[t.date.strftime("%Y")] = [t] + except KeyError: + raise TransactionError("date invalid") + + return yearly_transactions + + +def by_month(transactions, start=None, end=None) -> dict: + start = start if start else transactions[0].date + end = end if end else transactions[-1].date + + monthly_transactions = dict.fromkeys(daterange(start, end, "month"), None) + for t in [t for t in transactions if t.date >= start and t.date <= end]: + try: + monthly_transactions[t.date.strftime("%b %Y")].append(t) + except AttributeError: + monthly_transactions[t.date.strftime("%b %Y")] = [t] + except KeyError: + raise TransactionError("date invalid") + + return monthly_transactions + + +def by_category(transactions) -> dict: + transactions_by_category = dict.fromkeys(get_categories(), None) + for transaction in transactions: + try: + transactions_by_category[transaction.category].append(transaction) + except AttributeError: + transactions_by_category[transaction.category] = [transaction] + + return transactions_by_category def load_transactions(data_dir) -> Transactions: @@ -184,7 +201,7 @@ def save_transactions(data_dir, transactions): write_transactions(f, trs) -def read_transactions(filename, encoding="utf-8"): +def read_transactions(filename, encoding="utf-8") -> list: try: with open(filename, newline="", encoding=encoding) as f: r = reader(f, delimiter="\t")