Swap almost all remaining calls to the old postgresql only DB class with the new DB client. Warning! Some operations are currently not implement, such as setting category schedules and dismantling links. `update` and `delete` methods added to DB `Client`.
418 lines
15 KiB
Python
418 lines
15 KiB
Python
import csv
|
|
import json
|
|
from pathlib import Path
|
|
import pickle
|
|
from typing import Optional
|
|
import webbrowser
|
|
|
|
from pfbudget.common.types import Operation
|
|
from pfbudget.db.client import Client
|
|
from pfbudget.db.model import (
|
|
Bank,
|
|
BankTransaction,
|
|
Category,
|
|
CategoryGroup,
|
|
CategoryRule,
|
|
CategorySchedule,
|
|
CategorySelector,
|
|
Link,
|
|
MoneyTransaction,
|
|
Nordigen,
|
|
Rule,
|
|
Selector_T,
|
|
SplitTransaction,
|
|
Tag,
|
|
TagRule,
|
|
Transaction,
|
|
TransactionCategory,
|
|
)
|
|
from pfbudget.extract.nordigen import NordigenClient, NordigenCredentialsManager
|
|
from pfbudget.extract.parsers import parse_data
|
|
from pfbudget.extract.psd2 import PSD2Extractor
|
|
from pfbudget.transform.categorizer import Categorizer
|
|
from pfbudget.transform.nullifier import Nullifier
|
|
from pfbudget.transform.tagger import Tagger
|
|
|
|
|
|
class Manager:
|
|
def __init__(self, db: str, verbosity: int = 0):
|
|
self._db = db
|
|
self._database: Optional[Client] = None
|
|
self._verbosity = verbosity
|
|
|
|
def action(self, op: Operation, params=None):
|
|
if self._verbosity > 0:
|
|
print(f"op={op}, params={params}")
|
|
|
|
if params is None:
|
|
params = []
|
|
|
|
match (op):
|
|
case Operation.Init:
|
|
pass
|
|
|
|
case Operation.Transactions:
|
|
return [t.format for t in self.database.select(Transaction)]
|
|
|
|
case Operation.Parse:
|
|
# Adapter for the parse_data method. Can be refactored.
|
|
args = {"bank": params[1], "creditcard": params[2], "category": None}
|
|
transactions = []
|
|
for path in [Path(p) for p in params[0]]:
|
|
if path.is_dir():
|
|
for file in path.iterdir():
|
|
transactions.extend(self.parse(file, args))
|
|
elif path.is_file():
|
|
transactions.extend(self.parse(path, args))
|
|
else:
|
|
raise FileNotFoundError(path)
|
|
|
|
if (
|
|
len(transactions) > 0
|
|
and input(f"{transactions[:5]}\nCommit? (y/n)") == "y"
|
|
):
|
|
self.database.insert(sorted(transactions))
|
|
|
|
case Operation.Download:
|
|
if params[3]:
|
|
values = params[3]
|
|
banks = self.database.select(Bank, lambda: Bank.name.in_(values))
|
|
else:
|
|
banks = self.database.select(Bank, Bank.nordigen)
|
|
|
|
client = Manager.nordigen_client()
|
|
extractor = PSD2Extractor(client)
|
|
transactions = []
|
|
for bank in banks:
|
|
transactions.extend(extractor.extract(bank, params[0], params[1]))
|
|
|
|
# dry-run
|
|
if not params[2]:
|
|
self.database.insert(sorted(transactions))
|
|
else:
|
|
print(sorted(transactions))
|
|
|
|
case Operation.Categorize:
|
|
with self.database.session as session:
|
|
uncategorized = session.select(
|
|
BankTransaction, lambda: ~BankTransaction.category.has()
|
|
)
|
|
categories = session.select(Category)
|
|
tags = session.select(Tag)
|
|
|
|
rules = [cat.rules for cat in categories if cat.name == "null"]
|
|
Nullifier(rules).transform_inplace(uncategorized)
|
|
|
|
rules = [rule for cat in categories for rule in cat.rules]
|
|
Categorizer(rules).transform_inplace(uncategorized)
|
|
|
|
rules = [rule for tag in tags for rule in tag.rules]
|
|
Tagger(rules).transform_inplace(uncategorized)
|
|
|
|
case Operation.BankMod:
|
|
self.database.update(Bank, params)
|
|
|
|
case Operation.PSD2Mod:
|
|
self.database.update(Nordigen, params)
|
|
|
|
case Operation.BankDel:
|
|
self.database.delete(Bank, Bank.name, params)
|
|
|
|
case Operation.PSD2Del:
|
|
self.database.delete(Nordigen, Nordigen.name, params)
|
|
|
|
case Operation.Token:
|
|
Manager.nordigen_client().generate_token()
|
|
|
|
case Operation.RequisitionId:
|
|
link, _ = Manager.nordigen_client().requisition(params[0], params[1])
|
|
print(f"Opening {link} to request access to {params[0]}")
|
|
webbrowser.open(link)
|
|
|
|
case Operation.PSD2CountryBanks:
|
|
banks = Manager.nordigen_client().country_banks(params[0])
|
|
print(banks)
|
|
|
|
case (
|
|
Operation.BankAdd
|
|
| Operation.CategoryAdd
|
|
| Operation.GroupAdd
|
|
| Operation.PSD2Add
|
|
| Operation.RuleAdd
|
|
| Operation.TagAdd
|
|
| Operation.TagRuleAdd
|
|
):
|
|
self.database.insert(params)
|
|
|
|
case Operation.CategoryUpdate:
|
|
self.database.update(Category, params)
|
|
|
|
case Operation.CategoryRemove:
|
|
self.database.delete(Category, Category.name, params)
|
|
|
|
case Operation.CategorySchedule:
|
|
raise NotImplementedError
|
|
|
|
case Operation.RuleRemove:
|
|
self.database.delete(CategoryRule, CategoryRule.id, params)
|
|
|
|
case Operation.TagRemove:
|
|
self.database.delete(Tag, Tag.name, params)
|
|
|
|
case Operation.TagRuleRemove:
|
|
self.database.delete(TagRule, TagRule.id, params)
|
|
|
|
case Operation.RuleModify | Operation.TagRuleModify:
|
|
self.database.update(Rule, params)
|
|
|
|
case Operation.GroupRemove:
|
|
self.database.delete(CategoryGroup, CategoryGroup.name, params)
|
|
|
|
case Operation.Forge:
|
|
if not (
|
|
isinstance(params[0], int)
|
|
and all(isinstance(p, int) for p in params[1])
|
|
):
|
|
raise TypeError("f{params} are not transaction ids")
|
|
|
|
with self.database.session as session:
|
|
id = params[0]
|
|
original = session.select(
|
|
Transaction, lambda: Transaction.id == id
|
|
)[0]
|
|
|
|
ids = params[1]
|
|
links = session.select(Transaction, lambda: Transaction.id.in_(ids))
|
|
|
|
if not original.category:
|
|
original.category = self.askcategory(original)
|
|
|
|
for link in links:
|
|
if (
|
|
not link.category
|
|
or link.category.name != original.category.name
|
|
):
|
|
print(
|
|
f"{link} category will change to"
|
|
f" {original.category.name}"
|
|
)
|
|
link.category = original.category
|
|
|
|
tobelinked = [Link(original.id, link.id) for link in links]
|
|
session.insert(tobelinked)
|
|
|
|
case Operation.Dismantle:
|
|
raise NotImplementedError
|
|
|
|
case Operation.Split:
|
|
if len(params) < 1 and not all(
|
|
isinstance(p, Transaction) for p in params
|
|
):
|
|
raise TypeError(f"{params} are not transactions")
|
|
|
|
# t -> t1, t2, t3; t.value == Σti.value
|
|
original: Transaction = params[0]
|
|
if not original.amount == sum(t.amount for t in params[1:]):
|
|
raise ValueError(
|
|
f"{original.amount}€ != {sum(v for v, _ in params[1:])}€"
|
|
)
|
|
|
|
with self.database.session as session:
|
|
originals = session.select(
|
|
Transaction, lambda: Transaction.id == original.id
|
|
)
|
|
assert len(originals) == 1, ">1 transactions matched {original.id}!"
|
|
|
|
originals[0].split = True
|
|
transactions = []
|
|
for t in params[1:]:
|
|
if originals[0].date != t.date:
|
|
t.date = originals[0].date
|
|
print(
|
|
f"{t.date} is different from original date"
|
|
f" {originals[0].date}, using original"
|
|
)
|
|
|
|
splitted = SplitTransaction(
|
|
t.date, t.description, t.amount, originals[0].id
|
|
)
|
|
splitted.category = t.category
|
|
transactions.append(splitted)
|
|
|
|
session.insert(transactions)
|
|
|
|
case Operation.Export:
|
|
with self.database.session as session:
|
|
self.dump(
|
|
params[0], params[1], self.database.select(Transaction, session)
|
|
)
|
|
|
|
case Operation.Import:
|
|
transactions = []
|
|
for row in self.load(params[0], params[1]):
|
|
match row["type"]:
|
|
case "bank":
|
|
transaction = BankTransaction(
|
|
row["date"],
|
|
row["description"],
|
|
row["amount"],
|
|
row["bank"],
|
|
)
|
|
|
|
case "money":
|
|
transaction = MoneyTransaction(
|
|
row["date"], row["description"], row["amount"]
|
|
)
|
|
|
|
# TODO case "split" how to match to original transaction?? also
|
|
# save ids?
|
|
case _:
|
|
continue
|
|
|
|
if category := row.pop("category", None):
|
|
transaction.category = TransactionCategory(
|
|
category["name"],
|
|
CategorySelector(category["selector"]["selector"]),
|
|
)
|
|
|
|
transactions.append(transaction)
|
|
|
|
if self.certify(transactions):
|
|
self.database.insert(transactions)
|
|
|
|
case Operation.ExportBanks:
|
|
with self.database.session as session:
|
|
self.dump(params[0], params[1], self.database.select(Bank, session))
|
|
|
|
case Operation.ImportBanks:
|
|
banks = []
|
|
for row in self.load(params[0], params[1]):
|
|
bank = Bank(row["name"], row["BIC"], row["type"])
|
|
if row["nordigen"]:
|
|
bank.nordigen = Nordigen(**row["nordigen"])
|
|
banks.append(bank)
|
|
|
|
if self.certify(banks):
|
|
self.database.insert(banks)
|
|
|
|
case Operation.ExportCategoryRules:
|
|
with self.database.session as session:
|
|
self.dump(
|
|
params[0],
|
|
params[1],
|
|
self.database.select(CategoryRule, session),
|
|
)
|
|
|
|
case Operation.ImportCategoryRules:
|
|
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
|
|
|
|
if self.certify(rules):
|
|
self.database.insert(rules)
|
|
|
|
case Operation.ExportTagRules:
|
|
with self.database.session as session:
|
|
self.dump(
|
|
params[0], params[1], self.database.select(TagRule, session)
|
|
)
|
|
|
|
case Operation.ImportTagRules:
|
|
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
|
|
|
|
if self.certify(rules):
|
|
self.database.insert(rules)
|
|
|
|
case Operation.ExportCategories:
|
|
with self.database.session as session:
|
|
self.dump(
|
|
params[0], params[1], self.database.select(Category, session)
|
|
)
|
|
|
|
case Operation.ImportCategories:
|
|
# rules = [Category(**row) for row in self.load(params[0])]
|
|
categories = []
|
|
for row in self.load(params[0], params[1]):
|
|
category = Category(row["name"], row["group"])
|
|
if len(row["rules"]) > 0:
|
|
# Only category rules could have been created with a rule
|
|
rules = row["rules"]
|
|
for rule in rules:
|
|
del rule["type"]
|
|
|
|
category.rules = set(CategoryRule(**rule) for rule in rules)
|
|
if row["schedule"]:
|
|
category.schedule = CategorySchedule(**row["schedule"])
|
|
categories.append(category)
|
|
|
|
if self.certify(categories):
|
|
self.database.insert(categories)
|
|
|
|
case Operation.ExportCategoryGroups:
|
|
with self.database.session as session:
|
|
self.dump(
|
|
params[0],
|
|
params[1],
|
|
self.database.select(CategoryGroup, session),
|
|
)
|
|
|
|
case Operation.ImportCategoryGroups:
|
|
groups = [
|
|
CategoryGroup(**row) for row in self.load(params[0], params[1])
|
|
]
|
|
|
|
if self.certify(groups):
|
|
self.database.insert(groups)
|
|
|
|
def parse(self, filename: Path, args: dict):
|
|
return parse_data(filename, args)
|
|
|
|
def askcategory(self, transaction: Transaction):
|
|
selector = CategorySelector(Selector_T.manual)
|
|
|
|
categories = self.database.select(Category)
|
|
|
|
while True:
|
|
category = input(f"{transaction}: ")
|
|
if category in [c.name for c in categories]:
|
|
return TransactionCategory(category, selector)
|
|
|
|
@staticmethod
|
|
def dump(fn, format, sequence):
|
|
if format == "pickle":
|
|
with open(fn, "wb") as f:
|
|
pickle.dump([e.format for e in sequence], f)
|
|
elif format == "csv":
|
|
with open(fn, "w", newline="") as f:
|
|
csv.writer(f).writerows([e.format.values() for e in sequence])
|
|
elif format == "json":
|
|
with open(fn, "w", newline="") as f:
|
|
json.dump([e.format for e in sequence], f, indent=4, default=str)
|
|
else:
|
|
print("format not well specified")
|
|
|
|
@staticmethod
|
|
def load(fn, format):
|
|
if format == "pickle":
|
|
with open(fn, "rb") as f:
|
|
return pickle.load(f)
|
|
elif format == "csv":
|
|
raise Exception("CSV import not supported")
|
|
else:
|
|
print("format not well specified")
|
|
return []
|
|
|
|
@staticmethod
|
|
def certify(imports: list) -> bool:
|
|
if input(f"{imports[:10]}\nDoes the import seem correct? (y/n)") == "y":
|
|
return True
|
|
return False
|
|
|
|
@property
|
|
def database(self) -> Client:
|
|
if not self._database:
|
|
self._database = Client(self._db, echo=self._verbosity > 2)
|
|
return self._database
|
|
|
|
@staticmethod
|
|
def nordigen_client() -> NordigenClient:
|
|
return NordigenClient(NordigenCredentialsManager.default)
|