Compare commits

..

6 Commits

Author SHA1 Message Date
c42a399d3d
Adds the import operation and a timer
to the categorization. We can now import transactions from a csv file,
and later automatically categorize them all.
2023-01-10 23:45:09 +00:00
478bd25190
Subclass the Transaction with multiple children
Each children is essentually a type of transaction. We currently have:
- bank transactions
- money transactions
- split transactions

The table inheritance is implemented as a single table, with a
polymorphic type and Null columns.

Adds a IsSplit interface, which will later be used for the category
views, so as to not repeat transactions.
2023-01-10 23:42:37 +00:00
0d287624c4
Load the default DB from the .env file 2023-01-10 21:35:43 +00:00
c37e7eb37c
Readds manual categorization
Also fixes a categorization bug in the Manager, in the DB client method.
2023-01-10 21:32:08 +00:00
86afa99217
Finish the remaining Nordigen operations
from the Manager POV and the update on the argparses.
Also clears unnecessary methods from the DB client interface.
Better assert information on the __main__.py
2023-01-08 19:41:07 +00:00
9b45ee4817
Update the export operation
to work with the Manager.
Also removes the run method from the runnable.py, since everything is
done in the __main__.py file of the pfbudget module.
2023-01-08 19:41:07 +00:00
16 changed files with 422 additions and 334 deletions

View File

@ -0,0 +1,74 @@
"""Inheritance
Revision ID: 37d80de801a7
Revises: 8cc9870b0d74
Create Date: 2023-01-10 22:41:03.540108+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "37d80de801a7"
down_revision = "8cc9870b0d74"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"originals",
sa.Column("type", sa.String(), nullable=False),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("split", sa.Boolean(), nullable=True),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("original", sa.BigInteger(), nullable=True),
schema="transactions",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=True,
schema="transactions",
)
op.create_foreign_key(
op.f("fk_originals_original_originals"),
"originals",
"originals",
["original"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_originals_original_originals"),
"originals",
schema="transactions",
type_="foreignkey",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=False,
schema="transactions",
)
op.drop_column("originals", "original", schema="transactions")
op.drop_column("originals", "split", schema="transactions")
op.drop_column("originals", "type", schema="transactions")
# ### end Alembic commands ###

View File

@ -17,32 +17,32 @@ if __name__ == "__main__":
params = None params = None
match (op): match (op):
case pfbudget.Operation.Parse: case pfbudget.Operation.Parse:
assert args.keys() >= {"path", "bank", "creditcard"} keys = {"path", "bank", "creditcard"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["path"], args["bank"], args["creditcard"]] params = [args["path"], args["bank"], args["creditcard"]]
case pfbudget.Operation.RequisitionId: case pfbudget.Operation.RequisitionId:
assert args.keys() >= {"name", "country"}, "argparser ill defined" keys = {"name", "country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["name"][0], args["country"][0]] params = [args["name"][0], args["country"][0]]
case pfbudget.Operation.Download: case pfbudget.Operation.Download:
assert args.keys() >= { keys = {"all", "banks", "interval", "start", "end", "year", "dry_run"}
"id", assert args.keys() >= keys, f"missing {args.keys() - keys}"
"name",
"all",
"interval",
"start",
"end",
"year",
}, "argparser ill defined"
start, end = pfbudget.parse_args_period(args) start, end = pfbudget.parse_args_period(args)
params = [start, end] params = [start, end, args["dry_run"]]
if not args["all"]:
params.append(args["banks"])
else:
params.append([])
case pfbudget.Operation.BankAdd: case pfbudget.Operation.BankAdd:
assert args.keys() >= { keys = {"bank", "bic", "type"}
"bank", assert args.keys() >= keys, f"missing {args.keys() - keys}"
"bic",
"type",
}, "argparser ill defined"
params = [ params = [
pfbudget.types.Bank( pfbudget.types.Bank(
@ -53,12 +53,8 @@ if __name__ == "__main__":
] ]
case pfbudget.Operation.BankMod: case pfbudget.Operation.BankMod:
assert args.keys() >= { keys = {"bank", "bic", "type", "remove"}
"bank", assert args.keys() >= keys, f"missing {args.keys() - keys}"
"bic",
"type",
"remove",
}, "argparser ill defined"
nargs_1 = ["bic", "type"] nargs_1 = ["bic", "type"]
@ -73,12 +69,8 @@ if __name__ == "__main__":
params = args["bank"] params = args["bank"]
case pfbudget.Operation.NordigenAdd: case pfbudget.Operation.NordigenAdd:
assert args.keys() >= { keys = {"bank", "bank_id", "requisition_id", "invert"}
"bank", assert args.keys() >= keys, f"missing {args.keys() - keys}"
"bank_id",
"requisition_id",
"invert",
}, "argparser ill defined"
params = [ params = [
pfbudget.types.Nordigen( pfbudget.types.Nordigen(
@ -90,13 +82,8 @@ if __name__ == "__main__":
] ]
case pfbudget.Operation.NordigenMod: case pfbudget.Operation.NordigenMod:
assert args.keys() >= { keys = {"bank", "bank_id", "requisition_id", "invert", "remove"}
"bank", assert args.keys() >= keys, f"missing {args.keys() - keys}"
"bank_id",
"requisition_id",
"invert",
"remove",
}, "argparser ill defined"
nargs_1 = ["bank_id", "requisition_id"] nargs_1 = ["bank_id", "requisition_id"]
nargs_0 = ["invert"] nargs_0 = ["invert"]
@ -112,14 +99,24 @@ if __name__ == "__main__":
assert len(args["bank"]) > 0, "argparser ill defined" assert len(args["bank"]) > 0, "argparser ill defined"
params = args["bank"] params = args["bank"]
case pfbudget.Operation.NordigenCountryBanks:
keys = {"country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["country"][0]]
case pfbudget.Operation.CategoryAdd: case pfbudget.Operation.CategoryAdd:
assert args.keys() >= {"category", "group"}, "argparser ill defined" keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [ params = [
pfbudget.types.Category(cat, args["group"]) for cat in args["category"] pfbudget.types.Category(cat, args["group"]) for cat in args["category"]
] ]
case pfbudget.Operation.CategoryUpdate: case pfbudget.Operation.CategoryUpdate:
assert args.keys() >= {"category", "group"}, "argparser ill defined" keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [pfbudget.types.Category(cat) for cat in args["category"]] params = [pfbudget.types.Category(cat) for cat in args["category"]]
params.append(args["group"]) params.append(args["group"])
@ -128,11 +125,8 @@ if __name__ == "__main__":
params = [pfbudget.types.Category(cat) for cat in args["category"]] params = [pfbudget.types.Category(cat) for cat in args["category"]]
case pfbudget.Operation.CategorySchedule: case pfbudget.Operation.CategorySchedule:
assert args.keys() >= { keys = {"category", "period", "frequency"}
"category", assert args.keys() >= keys, f"missing {args.keys() - keys}"
"period",
"frequency",
}, "argparser ill defined"
params = [ params = [
pfbudget.types.CategorySchedule( pfbudget.types.CategorySchedule(
@ -142,14 +136,8 @@ if __name__ == "__main__":
] ]
case pfbudget.Operation.RuleAdd: case pfbudget.Operation.RuleAdd:
assert args.keys() >= { keys = {"category", "date", "description", "bank", "min", "max"}
"category", assert args.keys() >= keys, f"missing {args.keys() - keys}"
"date",
"description",
"bank",
"min",
"max",
}, "argparser ill defined"
params = [ params = [
pfbudget.types.CategoryRule( pfbudget.types.CategoryRule(
@ -165,11 +153,13 @@ if __name__ == "__main__":
] ]
case pfbudget.Operation.RuleRemove | pfbudget.Operation.TagRuleRemove: case pfbudget.Operation.RuleRemove | pfbudget.Operation.TagRuleRemove:
assert args.keys() >= {"id"}, "argparser ill defined" keys = {"id"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = args["id"] params = args["id"]
case pfbudget.Operation.RuleModify: case pfbudget.Operation.RuleModify:
assert args.keys() >= { keys = {
"id", "id",
"category", "category",
"date", "date",
@ -178,7 +168,8 @@ if __name__ == "__main__":
"min", "min",
"max", "max",
"remove", "remove",
}, "argparser ill defined" }
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["category", "date", "description", "regex", "bank", "min", "max"] nargs_1 = ["category", "date", "description", "regex", "bank", "min", "max"]
params = [] params = []
@ -190,18 +181,14 @@ if __name__ == "__main__":
params.append(param) params.append(param)
case pfbudget.Operation.TagAdd: case pfbudget.Operation.TagAdd:
assert args.keys() >= {"tag"}, "argparser ill defined" keys = {"tag"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [pfbudget.types.Tag(tag) for tag in args["tag"]] params = [pfbudget.types.Tag(tag) for tag in args["tag"]]
case pfbudget.Operation.TagRuleAdd: case pfbudget.Operation.TagRuleAdd:
assert args.keys() >= { keys = {"tag", "date", "description", "bank", "min", "max"}
"tag", assert args.keys() >= keys, f"missing {args.keys() - keys}"
"date",
"description",
"bank",
"min",
"max",
}, "argparser ill defined"
params = [ params = [
pfbudget.types.TagRule( pfbudget.types.TagRule(
@ -217,16 +204,8 @@ if __name__ == "__main__":
] ]
case pfbudget.Operation.TagRuleModify: case pfbudget.Operation.TagRuleModify:
assert args.keys() >= { keys = {"id", "tag", "date", "description", "bank", "min", "max", "remove"}
"id", assert args.keys() >= keys, f"missing {args.keys() - keys}"
"tag",
"date",
"description",
"bank",
"min",
"max",
"remove",
}, "argparser ill defined"
nargs_1 = ["tag", "date", "description", "regex", "bank", "min", "max"] nargs_1 = ["tag", "date", "description", "regex", "bank", "min", "max"]
params = [] params = []
@ -246,9 +225,27 @@ if __name__ == "__main__":
params = [pfbudget.types.CategoryGroup(group) for group in args["group"]] params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]
case pfbudget.Operation.Forge | pfbudget.Operation.Dismantle: case pfbudget.Operation.Forge | pfbudget.Operation.Dismantle:
assert args.keys() >= {"original", "links"}, "argparser ill defined" keys = {"original", "links"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [ params = [
pfbudget.types.Link(args["original"][0], link) for link in args["links"] pfbudget.types.Link(args["original"][0], link) for link in args["links"]
] ]
pfbudget.Manager(db, verbosity, args).action(op, params) case pfbudget.Operation.Export:
keys = {"interval", "start", "end", "year", "all", "banks", "file"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
start, end = pfbudget.parse_args_period(args)
params = [start, end]
if not args["all"]:
params.append(args["banks"])
params.append(args["file"][0])
case pfbudget.Operation.Import:
keys = {"file"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = args["file"]
pfbudget.Manager(db, verbosity).action(op, params)

View File

@ -1,19 +1,20 @@
from pathlib import Path from dotenv import load_dotenv
import argparse import argparse
import datetime as dt import datetime as dt
import decimal import decimal
import os
import re import re
from pfbudget.common.types import Operation from pfbudget.common.types import Operation
from pfbudget.db.model import AccountType, Period from pfbudget.db.model import AccountType, Period
from pfbudget.input.nordigen import NordigenInput
from pfbudget.db.sqlite import DatabaseClient from pfbudget.db.sqlite import DatabaseClient
import pfbudget.reporting.graph import pfbudget.reporting.graph
import pfbudget.reporting.report import pfbudget.reporting.report
import pfbudget.utils import pfbudget.utils
load_dotenv()
DEFAULT_DB = "data.db" DEFAULT_DB = os.environ.get("DEFAULT_DB")
class PfBudgetInitialized(Exception): class PfBudgetInitialized(Exception):
@ -29,7 +30,6 @@ class DataFileMissing(Exception):
def argparser() -> argparse.ArgumentParser: def argparser() -> argparse.ArgumentParser:
universal = argparse.ArgumentParser(add_help=False) universal = argparse.ArgumentParser(add_help=False)
universal.add_argument( universal.add_argument(
"-db", "-db",
@ -75,16 +75,17 @@ def argparser() -> argparse.ArgumentParser:
) )
p_init.set_defaults(command=Operation.Init) p_init.set_defaults(command=Operation.Init)
""" # Exports transactions to .csv file
Exporting export = subparsers.add_parser("export", parents=[period])
""" export.set_defaults(op=Operation.Export)
p_export = subparsers.add_parser( export.add_argument("file", nargs=1, type=str)
"export", export_banks = export.add_mutually_exclusive_group()
description="Exports the selected database to a .csv file", export_banks.add_argument("--all", action="store_true")
parents=[universal], export_banks.add_argument("--banks", nargs="+", type=str)
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
) pimport = subparsers.add_parser("import")
p_export.set_defaults(func=lambda args: DatabaseClient(args.database).export()) pimport.set_defaults(op=Operation.Import)
pimport.add_argument("file", nargs=1, type=str)
# Parse from .csv # Parse from .csv
parse = subparsers.add_parser("parse") parse = subparsers.add_parser("parse")
@ -93,15 +94,10 @@ def argparser() -> argparse.ArgumentParser:
parse.add_argument("--bank", nargs=1, type=str) parse.add_argument("--bank", nargs=1, type=str)
parse.add_argument("--creditcard", nargs=1, type=str) parse.add_argument("--creditcard", nargs=1, type=str)
""" # Automatic/manual categorization
Categorizing categorize = subparsers.add_parser("categorize").add_subparsers(required=True)
""" categorize.add_parser("auto").set_defaults(op=Operation.Categorize)
categorize = subparsers.add_parser( categorize.add_parser("manual").set_defaults(op=Operation.ManualCategorization)
"categorize",
description="Categorizes the transactions in the selected database",
parents=[universal],
)
categorize.set_defaults(op=Operation.Categorize)
""" """
Graph Graph
@ -157,37 +153,15 @@ def argparser() -> argparse.ArgumentParser:
# Download through the Nordigen API # Download through the Nordigen API
download = subparsers.add_parser("download", parents=[period]) download = subparsers.add_parser("download", parents=[period])
download.set_defaults(op=Operation.Download) download.set_defaults(op=Operation.Download)
download.add_argument("--id", nargs="+", type=str) download_banks = download.add_mutually_exclusive_group()
download.add_argument("--name", nargs="+", type=str) download_banks.add_argument("--all", action="store_true")
download.add_argument("--all", action="store_true") download_banks.add_argument("--banks", nargs="+", type=str)
download.add_argument("--dry-run", action="store_true")
# """ # List available banks in country C
# List available banks on Nordigen API banks = subparsers.add_parser("banks")
# """ banks.set_defaults(op=Operation.NordigenCountryBanks)
# p_nordigen_list = subparsers.add_parser( banks.add_argument("country", nargs=1, type=str)
# "list",
# description="Lists banks in {country}",
# parents=[help],
# formatter_class=argparse.ArgumentDefaultsHelpFormatter,
# )
# p_nordigen_list.add_argument("country", nargs=1, type=str)
# p_nordigen_list.set_defaults(func=lambda args: nordigen_banks(manager, args))
# """
# Nordigen JSONs
# """
# p_nordigen_json = subparsers.add_parser(
# "json",
# description="",
# parents=[help],
# formatter_class=argparse.ArgumentDefaultsHelpFormatter,
# )
# p_nordigen_json.add_argument("json", nargs=1, type=str)
# p_nordigen_json.add_argument("bank", nargs=1, type=str)
# p_nordigen_json.add_argument("--invert", action=argparse.BooleanOptionalAction)
# p_nordigen_json.set_defaults(
# func=lambda args: manager.parser(JsonParser(vars(args)))
# )
# Categories # Categories
category(subparsers.add_parser("category")) category(subparsers.add_parser("category"))
@ -403,9 +377,3 @@ def link(parser: argparse.ArgumentParser):
dismantle.set_defaults(op=Operation.Dismantle) dismantle.set_defaults(op=Operation.Dismantle)
dismantle.add_argument("original", nargs=1, type=int) dismantle.add_argument("original", nargs=1, type=int)
dismantle.add_argument("links", nargs="+", type=int) dismantle.add_argument("links", nargs="+", type=int)
def run():
args = vars(argparser().parse_args())
assert "op" in args, "No operation selected"
return args["op"], args

View File

@ -9,6 +9,7 @@ class Operation(Enum):
Parse = auto() Parse = auto()
Download = auto() Download = auto()
Categorize = auto() Categorize = auto()
ManualCategorization = auto()
Token = auto() Token = auto()
RequisitionId = auto() RequisitionId = auto()
CategoryAdd = auto() CategoryAdd = auto()
@ -33,6 +34,9 @@ class Operation(Enum):
NordigenAdd = auto() NordigenAdd = auto()
NordigenMod = auto() NordigenMod = auto()
NordigenDel = auto() NordigenDel = auto()
NordigenCountryBanks = auto()
Export = auto()
Import = auto()
class TransactionError(Exception): class TransactionError(Exception):

View File

@ -8,8 +8,11 @@ from pfbudget.db.model import (
TransactionTag, TransactionTag,
) )
from codetiming import Timer
from datetime import timedelta from datetime import timedelta
Transactions = list[Transaction]
class Categorizer: class Categorizer:
options = {} options = {}
@ -17,25 +20,45 @@ class Categorizer:
def __init__(self): def __init__(self):
self.options["null_days"] = 4 self.options["null_days"] = 4
def categorize( def rules(
self, self,
transactions: list[Transaction], transactions: Transactions,
categories: list[Category], categories: list[Category],
tags: list[Tag], tags: list[Tag],
): ):
"""Overarching categorization tool """Overarching categorization tool
Receives a list of transactions (by ref) and updates their category Receives a list of transactions (by ref) and updates their category according
to the rules defined for each category
Args: Args:
transactions (list[Transaction]): uncategorized transactions transactions (list[Transaction]): uncategorized transactions
categories (list[Category]): available categories
tags (list[Tag]): currently available tags
""" """
self._nullify(transactions) self._nullify(transactions)
self._rule_based_categories(transactions, categories) self._rule_based_categories(transactions, categories)
self._rule_based_tags(transactions, tags) self._rule_based_tags(transactions, tags)
def _nullify(self, transactions: list[Transaction]): def manual(
self,
transactions: Transactions,
categories: list[Category],
tags: list[Tag],
):
"""Manual categorization input
Args:
transactions (list[Transaction]): uncategorized transactions
categories (list[Category]): available categories
tags (list[Tag]): currently available tags
"""
self._manual(transactions)
@Timer(name="nullify")
def _nullify(self, transactions: Transactions):
count = 0 count = 0
matching = [] matching = []
for transaction in transactions: for transaction in transactions:
@ -65,8 +88,9 @@ class Categorizer:
print(f"Nullified {count} transactions") print(f"Nullified {count} transactions")
@Timer(name="categoryrules")
def _rule_based_categories( def _rule_based_categories(
self, transactions: list[Transaction], categories: list[Category] self, transactions: Transactions, categories: list[Category]
): ):
d = {} d = {}
for category in [c for c in categories if c.rules]: for category in [c for c in categories if c.rules]:
@ -81,9 +105,20 @@ class Categorizer:
continue continue
# passed all conditions, assign category # passed all conditions, assign category
transaction.category = TransactionCategory( if (
category.name, CategorySelector(Selector.rules) transaction.category
) and transaction.category.name == category.name
):
if (
input(f"Overwrite {transaction} with {category}? (y/n)")
== "y"
):
transaction.category.name = category.name
transaction.category.selector.selector = Selector.rules
else:
transaction.category = TransactionCategory(
category.name, CategorySelector(Selector.rules)
)
if rule in d: if rule in d:
d[rule] += 1 d[rule] += 1
@ -93,7 +128,8 @@ class Categorizer:
for k, v in d.items(): for k, v in d.items():
print(f"{v}: {k}") print(f"{v}: {k}")
def _rule_based_tags(self, transactions: list[Transaction], tags: list[Tag]): @Timer(name="tagrules")
def _rule_based_tags(self, transactions: Transactions, tags: list[Tag]):
d = {} d = {}
for tag in [t for t in tags if t.rules]: for tag in [t for t in tags if t.rules]:
for rule in tag.rules: for rule in tag.rules:
@ -119,3 +155,20 @@ class Categorizer:
for k, v in d.items(): for k, v in d.items():
print(f"{v}: {k}") print(f"{v}: {k}")
def _manual(self, transactions: Transactions):
uncategorized = [t for t in transactions if not t.category]
print(f"{len(uncategorized)} transactions left to categorize")
for transaction in uncategorized:
while True:
category = input(f"{transaction} category: ")
if category == "quit":
return
if not category:
print("{category} doesn't exist")
continue
transaction.category = TransactionCategory(
category, CategorySelector(Selector.manual)
)
break

View File

@ -1,29 +1,28 @@
from pathlib import Path from pathlib import Path
import webbrowser
from pfbudget.input.input import Input from pfbudget.common.types import Operation
from pfbudget.input.nordigen import NordigenInput from pfbudget.core.categorizer import Categorizer
from pfbudget.input.parsers import parse_data
from pfbudget.db.client import DbClient from pfbudget.db.client import DbClient
from pfbudget.db.model import ( from pfbudget.db.model import (
Bank, Bank,
Category, Category,
CategoryGroup, CategoryGroup,
CategoryRule, CategoryRule,
CategorySchedule,
Nordigen, Nordigen,
Rule, Rule,
Tag, Tag,
TagRule, TagRule,
Transaction,
) )
from pfbudget.common.types import Operation from pfbudget.input.nordigen import NordigenInput
from pfbudget.core.categorizer import Categorizer from pfbudget.input.parsers import parse_data
from pfbudget.utils import convert from pfbudget.output.csv import CSV
from pfbudget.output.output import Output
class Manager: class Manager:
def __init__(self, db: str, verbosity: int = 0, args: dict = {}): def __init__(self, db: str, verbosity: int = 0):
self._args = args
self._db = db self._db = db
self._verbosity = verbosity self._verbosity = verbosity
@ -54,18 +53,39 @@ class Manager:
case Operation.Download: case Operation.Download:
client = NordigenInput() client = NordigenInput()
client.banks = self.get_banks() with self.db.session() as session:
if len(params[3]) == 0:
client.banks = session.get(Bank, Bank.nordigen)
else:
client.banks = session.get(Bank, Bank.name, params[3])
session.expunge_all()
client.start = params[0] client.start = params[0]
client.end = params[1] client.end = params[1]
transactions = client.parse() transactions = client.parse()
self.add_transactions(transactions)
# dry-run
if not params[2]:
self.add_transactions(transactions)
else:
print(transactions)
case Operation.Categorize: case Operation.Categorize:
with self.db.session() as session: with self.db.session() as session:
uncategorized = session.uncategorized() uncategorized = session.get(
categories = session.categories() Transaction, ~Transaction.category.has()
tags = session.tags() )
Categorizer().categorize(uncategorized, categories, tags) categories = session.get(Category)
tags = session.get(Tag)
Categorizer().rules(uncategorized, categories, tags)
case Operation.ManualCategorization:
with self.db.session() as session:
uncategorized = session.get(
Transaction, ~Transaction.category.has()
)
categories = session.get(Category)
tags = session.get(Tag)
Categorizer().manual(uncategorized, categories, tags)
case Operation.BankMod: case Operation.BankMod:
with self.db.session() as session: with self.db.session() as session:
@ -87,7 +107,13 @@ class Manager:
NordigenInput().token() NordigenInput().token()
case Operation.RequisitionId: case Operation.RequisitionId:
NordigenInput().requisition(params[0], params[1]) link, _ = NordigenInput().requisition(params[0], params[1])
print(f"Opening {link} to request access to {params[0]}")
webbrowser.open(link)
case Operation.NordigenCountryBanks:
banks = NordigenInput().country_banks(params[0])
print(banks)
case Operation.BankAdd | Operation.CategoryAdd | Operation.NordigenAdd | Operation.RuleAdd | Operation.TagAdd | Operation.TagRuleAdd: case Operation.BankAdd | Operation.CategoryAdd | Operation.NordigenAdd | Operation.RuleAdd | Operation.TagAdd | Operation.TagRuleAdd:
with self.db.session() as session: with self.db.session() as session:
@ -143,6 +169,33 @@ class Manager:
links = [link.link for link in params] links = [link.link for link in params]
session.remove_links(original, links) session.remove_links(original, links)
case Operation.Export:
with self.db.session() as session:
if len(params) < 4:
banks = [bank.name for bank in session.get(Bank)]
transactions = session.transactions(params[0], params[1], banks)
else:
transactions = session.transactions(
params[0], params[1], params[2]
)
csvwriter: Output = CSV(params[-1])
csvwriter.report(transactions)
case Operation.Import:
csvwriter: Output = CSV(params[0]) # Output is strange here
transactions = csvwriter.load()
if (
len(transactions) > 0
and input(
f"{transactions[:5]}\nDoes the import seem correct? (y/n)"
)
== "y"
):
with self.db.session() as session:
session.add(transactions)
# def init(self): # def init(self):
# client = DatabaseClient(self.__db) # client = DatabaseClient(self.__db)
# client.init() # client.init()
@ -171,17 +224,10 @@ class Manager:
# bank = client.get_bank(key, value) # bank = client.get_bank(key, value)
# return convert(bank) # return convert(bank)
def get_banks(self):
return self.db.get_nordigen_banks()
@property @property
def db(self) -> DbClient: def db(self) -> DbClient:
return DbClient(self._db, self._verbosity > 0) return DbClient(self._db, self._verbosity > 2)
@db.setter @db.setter
def db(self, url: str): def db(self, url: str):
self._db = url self._db = url
@property
def args(self) -> dict:
return self._args

View File

@ -1,10 +1,10 @@
from dataclasses import asdict from dataclasses import asdict
from datetime import date
from sqlalchemy import create_engine, delete, select, update from sqlalchemy import create_engine, delete, select, update
from sqlalchemy.dialects.postgresql import insert from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session, joinedload, selectinload from sqlalchemy.orm import Session
from pfbudget.db.model import ( from pfbudget.db.model import (
Bank,
Category, Category,
CategoryGroup, CategoryGroup,
CategoryRule, CategoryRule,
@ -31,39 +31,6 @@ class DbClient:
def __init__(self, url: str, echo=False) -> None: def __init__(self, url: str, echo=False) -> None:
self._engine = create_engine(url, echo=echo) self._engine = create_engine(url, echo=echo)
def get_transactions(self):
"""¿Non-optimized? get_transactions, will load the entire Transaction"""
with Session(self.engine) as session:
stmt = select(Transaction).options(
joinedload("*"), selectinload(Transaction.tags)
)
return session.scalars(stmt).all()
def get_uncategorized(self):
with Session(self.engine) as session:
stmt = select(Transaction).where(~Transaction.category.has())
return session.scalars(stmt).all()
def get_categorized(self):
with Session(self.engine) as session:
stmt = select(Transaction).where(Transaction.category.has())
return session.scalars(stmt).all()
def insert_transactions(self, input: list[Transaction]):
with Session(self.engine) as session:
session.add_all(input)
session.commit()
def get_banks(self):
with Session(self.engine) as session:
stmt = select(Bank)
return session.scalars(stmt).all()
def get_nordigen_banks(self):
with Session(self.engine) as session:
stmt = select(Bank).where(Bank.nordigen.has())
return session.scalars(stmt).all()
@property @property
def engine(self): def engine(self):
return self._engine return self._engine
@ -83,6 +50,20 @@ class DbClient:
def commit(self): def commit(self):
self.__session.commit() self.__session.commit()
def expunge_all(self):
self.__session.expunge_all()
def get(self, type, column=None, values=None):
if column is not None:
if values:
stmt = select(type).where(column.in_(values))
else:
stmt = select(type).where(column)
else:
stmt = select(type)
return self.__session.scalars(stmt).all()
def add(self, rows: list): def add(self, rows: list):
self.__session.add_all(rows) self.__session.add_all(rows)
@ -124,16 +105,12 @@ class DbClient:
) )
self.__session.execute(stmt) self.__session.execute(stmt)
def uncategorized(self) -> list[Transaction]: def transactions(self, min: date, max: date, banks: list[str]):
stmt = select(Transaction).where(~Transaction.category.has()) stmt = select(Transaction).where(
return self.__session.scalars(stmt).all() Transaction.date >= min,
Transaction.date <= max,
def categories(self) -> list[Category]: Transaction.bank.in_(banks),
stmt = select(Category) )
return self.__session.scalars(stmt).all()
def tags(self) -> list[Tag]:
stmt = select(Tag)
return self.__session.scalars(stmt).all() return self.__session.scalars(stmt).all()
def session(self) -> ClientSession: def session(self) -> ClientSession:

View File

@ -74,14 +74,17 @@ class Transaction(Base):
id: Mapped[idpk] = mapped_column(init=False) id: Mapped[idpk] = mapped_column(init=False)
date: Mapped[dt.date] date: Mapped[dt.date]
description: Mapped[Optional[str]] description: Mapped[Optional[str]]
bank: Mapped[bankfk]
amount: Mapped[money] amount: Mapped[money]
type: Mapped[str] = mapped_column(init=False)
category: Mapped[Optional[TransactionCategory]] = relationship(init=False) category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(init=False) note: Mapped[Optional[Note]] = relationship(init=False)
tags: Mapped[Optional[set[TransactionTag]]] = relationship(init=False) tags: Mapped[Optional[set[TransactionTag]]] = relationship(init=False)
def __lt__(self, other): __mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
def __lt__(self, other: Transaction):
return self.date < other.date return self.date < other.date
@ -90,6 +93,26 @@ idfk = Annotated[
] ]
class IsSplit:
split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
class BankTransaction(IsSplit, Transaction):
bank: Mapped[bankfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
class MoneyTransaction(IsSplit, Transaction):
__mapper_args__ = {"polymorphic_identity": "money"}
class SplitTransaction(Transaction):
original: Mapped[idfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
class CategoryGroup(Base): class CategoryGroup(Base):
__tablename__ = "categories_groups" __tablename__ = "categories_groups"

View File

@ -6,4 +6,4 @@ from pfbudget.db.model import Transaction
class Input(ABC): class Input(ABC):
@abstractmethod @abstractmethod
def parse(self) -> list[Transaction]: def parse(self) -> list[Transaction]:
return NotImplemented return NotImplementedError

View File

@ -1,30 +0,0 @@
import json
from .input import Input
from pfbudget.common.types import Transactions
from pfbudget.utils import convert, parse_decimal
class JsonParser(Input):
def __init__(self, manager, options):
super().__init__(manager)
self.options = options
def parse(self) -> Transactions:
try:
with open(self.options["json"][0], "r") as f:
return [
convert(
[
t["bookingDate"],
t["remittanceInformationUnstructured"],
self.options["bank"][0],
parse_decimal(t["transactionAmount"]["amount"])
if not self.options["invert"]
else -parse_decimal(t["transactionAmount"]["amount"]),
],
)
for t in json.load(f)["transactions"]["booked"]
]
except KeyError:
print("No json file defined")

View File

@ -6,17 +6,18 @@ from nordigen import NordigenClient
from uuid import uuid4 from uuid import uuid4
import json import json
import os import os
import webbrowser
from pfbudget.db.model import BankTransaction
from pfbudget.utils import convert
from .input import Input from .input import Input
from pfbudget.common.types import NoBankSelected
from pfbudget.db.model import Transaction
from pfbudget.utils import convert
load_dotenv() load_dotenv()
class NordigenInput(Input): class NordigenInput(Input):
redirect_url = "https://murta.dev"
def __init__(self): def __init__(self):
super().__init__() super().__init__()
self._client = NordigenClient( self._client = NordigenClient(
@ -24,27 +25,11 @@ class NordigenInput(Input):
secret_id=os.environ.get("SECRET_ID"), secret_id=os.environ.get("SECRET_ID"),
) )
self.client.token = self.__token() self._client.token = self.__token()
# print(options)
# if "all" in options and options["all"]:
# self.__banks = self.manager.get_banks()
# elif "id" in options and options["id"]:
# self.__banks = [
# self.manager.get_bank_by("nordigen_id", b) for b in options["id"]
# ]
# elif "name" in options and options["name"]:
# self.__banks = [
# self.manager.get_bank_by("name", b) for b in options["name"]
# ]
# else:
# self.__banks = None
self._start = date.min self._start = date.min
self._end = date.max self._end = date.max
def parse(self) -> list[Transaction]: def parse(self) -> list[BankTransaction]:
transactions = [] transactions = []
assert len(self._banks) > 0 assert len(self._banks) > 0
@ -96,11 +81,15 @@ class NordigenInput(Input):
return token return token
def requisition(self, institution: str, country: str = "PT"): def requisition(self, institution: str, country: str = "PT"):
link, _ = self.__requisition_id(institution, country) id = self._client.institution.get_institution_id_by_name(country, institution)
webbrowser.open(link) return self._client.initialize_session(
redirect_uri=self.redirect_url,
institution_id=id,
reference_id=str(uuid4()),
)
def list(self, country: str): def country_banks(self, country: str):
print(self._client.institution.get_institutions(country)) return self._client.institution.get_institutions(country)
@property @property
def client(self): def client(self):
@ -137,16 +126,3 @@ class NordigenInput(Input):
token = self._client.generate_token() token = self._client.generate_token()
print(f"New access token: {token}") print(f"New access token: {token}")
return token return token
def __requisition_id(self, i: str, c: str):
id = self._client.institution.get_institution_id_by_name(
country=c, institution=i
)
init = self._client.initialize_session(
redirect_uri="https://murta.dev",
institution_id=id,
reference_id=str(uuid4()),
)
print(f"{i}({c}) link: {init.link} and requisition ID: {init.requisition_id}")
return (init.link, init.requisition_id)

View File

@ -0,0 +1 @@
__all__ = ["csv", "output"]

35
pfbudget/output/csv.py Normal file
View File

@ -0,0 +1,35 @@
from csv import DictReader, writer
from pfbudget.db.model import (
BankTransaction,
MoneyTransaction,
Transaction,
)
from .output import Output
class CSV(Output):
def __init__(self, filename: str):
self.fn = filename
def load(self) -> list[Transaction]:
with open(self.fn, "r", newline="") as f:
r = DictReader(f)
return [
BankTransaction(
row["date"], row["description"], row["amount"], False, row["bank"]
)
if row["bank"]
else MoneyTransaction(
row["date"], row["description"], False, row["amount"]
)
for row in r
]
def report(self, transactions: list[Transaction]):
with open(self.fn, "w", newline="") as f:
w = writer(f, delimiter="\t")
w.writerows(
[(t.date, t.description, t.amount, t.bank) for t in transactions]
)

View File

@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from pfbudget.db.model import Transaction
class Output(ABC):
@abstractmethod
def report(self, transactions: list[Transaction]):
raise NotImplementedError

View File

@ -1,9 +1,8 @@
from datetime import date, timedelta from datetime import date
from functools import singledispatch from functools import singledispatch
from pfbudget.common.types import TransactionError from pfbudget.common.types import TransactionError
from pfbudget.db.model import Bank, Transaction from pfbudget.db.model import Bank, BankTransaction
from pfbudget.db.schema import DbBank, DbTransaction
from .utils import parse_decimal from .utils import parse_decimal
@ -13,54 +12,11 @@ def convert(t):
pass pass
# @convert.register
# def _(t: Transaction) -> DbTransaction:
# return DbTransaction(
# t.date,
# t.description,
# t.bank,
# t.value,
# t.category,
# t.original,
# t.additional_comment,
# )
# @convert.register
# def _(db: DbTransaction) -> Transaction:
# try:
# return Transaction(db)
# except TransactionError:
# print(f"{db} is in the wrong format")
# @convert.register
# def _(db: DbBank, key: str = "") -> Bank:
# bank = Bank(db.name, db.bic, db.requisition_id, db.invert, db.offset, key=key)
# if not bank.invert:
# bank.invert = False
# if not bank.offset:
# bank.offset = 0
# return bank
# @convert.register
# def _(bank: Bank) -> DbBank:
# bank = DbBank(
# bank.name, bank.bic, "", "", bank.requisition_id, bank.invert, bank.offset
# )
# if not bank.invert:
# bank.invert = False
# if not bank.offset:
# bank.offset = 0
# return bank
@convert.register @convert.register
def _(json: dict, bank: Bank) -> Transaction: def _(json: dict, bank: Bank) -> BankTransaction:
i = -1 if bank.nordigen.invert else 1 i = -1 if bank.nordigen.invert else 1
try: try:
transaction = Transaction( transaction = BankTransaction(
date=date.fromisoformat(json["bookingDate"]), date=date.fromisoformat(json["bookingDate"]),
description=json["remittanceInformationUnstructured"], description=json["remittanceInformationUnstructured"],
bank=bank.name, bank=bank.name,

View File

@ -61,7 +61,6 @@ def find_credit_institution(fn, banks, creditcards):
def parse_args_period(args: dict): def parse_args_period(args: dict):
start, end = date.min, date.max start, end = date.min, date.max
print(args)
if args["start"]: if args["start"]:
start = datetime.strptime(args["start"][0], "%Y/%m/%d").date() start = datetime.strptime(args["start"][0], "%Y/%m/%d").date()