Compare commits

...

5 Commits

Author SHA1 Message Date
ed2dda63e9
Allows using rules for the nullying step 2023-02-23 23:24:01 +00:00
1a774e3769
Adds get all transactions operation 2023-02-23 23:23:19 +00:00
dd724b6c28
Export in .csv
Importing is not supported, since there's no way to represent a Null
field in .csv
2023-02-23 23:21:54 +00:00
6f68d971ee
Clear up forge/dismantle logic 2023-02-11 22:48:04 +00:00
f7df033d58
Add start date rule
Rename date to end.
2023-02-11 22:46:41 +00:00
8 changed files with 196 additions and 81 deletions

View File

@ -0,0 +1,32 @@
"""Start/End date rule
Revision ID: 952de57a3c43
Revises: 18572111d9ff
Create Date: 2023-02-06 21:57:57.545327+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "952de57a3c43"
down_revision = "18572111d9ff"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"rules", sa.Column("start", sa.Date(), nullable=True), schema="transactions"
)
op.alter_column("rules", column_name="date", new_column_name="end", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column("rules", column_name="end", new_column_name="date", schema="transactions")
op.drop_column("rules", "start", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,6 +1,3 @@
from decimal import Decimal
from typing import Sequence
from pfbudget.cli.argparser import argparser
from pfbudget.cli.interactive import Interactive
from pfbudget.common.types import Operation
@ -146,12 +143,13 @@ if __name__ == "__main__":
]
case Operation.RuleAdd:
keys = {"category", "date", "description", "bank", "min", "max"}
keys = {"category", "start", "end", "description", "regex", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.CategoryRule(
args["date"][0] if args["date"] else None,
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
@ -197,12 +195,13 @@ if __name__ == "__main__":
params = [type.Tag(tag) for tag in args["tag"]]
case Operation.TagRuleAdd:
keys = {"tag", "date", "description", "bank", "min", "max"}
keys = {"tag", "start", "end", "description", "regex", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.TagRule(
args["date"][0] if args["date"] else None,
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
@ -238,7 +237,7 @@ if __name__ == "__main__":
keys = {"original", "links"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Link(args["original"][0], link) for link in args["links"]]
params = [args["original"][0], args["links"]]
case (
Operation.Export
@ -254,9 +253,9 @@ if __name__ == "__main__":
| Operation.ExportCategoryGroups
| Operation.ImportCategoryGroups
):
keys = {"file"}
keys = {"file", "format"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = args["file"]
params = [args["file"][0], args["format"][0]]
Manager(db, verbosity).action(op, params)

View File

@ -63,7 +63,7 @@ def argparser() -> argparse.ArgumentParser:
# Exports transactions to .csv file
export = subparsers.add_parser("export")
export.set_defaults(op=Operation.Export)
export_args(export)
file_options(export)
pimport = subparsers.add_parser("import")
pimport.set_defaults(op=Operation.Import)
@ -215,11 +215,11 @@ def bank(parser: argparse.ArgumentParser):
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportBanks)
export_args(export)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportBanks)
export_args(pimport)
file_options(pimport)
def nordigen(parser: argparse.ArgumentParser):
@ -276,11 +276,11 @@ def category(parser: argparse.ArgumentParser):
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategories)
export_args(export)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategories)
export_args(pimport)
file_options(pimport)
def category_group(parser: argparse.ArgumentParser):
@ -296,11 +296,11 @@ def category_group(parser: argparse.ArgumentParser):
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryGroups)
export_args(export)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryGroups)
export_args(pimport)
file_options(pimport)
def category_rule(parser: argparse.ArgumentParser):
@ -324,11 +324,11 @@ def category_rule(parser: argparse.ArgumentParser):
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryRules)
export_args(export)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryRules)
export_args(pimport)
file_options(pimport)
def tags(parser: argparse.ArgumentParser):
@ -366,15 +366,16 @@ def tag_rule(parser: argparse.ArgumentParser):
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportTagRules)
export_args(export)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportTagRules)
export_args(pimport)
file_options(pimport)
def rules(parser: argparse.ArgumentParser):
parser.add_argument("--date", nargs=1, type=dt.date.fromisoformat)
parser.add_argument("--start", nargs=1, type=dt.date.fromisoformat)
parser.add_argument("--end", nargs=1, type=dt.date.fromisoformat)
parser.add_argument("--description", nargs=1, type=str)
parser.add_argument("--regex", nargs=1, type=str)
parser.add_argument("--bank", nargs=1, type=str)
@ -396,5 +397,6 @@ def link(parser: argparse.ArgumentParser):
dismantle.add_argument("links", nargs="+", type=int)
def export_args(parser: argparse.ArgumentParser):
def file_options(parser: argparse.ArgumentParser):
parser.add_argument("file", nargs=1, type=str)
parser.add_argument("format", nargs=1, default="pickle")

View File

@ -6,6 +6,7 @@ from enum import Enum, auto
class Operation(Enum):
Init = auto()
Transactions = auto()
Parse = auto()
Download = auto()
Categorize = auto()

View File

@ -9,7 +9,7 @@ class Categorizer:
options = {}
def __init__(self):
self.options["null_days"] = 4
self.options["null_days"] = 3
def rules(
self,
@ -28,14 +28,20 @@ class Categorizer:
tags (Sequence[Tag]): currently available tags
"""
self._nullify(transactions)
try:
null = next(cat for cat in categories if cat.name == "null")
print("Nullifying")
self._nullify(transactions, null)
categories = [cat for cat in categories if cat.name != "null"]
except StopIteration:
print("Null category not defined")
self._rule_based_categories(transactions, categories)
self._rule_based_tags(transactions, tags)
@Timer(name="nullify")
def _nullify(self, transactions: Sequence[t.BankTransaction]):
print(f"Nullifying {len(transactions)} transactions")
def _nullify(self, transactions: Sequence[t.BankTransaction], null: t.Category):
count = 0
matching = []
for transaction in transactions:
@ -46,11 +52,13 @@ class Categorizer:
transaction.date - timedelta(days=self.options["null_days"])
<= cancel.date
<= transaction.date + timedelta(days=self.options["null_days"])
and transaction not in matching
and cancel not in matching
and cancel != transaction
and cancel.bank != transaction.bank
and cancel.amount == -transaction.amount
and transaction not in matching
and cancel not in matching
and all(r.matches(transaction) for r in null.rules)
and all(r.matches(cancel) for r in null.rules)
)
):
transaction.category = t.TransactionCategory(
@ -65,7 +73,7 @@ class Categorizer:
count += 2
break
print(f"Nullified {count} transactions")
print(f"Nullified {count} of {len(transactions)} transactions")
@Timer(name="categoryrules")
def _rule_based_categories(
@ -87,12 +95,14 @@ class Categorizer:
continue
# passed all conditions, assign category
if transaction.category:
if transaction.category.name == category.name:
continue
if (
transaction.category
and transaction.category.name == category.name
):
if (
input(f"Overwrite {transaction} with {category}? (y/n)")
input(
f"Overwrite {transaction} with {category.name}? (y/n)"
)
== "y"
):
transaction.category.name = category.name

View File

@ -1,3 +1,4 @@
import csv
from pathlib import Path
import pickle
import webbrowser
@ -17,6 +18,7 @@ from pfbudget.db.model import (
MoneyTransaction,
Nordigen,
Rule,
Selector_T,
SplitTransaction,
Tag,
TagRule,
@ -32,14 +34,23 @@ class Manager:
self._db = db
self._verbosity = verbosity
def action(self, op: Operation, params: list):
def action(self, op: Operation, params=None):
if self._verbosity > 0:
print(f"op={op}, params={params}")
if params is None:
params = []
match (op):
case Operation.Init:
pass
case Operation.Transactions:
with self.db.session() as session:
transactions = session.get(Transaction)
ret = [t.format for t in transactions]
return ret
case Operation.Parse:
# Adapter for the parse_data method. Can be refactored.
args = {"bank": params[1], "creditcard": params[2], "category": None}
@ -168,8 +179,32 @@ class Manager:
session.remove_by_name(CategoryGroup, params)
case Operation.Forge:
if not (
isinstance(params[0], int)
and all(isinstance(p, int) for p in params[1])
):
raise TypeError("f{params} are not transaction ids")
with self.db.session() as session:
session.add(params)
original = session.get(Transaction, Transaction.id, params[0])[0]
links = session.get(Transaction, Transaction.id, params[1])
if not original.category:
original.category = self.askcategory(original)
for link in links:
if (
not link.category
or link.category.name != original.category.name
):
print(
f"{link} category will change to"
f" {original.category.name}"
)
link.category = original.category
tobelinked = [Link(original.id, link.id) for link in links]
session.add(tobelinked)
case Operation.Dismantle:
assert all(isinstance(param, Link) for param in params)
@ -202,7 +237,8 @@ class Manager:
if originals[0].date != t.date:
t.date = originals[0].date
print(
f"{t.date} is different from original date {originals[0].date}, using original"
f"{t.date} is different from original date"
f" {originals[0].date}, using original"
)
splitted = SplitTransaction(
@ -215,11 +251,11 @@ class Manager:
case Operation.Export:
with self.db.session() as session:
self.dump(params[0], sorted(session.get(Transaction)))
self.dump(params[0], params[1], sorted(session.get(Transaction)))
case Operation.Import:
transactions = []
for row in self.load(params[0]):
for row in self.load(params[0], params[1]):
match row["type"]:
case "bank":
transaction = BankTransaction(
@ -252,11 +288,11 @@ class Manager:
case Operation.ExportBanks:
with self.db.session() as session:
self.dump(params[0], session.get(Bank))
self.dump(params[0], params[1], session.get(Bank))
case Operation.ImportBanks:
banks = []
for row in self.load(params[0]):
for row in self.load(params[0], params[1]):
bank = Bank(row["name"], row["BIC"], row["type"])
if row["nordigen"]:
bank.nordigen = Nordigen(**row["nordigen"])
@ -268,10 +304,10 @@ class Manager:
case Operation.ExportCategoryRules:
with self.db.session() as session:
self.dump(params[0], session.get(CategoryRule))
self.dump(params[0], params[1], session.get(CategoryRule))
case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0])]
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
@ -279,10 +315,10 @@ class Manager:
case Operation.ExportTagRules:
with self.db.session() as session:
self.dump(params[0], session.get(TagRule))
self.dump(params[0], params[1], session.get(TagRule))
case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0])]
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
@ -290,12 +326,12 @@ class Manager:
case Operation.ExportCategories:
with self.db.session() as session:
self.dump(params[0], session.get(Category))
self.dump(params[0], params[1], session.get(Category))
case Operation.ImportCategories:
# rules = [Category(**row) for row in self.load(params[0])]
categories = []
for row in self.load(params[0]):
for row in self.load(params[0], params[1]):
category = Category(row["name"], row["group"])
if len(row["rules"]) > 0:
# Only category rules could have been created with a rule
@ -314,10 +350,12 @@ class Manager:
case Operation.ExportCategoryGroups:
with self.db.session() as session:
self.dump(params[0], session.get(CategoryGroup))
self.dump(params[0], params[1], session.get(CategoryGroup))
case Operation.ImportCategoryGroups:
groups = [CategoryGroup(**row) for row in self.load(params[0])]
groups = [
CategoryGroup(**row) for row in self.load(params[0], params[1])
]
if self.certify(groups):
with self.db.session() as session:
@ -326,15 +364,38 @@ class Manager:
def parse(self, filename: Path, args: dict):
return parse_data(filename, args)
@staticmethod
def dump(fn, sequence):
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
def askcategory(self, transaction: Transaction):
selector = CategorySelector(Selector_T.manual)
with self.db.session() as session:
categories = session.get(Category)
while True:
category = input(f"{transaction}: ")
if category in [c.name for c in categories]:
return TransactionCategory(category, selector)
@staticmethod
def load(fn):
def dump(fn, format, sequence):
if format == "pickle":
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
elif format == "csv":
with open(fn, "w", newline="") as f:
csv.writer(f).writerows([e.format.values() for e in sequence])
else:
print("format not well specified")
@staticmethod
def load(fn, format):
if format == "pickle":
with open(fn, "rb") as f:
return pickle.load(f)
elif format == "csv":
raise Exception("CSV import not supported")
else:
print("format not well specified")
return []
@staticmethod
def certify(imports: list) -> bool:

View File

@ -51,7 +51,10 @@ class DbClient:
def get(self, type: Type[T], column=None, values=None) -> Sequence[T]:
if column is not None:
if values:
if isinstance(values, Sequence):
stmt = select(type).where(column.in_(values))
else:
stmt = select(type).where(column == values)
else:
stmt = select(type).where(column)
else:

View File

@ -335,7 +335,8 @@ class Rule(Base, Export):
__tablename__ = "rules"
id: Mapped[idpk] = mapped_column(init=False)
date: Mapped[Optional[dt.date]]
start: Mapped[Optional[dt.date]]
end: Mapped[Optional[dt.date]]
description: Mapped[Optional[str]]
regex: Mapped[Optional[str]]
bank: Mapped[Optional[str]]
@ -349,32 +350,34 @@ class Rule(Base, Export):
"polymorphic_on": "type",
}
def matches(self, transaction: BankTransaction) -> bool:
if (
(self.date and self.date < transaction.date)
or (
self.description
and transaction.description
and self.description not in transaction.description
def matches(self, t: BankTransaction) -> bool:
valid = None
if self.regex:
valid = re.compile(self.regex, re.IGNORECASE)
ops = (
Rule.exists(self.start, lambda r: r < t.date),
Rule.exists(self.end, lambda r: r > t.date),
Rule.exists(self.description, lambda r: r == t.description),
Rule.exists(
valid,
lambda r: r.search(t.description) if t.description else False,
),
Rule.exists(self.bank, lambda r: r == t.bank),
Rule.exists(self.min, lambda r: r < t.amount),
Rule.exists(self.max, lambda r: r > t.amount),
)
or (
self.regex
and transaction.description
and not re.compile(self.regex, re.IGNORECASE).search(
transaction.description
)
)
or (self.bank and self.bank != transaction.bank)
or (self.min and self.min > transaction.amount)
or (self.max and self.max < transaction.amount)
):
return False
if all(ops):
return True
return False
@property
def format(self) -> dict[str, Any]:
return dict(
date=self.date,
start=self.start,
end=self.end,
description=self.description,
regex=self.regex,
bank=self.bank,
@ -383,6 +386,10 @@ class Rule(Base, Export):
type=self.type,
)
@staticmethod
def exists(r, op) -> bool:
return op(r) if r is not None else True
class CategoryRule(Rule):
__table_args__ = {"schema": "category"}