diff --git a/categories.yaml b/categories.yaml index d7b2f93..a66fe56 100644 --- a/categories.yaml +++ b/categories.yaml @@ -5,7 +5,7 @@ Income1: Income2: regex: - transfer - bank: + banks: - BankA Income3: @@ -55,6 +55,10 @@ Travel: - ryanair - easyjet - airbnb + negative_regex: + - Commute + - Utilities + date_fmt: "%Y-%m-%d" dates: - ["2019-12-23", "2020-01-02"] diff --git a/pfbudget/categories.py b/pfbudget/categories.py index 6619b40..47552dd 100644 --- a/pfbudget/categories.py +++ b/pfbudget/categories.py @@ -1,224 +1,126 @@ -from datetime import timedelta -from re import compile as c +from collections import namedtuple +import datetime as dt +import logging +import re +import yaml + +from .database import DBManager -class Categories: - name = "" - regex = [] - banks = [] - values = [] - range = () +Options = namedtuple( + "Options", + [ + "regex", + "banks", + "regular", + "negative_regex", + "date_fmt", + "vacations", + "timedelta", + ], + defaults=[[], [], [], [], "", [], 4], +) - def search(self, t): - if not self.regex: - return False - - if self.banks: - return any( - pattern.search(t.description.lower()) - for pattern in self.regex - if t.bank in self.banks - ) - elif self.range: - return any( - pattern.search(t.description.lower()) - for pattern in self.regex - if self.range[0] < t.value < self.range[1] - ) - elif self.values: - return any( - pattern.search(t.description.lower()) - for pattern in self.regex - if t.value in self.values - ) - else: - return any(pattern.search(t.description.lower()) for pattern in self.regex) - - @classmethod - def get_categories(cls): - return cls.__subclasses__() +cfg = yaml.safe_load(open("categories.yaml")) +categories = {k: Options(**v) if v else Options() for k, v in cfg.items()} -def get_categories(): - return [cat.name for cat in Categories.get_categories()] +def categorize_data(db: DBManager): + # 1st) Classifying null transactions, i.e. transfers between banks. + # Will not overwrite previous categories + nulls(db) -def get_income_categories(): - return [cat for cat in get_categories() if "Income" in cat] + # 2nd) Classifying all vacations by vacation dates + # Will not overwrite previous categories + vacations(db) + # 3rd) Classify all else based on regex + transactions = [list(t) for t in db.get_uncategorized_transactions()] + for transaction in transactions: + if not transaction[4]: + for name, category in categories.items(): + if matches(transaction, category): + transaction[4] = name + break + db.update_categories(transactions) -def get_fixed_expenses(): - return [Utilities.name] - - -def get_required_expenses(): - return [Groceries.name, Commute.name] - - -def get_health_expenses(): - return [Medical.name] - - -def get_discretionary_expenses(): - return [ - cat - for cat in get_categories() - if cat - not in [ - *get_income_categories(), - *get_fixed_expenses(), - *get_required_expenses(), - *get_health_expenses(), - Investment.name, - Null.name, - ] - ] - - -class Income1(Categories): - name = "Income1" - regex = [c("company A")] - - -class Income2(Categories): - name = "Income2" - regex = [c("transfer")] - banks = ["BankA"] - - -class Income3(Categories): - name = "Income3" - regex = [c("company B")] - - -class Null(Categories): - name = "Null" - regex = [ - c("transfer A to B"), - c("1"), - c("2"), - ] - - def search(self, transaction): - pass - - def search_all(self, transactions): - matches = [] + # 4th) Manually update categories from the uncategorized transactions + transactions = [list(t) for t in db.get_uncategorized_transactions()] + if transactions: + print(f"Still {len(transactions)} uncategorized transactions left") for transaction in transactions: - for cancel in [ - cancel - for cancel in transactions - if ( - transaction.date - timedelta(days=4) - <= cancel.date - <= transaction.date + timedelta(days=4) - and any( - pattern.search(transaction.description.lower()) - for pattern in self.regex + while True: + category = input(f"{transaction} category: ") + if category == "quit" or category == "exit": + return + if category not in categories: + print( + f"Category {category} doesn't exist. Please use one of {categories.keys()}" ) - and transaction.bank != cancel.bank - and transaction - and cancel not in matches - and cancel != transaction - ) - ]: - - if transaction.value == -cancel.value: - matches.extend([transaction, cancel]) - # if transaction.value > 0: - # transaction, cancel = cancel, transaction - # print('{} -> {}'.format(transaction, cancel)) + else: + transaction[4] = category + db.update_category(transaction) break - return matches + +def vacations(db: DBManager) -> None: + try: + date_fmt = categories["Travel"].date_fmt + for start, end in categories["Travel"].vacations: + try: + start = dt.datetime.strptime(start, date_fmt).date().isoformat() + end = dt.datetime.strptime(end, date_fmt).date().isoformat() + except ValueError as e: + logging.warning(f"{e} continuing...") + continue + + not_vacations = categories["Travel"].negative_regex + + if transactions := [ + list(t) for t in db.get_daterage_without(start, end, *not_vacations) + ]: + for transaction in transactions: + transaction[4] = "Travel" + + db.update_categories(transactions) + + except KeyError as e: + print(e) -class Commute(Categories): - name = "Commute" - regex = [c("uber"), c("train")] - values = [-50] - - def search(self, t): - if any(pattern.search(t.description.lower()) for pattern in self.regex[:1]): - return True - elif t.value in self.values: - return any( - pattern.search(t.description.lower()) for pattern in self.regex[1:] +def nulls(db: DBManager) -> None: + null = categories.get("Null", Options()) + transactions = [list(t) for t in db.get_uncategorized_transactions()] + matching_transactions = [] + for t in transactions: + for cancel in ( + cancel + for cancel in transactions + if ( + dt.datetime.fromisoformat(t[0]) - dt.timedelta(days=null.timedelta) + <= dt.datetime.fromisoformat(cancel[0]) + and dt.datetime.fromisoformat(cancel[0]) + <= dt.datetime.fromisoformat(t[0]) + dt.timedelta(days=null.timedelta) + and (matches(t, null) if null.regex else True) + and t[2] != cancel[2] + and t not in matching_transactions + and cancel not in matching_transactions + and cancel != t + and t[3] == -cancel[3] ) - else: - return False + ): + t[4] = "Null" + cancel[4] = "Null" + matching_transactions.extend([t, cancel]) + break # There will only be one match per null transaction pair + + db.update_categories(matching_transactions) -class Utilities(Categories): - name = "Utilities" - regex = [c("electricity", "water", "internet")] - values = [-35] - - def search(self, t): - if any(pattern.search(t.description.lower()) for pattern in self.regex[:2]): - return True - elif t.value in self.values: - return any( - pattern.search(t.description.lower()) for pattern in self.regex[2:] - ) - else: - return False - - -class Groceries(Categories): - name = "Groceries" - regex = [ - c("lidl"), - c("e.leclerc"), - c("aldi"), - ] - - -class EatingOut(Categories): - name = "Eating Out" - regex = [ - c("restaurant 1"), - c("restaurant 2"), - ] - - -class Entertainment(Categories): - name = "Entertainment" - regex = [c("cinema"), c("steam")] - - -class Pets(Categories): - name = "Pets" - - -class Travel(Categories): - name = "Travel" - regex = [c("ryanair"), c("easyjet"), c("airbnb")] - not_in_travel = [ - *get_income_categories(), - Utilities.name, - ] - - @staticmethod - def search_all(transactions, start, end): - matches = [] - for transaction in transactions: - if start <= transaction.date < end: - matches.append(transaction) - - return matches - - -class Miscellaneous(Categories): - name = "Miscellaneous" - - -class Investment(Categories): - name = "Investment" - regex = [c("subscrition")] - banks = ["BankC"] - - -class Medical(Categories): - name = "Medical" - regex = [c("hospital", "pharmacy")] +def matches(transaction, category: Options): + if not category.regex: + return False + return any( + re.compile(pattern).search(transaction[1].lower()) for pattern in category.regex + ) diff --git a/pfbudget/database.py b/pfbudget/database.py index ef70148..8d61ff6 100644 --- a/pfbudget/database.py +++ b/pfbudget/database.py @@ -83,6 +83,13 @@ SELECT EXTRACT((?) FROM date) AS (?), date, description, bank, value FROM transactions """ +SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES = """ +SELECT * +FROM transactions +WHERE date BETWEEN (?) AND (?) +AND category NOT IN {} +""" + class DBManager: """SQLite DB connection manager""" @@ -163,6 +170,13 @@ class DBManager: logger.info(f"Update {transaction} category") self.__execute(UPDATE_CATEGORY, (transaction[4], *transaction[:4])) + def update_categories(self, transactions): + logger.info(f"Update {len(transactions)} transactions' categories") + self.__executemany( + UPDATE_CATEGORY, + [(transaction[4], *transaction[:4]) for transaction in transactions], + ) + def get_duplicated_transactions(self): logger.info("Get duplicated transactions") return self.__execute(DUPLICATED_TRANSACTIONS) @@ -187,6 +201,13 @@ class DBManager: logger.info("Get uncategorized transactions") return self.get_category(None) + def get_daterage_without(self, start, end, *categories): + logger.info(f"Get transactions between {start} and {end} not in {categories}") + query = SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format( + "(" + ", ".join("?" for _ in categories) + ")" + ) + return self.__execute(query, (start, end, *categories)) + def export(self): filename = pathlib.Path( "@".join([self.db, datetime.datetime.now().isoformat()]) diff --git a/pfbudget/runnable.py b/pfbudget/runnable.py index 9a5863e..562d054 100644 --- a/pfbudget/runnable.py +++ b/pfbudget/runnable.py @@ -2,12 +2,12 @@ from pathlib import Path import argparse import datetime as dt +from .categories import categorize_data from .database import DBManager from .graph import discrete, monthly from .parsers import parse_data from .transactions import load_transactions, save_transactions from . import report -from . import tools DEFAULT_DB = "data.db" @@ -54,7 +54,12 @@ def argparser(): p_parse.add_argument("--bank", nargs=1, type=str) p_parse.set_defaults(func=parse) - # p_restart = subparsers.add_parser("restart", help="restart help") + """ + Categorizing + """ + p_categorize = subparsers.add_parser("categorize", help="parse help") + p_categorize.set_defaults(func=categorize) + p_vacation = subparsers.add_parser( "vacation", help="vacation help format: [YYYY/MM/DD]" ) @@ -62,12 +67,6 @@ def argparser(): p_report = subparsers.add_parser("report", help="report help") p_status = subparsers.add_parser("status", help="status help") - # p_restart.add_argument("--raw", help="new raw data dir") - # p_restart.add_argument("--data", help="new parsed data dir") - - # p_export.add_argument("option", type=str, choices=["single", "all", "restore"], nargs="?", default="single", - # help="backup option help") - subparser_vacation = p_vacation.add_subparsers( dest="option", required=True, help="vacation suboption help" ) @@ -107,44 +106,11 @@ def argparser(): return parser -def restart(state, args): - """Restart - - Deletes state and creates a new one. - Parses all raw files into the data directory. New dirs can be passed as - arguments, otherwise uses previous values. - - Args: - state (PFState): Internal state of the program - args (dict): argparse variables - - Raises: - DataFileMissing: Missing data files from those listed in state - PfBudgetNotInitialized: Raised when no state has been initialized yet - """ - if state is not None: - for fn in state.data_files: - try: - (Path(state.data_dir) / fn).unlink() - except FileNotFoundError: - raise DataFileMissing("missing {}".format(Path(state.data_dir) / fn)) - - if args.raw: - state.raw_dir = args.raw - if args.data: - state.data_dir = args.data - state.raw_files = [] - state.data_files = [] - parse(state, args) - else: - raise PfBudgetNotInitialized(f"{Path(tools.STATE_FILE)} doesn't exist") - - -def parse(args): +def parse(args, db): """Parser Parses the contents of the raw directory into the data files, and - categorizes the transactions. + categorizes the transactions Args: state (PFState): Internal state of the program @@ -158,26 +124,20 @@ def parse(args): trs = parse_data(path, args.bank) else: raise FileNotFoundError - # tools.parser(state, raw_dir, data_dir) - # categorize(state, args) print("\n".join([t.desc() for t in trs])) -def categorize(state, args): +def categorize(args, db): """Categorization Automatically categorizes transactions based on the regex of each - category. Manually present the remaining to the user. + category. Manually present the remaining to the user Args: state (PFState): Internal state of the program args (dict): argparse variables """ - transactions = load_transactions(state.data_dir) - missing = tools.auto_categorization(state, transactions) - if missing: - tools.manual_categorization(state, transactions) - save_transactions(state.data_dir, transactions) + categorize_data(db) def vacation(state, args): @@ -261,5 +221,6 @@ def f_report(state, args): def run(): + db = DBManager("transactions.db") args = argparser().parse_args() - args.func(args) + args.func(args, db) diff --git a/pfbudget/tools.py b/pfbudget/tools.py deleted file mode 100644 index b740a96..0000000 --- a/pfbudget/tools.py +++ /dev/null @@ -1,138 +0,0 @@ -from pathlib import Path -import datetime as dt -import shutil - -from .categories import Categories, Null, Travel, get_categories -from .parsers import parse_data -from .state import PFState -from .transactions import ( - Transaction, - load_transactions, - read_transactions, - write_transactions, -) - -DIR = ".pfbudget/" -STATE_FILE = DIR + "state" -BACKUP_DIR = DIR + "backup/" - - -def get_filename(t: Transaction): - return "{}_{}.csv".format(t.year, t.bank) - - -def backup(state: PFState): - transactions = load_transactions(state.data_dir) - filename = ( - BACKUP_DIR - + "transactions_" - + dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss") - + ".csv" - ) - write_transactions(Path(filename), transactions) - - state.last_backup = filename - - -def full_backup(state: PFState): - filename = BACKUP_DIR + dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss") - shutil.copytree(state.data_dir, Path(filename)) - - state.last_datadir_backup = filename - - -def restore(state: PFState): - if not state.last_datadir_backup: - print("No data directory backup exists") - return - - if Path(state.data_dir).is_dir(): - option = input( - "A data directory already exists at {}/ . Are you sure you want to restore the last backup? (Y/N) ".format( - state.data_dir - ) - ) - if option.lower() == "y" or option.lower() == "yes": - shutil.rmtree(state.data_dir) - shutil.copytree(state.last_datadir_backup, state.data_dir) - elif option.lower() == "n" or option.lower() == "no": - return - else: - print("Invalid choice") - return - - -def parser(state: PFState, raw_dir=None, data_dir=None): - raw = Path(state.raw_dir) if not raw_dir else Path(raw_dir) - dat = Path(state.data_dir) if not data_dir else Path(data_dir) - - new_transactions = {} - for rf in raw.iterdir(): - if rf.name not in state.raw_files: - new_transactions[rf.name] = parse_data(rf) - state.raw_files.append(rf.name) - - # really, really bad optimized file append - for _, transactions in new_transactions.items(): - for transaction in transactions: - filename = get_filename(transaction) - old = read_transactions(dat / filename) - old.append(transaction) - old.sort() - write_transactions(dat / filename, old) - if filename not in state.data_files: - state.data_files.append(filename) - - state._save() # append to list doesn't trigger setter - - -def auto_categorization(state: PFState, transactions: list) -> bool: - null = Null() - nulls = null.search_all(transactions) - travel = Travel() - travels = [] - missing = False - - for vacation in state.vacations: - t = travel.search_all(transactions, vacation[0], vacation[1]) - travels.extend(t) - - for transaction in transactions: - if not transaction.category: - for category in [category() for category in Categories.get_categories()]: - if category.search(transaction): - transaction.category = category.name - - if ( - transaction in travels - and transaction.category not in travel.not_in_travel - ): - if transaction.category != travel.name: - transaction.category = travel.name - - if transaction in nulls: - if transaction.category != null.name: - transaction.category = null.name - - if not transaction.category: - missing = True - - return missing - - -def manual_categorization(state: PFState, transactions: list): - print( - "Please categorize the following transactions. If you want to exit, write 'quit'" - ) - for transaction in transactions: - while not transaction.category: - category = input(f"{transaction.desc()} category: ") - if category == "quit": - return - if category not in get_categories(): - print( - f"Category {category} doesn't exist. Please use one of {get_categories()}" - ) - continue - else: - transaction.category = category