Compare commits
3 Commits
1cce7d421e
...
fd6793b4f4
| Author | SHA1 | Date | |
|---|---|---|---|
| fd6793b4f4 | |||
| d4b5f1f11a | |||
| 6110858d48 |
152
alembic/versions/6b293f78cc97_rule_inheritance.py
Normal file
152
alembic/versions/6b293f78cc97_rule_inheritance.py
Normal file
@ -0,0 +1,152 @@
|
|||||||
|
"""Rule inheritance
|
||||||
|
|
||||||
|
Revision ID: 6b293f78cc97
|
||||||
|
Revises: 37d80de801a7
|
||||||
|
Create Date: 2023-01-22 20:05:32.887092+00:00
|
||||||
|
|
||||||
|
"""
|
||||||
|
from alembic import op
|
||||||
|
import sqlalchemy as sa
|
||||||
|
|
||||||
|
|
||||||
|
# revision identifiers, used by Alembic.
|
||||||
|
revision = "6b293f78cc97"
|
||||||
|
down_revision = "37d80de801a7"
|
||||||
|
branch_labels = None
|
||||||
|
depends_on = None
|
||||||
|
|
||||||
|
|
||||||
|
def upgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.create_table(
|
||||||
|
"rules",
|
||||||
|
sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
|
||||||
|
sa.Column("date", sa.Date(), nullable=True),
|
||||||
|
sa.Column("description", sa.String(), nullable=True),
|
||||||
|
sa.Column("regex", sa.String(), nullable=True),
|
||||||
|
sa.Column("bank", sa.String(), nullable=True),
|
||||||
|
sa.Column("min", sa.Numeric(precision=16, scale=2), nullable=True),
|
||||||
|
sa.Column("max", sa.Numeric(precision=16, scale=2), nullable=True),
|
||||||
|
sa.Column("type", sa.String(), nullable=False),
|
||||||
|
sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.create_foreign_key(
|
||||||
|
op.f("fk_categories_rules_id_rules"),
|
||||||
|
"categories_rules",
|
||||||
|
"rules",
|
||||||
|
["id"],
|
||||||
|
["id"],
|
||||||
|
source_schema="transactions",
|
||||||
|
referent_schema="transactions",
|
||||||
|
ondelete="CASCADE",
|
||||||
|
)
|
||||||
|
op.drop_column("categories_rules", "bank", schema="transactions")
|
||||||
|
op.drop_column("categories_rules", "min", schema="transactions")
|
||||||
|
op.drop_column("categories_rules", "date", schema="transactions")
|
||||||
|
op.drop_column("categories_rules", "regex", schema="transactions")
|
||||||
|
op.drop_column("categories_rules", "description", schema="transactions")
|
||||||
|
op.drop_column("categories_rules", "max", schema="transactions")
|
||||||
|
op.create_foreign_key(
|
||||||
|
op.f("fk_tag_rules_id_rules"),
|
||||||
|
"tag_rules",
|
||||||
|
"rules",
|
||||||
|
["id"],
|
||||||
|
["id"],
|
||||||
|
source_schema="transactions",
|
||||||
|
referent_schema="transactions",
|
||||||
|
ondelete="CASCADE",
|
||||||
|
)
|
||||||
|
op.drop_column("tag_rules", "bank", schema="transactions")
|
||||||
|
op.drop_column("tag_rules", "min", schema="transactions")
|
||||||
|
op.drop_column("tag_rules", "date", schema="transactions")
|
||||||
|
op.drop_column("tag_rules", "regex", schema="transactions")
|
||||||
|
op.drop_column("tag_rules", "description", schema="transactions")
|
||||||
|
op.drop_column("tag_rules", "max", schema="transactions")
|
||||||
|
# ### end Alembic commands ###
|
||||||
|
|
||||||
|
|
||||||
|
def downgrade() -> None:
|
||||||
|
# ### commands auto generated by Alembic - please adjust! ###
|
||||||
|
op.add_column(
|
||||||
|
"tag_rules",
|
||||||
|
sa.Column(
|
||||||
|
"max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
|
||||||
|
),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"tag_rules",
|
||||||
|
sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"tag_rules",
|
||||||
|
sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"tag_rules",
|
||||||
|
sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"tag_rules",
|
||||||
|
sa.Column(
|
||||||
|
"min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
|
||||||
|
),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"tag_rules",
|
||||||
|
sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.drop_constraint(
|
||||||
|
op.f("fk_tag_rules_id_rules"),
|
||||||
|
"tag_rules",
|
||||||
|
schema="transactions",
|
||||||
|
type_="foreignkey",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"categories_rules",
|
||||||
|
sa.Column(
|
||||||
|
"max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
|
||||||
|
),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"categories_rules",
|
||||||
|
sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"categories_rules",
|
||||||
|
sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"categories_rules",
|
||||||
|
sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"categories_rules",
|
||||||
|
sa.Column(
|
||||||
|
"min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
|
||||||
|
),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.add_column(
|
||||||
|
"categories_rules",
|
||||||
|
sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
|
||||||
|
schema="transactions",
|
||||||
|
)
|
||||||
|
op.drop_constraint(
|
||||||
|
op.f("fk_categories_rules_id_rules"),
|
||||||
|
"categories_rules",
|
||||||
|
schema="transactions",
|
||||||
|
type_="foreignkey",
|
||||||
|
)
|
||||||
|
op.drop_table("rules", schema="transactions")
|
||||||
|
# ### end Alembic commands ###
|
||||||
@ -9,4 +9,4 @@ from pfbudget.cli.runnable import argparser
|
|||||||
from pfbudget.input.parsers import parse_data
|
from pfbudget.input.parsers import parse_data
|
||||||
from pfbudget.utils.utils import parse_args_period
|
from pfbudget.utils.utils import parse_args_period
|
||||||
|
|
||||||
import pfbudget.db.model as types
|
import pfbudget.db.model as t
|
||||||
|
|||||||
@ -14,7 +14,7 @@ if __name__ == "__main__":
|
|||||||
assert "verbose" in args, "No verbose level specified"
|
assert "verbose" in args, "No verbose level specified"
|
||||||
verbosity = args.pop("verbose")
|
verbosity = args.pop("verbose")
|
||||||
|
|
||||||
params = None
|
params = []
|
||||||
match (op):
|
match (op):
|
||||||
case pfbudget.Operation.Parse:
|
case pfbudget.Operation.Parse:
|
||||||
keys = {"path", "bank", "creditcard"}
|
keys = {"path", "bank", "creditcard"}
|
||||||
@ -45,7 +45,7 @@ if __name__ == "__main__":
|
|||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
params = [
|
params = [
|
||||||
pfbudget.types.Bank(
|
pfbudget.t.Bank(
|
||||||
args["bank"][0],
|
args["bank"][0],
|
||||||
args["bic"][0],
|
args["bic"][0],
|
||||||
args["type"][0],
|
args["type"][0],
|
||||||
@ -73,7 +73,7 @@ if __name__ == "__main__":
|
|||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
params = [
|
params = [
|
||||||
pfbudget.types.Nordigen(
|
pfbudget.t.Nordigen(
|
||||||
args["bank"][0],
|
args["bank"][0],
|
||||||
args["bank_id"][0] if args["bank_id"] else None,
|
args["bank_id"][0] if args["bank_id"] else None,
|
||||||
args["requisition_id"][0] if args["requisition_id"] else None,
|
args["requisition_id"][0] if args["requisition_id"] else None,
|
||||||
@ -110,27 +110,27 @@ if __name__ == "__main__":
|
|||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
params = [
|
params = [
|
||||||
pfbudget.types.Category(cat, args["group"]) for cat in args["category"]
|
pfbudget.t.Category(cat, args["group"]) for cat in args["category"]
|
||||||
]
|
]
|
||||||
|
|
||||||
case pfbudget.Operation.CategoryUpdate:
|
case pfbudget.Operation.CategoryUpdate:
|
||||||
keys = {"category", "group"}
|
keys = {"category", "group"}
|
||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
params = [pfbudget.types.Category(cat) for cat in args["category"]]
|
params = [pfbudget.t.Category(cat) for cat in args["category"]]
|
||||||
params.append(args["group"])
|
params.append(args["group"])
|
||||||
|
|
||||||
case pfbudget.Operation.CategoryRemove:
|
case pfbudget.Operation.CategoryRemove:
|
||||||
assert "category" in args, "argparser ill defined"
|
assert "category" in args, "argparser ill defined"
|
||||||
params = [pfbudget.types.Category(cat) for cat in args["category"]]
|
params = [pfbudget.t.Category(cat) for cat in args["category"]]
|
||||||
|
|
||||||
case pfbudget.Operation.CategorySchedule:
|
case pfbudget.Operation.CategorySchedule:
|
||||||
keys = {"category", "period", "frequency"}
|
keys = {"category", "period", "frequency"}
|
||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
params = [
|
params = [
|
||||||
pfbudget.types.CategorySchedule(
|
pfbudget.t.CategorySchedule(
|
||||||
cat, True, args["period"][0], args["frequency"][0]
|
cat, args["period"][0], args["frequency"][0], None
|
||||||
)
|
)
|
||||||
for cat in args["category"]
|
for cat in args["category"]
|
||||||
]
|
]
|
||||||
@ -140,7 +140,7 @@ if __name__ == "__main__":
|
|||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
params = [
|
params = [
|
||||||
pfbudget.types.CategoryRule(
|
pfbudget.t.CategoryRule(
|
||||||
args["date"][0] if args["date"] else None,
|
args["date"][0] if args["date"] else None,
|
||||||
args["description"][0] if args["description"] else None,
|
args["description"][0] if args["description"] else None,
|
||||||
args["regex"][0] if args["regex"] else None,
|
args["regex"][0] if args["regex"] else None,
|
||||||
@ -184,14 +184,14 @@ if __name__ == "__main__":
|
|||||||
keys = {"tag"}
|
keys = {"tag"}
|
||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
params = [pfbudget.types.Tag(tag) for tag in args["tag"]]
|
params = [pfbudget.t.Tag(tag) for tag in args["tag"]]
|
||||||
|
|
||||||
case pfbudget.Operation.TagRuleAdd:
|
case pfbudget.Operation.TagRuleAdd:
|
||||||
keys = {"tag", "date", "description", "bank", "min", "max"}
|
keys = {"tag", "date", "description", "bank", "min", "max"}
|
||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
params = [
|
params = [
|
||||||
pfbudget.types.TagRule(
|
pfbudget.t.TagRule(
|
||||||
args["date"][0] if args["date"] else None,
|
args["date"][0] if args["date"] else None,
|
||||||
args["description"][0] if args["description"] else None,
|
args["description"][0] if args["description"] else None,
|
||||||
args["regex"][0] if args["regex"] else None,
|
args["regex"][0] if args["regex"] else None,
|
||||||
@ -218,31 +218,21 @@ if __name__ == "__main__":
|
|||||||
|
|
||||||
case pfbudget.Operation.GroupAdd:
|
case pfbudget.Operation.GroupAdd:
|
||||||
assert "group" in args, "argparser ill defined"
|
assert "group" in args, "argparser ill defined"
|
||||||
params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]
|
params = [pfbudget.t.CategoryGroup(group) for group in args["group"]]
|
||||||
|
|
||||||
case pfbudget.Operation.GroupRemove:
|
case pfbudget.Operation.GroupRemove:
|
||||||
assert "group" in args, "argparser ill defined"
|
assert "group" in args, "argparser ill defined"
|
||||||
params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]
|
params = [pfbudget.t.CategoryGroup(group) for group in args["group"]]
|
||||||
|
|
||||||
case pfbudget.Operation.Forge | pfbudget.Operation.Dismantle:
|
case pfbudget.Operation.Forge | pfbudget.Operation.Dismantle:
|
||||||
keys = {"original", "links"}
|
keys = {"original", "links"}
|
||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
params = [
|
params = [
|
||||||
pfbudget.types.Link(args["original"][0], link) for link in args["links"]
|
pfbudget.t.Link(args["original"][0], link) for link in args["links"]
|
||||||
]
|
]
|
||||||
|
|
||||||
case pfbudget.Operation.Export:
|
case pfbudget.Operation.Export | pfbudget.Operation.Import | pfbudget.Operation.ExportCategoryRules | pfbudget.Operation.ImportCategoryRules | pfbudget.Operation.ExportTagRules | pfbudget.Operation.ImportTagRules:
|
||||||
keys = {"interval", "start", "end", "year", "all", "banks", "file"}
|
|
||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
|
||||||
|
|
||||||
start, end = pfbudget.parse_args_period(args)
|
|
||||||
params = [start, end]
|
|
||||||
if not args["all"]:
|
|
||||||
params.append(args["banks"])
|
|
||||||
params.append(args["file"][0])
|
|
||||||
|
|
||||||
case pfbudget.Operation.Import:
|
|
||||||
keys = {"file"}
|
keys = {"file"}
|
||||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
|
|||||||
@ -1,12 +1,13 @@
|
|||||||
from dotenv import load_dotenv
|
|
||||||
import argparse
|
import argparse
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
import decimal
|
import decimal
|
||||||
|
from dotenv import load_dotenv
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
|
|
||||||
from pfbudget.common.types import Operation
|
from pfbudget.common.types import Operation
|
||||||
from pfbudget.db.model import AccountType, Period
|
from pfbudget.db.model import AccountType, Period
|
||||||
|
|
||||||
from pfbudget.db.sqlite import DatabaseClient
|
from pfbudget.db.sqlite import DatabaseClient
|
||||||
import pfbudget.reporting.graph
|
import pfbudget.reporting.graph
|
||||||
import pfbudget.reporting.report
|
import pfbudget.reporting.report
|
||||||
@ -38,50 +39,43 @@ def argparser() -> argparse.ArgumentParser:
|
|||||||
help="select current database",
|
help="select current database",
|
||||||
default=DEFAULT_DB,
|
default=DEFAULT_DB,
|
||||||
)
|
)
|
||||||
|
|
||||||
universal.add_argument("-v", "--verbose", action="count", default=0)
|
universal.add_argument("-v", "--verbose", action="count", default=0)
|
||||||
|
|
||||||
period = argparse.ArgumentParser(add_help=False).add_mutually_exclusive_group()
|
period = argparse.ArgumentParser(add_help=False)
|
||||||
period.add_argument(
|
period_group = period.add_mutually_exclusive_group()
|
||||||
|
period_group.add_argument(
|
||||||
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
|
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
|
||||||
)
|
)
|
||||||
period.add_argument("--start", type=str, nargs=1, help="graph start date")
|
period_group.add_argument("--start", type=str, nargs=1, help="graph start date")
|
||||||
period.add_argument("--end", type=str, nargs=1, help="graph end date")
|
period_group.add_argument("--end", type=str, nargs=1, help="graph end date")
|
||||||
period.add_argument("--year", type=str, nargs=1, help="graph year")
|
period_group.add_argument("--year", type=str, nargs=1, help="graph year")
|
||||||
|
|
||||||
parser = argparse.ArgumentParser(
|
parser = argparse.ArgumentParser(
|
||||||
description="does cool finance stuff",
|
description="does cool finance stuff",
|
||||||
parents=[universal],
|
parents=[universal],
|
||||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if version := re.search(
|
||||||
|
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]', open("pfbudget/__init__.py").read()
|
||||||
|
):
|
||||||
parser.add_argument(
|
parser.add_argument(
|
||||||
"--version",
|
"--version",
|
||||||
action="version",
|
action="version",
|
||||||
version=re.search(
|
version=version.group(1),
|
||||||
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
|
|
||||||
open("pfbudget/__init__.py").read(),
|
|
||||||
).group(1),
|
|
||||||
)
|
)
|
||||||
|
|
||||||
subparsers = parser.add_subparsers(required=True)
|
subparsers = parser.add_subparsers(required=True)
|
||||||
|
|
||||||
"""
|
# TODO Init
|
||||||
Init
|
# init = subparsers.add_parser("init")
|
||||||
"""
|
# init.set_defaults(op=Operation.Init)
|
||||||
p_init = subparsers.add_parser(
|
|
||||||
"init",
|
|
||||||
description="Initializes the SQLite3 database",
|
|
||||||
parents=[universal],
|
|
||||||
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
|
|
||||||
)
|
|
||||||
p_init.set_defaults(command=Operation.Init)
|
|
||||||
|
|
||||||
# Exports transactions to .csv file
|
# Exports transactions to .csv file
|
||||||
export = subparsers.add_parser("export", parents=[period])
|
export = subparsers.add_parser("export")
|
||||||
export.set_defaults(op=Operation.Export)
|
export.set_defaults(op=Operation.Export)
|
||||||
export.add_argument("file", nargs=1, type=str)
|
export_args(export)
|
||||||
export_banks = export.add_mutually_exclusive_group()
|
|
||||||
export_banks.add_argument("--all", action="store_true")
|
|
||||||
export_banks.add_argument("--banks", nargs="+", type=str)
|
|
||||||
|
|
||||||
pimport = subparsers.add_parser("import")
|
pimport = subparsers.add_parser("import")
|
||||||
pimport.set_defaults(op=Operation.Import)
|
pimport.set_defaults(op=Operation.Import)
|
||||||
@ -209,11 +203,6 @@ def report(args):
|
|||||||
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
|
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
|
||||||
|
|
||||||
|
|
||||||
# def nordigen_banks(manager: Manager, args):
|
|
||||||
# input = NordigenInput(manager)
|
|
||||||
# input.list(vars(args)["country"][0])
|
|
||||||
|
|
||||||
|
|
||||||
def bank(parser: argparse.ArgumentParser):
|
def bank(parser: argparse.ArgumentParser):
|
||||||
commands = parser.add_subparsers(required=True)
|
commands = parser.add_subparsers(required=True)
|
||||||
|
|
||||||
@ -321,6 +310,14 @@ def category_rule(parser: argparse.ArgumentParser):
|
|||||||
rules(modify)
|
rules(modify)
|
||||||
modify.add_argument("--remove", nargs="*", default=[], type=str)
|
modify.add_argument("--remove", nargs="*", default=[], type=str)
|
||||||
|
|
||||||
|
export = commands.add_parser("export")
|
||||||
|
export.set_defaults(op=Operation.ExportCategoryRules)
|
||||||
|
export_args(export)
|
||||||
|
|
||||||
|
pimport = commands.add_parser("import")
|
||||||
|
pimport.set_defaults(op=Operation.ImportCategoryRules)
|
||||||
|
export_args(pimport)
|
||||||
|
|
||||||
|
|
||||||
def tags(parser: argparse.ArgumentParser):
|
def tags(parser: argparse.ArgumentParser):
|
||||||
commands = parser.add_subparsers(required=True)
|
commands = parser.add_subparsers(required=True)
|
||||||
@ -355,6 +352,14 @@ def tag_rule(parser: argparse.ArgumentParser):
|
|||||||
modify.add_argument("--tag", nargs=1, type=str)
|
modify.add_argument("--tag", nargs=1, type=str)
|
||||||
rules(modify)
|
rules(modify)
|
||||||
|
|
||||||
|
export = commands.add_parser("export")
|
||||||
|
export.set_defaults(op=Operation.ExportTagRules)
|
||||||
|
export_args(export)
|
||||||
|
|
||||||
|
pimport = commands.add_parser("import")
|
||||||
|
pimport.set_defaults(op=Operation.ImportTagRules)
|
||||||
|
export_args(pimport)
|
||||||
|
|
||||||
|
|
||||||
def rules(parser: argparse.ArgumentParser):
|
def rules(parser: argparse.ArgumentParser):
|
||||||
parser.add_argument("--date", nargs=1, type=dt.date.fromisoformat)
|
parser.add_argument("--date", nargs=1, type=dt.date.fromisoformat)
|
||||||
@ -377,3 +382,7 @@ def link(parser: argparse.ArgumentParser):
|
|||||||
dismantle.set_defaults(op=Operation.Dismantle)
|
dismantle.set_defaults(op=Operation.Dismantle)
|
||||||
dismantle.add_argument("original", nargs=1, type=int)
|
dismantle.add_argument("original", nargs=1, type=int)
|
||||||
dismantle.add_argument("links", nargs="+", type=int)
|
dismantle.add_argument("links", nargs="+", type=int)
|
||||||
|
|
||||||
|
|
||||||
|
def export_args(parser: argparse.ArgumentParser):
|
||||||
|
parser.add_argument("file", nargs=1, type=str)
|
||||||
|
|||||||
@ -37,6 +37,10 @@ class Operation(Enum):
|
|||||||
NordigenCountryBanks = auto()
|
NordigenCountryBanks = auto()
|
||||||
Export = auto()
|
Export = auto()
|
||||||
Import = auto()
|
Import = auto()
|
||||||
|
ExportCategoryRules = auto()
|
||||||
|
ImportCategoryRules = auto()
|
||||||
|
ExportTagRules = auto()
|
||||||
|
ImportTagRules = auto()
|
||||||
|
|
||||||
|
|
||||||
class TransactionError(Exception):
|
class TransactionError(Exception):
|
||||||
|
|||||||
@ -1,17 +1,8 @@
|
|||||||
from pfbudget.db.model import (
|
|
||||||
Category,
|
|
||||||
CategorySelector,
|
|
||||||
Selector,
|
|
||||||
Tag,
|
|
||||||
Transaction,
|
|
||||||
TransactionCategory,
|
|
||||||
TransactionTag,
|
|
||||||
)
|
|
||||||
|
|
||||||
from codetiming import Timer
|
from codetiming import Timer
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
from typing import Sequence
|
||||||
|
|
||||||
Transactions = list[Transaction]
|
import pfbudget.db.model as t
|
||||||
|
|
||||||
|
|
||||||
class Categorizer:
|
class Categorizer:
|
||||||
@ -22,9 +13,9 @@ class Categorizer:
|
|||||||
|
|
||||||
def rules(
|
def rules(
|
||||||
self,
|
self,
|
||||||
transactions: Transactions,
|
transactions: Sequence[t.BankTransaction],
|
||||||
categories: list[Category],
|
categories: Sequence[t.Category],
|
||||||
tags: list[Tag],
|
tags: Sequence[t.Tag],
|
||||||
):
|
):
|
||||||
"""Overarching categorization tool
|
"""Overarching categorization tool
|
||||||
|
|
||||||
@ -32,9 +23,9 @@ class Categorizer:
|
|||||||
to the rules defined for each category
|
to the rules defined for each category
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
transactions (list[Transaction]): uncategorized transactions
|
transactions (Sequence[BankTransaction]): uncategorized transactions
|
||||||
categories (list[Category]): available categories
|
categories (Sequence[Category]): available categories
|
||||||
tags (list[Tag]): currently available tags
|
tags (Sequence[Tag]): currently available tags
|
||||||
"""
|
"""
|
||||||
|
|
||||||
self._nullify(transactions)
|
self._nullify(transactions)
|
||||||
@ -44,21 +35,21 @@ class Categorizer:
|
|||||||
|
|
||||||
def manual(
|
def manual(
|
||||||
self,
|
self,
|
||||||
transactions: Transactions,
|
transactions: Sequence[t.Transaction],
|
||||||
categories: list[Category],
|
categories: Sequence[t.Category],
|
||||||
tags: list[Tag],
|
tags: Sequence[t.Tag],
|
||||||
):
|
):
|
||||||
"""Manual categorization input
|
"""Manual categorization input
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
transactions (list[Transaction]): uncategorized transactions
|
transactions (Sequence[Transaction]): uncategorized transactions
|
||||||
categories (list[Category]): available categories
|
categories (Sequence[Category]): available categories
|
||||||
tags (list[Tag]): currently available tags
|
tags (Sequence[Tag]): currently available tags
|
||||||
"""
|
"""
|
||||||
self._manual(transactions)
|
self._manual(transactions)
|
||||||
|
|
||||||
@Timer(name="nullify")
|
@Timer(name="nullify")
|
||||||
def _nullify(self, transactions: Transactions):
|
def _nullify(self, transactions: Sequence[t.BankTransaction]):
|
||||||
count = 0
|
count = 0
|
||||||
matching = []
|
matching = []
|
||||||
for transaction in transactions:
|
for transaction in transactions:
|
||||||
@ -76,11 +67,13 @@ class Categorizer:
|
|||||||
and cancel.amount == -transaction.amount
|
and cancel.amount == -transaction.amount
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
transaction.category = TransactionCategory(
|
transaction.category = t.TransactionCategory(
|
||||||
name="null", selector=CategorySelector(Selector.nullifier)
|
name="null",
|
||||||
|
selector=t.CategorySelector(t.Selector.nullifier),
|
||||||
)
|
)
|
||||||
cancel.category = TransactionCategory(
|
cancel.category = t.TransactionCategory(
|
||||||
name="null", selector=CategorySelector(Selector.nullifier)
|
name="null",
|
||||||
|
selector=t.CategorySelector(t.Selector.nullifier),
|
||||||
)
|
)
|
||||||
matching.extend([transaction, cancel])
|
matching.extend([transaction, cancel])
|
||||||
count += 2
|
count += 2
|
||||||
@ -90,7 +83,9 @@ class Categorizer:
|
|||||||
|
|
||||||
@Timer(name="categoryrules")
|
@Timer(name="categoryrules")
|
||||||
def _rule_based_categories(
|
def _rule_based_categories(
|
||||||
self, transactions: Transactions, categories: list[Category]
|
self,
|
||||||
|
transactions: Sequence[t.BankTransaction],
|
||||||
|
categories: Sequence[t.Category],
|
||||||
):
|
):
|
||||||
d = {}
|
d = {}
|
||||||
for category in [c for c in categories if c.rules]:
|
for category in [c for c in categories if c.rules]:
|
||||||
@ -114,10 +109,10 @@ class Categorizer:
|
|||||||
== "y"
|
== "y"
|
||||||
):
|
):
|
||||||
transaction.category.name = category.name
|
transaction.category.name = category.name
|
||||||
transaction.category.selector.selector = Selector.rules
|
transaction.category.selector.selector = t.Selector.rules
|
||||||
else:
|
else:
|
||||||
transaction.category = TransactionCategory(
|
transaction.category = t.TransactionCategory(
|
||||||
category.name, CategorySelector(Selector.rules)
|
category.name, t.CategorySelector(t.Selector.rules)
|
||||||
)
|
)
|
||||||
|
|
||||||
if rule in d:
|
if rule in d:
|
||||||
@ -129,9 +124,11 @@ class Categorizer:
|
|||||||
print(f"{v}: {k}")
|
print(f"{v}: {k}")
|
||||||
|
|
||||||
@Timer(name="tagrules")
|
@Timer(name="tagrules")
|
||||||
def _rule_based_tags(self, transactions: Transactions, tags: list[Tag]):
|
def _rule_based_tags(
|
||||||
|
self, transactions: Sequence[t.BankTransaction], tags: Sequence[t.Tag]
|
||||||
|
):
|
||||||
d = {}
|
d = {}
|
||||||
for tag in [t for t in tags if t.rules]:
|
for tag in [t for t in tags if len(t.rules) > 0]:
|
||||||
for rule in tag.rules:
|
for rule in tag.rules:
|
||||||
# for transaction in [t for t in transactions if not t.category]:
|
# for transaction in [t for t in transactions if not t.category]:
|
||||||
for transaction in [
|
for transaction in [
|
||||||
@ -143,9 +140,9 @@ class Categorizer:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
if not transaction.tags:
|
if not transaction.tags:
|
||||||
transaction.tags = {TransactionTag(tag.name)}
|
transaction.tags = {t.TransactionTag(tag.name)}
|
||||||
else:
|
else:
|
||||||
transaction.tags.add(TransactionTag(tag.name))
|
transaction.tags.add(t.TransactionTag(tag.name))
|
||||||
|
|
||||||
if rule in d:
|
if rule in d:
|
||||||
d[rule] += 1
|
d[rule] += 1
|
||||||
@ -155,7 +152,7 @@ class Categorizer:
|
|||||||
for k, v in d.items():
|
for k, v in d.items():
|
||||||
print(f"{v}: {k}")
|
print(f"{v}: {k}")
|
||||||
|
|
||||||
def _manual(self, transactions: Transactions):
|
def _manual(self, transactions: Sequence[t.Transaction]):
|
||||||
uncategorized = [t for t in transactions if not t.category]
|
uncategorized = [t for t in transactions if not t.category]
|
||||||
print(f"{len(uncategorized)} transactions left to categorize")
|
print(f"{len(uncategorized)} transactions left to categorize")
|
||||||
|
|
||||||
@ -167,8 +164,8 @@ class Categorizer:
|
|||||||
if not category:
|
if not category:
|
||||||
print("{category} doesn't exist")
|
print("{category} doesn't exist")
|
||||||
continue
|
continue
|
||||||
transaction.category = TransactionCategory(
|
transaction.category = t.TransactionCategory(
|
||||||
category, CategorySelector(Selector.manual)
|
category, t.CategorySelector(t.Selector.manual)
|
||||||
)
|
)
|
||||||
|
|
||||||
break
|
break
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
import pickle
|
||||||
import webbrowser
|
import webbrowser
|
||||||
|
|
||||||
from pfbudget.common.types import Operation
|
from pfbudget.common.types import Operation
|
||||||
@ -6,19 +7,22 @@ from pfbudget.core.categorizer import Categorizer
|
|||||||
from pfbudget.db.client import DbClient
|
from pfbudget.db.client import DbClient
|
||||||
from pfbudget.db.model import (
|
from pfbudget.db.model import (
|
||||||
Bank,
|
Bank,
|
||||||
|
BankTransaction,
|
||||||
Category,
|
Category,
|
||||||
CategoryGroup,
|
CategoryGroup,
|
||||||
CategoryRule,
|
CategoryRule,
|
||||||
|
CategorySelector,
|
||||||
|
Link,
|
||||||
|
MoneyTransaction,
|
||||||
Nordigen,
|
Nordigen,
|
||||||
Rule,
|
Rule,
|
||||||
Tag,
|
Tag,
|
||||||
TagRule,
|
TagRule,
|
||||||
Transaction,
|
Transaction,
|
||||||
|
TransactionCategory,
|
||||||
)
|
)
|
||||||
from pfbudget.input.nordigen import NordigenInput
|
from pfbudget.input.nordigen import NordigenInput
|
||||||
from pfbudget.input.parsers import parse_data
|
from pfbudget.input.parsers import parse_data
|
||||||
from pfbudget.output.csv import CSV
|
|
||||||
from pfbudget.output.output import Output
|
|
||||||
|
|
||||||
|
|
||||||
class Manager:
|
class Manager:
|
||||||
@ -38,18 +42,21 @@ class Manager:
|
|||||||
# Adapter for the parse_data method. Can be refactored.
|
# Adapter for the parse_data method. Can be refactored.
|
||||||
args = {"bank": params[1], "creditcard": params[2], "category": None}
|
args = {"bank": params[1], "creditcard": params[2], "category": None}
|
||||||
transactions = []
|
transactions = []
|
||||||
for path in params[0]:
|
for path in [Path(p) for p in params[0]]:
|
||||||
if (dir := Path(path)).is_dir():
|
if path.is_dir():
|
||||||
for file in dir.iterdir():
|
for file in path.iterdir():
|
||||||
transactions.extend(self.parse(file, args))
|
transactions.extend(self.parse(file, args))
|
||||||
elif Path(path).is_file():
|
elif path.is_file():
|
||||||
transactions.extend(self.parse(path, args))
|
transactions.extend(self.parse(path, args))
|
||||||
else:
|
else:
|
||||||
raise FileNotFoundError(path)
|
raise FileNotFoundError(path)
|
||||||
|
|
||||||
print(transactions)
|
if (
|
||||||
if len(transactions) > 0 and input("Commit? (y/n)") == "y":
|
len(transactions) > 0
|
||||||
self.add_transactions(sorted(transactions))
|
and input(f"{transactions[:5]}\nCommit? (y/n)") == "y"
|
||||||
|
):
|
||||||
|
with self.db.session() as session:
|
||||||
|
session.add(sorted(transactions))
|
||||||
|
|
||||||
case Operation.Download:
|
case Operation.Download:
|
||||||
client = NordigenInput()
|
client = NordigenInput()
|
||||||
@ -65,14 +72,15 @@ class Manager:
|
|||||||
|
|
||||||
# dry-run
|
# dry-run
|
||||||
if not params[2]:
|
if not params[2]:
|
||||||
self.add_transactions(transactions)
|
with self.db.session() as session:
|
||||||
|
session.add(sorted(transactions))
|
||||||
else:
|
else:
|
||||||
print(transactions)
|
print(transactions)
|
||||||
|
|
||||||
case Operation.Categorize:
|
case Operation.Categorize:
|
||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
uncategorized = session.get(
|
uncategorized = session.get(
|
||||||
Transaction, ~Transaction.category.has()
|
BankTransaction, ~BankTransaction.category.has()
|
||||||
)
|
)
|
||||||
categories = session.get(Category)
|
categories = session.get(Category)
|
||||||
tags = session.get(Tag)
|
tags = session.get(Tag)
|
||||||
@ -152,7 +160,7 @@ class Manager:
|
|||||||
|
|
||||||
case Operation.GroupAdd:
|
case Operation.GroupAdd:
|
||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
session.add(CategoryGroup(params))
|
session.add(params)
|
||||||
|
|
||||||
case Operation.GroupRemove:
|
case Operation.GroupRemove:
|
||||||
assert all(isinstance(param, CategoryGroup) for param in params)
|
assert all(isinstance(param, CategoryGroup) for param in params)
|
||||||
@ -164,6 +172,8 @@ class Manager:
|
|||||||
session.add(params)
|
session.add(params)
|
||||||
|
|
||||||
case Operation.Dismantle:
|
case Operation.Dismantle:
|
||||||
|
assert all(isinstance(param, Link) for param in params)
|
||||||
|
|
||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
original = params[0].original
|
original = params[0].original
|
||||||
links = [link.link for link in params]
|
links = [link.link for link in params]
|
||||||
@ -171,20 +181,37 @@ class Manager:
|
|||||||
|
|
||||||
case Operation.Export:
|
case Operation.Export:
|
||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
if len(params) < 4:
|
self.dump(params[0], sorted(session.get(Transaction)))
|
||||||
banks = [bank.name for bank in session.get(Bank)]
|
|
||||||
transactions = session.transactions(params[0], params[1], banks)
|
|
||||||
else:
|
|
||||||
transactions = session.transactions(
|
|
||||||
params[0], params[1], params[2]
|
|
||||||
)
|
|
||||||
|
|
||||||
csvwriter: Output = CSV(params[-1])
|
|
||||||
csvwriter.report(transactions)
|
|
||||||
|
|
||||||
case Operation.Import:
|
case Operation.Import:
|
||||||
csvwriter: Output = CSV(params[0]) # Output is strange here
|
transactions = []
|
||||||
transactions = csvwriter.load()
|
for row in self.load(params[0]):
|
||||||
|
match row["type"]:
|
||||||
|
case "bank":
|
||||||
|
transaction = BankTransaction(
|
||||||
|
row["date"],
|
||||||
|
row["description"],
|
||||||
|
row["amount"],
|
||||||
|
row["bank"],
|
||||||
|
False,
|
||||||
|
)
|
||||||
|
|
||||||
|
case "money":
|
||||||
|
transaction = MoneyTransaction(
|
||||||
|
row["date"], row["description"], row["amount"], False
|
||||||
|
)
|
||||||
|
|
||||||
|
# TODO case "split" how to match to original transaction?? also save ids?
|
||||||
|
case _:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if category := row.pop("category", None):
|
||||||
|
transaction.category = TransactionCategory(
|
||||||
|
category["name"],
|
||||||
|
CategorySelector(category["selector"]["selector"]),
|
||||||
|
)
|
||||||
|
|
||||||
|
transactions.append(transaction)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
len(transactions) > 0
|
len(transactions) > 0
|
||||||
@ -196,33 +223,46 @@ class Manager:
|
|||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
session.add(transactions)
|
session.add(transactions)
|
||||||
|
|
||||||
# def init(self):
|
case Operation.ExportCategoryRules:
|
||||||
# client = DatabaseClient(self.__db)
|
with self.db.session() as session:
|
||||||
# client.init()
|
self.dump(params[0], session.get(CategoryRule))
|
||||||
|
|
||||||
# def register(self):
|
case Operation.ImportCategoryRules:
|
||||||
# bank = Bank(self.args["bank"][0], "", self.args["requisition"][0], self.args["invert"])
|
rules = [CategoryRule(**row) for row in self.load(params[0])]
|
||||||
# client = DatabaseClient(self.__db)
|
|
||||||
# client.register_bank(convert(bank))
|
|
||||||
|
|
||||||
# def unregister(self):
|
if (
|
||||||
# client = DatabaseClient(self.__db)
|
len(rules) > 0
|
||||||
# client.unregister_bank(self.args["bank"][0])
|
and input(f"{rules[:5]}\nDoes the import seem correct? (y/n)")
|
||||||
|
== "y"
|
||||||
|
):
|
||||||
|
with self.db.session() as session:
|
||||||
|
session.add(rules)
|
||||||
|
|
||||||
def parse(self, filename: str, args: dict):
|
case Operation.ExportTagRules:
|
||||||
|
with self.db.session() as session:
|
||||||
|
self.dump(params[0], session.get(TagRule))
|
||||||
|
|
||||||
|
case Operation.ImportTagRules:
|
||||||
|
rules = [TagRule(**row) for row in self.load(params[0])]
|
||||||
|
|
||||||
|
if (
|
||||||
|
len(rules) > 0
|
||||||
|
and input(f"{rules[:5]}\nDoes the import seem correct? (y/n)")
|
||||||
|
== "y"
|
||||||
|
):
|
||||||
|
with self.db.session() as session:
|
||||||
|
session.add(rules)
|
||||||
|
|
||||||
|
def parse(self, filename: Path, args: dict):
|
||||||
return parse_data(filename, args)
|
return parse_data(filename, args)
|
||||||
|
|
||||||
# def transactions() -> list[Transaction]:
|
def dump(self, fn, sequence):
|
||||||
# pass
|
with open(fn, "wb") as f:
|
||||||
|
pickle.dump([e.format for e in sequence], f)
|
||||||
|
|
||||||
def add_transactions(self, transactions):
|
def load(self, fn):
|
||||||
with self.db.session() as session:
|
with open(fn, "rb") as f:
|
||||||
session.add(transactions)
|
return pickle.load(f)
|
||||||
|
|
||||||
# def get_bank_by(self, key: str, value: str) -> Bank:
|
|
||||||
# client = DatabaseClient(self.__db)
|
|
||||||
# bank = client.get_bank(key, value)
|
|
||||||
# return convert(bank)
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def db(self) -> DbClient:
|
def db(self) -> DbClient:
|
||||||
|
|||||||
@ -1,25 +1,16 @@
|
|||||||
from dataclasses import asdict
|
from dataclasses import asdict
|
||||||
from datetime import date
|
|
||||||
from sqlalchemy import create_engine, delete, select, update
|
from sqlalchemy import create_engine, delete, select, update
|
||||||
from sqlalchemy.dialects.postgresql import insert
|
from sqlalchemy.dialects.postgresql import insert
|
||||||
from sqlalchemy.orm import Session
|
from sqlalchemy.orm import Session
|
||||||
|
from typing import Sequence, Type, TypeVar
|
||||||
|
|
||||||
from pfbudget.db.model import (
|
from pfbudget.db.model import (
|
||||||
Category,
|
Category,
|
||||||
CategoryGroup,
|
CategoryGroup,
|
||||||
CategoryRule,
|
|
||||||
CategorySchedule,
|
CategorySchedule,
|
||||||
Link,
|
Link,
|
||||||
Tag,
|
|
||||||
TagRule,
|
|
||||||
Transaction,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# import logging
|
|
||||||
|
|
||||||
# logging.basicConfig()
|
|
||||||
# logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
|
|
||||||
|
|
||||||
|
|
||||||
class DbClient:
|
class DbClient:
|
||||||
"""
|
"""
|
||||||
@ -53,7 +44,9 @@ class DbClient:
|
|||||||
def expunge_all(self):
|
def expunge_all(self):
|
||||||
self.__session.expunge_all()
|
self.__session.expunge_all()
|
||||||
|
|
||||||
def get(self, type, column=None, values=None):
|
T = TypeVar("T")
|
||||||
|
|
||||||
|
def get(self, type: Type[T], column=None, values=None) -> Sequence[T]:
|
||||||
if column is not None:
|
if column is not None:
|
||||||
if values:
|
if values:
|
||||||
stmt = select(type).where(column.in_(values))
|
stmt = select(type).where(column.in_(values))
|
||||||
@ -67,7 +60,7 @@ class DbClient:
|
|||||||
def add(self, rows: list):
|
def add(self, rows: list):
|
||||||
self.__session.add_all(rows)
|
self.__session.add_all(rows)
|
||||||
|
|
||||||
def remove_by_name(self, type: Category | Tag | Transaction, rows: list):
|
def remove_by_name(self, type, rows: list):
|
||||||
stmt = delete(type).where(type.name.in_([row.name for row in rows]))
|
stmt = delete(type).where(type.name.in_([row.name for row in rows]))
|
||||||
self.__session.execute(stmt)
|
self.__session.execute(stmt)
|
||||||
|
|
||||||
@ -91,7 +84,7 @@ class DbClient:
|
|||||||
)
|
)
|
||||||
self.__session.execute(stmt)
|
self.__session.execute(stmt)
|
||||||
|
|
||||||
def remove_by_id(self, type: CategoryRule | TagRule, ids: list[int]):
|
def remove_by_id(self, type, ids: list[int]):
|
||||||
stmt = delete(type).where(type.id.in_(ids))
|
stmt = delete(type).where(type.id.in_(ids))
|
||||||
self.__session.execute(stmt)
|
self.__session.execute(stmt)
|
||||||
|
|
||||||
@ -99,19 +92,11 @@ class DbClient:
|
|||||||
print(type, values)
|
print(type, values)
|
||||||
self.__session.execute(update(type), values)
|
self.__session.execute(update(type), values)
|
||||||
|
|
||||||
def remove_links(self, original, links: list):
|
def remove_links(self, original: int, links: list[int]):
|
||||||
stmt = delete(Link).where(
|
stmt = delete(Link).where(
|
||||||
Link.original == original, Link.link.in_(link for link in links)
|
Link.original == original, Link.link.in_(link for link in links)
|
||||||
)
|
)
|
||||||
self.__session.execute(stmt)
|
self.__session.execute(stmt)
|
||||||
|
|
||||||
def transactions(self, min: date, max: date, banks: list[str]):
|
|
||||||
stmt = select(Transaction).where(
|
|
||||||
Transaction.date >= min,
|
|
||||||
Transaction.date <= max,
|
|
||||||
Transaction.bank.in_(banks),
|
|
||||||
)
|
|
||||||
return self.__session.scalars(stmt).all()
|
|
||||||
|
|
||||||
def session(self) -> ClientSession:
|
def session(self) -> ClientSession:
|
||||||
return self.ClientSession(self.engine)
|
return self.ClientSession(self.engine)
|
||||||
|
|||||||
@ -1,4 +1,9 @@
|
|||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
import datetime as dt
|
||||||
|
import decimal
|
||||||
|
import enum
|
||||||
|
import re
|
||||||
|
from typing import Annotated, Any, Optional
|
||||||
|
|
||||||
from sqlalchemy import (
|
from sqlalchemy import (
|
||||||
BigInteger,
|
BigInteger,
|
||||||
@ -17,12 +22,6 @@ from sqlalchemy.orm import (
|
|||||||
relationship,
|
relationship,
|
||||||
)
|
)
|
||||||
|
|
||||||
from decimal import Decimal
|
|
||||||
from typing import Annotated, Optional
|
|
||||||
import datetime as dt
|
|
||||||
import enum
|
|
||||||
import re
|
|
||||||
|
|
||||||
|
|
||||||
class Base(MappedAsDataclass, DeclarativeBase):
|
class Base(MappedAsDataclass, DeclarativeBase):
|
||||||
__table_args__ = {"schema": "transactions"}
|
__table_args__ = {"schema": "transactions"}
|
||||||
@ -52,6 +51,12 @@ accounttype = Annotated[
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
class Export:
|
||||||
|
@property
|
||||||
|
def format(self) -> dict[str, Any]:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
class Bank(Base):
|
class Bank(Base):
|
||||||
__tablename__ = "banks"
|
__tablename__ = "banks"
|
||||||
|
|
||||||
@ -59,16 +64,16 @@ class Bank(Base):
|
|||||||
BIC: Mapped[str] = mapped_column(String(8), primary_key=True)
|
BIC: Mapped[str] = mapped_column(String(8), primary_key=True)
|
||||||
type: Mapped[accounttype] = mapped_column(primary_key=True)
|
type: Mapped[accounttype] = mapped_column(primary_key=True)
|
||||||
|
|
||||||
nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined")
|
nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False)
|
||||||
|
|
||||||
|
|
||||||
bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]
|
bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]
|
||||||
|
|
||||||
idpk = Annotated[int, mapped_column(BigInteger, primary_key=True, autoincrement=True)]
|
idpk = Annotated[int, mapped_column(BigInteger, primary_key=True, autoincrement=True)]
|
||||||
money = Annotated[Decimal, mapped_column(Numeric(16, 2))]
|
money = Annotated[decimal.Decimal, mapped_column(Numeric(16, 2))]
|
||||||
|
|
||||||
|
|
||||||
class Transaction(Base):
|
class Transaction(Base, Export):
|
||||||
__tablename__ = "originals"
|
__tablename__ = "originals"
|
||||||
|
|
||||||
id: Mapped[idpk] = mapped_column(init=False)
|
id: Mapped[idpk] = mapped_column(init=False)
|
||||||
@ -80,10 +85,22 @@ class Transaction(Base):
|
|||||||
|
|
||||||
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
|
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
|
||||||
note: Mapped[Optional[Note]] = relationship(init=False)
|
note: Mapped[Optional[Note]] = relationship(init=False)
|
||||||
tags: Mapped[Optional[set[TransactionTag]]] = relationship(init=False)
|
tags: Mapped[set[TransactionTag]] = relationship(init=False)
|
||||||
|
|
||||||
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
|
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format(self) -> dict[str, Any]:
|
||||||
|
return dict(
|
||||||
|
date=self.date,
|
||||||
|
description=self.description,
|
||||||
|
amount=self.amount,
|
||||||
|
type=self.type,
|
||||||
|
category=self.category.format if self.category else None,
|
||||||
|
# TODO note
|
||||||
|
tags=[tag.format for tag in self.tags] if self.tags else None,
|
||||||
|
)
|
||||||
|
|
||||||
def __lt__(self, other: Transaction):
|
def __lt__(self, other: Transaction):
|
||||||
return self.date < other.date
|
return self.date < other.date
|
||||||
|
|
||||||
@ -93,17 +110,20 @@ idfk = Annotated[
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class IsSplit:
|
class BankTransaction(Transaction):
|
||||||
split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
|
|
||||||
|
|
||||||
|
|
||||||
class BankTransaction(IsSplit, Transaction):
|
|
||||||
bank: Mapped[bankfk] = mapped_column(nullable=True)
|
bank: Mapped[bankfk] = mapped_column(nullable=True)
|
||||||
|
split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
|
||||||
|
|
||||||
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
|
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format(self) -> dict[str, Any]:
|
||||||
|
return super().format | dict(bank=self.bank)
|
||||||
|
|
||||||
|
|
||||||
|
class MoneyTransaction(Transaction):
|
||||||
|
split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
|
||||||
|
|
||||||
class MoneyTransaction(IsSplit, Transaction):
|
|
||||||
__mapper_args__ = {"polymorphic_identity": "money"}
|
__mapper_args__ = {"polymorphic_identity": "money"}
|
||||||
|
|
||||||
|
|
||||||
@ -112,6 +132,10 @@ class SplitTransaction(Transaction):
|
|||||||
|
|
||||||
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
|
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format(self) -> dict[str, Any]:
|
||||||
|
return super().format | dict(original=self.original)
|
||||||
|
|
||||||
|
|
||||||
class CategoryGroup(Base):
|
class CategoryGroup(Base):
|
||||||
__tablename__ = "categories_groups"
|
__tablename__ = "categories_groups"
|
||||||
@ -127,11 +151,11 @@ class Category(Base):
|
|||||||
ForeignKey(CategoryGroup.name), default=None
|
ForeignKey(CategoryGroup.name), default=None
|
||||||
)
|
)
|
||||||
|
|
||||||
rules: Mapped[Optional[set[CategoryRule]]] = relationship(
|
rules: Mapped[set[CategoryRule]] = relationship(
|
||||||
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
|
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
|
||||||
)
|
)
|
||||||
schedule: Mapped[CategorySchedule] = relationship(
|
schedule: Mapped[Optional[CategorySchedule]] = relationship(
|
||||||
back_populates="category", default=None
|
cascade="all, delete-orphan", passive_deletes=True, default=None
|
||||||
)
|
)
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
def __repr__(self) -> str:
|
||||||
@ -144,16 +168,19 @@ catfk = Annotated[
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class TransactionCategory(Base):
|
class TransactionCategory(Base, Export):
|
||||||
__tablename__ = "categorized"
|
__tablename__ = "categorized"
|
||||||
|
|
||||||
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
|
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
|
||||||
name: Mapped[catfk]
|
name: Mapped[catfk]
|
||||||
|
|
||||||
selector: Mapped[CategorySelector] = relationship(cascade="all, delete-orphan")
|
selector: Mapped[CategorySelector] = relationship(
|
||||||
|
cascade="all, delete-orphan", lazy="joined"
|
||||||
|
)
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
@property
|
||||||
return f"Category({self.name})"
|
def format(self):
|
||||||
|
return dict(name=self.name, selector=self.selector.format)
|
||||||
|
|
||||||
|
|
||||||
class Note(Base):
|
class Note(Base):
|
||||||
@ -177,17 +204,21 @@ class Tag(Base):
|
|||||||
|
|
||||||
name: Mapped[str] = mapped_column(primary_key=True)
|
name: Mapped[str] = mapped_column(primary_key=True)
|
||||||
|
|
||||||
rules: Mapped[Optional[set[TagRule]]] = relationship(
|
rules: Mapped[set[TagRule]] = relationship(
|
||||||
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
|
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
class TransactionTag(Base):
|
class TransactionTag(Base, Export):
|
||||||
__tablename__ = "tags"
|
__tablename__ = "tags"
|
||||||
|
|
||||||
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
|
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
|
||||||
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
|
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format(self):
|
||||||
|
return dict(tag=self.tag)
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
return hash(self.id)
|
return hash(self.id)
|
||||||
|
|
||||||
@ -207,7 +238,7 @@ categoryselector = Annotated[
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
class CategorySelector(Base):
|
class CategorySelector(Base, Export):
|
||||||
__tablename__ = "categories_selector"
|
__tablename__ = "categories_selector"
|
||||||
|
|
||||||
id: Mapped[int] = mapped_column(
|
id: Mapped[int] = mapped_column(
|
||||||
@ -218,6 +249,10 @@ class CategorySelector(Base):
|
|||||||
)
|
)
|
||||||
selector: Mapped[categoryselector]
|
selector: Mapped[categoryselector]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format(self):
|
||||||
|
return dict(selector=self.selector)
|
||||||
|
|
||||||
|
|
||||||
class Period(enum.Enum):
|
class Period(enum.Enum):
|
||||||
daily = "daily"
|
daily = "daily"
|
||||||
@ -237,8 +272,6 @@ class CategorySchedule(Base):
|
|||||||
period_multiplier: Mapped[Optional[int]]
|
period_multiplier: Mapped[Optional[int]]
|
||||||
amount: Mapped[Optional[int]]
|
amount: Mapped[Optional[int]]
|
||||||
|
|
||||||
category: Mapped[Category] = relationship(back_populates="schedule")
|
|
||||||
|
|
||||||
|
|
||||||
class Link(Base):
|
class Link(Base):
|
||||||
__tablename__ = "links"
|
__tablename__ = "links"
|
||||||
@ -247,7 +280,10 @@ class Link(Base):
|
|||||||
link: Mapped[idfk] = mapped_column(primary_key=True)
|
link: Mapped[idfk] = mapped_column(primary_key=True)
|
||||||
|
|
||||||
|
|
||||||
class Rule:
|
class Rule(Base, Export):
|
||||||
|
__tablename__ = "rules"
|
||||||
|
|
||||||
|
id: Mapped[idpk] = mapped_column(init=False)
|
||||||
date: Mapped[Optional[dt.date]]
|
date: Mapped[Optional[dt.date]]
|
||||||
description: Mapped[Optional[str]]
|
description: Mapped[Optional[str]]
|
||||||
regex: Mapped[Optional[str]]
|
regex: Mapped[Optional[str]]
|
||||||
@ -255,7 +291,14 @@ class Rule:
|
|||||||
min: Mapped[Optional[money]]
|
min: Mapped[Optional[money]]
|
||||||
max: Mapped[Optional[money]]
|
max: Mapped[Optional[money]]
|
||||||
|
|
||||||
def matches(self, transaction: Transaction) -> bool:
|
type: Mapped[str] = mapped_column(init=False)
|
||||||
|
|
||||||
|
__mapper_args__ = {
|
||||||
|
"polymorphic_identity": "rule",
|
||||||
|
"polymorphic_on": "type",
|
||||||
|
}
|
||||||
|
|
||||||
|
def matches(self, transaction: BankTransaction) -> bool:
|
||||||
if (
|
if (
|
||||||
(self.date and self.date < transaction.date)
|
(self.date and self.date < transaction.date)
|
||||||
or (
|
or (
|
||||||
@ -277,22 +320,60 @@ class Rule:
|
|||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format(self) -> dict[str, Any]:
|
||||||
|
return dict(
|
||||||
|
date=self.date,
|
||||||
|
description=self.description,
|
||||||
|
regex=self.regex,
|
||||||
|
bank=self.bank,
|
||||||
|
min=self.min,
|
||||||
|
max=self.max,
|
||||||
|
type=self.type,
|
||||||
|
)
|
||||||
|
|
||||||
class CategoryRule(Base, Rule):
|
|
||||||
|
class CategoryRule(Rule):
|
||||||
__tablename__ = "categories_rules"
|
__tablename__ = "categories_rules"
|
||||||
|
|
||||||
id: Mapped[idpk] = mapped_column(init=False)
|
id: Mapped[int] = mapped_column(
|
||||||
|
BigInteger,
|
||||||
|
ForeignKey(Rule.id, ondelete="CASCADE"),
|
||||||
|
primary_key=True,
|
||||||
|
init=False,
|
||||||
|
)
|
||||||
name: Mapped[catfk]
|
name: Mapped[catfk]
|
||||||
|
|
||||||
|
__mapper_args__ = {
|
||||||
|
"polymorphic_identity": "category_rule",
|
||||||
|
}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format(self) -> dict[str, Any]:
|
||||||
|
return super().format | dict(name=self.name)
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
return hash(self.id)
|
return hash(self.id)
|
||||||
|
|
||||||
|
|
||||||
class TagRule(Base, Rule):
|
class TagRule(Rule):
|
||||||
__tablename__ = "tag_rules"
|
__tablename__ = "tag_rules"
|
||||||
|
|
||||||
id: Mapped[idpk] = mapped_column(init=False)
|
id: Mapped[int] = mapped_column(
|
||||||
|
BigInteger,
|
||||||
|
ForeignKey(Rule.id, ondelete="CASCADE"),
|
||||||
|
primary_key=True,
|
||||||
|
init=False,
|
||||||
|
)
|
||||||
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))
|
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))
|
||||||
|
|
||||||
|
__mapper_args__ = {
|
||||||
|
"polymorphic_identity": "tag_rule",
|
||||||
|
}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def format(self) -> dict[str, Any]:
|
||||||
|
return super().format | dict(tag=self.tag)
|
||||||
|
|
||||||
def __hash__(self):
|
def __hash__(self):
|
||||||
return hash(self.id)
|
return hash(self.id)
|
||||||
|
|||||||
@ -1,18 +1,18 @@
|
|||||||
from datetime import date
|
import datetime as dt
|
||||||
from time import sleep
|
import dotenv
|
||||||
from requests import HTTPError, ReadTimeout
|
|
||||||
from dotenv import load_dotenv
|
|
||||||
from nordigen import NordigenClient
|
|
||||||
from uuid import uuid4
|
|
||||||
import json
|
import json
|
||||||
|
import nordigen
|
||||||
import os
|
import os
|
||||||
|
import requests
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
from pfbudget.db.model import BankTransaction
|
import pfbudget.db.model as t
|
||||||
from pfbudget.utils import convert
|
import pfbudget.utils as utils
|
||||||
|
|
||||||
from .input import Input
|
from .input import Input
|
||||||
|
|
||||||
load_dotenv()
|
dotenv.load_dotenv()
|
||||||
|
|
||||||
|
|
||||||
class NordigenInput(Input):
|
class NordigenInput(Input):
|
||||||
@ -20,16 +20,22 @@ class NordigenInput(Input):
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super().__init__()
|
super().__init__()
|
||||||
self._client = NordigenClient(
|
|
||||||
secret_key=os.environ.get("SECRET_KEY"),
|
if not (key := os.environ.get("SECRET_KEY")) or not (
|
||||||
secret_id=os.environ.get("SECRET_ID"),
|
id := os.environ.get("SECRET_ID")
|
||||||
|
):
|
||||||
|
raise
|
||||||
|
|
||||||
|
self._client = nordigen.NordigenClient(
|
||||||
|
secret_key=key,
|
||||||
|
secret_id=id,
|
||||||
)
|
)
|
||||||
|
|
||||||
self._client.token = self.__token()
|
self._client.token = self.__token()
|
||||||
self._start = date.min
|
self._start = dt.date.min
|
||||||
self._end = date.max
|
self._end = dt.date.max
|
||||||
|
|
||||||
def parse(self) -> list[BankTransaction]:
|
def parse(self) -> list[t.BankTransaction]:
|
||||||
transactions = []
|
transactions = []
|
||||||
assert len(self._banks) > 0
|
assert len(self._banks) > 0
|
||||||
|
|
||||||
@ -49,14 +55,14 @@ class NordigenInput(Input):
|
|||||||
try:
|
try:
|
||||||
downloaded = account.get_transactions()
|
downloaded = account.get_transactions()
|
||||||
break
|
break
|
||||||
except ReadTimeout:
|
except requests.ReadTimeout:
|
||||||
retries += 1
|
retries += 1
|
||||||
print(f"Request #{retries} timed-out, retrying in 1s")
|
print(f"Request #{retries} timed-out, retrying in 1s")
|
||||||
sleep(1)
|
time.sleep(1)
|
||||||
except HTTPError as e:
|
except requests.HTTPError as e:
|
||||||
retries += 1
|
retries += 1
|
||||||
print(f"Request #{retries} failed with {e}, retrying in 1s")
|
print(f"Request #{retries} failed with {e}, retrying in 1s")
|
||||||
sleep(1)
|
time.sleep(1)
|
||||||
|
|
||||||
if not downloaded:
|
if not downloaded:
|
||||||
print(f"Couldn't download transactions for {account}")
|
print(f"Couldn't download transactions for {account}")
|
||||||
@ -66,7 +72,7 @@ class NordigenInput(Input):
|
|||||||
json.dump(downloaded, f)
|
json.dump(downloaded, f)
|
||||||
|
|
||||||
converted = [
|
converted = [
|
||||||
convert(t, bank) for t in downloaded["transactions"]["booked"]
|
utils.convert(t, bank) for t in downloaded["transactions"]["booked"]
|
||||||
]
|
]
|
||||||
|
|
||||||
transactions.extend(
|
transactions.extend(
|
||||||
@ -82,11 +88,12 @@ class NordigenInput(Input):
|
|||||||
|
|
||||||
def requisition(self, institution: str, country: str = "PT"):
|
def requisition(self, institution: str, country: str = "PT"):
|
||||||
id = self._client.institution.get_institution_id_by_name(country, institution)
|
id = self._client.institution.get_institution_id_by_name(country, institution)
|
||||||
return self._client.initialize_session(
|
requisition = self._client.initialize_session(
|
||||||
redirect_uri=self.redirect_url,
|
redirect_uri=self.redirect_url,
|
||||||
institution_id=id,
|
institution_id=id,
|
||||||
reference_id=str(uuid4()),
|
reference_id=str(uuid.uuid4()),
|
||||||
)
|
)
|
||||||
|
return requisition.link, requisition.requisition_id
|
||||||
|
|
||||||
def country_banks(self, country: str):
|
def country_banks(self, country: str):
|
||||||
return self._client.institution.get_institutions(country)
|
return self._client.institution.get_institutions(country)
|
||||||
@ -125,4 +132,4 @@ class NordigenInput(Input):
|
|||||||
else:
|
else:
|
||||||
token = self._client.generate_token()
|
token = self._client.generate_token()
|
||||||
print(f"New access token: {token}")
|
print(f"New access token: {token}")
|
||||||
return token
|
return token["access"]
|
||||||
|
|||||||
@ -1,6 +1,7 @@
|
|||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
from decimal import Decimal
|
from decimal import Decimal
|
||||||
from importlib import import_module
|
from importlib import import_module
|
||||||
|
from pathlib import Path
|
||||||
import datetime as dt
|
import datetime as dt
|
||||||
import yaml
|
import yaml
|
||||||
|
|
||||||
@ -44,7 +45,7 @@ Options = namedtuple(
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def parse_data(filename: str, args: dict) -> list[Transaction]:
|
def parse_data(filename: Path, args: dict) -> list[Transaction]:
|
||||||
cfg: dict = yaml.safe_load(open("parsers.yaml"))
|
cfg: dict = yaml.safe_load(open("parsers.yaml"))
|
||||||
assert (
|
assert (
|
||||||
"Banks" in cfg
|
"Banks" in cfg
|
||||||
@ -85,7 +86,7 @@ def parse_data(filename: str, args: dict) -> list[Transaction]:
|
|||||||
|
|
||||||
|
|
||||||
class Parser:
|
class Parser:
|
||||||
def __init__(self, filename: str, bank: str, options: dict):
|
def __init__(self, filename: Path, bank: str, options: dict):
|
||||||
self.filename = filename
|
self.filename = filename
|
||||||
self.bank = bank
|
self.bank = bank
|
||||||
|
|
||||||
|
|||||||
@ -1 +0,0 @@
|
|||||||
__all__ = ["csv", "output"]
|
|
||||||
@ -1,35 +0,0 @@
|
|||||||
from csv import DictReader, writer
|
|
||||||
|
|
||||||
from pfbudget.db.model import (
|
|
||||||
BankTransaction,
|
|
||||||
MoneyTransaction,
|
|
||||||
Transaction,
|
|
||||||
)
|
|
||||||
|
|
||||||
from .output import Output
|
|
||||||
|
|
||||||
|
|
||||||
class CSV(Output):
|
|
||||||
def __init__(self, filename: str):
|
|
||||||
self.fn = filename
|
|
||||||
|
|
||||||
def load(self) -> list[Transaction]:
|
|
||||||
with open(self.fn, "r", newline="") as f:
|
|
||||||
r = DictReader(f)
|
|
||||||
return [
|
|
||||||
BankTransaction(
|
|
||||||
row["date"], row["description"], row["amount"], False, row["bank"]
|
|
||||||
)
|
|
||||||
if row["bank"]
|
|
||||||
else MoneyTransaction(
|
|
||||||
row["date"], row["description"], False, row["amount"]
|
|
||||||
)
|
|
||||||
for row in r
|
|
||||||
]
|
|
||||||
|
|
||||||
def report(self, transactions: list[Transaction]):
|
|
||||||
with open(self.fn, "w", newline="") as f:
|
|
||||||
w = writer(f, delimiter="\t")
|
|
||||||
w.writerows(
|
|
||||||
[(t.date, t.description, t.amount, t.bank) for t in transactions]
|
|
||||||
)
|
|
||||||
@ -1,9 +0,0 @@
|
|||||||
from abc import ABC, abstractmethod
|
|
||||||
|
|
||||||
from pfbudget.db.model import Transaction
|
|
||||||
|
|
||||||
|
|
||||||
class Output(ABC):
|
|
||||||
@abstractmethod
|
|
||||||
def report(self, transactions: list[Transaction]):
|
|
||||||
raise NotImplementedError
|
|
||||||
@ -1,23 +1,25 @@
|
|||||||
from datetime import date
|
import datetime as dt
|
||||||
from functools import singledispatch
|
import functools
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
from pfbudget.common.types import TransactionError
|
from pfbudget.common.types import TransactionError
|
||||||
from pfbudget.db.model import Bank, BankTransaction
|
import pfbudget.db.model as t
|
||||||
|
|
||||||
from .utils import parse_decimal
|
from .utils import parse_decimal
|
||||||
|
|
||||||
|
|
||||||
@singledispatch
|
@functools.singledispatch
|
||||||
def convert(t):
|
def convert(t) -> Any:
|
||||||
print("No converter as been found")
|
print("No converter has been found")
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@convert.register
|
@convert.register
|
||||||
def _(json: dict, bank: Bank) -> BankTransaction:
|
def _(json: dict, bank: t.Bank) -> t.BankTransaction | None:
|
||||||
i = -1 if bank.nordigen.invert else 1
|
i = -1 if bank.nordigen and bank.nordigen.invert else 1
|
||||||
try:
|
try:
|
||||||
transaction = BankTransaction(
|
transaction = t.BankTransaction(
|
||||||
date=date.fromisoformat(json["bookingDate"]),
|
date=dt.date.fromisoformat(json["bookingDate"]),
|
||||||
description=json["remittanceInformationUnstructured"],
|
description=json["remittanceInformationUnstructured"],
|
||||||
bank=bank.name,
|
bank=bank.name,
|
||||||
amount=i * parse_decimal(json["transactionAmount"]["amount"]),
|
amount=i * parse_decimal(json["transactionAmount"]["amount"]),
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user