Compare commits

...

3 Commits

Author SHA1 Message Date
fd6793b4f4
Turned on type checking
and, as a result, had to fix a LOT of minor potential future issues
(the class of issue is sketched after the changed-files summary below).
It also reorders imports and clears out unused ones.

When exporting transactions, they are now sorted by date.
2023-01-22 20:44:05 +00:00
d4b5f1f11a
Rule inheritance
Both rule types, categories and tags, now derive from the rule base type.
This cleans up some of the type definitions (the inheritance pattern is
sketched after the changed-files summary below).
2023-01-22 20:22:46 +00:00
6110858d48
Extend export/import to rules
Removes the additional bank/all options from the transactions export
command line.
Deletes the short-lived CSV class.
This patch starts using pickle for simple export/import; other options can
be added later. One issue with .csv is the lack of a null field (see the
pickle sketch after the changed-files summary below).
Moves the logic to Manager, as it is simple enough.
2023-01-15 23:06:20 +00:00
15 changed files with 503 additions and 280 deletions
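
The type-checking commit does not name the tool or the individual fixes, but the converter change at the bottom of this comparison (guarding bank.nordigen.invert with "bank.nordigen and ...") shows the class of issue a checker surfaces. A minimal sketch, with simplified stand-ins for the real model classes:

from typing import Optional

class Nordigen:
    invert: bool = False

class Bank:
    # nordigen may legitimately be absent, so the attribute is Optional
    nordigen: Optional[Nordigen] = None

def sign(bank: Bank) -> int:
    # return -1 if bank.nordigen.invert else 1   # checker: nordigen may be None
    return -1 if bank.nordigen and bank.nordigen.invert else 1  # narrowed, safe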
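
For the rule-inheritance commit, the pattern (visible in full in the model diff below) is SQLAlchemy joined-table inheritance: the shared columns move to a rules table with a type discriminator, and each subtype keeps only its own columns plus a primary key that doubles as a foreign key to the base row. A condensed sketch of that pattern:

from sqlalchemy import BigInteger, ForeignKey
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

class Base(DeclarativeBase):
    pass

class Rule(Base):
    __tablename__ = "rules"
    id: Mapped[int] = mapped_column(BigInteger, primary_key=True, autoincrement=True)
    type: Mapped[str]  # discriminator column
    __mapper_args__ = {"polymorphic_identity": "rule", "polymorphic_on": "type"}

class TagRule(Rule):
    __tablename__ = "tag_rules"
    # the primary key is also a foreign key to the base rules row
    id: Mapped[int] = mapped_column(ForeignKey(Rule.id, ondelete="CASCADE"), primary_key=True)
    tag: Mapped[str]
    __mapper_args__ = {"polymorphic_identity": "tag_rule"}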
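
For the export/import commit, the pickle round-trip that replaces the CSV class is small enough to sketch in isolation (it mirrors the dump/load helpers added to Manager below). The null-field remark is the motivation: pickle preserves None, while csv round-trips it as an empty string:

import pickle

def dump(fn: str, rows: list[dict]) -> None:
    with open(fn, "wb") as f:
        pickle.dump(rows, f)

def load(fn: str) -> list[dict]:
    with open(fn, "rb") as f:
        return pickle.load(f)

rows = [{"description": None, "amount": "12.30"}]  # None survives the round-trip
dump("transactions.pickle", rows)
assert load("transactions.pickle") == rows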

View File

@@ -0,0 +1,152 @@
"""Rule inheritance

Revision ID: 6b293f78cc97
Revises: 37d80de801a7
Create Date: 2023-01-22 20:05:32.887092+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "6b293f78cc97"
down_revision = "37d80de801a7"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "rules",
        sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
        sa.Column("date", sa.Date(), nullable=True),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("regex", sa.String(), nullable=True),
        sa.Column("bank", sa.String(), nullable=True),
        sa.Column("min", sa.Numeric(precision=16, scale=2), nullable=True),
        sa.Column("max", sa.Numeric(precision=16, scale=2), nullable=True),
        sa.Column("type", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
        schema="transactions",
    )
    op.create_foreign_key(
        op.f("fk_categories_rules_id_rules"),
        "categories_rules",
        "rules",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    op.drop_column("categories_rules", "bank", schema="transactions")
    op.drop_column("categories_rules", "min", schema="transactions")
    op.drop_column("categories_rules", "date", schema="transactions")
    op.drop_column("categories_rules", "regex", schema="transactions")
    op.drop_column("categories_rules", "description", schema="transactions")
    op.drop_column("categories_rules", "max", schema="transactions")
    op.create_foreign_key(
        op.f("fk_tag_rules_id_rules"),
        "tag_rules",
        "rules",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    op.drop_column("tag_rules", "bank", schema="transactions")
    op.drop_column("tag_rules", "min", schema="transactions")
    op.drop_column("tag_rules", "date", schema="transactions")
    op.drop_column("tag_rules", "regex", schema="transactions")
    op.drop_column("tag_rules", "description", schema="transactions")
    op.drop_column("tag_rules", "max", schema="transactions")
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "tag_rules",
        sa.Column(
            "max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
        ),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column(
            "min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
        ),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_tag_rules_id_rules"),
        "tag_rules",
        schema="transactions",
        type_="foreignkey",
    )
    op.add_column(
        "categories_rules",
        sa.Column(
            "max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
        ),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column(
            "min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
        ),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_categories_rules_id_rules"),
        "categories_rules",
        schema="transactions",
        type_="foreignkey",
    )
    op.drop_table("rules", schema="transactions")
    # ### end Alembic commands ###
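
Assuming a standard Alembic setup (an alembic.ini pointing at this versions directory — that file is not part of this diff), the revision above can be applied or reverted from Python as well as from the CLI:

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # the config path is an assumption
command.upgrade(cfg, "6b293f78cc97")  # or "head"
# command.downgrade(cfg, "37d80de801a7")  # back to the previous revision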

View File

@@ -9,4 +9,4 @@ from pfbudget.cli.runnable import argparser
 from pfbudget.input.parsers import parse_data
 from pfbudget.utils.utils import parse_args_period
-import pfbudget.db.model as types
+import pfbudget.db.model as t

View File

@@ -14,7 +14,7 @@ if __name__ == "__main__":
     assert "verbose" in args, "No verbose level specified"
     verbosity = args.pop("verbose")

-    params = None
+    params = []

     match (op):
         case pfbudget.Operation.Parse:
             keys = {"path", "bank", "creditcard"}
@@ -45,7 +45,7 @@ if __name__ == "__main__":
             assert args.keys() >= keys, f"missing {args.keys() - keys}"
             params = [
-                pfbudget.types.Bank(
+                pfbudget.t.Bank(
                     args["bank"][0],
                     args["bic"][0],
                     args["type"][0],
@@ -73,7 +73,7 @@ if __name__ == "__main__":
             assert args.keys() >= keys, f"missing {args.keys() - keys}"
             params = [
-                pfbudget.types.Nordigen(
+                pfbudget.t.Nordigen(
                     args["bank"][0],
                     args["bank_id"][0] if args["bank_id"] else None,
                     args["requisition_id"][0] if args["requisition_id"] else None,
@@ -110,27 +110,27 @@ if __name__ == "__main__":
             assert args.keys() >= keys, f"missing {args.keys() - keys}"
             params = [
-                pfbudget.types.Category(cat, args["group"]) for cat in args["category"]
+                pfbudget.t.Category(cat, args["group"]) for cat in args["category"]
             ]
         case pfbudget.Operation.CategoryUpdate:
             keys = {"category", "group"}
             assert args.keys() >= keys, f"missing {args.keys() - keys}"
-            params = [pfbudget.types.Category(cat) for cat in args["category"]]
+            params = [pfbudget.t.Category(cat) for cat in args["category"]]
             params.append(args["group"])
         case pfbudget.Operation.CategoryRemove:
             assert "category" in args, "argparser ill defined"
-            params = [pfbudget.types.Category(cat) for cat in args["category"]]
+            params = [pfbudget.t.Category(cat) for cat in args["category"]]
         case pfbudget.Operation.CategorySchedule:
             keys = {"category", "period", "frequency"}
             assert args.keys() >= keys, f"missing {args.keys() - keys}"
             params = [
-                pfbudget.types.CategorySchedule(
-                    cat, True, args["period"][0], args["frequency"][0]
+                pfbudget.t.CategorySchedule(
+                    cat, args["period"][0], args["frequency"][0], None
                 )
                 for cat in args["category"]
             ]
@@ -140,7 +140,7 @@ if __name__ == "__main__":
             assert args.keys() >= keys, f"missing {args.keys() - keys}"
             params = [
-                pfbudget.types.CategoryRule(
+                pfbudget.t.CategoryRule(
                     args["date"][0] if args["date"] else None,
                     args["description"][0] if args["description"] else None,
                     args["regex"][0] if args["regex"] else None,
@@ -184,14 +184,14 @@ if __name__ == "__main__":
             keys = {"tag"}
             assert args.keys() >= keys, f"missing {args.keys() - keys}"
-            params = [pfbudget.types.Tag(tag) for tag in args["tag"]]
+            params = [pfbudget.t.Tag(tag) for tag in args["tag"]]
         case pfbudget.Operation.TagRuleAdd:
             keys = {"tag", "date", "description", "bank", "min", "max"}
             assert args.keys() >= keys, f"missing {args.keys() - keys}"
             params = [
-                pfbudget.types.TagRule(
+                pfbudget.t.TagRule(
                     args["date"][0] if args["date"] else None,
                     args["description"][0] if args["description"] else None,
                     args["regex"][0] if args["regex"] else None,
@@ -218,31 +218,21 @@ if __name__ == "__main__":
         case pfbudget.Operation.GroupAdd:
             assert "group" in args, "argparser ill defined"
-            params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]
+            params = [pfbudget.t.CategoryGroup(group) for group in args["group"]]
         case pfbudget.Operation.GroupRemove:
             assert "group" in args, "argparser ill defined"
-            params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]
+            params = [pfbudget.t.CategoryGroup(group) for group in args["group"]]
         case pfbudget.Operation.Forge | pfbudget.Operation.Dismantle:
             keys = {"original", "links"}
             assert args.keys() >= keys, f"missing {args.keys() - keys}"
             params = [
-                pfbudget.types.Link(args["original"][0], link) for link in args["links"]
+                pfbudget.t.Link(args["original"][0], link) for link in args["links"]
             ]
-        case pfbudget.Operation.Export:
-            keys = {"interval", "start", "end", "year", "all", "banks", "file"}
-            assert args.keys() >= keys, f"missing {args.keys() - keys}"
-            start, end = pfbudget.parse_args_period(args)
-            params = [start, end]
-            if not args["all"]:
-                params.append(args["banks"])
-            params.append(args["file"][0])
-        case pfbudget.Operation.Import:
+        case pfbudget.Operation.Export | pfbudget.Operation.Import | pfbudget.Operation.ExportCategoryRules | pfbudget.Operation.ImportCategoryRules | pfbudget.Operation.ExportTagRules | pfbudget.Operation.ImportTagRules:
             keys = {"file"}
             assert args.keys() >= keys, f"missing {args.keys() - keys}"

View File

@@ -1,12 +1,13 @@
-from dotenv import load_dotenv
 import argparse
 import datetime as dt
 import decimal
+from dotenv import load_dotenv
 import os
 import re

 from pfbudget.common.types import Operation
+from pfbudget.db.model import AccountType, Period
 from pfbudget.db.sqlite import DatabaseClient
 import pfbudget.reporting.graph
 import pfbudget.reporting.report
@@ -38,50 +39,43 @@ def argparser() -> argparse.ArgumentParser:
         help="select current database",
         default=DEFAULT_DB,
     )
     universal.add_argument("-v", "--verbose", action="count", default=0)

-    period = argparse.ArgumentParser(add_help=False).add_mutually_exclusive_group()
-    period.add_argument(
+    period = argparse.ArgumentParser(add_help=False)
+    period_group = period.add_mutually_exclusive_group()
+    period_group.add_argument(
         "--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
     )
-    period.add_argument("--start", type=str, nargs=1, help="graph start date")
-    period.add_argument("--end", type=str, nargs=1, help="graph end date")
-    period.add_argument("--year", type=str, nargs=1, help="graph year")
+    period_group.add_argument("--start", type=str, nargs=1, help="graph start date")
+    period_group.add_argument("--end", type=str, nargs=1, help="graph end date")
+    period_group.add_argument("--year", type=str, nargs=1, help="graph year")

     parser = argparse.ArgumentParser(
         description="does cool finance stuff",
         parents=[universal],
         formatter_class=argparse.ArgumentDefaultsHelpFormatter,
     )
-    parser.add_argument(
-        "--version",
-        action="version",
-        version=re.search(
-            r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
-            open("pfbudget/__init__.py").read(),
-        ).group(1),
-    )
+
+    if version := re.search(
+        r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]', open("pfbudget/__init__.py").read()
+    ):
+        parser.add_argument(
+            "--version",
+            action="version",
+            version=version.group(1),
+        )

     subparsers = parser.add_subparsers(required=True)

-    """
-    Init
-    """
-    p_init = subparsers.add_parser(
-        "init",
-        description="Initializes the SQLite3 database",
-        parents=[universal],
-        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
-    )
-    p_init.set_defaults(command=Operation.Init)
+    # TODO Init
+    # init = subparsers.add_parser("init")
+    # init.set_defaults(op=Operation.Init)

     # Exports transactions to .csv file
-    export = subparsers.add_parser("export", parents=[period])
+    export = subparsers.add_parser("export")
     export.set_defaults(op=Operation.Export)
-    export.add_argument("file", nargs=1, type=str)
-    export_banks = export.add_mutually_exclusive_group()
-    export_banks.add_argument("--all", action="store_true")
-    export_banks.add_argument("--banks", nargs="+", type=str)
+    export_args(export)

     pimport = subparsers.add_parser("import")
     pimport.set_defaults(op=Operation.Import)
@@ -209,11 +203,6 @@ def report(args):
     pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)


-# def nordigen_banks(manager: Manager, args):
-#     input = NordigenInput(manager)
-#     input.list(vars(args)["country"][0])


 def bank(parser: argparse.ArgumentParser):
     commands = parser.add_subparsers(required=True)
@@ -321,6 +310,14 @@ def category_rule(parser: argparse.ArgumentParser):
     rules(modify)
     modify.add_argument("--remove", nargs="*", default=[], type=str)

+    export = commands.add_parser("export")
+    export.set_defaults(op=Operation.ExportCategoryRules)
+    export_args(export)
+
+    pimport = commands.add_parser("import")
+    pimport.set_defaults(op=Operation.ImportCategoryRules)
+    export_args(pimport)


 def tags(parser: argparse.ArgumentParser):
     commands = parser.add_subparsers(required=True)
@@ -355,6 +352,14 @@ def tag_rule(parser: argparse.ArgumentParser):
     modify.add_argument("--tag", nargs=1, type=str)
     rules(modify)

+    export = commands.add_parser("export")
+    export.set_defaults(op=Operation.ExportTagRules)
+    export_args(export)
+
+    pimport = commands.add_parser("import")
+    pimport.set_defaults(op=Operation.ImportTagRules)
+    export_args(pimport)


 def rules(parser: argparse.ArgumentParser):
     parser.add_argument("--date", nargs=1, type=dt.date.fromisoformat)
@@ -377,3 +382,7 @@ def link(parser: argparse.ArgumentParser):
     dismantle.set_defaults(op=Operation.Dismantle)
     dismantle.add_argument("original", nargs=1, type=int)
     dismantle.add_argument("links", nargs="+", type=int)
+
+
+def export_args(parser: argparse.ArgumentParser):
+    parser.add_argument("file", nargs=1, type=str)

View File

@@ -37,6 +37,10 @@ class Operation(Enum):
     NordigenCountryBanks = auto()
     Export = auto()
     Import = auto()
+    ExportCategoryRules = auto()
+    ImportCategoryRules = auto()
+    ExportTagRules = auto()
+    ImportTagRules = auto()


 class TransactionError(Exception):

View File

@@ -1,17 +1,8 @@
-from pfbudget.db.model import (
-    Category,
-    CategorySelector,
-    Selector,
-    Tag,
-    Transaction,
-    TransactionCategory,
-    TransactionTag,
-)
 from codetiming import Timer
 from datetime import timedelta
+from typing import Sequence

-Transactions = list[Transaction]
+import pfbudget.db.model as t


 class Categorizer:
@@ -22,9 +13,9 @@ class Categorizer:
     def rules(
         self,
-        transactions: Transactions,
-        categories: list[Category],
-        tags: list[Tag],
+        transactions: Sequence[t.BankTransaction],
+        categories: Sequence[t.Category],
+        tags: Sequence[t.Tag],
     ):
         """Overarching categorization tool
@@ -32,9 +23,9 @@ class Categorizer:
         to the rules defined for each category

         Args:
-            transactions (list[Transaction]): uncategorized transactions
-            categories (list[Category]): available categories
-            tags (list[Tag]): currently available tags
+            transactions (Sequence[BankTransaction]): uncategorized transactions
+            categories (Sequence[Category]): available categories
+            tags (Sequence[Tag]): currently available tags
         """
         self._nullify(transactions)
@@ -44,21 +35,21 @@ class Categorizer:
     def manual(
         self,
-        transactions: Transactions,
-        categories: list[Category],
-        tags: list[Tag],
+        transactions: Sequence[t.Transaction],
+        categories: Sequence[t.Category],
+        tags: Sequence[t.Tag],
     ):
         """Manual categorization input

         Args:
-            transactions (list[Transaction]): uncategorized transactions
-            categories (list[Category]): available categories
-            tags (list[Tag]): currently available tags
+            transactions (Sequence[Transaction]): uncategorized transactions
+            categories (Sequence[Category]): available categories
+            tags (Sequence[Tag]): currently available tags
         """
         self._manual(transactions)

     @Timer(name="nullify")
-    def _nullify(self, transactions: Transactions):
+    def _nullify(self, transactions: Sequence[t.BankTransaction]):
         count = 0
         matching = []
         for transaction in transactions:
@@ -76,11 +67,13 @@ class Categorizer:
                     and cancel.amount == -transaction.amount
                 )
             ):
-                transaction.category = TransactionCategory(
-                    name="null", selector=CategorySelector(Selector.nullifier)
+                transaction.category = t.TransactionCategory(
+                    name="null",
+                    selector=t.CategorySelector(t.Selector.nullifier),
                 )
-                cancel.category = TransactionCategory(
-                    name="null", selector=CategorySelector(Selector.nullifier)
+                cancel.category = t.TransactionCategory(
+                    name="null",
+                    selector=t.CategorySelector(t.Selector.nullifier),
                 )
                 matching.extend([transaction, cancel])
                 count += 2
@@ -90,7 +83,9 @@ class Categorizer:
     @Timer(name="categoryrules")
     def _rule_based_categories(
-        self, transactions: Transactions, categories: list[Category]
+        self,
+        transactions: Sequence[t.BankTransaction],
+        categories: Sequence[t.Category],
     ):
         d = {}
         for category in [c for c in categories if c.rules]:
@@ -114,10 +109,10 @@ class Categorizer:
                     == "y"
                 ):
                     transaction.category.name = category.name
-                    transaction.category.selector.selector = Selector.rules
+                    transaction.category.selector.selector = t.Selector.rules
                 else:
-                    transaction.category = TransactionCategory(
-                        category.name, CategorySelector(Selector.rules)
+                    transaction.category = t.TransactionCategory(
+                        category.name, t.CategorySelector(t.Selector.rules)
                     )

                 if rule in d:
@@ -129,9 +124,11 @@ class Categorizer:
             print(f"{v}: {k}")

     @Timer(name="tagrules")
-    def _rule_based_tags(self, transactions: Transactions, tags: list[Tag]):
+    def _rule_based_tags(
+        self, transactions: Sequence[t.BankTransaction], tags: Sequence[t.Tag]
+    ):
         d = {}
-        for tag in [t for t in tags if t.rules]:
+        for tag in [t for t in tags if len(t.rules) > 0]:
             for rule in tag.rules:
                 # for transaction in [t for t in transactions if not t.category]:
                 for transaction in [
@@ -143,9 +140,9 @@ class Categorizer:
                         continue

                     if not transaction.tags:
-                        transaction.tags = {TransactionTag(tag.name)}
+                        transaction.tags = {t.TransactionTag(tag.name)}
                     else:
-                        transaction.tags.add(TransactionTag(tag.name))
+                        transaction.tags.add(t.TransactionTag(tag.name))

                     if rule in d:
                         d[rule] += 1
@@ -155,7 +152,7 @@ class Categorizer:
         for k, v in d.items():
             print(f"{v}: {k}")

-    def _manual(self, transactions: Transactions):
+    def _manual(self, transactions: Sequence[t.Transaction]):
         uncategorized = [t for t in transactions if not t.category]
         print(f"{len(uncategorized)} transactions left to categorize")
@@ -167,8 +164,8 @@ class Categorizer:
                 if not category:
                     print("{category} doesn't exist")
                     continue
-                transaction.category = TransactionCategory(
-                    category, CategorySelector(Selector.manual)
+                transaction.category = t.TransactionCategory(
+                    category, t.CategorySelector(t.Selector.manual)
                 )
                 break

View File

@@ -1,4 +1,5 @@
 from pathlib import Path
+import pickle
 import webbrowser

 from pfbudget.common.types import Operation
@@ -6,19 +7,22 @@ from pfbudget.core.categorizer import Categorizer
 from pfbudget.db.client import DbClient
 from pfbudget.db.model import (
     Bank,
+    BankTransaction,
     Category,
     CategoryGroup,
     CategoryRule,
+    CategorySelector,
+    Link,
+    MoneyTransaction,
     Nordigen,
     Rule,
     Tag,
     TagRule,
     Transaction,
+    TransactionCategory,
 )
 from pfbudget.input.nordigen import NordigenInput
 from pfbudget.input.parsers import parse_data
-from pfbudget.output.csv import CSV
-from pfbudget.output.output import Output


 class Manager:
@@ -38,18 +42,21 @@ class Manager:
                 # Adapter for the parse_data method. Can be refactored.
                 args = {"bank": params[1], "creditcard": params[2], "category": None}
                 transactions = []
-                for path in params[0]:
-                    if (dir := Path(path)).is_dir():
-                        for file in dir.iterdir():
+                for path in [Path(p) for p in params[0]]:
+                    if path.is_dir():
+                        for file in path.iterdir():
                             transactions.extend(self.parse(file, args))
-                    elif Path(path).is_file():
+                    elif path.is_file():
                         transactions.extend(self.parse(path, args))
                     else:
                         raise FileNotFoundError(path)

-                print(transactions)
-                if len(transactions) > 0 and input("Commit? (y/n)") == "y":
-                    self.add_transactions(sorted(transactions))
+                if (
+                    len(transactions) > 0
+                    and input(f"{transactions[:5]}\nCommit? (y/n)") == "y"
+                ):
+                    with self.db.session() as session:
+                        session.add(sorted(transactions))

             case Operation.Download:
                 client = NordigenInput()
@@ -65,14 +72,15 @@ class Manager:
                 # dry-run
                 if not params[2]:
-                    self.add_transactions(transactions)
+                    with self.db.session() as session:
+                        session.add(sorted(transactions))
                 else:
                     print(transactions)

             case Operation.Categorize:
                 with self.db.session() as session:
                     uncategorized = session.get(
-                        Transaction, ~Transaction.category.has()
+                        BankTransaction, ~BankTransaction.category.has()
                     )
                     categories = session.get(Category)
                     tags = session.get(Tag)
@@ -152,7 +160,7 @@ class Manager:
             case Operation.GroupAdd:
                 with self.db.session() as session:
-                    session.add(CategoryGroup(params))
+                    session.add(params)

             case Operation.GroupRemove:
                 assert all(isinstance(param, CategoryGroup) for param in params)
@@ -164,6 +172,8 @@ class Manager:
                     session.add(params)

             case Operation.Dismantle:
+                assert all(isinstance(param, Link) for param in params)
+
                 with self.db.session() as session:
                     original = params[0].original
                     links = [link.link for link in params]
@@ -171,20 +181,37 @@ class Manager:
             case Operation.Export:
                 with self.db.session() as session:
-                    if len(params) < 4:
-                        banks = [bank.name for bank in session.get(Bank)]
-                        transactions = session.transactions(params[0], params[1], banks)
-                    else:
-                        transactions = session.transactions(
-                            params[0], params[1], params[2]
-                        )
-
-                csvwriter: Output = CSV(params[-1])
-                csvwriter.report(transactions)
+                    self.dump(params[0], sorted(session.get(Transaction)))

             case Operation.Import:
-                csvwriter: Output = CSV(params[0])  # Output is strange here
-                transactions = csvwriter.load()
+                transactions = []
+                for row in self.load(params[0]):
+                    match row["type"]:
+                        case "bank":
+                            transaction = BankTransaction(
+                                row["date"],
+                                row["description"],
+                                row["amount"],
+                                row["bank"],
+                                False,
+                            )
+                        case "money":
+                            transaction = MoneyTransaction(
+                                row["date"], row["description"], row["amount"], False
+                            )
+                        # TODO case "split" how to match to original transaction?? also save ids?
+                        case _:
+                            continue
+
+                    if category := row.pop("category", None):
+                        transaction.category = TransactionCategory(
+                            category["name"],
+                            CategorySelector(category["selector"]["selector"]),
+                        )
+
+                    transactions.append(transaction)

                 if (
                     len(transactions) > 0
@@ -196,33 +223,46 @@ class Manager:
                     with self.db.session() as session:
                         session.add(transactions)

-    # def init(self):
-    #     client = DatabaseClient(self.__db)
-    #     client.init()
-
-    # def register(self):
-    #     bank = Bank(self.args["bank"][0], "", self.args["requisition"][0], self.args["invert"])
-    #     client = DatabaseClient(self.__db)
-    #     client.register_bank(convert(bank))
-
-    # def unregister(self):
-    #     client = DatabaseClient(self.__db)
-    #     client.unregister_bank(self.args["bank"][0])
+            case Operation.ExportCategoryRules:
+                with self.db.session() as session:
+                    self.dump(params[0], session.get(CategoryRule))
+
+            case Operation.ImportCategoryRules:
+                rules = [CategoryRule(**row) for row in self.load(params[0])]
+
+                if (
+                    len(rules) > 0
+                    and input(f"{rules[:5]}\nDoes the import seem correct? (y/n)")
+                    == "y"
+                ):
+                    with self.db.session() as session:
+                        session.add(rules)
+
+            case Operation.ExportTagRules:
+                with self.db.session() as session:
+                    self.dump(params[0], session.get(TagRule))
+
+            case Operation.ImportTagRules:
+                rules = [TagRule(**row) for row in self.load(params[0])]
+
+                if (
+                    len(rules) > 0
+                    and input(f"{rules[:5]}\nDoes the import seem correct? (y/n)")
+                    == "y"
+                ):
+                    with self.db.session() as session:
+                        session.add(rules)

-    def parse(self, filename: str, args: dict):
+    def parse(self, filename: Path, args: dict):
         return parse_data(filename, args)

-    # def transactions() -> list[Transaction]:
-    #     pass
+    def dump(self, fn, sequence):
+        with open(fn, "wb") as f:
+            pickle.dump([e.format for e in sequence], f)

-    def add_transactions(self, transactions):
-        with self.db.session() as session:
-            session.add(transactions)
+    def load(self, fn):
+        with open(fn, "rb") as f:
+            return pickle.load(f)

-    # def get_bank_by(self, key: str, value: str) -> Bank:
-    #     client = DatabaseClient(self.__db)
-    #     bank = client.get_bank(key, value)
-    #     return convert(bank)

     @property
     def db(self) -> DbClient:

View File

@@ -1,25 +1,16 @@
 from dataclasses import asdict
-from datetime import date
 from sqlalchemy import create_engine, delete, select, update
 from sqlalchemy.dialects.postgresql import insert
 from sqlalchemy.orm import Session
+from typing import Sequence, Type, TypeVar

 from pfbudget.db.model import (
     Category,
     CategoryGroup,
-    CategoryRule,
     CategorySchedule,
     Link,
-    Tag,
-    TagRule,
-    Transaction,
 )

-# import logging
-# logging.basicConfig()
-# logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)


 class DbClient:
     """
@@ -53,7 +44,9 @@ class DbClient:
         def expunge_all(self):
            self.__session.expunge_all()

-        def get(self, type, column=None, values=None):
+        T = TypeVar("T")
+
+        def get(self, type: Type[T], column=None, values=None) -> Sequence[T]:
             if column is not None:
                 if values:
                     stmt = select(type).where(column.in_(values))
@@ -67,7 +60,7 @@ class DbClient:
         def add(self, rows: list):
             self.__session.add_all(rows)

-        def remove_by_name(self, type: Category | Tag | Transaction, rows: list):
+        def remove_by_name(self, type, rows: list):
             stmt = delete(type).where(type.name.in_([row.name for row in rows]))
             self.__session.execute(stmt)
@@ -91,7 +84,7 @@ class DbClient:
             )
             self.__session.execute(stmt)

-        def remove_by_id(self, type: CategoryRule | TagRule, ids: list[int]):
+        def remove_by_id(self, type, ids: list[int]):
             stmt = delete(type).where(type.id.in_(ids))
             self.__session.execute(stmt)
@@ -99,19 +92,11 @@ class DbClient:
             print(type, values)
             self.__session.execute(update(type), values)

-        def remove_links(self, original, links: list):
+        def remove_links(self, original: int, links: list[int]):
             stmt = delete(Link).where(
                 Link.original == original, Link.link.in_(link for link in links)
             )
             self.__session.execute(stmt)

-        def transactions(self, min: date, max: date, banks: list[str]):
-            stmt = select(Transaction).where(
-                Transaction.date >= min,
-                Transaction.date <= max,
-                Transaction.bank.in_(banks),
-            )
-            return self.__session.scalars(stmt).all()

     def session(self) -> ClientSession:
         return self.ClientSession(self.engine)

View File

@@ -1,4 +1,9 @@
 from __future__ import annotations
+import datetime as dt
+import decimal
+import enum
+import re
+from typing import Annotated, Any, Optional

 from sqlalchemy import (
     BigInteger,
@@ -17,12 +22,6 @@ from sqlalchemy.orm import (
     relationship,
 )

-from decimal import Decimal
-from typing import Annotated, Optional
-import datetime as dt
-import enum
-import re


 class Base(MappedAsDataclass, DeclarativeBase):
     __table_args__ = {"schema": "transactions"}
@@ -52,6 +51,12 @@ accounttype = Annotated[
 ]


+class Export:
+    @property
+    def format(self) -> dict[str, Any]:
+        raise NotImplementedError


 class Bank(Base):
     __tablename__ = "banks"
@@ -59,16 +64,16 @@ class Bank(Base):
     BIC: Mapped[str] = mapped_column(String(8), primary_key=True)
     type: Mapped[accounttype] = mapped_column(primary_key=True)

-    nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined")
+    nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False)


 bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]

 idpk = Annotated[int, mapped_column(BigInteger, primary_key=True, autoincrement=True)]
-money = Annotated[Decimal, mapped_column(Numeric(16, 2))]
+money = Annotated[decimal.Decimal, mapped_column(Numeric(16, 2))]


-class Transaction(Base):
+class Transaction(Base, Export):
     __tablename__ = "originals"

     id: Mapped[idpk] = mapped_column(init=False)
@@ -80,10 +85,22 @@ class Transaction(Base):
     category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
     note: Mapped[Optional[Note]] = relationship(init=False)
-    tags: Mapped[Optional[set[TransactionTag]]] = relationship(init=False)
+    tags: Mapped[set[TransactionTag]] = relationship(init=False)

     __mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}

+    @property
+    def format(self) -> dict[str, Any]:
+        return dict(
+            date=self.date,
+            description=self.description,
+            amount=self.amount,
+            type=self.type,
+            category=self.category.format if self.category else None,
+            # TODO note
+            tags=[tag.format for tag in self.tags] if self.tags else None,
+        )

     def __lt__(self, other: Transaction):
         return self.date < other.date
@@ -93,17 +110,20 @@ idfk = Annotated[
 ]


-class IsSplit:
-    split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
-
-
-class BankTransaction(IsSplit, Transaction):
+class BankTransaction(Transaction):
     bank: Mapped[bankfk] = mapped_column(nullable=True)
+    split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)

     __mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}

+    @property
+    def format(self) -> dict[str, Any]:
+        return super().format | dict(bank=self.bank)

-class MoneyTransaction(IsSplit, Transaction):
+
+class MoneyTransaction(Transaction):
+    split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
+
     __mapper_args__ = {"polymorphic_identity": "money"}
@@ -112,6 +132,10 @@ class SplitTransaction(Transaction):
     __mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}

+    @property
+    def format(self) -> dict[str, Any]:
+        return super().format | dict(original=self.original)


 class CategoryGroup(Base):
     __tablename__ = "categories_groups"
@@ -127,11 +151,11 @@ class Category(Base):
         ForeignKey(CategoryGroup.name), default=None
     )

-    rules: Mapped[Optional[set[CategoryRule]]] = relationship(
+    rules: Mapped[set[CategoryRule]] = relationship(
         cascade="all, delete-orphan", passive_deletes=True, default_factory=set
     )

-    schedule: Mapped[CategorySchedule] = relationship(
-        back_populates="category", default=None
+    schedule: Mapped[Optional[CategorySchedule]] = relationship(
+        cascade="all, delete-orphan", passive_deletes=True, default=None
     )

     def __repr__(self) -> str:
@@ -144,16 +168,19 @@ catfk = Annotated[
 ]


-class TransactionCategory(Base):
+class TransactionCategory(Base, Export):
     __tablename__ = "categorized"

     id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
     name: Mapped[catfk]

-    selector: Mapped[CategorySelector] = relationship(cascade="all, delete-orphan")
+    selector: Mapped[CategorySelector] = relationship(
+        cascade="all, delete-orphan", lazy="joined"
+    )

-    def __repr__(self) -> str:
-        return f"Category({self.name})"
+    @property
+    def format(self):
+        return dict(name=self.name, selector=self.selector.format)


 class Note(Base):
@@ -177,17 +204,21 @@ class Tag(Base):
     name: Mapped[str] = mapped_column(primary_key=True)

-    rules: Mapped[Optional[set[TagRule]]] = relationship(
+    rules: Mapped[set[TagRule]] = relationship(
         cascade="all, delete-orphan", passive_deletes=True, default_factory=set
     )


-class TransactionTag(Base):
+class TransactionTag(Base, Export):
     __tablename__ = "tags"

     id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
     tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)

+    @property
+    def format(self):
+        return dict(tag=self.tag)

     def __hash__(self):
         return hash(self.id)
@@ -207,7 +238,7 @@ categoryselector = Annotated[
 ]


-class CategorySelector(Base):
+class CategorySelector(Base, Export):
     __tablename__ = "categories_selector"

     id: Mapped[int] = mapped_column(
@@ -218,6 +249,10 @@ class CategorySelector(Base):
     )
     selector: Mapped[categoryselector]

+    @property
+    def format(self):
+        return dict(selector=self.selector)


 class Period(enum.Enum):
     daily = "daily"
@@ -237,8 +272,6 @@ class CategorySchedule(Base):
     period_multiplier: Mapped[Optional[int]]
     amount: Mapped[Optional[int]]

-    category: Mapped[Category] = relationship(back_populates="schedule")


 class Link(Base):
     __tablename__ = "links"
@@ -247,7 +280,10 @@ class Link(Base):
     link: Mapped[idfk] = mapped_column(primary_key=True)


-class Rule:
+class Rule(Base, Export):
+    __tablename__ = "rules"
+
+    id: Mapped[idpk] = mapped_column(init=False)
     date: Mapped[Optional[dt.date]]
     description: Mapped[Optional[str]]
     regex: Mapped[Optional[str]]
@@ -255,7 +291,14 @@ class Rule:
     min: Mapped[Optional[money]]
     max: Mapped[Optional[money]]

-    def matches(self, transaction: Transaction) -> bool:
+    type: Mapped[str] = mapped_column(init=False)
+
+    __mapper_args__ = {
+        "polymorphic_identity": "rule",
+        "polymorphic_on": "type",
+    }
+
+    def matches(self, transaction: BankTransaction) -> bool:
         if (
             (self.date and self.date < transaction.date)
             or (
@@ -277,22 +320,60 @@ class Rule:
             return False
         return True

+    @property
+    def format(self) -> dict[str, Any]:
+        return dict(
+            date=self.date,
+            description=self.description,
+            regex=self.regex,
+            bank=self.bank,
+            min=self.min,
+            max=self.max,
+            type=self.type,
+        )

-class CategoryRule(Base, Rule):
+
+class CategoryRule(Rule):
     __tablename__ = "categories_rules"

-    id: Mapped[idpk] = mapped_column(init=False)
+    id: Mapped[int] = mapped_column(
+        BigInteger,
+        ForeignKey(Rule.id, ondelete="CASCADE"),
+        primary_key=True,
+        init=False,
+    )
     name: Mapped[catfk]

+    __mapper_args__ = {
+        "polymorphic_identity": "category_rule",
+    }
+
+    @property
+    def format(self) -> dict[str, Any]:
+        return super().format | dict(name=self.name)

     def __hash__(self):
         return hash(self.id)


-class TagRule(Base, Rule):
+class TagRule(Rule):
     __tablename__ = "tag_rules"

-    id: Mapped[idpk] = mapped_column(init=False)
+    id: Mapped[int] = mapped_column(
+        BigInteger,
+        ForeignKey(Rule.id, ondelete="CASCADE"),
+        primary_key=True,
+        init=False,
+    )
     tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))

+    __mapper_args__ = {
+        "polymorphic_identity": "tag_rule",
+    }
+
+    @property
+    def format(self) -> dict[str, Any]:
+        return super().format | dict(tag=self.tag)

     def __hash__(self):
         return hash(self.id)

View File

@@ -1,18 +1,18 @@
-from datetime import date
-from time import sleep
-from requests import HTTPError, ReadTimeout
-from dotenv import load_dotenv
-from nordigen import NordigenClient
-from uuid import uuid4
+import datetime as dt
+import dotenv
 import json
+import nordigen
 import os
+import requests
+import time
+import uuid

-from pfbudget.db.model import BankTransaction
-from pfbudget.utils import convert
+import pfbudget.db.model as t
+import pfbudget.utils as utils
 from .input import Input

-load_dotenv()
+dotenv.load_dotenv()


 class NordigenInput(Input):
@@ -20,16 +20,22 @@ class NordigenInput(Input):
     def __init__(self):
         super().__init__()
-        self._client = NordigenClient(
-            secret_key=os.environ.get("SECRET_KEY"),
-            secret_id=os.environ.get("SECRET_ID"),
+
+        if not (key := os.environ.get("SECRET_KEY")) or not (
+            id := os.environ.get("SECRET_ID")
+        ):
+            raise
+
+        self._client = nordigen.NordigenClient(
+            secret_key=key,
+            secret_id=id,
         )
         self._client.token = self.__token()

-        self._start = date.min
-        self._end = date.max
+        self._start = dt.date.min
+        self._end = dt.date.max

-    def parse(self) -> list[BankTransaction]:
+    def parse(self) -> list[t.BankTransaction]:
         transactions = []
         assert len(self._banks) > 0
@@ -49,14 +55,14 @@ class NordigenInput(Input):
                 try:
                     downloaded = account.get_transactions()
                     break
-                except ReadTimeout:
+                except requests.ReadTimeout:
                     retries += 1
                     print(f"Request #{retries} timed-out, retrying in 1s")
-                    sleep(1)
-                except HTTPError as e:
+                    time.sleep(1)
+                except requests.HTTPError as e:
                     retries += 1
                     print(f"Request #{retries} failed with {e}, retrying in 1s")
-                    sleep(1)
+                    time.sleep(1)

             if not downloaded:
                 print(f"Couldn't download transactions for {account}")
@@ -66,7 +72,7 @@ class NordigenInput(Input):
                 json.dump(downloaded, f)

             converted = [
-                convert(t, bank) for t in downloaded["transactions"]["booked"]
+                utils.convert(t, bank) for t in downloaded["transactions"]["booked"]
             ]

             transactions.extend(
@@ -82,11 +88,12 @@ class NordigenInput(Input):
     def requisition(self, institution: str, country: str = "PT"):
         id = self._client.institution.get_institution_id_by_name(country, institution)

-        return self._client.initialize_session(
+        requisition = self._client.initialize_session(
             redirect_uri=self.redirect_url,
             institution_id=id,
-            reference_id=str(uuid4()),
+            reference_id=str(uuid.uuid4()),
         )
+        return requisition.link, requisition.requisition_id

     def country_banks(self, country: str):
         return self._client.institution.get_institutions(country)
@@ -125,4 +132,4 @@ class NordigenInput(Input):
         else:
             token = self._client.generate_token()
             print(f"New access token: {token}")
-        return token
+        return token["access"]

View File

@@ -1,6 +1,7 @@
 from collections import namedtuple
 from decimal import Decimal
 from importlib import import_module
+from pathlib import Path
 import datetime as dt
 import yaml
@@ -44,7 +45,7 @@ Options = namedtuple(
 )


-def parse_data(filename: str, args: dict) -> list[Transaction]:
+def parse_data(filename: Path, args: dict) -> list[Transaction]:
     cfg: dict = yaml.safe_load(open("parsers.yaml"))
     assert (
         "Banks" in cfg
@@ -85,7 +86,7 @@ def parse_data(filename: str, args: dict) -> list[Transaction]:
 class Parser:
-    def __init__(self, filename: str, bank: str, options: dict):
+    def __init__(self, filename: Path, bank: str, options: dict):
         self.filename = filename
         self.bank = bank

View File

@@ -1 +0,0 @@
__all__ = ["csv", "output"]

View File

@@ -1,35 +0,0 @@
from csv import DictReader, writer

from pfbudget.db.model import (
    BankTransaction,
    MoneyTransaction,
    Transaction,
)

from .output import Output


class CSV(Output):
    def __init__(self, filename: str):
        self.fn = filename

    def load(self) -> list[Transaction]:
        with open(self.fn, "r", newline="") as f:
            r = DictReader(f)
            return [
                BankTransaction(
                    row["date"], row["description"], row["amount"], False, row["bank"]
                )
                if row["bank"]
                else MoneyTransaction(
                    row["date"], row["description"], False, row["amount"]
                )
                for row in r
            ]

    def report(self, transactions: list[Transaction]):
        with open(self.fn, "w", newline="") as f:
            w = writer(f, delimiter="\t")
            w.writerows(
                [(t.date, t.description, t.amount, t.bank) for t in transactions]
            )

View File

@@ -1,9 +0,0 @@
from abc import ABC, abstractmethod

from pfbudget.db.model import Transaction


class Output(ABC):
    @abstractmethod
    def report(self, transactions: list[Transaction]):
        raise NotImplementedError

View File

@@ -1,23 +1,25 @@
-from datetime import date
-from functools import singledispatch
+import datetime as dt
+import functools
+from typing import Any

 from pfbudget.common.types import TransactionError
-from pfbudget.db.model import Bank, BankTransaction
+import pfbudget.db.model as t
 from .utils import parse_decimal


-@singledispatch
-def convert(t):
-    print("No converter as been found")
+@functools.singledispatch
+def convert(t) -> Any:
+    print("No converter has been found")
     pass


 @convert.register
-def _(json: dict, bank: Bank) -> BankTransaction:
-    i = -1 if bank.nordigen.invert else 1
+def _(json: dict, bank: t.Bank) -> t.BankTransaction | None:
+    i = -1 if bank.nordigen and bank.nordigen.invert else 1
     try:
-        transaction = BankTransaction(
-            date=date.fromisoformat(json["bookingDate"]),
+        transaction = t.BankTransaction(
+            date=dt.date.fromisoformat(json["bookingDate"]),
             description=json["remittanceInformationUnstructured"],
             bank=bank.name,
             amount=i * parse_decimal(json["transactionAmount"]["amount"]),