budget/pfbudget/core/manager.py
Luís Murta 95eff24418
Remove null rules from Categorizer
and allow for multiple null rules on the Nullifier.

Also create an empty list on the Nullifier constructor to simplify the
logic afterwards, in the case no "null" rule exist.
2024-01-22 22:02:09 +00:00

412 lines
15 KiB
Python

import json
from pathlib import Path
import pickle
from typing import Optional
import webbrowser
from pfbudget.common.types import Operation
from pfbudget.db.client import Client
from pfbudget.db.model import (
Bank,
BankTransaction,
Category,
CategoryGroup,
CategoryRule,
CategorySchedule,
Link,
MoneyTransaction,
NordigenBank,
Rule,
CategorySelector,
SplitTransaction,
Tag,
TagRule,
Transaction,
TransactionCategory,
)
from pfbudget.extract.nordigen import NordigenClient, NordigenCredentialsManager
from pfbudget.extract.parsers import parse_data
from pfbudget.extract.psd2 import PSD2Extractor
from pfbudget.load.database import DatabaseLoader
from pfbudget.transform.categorizer import Categorizer
from pfbudget.transform.nullifier import Nullifier
from pfbudget.transform.tagger import Tagger
class Manager:
def __init__(self, db: str, verbosity: int = 0):
self._db = db
self._database: Optional[Client] = None
self._verbosity = verbosity
def action(self, op: Operation, params=None):
if self._verbosity > 0:
print(f"op={op}, params={params}")
if params is None:
params = []
match (op):
case Operation.Init:
pass
case Operation.Transactions:
return [t.format for t in self.database.select(Transaction)]
case Operation.Parse:
# Adapter for the parse_data method. Can be refactored.
args = {"bank": params[1], "creditcard": params[2], "category": None}
transactions = []
for path in [Path(p) for p in params[0]]:
if path.is_dir():
for file in path.iterdir():
transactions.extend(self.parse(file, args))
elif path.is_file():
transactions.extend(self.parse(path, args))
else:
raise FileNotFoundError(path)
if (
len(transactions) > 0
and input(f"{transactions[:5]}\nCommit? (y/n)") == "y"
):
self.database.insert(sorted(transactions))
case Operation.Download:
if params[3]:
values = params[3]
banks = self.database.select(Bank, lambda: Bank.name.in_(values))
else:
banks = self.database.select(Bank, Bank.nordigen)
extractor = PSD2Extractor(self.nordigen_client())
transactions = []
for bank in banks:
transactions.extend(extractor.extract(bank, params[0], params[1]))
# dry-run
if params[2]:
print(sorted(transactions))
return
loader = DatabaseLoader(self.database)
loader.load(sorted(transactions))
case Operation.Categorize:
with self.database.session as session:
uncategorized = session.select(
BankTransaction, lambda: ~BankTransaction.category.has()
)
categories = session.select(Category)
tags = session.select(Tag)
rules = [
rule
for cat in categories
if cat.name == "null"
for rule in cat.rules
]
Nullifier(rules).transform_inplace(uncategorized)
rules = [
rule
for cat in categories
if cat.name != "null"
for rule in cat.rules
]
Categorizer(rules).transform_inplace(uncategorized)
rules = [rule for tag in tags for rule in tag.rules]
Tagger(rules).transform_inplace(uncategorized)
case Operation.BankMod:
self.database.update(Bank, params)
case Operation.PSD2Mod:
self.database.update(NordigenBank, params)
case Operation.BankDel:
self.database.delete(Bank, Bank.name, params)
case Operation.PSD2Del:
self.database.delete(NordigenBank, NordigenBank.name, params)
case Operation.RequisitionId:
bank_name = params[0]
bank = self.database.select(Bank, (lambda: Bank.name == bank_name))[0]
if not bank.nordigen or not bank.nordigen.bank_id:
raise ValueError(f"{bank} doesn't have a Nordigen ID")
link, req_id = self.nordigen_client().new_requisition(
bank.nordigen.bank_id
)
self.database.update(
NordigenBank,
[{"name": bank.nordigen.name, "requisition_id": req_id}],
)
webbrowser.open(link)
case Operation.PSD2CountryBanks:
banks = self.nordigen_client().country_banks(params[0])
print(banks)
case (
Operation.BankAdd
| Operation.CategoryAdd
| Operation.GroupAdd
| Operation.PSD2Add
| Operation.RuleAdd
| Operation.TagAdd
| Operation.TagRuleAdd
):
self.database.insert(params)
case Operation.CategoryUpdate:
self.database.update(Category, params)
case Operation.CategoryRemove:
self.database.delete(Category, Category.name, params)
case Operation.CategorySchedule:
raise NotImplementedError
case Operation.RuleRemove:
self.database.delete(CategoryRule, CategoryRule.id, params)
case Operation.TagRemove:
self.database.delete(Tag, Tag.name, params)
case Operation.TagRuleRemove:
self.database.delete(TagRule, TagRule.id, params)
case Operation.RuleModify | Operation.TagRuleModify:
self.database.update(Rule, params)
case Operation.GroupRemove:
self.database.delete(CategoryGroup, CategoryGroup.name, params)
case Operation.Forge:
if not (
isinstance(params[0], int)
and all(isinstance(p, int) for p in params[1])
):
raise TypeError("f{params} are not transaction ids")
with self.database.session as session:
id = params[0]
original = session.select(
Transaction, lambda: Transaction.id == id
)[0]
ids = params[1]
links = session.select(Transaction, lambda: Transaction.id.in_(ids))
if not original.category:
original.category = self.askcategory(original)
for link in links:
if (
not link.category
or link.category.name != original.category.name
):
print(
f"{link} category will change to"
f" {original.category.name}"
)
link.category = original.category
tobelinked = [Link(original.id, link.id) for link in links]
session.insert(tobelinked)
case Operation.Dismantle:
raise NotImplementedError
case Operation.Split:
if len(params) < 1 and not all(
isinstance(p, Transaction) for p in params
):
raise TypeError(f"{params} are not transactions")
# t -> t1, t2, t3; t.value == Σti.value
original: Transaction = params[0]
if not original.amount == sum(t.amount for t in params[1:]):
raise ValueError(
f"{original.amount}€ != {sum(v for v, _ in params[1:])}"
)
with self.database.session as session:
originals = session.select(
Transaction, lambda: Transaction.id == original.id
)
assert len(originals) == 1, ">1 transactions matched {original.id}!"
originals[0].split = True
transactions = []
for t in params[1:]:
if originals[0].date != t.date:
t.date = originals[0].date
print(
f"{t.date} is different from original date"
f" {originals[0].date}, using original"
)
splitted = SplitTransaction(
t.date, t.description, t.amount, originals[0].id
)
splitted.category = t.category
transactions.append(splitted)
session.insert(transactions)
case Operation.Export:
self.dump(params[0], params[1], self.database.select(Transaction))
case Operation.Import:
transactions = []
for row in self.load(params[0], params[1]):
match row["type"]:
case "bank":
transaction = BankTransaction(
row["date"],
row["description"],
row["amount"],
row["bank"],
)
case "money":
transaction = MoneyTransaction(
row["date"], row["description"], row["amount"]
)
# TODO case "split" how to match to original transaction?? also
# save ids?
case _:
continue
if category := row.pop("category", None):
transaction.category = TransactionCategory(
category["name"], category["selector"]["selector"]
)
transactions.append(transaction)
if self.certify(transactions):
self.database.insert(transactions)
case Operation.ExportBanks:
self.dump(params[0], params[1], self.database.select(Bank))
case Operation.ImportBanks:
banks = []
for row in self.load(params[0], params[1]):
bank = Bank(row["name"], row["BIC"], row["type"])
if row["nordigen"]:
bank.nordigen = NordigenBank(**row["nordigen"])
banks.append(bank)
if self.certify(banks):
self.database.insert(banks)
case Operation.ExportCategoryRules:
self.dump(params[0], params[1], self.database.select(CategoryRule))
case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
self.database.insert(rules)
case Operation.ExportTagRules:
self.dump(params[0], params[1], self.database.select(TagRule))
case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
self.database.insert(rules)
case Operation.ExportCategories:
self.dump(params[0], params[1], self.database.select(Category))
case Operation.ImportCategories:
# rules = [Category(**row) for row in self.load(params[0])]
categories = []
for row in self.load(params[0], params[1]):
category = Category(row["name"], row["group"])
if len(row["rules"]) > 0:
# Only category rules could have been created with a rule
rules = row["rules"]
for rule in rules:
del rule["type"]
category.rules = [CategoryRule(**rule) for rule in rules]
if row["schedule"]:
category.schedule = CategorySchedule(**row["schedule"])
categories.append(category)
if self.certify(categories):
self.database.insert(categories)
case Operation.ExportCategoryGroups:
self.dump(params[0], params[1], self.database.select(CategoryGroup))
case Operation.ImportCategoryGroups:
groups = [
CategoryGroup(**row) for row in self.load(params[0], params[1])
]
if self.certify(groups):
self.database.insert(groups)
def parse(self, filename: Path, args: dict):
return parse_data(filename, args)
def askcategory(self, transaction: Transaction):
selector = CategorySelector.manual
categories = self.database.select(Category)
while True:
category = input(f"{transaction}: ")
if category in [c.name for c in categories]:
return TransactionCategory(category, selector)
@staticmethod
def dump(fn, format, sequence):
if format == "pickle":
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
elif format == "json":
with open(fn, "w", newline="") as f:
json.dump([e.format for e in sequence], f, indent=4, default=str)
else:
print("format not well specified")
@staticmethod
def load(fn, format):
if format == "pickle":
with open(fn, "rb") as f:
return pickle.load(f)
else:
print("format not well specified")
return []
@staticmethod
def certify(imports: list) -> bool:
if input(f"{imports[:10]}\nDoes the import seem correct? (y/n)") == "y":
return True
return False
@property
def database(self) -> Client:
if not self._database:
self._database = Client(self._db, echo=self._verbosity > 2)
return self._database
def nordigen_client(self) -> NordigenClient:
return NordigenClient(NordigenCredentialsManager.default, self.database)