Categorizing refactored with SQLite DB and YAML

`categorize_data` is the new entry point for data categorization. It receives
a DBManager and contains the categorizing logic.
Categorizer configuration is now done solely from the categories.yaml file.

Ancillary database methods required for categorizing transactions added to
DBManager.

Adds categorize to command line options.
Removes obsolete restart options and method from runnable.py.
Fixes the parse and categorize methods, which now take the command-line
arguments and a `DBManager`.

Removes obsolete tools.py, all functions already rewritten in relevant
modules.

Updated categories.yaml with new keys.
This commit is contained in:
Luís Murta 2021-06-11 22:11:07 +01:00
parent 569469eac4
commit deaa71ead4
Signed by: satprog
GPG Key ID: DDF2EFC6179009DC
5 changed files with 148 additions and 398 deletions

View File

@ -5,7 +5,7 @@ Income1:
Income2:
regex:
- transfer
bank:
banks:
- BankA
Income3:
@ -55,6 +55,10 @@ Travel:
- ryanair
- easyjet
- airbnb
negative_regex:
- Commute
- Utilities
date_fmt: "%Y-%m-%d"
dates:
- ["2019-12-23", "2020-01-02"]

View File

@ -1,224 +1,126 @@
from datetime import timedelta
from re import compile as c
from collections import namedtuple
import datetime as dt
import logging
import re
import yaml
from .database import DBManager
class Categories:
name = ""
regex = []
banks = []
values = []
range = ()
Options = namedtuple(
"Options",
[
"regex",
"banks",
"regular",
"negative_regex",
"date_fmt",
"vacations",
"timedelta",
],
defaults=[[], [], [], [], "", [], 4],
)
def search(self, t):
if not self.regex:
return False
cfg = yaml.safe_load(open("categories.yaml"))
categories = {k: Options(**v) if v else Options() for k, v in cfg.items()}
if self.banks:
return any(
pattern.search(t.description.lower())
for pattern in self.regex
if t.bank in self.banks
)
elif self.range:
return any(
pattern.search(t.description.lower())
for pattern in self.regex
if self.range[0] < t.value < self.range[1]
)
elif self.values:
return any(
pattern.search(t.description.lower())
for pattern in self.regex
if t.value in self.values
def categorize_data(db: DBManager):
# 1st) Classifying null transactions, i.e. transfers between banks.
# Will not overwrite previous categories
nulls(db)
# 2nd) Classifying all vacations by vacation dates
# Will not overwrite previous categories
vacations(db)
# 3rd) Classify all else based on regex
transactions = [list(t) for t in db.get_uncategorized_transactions()]
for transaction in transactions:
if not transaction[4]:
for name, category in categories.items():
if matches(transaction, category):
transaction[4] = name
break
db.update_categories(transactions)
# 4th) Manually update categories from the uncategorized transactions
transactions = [list(t) for t in db.get_uncategorized_transactions()]
if transactions:
print(f"Still {len(transactions)} uncategorized transactions left")
for transaction in transactions:
while True:
category = input(f"{transaction} category: ")
if category == "quit" or category == "exit":
return
if category not in categories:
print(
f"Category {category} doesn't exist. Please use one of {categories.keys()}"
)
else:
return any(pattern.search(t.description.lower()) for pattern in self.regex)
@classmethod
def get_categories(cls):
return cls.__subclasses__()
transaction[4] = category
db.update_category(transaction)
break
def get_categories():
return [cat.name for cat in Categories.get_categories()]
def vacations(db: DBManager) -> None:
try:
date_fmt = categories["Travel"].date_fmt
for start, end in categories["Travel"].vacations:
try:
start = dt.datetime.strptime(start, date_fmt).date().isoformat()
end = dt.datetime.strptime(end, date_fmt).date().isoformat()
except ValueError as e:
logging.warning(f"{e} continuing...")
continue
not_vacations = categories["Travel"].negative_regex
def get_income_categories():
return [cat for cat in get_categories() if "Income" in cat]
def get_fixed_expenses():
return [Utilities.name]
def get_required_expenses():
return [Groceries.name, Commute.name]
def get_health_expenses():
return [Medical.name]
def get_discretionary_expenses():
return [
cat
for cat in get_categories()
if cat
not in [
*get_income_categories(),
*get_fixed_expenses(),
*get_required_expenses(),
*get_health_expenses(),
Investment.name,
Null.name,
]
]
class Income1(Categories):
name = "Income1"
regex = [c("company A")]
class Income2(Categories):
name = "Income2"
regex = [c("transfer")]
banks = ["BankA"]
class Income3(Categories):
name = "Income3"
regex = [c("company B")]
class Null(Categories):
name = "Null"
regex = [
c("transfer A to B"),
c("1"),
c("2"),
]
def search(self, transaction):
pass
def search_all(self, transactions):
matches = []
if transactions := [
list(t) for t in db.get_daterage_without(start, end, *not_vacations)
]:
for transaction in transactions:
for cancel in [
transaction[4] = "Travel"
db.update_categories(transactions)
except KeyError as e:
print(e)
def nulls(db: DBManager) -> None:
null = categories.get("Null", Options())
transactions = [list(t) for t in db.get_uncategorized_transactions()]
matching_transactions = []
for t in transactions:
for cancel in (
cancel
for cancel in transactions
if (
transaction.date - timedelta(days=4)
<= cancel.date
<= transaction.date + timedelta(days=4)
and any(
pattern.search(transaction.description.lower())
for pattern in self.regex
dt.datetime.fromisoformat(t[0]) - dt.timedelta(days=null.timedelta)
<= dt.datetime.fromisoformat(cancel[0])
and dt.datetime.fromisoformat(cancel[0])
<= dt.datetime.fromisoformat(t[0]) + dt.timedelta(days=null.timedelta)
and (matches(t, null) if null.regex else True)
and t[2] != cancel[2]
and t not in matching_transactions
and cancel not in matching_transactions
and cancel != t
and t[3] == -cancel[3]
)
and transaction.bank != cancel.bank
and transaction
and cancel not in matches
and cancel != transaction
)
]:
):
t[4] = "Null"
cancel[4] = "Null"
matching_transactions.extend([t, cancel])
break # There will only be one match per null transaction pair
if transaction.value == -cancel.value:
matches.extend([transaction, cancel])
# if transaction.value > 0:
# transaction, cancel = cancel, transaction
# print('{} -> {}'.format(transaction, cancel))
break
return matches
db.update_categories(matching_transactions)
class Commute(Categories):
name = "Commute"
regex = [c("uber"), c("train")]
values = [-50]
def search(self, t):
if any(pattern.search(t.description.lower()) for pattern in self.regex[:1]):
return True
elif t.value in self.values:
return any(
pattern.search(t.description.lower()) for pattern in self.regex[1:]
)
else:
def matches(transaction, category: Options):
if not category.regex:
return False
class Utilities(Categories):
name = "Utilities"
regex = [c("electricity", "water", "internet")]
values = [-35]
def search(self, t):
if any(pattern.search(t.description.lower()) for pattern in self.regex[:2]):
return True
elif t.value in self.values:
return any(
pattern.search(t.description.lower()) for pattern in self.regex[2:]
re.compile(pattern).search(transaction[1].lower()) for pattern in category.regex
)
else:
return False
class Groceries(Categories):
name = "Groceries"
regex = [
c("lidl"),
c("e.leclerc"),
c("aldi"),
]
class EatingOut(Categories):
name = "Eating Out"
regex = [
c("restaurant 1"),
c("restaurant 2"),
]
class Entertainment(Categories):
name = "Entertainment"
regex = [c("cinema"), c("steam")]
class Pets(Categories):
name = "Pets"
class Travel(Categories):
name = "Travel"
regex = [c("ryanair"), c("easyjet"), c("airbnb")]
not_in_travel = [
*get_income_categories(),
Utilities.name,
]
@staticmethod
def search_all(transactions, start, end):
matches = []
for transaction in transactions:
if start <= transaction.date < end:
matches.append(transaction)
return matches
class Miscellaneous(Categories):
name = "Miscellaneous"
class Investment(Categories):
name = "Investment"
regex = [c("subscrition")]
banks = ["BankC"]
class Medical(Categories):
name = "Medical"
regex = [c("hospital", "pharmacy")]

View File

@ -83,6 +83,13 @@ SELECT EXTRACT((?) FROM date) AS (?), date, description, bank, value
FROM transactions
"""
# Selects every transaction dated between two bounds (inclusive, two `?`
# parameters) whose category is not one of an excluded set. The `{}` is filled
# by the caller with a parenthesised list of `?` placeholders, one per
# excluded category.
#
# In SQL, `NULL NOT IN (...)` evaluates to NULL rather than true, so a bare
# `category NOT IN (...)` silently drops every transaction that has no
# category yet. The explicit `IS NULL` branch keeps those rows in the result.
SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES = """
SELECT *
FROM transactions
WHERE date BETWEEN (?) AND (?)
AND (category IS NULL OR category NOT IN {})
"""
class DBManager:
"""SQLite DB connection manager"""
@ -163,6 +170,13 @@ class DBManager:
logger.info(f"Update {transaction} category")
self.__execute(UPDATE_CATEGORY, (transaction[4], *transaction[:4]))
def update_categories(self, transactions):
    """Write the category of every given transaction in a single batch.

    Each transaction is an indexable row whose element 4 is the category to
    store and whose first four elements are passed as the remaining
    parameters of UPDATE_CATEGORY.

    Args:
        transactions: sized collection of transaction rows
    """
    logger.info(f"Update {len(transactions)} transactions' categories")
    parameters = []
    for row in transactions:
        # Category first, then the four leading fields of the row.
        parameters.append((row[4], *row[:4]))
    self.__executemany(UPDATE_CATEGORY, parameters)
def get_duplicated_transactions(self):
logger.info("Get duplicated transactions")
return self.__execute(DUPLICATED_TRANSACTIONS)
@ -187,6 +201,13 @@ class DBManager:
logger.info("Get uncategorized transactions")
return self.get_category(None)
def get_daterage_without(self, start, end, *categories):
    """Fetch the transactions between two dates, leaving some categories out.

    Args:
        start: lower bound of the date range, bound to the first `?`
        end: upper bound of the date range, bound to the second `?`
        *categories: category names the query excludes via `NOT IN`

    Returns:
        Whatever the underlying execute helper returns for the query.
    """
    logger.info(f"Get transactions between {start} and {end} not in {categories}")
    # One "?" placeholder per excluded category, so the values stay bound
    # parameters instead of being formatted into the SQL text.
    placeholders = ", ".join(["?"] * len(categories))
    query = SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
        "(" + placeholders + ")"
    )
    parameters = (start, end, *categories)
    return self.__execute(query, parameters)
def export(self):
filename = pathlib.Path(
"@".join([self.db, datetime.datetime.now().isoformat()])

View File

@ -2,12 +2,12 @@ from pathlib import Path
import argparse
import datetime as dt
from .categories import categorize_data
from .database import DBManager
from .graph import discrete, monthly
from .parsers import parse_data
from .transactions import load_transactions, save_transactions
from . import report
from . import tools
DEFAULT_DB = "data.db"
@ -54,7 +54,12 @@ def argparser():
p_parse.add_argument("--bank", nargs=1, type=str)
p_parse.set_defaults(func=parse)
# p_restart = subparsers.add_parser("restart", help="restart help")
"""
Categorizing
"""
p_categorize = subparsers.add_parser("categorize", help="parse help")
p_categorize.set_defaults(func=categorize)
p_vacation = subparsers.add_parser(
"vacation", help="vacation help format: [YYYY/MM/DD]"
)
@ -62,12 +67,6 @@ def argparser():
p_report = subparsers.add_parser("report", help="report help")
p_status = subparsers.add_parser("status", help="status help")
# p_restart.add_argument("--raw", help="new raw data dir")
# p_restart.add_argument("--data", help="new parsed data dir")
# p_export.add_argument("option", type=str, choices=["single", "all", "restore"], nargs="?", default="single",
# help="backup option help")
subparser_vacation = p_vacation.add_subparsers(
dest="option", required=True, help="vacation suboption help"
)
@ -107,44 +106,11 @@ def argparser():
return parser
def restart(state, args):
    """Restart

    Removes every data file listed in the state, optionally switches to new
    raw/data directories, clears the recorded file lists and parses again.

    Args:
        state (PFState): Internal state of the program
        args (dict): argparse variables

    Raises:
        DataFileMissing: A data file listed in the state does not exist
        PfBudgetNotInitialized: Raised when no state has been initialized yet
    """
    if state is None:
        raise PfBudgetNotInitialized(f"{Path(tools.STATE_FILE)} doesn't exist")

    for fn in state.data_files:
        data_file = Path(state.data_dir) / fn
        try:
            data_file.unlink()
        except FileNotFoundError:
            raise DataFileMissing("missing {}".format(data_file))

    # New directories are only adopted when given on the command line.
    if args.raw:
        state.raw_dir = args.raw
    if args.data:
        state.data_dir = args.data

    state.raw_files = []
    state.data_files = []
    parse(state, args)
def parse(args):
def parse(args, db):
"""Parser
Parses the contents of the raw directory into the data files, and
categorizes the transactions.
categorizes the transactions
Args:
state (PFState): Internal state of the program
@ -158,26 +124,20 @@ def parse(args):
trs = parse_data(path, args.bank)
else:
raise FileNotFoundError
# tools.parser(state, raw_dir, data_dir)
# categorize(state, args)
print("\n".join([t.desc() for t in trs]))
def categorize(state, args):
def categorize(args, db):
"""Categorization
Automatically categorizes transactions based on the regex of each
category. Manually present the remaining to the user.
category. Manually present the remaining to the user
Args:
state (PFState): Internal state of the program
args (dict): argparse variables
"""
transactions = load_transactions(state.data_dir)
missing = tools.auto_categorization(state, transactions)
if missing:
tools.manual_categorization(state, transactions)
save_transactions(state.data_dir, transactions)
categorize_data(db)
def vacation(state, args):
@ -261,5 +221,6 @@ def f_report(state, args):
def run():
db = DBManager("transactions.db")
args = argparser().parse_args()
args.func(args)
args.func(args, db)

View File

@ -1,138 +0,0 @@
from pathlib import Path
import datetime as dt
import shutil
from .categories import Categories, Null, Travel, get_categories
from .parsers import parse_data
from .state import PFState
from .transactions import (
Transaction,
load_transactions,
read_transactions,
write_transactions,
)
# Base directory; the state file and the backup directory are placed under it.
DIR = ".pfbudget/"
# Location of the state file inside DIR.
STATE_FILE = DIR + "state"
# Prefix under which backup() and full_backup() create their outputs.
BACKUP_DIR = DIR + "backup/"
def get_filename(t: Transaction):
    """Return the CSV file name for a transaction: ``<year>_<bank>.csv``."""
    year, bank = t.year, t.bank
    return "{}_{}.csv".format(year, bank)
def backup(state: PFState):
    """Dump all transactions of the data directory into one timestamped CSV.

    The file is written under BACKUP_DIR and its path is recorded in
    ``state.last_backup``.
    """
    transactions = load_transactions(state.data_dir)
    timestamp = dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
    filename = BACKUP_DIR + "transactions_" + timestamp + ".csv"
    write_transactions(Path(filename), transactions)
    state.last_backup = filename
def full_backup(state: PFState):
    """Copy the whole data directory into a timestamped directory.

    The copy is created under BACKUP_DIR and its path is recorded in
    ``state.last_datadir_backup``.
    """
    timestamp = dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
    destination = BACKUP_DIR + timestamp
    shutil.copytree(state.data_dir, Path(destination))
    state.last_datadir_backup = destination
def restore(state: PFState):
    """Replace the data directory with the last data-directory backup.

    Does nothing but print a message when no backup has been recorded in
    ``state.last_datadir_backup``. When a data directory already exists the
    user is asked to confirm; only "y"/"yes" (case-insensitive) removes it and
    proceeds. When no data directory exists there is nothing to overwrite, so
    the backup is copied back without asking. Previously that case restored
    nothing at all.

    Args:
        state: program state providing ``data_dir`` and
            ``last_datadir_backup``
    """
    if not state.last_datadir_backup:
        print("No data directory backup exists")
        return

    if Path(state.data_dir).is_dir():
        option = input(
            "A data directory already exists at {}/ . Are you sure you want to restore the last backup? (Y/N) ".format(
                state.data_dir
            )
        ).lower()
        if option in ("n", "no"):
            return
        if option not in ("y", "yes"):
            print("Invalid choice")
            return
        # copytree requires that the destination does not exist yet.
        shutil.rmtree(state.data_dir)

    shutil.copytree(state.last_datadir_backup, state.data_dir)
def parser(state: PFState, raw_dir=None, data_dir=None):
    """Parse the raw files not seen before and merge them into the data files.

    Every file in the raw directory whose name is not yet in
    ``state.raw_files`` is parsed and recorded. The new transactions are
    grouped by destination file name, so each data file is read, extended,
    sorted and rewritten once rather than once per transaction.

    Args:
        state: program state; ``raw_files`` and ``data_files`` are appended to
            in place and the state is saved at the end
        raw_dir: raw directory to use instead of ``state.raw_dir``
        data_dir: data directory to use instead of ``state.data_dir``
    """
    raw = Path(raw_dir) if raw_dir else Path(state.raw_dir)
    dat = Path(data_dir) if data_dir else Path(state.data_dir)

    # Destination file name -> new transactions, in order of first appearance.
    additions = {}
    for rf in raw.iterdir():
        if rf.name in state.raw_files:
            continue
        for transaction in parse_data(rf):
            additions.setdefault(get_filename(transaction), []).append(transaction)
        state.raw_files.append(rf.name)

    for filename, transactions in additions.items():
        merged = read_transactions(dat / filename)
        merged.extend(transactions)
        merged.sort()
        write_transactions(dat / filename, merged)
        if filename not in state.data_files:
            state.data_files.append(filename)

    state._save()  # append to list doesn't trigger setter
# Assigns categories to `transactions` in place and returns True when at least
# one transaction is still left without a category afterwards.
# NOTE(review): indentation is not visible in this view, so the nesting of the
# checks inside the main loop is inferred from the statements -- confirm.
def auto_categorization(state: PFState, transactions: list) -> bool:
# Transactions picked out by Null.search_all; they are forced to the Null
# category further down.
null = Null()
nulls = null.search_all(transactions)
travel = Travel()
travels = []
missing = False
# Collect what Travel.search_all returns for every entry of state.vacations,
# passing the entry's first two elements as the remaining arguments.
for vacation in state.vacations:
t = travel.search_all(transactions, vacation[0], vacation[1])
travels.extend(t)
for transaction in transactions:
if not transaction.category:
# Every subclass of Categories is instantiated and tried; there is no break,
# so when several match, the last matching one sets the category.
for category in [category() for category in Categories.get_categories()]:
if category.search(transaction):
transaction.category = category.name
# Transactions collected for a vacation become Travel unless their current
# category is listed in travel.not_in_travel.
if (
transaction in travels
and transaction.category not in travel.not_in_travel
):
if transaction.category != travel.name:
transaction.category = travel.name
# The Null check comes after the Travel check.
if transaction in nulls:
if transaction.category != null.name:
transaction.category = null.name
if not transaction.category:
missing = True
return missing
def manual_categorization(state: PFState, transactions: list):
    """Interactively ask the user for the category of each uncategorized transaction.

    The prompt is repeated for a transaction until the answer is one of the
    names returned by ``get_categories()``. Answering "quit" returns
    immediately and leaves the remaining transactions untouched.

    Args:
        state: program state (not used by this function)
        transactions: transactions to categorize; updated in place
    """
    print(
        "Please categorize the following transactions. If you want to exit, write 'quit'"
    )
    for transaction in transactions:
        while not transaction.category:
            category = input(f"{transaction.desc()} category: ")
            if category == "quit":
                return
            if category in get_categories():
                transaction.category = category
            else:
                print(
                    f"Category {category} doesn't exist. Please use one of {get_categories()}"
                )