This will provide differentiation between income that's regular and stable (long-term contract) and that which is not, such as gigs, presents, etc. It presents the information separated in both graphs and reports.
179 lines
5.6 KiB
Python
179 lines
5.6 KiB
Python
from __future__ import annotations
|
|
from collections import namedtuple
|
|
from typing import TYPE_CHECKING
|
|
import datetime as dt
|
|
import logging
|
|
import re
|
|
import yaml
|
|
|
|
|
|
if TYPE_CHECKING:
|
|
from pfbudget.database import DBManager
|
|
from pfbudget.transactions import Transaction
|
|
|
|
|
|
Options = namedtuple(
|
|
"Options",
|
|
[
|
|
"group",
|
|
"regex",
|
|
"banks",
|
|
"regular",
|
|
"not_in_groups",
|
|
"date_fmt",
|
|
"vacations",
|
|
"timedelta",
|
|
],
|
|
defaults=["No group", [], [], [], [], "", [], 4],
|
|
)
|
|
|
|
cfg = yaml.safe_load(open("categories.yaml"))
|
|
try:
|
|
categories = {
|
|
str(k): Options(**v) if v else Options()
|
|
for k, v in cfg.items()
|
|
if k and k != "Groups"
|
|
}
|
|
except TypeError:
|
|
logging.exception("Invalid option in categories.yaml")
|
|
categories = {}
|
|
|
|
groups = {
|
|
group: [
|
|
category for category, options in categories.items() if options.group == group
|
|
]
|
|
for group in set(category.group for category in categories.values())
|
|
}
|
|
categories.setdefault("Null", Options())
|
|
|
|
order = {k: i for i, k in enumerate(cfg["Groups"])}
|
|
groups = {
|
|
group: groups[group]
|
|
for group in sorted(groups, key=lambda x: order.get(x, len(groups)))
|
|
}
|
|
|
|
|
|
def categorize_data(db: DBManager):
|
|
|
|
# 1st) Classifying null transactions, i.e. transfers between banks.
|
|
# Will not overwrite previous categories
|
|
nulls(db)
|
|
|
|
# 2nd) Classifying all vacations by vacation dates
|
|
# Will not overwrite previous categories
|
|
vacations(db)
|
|
|
|
# 3rd) Classify all else based on regex
|
|
if transactions := db.get_uncategorized_transactions():
|
|
for transaction in transactions:
|
|
for name, category in categories.items():
|
|
if matches(transaction, category):
|
|
transaction.category = name
|
|
break
|
|
db.update_categories(
|
|
[transaction for transaction in transactions if transaction.category]
|
|
)
|
|
|
|
# 4th) Manually update categories from the uncategorized transactions
|
|
if transactions := db.get_uncategorized_transactions():
|
|
print(
|
|
f"Still {len(transactions)} uncategorized transactions left. Type quit/exit to exit the program."
|
|
)
|
|
for transaction in transactions:
|
|
while True:
|
|
category = input(f"{repr(transaction)} category: ")
|
|
if category == "quit" or category == "exit":
|
|
return
|
|
if not category:
|
|
break
|
|
if category not in categories:
|
|
print(
|
|
f"Category {category} doesn't exist. Please use one of {categories.keys()}"
|
|
)
|
|
else:
|
|
transaction.category = category
|
|
db.update_category(transaction)
|
|
break
|
|
|
|
|
|
def vacations(db: DBManager) -> None:
|
|
try:
|
|
date_fmt = categories["Travel"].date_fmt
|
|
for start, end in categories["Travel"].vacations:
|
|
try:
|
|
start = dt.datetime.strptime(start, date_fmt).date().isoformat()
|
|
end = dt.datetime.strptime(end, date_fmt).date().isoformat()
|
|
except ValueError as e:
|
|
logging.warning(f"{e} continuing...")
|
|
continue
|
|
|
|
not_in_groups = categories["Travel"].not_in_groups # default is []
|
|
|
|
update = False
|
|
if transactions := db.get_daterange_uncategorized_transactions(start, end):
|
|
for transaction in transactions:
|
|
if not_in_groups:
|
|
if not any(
|
|
matches(
|
|
transaction,
|
|
categories.get(category, Options()),
|
|
)
|
|
for group in not_in_groups
|
|
for category in groups[group]
|
|
):
|
|
transaction.category = "Travel"
|
|
update = True
|
|
else:
|
|
transaction.category = "Travel"
|
|
update = True
|
|
|
|
if update:
|
|
db.update_categories(transactions)
|
|
|
|
except KeyError as e:
|
|
logging.exception(e)
|
|
|
|
|
|
def nulls(db: DBManager) -> None:
|
|
null = categories.get("Null", Options())
|
|
transactions = db.get_uncategorized_transactions()
|
|
if not transactions:
|
|
return
|
|
|
|
matching_transactions = []
|
|
for t in transactions:
|
|
for cancel in (
|
|
cancel
|
|
for cancel in transactions
|
|
if (
|
|
t.date - dt.timedelta(days=null.timedelta)
|
|
<= cancel.date
|
|
<= t.date + dt.timedelta(days=null.timedelta)
|
|
and (matches(t, null) if null.regex else True)
|
|
and t.bank != cancel.bank
|
|
and t not in matching_transactions
|
|
and cancel not in matching_transactions
|
|
and cancel != t
|
|
and t.value == -cancel.value
|
|
)
|
|
):
|
|
t.category = "Null"
|
|
cancel.category = "Null"
|
|
matching_transactions.extend([t, cancel])
|
|
break # There will only be one match per null transaction pair
|
|
|
|
if matching_transactions:
|
|
db.update_categories(matching_transactions)
|
|
|
|
|
|
def matches(transaction: Transaction, category: Options):
|
|
if not category.regex:
|
|
return False
|
|
try:
|
|
return any(
|
|
re.compile(pattern.lower()).search(transaction.description.lower())
|
|
for pattern in category.regex
|
|
)
|
|
except re.error as e:
|
|
print(f"{e}{transaction} {category}")
|