Compare commits

...

6 Commits

Author SHA1 Message Date
c42a399d3d
Adds the import operation and a timer
to the categorization. We can now import transactions from a csv file,
and later automatically categorize them all.
2023-01-10 23:45:09 +00:00
478bd25190
Subclass the Transaction with multiple children
Each children is essentually a type of transaction. We currently have:
- bank transactions
- money transactions
- split transactions

The table inheritance is implemented as a single table, with a
polymorphic type and Null columns.

Adds a IsSplit interface, which will later be used for the category
views, so as to not repeat transactions.
2023-01-10 23:42:37 +00:00
0d287624c4
Load the default DB from the .env file 2023-01-10 21:35:43 +00:00
c37e7eb37c
Readds manual categorization
Also fixes a categorization bug in the Manager, in the DB client method.
2023-01-10 21:32:08 +00:00
86afa99217
Finish the remaining Nordigen operations
from the Manager POV and the update on the argparses.
Also clears unnecessary methods from the DB client interface.
Better assert information on the __main__.py
2023-01-08 19:41:07 +00:00
9b45ee4817
Update the export operation
to work with the Manager.
Also removes the run method from the runnable.py, since everything is
done in the __main__.py file of the pfbudget module.
2023-01-08 19:41:07 +00:00
16 changed files with 422 additions and 334 deletions

View File

@ -0,0 +1,74 @@
"""Inheritance
Revision ID: 37d80de801a7
Revises: 8cc9870b0d74
Create Date: 2023-01-10 22:41:03.540108+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "37d80de801a7"
down_revision = "8cc9870b0d74"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"originals",
sa.Column("type", sa.String(), nullable=False),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("split", sa.Boolean(), nullable=True),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("original", sa.BigInteger(), nullable=True),
schema="transactions",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=True,
schema="transactions",
)
op.create_foreign_key(
op.f("fk_originals_original_originals"),
"originals",
"originals",
["original"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_originals_original_originals"),
"originals",
schema="transactions",
type_="foreignkey",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=False,
schema="transactions",
)
op.drop_column("originals", "original", schema="transactions")
op.drop_column("originals", "split", schema="transactions")
op.drop_column("originals", "type", schema="transactions")
# ### end Alembic commands ###

View File

@ -17,32 +17,32 @@ if __name__ == "__main__":
params = None
match (op):
case pfbudget.Operation.Parse:
assert args.keys() >= {"path", "bank", "creditcard"}
keys = {"path", "bank", "creditcard"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["path"], args["bank"], args["creditcard"]]
case pfbudget.Operation.RequisitionId:
assert args.keys() >= {"name", "country"}, "argparser ill defined"
keys = {"name", "country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["name"][0], args["country"][0]]
case pfbudget.Operation.Download:
assert args.keys() >= {
"id",
"name",
"all",
"interval",
"start",
"end",
"year",
}, "argparser ill defined"
keys = {"all", "banks", "interval", "start", "end", "year", "dry_run"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
start, end = pfbudget.parse_args_period(args)
params = [start, end]
params = [start, end, args["dry_run"]]
if not args["all"]:
params.append(args["banks"])
else:
params.append([])
case pfbudget.Operation.BankAdd:
assert args.keys() >= {
"bank",
"bic",
"type",
}, "argparser ill defined"
keys = {"bank", "bic", "type"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
pfbudget.types.Bank(
@ -53,12 +53,8 @@ if __name__ == "__main__":
]
case pfbudget.Operation.BankMod:
assert args.keys() >= {
"bank",
"bic",
"type",
"remove",
}, "argparser ill defined"
keys = {"bank", "bic", "type", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["bic", "type"]
@ -73,12 +69,8 @@ if __name__ == "__main__":
params = args["bank"]
case pfbudget.Operation.NordigenAdd:
assert args.keys() >= {
"bank",
"bank_id",
"requisition_id",
"invert",
}, "argparser ill defined"
keys = {"bank", "bank_id", "requisition_id", "invert"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
pfbudget.types.Nordigen(
@ -90,13 +82,8 @@ if __name__ == "__main__":
]
case pfbudget.Operation.NordigenMod:
assert args.keys() >= {
"bank",
"bank_id",
"requisition_id",
"invert",
"remove",
}, "argparser ill defined"
keys = {"bank", "bank_id", "requisition_id", "invert", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["bank_id", "requisition_id"]
nargs_0 = ["invert"]
@ -112,14 +99,24 @@ if __name__ == "__main__":
assert len(args["bank"]) > 0, "argparser ill defined"
params = args["bank"]
case pfbudget.Operation.NordigenCountryBanks:
keys = {"country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["country"][0]]
case pfbudget.Operation.CategoryAdd:
assert args.keys() >= {"category", "group"}, "argparser ill defined"
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
pfbudget.types.Category(cat, args["group"]) for cat in args["category"]
]
case pfbudget.Operation.CategoryUpdate:
assert args.keys() >= {"category", "group"}, "argparser ill defined"
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [pfbudget.types.Category(cat) for cat in args["category"]]
params.append(args["group"])
@ -128,11 +125,8 @@ if __name__ == "__main__":
params = [pfbudget.types.Category(cat) for cat in args["category"]]
case pfbudget.Operation.CategorySchedule:
assert args.keys() >= {
"category",
"period",
"frequency",
}, "argparser ill defined"
keys = {"category", "period", "frequency"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
pfbudget.types.CategorySchedule(
@ -142,14 +136,8 @@ if __name__ == "__main__":
]
case pfbudget.Operation.RuleAdd:
assert args.keys() >= {
"category",
"date",
"description",
"bank",
"min",
"max",
}, "argparser ill defined"
keys = {"category", "date", "description", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
pfbudget.types.CategoryRule(
@ -165,11 +153,13 @@ if __name__ == "__main__":
]
case pfbudget.Operation.RuleRemove | pfbudget.Operation.TagRuleRemove:
assert args.keys() >= {"id"}, "argparser ill defined"
keys = {"id"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = args["id"]
case pfbudget.Operation.RuleModify:
assert args.keys() >= {
keys = {
"id",
"category",
"date",
@ -178,7 +168,8 @@ if __name__ == "__main__":
"min",
"max",
"remove",
}, "argparser ill defined"
}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["category", "date", "description", "regex", "bank", "min", "max"]
params = []
@ -190,18 +181,14 @@ if __name__ == "__main__":
params.append(param)
case pfbudget.Operation.TagAdd:
assert args.keys() >= {"tag"}, "argparser ill defined"
keys = {"tag"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [pfbudget.types.Tag(tag) for tag in args["tag"]]
case pfbudget.Operation.TagRuleAdd:
assert args.keys() >= {
"tag",
"date",
"description",
"bank",
"min",
"max",
}, "argparser ill defined"
keys = {"tag", "date", "description", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
pfbudget.types.TagRule(
@ -217,16 +204,8 @@ if __name__ == "__main__":
]
case pfbudget.Operation.TagRuleModify:
assert args.keys() >= {
"id",
"tag",
"date",
"description",
"bank",
"min",
"max",
"remove",
}, "argparser ill defined"
keys = {"id", "tag", "date", "description", "bank", "min", "max", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["tag", "date", "description", "regex", "bank", "min", "max"]
params = []
@ -246,9 +225,27 @@ if __name__ == "__main__":
params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]
case pfbudget.Operation.Forge | pfbudget.Operation.Dismantle:
assert args.keys() >= {"original", "links"}, "argparser ill defined"
keys = {"original", "links"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
pfbudget.types.Link(args["original"][0], link) for link in args["links"]
]
pfbudget.Manager(db, verbosity, args).action(op, params)
case pfbudget.Operation.Export:
keys = {"interval", "start", "end", "year", "all", "banks", "file"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
start, end = pfbudget.parse_args_period(args)
params = [start, end]
if not args["all"]:
params.append(args["banks"])
params.append(args["file"][0])
case pfbudget.Operation.Import:
keys = {"file"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = args["file"]
pfbudget.Manager(db, verbosity).action(op, params)

View File

@ -1,19 +1,20 @@
from pathlib import Path
from dotenv import load_dotenv
import argparse
import datetime as dt
import decimal
import os
import re
from pfbudget.common.types import Operation
from pfbudget.db.model import AccountType, Period
from pfbudget.input.nordigen import NordigenInput
from pfbudget.db.sqlite import DatabaseClient
import pfbudget.reporting.graph
import pfbudget.reporting.report
import pfbudget.utils
load_dotenv()
DEFAULT_DB = "data.db"
DEFAULT_DB = os.environ.get("DEFAULT_DB")
class PfBudgetInitialized(Exception):
@ -29,7 +30,6 @@ class DataFileMissing(Exception):
def argparser() -> argparse.ArgumentParser:
universal = argparse.ArgumentParser(add_help=False)
universal.add_argument(
"-db",
@ -75,16 +75,17 @@ def argparser() -> argparse.ArgumentParser:
)
p_init.set_defaults(command=Operation.Init)
"""
Exporting
"""
p_export = subparsers.add_parser(
"export",
description="Exports the selected database to a .csv file",
parents=[universal],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_export.set_defaults(func=lambda args: DatabaseClient(args.database).export())
# Exports transactions to .csv file
export = subparsers.add_parser("export", parents=[period])
export.set_defaults(op=Operation.Export)
export.add_argument("file", nargs=1, type=str)
export_banks = export.add_mutually_exclusive_group()
export_banks.add_argument("--all", action="store_true")
export_banks.add_argument("--banks", nargs="+", type=str)
pimport = subparsers.add_parser("import")
pimport.set_defaults(op=Operation.Import)
pimport.add_argument("file", nargs=1, type=str)
# Parse from .csv
parse = subparsers.add_parser("parse")
@ -93,15 +94,10 @@ def argparser() -> argparse.ArgumentParser:
parse.add_argument("--bank", nargs=1, type=str)
parse.add_argument("--creditcard", nargs=1, type=str)
"""
Categorizing
"""
categorize = subparsers.add_parser(
"categorize",
description="Categorizes the transactions in the selected database",
parents=[universal],
)
categorize.set_defaults(op=Operation.Categorize)
# Automatic/manual categorization
categorize = subparsers.add_parser("categorize").add_subparsers(required=True)
categorize.add_parser("auto").set_defaults(op=Operation.Categorize)
categorize.add_parser("manual").set_defaults(op=Operation.ManualCategorization)
"""
Graph
@ -157,37 +153,15 @@ def argparser() -> argparse.ArgumentParser:
# Download through the Nordigen API
download = subparsers.add_parser("download", parents=[period])
download.set_defaults(op=Operation.Download)
download.add_argument("--id", nargs="+", type=str)
download.add_argument("--name", nargs="+", type=str)
download.add_argument("--all", action="store_true")
download_banks = download.add_mutually_exclusive_group()
download_banks.add_argument("--all", action="store_true")
download_banks.add_argument("--banks", nargs="+", type=str)
download.add_argument("--dry-run", action="store_true")
# """
# List available banks on Nordigen API
# """
# p_nordigen_list = subparsers.add_parser(
# "list",
# description="Lists banks in {country}",
# parents=[help],
# formatter_class=argparse.ArgumentDefaultsHelpFormatter,
# )
# p_nordigen_list.add_argument("country", nargs=1, type=str)
# p_nordigen_list.set_defaults(func=lambda args: nordigen_banks(manager, args))
# """
# Nordigen JSONs
# """
# p_nordigen_json = subparsers.add_parser(
# "json",
# description="",
# parents=[help],
# formatter_class=argparse.ArgumentDefaultsHelpFormatter,
# )
# p_nordigen_json.add_argument("json", nargs=1, type=str)
# p_nordigen_json.add_argument("bank", nargs=1, type=str)
# p_nordigen_json.add_argument("--invert", action=argparse.BooleanOptionalAction)
# p_nordigen_json.set_defaults(
# func=lambda args: manager.parser(JsonParser(vars(args)))
# )
# List available banks in country C
banks = subparsers.add_parser("banks")
banks.set_defaults(op=Operation.NordigenCountryBanks)
banks.add_argument("country", nargs=1, type=str)
# Categories
category(subparsers.add_parser("category"))
@ -403,9 +377,3 @@ def link(parser: argparse.ArgumentParser):
dismantle.set_defaults(op=Operation.Dismantle)
dismantle.add_argument("original", nargs=1, type=int)
dismantle.add_argument("links", nargs="+", type=int)
def run():
args = vars(argparser().parse_args())
assert "op" in args, "No operation selected"
return args["op"], args

View File

@ -9,6 +9,7 @@ class Operation(Enum):
Parse = auto()
Download = auto()
Categorize = auto()
ManualCategorization = auto()
Token = auto()
RequisitionId = auto()
CategoryAdd = auto()
@ -33,6 +34,9 @@ class Operation(Enum):
NordigenAdd = auto()
NordigenMod = auto()
NordigenDel = auto()
NordigenCountryBanks = auto()
Export = auto()
Import = auto()
class TransactionError(Exception):

View File

@ -8,8 +8,11 @@ from pfbudget.db.model import (
TransactionTag,
)
from codetiming import Timer
from datetime import timedelta
Transactions = list[Transaction]
class Categorizer:
options = {}
@ -17,25 +20,45 @@ class Categorizer:
def __init__(self):
self.options["null_days"] = 4
def categorize(
def rules(
self,
transactions: list[Transaction],
transactions: Transactions,
categories: list[Category],
tags: list[Tag],
):
"""Overarching categorization tool
Receives a list of transactions (by ref) and updates their category
Receives a list of transactions (by ref) and updates their category according
to the rules defined for each category
Args:
transactions (list[Transaction]): uncategorized transactions
categories (list[Category]): available categories
tags (list[Tag]): currently available tags
"""
self._nullify(transactions)
self._rule_based_categories(transactions, categories)
self._rule_based_tags(transactions, tags)
def _nullify(self, transactions: list[Transaction]):
def manual(
self,
transactions: Transactions,
categories: list[Category],
tags: list[Tag],
):
"""Manual categorization input
Args:
transactions (list[Transaction]): uncategorized transactions
categories (list[Category]): available categories
tags (list[Tag]): currently available tags
"""
self._manual(transactions)
@Timer(name="nullify")
def _nullify(self, transactions: Transactions):
count = 0
matching = []
for transaction in transactions:
@ -65,8 +88,9 @@ class Categorizer:
print(f"Nullified {count} transactions")
@Timer(name="categoryrules")
def _rule_based_categories(
self, transactions: list[Transaction], categories: list[Category]
self, transactions: Transactions, categories: list[Category]
):
d = {}
for category in [c for c in categories if c.rules]:
@ -81,9 +105,20 @@ class Categorizer:
continue
# passed all conditions, assign category
transaction.category = TransactionCategory(
category.name, CategorySelector(Selector.rules)
)
if (
transaction.category
and transaction.category.name == category.name
):
if (
input(f"Overwrite {transaction} with {category}? (y/n)")
== "y"
):
transaction.category.name = category.name
transaction.category.selector.selector = Selector.rules
else:
transaction.category = TransactionCategory(
category.name, CategorySelector(Selector.rules)
)
if rule in d:
d[rule] += 1
@ -93,7 +128,8 @@ class Categorizer:
for k, v in d.items():
print(f"{v}: {k}")
def _rule_based_tags(self, transactions: list[Transaction], tags: list[Tag]):
@Timer(name="tagrules")
def _rule_based_tags(self, transactions: Transactions, tags: list[Tag]):
d = {}
for tag in [t for t in tags if t.rules]:
for rule in tag.rules:
@ -119,3 +155,20 @@ class Categorizer:
for k, v in d.items():
print(f"{v}: {k}")
def _manual(self, transactions: Transactions):
uncategorized = [t for t in transactions if not t.category]
print(f"{len(uncategorized)} transactions left to categorize")
for transaction in uncategorized:
while True:
category = input(f"{transaction} category: ")
if category == "quit":
return
if not category:
print("{category} doesn't exist")
continue
transaction.category = TransactionCategory(
category, CategorySelector(Selector.manual)
)
break

View File

@ -1,29 +1,28 @@
from pathlib import Path
import webbrowser
from pfbudget.input.input import Input
from pfbudget.input.nordigen import NordigenInput
from pfbudget.input.parsers import parse_data
from pfbudget.common.types import Operation
from pfbudget.core.categorizer import Categorizer
from pfbudget.db.client import DbClient
from pfbudget.db.model import (
Bank,
Category,
CategoryGroup,
CategoryRule,
CategorySchedule,
Nordigen,
Rule,
Tag,
TagRule,
Transaction,
)
from pfbudget.common.types import Operation
from pfbudget.core.categorizer import Categorizer
from pfbudget.utils import convert
from pfbudget.input.nordigen import NordigenInput
from pfbudget.input.parsers import parse_data
from pfbudget.output.csv import CSV
from pfbudget.output.output import Output
class Manager:
def __init__(self, db: str, verbosity: int = 0, args: dict = {}):
self._args = args
def __init__(self, db: str, verbosity: int = 0):
self._db = db
self._verbosity = verbosity
@ -54,18 +53,39 @@ class Manager:
case Operation.Download:
client = NordigenInput()
client.banks = self.get_banks()
with self.db.session() as session:
if len(params[3]) == 0:
client.banks = session.get(Bank, Bank.nordigen)
else:
client.banks = session.get(Bank, Bank.name, params[3])
session.expunge_all()
client.start = params[0]
client.end = params[1]
transactions = client.parse()
self.add_transactions(transactions)
# dry-run
if not params[2]:
self.add_transactions(transactions)
else:
print(transactions)
case Operation.Categorize:
with self.db.session() as session:
uncategorized = session.uncategorized()
categories = session.categories()
tags = session.tags()
Categorizer().categorize(uncategorized, categories, tags)
uncategorized = session.get(
Transaction, ~Transaction.category.has()
)
categories = session.get(Category)
tags = session.get(Tag)
Categorizer().rules(uncategorized, categories, tags)
case Operation.ManualCategorization:
with self.db.session() as session:
uncategorized = session.get(
Transaction, ~Transaction.category.has()
)
categories = session.get(Category)
tags = session.get(Tag)
Categorizer().manual(uncategorized, categories, tags)
case Operation.BankMod:
with self.db.session() as session:
@ -87,7 +107,13 @@ class Manager:
NordigenInput().token()
case Operation.RequisitionId:
NordigenInput().requisition(params[0], params[1])
link, _ = NordigenInput().requisition(params[0], params[1])
print(f"Opening {link} to request access to {params[0]}")
webbrowser.open(link)
case Operation.NordigenCountryBanks:
banks = NordigenInput().country_banks(params[0])
print(banks)
case Operation.BankAdd | Operation.CategoryAdd | Operation.NordigenAdd | Operation.RuleAdd | Operation.TagAdd | Operation.TagRuleAdd:
with self.db.session() as session:
@ -143,6 +169,33 @@ class Manager:
links = [link.link for link in params]
session.remove_links(original, links)
case Operation.Export:
with self.db.session() as session:
if len(params) < 4:
banks = [bank.name for bank in session.get(Bank)]
transactions = session.transactions(params[0], params[1], banks)
else:
transactions = session.transactions(
params[0], params[1], params[2]
)
csvwriter: Output = CSV(params[-1])
csvwriter.report(transactions)
case Operation.Import:
csvwriter: Output = CSV(params[0]) # Output is strange here
transactions = csvwriter.load()
if (
len(transactions) > 0
and input(
f"{transactions[:5]}\nDoes the import seem correct? (y/n)"
)
== "y"
):
with self.db.session() as session:
session.add(transactions)
# def init(self):
# client = DatabaseClient(self.__db)
# client.init()
@ -171,17 +224,10 @@ class Manager:
# bank = client.get_bank(key, value)
# return convert(bank)
def get_banks(self):
return self.db.get_nordigen_banks()
@property
def db(self) -> DbClient:
return DbClient(self._db, self._verbosity > 0)
return DbClient(self._db, self._verbosity > 2)
@db.setter
def db(self, url: str):
self._db = url
@property
def args(self) -> dict:
return self._args

View File

@ -1,10 +1,10 @@
from dataclasses import asdict
from datetime import date
from sqlalchemy import create_engine, delete, select, update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session, joinedload, selectinload
from sqlalchemy.orm import Session
from pfbudget.db.model import (
Bank,
Category,
CategoryGroup,
CategoryRule,
@ -31,39 +31,6 @@ class DbClient:
def __init__(self, url: str, echo=False) -> None:
self._engine = create_engine(url, echo=echo)
def get_transactions(self):
"""¿Non-optimized? get_transactions, will load the entire Transaction"""
with Session(self.engine) as session:
stmt = select(Transaction).options(
joinedload("*"), selectinload(Transaction.tags)
)
return session.scalars(stmt).all()
def get_uncategorized(self):
with Session(self.engine) as session:
stmt = select(Transaction).where(~Transaction.category.has())
return session.scalars(stmt).all()
def get_categorized(self):
with Session(self.engine) as session:
stmt = select(Transaction).where(Transaction.category.has())
return session.scalars(stmt).all()
def insert_transactions(self, input: list[Transaction]):
with Session(self.engine) as session:
session.add_all(input)
session.commit()
def get_banks(self):
with Session(self.engine) as session:
stmt = select(Bank)
return session.scalars(stmt).all()
def get_nordigen_banks(self):
with Session(self.engine) as session:
stmt = select(Bank).where(Bank.nordigen.has())
return session.scalars(stmt).all()
@property
def engine(self):
return self._engine
@ -83,6 +50,20 @@ class DbClient:
def commit(self):
self.__session.commit()
def expunge_all(self):
self.__session.expunge_all()
def get(self, type, column=None, values=None):
if column is not None:
if values:
stmt = select(type).where(column.in_(values))
else:
stmt = select(type).where(column)
else:
stmt = select(type)
return self.__session.scalars(stmt).all()
def add(self, rows: list):
self.__session.add_all(rows)
@ -124,16 +105,12 @@ class DbClient:
)
self.__session.execute(stmt)
def uncategorized(self) -> list[Transaction]:
stmt = select(Transaction).where(~Transaction.category.has())
return self.__session.scalars(stmt).all()
def categories(self) -> list[Category]:
stmt = select(Category)
return self.__session.scalars(stmt).all()
def tags(self) -> list[Tag]:
stmt = select(Tag)
def transactions(self, min: date, max: date, banks: list[str]):
stmt = select(Transaction).where(
Transaction.date >= min,
Transaction.date <= max,
Transaction.bank.in_(banks),
)
return self.__session.scalars(stmt).all()
def session(self) -> ClientSession:

View File

@ -74,14 +74,17 @@ class Transaction(Base):
id: Mapped[idpk] = mapped_column(init=False)
date: Mapped[dt.date]
description: Mapped[Optional[str]]
bank: Mapped[bankfk]
amount: Mapped[money]
type: Mapped[str] = mapped_column(init=False)
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(init=False)
tags: Mapped[Optional[set[TransactionTag]]] = relationship(init=False)
def __lt__(self, other):
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
def __lt__(self, other: Transaction):
return self.date < other.date
@ -90,6 +93,26 @@ idfk = Annotated[
]
class IsSplit:
split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
class BankTransaction(IsSplit, Transaction):
bank: Mapped[bankfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
class MoneyTransaction(IsSplit, Transaction):
__mapper_args__ = {"polymorphic_identity": "money"}
class SplitTransaction(Transaction):
original: Mapped[idfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
class CategoryGroup(Base):
__tablename__ = "categories_groups"

View File

@ -6,4 +6,4 @@ from pfbudget.db.model import Transaction
class Input(ABC):
@abstractmethod
def parse(self) -> list[Transaction]:
return NotImplemented
return NotImplementedError

View File

@ -1,30 +0,0 @@
import json
from .input import Input
from pfbudget.common.types import Transactions
from pfbudget.utils import convert, parse_decimal
class JsonParser(Input):
def __init__(self, manager, options):
super().__init__(manager)
self.options = options
def parse(self) -> Transactions:
try:
with open(self.options["json"][0], "r") as f:
return [
convert(
[
t["bookingDate"],
t["remittanceInformationUnstructured"],
self.options["bank"][0],
parse_decimal(t["transactionAmount"]["amount"])
if not self.options["invert"]
else -parse_decimal(t["transactionAmount"]["amount"]),
],
)
for t in json.load(f)["transactions"]["booked"]
]
except KeyError:
print("No json file defined")

View File

@ -6,17 +6,18 @@ from nordigen import NordigenClient
from uuid import uuid4
import json
import os
import webbrowser
from pfbudget.db.model import BankTransaction
from pfbudget.utils import convert
from .input import Input
from pfbudget.common.types import NoBankSelected
from pfbudget.db.model import Transaction
from pfbudget.utils import convert
load_dotenv()
class NordigenInput(Input):
redirect_url = "https://murta.dev"
def __init__(self):
super().__init__()
self._client = NordigenClient(
@ -24,27 +25,11 @@ class NordigenInput(Input):
secret_id=os.environ.get("SECRET_ID"),
)
self.client.token = self.__token()
# print(options)
# if "all" in options and options["all"]:
# self.__banks = self.manager.get_banks()
# elif "id" in options and options["id"]:
# self.__banks = [
# self.manager.get_bank_by("nordigen_id", b) for b in options["id"]
# ]
# elif "name" in options and options["name"]:
# self.__banks = [
# self.manager.get_bank_by("name", b) for b in options["name"]
# ]
# else:
# self.__banks = None
self._client.token = self.__token()
self._start = date.min
self._end = date.max
def parse(self) -> list[Transaction]:
def parse(self) -> list[BankTransaction]:
transactions = []
assert len(self._banks) > 0
@ -96,11 +81,15 @@ class NordigenInput(Input):
return token
def requisition(self, institution: str, country: str = "PT"):
link, _ = self.__requisition_id(institution, country)
webbrowser.open(link)
id = self._client.institution.get_institution_id_by_name(country, institution)
return self._client.initialize_session(
redirect_uri=self.redirect_url,
institution_id=id,
reference_id=str(uuid4()),
)
def list(self, country: str):
print(self._client.institution.get_institutions(country))
def country_banks(self, country: str):
return self._client.institution.get_institutions(country)
@property
def client(self):
@ -137,16 +126,3 @@ class NordigenInput(Input):
token = self._client.generate_token()
print(f"New access token: {token}")
return token
def __requisition_id(self, i: str, c: str):
id = self._client.institution.get_institution_id_by_name(
country=c, institution=i
)
init = self._client.initialize_session(
redirect_uri="https://murta.dev",
institution_id=id,
reference_id=str(uuid4()),
)
print(f"{i}({c}) link: {init.link} and requisition ID: {init.requisition_id}")
return (init.link, init.requisition_id)

View File

@ -0,0 +1 @@
__all__ = ["csv", "output"]

35
pfbudget/output/csv.py Normal file
View File

@ -0,0 +1,35 @@
from csv import DictReader, writer
from pfbudget.db.model import (
BankTransaction,
MoneyTransaction,
Transaction,
)
from .output import Output
class CSV(Output):
def __init__(self, filename: str):
self.fn = filename
def load(self) -> list[Transaction]:
with open(self.fn, "r", newline="") as f:
r = DictReader(f)
return [
BankTransaction(
row["date"], row["description"], row["amount"], False, row["bank"]
)
if row["bank"]
else MoneyTransaction(
row["date"], row["description"], False, row["amount"]
)
for row in r
]
def report(self, transactions: list[Transaction]):
with open(self.fn, "w", newline="") as f:
w = writer(f, delimiter="\t")
w.writerows(
[(t.date, t.description, t.amount, t.bank) for t in transactions]
)

View File

@ -0,0 +1,9 @@
from abc import ABC, abstractmethod
from pfbudget.db.model import Transaction
class Output(ABC):
@abstractmethod
def report(self, transactions: list[Transaction]):
raise NotImplementedError

View File

@ -1,9 +1,8 @@
from datetime import date, timedelta
from datetime import date
from functools import singledispatch
from pfbudget.common.types import TransactionError
from pfbudget.db.model import Bank, Transaction
from pfbudget.db.schema import DbBank, DbTransaction
from pfbudget.db.model import Bank, BankTransaction
from .utils import parse_decimal
@ -13,54 +12,11 @@ def convert(t):
pass
# @convert.register
# def _(t: Transaction) -> DbTransaction:
# return DbTransaction(
# t.date,
# t.description,
# t.bank,
# t.value,
# t.category,
# t.original,
# t.additional_comment,
# )
# @convert.register
# def _(db: DbTransaction) -> Transaction:
# try:
# return Transaction(db)
# except TransactionError:
# print(f"{db} is in the wrong format")
# @convert.register
# def _(db: DbBank, key: str = "") -> Bank:
# bank = Bank(db.name, db.bic, db.requisition_id, db.invert, db.offset, key=key)
# if not bank.invert:
# bank.invert = False
# if not bank.offset:
# bank.offset = 0
# return bank
# @convert.register
# def _(bank: Bank) -> DbBank:
# bank = DbBank(
# bank.name, bank.bic, "", "", bank.requisition_id, bank.invert, bank.offset
# )
# if not bank.invert:
# bank.invert = False
# if not bank.offset:
# bank.offset = 0
# return bank
@convert.register
def _(json: dict, bank: Bank) -> Transaction:
def _(json: dict, bank: Bank) -> BankTransaction:
i = -1 if bank.nordigen.invert else 1
try:
transaction = Transaction(
transaction = BankTransaction(
date=date.fromisoformat(json["bookingDate"]),
description=json["remittanceInformationUnstructured"],
bank=bank.name,

View File

@ -61,7 +61,6 @@ def find_credit_institution(fn, banks, creditcards):
def parse_args_period(args: dict):
start, end = date.min, date.max
print(args)
if args["start"]:
start = datetime.strptime(args["start"][0], "%Y/%m/%d").date()