Compare commits

..

1 Commits

Author SHA1 Message Date
8fe0ecc597
Introduces categorizer that works on ORM classes
Categorizer will work directly on ORM classes, which will cleanup the
code, since changes will automatically be persisted when change the
objects.

Adds wrapper session class inside the DbClient for the manager to use.
The manager will have to have some DB session knowledge, which adds some
unfortunate coupling.

Removes some unnecessary relations between tables that were added by
mistake.

category CLI option now uses the manager.
2022-12-04 16:09:54 +00:00
14 changed files with 118 additions and 748 deletions

View File

@ -1,54 +0,0 @@
"""Category selector
Revision ID: 6863dda76ea2
Revises: 83f4c9837f6e
Create Date: 2022-12-08 00:56:59.032641+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "6863dda76ea2"
down_revision = "83f4c9837f6e"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_selector",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column(
"selector",
sa.Enum(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector",
schema="transactions",
inherit_schema=True,
),
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name=op.f("fk_categories_selector_id_categorized"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_categories_selector")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("categories_selector", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,92 +0,0 @@
"""Transaction based rules
Revision ID: 8b5d5fbc8211
Revises: e77395969585
Create Date: 2022-12-08 21:05:41.378466+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "8b5d5fbc8211"
down_revision = "e77395969585"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.drop_constraint("pk_categories_rules", "categories_rules", schema="transactions")
op.execute(sa.schema.CreateSequence(sa.schema.Sequence("categories_rules_id_seq", schema="transactions")))
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_rules",
sa.Column(
"id",
sa.BigInteger(),
autoincrement=True,
nullable=False,
server_default=sa.text(
"nextval('transactions.categories_rules_id_seq'::regclass)"
),
),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("date", sa.String(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("description", sa.String(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("bank", sa.String(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("min_amount", sa.Float(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("max_amount", sa.Float(), nullable=True),
schema="transactions",
)
op.drop_column("categories_rules", "rule", schema="transactions")
# ### end Alembic commands ###
op.create_primary_key(
"pk_categories_rules",
"categories_rules",
["id"],
schema="transactions",
)
def downgrade() -> None:
op.drop_constraint("pk_categories_rules", "categories_rules", schema="transactions")
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_rules",
sa.Column("rule", sa.String(), autoincrement=False, nullable=False),
schema="transactions",
)
op.drop_column("categories_rules", "max_amount", schema="transactions")
op.drop_column("categories_rules", "min_amount", schema="transactions")
op.drop_column("categories_rules", "bank", schema="transactions")
op.drop_column("categories_rules", "description", schema="transactions")
op.drop_column("categories_rules", "date", schema="transactions")
op.drop_column("categories_rules", "id", schema="transactions")
# ### end Alembic commands ###
op.execute(sa.schema.DropSequence(sa.schema.Sequence("categories_rules_id_seq", schema="transactions")))
op.create_primary_key(
"pk_categories_rules",
"categories_rules",
["name", "rule"],
schema="transactions",
)

View File

@ -1,53 +0,0 @@
"""Category schedule
Revision ID: d18cbd50f7c6
Revises: 6863dda76ea2
Create Date: 2022-12-08 13:30:29.048811+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "d18cbd50f7c6"
down_revision = "6863dda76ea2"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_schedules",
sa.Column("name", sa.String(), nullable=False),
sa.Column("recurring", sa.Boolean(), nullable=False),
sa.Column(
"period",
sa.Enum(
"daily",
"monthly",
"yearly",
name="period",
schema="transactions",
inherit_schema=True,
),
nullable=True,
),
sa.Column("period_multiplier", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name=op.f("fk_categories_schedules_name_categories_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_schedules")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("categories_schedules", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,37 +0,0 @@
"""Weekly period
Revision ID: e77395969585
Revises: d18cbd50f7c6
Create Date: 2022-12-08 16:35:27.506504+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "e77395969585"
down_revision = "d18cbd50f7c6"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute("ALTER TYPE transactions.period ADD VALUE 'weekly' AFTER 'daily'")
def downgrade() -> None:
op.execute(
"""CREATE TYPE transactions.period_new
AS ENUM ('daily', 'monthly', 'yearly')
"""
)
op.execute("UPDATE transactions.categories_schedules SET period = DEFAULT WHERE period = 'weekly'")
op.execute(
"""ALTER TABLE transactions.categories_schedules
ALTER COLUMN period TYPE transactions.period_new
USING period::text::transactions.period_new
"""
)
op.execute("DROP TYPE transactions.period")
op.execute("ALTER TYPE transactions.period_new RENAME TO period")

4
main.py Normal file
View File

@ -0,0 +1,4 @@
from pfbudget import run
if __name__ == "__main__":
run()

View File

@ -1,11 +1,7 @@
__all__ = ["argparser", "Manager", "parse_data", "categorize_data"] __all__ = ["run", "parse_data", "categorize_data"]
__author__ = "Luís Murta" __author__ = "Luís Murta"
__version__ = "0.1" __version__ = "0.1"
from pfbudget.common.types import Operation
from pfbudget.core.categories import categorize_data from pfbudget.core.categories import categorize_data
from pfbudget.core.manager import Manager from pfbudget.cli.runnable import run
from pfbudget.cli.runnable import argparser
from pfbudget.input.parsers import parse_data from pfbudget.input.parsers import parse_data
import pfbudget.db.model as types

View File

@ -1,76 +1,4 @@
import pfbudget from pfbudget.cli.runnable import run
if __name__ == "__main__": if __name__ == "__main__":
argparser = pfbudget.argparser() run()
args = vars(argparser.parse_args())
assert "op" in args, "No pfbudget.Operation selected"
op: pfbudget.Operation = args["op"]
assert "database" in args, "No database selected"
db = args["database"]
params = None
match (op):
case pfbudget.Operation.CategoryAdd:
assert args.keys() >= {"category", "group"}, "argparser ill defined"
params = [
pfbudget.types.Category(cat, args["group"][0])
for cat in args["category"]
]
case pfbudget.Operation.CategoryUpdate:
assert args.keys() >= {"category", "group"}, "argparser ill defined"
params = [pfbudget.types.Category(cat) for cat in args["category"]]
params.append(args["group"][0])
case pfbudget.Operation.CategoryRemove:
assert "category" in args, "argparser ill defined"
params = [pfbudget.types.Category(cat) for cat in args["category"]]
case pfbudget.Operation.CategorySchedule:
assert args.keys() >= {
"category",
"period",
"frequency",
}, "argparser ill defined"
params = [
pfbudget.types.CategorySchedule(
cat, True, args["period"][0], args["frequency"][0]
)
for cat in args["category"]
]
case pfbudget.Operation.CategoryRule:
assert args.keys() >= {
"category",
"date",
"description",
"bank",
"min",
"max",
}, "argparser ill defined"
params = [
pfbudget.types.CategoryRule(
cat,
args["date"][0] if args["date"] else None,
args["description"][0] if args["description"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
)
for cat in args["category"]
]
case pfbudget.Operation.GroupAdd:
assert "group" in args, "argparser ill defined"
params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]
case pfbudget.Operation.GroupRemove:
assert "group" in args, "argparser ill defined"
params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]
pfbudget.Manager(db, args).action(op, params)

View File

@ -2,9 +2,8 @@ from pathlib import Path
import argparse import argparse
import re import re
from pfbudget.common.types import Operation
from pfbudget.core.categories import categorize_data from pfbudget.core.categories import categorize_data
from pfbudget.db.model import Period from pfbudget.core.manager import Manager
from pfbudget.input.json import JsonParser from pfbudget.input.json import JsonParser
from pfbudget.input.nordigen import NordigenInput from pfbudget.input.nordigen import NordigenInput
from pfbudget.db.sqlite import DatabaseClient from pfbudget.db.sqlite import DatabaseClient
@ -28,22 +27,19 @@ class DataFileMissing(Exception):
pass pass
def argparser() -> argparse.ArgumentParser: def argparser(manager: Manager) -> argparse.ArgumentParser:
universal = argparse.ArgumentParser(add_help=False) help = argparse.ArgumentParser(add_help=False)
universal.add_argument( help.add_argument(
"-db", "-db",
"--database", "--database",
nargs="?", nargs="?",
help="select current database", help="select current database",
default=DEFAULT_DB, default=DEFAULT_DB,
) )
universal.add_argument( help.add_argument(
"-q", "--quiet", action="store_true", help="reduces the amount of verbose" "-q", "--quiet", action="store_true", help="reduces the amount of verbose"
) )
universal.add_argument(
"-v", "--verbose", action="store_true", help="increases the amount of verbose"
)
period = argparse.ArgumentParser(add_help=False).add_mutually_exclusive_group() period = argparse.ArgumentParser(add_help=False).add_mutually_exclusive_group()
period.add_argument( period.add_argument(
@ -55,7 +51,7 @@ def argparser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser( parser = argparse.ArgumentParser(
description="does cool finance stuff", description="does cool finance stuff",
parents=[universal], parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
parser.add_argument( parser.add_argument(
@ -67,7 +63,7 @@ def argparser() -> argparse.ArgumentParser:
).group(1), ).group(1),
) )
subparsers = parser.add_subparsers(required=True) subparsers = parser.add_subparsers(dest="command", required=True)
""" """
Init Init
@ -75,10 +71,10 @@ def argparser() -> argparse.ArgumentParser:
p_init = subparsers.add_parser( p_init = subparsers.add_parser(
"init", "init",
description="Initializes the SQLite3 database", description="Initializes the SQLite3 database",
parents=[universal], parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_init.set_defaults(command=Operation.Init) p_init.set_defaults(func=lambda args: manager.init())
""" """
Exporting Exporting
@ -86,7 +82,7 @@ def argparser() -> argparse.ArgumentParser:
p_export = subparsers.add_parser( p_export = subparsers.add_parser(
"export", "export",
description="Exports the selected database to a .csv file", description="Exports the selected database to a .csv file",
parents=[universal], parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_export.set_defaults(func=lambda args: DatabaseClient(args.database).export()) p_export.set_defaults(func=lambda args: DatabaseClient(args.database).export())
@ -97,14 +93,14 @@ def argparser() -> argparse.ArgumentParser:
p_parse = subparsers.add_parser( p_parse = subparsers.add_parser(
"parse", "parse",
description="Parses and adds the requested transactions into the selected database", description="Parses and adds the requested transactions into the selected database",
parents=[universal], parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_parse.add_argument("path", nargs="+", type=str) p_parse.add_argument("path", nargs="+", type=str)
p_parse.add_argument("--bank", nargs=1, type=str) p_parse.add_argument("--bank", nargs=1, type=str)
p_parse.add_argument("--creditcard", nargs=1, type=str) p_parse.add_argument("--creditcard", nargs=1, type=str)
p_parse.add_argument("--category", nargs=1, type=int) p_parse.add_argument("--category", nargs=1, type=int)
p_parse.set_defaults(command=Operation.Parse) p_parse.set_defaults(func=lambda args: parse(manager, args))
""" """
Categorizing Categorizing
@ -112,10 +108,12 @@ def argparser() -> argparse.ArgumentParser:
p_categorize = subparsers.add_parser( p_categorize = subparsers.add_parser(
"categorize", "categorize",
description="Categorizes the transactions in the selected database", description="Categorizes the transactions in the selected database",
parents=[universal], parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_categorize.set_defaults(command=Operation.Categorize) p_categorize.set_defaults(
func=lambda args: manager.categorize(vars(args))
)
""" """
Graph Graph
@ -123,7 +121,7 @@ def argparser() -> argparse.ArgumentParser:
p_graph = subparsers.add_parser( p_graph = subparsers.add_parser(
"graph", "graph",
description="Graph of the transactions", description="Graph of the transactions",
parents=[universal, period], parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_graph.add_argument( p_graph.add_argument(
@ -143,7 +141,7 @@ def argparser() -> argparse.ArgumentParser:
p_report = subparsers.add_parser( p_report = subparsers.add_parser(
"report", "report",
description="Prints report of transaction groups", description="Prints report of transaction groups",
parents=[universal, period], parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_report.add_argument( p_report.add_argument(
@ -162,7 +160,7 @@ def argparser() -> argparse.ArgumentParser:
p_register = subparsers.add_parser( p_register = subparsers.add_parser(
"register", "register",
description="Register a bank", description="Register a bank",
parents=[universal], parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_register.add_argument("bank", type=str, nargs=1, help="bank option help") p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
@ -170,7 +168,7 @@ def argparser() -> argparse.ArgumentParser:
"--requisition", type=str, nargs=1, help="requisition option help" "--requisition", type=str, nargs=1, help="requisition option help"
) )
p_register.add_argument("--invert", action="store_true") p_register.add_argument("--invert", action="store_true")
p_register.set_defaults(command=Operation.Register) p_register.set_defaults(func=lambda args: manager.register(vars(args)))
""" """
Unregister bank Unregister bank
@ -178,11 +176,11 @@ def argparser() -> argparse.ArgumentParser:
p_register = subparsers.add_parser( p_register = subparsers.add_parser(
"unregister", "unregister",
description="Unregister a bank", description="Unregister a bank",
parents=[universal], parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_register.add_argument("bank", type=str, nargs=1, help="bank option help") p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
p_register.set_defaults(command=Operation.Unregister) p_register.set_defaults(func=lambda args: manager.unregister(vars(args)))
""" """
Nordigen API Nordigen API
@ -190,10 +188,10 @@ def argparser() -> argparse.ArgumentParser:
p_nordigen_access = subparsers.add_parser( p_nordigen_access = subparsers.add_parser(
"token", "token",
description="Get new access token", description="Get new access token",
parents=[universal], parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_nordigen_access.set_defaults(command=Operation.Token) p_nordigen_access.set_defaults(func=lambda args: NordigenInput(manager).token())
""" """
(Re)new bank requisition ID (Re)new bank requisition ID
@ -201,12 +199,16 @@ def argparser() -> argparse.ArgumentParser:
p_nordigen_access = subparsers.add_parser( p_nordigen_access = subparsers.add_parser(
"renew", "renew",
description="(Re)new the Bank requisition ID", description="(Re)new the Bank requisition ID",
parents=[universal], parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_nordigen_access.add_argument("name", nargs=1, type=str) p_nordigen_access.add_argument("name", nargs=1, type=str)
p_nordigen_access.add_argument("country", nargs=1, type=str) p_nordigen_access.add_argument("country", nargs=1, type=str)
p_nordigen_access.set_defaults(command=Operation.Renew) p_nordigen_access.set_defaults(
func=lambda args: NordigenInput(manager).requisition(
args.name[0], args.country[0]
)
)
""" """
Downloading through Nordigen API Downloading through Nordigen API
@ -214,50 +216,46 @@ def argparser() -> argparse.ArgumentParser:
p_nordigen_download = subparsers.add_parser( p_nordigen_download = subparsers.add_parser(
"download", "download",
description="Downloads transactions using Nordigen API", description="Downloads transactions using Nordigen API",
parents=[universal, period], parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) )
p_nordigen_download.add_argument("--id", nargs="+", type=str) p_nordigen_download.add_argument("--id", nargs="+", type=str)
p_nordigen_download.add_argument("--name", nargs="+", type=str) p_nordigen_download.add_argument("--name", nargs="+", type=str)
p_nordigen_download.add_argument("--all", action="store_true") p_nordigen_download.add_argument("--all", action="store_true")
p_nordigen_download.set_defaults(command=Operation.Download) p_nordigen_download.set_defaults(func=lambda args: download(manager, args))
# """ """
# List available banks on Nordigen API List available banks on Nordigen API
# """ """
# p_nordigen_list = subparsers.add_parser( p_nordigen_list = subparsers.add_parser(
# "list", "list",
# description="Lists banks in {country}", description="Lists banks in {country}",
# parents=[help], parents=[help],
# formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
# ) )
# p_nordigen_list.add_argument("country", nargs=1, type=str) p_nordigen_list.add_argument("country", nargs=1, type=str)
# p_nordigen_list.set_defaults(func=lambda args: nordigen_banks(manager, args)) p_nordigen_list.set_defaults(func=lambda args: nordigen_banks(manager, args))
# """ """
# Nordigen JSONs Nordigen JSONs
# """ """
# p_nordigen_json = subparsers.add_parser( p_nordigen_json = subparsers.add_parser(
# "json", "json",
# description="", description="",
# parents=[help], parents=[help],
# formatter_class=argparse.ArgumentDefaultsHelpFormatter, formatter_class=argparse.ArgumentDefaultsHelpFormatter,
# ) )
# p_nordigen_json.add_argument("json", nargs=1, type=str) p_nordigen_json.add_argument("json", nargs=1, type=str)
# p_nordigen_json.add_argument("bank", nargs=1, type=str) p_nordigen_json.add_argument("bank", nargs=1, type=str)
# p_nordigen_json.add_argument("--invert", action=argparse.BooleanOptionalAction) p_nordigen_json.add_argument("--invert", action=argparse.BooleanOptionalAction)
# p_nordigen_json.set_defaults( p_nordigen_json.set_defaults(
# func=lambda args: manager.parser(JsonParser(vars(args))) func=lambda args: manager.parser(JsonParser(vars(args)))
# ) )
# Categories
category_parser = subparsers.add_parser("category", parents=[universal])
category(category_parser, universal)
return parser return parser
def parse(manager, args): def parse(manager: Manager, args):
"""Parses the contents of the path in args to the selected database. """Parses the contents of the path in args to the selected database.
Args: Args:
@ -307,65 +305,17 @@ def report(args):
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end) pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
# def nordigen_banks(manager: Manager, args): def nordigen_banks(manager: Manager, args):
# input = NordigenInput(manager) input = NordigenInput(manager)
# input.list(vars(args)["country"][0]) input.list(vars(args)["country"][0])
def download(manager, args: dict): def download(manager: Manager, args):
start, end = pfbudget.utils.parse_args_period(args) start, end = pfbudget.utils.parse_args_period(args)
manager.parser(NordigenInput(manager, args, start, end)) manager.parser(NordigenInput(manager, vars(args), start, end))
def category(parser: argparse.ArgumentParser, universal: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add", parents=[universal])
add.set_defaults(op=Operation.CategoryAdd)
add.add_argument("category", nargs="+", type=str)
add.add_argument("--group", nargs="?", type=str)
remove = commands.add_parser("remove", parents=[universal])
remove.set_defaults(op=Operation.CategoryRemove)
remove.add_argument("category", nargs="+", type=str)
update = commands.add_parser("update", parents=[universal])
update.set_defaults(op=Operation.CategoryUpdate)
update.add_argument("category", nargs="+", type=str)
update.add_argument("--group", nargs="?", type=str)
schedule = commands.add_parser("schedule", parents=[universal])
schedule.set_defaults(op=Operation.CategorySchedule)
schedule.add_argument("category", nargs="+", type=str)
schedule.add_argument("period", nargs=1, choices=[e.value for e in Period])
schedule.add_argument("--frequency", nargs=1, default=[1], type=int)
rule = commands.add_parser("rule", parents=[universal])
rule.set_defaults(op=Operation.CategoryRule)
rule.add_argument("category", nargs="+", type=str)
rule.add_argument("--date", nargs=1, type=str)
rule.add_argument("--description", nargs=1, type=str)
rule.add_argument("--bank", nargs=1, type=str)
rule.add_argument("--min", nargs=1, type=float)
rule.add_argument("--max", nargs=1, type=float)
group = commands.add_parser("group", parents=[universal])
category_group(group, universal)
def category_group(parser: argparse.ArgumentParser, universal: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add", parents=[universal])
add.set_defaults(op=Operation.GroupAdd)
add.add_argument("group", nargs="+", type=str)
remove = commands.add_parser("remove", parents=[universal])
remove.set_defaults(op=Operation.GroupRemove)
remove.add_argument("group", nargs="+", type=str)
def run(): def run():
args = vars(argparser().parse_args()) manager = Manager(DEFAULT_DB)
assert "op" in args, "No operation selected" args = argparser(manager).parse_args()
return args["op"], args args.func(args)

View File

@ -4,24 +4,6 @@ from decimal import Decimal, InvalidOperation
from enum import Enum, auto from enum import Enum, auto
class Operation(Enum):
Init = auto()
Parse = auto()
Download = auto()
Categorize = auto()
Register = auto()
Unregister = auto()
Token = auto()
Renew = auto()
CategoryAdd = auto()
CategoryUpdate = auto()
CategoryRemove = auto()
CategorySchedule = auto()
CategoryRule = auto()
GroupAdd = auto()
GroupRemove = auto()
class TransactionError(Exception): class TransactionError(Exception):
pass pass

View File

@ -1,47 +0,0 @@
from pfbudget.db.model import Transaction, TransactionCategory
from datetime import timedelta
class Categorizer:
options = {}
def __init__(self):
self.options["null_days"] = 4
def categorize(self, transactions: list[Transaction]):
"""Overarching categorization tool
Receives a list of transactions (by ref) and updates their category
Args:
transactions (list[Transaction]): uncategorized transactions
"""
self._nullify(transactions)
def _nullify(self, transactions: list[Transaction]):
count = 0
matching = []
for transaction in transactions:
for cancel in (
cancel
for cancel in transactions
if (
transaction.date - timedelta(days=self.options["null_days"])
<= cancel.date
<= transaction.date + timedelta(days=self.options["null_days"])
and transaction not in matching
and cancel not in matching
and cancel != transaction
and cancel.bank != transaction.bank
and cancel.amount == -transaction.amount
)
):
transaction.category = TransactionCategory(name="null")
cancel.category = TransactionCategory(name="null")
matching.extend([transaction, cancel])
count += 2
break
print(f"Nullified {count} transactions")

View File

@ -1,102 +1,34 @@
from pfbudget.input.input import Input from pfbudget.input.input import Input
from pfbudget.input.nordigen import NordigenClient
from pfbudget.input.parsers import parse_data from pfbudget.input.parsers import parse_data
from pfbudget.db.client import DbClient from pfbudget.db.client import DbClient
from pfbudget.db.model import Category, CategoryGroup, CategoryRule, CategorySchedule
from pfbudget.common.types import Operation
from pfbudget.core.categorizer import Categorizer from pfbudget.core.categorizer import Categorizer
from pfbudget.utils import convert from pfbudget.utils import convert
from pfbudget.cli.runnable import download, parse
class Manager: class Manager:
def __init__(self, db: str, args: dict): def __init__(self, url: str):
self._args = args self._db = DbClient(url)
print(args)
self._db = db
def action(self, op: Operation, params: list):
match (op):
case Operation.Init:
pass
case Operation.Parse:
# TODO this is a monstrosity, remove when possible
parse(self, self.args)
case Operation.Download:
# TODO this is a monstrosity, remove when possible
download(self, self.args)
case Operation.Categorize:
self.categorize()
case Operation.Register:
# self._db = DbClient(args["database"])
# self.register(args)
pass
case Operation.Unregister:
# self._db = DbClient(args["database"])
# self.unregister(args)
pass
case Operation.Token:
NordigenClient(self).token()
case Operation.Renew:
NordigenClient(self).requisition(
self.args["name"], self.args["country"]
)
case Operation.CategoryAdd:
with self.db.session() as session:
session.addcategories(params)
case Operation.CategoryUpdate:
with self.db.session() as session:
session.updategroup(*params)
case Operation.CategoryRemove:
with self.db.session() as session:
session.removecategories(params)
case Operation.CategorySchedule:
with self.db.session() as session:
session.updateschedules(params)
case Operation.CategoryRule:
with self.db.session() as session:
session.addrules(params)
case Operation.GroupAdd:
with self.db.session() as session:
for group in self.args["group"]:
session.addcategorygroup(CategoryGroup(name=group))
case Operation.GroupRemove:
with self.db.session() as session:
session.removecategorygroup(
[CategoryGroup(name=group) for group in self.args["group"]]
)
# def init(self): # def init(self):
# client = DatabaseClient(self.__db) # client = DatabaseClient(self.__db)
# client.init() # client.init()
# def register(self): # def register(self, args: dict):
# bank = Bank(self.args["bank"][0], "", self.args["requisition"][0], self.args["invert"]) # bank = Bank(args["bank"][0], "", args["requisition"][0], args["invert"])
# client = DatabaseClient(self.__db) # client = DatabaseClient(self.__db)
# client.register_bank(convert(bank)) # client.register_bank(convert(bank))
# def unregister(self): # def unregister(self, args: dict):
# client = DatabaseClient(self.__db) # client = DatabaseClient(self.__db)
# client.unregister_bank(self.args["bank"][0]) # client.unregister_bank(args["bank"][0])
def parser(self, parser: Input): def parser(self, parser: Input):
transactions = parser.parse() transactions = parser.parse()
print(transactions) print(transactions)
# self.add_transactions(transactions) # self.add_transactions(transactions)
# def parse(self, filename: str): # def parse(self, filename: str, args: dict):
# transactions = parse_data(filename, self.args) # transactions = parse_data(filename, args)
# self.add_transactions(transactions) # self.add_transactions(transactions)
# def transactions() -> list[Transaction]: # def transactions() -> list[Transaction]:
@ -105,11 +37,13 @@ class Manager:
def add_transactions(self, transactions): def add_transactions(self, transactions):
with self.db.session() as session: with self.db.session() as session:
session.add(transactions) session.add(transactions)
session.commit()
def categorize(self): def categorize(self, args: dict):
with self.db.session() as session: with self.db.session() as session:
uncategorized = session.uncategorized() uncategorized = session.uncategorized()
Categorizer().categorize(uncategorized) Categorizer().categorize(uncategorized)
session.commit()
# def get_bank_by(self, key: str, value: str) -> Bank: # def get_bank_by(self, key: str, value: str) -> Bank:
# client = DatabaseClient(self.__db) # client = DatabaseClient(self.__db)
@ -120,13 +54,5 @@ class Manager:
return self.db.get_nordigen_banks() return self.db.get_nordigen_banks()
@property @property
def db(self) -> DbClient: def db(self):
return DbClient(self._db, self.args["verbose"]) return self._db
@db.setter
def db(self, url: str):
self._db = url
@property
def args(self) -> dict:
return self._args

View File

@ -1,17 +1,7 @@
from copy import deepcopy from sqlalchemy import create_engine, select
from dataclasses import asdict
from sqlalchemy import create_engine, delete, select, update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session, joinedload, selectinload from sqlalchemy.orm import Session, joinedload, selectinload
from pfbudget.db.model import ( from pfbudget.db.model import Bank, Category, Transaction
Bank,
Category,
CategoryGroup,
CategoryRule,
CategorySchedule,
Transaction,
)
# import logging # import logging
@ -26,8 +16,8 @@ class DbClient:
__sessions: list[Session] __sessions: list[Session]
def __init__(self, url: str, echo=False) -> None: def __init__(self, url: str) -> None:
self._engine = create_engine(url, echo=echo) self._engine = create_engine(url)
def get_transactions(self): def get_transactions(self):
"""¿Non-optimized? get_transactions, will load the entire Transaction""" """¿Non-optimized? get_transactions, will load the entire Transaction"""
@ -75,7 +65,6 @@ class DbClient:
return self return self
def __exit__(self, exc_type, exc_value, exc_tb): def __exit__(self, exc_type, exc_value, exc_tb):
self.commit()
self.__session.close() self.__session.close()
def commit(self): def commit(self):
@ -84,50 +73,12 @@ class DbClient:
def add(self, transactions: list[Transaction]): def add(self, transactions: list[Transaction]):
self.__session.add_all(transactions) self.__session.add_all(transactions)
def addcategories(self, category: list[Category]): def addcategory(self, category: Category):
self.__session.add_all(category) self.__session.add(category)
def removecategories(self, categories: list[Category]):
stmt = delete(Category).where(
Category.name.in_([cat.name for cat in categories])
)
self.__session.execute(stmt)
def updategroup(self, categories: list[Category], group: CategoryGroup):
stmt = (
update(Category)
.where(Category.name.in_([cat.name for cat in categories]))
.values(group=group)
)
self.__session.execute(stmt)
def updateschedules(self, schedules: list[CategorySchedule]):
stmt = insert(CategorySchedule).values([asdict(s) for s in schedules])
stmt = stmt.on_conflict_do_update(
index_elements=[CategorySchedule.name],
set_=dict(
recurring=stmt.excluded.recurring,
period=stmt.excluded.period,
period_multiplier=stmt.excluded.period_multiplier,
),
)
self.__session.execute(stmt)
def addrules(self, rules: list[CategoryRule]):
self.__session.add_all(rules)
def addcategorygroup(self, group: CategoryGroup):
self.__session.add(group)
def removecategorygroup(self, groups: list[CategoryGroup]):
stmt = delete(CategoryGroup).where(
CategoryGroup.name.in_([grp.name for grp in groups])
)
self.__session.execute(stmt)
def uncategorized(self) -> list[Transaction]: def uncategorized(self) -> list[Transaction]:
stmt = select(Transaction).where(~Transaction.category.has()) stmt = select(Transaction).where(~Transaction.category.has())
return self.__session.scalars(stmt).all() return self.__session.scalars(stmt).all()
def session(self) -> ClientSession: def session(self):
return self.ClientSession(self.engine) return self.ClientSession(self.engine)

View File

@ -9,13 +9,7 @@ from sqlalchemy import (
String, String,
Text, Text,
) )
from sqlalchemy.orm import ( from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column, relationship
DeclarativeBase,
Mapped,
mapped_column,
MappedAsDataclass,
relationship,
)
from decimal import Decimal from decimal import Decimal
from typing import Annotated, Optional from typing import Annotated, Optional
@ -23,7 +17,7 @@ import datetime as dt
import enum import enum
class Base(MappedAsDataclass, DeclarativeBase): class Base(DeclarativeBase):
__table_args__ = {"schema": "transactions"} __table_args__ = {"schema": "transactions"}
metadata = MetaData( metadata = MetaData(
naming_convention={ naming_convention={
@ -75,21 +69,18 @@ money = Annotated[Decimal, mapped_column(Numeric(16, 2), nullable=False)]
class Transaction(Base): class Transaction(Base):
__tablename__ = "originals" __tablename__ = "originals"
id: Mapped[idpk] = mapped_column(autoincrement=True, init=False) id: Mapped[idpk] = mapped_column(autoincrement=True)
date: Mapped[dt.date] date: Mapped[dt.date]
description: Mapped[Optional[str]] description: Mapped[Optional[str]]
bank: Mapped[bankfk] bank: Mapped[bankfk]
amount: Mapped[money] amount: Mapped[money]
category: Mapped[Optional[TransactionCategory]] = relationship( category: Mapped[Optional[TransactionCategory]] = relationship(
back_populates="original", lazy="joined", default=None back_populates="original", lazy="joined"
) )
note: Mapped[Optional[Note]] = relationship(back_populates="original", default=None) note: Mapped[Optional[Note]] = relationship(back_populates="original")
tags: Mapped[Optional[set[Tag]]] = relationship( tags: Mapped[Optional[set[Tag]]] = relationship(
back_populates="original", back_populates="original", cascade="all, delete-orphan", passive_deletes=True
cascade="all, delete-orphan",
passive_deletes=True,
default=None,
) )
def __repr__(self) -> str: def __repr__(self) -> str:
@ -111,35 +102,20 @@ class Category(Base):
__tablename__ = "categories_available" __tablename__ = "categories_available"
name: Mapped[str] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(primary_key=True)
group: Mapped[Optional[str]] = mapped_column( group: Mapped[Optional[str]] = mapped_column(ForeignKey(CategoryGroup.name))
ForeignKey(CategoryGroup.name), default=None
)
rules: Mapped[Optional[set[CategoryRule]]] = relationship( rules: Mapped[Optional[set[CategoryRule]]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default=None cascade="all, delete-orphan", passive_deletes=True
) )
schedule: Mapped[CategorySchedule] = relationship(
back_populates="category", default=None
)
def __repr__(self) -> str:
return f"Category(name={self.name}, group={self.group}, #rules={len(self.rules)}, schedule={self.schedule})"
catfk = Annotated[
str,
mapped_column(ForeignKey(Category.name, ondelete="CASCADE")),
]
class TransactionCategory(Base): class TransactionCategory(Base):
__tablename__ = "categorized" __tablename__ = "categorized"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False) id: Mapped[idfk] = mapped_column(primary_key=True)
name: Mapped[str] = mapped_column(ForeignKey(Category.name)) name: Mapped[str] = mapped_column(ForeignKey(Category.name))
original: Mapped[Transaction] = relationship(back_populates="category") original: Mapped[Transaction] = relationship(back_populates="category")
selector: Mapped[CategorySelector] = relationship(back_populates="category")
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Category({self.name})" return f"Category({self.name})"
@ -180,66 +156,7 @@ class Tag(Base):
class CategoryRule(Base): class CategoryRule(Base):
__tablename__ = "categories_rules" __tablename__ = "categories_rules"
id: Mapped[idpk] = mapped_column(autoincrement=True, init=False) name: Mapped[str] = mapped_column(
name: Mapped[catfk] = mapped_column() ForeignKey(Category.name, ondelete="CASCADE"), primary_key=True
date: Mapped[Optional[str]] = mapped_column()
description: Mapped[Optional[str]] = mapped_column()
bank: Mapped[Optional[str]] = mapped_column()
min_amount: Mapped[Optional[float]] = mapped_column()
max_amount: Mapped[Optional[float]] = mapped_column()
class Selector(enum.Enum):
unknown = enum.auto()
nullifier = enum.auto()
vacations = enum.auto()
rules = enum.auto()
algorithm = enum.auto()
manual = enum.auto()
categoryselector = Annotated[
Selector,
mapped_column(Enum(Selector, inherit_schema=True), default=Selector.unknown),
]
class CategorySelector(Base):
__tablename__ = "categories_selector"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(TransactionCategory.id, ondelete="CASCADE"),
primary_key=True,
) )
selector: Mapped[categoryselector] rule: Mapped[str] = mapped_column(primary_key=True)
category: Mapped[TransactionCategory] = relationship(back_populates="selector")
class Period(enum.Enum):
daily = "daily"
weekly = "weekly"
monthly = "monthly"
yearly = "yearly"
scheduleperiod = Annotated[Selector, mapped_column(Enum(Period, inherit_schema=True))]
class CategorySchedule(Base):
__tablename__ = "categories_schedules"
name: Mapped[catfk] = mapped_column(primary_key=True)
recurring: Mapped[bool]
period: Mapped[Optional[scheduleperiod]]
period_multiplier: Mapped[Optional[int]]
category: Mapped[Category] = relationship(back_populates="schedule")
def __repr__(self) -> str:
return (
f"{self.name} schedule=Schedule(period={self.period}, multiplier={self.period_multiplier})"
if self.recurring
else f"{self.name} has no Schedule"
)

View File

@ -59,22 +59,21 @@ def find_credit_institution(fn, banks, creditcards):
return bank, cc return bank, cc
def parse_args_period(args: dict): def parse_args_period(args):
start, end = date.min, date.max start, end = date.min, date.max
print(args) if args.start:
if args["start"]: start = datetime.strptime(args.start[0], "%Y/%m/%d").date()
start = datetime.strptime(args["start"][0], "%Y/%m/%d").date()
if args["end"]: if args.end:
end = datetime.strptime(args["end"][0], "%Y/%m/%d").date() end = datetime.strptime(args.end[0], "%Y/%m/%d").date()
if args["interval"]: if args.interval:
start = datetime.strptime(args["interval"][0], "%Y/%m/%d").date() start = datetime.strptime(args.interval[0], "%Y/%m/%d").date()
end = datetime.strptime(args["interval"][1], "%Y/%m/%d").date() end = datetime.strptime(args.interval[1], "%Y/%m/%d").date()
if args["year"]: if args.year:
start = datetime.strptime(args["year"][0], "%Y").date() start = datetime.strptime(args.year[0], "%Y").date()
end = datetime.strptime(str(int(args["year"][0]) + 1), "%Y").date() - timedelta( end = datetime.strptime(str(int(args.year[0]) + 1), "%Y").date() - timedelta(
days=1 days=1
) )