Adds regex rule and remove rule option

Categorization rules can now search using a regex pattern.
This commit is contained in:
Luís Murta 2022-12-10 17:51:23 +00:00
parent d321481e29
commit 72a8995fe6
Signed by: satprog
GPG Key ID: 169EF1BBD7049F94
8 changed files with 90 additions and 25 deletions

View File

@ -0,0 +1,32 @@
"""Regex rule
Revision ID: 0ce89e987770
Revises: 7adf89ec8d14
Create Date: 2022-12-10 14:00:49.418494+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "0ce89e987770"
down_revision = "7adf89ec8d14"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_rules",
sa.Column("regex", sa.String(), nullable=True),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("categories_rules", "regex", schema="transactions")
# ### end Alembic commands ###

View File

@ -43,7 +43,7 @@ if __name__ == "__main__":
for cat in args["category"]
]
case pfbudget.Operation.CategoryRule:
case pfbudget.Operation.RuleAdd:
assert args.keys() >= {
"category",
"date",
@ -58,6 +58,7 @@ if __name__ == "__main__":
cat,
args["date"][0] if args["date"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
@ -65,6 +66,10 @@ if __name__ == "__main__":
for cat in args["category"]
]
case pfbudget.Operation.RuleRemove:
assert args.keys() >= {"id"}, "argparser ill defined"
params = args["id"]
case pfbudget.Operation.GroupAdd:
assert "group" in args, "argparser ill defined"
params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]

View File

@ -341,13 +341,7 @@ def category(parser: argparse.ArgumentParser, universal: argparse.ArgumentParser
schedule.add_argument("--frequency", nargs=1, default=[1], type=int)
rule = commands.add_parser("rule", parents=[universal])
rule.set_defaults(op=Operation.CategoryRule)
rule.add_argument("category", nargs="+", type=str)
rule.add_argument("--date", nargs=1, type=dt.date.fromisoformat)
rule.add_argument("--description", nargs=1, type=str)
rule.add_argument("--bank", nargs=1, type=str)
rule.add_argument("--min", nargs=1, type=float)
rule.add_argument("--max", nargs=1, type=float)
category_rule(rule, universal)
group = commands.add_parser("group", parents=[universal])
category_group(group, universal)
@ -365,6 +359,24 @@ def category_group(parser: argparse.ArgumentParser, universal: argparse.Argument
remove.add_argument("group", nargs="+", type=str)
def category_rule(parser: argparse.ArgumentParser, universal: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add", parents=[universal])
add.set_defaults(op=Operation.RuleAdd)
add.add_argument("category", nargs="+", type=str)
add.add_argument("--date", nargs=1, type=dt.date.fromisoformat)
add.add_argument("--description", nargs=1, type=str)
add.add_argument("--regex", nargs=1, type=str)
add.add_argument("--bank", nargs=1, type=str)
add.add_argument("--min", nargs=1, type=float)
add.add_argument("--max", nargs=1, type=float)
remove = commands.add_parser("remove", parents=[universal])
remove.set_defaults(op=Operation.RuleRemove)
remove.add_argument("id", nargs="+", type=int)
def run():
args = vars(argparser().parse_args())
assert "op" in args, "No operation selected"

View File

@ -17,7 +17,8 @@ class Operation(Enum):
CategoryUpdate = auto()
CategoryRemove = auto()
CategorySchedule = auto()
CategoryRule = auto()
RuleAdd = auto()
RuleRemove = auto()
GroupAdd = auto()
GroupRemove = auto()

View File

@ -7,6 +7,7 @@ from pfbudget.db.model import (
)
from datetime import timedelta
import re
class Categorizer:
@ -64,9 +65,13 @@ class Categorizer:
if rule.date:
if rule.date < transaction.date:
continue
if rule.description:
if rule.description and transaction.description:
if rule.description not in transaction.description:
continue
if rule.regex and transaction.description:
p = re.compile(rule.regex, re.IGNORECASE)
if not p.search(transaction.description):
continue
if rule.bank:
if rule.bank != transaction.bank:
continue

View File

@ -65,20 +65,23 @@ class Manager:
with self.db.session() as session:
session.updateschedules(params)
case Operation.CategoryRule:
case Operation.RuleAdd:
with self.db.session() as session:
session.addrules(params)
case Operation.RuleRemove:
assert all(isinstance(param, int) for param in params)
with self.db.session() as session:
session.removerules(params)
case Operation.GroupAdd:
with self.db.session() as session:
for group in self.args["group"]:
session.addcategorygroup(CategoryGroup(name=group))
session.addgroups(CategoryGroup(params))
case Operation.GroupRemove:
assert all(isinstance(param, CategoryGroup) for param in params)
with self.db.session() as session:
session.removecategorygroup(
[CategoryGroup(name=group) for group in self.args["group"]]
)
session.removegroups(params)
# def init(self):
# client = DatabaseClient(self.__db)

View File

@ -116,10 +116,16 @@ class DbClient:
def addrules(self, rules: list[CategoryRule]):
self.__session.add_all(rules)
def addcategorygroup(self, group: CategoryGroup):
self.__session.add(group)
def removerules(self, ids: list[int]):
stmt = delete(CategoryRule).where(
CategoryRule.id.in_(ids)
)
self.__session.execute(stmt)
def removecategorygroup(self, groups: list[CategoryGroup]):
def addgroups(self, groups: list[CategoryGroup]):
self.__session.add_all(groups)
def removegroups(self, groups: list[CategoryGroup]):
stmt = delete(CategoryGroup).where(
CategoryGroup.name.in_([grp.name for grp in groups])
)

View File

@ -178,12 +178,13 @@ class CategoryRule(Base):
__tablename__ = "categories_rules"
id: Mapped[idpk] = mapped_column(autoincrement=True, init=False)
name: Mapped[catfk] = mapped_column()
date: Mapped[Optional[dt.date]] = mapped_column()
description: Mapped[Optional[str]] = mapped_column()
bank: Mapped[Optional[str]] = mapped_column()
min_amount: Mapped[Optional[float]] = mapped_column()
max_amount: Mapped[Optional[float]] = mapped_column()
name: Mapped[catfk]
date: Mapped[Optional[dt.date]]
description: Mapped[Optional[str]]
regex: Mapped[Optional[str]]
bank: Mapped[Optional[str]]
min_amount: Mapped[Optional[float]]
max_amount: Mapped[Optional[float]]
def __hash__(self):
return hash(self.id)