From e27f2f08cf55d72d8953e9816256c16f501c7d6c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lu=C3=ADs=20Murta?= Date: Mon, 19 Dec 2022 21:59:41 +0000 Subject: [PATCH] Adds rule-based tagging of transactions Tags will work as additional categories to filter/organize by. It makes sense they can also be rule-based. Since rules are common to both categories and tags, reorganize the classes in the model. It doesn't affect the DB. --- pfbudget/cli/runnable.py | 1 - pfbudget/core/categorizer.py | 81 +++++++++++++++++++++++---------- pfbudget/core/manager.py | 10 ++--- pfbudget/db/client.py | 4 ++ pfbudget/db/model.py | 86 ++++++++++++++++++++++-------------- 5 files changed, 117 insertions(+), 65 deletions(-) diff --git a/pfbudget/cli/runnable.py b/pfbudget/cli/runnable.py index cad96cc..18334d4 100644 --- a/pfbudget/cli/runnable.py +++ b/pfbudget/cli/runnable.py @@ -383,7 +383,6 @@ def category_rule(parser: argparse.ArgumentParser, universal: argparse.ArgumentP def tags(parser: argparse.ArgumentParser, universal: argparse.ArgumentParser): - commands = parser.add_subparsers(required=True) add = commands.add_parser("add", parents=[universal]) diff --git a/pfbudget/core/categorizer.py b/pfbudget/core/categorizer.py index 4348986..fb7405a 100644 --- a/pfbudget/core/categorizer.py +++ b/pfbudget/core/categorizer.py @@ -2,12 +2,13 @@ from pfbudget.db.model import ( Category, CategorySelector, Selector, + Tag, Transaction, TransactionCategory, + TransactionTag, ) from datetime import timedelta -import re class Categorizer: @@ -16,7 +17,12 @@ class Categorizer: def __init__(self): self.options["null_days"] = 4 - def categorize(self, transactions: list[Transaction], categories: list[Category]): + def categorize( + self, + transactions: list[Transaction], + categories: list[Category], + tags: list[Tag], + ): """Overarching categorization tool Receives a list of transactions (by ref) and updates their category @@ -26,7 +32,8 @@ class Categorizer: """ self._nullify(transactions) - 
self._rules(transactions, categories) + self._rule_based_categories(transactions, categories) + self._rule_based_tags(transactions, tags) def _nullify(self, transactions: list[Transaction]): count = 0 @@ -58,31 +65,57 @@ class Categorizer: print(f"Nullified {count} transactions") - def _rules(self, transactions: list[Transaction], categories: list[Category]): + def _rule_based_categories( + self, transactions: list[Transaction], categories: list[Category] + ): + d = {} for category in [c for c in categories if c.rules]: for rule in category.rules: - for transaction in [t for t in transactions if not t.category]: - if rule.date: - if rule.date < transaction.date: - continue - if rule.description and transaction.description: - if rule.description not in transaction.description: - continue - if rule.regex and transaction.description: - p = re.compile(rule.regex, re.IGNORECASE) - if not p.search(transaction.description): - continue - if rule.bank: - if rule.bank != transaction.bank: - continue - if rule.min: - if rule.min > transaction.amount: - continue - if rule.max: - if rule.max < transaction.amount: - continue + # for transaction in [t for t in transactions if not t.category]: + for transaction in [ + t + for t in transactions + if not t.category or t.category.name != "null" + ]: + if not rule.matches(transaction): + continue # passed all conditions, assign category transaction.category = TransactionCategory( category.name, CategorySelector(Selector.rules) ) + + if rule in d: + d[rule] += 1 + else: + d[rule] = 1 + + for k, v in d.items(): + print(f"{v}: {k}") + + def _rule_based_tags(self, transactions: list[Transaction], tags: list[Tag]): + d = {} + for tag in [t for t in tags if t.rules]: + for rule in tag.rules: + # for transaction in [t for t in transactions if not t.category]: + for transaction in [ + t + for t in transactions + if tag.name not in [tag.tag for tag in t.tags] + ]: + if not rule.matches(transaction): + continue + + if not transaction.tags: + 
transaction.tags = {TransactionTag(tag.name)} + else: + transaction.tags.add(TransactionTag(tag.name)) + + if rule in d: + d[rule] += 1 + else: + d[rule] = 1 + + for k, v in d.items(): + print(f"{v}: {k}") + diff --git a/pfbudget/core/manager.py b/pfbudget/core/manager.py index bf6d4bf..d8bc431 100644 --- a/pfbudget/core/manager.py +++ b/pfbudget/core/manager.py @@ -34,11 +34,13 @@ class Manager: case Operation.Download: # TODO this is a monstrosity, remove when possible download(self, self.args) + case Operation.Categorize: with self.db.session() as session: uncategorized = session.uncategorized() categories = session.categories() - Categorizer().categorize(uncategorized, categories) + tags = session.tags() + Categorizer().categorize(uncategorized, categories, tags) case Operation.Register: # self._db = DbClient(args["database"]) @@ -56,7 +58,7 @@ class Manager: self.args["name"], self.args["country"] ) - case Operation.CategoryAdd | Operation.TagAdd: + case Operation.CategoryAdd | Operation.RuleAdd | Operation.TagAdd | Operation.TagRuleAdd: with self.db.session() as session: session.add(params) @@ -72,10 +74,6 @@ class Manager: with self.db.session() as session: session.updateschedules(params) - case Operation.RuleAdd | Operation.TagRuleAdd: - with self.db.session() as session: - session.add(params) - case Operation.RuleRemove: assert all(isinstance(param, int) for param in params) with self.db.session() as session: diff --git a/pfbudget/db/client.py b/pfbudget/db/client.py index 68803e3..c29cf56 100644 --- a/pfbudget/db/client.py +++ b/pfbudget/db/client.py @@ -130,5 +130,9 @@ class DbClient: stmt = select(Category) return self.__session.scalars(stmt).all() + def tags(self) -> list[Tag]: + stmt = select(Tag) + return self.__session.scalars(stmt).all() + def session(self) -> ClientSession: return self.ClientSession(self.engine) diff --git a/pfbudget/db/model.py b/pfbudget/db/model.py index 0c31380..7ef354d 100644 --- a/pfbudget/db/model.py +++ 
b/pfbudget/db/model.py @@ -21,6 +21,7 @@ from decimal import Decimal from typing import Annotated, Optional import datetime as dt import enum +import re class Base(MappedAsDataclass, DeclarativeBase): @@ -68,29 +69,23 @@ class Bank(Base): bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))] -idpk = Annotated[ - int, mapped_column(BigInteger, primary_key=True, autoincrement=True, init=False) -] +idpk = Annotated[int, mapped_column(BigInteger, primary_key=True, autoincrement=True)] money = Annotated[Decimal, mapped_column(Numeric(16, 2))] class Transaction(Base): __tablename__ = "originals" - id: Mapped[idpk] + id: Mapped[idpk] = mapped_column(init=False) date: Mapped[dt.date] description: Mapped[Optional[str]] bank: Mapped[bankfk] amount: Mapped[money] category: Mapped[Optional[TransactionCategory]] = relationship() - note: Mapped[Optional[Note]] = relationship(back_populates="original", default=None) - tags: Mapped[Optional[set[TransactionTag]]] = relationship( - back_populates="original", - cascade="all, delete-orphan", - passive_deletes=True, - default=None, ) + note: Mapped[Optional[Note]] = relationship(back_populates="original") + tags: Mapped[Optional[set[TransactionTag]]] = relationship() def __repr__(self) -> str: return f"Transaction(date={self.date}, description={self.description}, bank={self.bank}, amount={self.amount}, category={self.category})" @@ -136,9 +131,9 @@ class TransactionCategory(Base): __tablename__ = "categorized" id: Mapped[idfk] = mapped_column(primary_key=True, init=False) - name: Mapped[str] = mapped_column(ForeignKey(Category.name)) + name: Mapped[catfk] - selector: Mapped[CategorySelector] = relationship() + selector: Mapped[CategorySelector] = relationship(cascade="all, delete-orphan") def __repr__(self) -> str: return f"Category({self.name})" @@ -147,7 +142,7 @@ class TransactionCategory(Base): class Note(Base): __tablename__ = "notes" - id: Mapped[idfk] = mapped_column(primary_key=True) + id: Mapped[idfk] = 
mapped_column(primary_key=True, init=False) note: Mapped[str] original: Mapped[Transaction] = relationship(back_populates="note") @@ -180,24 +175,9 @@ class Tag(Base): class TransactionTag(Base): __tablename__ = "tags" - id: Mapped[idfk] = mapped_column(primary_key=True) + id: Mapped[idfk] = mapped_column(primary_key=True, init=False) tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True) - original: Mapped[Transaction] = relationship(back_populates="tags") - - -class CategoryRule(Base): - __tablename__ = "categories_rules" - - id: Mapped[idpk] - name: Mapped[catfk] - date: Mapped[Optional[dt.date]] - description: Mapped[Optional[str]] - regex: Mapped[Optional[str]] - bank: Mapped[Optional[str]] - min: Mapped[Optional[money]] - max: Mapped[Optional[money]] - def __hash__(self): return hash(self.id) @@ -253,14 +233,52 @@ class CategorySchedule(Base): return f"{self.name} schedule=Schedule(period={self.period}, multiplier={self.period_multiplier}, amount={self.amount})" -class TagRule(Base): - __tablename__ = "tag_rules" - - id: Mapped[idpk] - tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE")) +class Rule: date: Mapped[Optional[dt.date]] description: Mapped[Optional[str]] regex: Mapped[Optional[str]] bank: Mapped[Optional[str]] min: Mapped[Optional[money]] max: Mapped[Optional[money]] + + def matches(self, transaction: Transaction) -> bool: + if ( + (self.date and self.date < transaction.date) + or ( + self.description + and transaction.description + and self.description not in transaction.description + ) + or ( + self.regex + and transaction.description + and not re.compile(self.regex, re.IGNORECASE).search( + transaction.description + ) + ) + or (self.bank and self.bank != transaction.bank) + or (self.min and self.min > transaction.amount) + or (self.max and self.max < transaction.amount) + ): + return False + return True + + +class CategoryRule(Base, Rule): + __tablename__ = "categories_rules" + + id: Mapped[idpk] = 
mapped_column(init=False) + name: Mapped[catfk] + + def __hash__(self): + return hash(self.id) + + +class TagRule(Base, Rule): + __tablename__ = "tag_rules" + + id: Mapped[idpk] = mapped_column(init=False) + tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE")) + + def __hash__(self): + return hash(self.id)