diff --git a/.gitignore b/.gitignore
index 69cfeb3..2f891c4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -146,5 +146,4 @@ dmypy.json
 ### Default user directories
 data/
 raw/
-*.pickle
-transactions.csv
+.pfbudget
diff --git a/initializer.py b/initializer.py
deleted file mode 100644
index 2879c99..0000000
--- a/initializer.py
+++ /dev/null
@@ -1,100 +0,0 @@
-from datetime import date
-from pathlib import Path
-import logging
-import matplotlib.pyplot as plt
-import pickle
-import sys
-
-from pfbudget.categories import Categories
-from pfbudget.transactions import Transaction as Tr, TransactionError, Transactions
-from pfbudget.parsers import Parser
-
-
-def get_transactions(data_dir):
-    dfs = dict()
-    for df in Path(data_dir).iterdir():
-        try:
-            trs = Tr.read_transactions(df)
-        except TransactionError as e:
-            print(f"{e} -> datafile {df}")
-            sys.exit(-2)
-        dfs[df.name] = trs
-
-    return dfs
-
-
-def initialize(raw_dir, data_dir, restart=False):
-    dfs = get_transactions(data_dir)
-    if restart:
-        rfs = dict()
-        logging.debug("rewriting both .raw and .transactions pickles")
-    else:
-        try:
-            rfs = pickle.load(open(".raw.pickle", "rb"))
-            assert (
-                type(rfs) is dict
-            ), ".raw.pickle isn't a dictionary, so it could have been corrupted"
-            logging.debug(".raw.pickle opened")
-        except FileNotFoundError:
-            rfs = dict()
-            logging.debug("no .raw.pickle found")
-
-    updated_trs, update = dict(), False
-    prompt = " has been modified since last update. Do you want to update the data files? (Yes/Update/No)"
-    for rf in Path(raw_dir).iterdir():
-        if rf.name in rfs and rfs[rf.name][0] == rf.stat().st_mtime:
-            logging.debug(f"{rf.name} hasn't been modified since last access")
-        elif (
-            rf.name not in rfs
-            or (answer := input(f"{rf.name}" + prompt).lower()) == "yes"
-        ):
-            trs = Parser.parse_csv(rf)
-            updated_trs[rf.name] = trs
-            try:
-                rfs[rf.name][0] = rf.stat().st_mtime
-            except KeyError:
-                rfs[rf.name] = [rf.stat().st_mtime, []]
-            update = True
-            logging.info(f"{rf.name} parsed")
-        elif answer == "update":
-            rfs[rf.name][0] = rf.stat().st_mtime
-            update = True
-        else:  # prompt = no
-            update = True
-
-    if update:
-        for rf_name, updated_trs in updated_trs.items():
-            filename_set = set(
-                (t.date.year, f"{t.date.year}_{t.bank}.csv") for t in updated_trs
-            )
-            for year, filename in filename_set:
-                trs = [t for t in updated_trs if t.date.year == year]
-                if filename in dfs.keys():
-                    new_trs = [tr for tr in trs if tr not in rfs[rf_name][1]]
-                    rem_trs = [tr for tr in rfs[rf_name][1] if tr not in trs]
-
-                    if new_trs:
-                        dfs[filename].extend(new_trs)
-                        dfs[filename].sort()
-
-                    for rem in rem_trs:
-                        dfs[filename].remove(rem)
-
-                else:
-                    dfs[filename] = trs
-
-                Tr.write_transactions(Path(data_dir) / filename, dfs[filename])
-                rfs[rf_name][1] = updated_trs
-                logging.debug(f"{filename} written")
-
-    pickle.dump(rfs, open(".raw.pickle", "wb"))
-    logging.debug(".raw.pickle written to disk")
-
-    if restart:
-        for df in Path(data_dir).iterdir():
-            if df.name not in dfs:
-                dfs[df.name] = Tr.read_transactions(df)
-            for t in dfs[df.name]:
-                t.category = ""
-
-    return dfs
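For context: the deleted initializer.py decided what to re-parse by caching each raw file's st_mtime (together with its parsed transactions) in .raw.pickle, and only re-read files whose mtime had changed. A minimal sketch of that mtime-tracking idea, reduced to a plain {filename: mtime} dict (the path and function name below are illustrative, not part of the new code):

    from pathlib import Path
    import pickle

    STATE = Path(".raw.pickle")  # hypothetical cache path, mirroring the removed module


    def changed_files(raw_dir):
        """Yield raw files whose mtime differs from the cached value, then refresh the cache."""
        try:
            with STATE.open("rb") as f:
                seen = pickle.load(f)  # {filename: mtime}
        except FileNotFoundError:
            seen = {}
        for path in Path(raw_dir).iterdir():
            mtime = path.stat().st_mtime
            if seen.get(path.name) != mtime:
                seen[path.name] = mtime
                yield path
        with STATE.open("wb") as f:
            pickle.dump(seen, f)
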
diff --git a/main.py b/main.py
index be46520..110222e 100644
--- a/main.py
+++ b/main.py
@@ -1,16 +1,16 @@
-from datetime import date
 from pathlib import Path
 import argparse
+import datetime as dt
 import matplotlib.pyplot as plt
 import pickle
-
-from initializer import initialize
+import sys
 
 from pfbudget.categories import Categories
-from pfbudget.transactions import Transaction as Tr, TransactionError, Transactions
+from pfbudget.transactions import load_transactions, save_transactions
 from pfbudget.parsers import Parser
+import pfbudget.tools as tools
 
-p = ".pfbudget.pickle"
+p = ".pfbudget/state"
 
 
 class PfBudgetInitialized(Exception):
     pass
@@ -21,21 +21,7 @@ class PfBudgetNotInitialized(Exception):
     pass
 
 
-def manual_categorization(trs):
-    trs.sort_by_bank()
-    for i, transaction in enumerate(trs):
-        if not transaction.category:
-            category = input(f"{transaction.desc()} category: ")
-            if category == "stop":
-                break
-            if category:
-                transaction.category = category
-                trs[i] = transaction
-
-    trs.sort()
-
-
-def init(args):
+def init(state, args):
     """init function
 
     Creates .pfbudget.pickle which stores the internal state of the program for later use. Parses all raw directory
@@ -44,16 +30,24 @@
     args.raw -- raw dir
     args.data -- data dir
     """
-    if not Path(p).is_file():
-        s = {"filename": p, "raw_dir": args.raw, "data_dir": args.data, "data": []}
-        with open(p, "wb") as f:
-            pickle.dump(s, f)
-        parse(args)
+    if not state:
+        s = dict(
+            filename=p,
+            raw_dir=args.raw,
+            data_dir=args.data,
+            raw_files=[],
+            data_files=[],
+            vacations=[],
+            last_backup="",
+            last_datadir_backup="",
+        )
+        state = tools.pfstate(p, s)
+        parse(state, args)
     else:
         raise PfBudgetInitialized()
 
 
-def restart(args):
+def restart(state, args):
     """restart function
 
     Deletes .pfbudget.pickle and creates new one. Parses all raw directory
     into data directory. New dirs can be passed
@@ -62,19 +56,39 @@
     args.raw -- raw dir
     args.data -- data dir
     """
-    if Path(p).is_file():
-        s = pickle.load(open(p, "rb"))
-        raw_dir = s["raw_dir"] if not args.raw else args.raw
-        data_dir = s["data_dir"] if not args.data else args.data
+    if state is not None:
+        for fn in state.data_files:
+            try:
+                (Path(state.data_dir) / fn).unlink()
+            except FileNotFoundError:
+                print("missing {}".format(Path(state.data_dir) / fn))
+                sys.exit(-1)
 
-        s = {"filename": p, "raw_dir": raw_dir, "data_dir": data_dir, "data": []}
-        pickle.dump(s, open(p, "wb"))
-        parse(args)
+        if args.raw:
+            state.raw_dir = args.raw
+        if args.data:
+            state.data_dir = args.data
+        state.raw_files = []
+        state.data_files = []
+        parse(state, args)
     else:
         raise PfBudgetNotInitialized()
 
 
-def parse(args):
+def backup(state, args):
+    """backup function
+
+    Saves all transactions to transactions_#.csv
+    """
+    if args.option == "single":
+        tools.backup(state)
+    elif args.option == "all":
+        tools.full_backup(state)
+    elif args.option == "restore":
+        tools.restore(state)
+
+
+def parse(state, args):
     """parse function
 
     Extracts from .pfbudget.pickle the already read files and parses the remaining.
     args will be None if called from
@@ -83,27 +97,71 @@
     args.raw -- raw dir
     args.data -- data dir
     """
-    if not args:
-        s = pickle.load(open(p, "rb"))
-        raw_dir = s["raw_dir"]
-        data_dir = s["data_dir"]
-    else:
-        raw_dir = args.raw
-        data_dir = args.data
+    raw_dir = args.raw if hasattr(args, "raw") else None
+    data_dir = args.data if hasattr(args, "data") else None
 
-    pass
+    tools.parser(state, raw_dir, data_dir)
+    categorize(state, args)
+
+
+def categorize(state, args):
+    """categorize function
+
+    Automatically categorizes transactions based on the regex of each Category
+    """
+    transactions = load_transactions(state.data_dir)
+    missing = tools.auto_categorization(state, transactions)
+    if missing:
+        tools.manual_categorization(state, transactions)
+    save_transactions(state.data_dir, transactions)
+
+
+def vacation(state, args):
+    """vacation function
+
+    Adds vacations to the pfstate
+    date(2019, 12, 23), date(2020, 1, 2)
+    date(2020, 7, 1), date(2020, 7, 30)
+    """
+    print(args)
+    if args.option == "list":
+        print(state.vacations)
+    elif args.option == "remove":
+        vacations = state.vacations
+        del state.vacations[args.pos[0]]
+        state.vacations = vacations
+    elif args.option == "add":
+        start = dt.datetime.strptime(args.start[0], "%Y/%m/%d").date()
+        end = dt.datetime.strptime(args.end[0], "%Y/%m/%d").date()
+
+        vacations = state.vacations
+        vacations.append((start, end))
+        state.vacations = vacations
+
+
+def status(state, args):
+    print(state)
+    sys.exit(0)
 
 
 if __name__ == "__main__":
     parser = argparse.ArgumentParser(description="does cool finance stuff")
     parser.add_argument("-q", "--quiet", help="quiet")
-    subparsers = parser.add_subparsers(help="sub-command help")
+
+    subparsers = parser.add_subparsers(
+        dest="task", required=True, help="sub-command help"
+    )
 
     p_init = subparsers.add_parser("init", help="init help")
     p_restart = subparsers.add_parser("restart", help="restart help")
+    p_backup = subparsers.add_parser("backup", help="backup help")
     p_parse = subparsers.add_parser("parse", help="parse help")
+    p_vacation = subparsers.add_parser(
+        "vacation", help="vacation help format: [YYYY/MM/DD]"
+    )
     p_graph = subparsers.add_parser("graph", help="graph help")
     p_report = subparsers.add_parser("report", help="report help")
+    p_status = subparsers.add_parser("status", help="status help")
 
     p_init.add_argument("raw", help="the raw data dir")
     p_init.add_argument("data", help="the parsed data dir")
@@ -113,17 +171,41 @@ if __name__ == "__main__":
     p_restart.add_argument("--data", help="new parsed data dir")
     p_restart.set_defaults(func=restart)
 
+    p_backup.add_argument(
+        "option",
+        type=str,
+        choices=["single", "all", "restore"],
+        nargs="?",
+        default="single",
+        help="backup option help",
+    )
+
+    subparser_vacation = p_vacation.add_subparsers(
+        dest="option", required=True, help="vacation suboption help"
+    )
+    p_vacation_add = subparser_vacation.add_parser("add", help="add help")
+    p_vacation_add.add_argument(
+        "start", type=str, nargs=1, help="new vacation start date"
+    )
+    p_vacation_add.add_argument("end", type=str, nargs=1, help="new vacation end date")
+    p_vacation_list = subparser_vacation.add_parser("list", help="list help")
+    p_vacation_remove = subparser_vacation.add_parser("remove", help="remove help")
+    p_vacation_remove.add_argument(
+        "pos", help="position of vacation to remove", type=int, nargs=1
+    )
+
+    p_backup.set_defaults(func=backup)
     p_parse.set_defaults(func=parse)
+    p_vacation.set_defaults(func=vacation)
+    p_report.set_defaults(func=categorize)
+    p_status.set_defaults(func=status)
 
+    state = tools.pfstate(p)
+    state.filename = p
     args = parser.parse_args()
-    args.func(args)
+    args.func(state, args)
 
-    datafiles = initialize("raw", "data", restart=False)
-
-    transactions = Transactions()
-    for file in datafiles.values():
-        transactions.extend(file)
-    transactions.sort()
+    transactions = load_transactions(state.data_dir)
 
     # reprocess = [Education().name]
     # for i, transaction in enumerate(transactions):
@@ -131,18 +213,8 @@ if __name__ == "__main__":
     #     if transaction.category in reprocess:
    #         transaction.category = ''
 
-    if False:
-        Categories.categorize(transactions)
-        manual_categorization(transactions)
-
-        for f, file in datafiles.items():
-            file_transactions = [t for t in transactions if t in file]
-            Tr.write_transactions(Path("data") / f, file_transactions)
-
-        Tr.write_transactions("transactions.csv", transactions)
-
     monthly_transactions = transactions.get_transactions_by_month(
-        start=date(2019, 1, 1), end=date(2020, 11, 30)
+        start=dt.date(2020, 1, 1), end=dt.date(2020, 12, 31)
     )
     monthly_transactions_by_cat = []
     for month_transactions in monthly_transactions.values():
diff --git a/pfbudget/categories.py b/pfbudget/categories.py
index c5f3000..8232c23 100644
--- a/pfbudget/categories.py
+++ b/pfbudget/categories.py
@@ -34,48 +34,6 @@ class Categories:
         else:
             return any(pattern.search(t.description.lower()) for pattern in self.regex)
 
-    @classmethod
-    def categorize(cls, transactions):
-        null_matches = Null().search_all(transactions)
-        travel_matches = Travel().search_all(
-            transactions, date(2019, 12, 23), date(2020, 1, 2)
-        )
-        travel_matches.extend(
-            Travel().search_all(transactions, date(2020, 7, 1), date(2020, 7, 30))
-        )
-
-        for i, transaction in enumerate(transactions):
-            for category in [cat() for cat in cls.get_categories()]:
-                if category.search(transaction):
-                    if not transaction.category:
-                        transaction.category = category.name
-                        transactions[i] = transaction
-                    elif (
-                        transaction.category != category.name
-                        and transaction.category != Travel().name
-                    ):
-                        new_category = input(
-                            f"{transaction.desc()} already has a {transaction.category} assigned. Would you like "
-                            f"to change it to {category.name}? (Y/N) "
-                        )
-                        correct_answer = False
-                        while not correct_answer:
-                            if new_category.lower() == "y":
-                                transaction.category = category.name
-                                transactions[i] = transaction
-                                correct_answer = True
-                            elif new_category.lower() == "n":
-                                correct_answer = True
-                            else:
-                                new_category = input("? ")
-
-            if transaction in travel_matches and transaction.category not in [
-                *cls.get_income_categories(),
-            ]:
-                transaction.category = Travel().name
-            if transaction in null_matches:
-                transaction.category = Null().name
-
     @classmethod
     def get_categories(cls):
         return cls.__subclasses__()
@@ -234,6 +192,10 @@ class Pets(Categories):
 class Travel(Categories):
     name = "Travel"
     regex = [c("ryanair"), c("easyjet"), c("airbnb")]
+    not_in_travel = [
+        *Categories.get_income_categories(),
+        Utilities.name,
+    ]
 
     @staticmethod
     def search_all(transactions, start, end):
diff --git a/pfbudget/parsers.py b/pfbudget/parsers.py
index 3f9e66c..1f4665b 100644
--- a/pfbudget/parsers.py
+++ b/pfbudget/parsers.py
@@ -5,34 +5,34 @@ from pathlib import Path
 from .transactions import Transaction
 
 
+def parse_data(file: Path, append=False):
+    name = file.stem.split("_")
+    try:
+        bank, _ = name[0], int(name[1])
+    except ValueError:
+        _, bank = int(name[0]), name[1]
+
+    p = dict(
+        Bank1=Bank1,
+        Bank2=Bank2,
+        Bank2CC=Bank2CC,
+        BANK3=Bank3,
+    )
+
+    try:
+        parser = p[bank]()
+    except KeyError as e:
+        print(f"{e} {bank} parser doesn't exist. Can't parse {name}")
+        return
+
+    transactions = parser.parse(file)
+    return transactions
+
+
 class Parser:
     def parse(self, file):
         pass
 
-    @staticmethod
-    def parse_csv(file: Path, append=False):
-        name = file.stem.split("_")
-        try:
-            bank, _ = name[0], int(name[1])
-        except ValueError:
-            _, bank = int(name[0]), name[1]
-
-        p = dict(
-            Bank1=Bank1,
-            Bank2=Bank2,
-            Bank2CC=Bank2CC,
-            BANK3=Bank3,
-        )
-
-        try:
-            parser = p[bank]()
-        except KeyError as e:
-            print(f"{e} {bank} parser doesnt exist. Cant parse {name}")
-            return
-
-        transactions = parser.parse(file)
-        return transactions
-
 
 class Bank1(Parser):
     """Bank 1 parser
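The new module-level parse_data() above picks a parser from the data file's stem, accepting either a <bank>_<year> or <year>_<bank> name by probing which half converts to an integer. A standalone sketch of just that dispatch, with the bank-specific parser classes left out:

    from pathlib import Path


    def bank_from_stem(file: Path) -> str:
        """Infer the bank from a '<bank>_<year>' or '<year>_<bank>' file stem."""
        left, right = file.stem.split("_", 1)
        try:
            int(right)   # "Bank1_2020" -> year on the right, bank on the left
            return left
        except ValueError:
            int(left)    # "2020_Bank1" -> year on the left, bank on the right
            return right


    print(bank_from_stem(Path("Bank1_2020.csv")))  # Bank1
    print(bank_from_stem(Path("2020_Bank1.csv")))  # Bank1
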
diff --git a/pfbudget/tools.py b/pfbudget/tools.py
new file mode 100644
index 0000000..3058cc4
--- /dev/null
+++ b/pfbudget/tools.py
@@ -0,0 +1,262 @@
+from pathlib import Path
+import csv
+import datetime as dt
+import pickle
+import shutil
+
+from .categories import Categories, Null, Travel
+from .transactions import (
+    Transaction,
+    load_transactions,
+    read_transactions,
+    write_transactions,
+)
+from .parsers import parse_data
+
+
+def get_filename(t: Transaction):
+    return "{}_{}.csv".format(t.year, t.bank)
+
+
+class PFState:
+    def __init__(self, filename, *args, **kwargs):
+        if Path(filename).is_file():
+            raise FileExistsError("PFState already exists")
+
+        self.filename = filename
+        for d in args:
+            for k in d:
+                setattr(self, k, d[k])
+        for k in kwargs:
+            setattr(self, k, kwargs[k])
+
+        self._save()
+
+    @property
+    def filename(self):
+        return self._filename
+
+    @filename.setter
+    def filename(self, v):
+        if not isinstance(v, str):
+            raise TypeError("Expected string")
+        self._filename = v
+        self._save()
+
+    @property
+    def raw_dir(self):
+        return self._raw_dir
+
+    @raw_dir.setter
+    def raw_dir(self, v):
+        if not isinstance(v, str):
+            raise TypeError("Expected string")
+        self._raw_dir = v
+        self._save()
+
+    @property
+    def data_dir(self):
+        return self._data_dir
+
+    @data_dir.setter
+    def data_dir(self, v):
+        if not isinstance(v, str):
+            raise TypeError("Expected string")
+        self._data_dir = v
+        self._save()
+
+    @property
+    def raw_files(self):
+        return self._raw_files
+
+    @raw_files.setter
+    def raw_files(self, v):
+        if not isinstance(v, list):
+            raise TypeError("Expected list")
+        self._raw_files = v
+        self._save()
+
+    @property
+    def data_files(self):
+        return self._data_files
+
+    @data_files.setter
+    def data_files(self, v):
+        if not isinstance(v, list):
+            raise TypeError("Expected list")
+        self._data_files = v
+        self._save()
+
+    @property
+    def vacations(self):
+        return self._vacations
+
+    @vacations.setter
+    def vacations(self, v):
+        if not isinstance(v, list):
+            raise TypeError("Expected list")
+        self._vacations = v
+        self._save()
+
+    @property
+    def last_backup(self):
+        return self._last_backup
+
+    @last_backup.setter
+    def last_backup(self, v):
+        if not isinstance(v, str):
+            raise TypeError("Expected string")
+        self._last_backup = v
+        self._save()
+
+    @property
+    def last_datadir_backup(self):
+        return self._last_datadir_backup
+
+    @last_datadir_backup.setter
+    def last_datadir_backup(self, v):
+        if not isinstance(v, str):
+            raise TypeError("Expected string")
+        self._last_datadir_backup = v
+        self._save()
+
+    def _save(self):
+        pickle.dump(self, open(self.filename, "wb"))
+
+    def __repr__(self):
+        r = []
+        for attr, value in self.__dict__.items():
+            r.append(": ".join([str(attr), str(value)]))
+        return ", ".join(r)
+
+
+def pfstate(filename, *args, **kwargs):
+    """pfstate function
+
+    If it only receives a filename, it returns the saved state if that file
+    exists (or None if it doesn't). If it receives anything else, it creates
+    and returns a new PFState.
+    """
+    assert isinstance(filename, str), "filename is not string"
+
+    if Path(filename).is_file():
+        pfstate.state = pickle.load(open(filename, "rb"))
+        if not isinstance(pfstate.state, PFState):
+            raise TypeError("Unpickled object not of type PFState")
+    elif args or kwargs:
+        pfstate.state = PFState(filename, *args, **kwargs)
+    else:
+        pfstate.state = None
+
+    return pfstate.state
+
+
+def backup(state: PFState):
+    transactions = load_transactions(state.data_dir)
+    filename = (
+        ".pfbudget/backups/"
+        + "transactions_"
+        + dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
+        + ".csv"
+    )
+    write_transactions(Path(filename), transactions)
+
+    state.last_backup = filename
+
+
+def full_backup(state: PFState):
+    filename = ".pfbudget/backups/" + dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
+    shutil.copytree(state.data_dir, Path(filename))
+
+    state.last_datadir_backup = filename
+
+
+def restore(state: PFState):
+    if not state.last_datadir_backup:
+        print("No data directory backup exists")
+        return
+
+    if Path(state.data_dir).is_dir():
+        option = input(
+            "A data directory already exists at {}/ . Are you sure you want to restore the last backup? (Y/N) ".format(
+                state.data_dir
+            )
+        )
+        if option.lower() == "y" or option.lower() == "yes":
+            shutil.rmtree(state.data_dir)
+            shutil.copytree(state.last_datadir_backup, state.data_dir)
+        elif option.lower() == "n" or option.lower() == "no":
+            return
+        else:
+            print("Invalid choice")
+            return
+
+
+def parser(state: PFState, raw_dir=None, data_dir=None):
+    raw = Path(state.raw_dir) if not raw_dir else Path(raw_dir)
+    dat = Path(state.data_dir) if not data_dir else Path(data_dir)
+
+    new_transactions = {}
+    for rf in raw.iterdir():
+        if rf.name not in state.raw_files:
+            new_transactions[rf.name] = parse_data(rf)
+            state.raw_files.append(rf.name)
+
+    # really, really badly optimized file append
+    for _, transactions in new_transactions.items():
+        for transaction in transactions:
+            filename = get_filename(transaction)
+            old = read_transactions(dat / filename)
+            old.append(transaction)
+            old.sort()
+            write_transactions(dat / filename, old)
+            if filename not in state.data_files:
+                state.data_files.append(filename)
+
+    state._save()  # append to list doesn't trigger setter
+
+
+def auto_categorization(state: PFState, transactions: list) -> bool:
+    null = Null()
+    nulls = null.search_all(transactions)
+    travel = Travel()
+    travels = []
+    missing = False
+
+    for vacation in state.vacations:
+        t = travel.search_all(transactions, vacation[0], vacation[1])
+        travels.extend(t)
+
+    for transaction in transactions:
+        if not transaction.category:
+            for category in [category() for category in Categories.get_categories()]:
+                if category.search(transaction):
+                    transaction.category = category.name
+
+        if (
+            transaction in travels
+            and transaction.category not in travel.not_in_travel
+        ):
+            if transaction.category != travel.name:
+                transaction.category = travel.name
+
+        if transaction in nulls:
+            if transaction.category != null.name:
+                transaction.category = null.name
+
+        if not transaction.category:
+            missing = True
+
+    return missing
+
+
+def manual_categorization(state: PFState, transactions: list):
+    for transaction in transactions:
+        while not transaction.category:
+            category = input(f"{transaction.desc()} category: ")
+            if category == "quit":
+                return
+            if category not in Categories.get_categories_names():
+                print("category doesn't exist")
+                continue
+            else:
+                transaction.category = category
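pfbudget/tools.py persists PFState from every property setter, so a plain assignment such as state.raw_dir = "raw" re-pickles the whole object; in-place mutation like state.raw_files.append(...) bypasses the setters, which is why parser() ends with an explicit state._save(). A stripped-down sketch of that pattern, using a single made-up field:

    import pickle


    class TinyState:
        """Minimal stand-in for PFState: every property assignment re-pickles the object."""

        def __init__(self, path):
            self._path = path
            self._items = []
            self._save()

        @property
        def items(self):
            return self._items

        @items.setter
        def items(self, value):
            self._items = value
            self._save()                      # assignment persists automatically

        def _save(self):
            with open(self._path, "wb") as f:
                pickle.dump(self, f)


    s = TinyState("tiny_state.pickle")
    s.items = ["2020_Bank1.csv"]              # goes through the setter -> saved
    s.items.append("2020_Bank2.csv")          # in-place append bypasses the setter -> NOT saved
    s._save()                                 # explicit save, as pfbudget.tools.parser() does
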
diff --git a/pfbudget/transactions.py b/pfbudget/transactions.py
index 89edb99..35c8aeb 100644
--- a/pfbudget/transactions.py
+++ b/pfbudget/transactions.py
@@ -1,23 +1,26 @@
 from csv import reader, writer
 from datetime import date
 from decimal import Decimal, InvalidOperation
+from pathlib import Path
 
 from .categories import Categories
 
 
+COMMENT_TOKEN = "#"
+
+
 class TransactionError(Exception):
     pass
 
 
 class Transaction:
-    date = None
-    description = ""
-    bank = ""
-    value = 0
-    category = ""
+    def __init__(self, *args, file=None):
+        self.date = None
+        self.description = ""
+        self.bank = ""
+        self.value = 0
+        self.category = ""
 
-    def __init__(self, *args):
         arg = args[0] if len(args) == 1 else list(args)
-
         try:
             self.date = date.fromisoformat(arg[0])
             self.description = " ".join(arg[1].split())
@@ -30,22 +33,16 @@ class Transaction:
             print(f"{args}")
             raise TransactionError
 
+        self.year = self.date.year
+        self.month = self.date.month
+        self.day = self.date.day
+
+        self.file = file
+        self.modified = False
+
     def to_csv(self):
         return [self.date, self.description, self.bank, self.value, self.category]
 
-    @staticmethod
-    def read_transactions(file, encoding="utf-8"):
-        with open(file, newline="", encoding=encoding) as f:
-            r = reader(f, delimiter="\t")
-            transactions = [Transaction(row) for row in r if row and row[0][0] != "#"]
-        return transactions
-
-    @staticmethod
-    def write_transactions(file, transactions, append=False, encoding="utf-8"):
-        with open(file, "a" if append else "w", newline="", encoding=encoding) as f:
-            w = writer(f, delimiter="\t")
-            w.writerows([transaction.to_csv() for transaction in transactions])
-
     @staticmethod
     def get_repeated_transactions(transactions):
         repeated, new = list(), list()
@@ -61,6 +58,15 @@ class Transaction:
         transactions.sort(key=lambda k: k.bank)
         return transactions
 
+    @property
+    def category(self):
+        return self._category
+
+    @category.setter
+    def category(self, v):
+        self.modified = True
+        self._category = v
+
     def __eq__(self, other):
         return (
             self.date == other.date
@@ -154,3 +160,46 @@ class Transactions(list):
             except AttributeError:
                 categories[transaction.category] = [transaction]
         return categories
+
+
+def load_transactions(data_dir) -> Transactions:
+    transactions = Transactions()
+    for df in Path(data_dir).iterdir():
+        try:
+            trs = read_transactions(df)
+        except TransactionError as e:
+            print(f"{e} -> datafile {df}")
+            raise TransactionError
+        transactions.extend(trs)
+
+    transactions.sort()
+    return transactions
+
+
+def save_transactions(data_dir, transactions):
+    files2write = set(t.file if t.modified else None for t in transactions)
+    files2write.discard(None)
+    for f in files2write:
+        trs = [t for t in transactions if t.file == f]
+        write_transactions(f, trs)
+
+
+def read_transactions(filename, encoding="utf-8"):
+    try:
+        with open(filename, newline="", encoding=encoding) as f:
+            r = reader(f, delimiter="\t")
+            transactions = [
+                Transaction(row, file=filename)
+                for row in r
+                if row and row[0][0] != COMMENT_TOKEN
+            ]
+    except FileNotFoundError:
+        transactions = []
+
+    return transactions
+
+
+def write_transactions(file, transactions, append=False, encoding="utf-8"):
+    with open(file, "a" if append else "w", newline="", encoding=encoding) as f:
+        w = writer(f, delimiter="\t")
+        w.writerows([transaction.to_csv() for transaction in transactions])
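The data files themselves stay tab-separated rows of (date, description, bank, value, category), with "#" marking comment lines, now handled by the module-level read_transactions()/write_transactions(). A small round-trip sketch of that format, assuming the column order above and an illustrative <year>_<bank>.csv filename:

    import csv
    from pathlib import Path

    path = Path("2020_Bank1.csv")  # illustrative name following the <year>_<bank>.csv convention

    rows = [
        ["2020-07-02", "AIRBNB LISBON", "Bank1", "-120.00", "Travel"],
        ["2020-07-03", "SUPERMARKET", "Bank1", "-34.56", "Groceries"],
    ]
    with path.open("w", newline="", encoding="utf-8") as f:
        csv.writer(f, delimiter="\t").writerows(rows)

    with path.open(newline="", encoding="utf-8") as f:
        for row in csv.reader(f, delimiter="\t"):
            if not row or row[0].startswith("#"):  # same comment convention as COMMENT_TOKEN
                continue
            print(row)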