Automatic and manual categorization

reader.py is removed, and main.py becomes the entry point from which all
other modules are called.
All categories are now defined in categories.py, each with its own search
function and regex. The categorize function is also defined on the base
class.

The parsers have been trimmed down to a leaner version. Each parser has
its own parse method, plus encoding and separator attributes.

The `Transaction` class has been extended with all the comparison
methods. It also has methods to read from and write to files.

The transactions.pickle file is no longer used, since changes can be made
directly to the parsed data in the data_dir, which makes the pickle
redundant. A manual categorization function has been created to help fill
in the gaps; it is interchangeable with directly editing the parsed .csv
files. The master record of the data is the set of .csv files present in
the data_dir.
This commit is contained in:
Luís Murta 2020-08-04 20:42:29 +01:00
parent 41d599c025
commit ca12d1846c
Signed by: satprog
GPG Key ID: DDF2EFC6179009DC
5 changed files with 543 additions and 344 deletions

222
categories.py Normal file
View File

@ -0,0 +1,222 @@
from datetime import date, timedelta
from re import compile as c
class Categories:
    """Base class for transaction categories.

    Subclasses set ``name`` and ``regex`` and may restrict matching with
    exactly one of ``banks``, ``range`` or ``values`` (checked in that
    order; the first non-empty one wins).
    """

    name = ""
    regex = []  # compiled patterns, searched against the lowercased description
    banks = []  # if non-empty, only transactions from these banks can match
    values = []  # if non-empty (and no banks/range), only these exact values match
    range = ()  # if non-empty (and no banks), exclusive (low, high) value bounds

    def search(self, t):
        """Return True if transaction *t* belongs to this category.

        The bank/range/value restriction is evaluated once, then every
        pattern in ``regex`` is searched in the lowercased description.
        """
        if self.banks:
            eligible = t.bank in self.banks
        elif self.range:
            eligible = self.range[0] < t.value < self.range[1]
        elif self.values:
            eligible = t.value in self.values
        else:
            eligible = True
        if not eligible:
            return False

        description = t.description.lower()
        return any(pattern.search(description) for pattern in self.regex)

    @staticmethod
    def _confirm_change(transaction, category):
        """Ask whether *transaction* should be moved to *category*.

        Keeps prompting until the answer is "y" or "n" (case-insensitive).
        """
        answer = input(
            f"{transaction} already has a {transaction.category} assigned. Would you like "
            f"to change it to {category.name}? (Y/N) "
        )
        while answer.lower() not in ("y", "n"):
            answer = input("? ")
        return answer.lower() == "y"

    @classmethod
    def categorize(cls, transactions):
        """Assign a category to every transaction in *transactions*, in place.

        For each transaction every subclass is searched. An empty category
        is filled in directly; a conflicting one is only replaced after the
        user confirms, and a transaction already marked as Travel is never
        prompted for. Afterwards the Travel window and the Null
        (cancelling transfers) matches override the regex result.
        """
        income_categories = [Income1.name, Income2.name, Income3.name]
        travel_name = Travel.name
        null_name = Null.name

        null_matches = Null().search_all(transactions)
        # The travel period is hard-coded; every non-income transaction in
        # this window is treated as a travel expense.
        travel_matches = Travel().search_all(
            transactions, date(2019, 12, 23), date(2020, 1, 2)
        )
        # Category objects carry no per-transaction state, so build them once.
        categories = cls.get_categories()

        for transaction in transactions:
            for category in categories:
                if not category.search(transaction):
                    continue
                if not transaction.category:
                    transaction.category = category.name
                elif transaction.category not in (category.name, travel_name):
                    if cls._confirm_change(transaction, category):
                        transaction.category = category.name

            if (
                transaction in travel_matches
                and transaction.category not in income_categories
            ):
                transaction.category = travel_name

            if transaction in null_matches:
                transaction.category = null_name

    @classmethod
    def get_categories(cls):
        """Return one instance of every direct subclass of this class."""
        return [category() for category in cls.__subclasses__()]
class Income1(Categories):
    """Income identified purely by its description."""

    name = "Income1"
    # Patterns are searched against the lowercased description, so they
    # must be lowercase themselves; "company A" could never match.
    regex = [c("company a")]
class Income2(Categories):
    """Income identified by description, restricted to one bank."""

    name = "Income2"
    regex = [
        c("transfer"),
    ]
    # A non-empty `banks` list makes the base `search` reject other banks.
    banks = [
        "BankA",
    ]
class Income3(Categories):
    """Income identified purely by its description."""

    name = "Income3"
    # Patterns are searched against the lowercased description, so they
    # must be lowercase themselves; "company B" could never match.
    regex = [c("company b")]
class Null(Categories):
    """Transfers between banks that cancel each other out.

    These are found by pairing transactions (see `search_all`), not by the
    per-transaction `search`.
    """

    name = "Null"
    regex = [
        # Lowercase on purpose: patterns are searched against the
        # lowercased description, so "transfer A to B" could never match.
        c("transfer a to b"),
        c("1"),
        c("2"),
    ]

    def search(self, transaction):
        """Never match a single transaction; pairs are found by `search_all`."""
        return False

    def search_all(self, transactions):
        """Return the transactions that form cancelling pairs.

        A transaction whose description matches one of the patterns is
        paired with the first other transaction that is within 4 days of
        it, comes from a different bank, has the exact opposite value and
        is not already part of a pair. Both members of each pair are
        appended to the returned list.
        """
        window = timedelta(days=4)
        matches = []
        for transaction in transactions:
            # A transaction that is already half of a pair must not be
            # paired again, otherwise an unrelated third transaction with
            # the same amount would be cancelled as well.
            if transaction in matches:
                continue

            description = transaction.description.lower()
            if not any(pattern.search(description) for pattern in self.regex):
                continue

            for cancel in transactions:
                if (
                    transaction.date - window
                    <= cancel.date
                    <= transaction.date + window
                    and transaction.bank != cancel.bank
                    and transaction.value == -cancel.value
                    and cancel != transaction
                    and cancel not in matches
                ):
                    matches.extend([transaction, cancel])
                    break
        return matches
class Commute(Categories):
    """Commuting expenses.

    The first pattern always counts; the remaining ones only count when
    the transaction value is one of `values`.
    """

    name = "Commute"
    regex = [c("uber"), c("train")]
    values = [-50]

    def search(self, t):
        """Return True if *t* is a commute expense, False otherwise."""
        always, value_bound = self.regex[:1], self.regex[1:]

        for pattern in always:
            if pattern.search(t.description.lower()):
                return True

        if t.value not in self.values:
            return False

        for pattern in value_bound:
            if pattern.search(t.description.lower()):
                return True
        return False
class Utilities(Categories):
    """Utility bills.

    The first two patterns always count; the remaining one only counts
    when the transaction value is one of `values`.
    """

    name = "Utilities"
    # One compiled pattern per keyword: `re.compile` takes (pattern, flags),
    # so passing three strings to a single call raises TypeError.
    regex = [c("electricity"), c("water"), c("internet")]
    values = [-35]

    def search(self, t):
        """Return True if *t* is a utility bill, False otherwise."""
        description = t.description.lower()
        if any(pattern.search(description) for pattern in self.regex[:2]):
            return True
        if t.value in self.values:
            return any(pattern.search(description) for pattern in self.regex[2:])
        return False
class Groceries(Categories):
    """Supermarket purchases, identified by description."""

    name = "Groceries"
    regex = [
        c("lidl"),
        # The dot is escaped so it only matches a literal "." instead of
        # any character.
        c(r"e\.leclerc"),
        c("aldi"),
    ]
class EatingOut(Categories):
    """Restaurant expenses, identified by description."""

    name = "Eating Out"
    regex = [c("restaurant 1"), c("restaurant 2")]
class Entertainment(Categories):
    """Leisure expenses, identified by description."""

    name = "Entertainment"
    regex = [
        c("cinema"),
        c("steam"),
    ]
class Pets(Categories):
    """Pet expenses.

    No patterns are defined, so the inherited `search` never matches.
    """

    name = "Pets"
class Travel(Categories):
    """Travel expenses, matched by description or by a date window."""

    name = "Travel"
    regex = [c("ryanair"), c("easyjet"), c("airbnb")]

    @staticmethod
    def search_all(transactions, start, end):
        """Return the transactions dated in the half-open range [start, end)."""
        return [
            transaction
            for transaction in transactions
            if start <= transaction.date < end
        ]
class Miscellaneous(Categories):
    """Catch-all category.

    No patterns are defined, so the inherited `search` never matches.
    """

    name = "Miscellaneous"
class Investment(Categories):
    """Investments, identified by description and restricted to one bank."""

    name = "Investment"
    # NOTE(review): "subscrition" looks like a misspelling of
    # "subscription" - confirm against the bank's actual description text.
    regex = [
        c("subscrition"),
    ]
    banks = [
        "BankC",
    ]

142
main.py
View File

@ -1,24 +1,130 @@
from datetime import datetime
from decimal import Decimal
import csv
import os
from pathlib import Path
import logging
import pickle
import sys
from parsers import Bank1, Bank2, Bank3, Parser
from categories import Categories
from transaction import Transaction as Tr, TransactionError
from parsers import Parser
def write_transactions(file, transactions, append=False):
    """Write *transactions* (an iterable of rows) to *file* as tab-separated values.

    The file is overwritten unless *append* is true, in which case the rows
    are added at the end.
    """
    mode = "a" if append else "w"
    with open(file, mode, newline="", encoding="utf-8") as handle:
        csv.writer(handle, delimiter="\t").writerows(transactions)
def get_transactions(data_dir):
    """Read every file in *data_dir* into a ``{file name: transactions}`` dict.

    If a file cannot be turned into transactions, the error and the file
    are printed and the program exits with status -2.
    """
    transactions_per_file = {}
    for datafile in Path(data_dir).iterdir():
        try:
            transactions_per_file[datafile.name] = Tr.read_transactions(datafile)
        except TransactionError as e:
            print(f"{e} -> datafile {datafile}")
            sys.exit(-2)
    return transactions_per_file
def parse(parser: Parser, input, output, reverse=True, encoding="utf-8"):
    """Parse *input* with *parser* and write the result to *output*.

    The parsed list is reversed in place first unless *reverse* is false.
    """
    parsed = parser.parse(input, encoding)
    if reverse:
        parsed.reverse()

    write_transactions(output, parsed)
def initialize(raw_dir, data_dir, restart=False):
    """Bring the data files in *data_dir* up to date with the raw files.

    ``.raw.pickle`` remembers, per raw file name, ``[mtime, transactions]``
    from the last time the file was parsed. Raw files that are new, or
    that changed and whose update the user confirms, are parsed again and
    the difference with the remembered transactions is applied to the
    matching ``<year>_<bank>.csv`` data files.

    Args:
        raw_dir: directory holding the raw bank exports.
        data_dir: directory holding the parsed ``.csv`` data files.
        restart: ignore ``.raw.pickle`` and clear every category.

    Returns:
        dict mapping each data file name to its list of transactions.
    """
    dfs = get_transactions(data_dir)
    if restart:
        rfs = dict()
        logging.debug("rewriting both .raw and .transactions pickles")
    else:
        try:
            # NOTE: pickle must only be used on trusted files; .raw.pickle
            # is a cache written by this very function.
            with open(".raw.pickle", "rb") as cache:
                rfs = pickle.load(cache)
        except FileNotFoundError:
            rfs = dict()
            logging.debug("no .raw.pickle found")
        else:
            assert (
                type(rfs) is dict
            ), ".raw.pickle isn't a dictionary, so it could have been corrupted"
            logging.debug(".raw.pickle opened")

    def data_file(transaction):
        """Name of the data file a transaction is stored in."""
        return f"{transaction.date.year}_{transaction.bank}.csv"

    updated_trs, update = dict(), False
    prompt = " has been modified since last update. Do you want to update the data files? (Yes/No)"
    for rf in Path(raw_dir).iterdir():
        mtime = rf.stat().st_mtime
        if rf.name in rfs and rfs[rf.name][0] == mtime:
            logging.debug(f"{rf.name} hasn't been modified since last access")
        elif rf.name not in rfs or input(f"{rf.name}" + prompt).lower() == "yes":
            trs = Parser.parse_csv(rf)
            if trs is None:
                # parse_csv already reported why the file can't be parsed.
                # Leave the file unrecorded so it is retried on the next run.
                continue
            updated_trs[rf.name] = trs
            if rf.name in rfs:
                rfs[rf.name][0] = mtime
            else:
                rfs[rf.name] = [mtime, []]
            update = True
            logging.info(f"{rf.name} parsed")

    if update:
        for rf_name, parsed in updated_trs.items():
            # What this raw file contributed the last time it was parsed.
            previous = rfs[rf_name][1]

            by_file = dict()
            for t in parsed:
                by_file.setdefault(data_file(t), []).append(t)

            for filename, trs in by_file.items():
                if filename in dfs:
                    # Only compare against the part of the previous parse
                    # that lives in this data file.
                    old = [tr for tr in previous if data_file(tr) == filename]
                    new_trs = [tr for tr in trs if tr not in old]
                    rem_trs = [tr for tr in old if tr not in trs]

                    # list.extend returns None, so sorting is a separate step.
                    dfs[filename].extend(new_trs)
                    for rem in rem_trs:
                        # The data file may have been edited by hand.
                        if rem in dfs[filename]:
                            dfs[filename].remove(rem)
                    dfs[filename].sort()
                else:
                    dfs[filename] = trs

                Tr.write_transactions(Path(data_dir) / filename, dfs[filename])
                logging.debug(f"{filename} written")

            # Record the new state only after every data file was updated,
            # so all of them are compared against the same previous parse.
            rfs[rf_name][1] = parsed

        with open(".raw.pickle", "wb") as cache:
            pickle.dump(rfs, cache)
        logging.debug(".raw.pickle written to disk")

    if restart:
        for df in Path(data_dir).iterdir():
            if df.name not in dfs:
                dfs[df.name] = Tr.read_transactions(df)
            for t in dfs[df.name]:
                t.category = ""

    return dfs
def manual_categorization(trs):
    """Ask the user for a category for every uncategorized transaction.

    Transactions are visited grouped by bank. An empty answer leaves the
    transaction uncategorized, and "stop" ends the session. The list is
    sorted again before returning.
    """
    trs = Tr.sort_by_bank(trs)
    for position, transaction in enumerate(trs):
        if transaction.category:
            continue

        answer = input(f"{transaction} category: ")
        if answer == "stop":
            break
        if not answer:
            continue

        transaction.category = answer
        trs[position] = transaction

    trs.sort()
if __name__ == "__main__":
    logging.basicConfig(level=logging.DEBUG)

    datafiles = initialize(".raw", "data", restart=False)

    # Flatten the per-file lists into a single sorted list.
    transactions = [t for trs in datafiles.values() for t in trs]
    transactions.sort()

    # reprocess = [Education().name]
    # for i, transaction in enumerate(transactions):
    #     for category in Categories.get_categories():
    #         if transaction.category in reprocess:
    #             transaction.category = ''

    Categories.categorize(transactions)
    manual_categorization(transactions)

    # Write every data file back, then the combined list.
    data_root = Path("data")
    for name, trs in datafiles.items():
        belonging = [t for t in transactions if t in trs]
        Tr.write_transactions(data_root / name, belonging)

    Tr.write_transactions("transactions.csv", transactions)

View File

@ -1,18 +1,37 @@
from datetime import datetime
from decimal import Decimal, InvalidOperation
from pathlib import Path
from transaction import Transaction
class Parser:
@staticmethod
def get_transactions(file, encoding, sep="\t"):
with open(file, newline="", encoding=encoding) as f:
transactions = [line.rstrip().split(sep) for line in f]
return transactions
def parse(self, file, encoding="utf-8"):
def parse(self, file):
pass
@staticmethod
def parse_csv(file: Path, append=False):
    """Parse *file* with the parser selected by the bank in its name.

    The file stem must look like ``<bank>_<number>`` or
    ``<number>_<bank>``. Returns the parsed transactions, or None (after
    printing a message) when the bank can't be determined or has no parser.
    *append* is accepted but not used.
    """

    def is_number(text):
        """True if *text* is accepted by ``int``."""
        try:
            int(text)
        except ValueError:
            return False
        return True

    name = file.stem.split("_")
    if len(name) >= 2 and is_number(name[1]):
        bank = name[0]
    elif len(name) >= 2 and is_number(name[0]):
        bank = name[1]
    else:
        print(f"Cant tell the bank of {file.name}. Cant parse {name}")
        return None

    # NOTE(review): the last key is "BANK3" while the others use mixed
    # case - confirm it agrees with how the raw files are named.
    p = dict(
        Bank1=Bank1,
        Bank2=Bank2,
        Bank2CC=Bank2CC,
        BANK3=Bank3,
    )

    try:
        parser = p[bank]()
    except KeyError as e:
        print(f"{e} {bank} parser doesnt exist. Cant parse {name}")
        return None

    return parser.parse(file)
class Bank1(Parser):
"""Bank 1 parser
@ -22,18 +41,29 @@ class Bank1(Parser):
separator: ;
starting line: 5
date format: %d/%m/%Y
The reading order is reversed to go from earlier to latest.
"""
def parse(self, file, encoding="utf-8"):
transactions = []
encoding = "utf-8"
separator = ";"
for transaction in self.get_transactions(file, encoding, sep=";")[5:]:
def parse(self, file):
transactions = []
reader = [
line.rstrip().split(self.separator)
for line in open(file, encoding=self.encoding)
][5:]
for transaction in reversed(reader):
transaction = [field.rstrip() for field in transaction]
date = datetime.strptime(transaction[1], "%d/%m/%Y").date()
description = " ".join(transaction[3].split())
value = Decimal(transaction[4])
transactions.append([date.isoformat(), description, "Bank1", value])
transactions.append(
Transaction(date.isoformat(), description, "Bank1", value)
)
return transactions
@ -46,18 +76,19 @@ class Bank2(Parser):
separator: tab
date format: %d/%m/%Y
decimal separator: ,
Bank 2 also has an associated credit card, for which the transaction value
has to be negated.
"""
def __init__(self, cc=False):
self.cc = cc
encoding = "utf-8"
separator = "\t"
def parse(self, file, encoding="utf-8"):
def parse(self, file):
transactions = []
reader = [
line.rstrip().split(self.separator)
for line in open(file, encoding=self.encoding)
]
for transaction in self.get_transactions(file, encoding):
for transaction in reader:
date = datetime.strptime(transaction[0], "%d/%m/%Y").date()
description = transaction[2]
try:
@ -66,13 +97,48 @@ class Bank2(Parser):
transaction[3] = transaction[3].replace(",", "")
value = Decimal(transaction[3])
if not self.cc:
card = "Bank2"
else:
value = -value
card = "Bank2 CC"
transactions.append(
Transaction(date.isoformat(), description, "Bank2", value)
)
transactions.append([date.isoformat(), description, card, value])
return transactions
class Bank2CC(Parser):
    """Bank 2 credit card parser

    Bank 2 credit card transcripts have the following properties:
    encoding: utf-8
    separator: tab
    date format: %d/%m/%Y
    decimal separator: ,

    Every value is negated. When the negated value is positive, the date
    is taken from the second column instead of the first.
    """

    encoding = "utf-8"
    separator = "\t"

    def parse(self, file):
        """Return the list of `Transaction` objects read from *file*."""
        # Read everything up front so the file is closed before parsing.
        with open(file, encoding=self.encoding) as f:
            reader = [line.rstrip().split(self.separator) for line in f]

        transactions = []
        for transaction in reader:
            date = datetime.strptime(transaction[0], "%d/%m/%Y").date()
            description = transaction[2]
            try:
                value = Decimal(transaction[3])
            except InvalidOperation:
                # NOTE(review): commas are dropped, i.e. treated as a
                # thousands separator, although the class docstring calls
                # "," the decimal separator - confirm against a real file.
                transaction[3] = transaction[3].replace(",", "")
                value = Decimal(transaction[3])
            # Negate every amount; the sign must not depend on whether the
            # number happened to contain a comma.
            value = -value

            if value > 0:
                date = datetime.strptime(transaction[1], "%d/%m/%Y").date()

            transactions.append(
                Transaction(date.isoformat(), description, "Bank2CC", value)
            )

        return transactions
@ -90,13 +156,20 @@ class Bank3(Parser):
thousands separator: .
Bank 3 has credits in a different column from debits. These also have to be
negated.
negated. The reading order is reversed to go from earlier to latest.
"""
def parse(self, file, encoding="utf-8"):
transactions = []
encoding = "windows-1252"
separator = ","
for transaction in self.get_transactions(file, encoding, sep=";")[7:-1]:
def parse(self, file):
transactions = []
reader = [
line.rstrip().split(self.separator)
for line in open(file, encoding=self.encoding)
][7:-1]
for transaction in reversed(reader):
transaction = [field.rstrip() for field in transaction]
date = datetime.strptime(transaction[1], "%d-%m-%Y").date()
description = transaction[2]
@ -107,6 +180,8 @@ class Bank3(Parser):
t = transaction[4].replace(".", "").replace(",", ".")
value = Decimal(t)
transactions.append([date.isoformat(), description, "Bank3", value])
transactions.append(
Transaction(date.isoformat(), description, "Bank3", value)
)
return transactions

296
reader.py
View File

@ -1,296 +0,0 @@
from decimal import Decimal
import csv
import datetime
import matplotlib.pyplot as plt
import sys
class Transaction:
    """A single transaction: date, description, value and category."""

    def __init__(self, date, description, value, category):
        """Store the given fields and record the object's own `id`."""
        self.id = id(self)
        self.date, self.description = date, description
        self.value, self.category = value, category

    def __repr__(self):
        """Return the day, description, value and category as one line."""
        day = self.date.date()
        return f"{day} {self.description} {self.value}{self.category}"
class MonthlyTransactions:
    """Totals of one month's transactions, grouped by category."""

    def __init__(self, month, transactions):
        """Sort *transactions* of month number *month* into category totals."""
        self.month = datetime.datetime.strptime(str(month), "%m")
        self.transactions = transactions

        income_categories = [
            "Income1",
            "Income2",
            "Income3",
        ]
        fixed_expenses_categories = [
            "Rent",
            "Commute",
            "Utilities",
        ]
        variable_expenses_categories = [
            "Groceries",
            "Eating Out",
            "Entertainment",
            "Pets",
            "Travel",
            "Miscellaneous",
        ]

        self.expense_categories = (
            fixed_expenses_categories + variable_expenses_categories
        )

        self.income_per_cat = dict.fromkeys(income_categories, 0)
        self.fixed_expenses_per_cat = dict.fromkeys(fixed_expenses_categories, 0)
        self.variable_expenses_per_cat = dict.fromkeys(variable_expenses_categories, 0)
        self.null = 0
        self.investments = 0

        self.separate_categories(self.transactions)

        self.expenses_per_cat = {
            **self.income_per_cat,
            **self.fixed_expenses_per_cat,
            **self.variable_expenses_per_cat,
        }

    def separate_categories(self, transactions):
        """Add the value of each transaction to the total of its category.

        Income is stored with the sign flipped. A transaction whose
        category is a ", "-separated list is split interactively; any
        other unknown category is printed and ends the program with
        status 2.
        """
        # (totals dict, sign applied to the transaction value)
        groups = (
            (self.income_per_cat, -1),
            (self.fixed_expenses_per_cat, 1),
            (self.variable_expenses_per_cat, 1),
        )
        for transaction in transactions:
            category = transaction.category
            if category == "Null":
                self.null += transaction.value
                continue
            if category == "Investment":
                self.investments += transaction.value
                continue

            for totals, sign in groups:
                if category in totals:
                    totals[category] += sign * transaction.value
                    break
            else:
                if ", " in category:
                    self._split_transaction(transaction)
                else:
                    print(repr(KeyError(category)))
                    print(transaction)
                    sys.exit(2)

    def _split_transaction(self, transaction):
        """Ask how a multi-category transaction is divided and record the parts."""
        categories = transaction.category.split(", ")
        print(f"{transaction} has two categories. Allocate each.")

        values = []
        while transaction.value != sum(values):
            # Start over on every attempt; keeping the rejected answers
            # would make it impossible to ever reach the right total.
            values = [
                Decimal(input(f"Category {category}: ")) for category in categories
            ]

        self.separate_categories(
            [
                Transaction(
                    transaction.date, transaction.description, value, category
                )
                for value, category in zip(values, categories)
            ]
        )

    def income(self):
        """Return the total income."""
        return sum(self.income_per_cat.values())

    def fixed_expenses(self):
        """Return the total of the fixed expenses."""
        return sum(self.fixed_expenses_per_cat.values())

    def variable_expenses(self):
        """Return the total of the variable expenses."""
        return sum(self.variable_expenses_per_cat.values())

    def expenses(self):
        """Return the total of fixed plus variable expenses."""
        return self.fixed_expenses() + self.variable_expenses()

    def __repr__(self):
        """Return a three-column report: income, fixed and variable expenses.

        The rows are generated from the category dicts, so the report
        always has as many rows as the longest column needs.
        """
        month_name = self.month.strftime("%B")
        columns = [
            list(self.income_per_cat.items()),
            list(self.fixed_expenses_per_cat.items()),
            list(self.variable_expenses_per_cat.items()),
        ]
        blank = " " * 25

        lines = [
            "",
            f"{month_name:>40} Report",
            f"{'Income':<25} {'Fixed Expenses':<25} {'Variable Expenses':<25}".rstrip(),
        ]
        for row in range(max(len(column) for column in columns)):
            cells = []
            for column in columns:
                if row < len(column):
                    label, amount = column[row]
                    cells.append(f"{label:<16}{amount:>9.2f}")
                else:
                    cells.append(blank)
            lines.append(" ".join(cells).rstrip())

        lines.append(
            f"{self.income():>25.2f} {self.fixed_expenses():>25.2f} "
            f"{self.variable_expenses():>25.2f}"
        )
        lines.append(f"Expenses:{self.expenses():>16.2f}")
        lines.append(f"Net:{self.income() - self.expenses():>21.2f}")
        return "\n".join(lines)
def get_transactions(csvfile):
    """Read the tab-separated *csvfile* into a list of `Transaction` objects.

    Each row holds date (%d/%m/%Y), description, value and category. A row
    that can't be converted is printed and ends the program with status 2.
    """
    transactions = []
    with open(csvfile, newline="") as fp:
        for row in csv.reader(fp, delimiter="\t"):
            try:
                # date = datetime.datetime.strptime(row[0], "%Y-%m-%d")
                when = datetime.datetime.strptime(row[0], "%d/%m/%Y")
                transactions.append(
                    Transaction(when, row[1], Decimal(row[2]), row[3])
                )
            except Exception as e:
                print(repr(e))
                print(row)
                sys.exit(2)

    return transactions
def reorder_transactions(transactions):
    """Return a new list with *transactions* ordered by date."""

    def by_date(transaction):
        return transaction.date

    return sorted(transactions, key=by_date)
def write_transactions(csvfile, transactions):
    """Write *transactions* to *csvfile*, one tab-separated row each.

    A row holds the day, description, value and category.
    """
    with open(csvfile, "w", newline="") as fp:
        csv.writer(fp, delimiter="\t").writerows(
            [t.date.date(), t.description, t.value, t.category]
            for t in transactions
        )
def get_month_transactions(transactions, month):
    """Return the transactions whose date falls in month number *month*."""
    return [
        transaction
        for transaction in transactions
        if transaction.date.month == month
    ]
def get_value_per_category(transactions):
    """Return a dict mapping each category to the sum of its values."""
    totals = {}
    for transaction in transactions:
        key = transaction.category
        if key in totals:
            totals[key] += transaction.value
        else:
            totals[key] = transaction.value
    return totals
def split_income_expenses(value_per_category):
    """Split category totals into ``(income, expenses)`` dicts.

    Categories starting with "Income" go to income with the sign flipped,
    "Investment" is left out, and everything else is an expense.
    """
    income, expenses = {}, {}
    for category, value in value_per_category.items():
        if category == "Investment":
            continue
        if category.startswith("Income"):
            income[category] = -value
        else:
            expenses[category] = value
    return income, expenses
def _carry_negatives_back(series):
    """Move negative entries of *series* into the preceding month, in place.

    Walking from the last month to the first settles every carry in one
    pass. The first month has no predecessor, so a negative total left
    there is clamped to zero instead of wrapping around to the last month.
    """
    for month in range(len(series) - 1, 0, -1):
        if series[month] < 0:
            series[month - 1] += series[month]
            series[month] = 0
    if series and series[0] < 0:
        series[0] = 0
    return series


def plot(monthly_transactions):
    """Plot income as a line over a stacked area of expenses per category.

    Args:
        monthly_transactions: one `MonthlyTransactions` per month, in
            order; the x axis is numbered from 1.
    """
    x = range(1, len(monthly_transactions) + 1)
    y_income = [float(month.income()) for month in monthly_transactions]

    labels = monthly_transactions[0].expense_categories
    # One series per expense category. Negative amounts are carried back
    # before plotting so the stacked areas only hold non-negative values.
    y = [
        _carry_negatives_back(
            [float(month.expenses_per_cat[label]) for month in monthly_transactions]
        )
        for label in labels
    ]

    plt.plot(x, y_income, label="Income")
    plt.stackplot(x, y, labels=labels)
    plt.legend(loc="upper left")
    plt.show()
if __name__ == "__main__":
    # Load, order and store the transactions.
    transactions = reorder_transactions(get_transactions("transactions.csv"))
    write_transactions("transactions_ordered.csv", transactions)

    # Build and print one report per month, January to June.
    monthly_transactions = []
    for month in range(1, 7):
        month_transactions = MonthlyTransactions(
            month, get_month_transactions(transactions, month)
        )
        monthly_transactions.append(month_transactions)
        print(month_transactions)

    plot(monthly_transactions)

    total_income = sum(month.income() for month in monthly_transactions)
    total_expenses = sum(month.expenses() for month in monthly_transactions)

    if total_income - total_expenses <= 0:
        print(f"We're {total_expenses - total_income} poorer :(")
    else:
        print(f"\nWe're {total_income - total_expenses} richer!")

92
transaction.py Normal file
View File

@ -0,0 +1,92 @@
from csv import reader, writer
from datetime import date
from decimal import Decimal, InvalidOperation
class TransactionError(Exception):
    """Raised when a `Transaction` can't be built from the given fields."""
class Transaction:
    """A bank transaction: date, description, bank, value and category.

    Equality compares date, description, bank and value (not category);
    ordering compares the date only.
    """

    date = None
    description = ""
    bank = ""
    value = 0
    category = ""

    def __init__(self, *args):
        """Build a transaction from its fields.

        The fields are given either as separate arguments or as a single
        sequence, in the order date (ISO string), description, bank,
        value and category. Fields missing at the end keep the class
        defaults, so the category may be left out.

        Raises:
            TransactionError: if the value is not a valid decimal number.
        """
        arg = args[0] if len(args) == 1 else list(args)

        try:
            self.date = date.fromisoformat(arg[0])
            self.description = " ".join(arg[1].split())
            self.bank = arg[2]
            self.value = Decimal(arg[3])
            self.category = arg[4]
        except IndexError:
            # Trailing fields are optional; the remaining attributes fall
            # back to the class defaults.
            pass
        except InvalidOperation as err:
            raise TransactionError(f"invalid value in {args}") from err

    def to_csv(self):
        """Return the fields as a row ready to be written to a csv file."""
        return [self.date, self.description, self.bank, self.value, self.category]

    @staticmethod
    def read_transactions(file, encoding="utf-8"):
        """Return the transactions stored in the tab-separated *file*."""
        with open(file, newline="", encoding=encoding) as f:
            r = reader(f, delimiter="\t")
            # Blank lines yield empty rows, which would become transactions
            # without a date that can't be sorted.
            transactions = [Transaction(row) for row in r if row]

        return transactions

    @staticmethod
    def write_transactions(file, transactions, append=False, encoding="utf-8"):
        """Write *transactions* to *file*, tab-separated.

        The file is overwritten unless *append* is true.
        """
        with open(file, "a" if append else "w", newline="", encoding=encoding) as f:
            w = writer(f, delimiter="\t")
            w.writerows([transaction.to_csv() for transaction in transactions])

    @staticmethod
    def get_repeated_transactions(transactions):
        """Return every transaction that equals one appearing earlier."""
        repeated, new = list(), list()
        for t in transactions:
            if t not in new:
                new.append(t)
            else:
                repeated.append(t)

        return repeated

    @staticmethod
    def sort_by_bank(transactions):
        """Sort *transactions* by bank, in place, and return the same list."""
        transactions.sort(key=lambda k: k.bank)
        return transactions

    def _identity(self):
        """Fields that decide whether two transactions are equal."""
        return (self.date, self.description, self.bank, self.value)

    def __eq__(self, other):
        # NotImplemented lets comparisons with other types (e.g. None)
        # evaluate to False instead of raising AttributeError. Python
        # derives `!=` from this method.
        if not isinstance(other, Transaction):
            return NotImplemented
        return self._identity() == other._identity()

    # Instances are mutable and define __eq__, so they stay unhashable.
    __hash__ = None

    def __lt__(self, other):
        if not isinstance(other, Transaction):
            return NotImplemented
        return self.date < other.date

    def __le__(self, other):
        if not isinstance(other, Transaction):
            return NotImplemented
        return self.date <= other.date

    def __gt__(self, other):
        if not isinstance(other, Transaction):
            return NotImplemented
        return self.date > other.date

    def __ge__(self, other):
        if not isinstance(other, Transaction):
            return NotImplemented
        return self.date >= other.date

    def __repr__(self):
        return f"{self.date} {self.description} {self.value}€ from {self.bank} ({self.category})"