diff --git a/categories.py b/categories.py new file mode 100644 index 0000000..6b5f5e5 --- /dev/null +++ b/categories.py @@ -0,0 +1,222 @@ +from datetime import date, timedelta +from re import compile as c + + +class Categories: + name = "" + regex = [] + banks = [] + values = [] + range = () + + def search(self, t): + if self.banks: + return any( + pattern.search(t.description.lower()) + for pattern in self.regex + if t.bank in self.banks + ) + elif self.range: + return any( + pattern.search(t.description.lower()) + for pattern in self.regex + if self.range[0] < t.value < self.range[1] + ) + elif self.values: + return any( + pattern.search(t.description.lower()) + for pattern in self.regex + if t.value in self.values + ) + else: + return any(pattern.search(t.description.lower()) for pattern in self.regex) + + @classmethod + def categorize(cls, transactions): + + income_categories = [ + Income1().name, + Income2().name, + Income3().name, + ] + + null_matches = Null().search_all(transactions) + travel_matches = Travel().search_all( + transactions, date(2019, 12, 23), date(2020, 1, 2) + ) + + for i, transaction in enumerate(transactions): + for category in cls.get_categories(): + if category.search(transaction): + if not transaction.category: + transaction.category = category.name + transactions[i] = transaction + elif ( + transaction.category != category.name + and transaction.category != Travel().name + ): + new_category = input( + f"{transaction} already has a {transaction.category} assigned. Would you like " + f"to change it to {category.name}? (Y/N) " + ) + correct_answer = False + while not correct_answer: + if new_category.lower() == "y": + transaction.category = category.name + transactions[i] = transaction + correct_answer = True + elif new_category.lower() == "n": + correct_answer = True + else: + new_category = input("? ") + + if transaction in travel_matches and transaction.category not in [ + *income_categories, + ]: + transaction.category = Travel().name + if transaction in null_matches: + transaction.category = Null().name + + @classmethod + def get_categories(cls): + return [category() for category in cls.__subclasses__()] + + +class Income1(Categories): + name = "Income1" + regex = [c("company A")] + + +class Income2(Categories): + name = "Income2" + regex = [c("transfer")] + banks = ["BankA"] + + +class Income3(Categories): + name = "Income3" + regex = [c("company B")] + + +class Null(Categories): + name = "Null" + regex = [ + c("transfer A to B"), + c("1"), + c("2"), + ] + + def search(self, transaction): + pass + + def search_all(self, transactions): + matches = [] + for transaction in transactions: + for cancel in [ + cancel + for cancel in transactions + if ( + transaction.date - timedelta(days=4) + <= cancel.date + <= transaction.date + timedelta(days=4) + and any( + pattern.search(transaction.description.lower()) + for pattern in self.regex + ) + and transaction.bank != cancel.bank + and transaction + and cancel not in matches + and cancel != transaction + ) + ]: + + if transaction.value == -cancel.value: + matches.extend([transaction, cancel]) + # if transaction.value > 0: + # transaction, cancel = cancel, transaction + # print('{} -> {}'.format(transaction, cancel)) + break + + return matches + + +class Commute(Categories): + name = "Commute" + regex = [c("uber"), c("train")] + values = [-50] + + def search(self, t): + if any(pattern.search(t.description.lower()) for pattern in self.regex[:1]): + return True + elif t.value in self.values: + return any( + pattern.search(t.description.lower()) for pattern in self.regex[1:] + ) + else: + return False + + +class Utilities(Categories): + name = "Utilities" + regex = [c("electricity", "water", "internet")] + values = [-35] + + def search(self, t): + if any(pattern.search(t.description.lower()) for pattern in self.regex[:2]): + return True + elif t.value in self.values: + return any( + pattern.search(t.description.lower()) for pattern in self.regex[2:] + ) + else: + return False + + +class Groceries(Categories): + name = "Groceries" + regex = [ + c("lidl"), + c("e.leclerc"), + c("aldi"), + ] + + +class EatingOut(Categories): + name = "Eating Out" + regex = [ + c("restaurant 1"), + c("restaurant 2"), + ] + + +class Entertainment(Categories): + name = "Entertainment" + regex = [c("cinema"), c("steam")] + + +class Pets(Categories): + name = "Pets" + + +class Travel(Categories): + name = "Travel" + regex = [c("ryanair"), c("easyjet"), c("airbnb")] + + @staticmethod + def search_all(transactions, start, end): + matches = [] + for transaction in transactions: + if start <= transaction.date < end: + matches.append(transaction) + + return matches + + +class Miscellaneous(Categories): + name = "Miscellaneous" + + +class Investment(Categories): + name = "Investment" + regex = [c("subscrition")] + banks = ["BankC"] diff --git a/main.py b/main.py index a02f8e1..469bbab 100644 --- a/main.py +++ b/main.py @@ -1,24 +1,130 @@ -from datetime import datetime -from decimal import Decimal -import csv -import os +from pathlib import Path +import logging +import pickle +import sys -from parsers import Bank1, Bank2, Bank3, Parser +from categories import Categories +from transaction import Transaction as Tr, TransactionError +from parsers import Parser -def write_transactions(file, transactions, append=False): - with open(file, "a" if append else "w", newline="", encoding="utf-8") as f: - writer = csv.writer(f, delimiter="\t") - writer.writerows(transactions) +def get_transactions(data_dir): + dfs = dict() + for df in Path(data_dir).iterdir(): + try: + trs = Tr.read_transactions(df) + except TransactionError as e: + print(f"{e} -> datafile {df}") + sys.exit(-2) + dfs[df.name] = trs + + return dfs -def parse(parser: Parser, input, output, reverse=True, encoding="utf-8"): - transactions = parser.parse(input, encoding) - if reverse: - transactions.reverse() - write_transactions(output, transactions) +def initialize(raw_dir, data_dir, restart=False): + dfs = get_transactions(data_dir) + if restart: + rfs = dict() + logging.debug("rewriting both .raw and .transactions pickles") + else: + try: + rfs = pickle.load(open(".raw.pickle", "rb")) + assert ( + type(rfs) is dict + ), ".raw.pickle isn't a dictionary, so it could have been corrupted" + logging.debug(".raw.pickle opened") + except FileNotFoundError: + rfs = dict() + logging.debug("no .raw.pickle found") -# parse(Bank1(), ".rawdata/Bank1_2019.csv", "data/2019_Bank1.csv") -# parse(Bank2(), ".rawdata/Bank2_2020.csv", "data/2020_Bank2.csv", reverse=False) -# parse(Bank2(cc=True), ".rawdata/Bank2CC_2020.csv", "data/2020_Bank2CC.csv", reverse=False) -# parse(Bank3(), ".rawdata/Bank3_2019.csv", "data/2019_Bank3.csv", encoding="windows-1252") + updated_trs, update = dict(), False + prompt = " has been modified since last update. Do you want to update the data files? (Yes/No)" + for rf in Path(raw_dir).iterdir(): + if rf.name in rfs and rfs[rf.name][0] == rf.stat().st_mtime: + logging.debug(f"{rf.name} hasn't been modified since last access") + elif rf.name not in rfs or input(f"{rf.name}" + prompt).lower() == "yes": + trs = Parser.parse_csv(rf) + updated_trs[rf.name] = trs + try: + rfs[rf.name][0] = rf.stat().st_mtime + except KeyError: + rfs[rf.name] = [rf.stat().st_mtime, []] + update = True + logging.info(f"{rf.name} parsed") + + if update: + for rf_name, updated_trs in updated_trs.items(): + filename_set = set( + (t.date.year, f"{t.date.year}_{t.bank}.csv") for t in updated_trs + ) + for year, filename in filename_set: + trs = [t for t in updated_trs if t.date.year == year] + if filename in dfs.keys(): + new_trs = [tr for tr in trs if tr not in rfs[rf_name][1]] + rem_trs = [tr for tr in rfs[rf_name][1] if tr not in trs] + + if new_trs: + dfs[filename].extend(new_trs).sort() + + for rem in rem_trs: + dfs[filename].remove(rem) + + else: + dfs[filename] = trs + + Tr.write_transactions(Path(data_dir) / filename, dfs[filename]) + rfs[rf_name][1] = updated_trs + logging.debug(f"{filename} written") + + pickle.dump(rfs, open(".raw.pickle", "wb")) + logging.debug(".raw.pickle written to disk") + + if restart: + for df in Path(data_dir).iterdir(): + if df.name not in dfs: + dfs[df.name] = Tr.read_transactions(df) + for t in dfs[df.name]: + t.category = "" + + return dfs + + +def manual_categorization(trs): + trs = Tr.sort_by_bank(trs) + for i, transaction in enumerate(trs): + if not transaction.category: + category = input(f"{transaction} category: ") + if category == "stop": + break + if category: + transaction.category = category + trs[i] = transaction + + trs.sort() + + +if __name__ == "__main__": + logging.basicConfig(level=logging.DEBUG) + + datafiles = initialize(".raw", "data", restart=False) + + transactions = list() + for file in datafiles.values(): + transactions.extend(file) + transactions.sort() + + # reprocess = [Education().name] + # for i, transaction in enumerate(transactions): + # for category in Categories.get_categories(): + # if transaction.category in reprocess: + # transaction.category = '' + + Categories.categorize(transactions) + + manual_categorization(transactions) + + for f, file in datafiles.items(): + file_transactions = [t for t in transactions if t in file] + Tr.write_transactions(Path("data") / f, file_transactions) + + Tr.write_transactions("transactions.csv", transactions) diff --git a/parsers.py b/parsers.py index d93792b..b762c00 100644 --- a/parsers.py +++ b/parsers.py @@ -1,18 +1,37 @@ from datetime import datetime from decimal import Decimal, InvalidOperation +from pathlib import Path +from transaction import Transaction class Parser: - @staticmethod - def get_transactions(file, encoding, sep="\t"): - with open(file, newline="", encoding=encoding) as f: - transactions = [line.rstrip().split(sep) for line in f] - - return transactions - - def parse(self, file, encoding="utf-8"): + def parse(self, file): pass + @staticmethod + def parse_csv(file: Path, append=False): + name = file.stem.split("_") + try: + bank, _ = name[0], int(name[1]) + except ValueError: + _, bank = int(name[0]), name[1] + + p = dict( + Bank1=Bank1, + Bank2=Bank2, + Bank2CC=Bank2CC, + BANK3=Bank3, + ) + + try: + parser = p[bank]() + except KeyError as e: + print(f"{e} {bank} parser doesnt exist. Cant parse {name}") + return + + transactions = parser.parse(file) + return transactions + class Bank1(Parser): """Bank 1 parser @@ -22,18 +41,29 @@ class Bank1(Parser): separator: ; starting line: 5 date format: %d/%m/%Y + + The reading order is reversed to go from earlier to latest. """ - def parse(self, file, encoding="utf-8"): - transactions = [] + encoding = "utf-8" + separator = ";" - for transaction in self.get_transactions(file, encoding, sep=";")[5:]: + def parse(self, file): + transactions = [] + reader = [ + line.rstrip().split(self.separator) + for line in open(file, encoding=self.encoding) + ][5:] + + for transaction in reversed(reader): transaction = [field.rstrip() for field in transaction] date = datetime.strptime(transaction[1], "%d/%m/%Y").date() description = " ".join(transaction[3].split()) value = Decimal(transaction[4]) - transactions.append([date.isoformat(), description, "Bank1", value]) + transactions.append( + Transaction(date.isoformat(), description, "Bank1", value) + ) return transactions @@ -46,18 +76,19 @@ class Bank2(Parser): separator: tab date format: %d/%m/%Y decimal separator: , - - Bank 2 also has an associated credit card, for which the transaction value - has to be negated. """ - def __init__(self, cc=False): - self.cc = cc + encoding = "utf-8" + separator = "\t" - def parse(self, file, encoding="utf-8"): + def parse(self, file): transactions = [] + reader = [ + line.rstrip().split(self.separator) + for line in open(file, encoding=self.encoding) + ] - for transaction in self.get_transactions(file, encoding): + for transaction in reader: date = datetime.strptime(transaction[0], "%d/%m/%Y").date() description = transaction[2] try: @@ -66,13 +97,48 @@ class Bank2(Parser): transaction[3] = transaction[3].replace(",", "") value = Decimal(transaction[3]) - if not self.cc: - card = "Bank2" - else: - value = -value - card = "Bank2 CC" + transactions.append( + Transaction(date.isoformat(), description, "Bank2", value) + ) - transactions.append([date.isoformat(), description, card, value]) + return transactions + + +class Bank2CC(Parser): + """Bank 2 credit card parser + + Bank 2 credit card transcripts have the following properties: + encoding: utf-8 + separator: tab + date format: %d/%m/%Y + decimal separator: , + """ + + encoding = "utf-8" + separator = "\t" + + def parse(self, file): + transactions = [] + reader = [ + line.rstrip().split(self.separator) + for line in open(file, encoding=self.encoding) + ] + + for transaction in reader: + date = datetime.strptime(transaction[0], "%d/%m/%Y").date() + description = transaction[2] + try: + value = Decimal(transaction[3]) + except InvalidOperation: + transaction[3] = transaction[3].replace(",", "") + value = -Decimal(transaction[3]) + + if value > 0: + date = datetime.strptime(transaction[1], "%d/%m/%Y").date() + + transactions.append( + Transaction(date.isoformat(), description, "Bank2CC", value) + ) return transactions @@ -90,13 +156,20 @@ class Bank3(Parser): thousands separator: . Bank 3 has credits in a different column from debits. These also have to be - negated. + negated. The reading order is reversed to go from earlier to latest. """ - def parse(self, file, encoding="utf-8"): - transactions = [] + encoding = "windows-1252" + separator = "," - for transaction in self.get_transactions(file, encoding, sep=";")[7:-1]: + def parse(self, file): + transactions = [] + reader = [ + line.rstrip().split(self.separator) + for line in open(file, encoding=self.encoding) + ][7:-1] + + for transaction in reversed(reader): transaction = [field.rstrip() for field in transaction] date = datetime.strptime(transaction[1], "%d-%m-%Y").date() description = transaction[2] @@ -107,6 +180,8 @@ class Bank3(Parser): t = transaction[4].replace(".", "").replace(",", ".") value = Decimal(t) - transactions.append([date.isoformat(), description, "Bank3", value]) + transactions.append( + Transaction(date.isoformat(), description, "Bank3", value) + ) return transactions diff --git a/reader.py b/reader.py deleted file mode 100644 index 9c31b45..0000000 --- a/reader.py +++ /dev/null @@ -1,296 +0,0 @@ -from decimal import Decimal -import csv -import datetime -import matplotlib.pyplot as plt -import sys - - -class Transaction: - def __init__(self, date, description, value, category): - self.id = id(self) - self.date = date - self.description = description - self.value = value - self.category = category - - def __repr__(self): - return f"{self.date.date()} {self.description} {self.value} € {self.category}" - - -class MonthlyTransactions: - def __init__(self, month, transactions): - self.month = datetime.datetime.strptime(str(month), "%m") - self.transactions = transactions - - income_categories = [ - "Income1", - "Income2", - "Income3", - ] - fixed_expenses_categories = [ - "Rent", - "Commmute", - "Utilities", - ] - variable_expenses_categories = [ - "Groceries", - "Eating Out", - "Entertainment", - "Pets", - "Travel", - "Miscellaneous", - ] - self.expense_categories = ( - fixed_expenses_categories + variable_expenses_categories - ) - - self.income_per_cat = dict.fromkeys(income_categories, 0) - self.fixed_expenses_per_cat = dict.fromkeys(fixed_expenses_categories, 0) - self.variable_expenses_per_cat = dict.fromkeys(variable_expenses_categories, 0) - self.null = 0 - self.investments = 0 - - self.separate_categories(self.transactions) - - self.expenses_per_cat = { - **self.income_per_cat, - **self.fixed_expenses_per_cat, - **self.variable_expenses_per_cat, - } - - def separate_categories(self, transactions): - for transaction in transactions: - if transaction.category == "Null": - self.null += transaction.value - continue - if transaction.category == "Investment": - self.investments += transaction.value - continue - try: - self.income_per_cat[transaction.category] -= transaction.value - continue - except KeyError: - pass - try: - self.fixed_expenses_per_cat[transaction.category] += transaction.value - continue - except KeyError: - pass - try: - self.variable_expenses_per_cat[ - transaction.category - ] += transaction.value - continue - except KeyError as e: - if ", " in transaction.category: - categories = transaction.category.split(", ") - print(f"{transaction} has two categories. Allocate each.") - values = [] - - while transaction.value != sum(values): - for category in categories: - value = Decimal(input(f"Category {category}: ")) - values.append(value) - - new_transactions = [] - for value, category in zip(values, categories): - new_transactions.append( - Transaction( - transaction.date, - transaction.description, - value, - category, - ) - ) - - self.separate_categories(new_transactions) - - else: - print(repr(e)) - print(transaction) - sys.exit(2) - - def income(self): - return sum(self.income_per_cat.values()) - - def fixed_expenses(self): - return sum(self.fixed_expenses_per_cat.values()) - - def variable_expenses(self): - return sum(self.variable_expenses_per_cat.values()) - - def expenses(self): - return self.fixed_expenses() + self.variable_expenses() - - def __repr__(self): - info = [] - for k, v in self.income_per_cat.items(): - info.extend([k, v]) - for k, v in self.fixed_expenses_per_cat.items(): - info.extend([k, v]) - for k, v in self.variable_expenses_per_cat.items(): - info.extend([k, v]) - - p = """ -{0:>40} Report -Income Fixed Expenses Variable Expenses -{1:<16}{2:>9.2f} {11:<16}{12:>9.2f} {25:<16}{26:>9.2f} -{3:<16}{4:>9.2f} {13:<16}{14:>9.2f} {27:<16}{28:>9.2f} -{5:<16}{6:>9.2f} {15:<16}{16:>9.2f} {29:<16}{30:>9.2f} -{7:<16}{8:>9.2f} {17:<16}{18:>9.2f} {31:<16}{32:>9.2f} -{9:<16}{10:>9.2f} {19:<16}{20:>9.2f} {33:<16}{34:>9.2f} - {21:<16}{22:>9.2f} {35:<16}{36:>9.2f} - {23:<16}{24:>9.2f} {37:<16}{38:>9.2f} - {39:<16}{40:>9.2f} - {41:<16}{42:>9.2f} - {43:<16}{44:>9.2f} - {45:<16}{46:>9.2f} - {47:<16}{48:>9.2f} - {49:<16}{50:>9.2f} - {51:<16}{52:>9.2f} - -{53:>25.2f} {54:>25.2f} {55:>25.2f} - -Expenses:{56:>16.2f} -Net:{57:>21.2f}""".format( - self.month.strftime("%B"), - *info, - self.income(), - self.fixed_expenses(), - self.variable_expenses(), - self.expenses(), - self.income() - self.expenses(), - ) - - return p - - -def get_transactions(csvfile): - with open(csvfile, newline="") as fp: - reader = csv.reader(fp, delimiter="\t") - - transactions = [] - - for transaction in reader: - try: - # date = datetime.datetime.strptime(transaction[0], "%Y-%m-%d") - date = datetime.datetime.strptime(transaction[0], "%d/%m/%Y") - description = transaction[1] - value = Decimal(transaction[2]) - category = transaction[3] - transactions.append(Transaction(date, description, value, category)) - - except Exception as e: - print(repr(e)) - print(transaction) - sys.exit(2) - - return transactions - - -def reorder_transactions(transactions): - return sorted(transactions, key=lambda transaction: transaction.date) - - -def write_transactions(csvfile, transactions): - with open(csvfile, "w", newline="") as fp: - writer = csv.writer(fp, delimiter="\t") - - for t in transactions: - writer.writerow([t.date.date(), t.description, t.value, t.category]) - - -def get_month_transactions(transactions, month): - month_transactions = [] - for transaction in transactions: - if transaction.date.month == month: - month_transactions.append(transaction) - - return month_transactions - - -def get_value_per_category(transactions): - categories = dict() - - for transaction in transactions: - try: - categories[transaction.category] += transaction.value - except KeyError: - categories[transaction.category] = transaction.value - - return categories - - -def split_income_expenses(value_per_category): - income = dict() - expenses = dict() - - for category, value in value_per_category.items(): - if category.startswith("Income"): - income[category] = -value - elif category == "Investment": - pass - else: - expenses[category] = value - - return income, expenses - - -def plot(monthly_transactions): - x = range(1, 7) - y_income = [float(month.income()) for month in monthly_transactions] - y_fixed_expenses = [float(month.fixed_expenses()) for month in monthly_transactions] - y_variable_expenses = [ - float(month.variable_expenses()) for month in monthly_transactions - ] - - y = [] - labels = monthly_transactions[0].expense_categories - for label in labels: - category = [ - float(month.expenses_per_cat[label]) for month in monthly_transactions - ] - y.append(category) - - no_negatives = False - while not no_negatives: - no_negatives = True - for category in y: - for month in range(0, 6): - if category[month] < 0: - category[month - 1] += category[month] - category[month] = 0 - no_negatives = False - - plt.plot(x, y_income, label="Income") - plt.stackplot(x, y, labels=labels) - plt.legend(loc="upper left") - plt.show() - - -if __name__ == "__main__": - - transactions = get_transactions("transactions.csv") - - transactions = reorder_transactions(transactions) - - write_transactions("transactions_ordered.csv", transactions) - - monthly_transactions = list() - for month in range(1, 7): - month_transactions = MonthlyTransactions( - month, get_month_transactions(transactions, month) - ) - monthly_transactions.append(month_transactions) - - print(month_transactions) - - plot(monthly_transactions) - - total_income = sum(month.income() for month in monthly_transactions) - total_expenses = sum(month.expenses() for month in monthly_transactions) - - if total_income - total_expenses > 0: - print(f"\nWe're {total_income - total_expenses} richer!") - else: - print(f"We're {total_expenses - total_income} poorer :(") diff --git a/transaction.py b/transaction.py new file mode 100644 index 0000000..d2b8244 --- /dev/null +++ b/transaction.py @@ -0,0 +1,92 @@ +from csv import reader, writer +from datetime import date +from decimal import Decimal, InvalidOperation + + +class TransactionError(Exception): + pass + + +class Transaction: + date = None + description = "" + bank = "" + value = 0 + category = "" + + def __init__(self, *args): + arg = args[0] if len(args) == 1 else list(args) + + try: + self.date = date.fromisoformat(arg[0]) + self.description = " ".join(arg[1].split()) + self.bank = arg[2] + self.value = Decimal(arg[3]) + self.category = arg[4] + except IndexError: + pass + except InvalidOperation: + print(f"{args}") + raise TransactionError + + def to_csv(self): + return [self.date, self.description, self.bank, self.value, self.category] + + @staticmethod + def read_transactions(file, encoding="utf-8"): + with open(file, newline="", encoding=encoding) as f: + r = reader(f, delimiter="\t") + transactions = [Transaction(row) for row in r] + return transactions + + @staticmethod + def write_transactions(file, transactions, append=False, encoding="utf-8"): + with open(file, "a" if append else "w", newline="", encoding=encoding) as f: + w = writer(f, delimiter="\t") + w.writerows([transaction.to_csv() for transaction in transactions]) + + @staticmethod + def get_repeated_transactions(transactions): + repeated, new = list(), list() + for t in transactions: + if t not in new: + new.append(t) + else: + repeated.append(t) + return repeated + + @staticmethod + def sort_by_bank(transactions): + transactions.sort(key=lambda k: k.bank) + return transactions + + def __eq__(self, other): + return ( + self.date == other.date + and self.description == other.description + and self.bank == other.bank + and self.value == other.value + ) + + def __ne__(self, other): + return ( + self.date != other.date + or self.description != other.description + or self.bank != other.bank + or self.value != other.value + ) + + def __lt__(self, other): + return self.date < other.date + + def __le__(self, other): + return self.date <= other.date + + def __gt__(self, other): + return self.date > other.date + + def __ge__(self, other): + return self.date >= other.date + + def __repr__(self): + return f"{self.date} {self.description} {self.value}€ from {self.bank} ({self.category})"