budget/pfbudget/core/manager.py

import csv
import json
from pathlib import Path
import pickle
from typing import Optional
import webbrowser
from pfbudget.common.types import Operation
from pfbudget.db.client import Client
from pfbudget.db.model import (
Bank,
BankTransaction,
Category,
CategoryGroup,
CategoryRule,
CategorySchedule,
CategorySelector,
Link,
MoneyTransaction,
Nordigen,
Rule,
Selector_T,
SplitTransaction,
Tag,
TagRule,
Transaction,
TransactionCategory,
)
from pfbudget.db.postgresql import DbClient
from pfbudget.extract.nordigen import NordigenClient, NordigenCredentialsManager
from pfbudget.extract.parsers import parse_data
from pfbudget.extract.psd2 import PSD2Extractor
from pfbudget.transform.categorizer import Categorizer
from pfbudget.transform.nullifier import Nullifier
from pfbudget.transform.tagger import Tagger


class Manager:
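    """Central dispatcher for pfbudget operations.

    Routes each Operation to the database clients and, where needed, to the
    PSD2/Nordigen extractor and the transform pipeline (nullify, categorize,
    tag).
    """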
def __init__(self, db: str, verbosity: int = 0):
self._db = db
self._database: Optional[Client] = None
self._verbosity = verbosity

    def action(self, op: Operation, params=None):
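        """Execute a single operation.

        params is an operation-specific sequence of arguments; see the
        individual cases below for how each operation interprets it.
        """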
if self._verbosity > 0:
print(f"op={op}, params={params}")
if params is None:
params = []
match (op):
case Operation.Init:
pass
case Operation.Transactions:
return [t.format for t in self.database.select(Transaction)]
case Operation.Parse:
# Adapter for the parse_data method. Can be refactored.
args = {"bank": params[1], "creditcard": params[2], "category": None}
transactions = []
for path in [Path(p) for p in params[0]]:
if path.is_dir():
for file in path.iterdir():
transactions.extend(self.parse(file, args))
elif path.is_file():
transactions.extend(self.parse(path, args))
else:
raise FileNotFoundError(path)
if (
len(transactions) > 0
and input(f"{transactions[:5]}\nCommit? (y/n)") == "y"
):
self.database.insert(sorted(transactions))
case Operation.Download:
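                # params appears to carry the extraction window in params[0:2],
                # a dry-run flag in params[2] and an optional list of bank
                # names in params[3] (otherwise all Nordigen-enabled banks).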
if params[3]:
values = params[3]
banks = self.database.select(Bank, lambda: Bank.name.in_(values))
else:
banks = self.database.select(Bank, Bank.nordigen)
client = Manager.nordigen_client()
extractor = PSD2Extractor(client)
transactions = []
for bank in banks:
transactions.extend(extractor.extract(bank, params[0], params[1]))
# dry-run
if not params[2]:
self.database.insert(sorted(transactions))
else:
print(sorted(transactions))
case Operation.Categorize:
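                # Transform uncategorized transactions in three passes: the
                # Nullifier with the "null" category rules, the Categorizer
                # with every category rule, the Tagger with every tag rule.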
with self.database.session as session:
uncategorized = session.select(
BankTransaction, lambda: ~BankTransaction.category.has()
)
categories = session.select(Category)
tags = session.select(Tag)
rules = [cat.rules for cat in categories if cat.name == "null"]
Nullifier(rules).transform_inplace(uncategorized)
rules = [rule for cat in categories for rule in cat.rules]
Categorizer(rules).transform_inplace(uncategorized)
rules = [rule for tag in tags for rule in tag.rules]
Tagger(rules).transform_inplace(uncategorized)
case Operation.BankMod:
with self.db.session() as session:
session.update(Bank, params)
case Operation.PSD2Mod:
with self.db.session() as session:
session.update(Nordigen, params)
case Operation.BankDel:
with self.db.session() as session:
session.remove_by_name(Bank, params)
case Operation.PSD2Del:
with self.db.session() as session:
session.remove_by_name(Nordigen, params)
case Operation.Token:
Manager.nordigen_client().generate_token()
case Operation.RequisitionId:
link, _ = Manager.nordigen_client().requisition(params[0], params[1])
print(f"Opening {link} to request access to {params[0]}")
webbrowser.open(link)
case Operation.PSD2CountryBanks:
banks = Manager.nordigen_client().country_banks(params[0])
print(banks)
case (
Operation.BankAdd
| Operation.CategoryAdd
| Operation.GroupAdd
| Operation.PSD2Add
| Operation.RuleAdd
| Operation.TagAdd
| Operation.TagRuleAdd
):
self.database.insert(params)
case Operation.CategoryUpdate:
with self.db.session() as session:
session.updategroup(*params)
case Operation.CategoryRemove:
with self.db.session() as session:
session.remove_by_name(Category, params)
case Operation.CategorySchedule:
with self.db.session() as session:
session.updateschedules(params)
case Operation.RuleRemove:
assert all(isinstance(param, int) for param in params)
with self.db.session() as session:
session.remove_by_id(CategoryRule, params)
case Operation.TagRemove:
with self.db.session() as session:
session.remove_by_name(Tag, params)
case Operation.TagRuleRemove:
assert all(isinstance(param, int) for param in params)
with self.db.session() as session:
session.remove_by_id(TagRule, params)
case Operation.RuleModify | Operation.TagRuleModify:
assert all(isinstance(param, dict) for param in params)
with self.db.session() as session:
session.update(Rule, params)
case Operation.GroupRemove:
assert all(isinstance(param, CategoryGroup) for param in params)
with self.db.session() as session:
session.remove_by_name(CategoryGroup, params)
case Operation.Forge:
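                # Link transactions to an original one, forcing every linked
                # transaction onto the original's category.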
if not (
isinstance(params[0], int)
and all(isinstance(p, int) for p in params[1])
):
                    raise TypeError(f"{params} are not transaction ids")
with self.db.session() as session:
original = session.get(Transaction, Transaction.id, params[0])[0]
links = session.get(Transaction, Transaction.id, params[1])
if not original.category:
original.category = self.askcategory(original)
for link in links:
if (
not link.category
or link.category.name != original.category.name
):
print(
f"{link} category will change to"
f" {original.category.name}"
)
link.category = original.category
tobelinked = [Link(original.id, link.id) for link in links]
session.insert(tobelinked)
case Operation.Dismantle:
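                # Undo Forge: remove the links between an original transaction
                # and its linked transactions.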
assert all(isinstance(param, Link) for param in params)
with self.db.session() as session:
original = params[0].original
links = [link.link for link in params]
session.remove_links(original, links)
case Operation.Split:
                if len(params) < 1 or not all(
                    isinstance(p, Transaction) for p in params
                ):
                    raise TypeError(f"{params} are not transactions")
                # t -> t1, t2, t3; t.amount == Σti.amount
                original: Transaction = params[0]
                if original.amount != sum(t.amount for t in params[1:]):
                    raise ValueError(
                        f"{original.amount}€ != {sum(t.amount for t in params[1:])}€"
                    )
with self.db.session() as session:
originals = session.get(Transaction, Transaction.id, [original.id])
                    assert len(originals) == 1, f"expected 1 transaction matching {original.id}"
originals[0].split = True
transactions = []
for t in params[1:]:
if originals[0].date != t.date:
t.date = originals[0].date
print(
f"{t.date} is different from original date"
f" {originals[0].date}, using original"
)
splitted = SplitTransaction(
t.date, t.description, t.amount, originals[0].id
)
splitted.category = t.category
transactions.append(splitted)
session.insert(transactions)
case Operation.Export:
with self.database.session as session:
self.dump(
params[0], params[1], self.database.select(Transaction, session)
)
case Operation.Import:
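                # Rebuild transactions from previously exported rows and insert
                # them after user confirmation.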
transactions = []
for row in self.load(params[0], params[1]):
match row["type"]:
case "bank":
transaction = BankTransaction(
row["date"],
row["description"],
row["amount"],
row["bank"],
)
case "money":
transaction = MoneyTransaction(
row["date"], row["description"], row["amount"]
)
# TODO case "split" how to match to original transaction?? also
# save ids?
case _:
continue
if category := row.pop("category", None):
transaction.category = TransactionCategory(
category["name"],
CategorySelector(category["selector"]["selector"]),
)
transactions.append(transaction)
if self.certify(transactions):
with self.db.session() as session:
session.insert(transactions)
case Operation.ExportBanks:
with self.database.session as session:
self.dump(params[0], params[1], self.database.select(Bank, session))
case Operation.ImportBanks:
banks = []
for row in self.load(params[0], params[1]):
bank = Bank(row["name"], row["BIC"], row["type"])
if row["nordigen"]:
bank.nordigen = Nordigen(**row["nordigen"])
banks.append(bank)
if self.certify(banks):
with self.db.session() as session:
session.insert(banks)
case Operation.ExportCategoryRules:
with self.database.session as session:
self.dump(
params[0],
params[1],
self.database.select(CategoryRule, session),
)
case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
session.insert(rules)
case Operation.ExportTagRules:
with self.database.session as session:
self.dump(
params[0], params[1], self.database.select(TagRule, session)
)
case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
session.insert(rules)
case Operation.ExportCategories:
with self.database.session as session:
self.dump(
params[0], params[1], self.database.select(Category, session)
)
case Operation.ImportCategories:
# rules = [Category(**row) for row in self.load(params[0])]
categories = []
for row in self.load(params[0], params[1]):
category = Category(row["name"], row["group"])
if len(row["rules"]) > 0:
# Only category rules could have been created with a rule
rules = row["rules"]
for rule in rules:
del rule["type"]
category.rules = set(CategoryRule(**rule) for rule in rules)
if row["schedule"]:
category.schedule = CategorySchedule(**row["schedule"])
categories.append(category)
if self.certify(categories):
with self.db.session() as session:
session.insert(categories)
case Operation.ExportCategoryGroups:
with self.database.session as session:
self.dump(
params[0],
params[1],
self.database.select(CategoryGroup, session),
)
case Operation.ImportCategoryGroups:
groups = [
CategoryGroup(**row) for row in self.load(params[0], params[1])
]
if self.certify(groups):
with self.db.session() as session:
session.insert(groups)

    def parse(self, filename: Path, args: dict):
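        """Thin wrapper around parse_data for a single statement file."""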
return parse_data(filename, args)

    def askcategory(self, transaction: Transaction):
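        """Prompt the user until an existing category name is entered."""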
selector = CategorySelector(Selector_T.manual)
with self.db.session() as session:
categories = session.get(Category)
while True:
category = input(f"{transaction}: ")
if category in [c.name for c in categories]:
return TransactionCategory(category, selector)

    @staticmethod
def dump(fn, format, sequence):
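        """Serialize a sequence of ORM objects to fn as pickle, CSV or JSON."""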
if format == "pickle":
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
elif format == "csv":
with open(fn, "w", newline="") as f:
csv.writer(f).writerows([e.format.values() for e in sequence])
elif format == "json":
with open(fn, "w", newline="") as f:
json.dump([e.format for e in sequence], f, indent=4, default=str)
else:
            print(f"unsupported format: {format}")

    @staticmethod
def load(fn, format):
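        """Load previously exported objects from fn; only pickle is supported."""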
if format == "pickle":
with open(fn, "rb") as f:
return pickle.load(f)
elif format == "csv":
raise Exception("CSV import not supported")
else:
            print(f"unsupported format: {format}")
return []

    @staticmethod
def certify(imports: list) -> bool:
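        """Show a sample of the objects to import and ask for confirmation."""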
if input(f"{imports[:10]}\nDoes the import seem correct? (y/n)") == "y":
return True
return False

    @property
def db(self) -> DbClient:
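        """DbClient interface to the database; a new instance per access."""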
return DbClient(self._db, self._verbosity > 2)

    @property
def database(self) -> Client:
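        """Client interface to the database, created lazily and reused."""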
if not self._database:
self._database = Client(self._db, echo=self._verbosity > 2)
return self._database

    @db.setter
def db(self, url: str):
self._db = url

    @staticmethod
def nordigen_client() -> NordigenClient:
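        """Build a NordigenClient using the default credentials manager."""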
return NordigenClient(NordigenCredentialsManager.default)