Compare commits

..

No commits in common. "c42a399d3d08e49620234aaf7025909568615ddb" and "9500e808de000f354b9a3a98d6a44be357dacf73" have entirely different histories.

16 changed files with 334 additions and 422 deletions

View File

@ -1,74 +0,0 @@
"""Inheritance
Revision ID: 37d80de801a7
Revises: 8cc9870b0d74
Create Date: 2023-01-10 22:41:03.540108+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "37d80de801a7"
down_revision = "8cc9870b0d74"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"originals",
sa.Column("type", sa.String(), nullable=False),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("split", sa.Boolean(), nullable=True),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("original", sa.BigInteger(), nullable=True),
schema="transactions",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=True,
schema="transactions",
)
op.create_foreign_key(
op.f("fk_originals_original_originals"),
"originals",
"originals",
["original"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_originals_original_originals"),
"originals",
schema="transactions",
type_="foreignkey",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=False,
schema="transactions",
)
op.drop_column("originals", "original", schema="transactions")
op.drop_column("originals", "split", schema="transactions")
op.drop_column("originals", "type", schema="transactions")
# ### end Alembic commands ###

View File

@ -17,32 +17,32 @@ if __name__ == "__main__":
params = None
match (op):
case pfbudget.Operation.Parse:
keys = {"path", "bank", "creditcard"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {"path", "bank", "creditcard"}
params = [args["path"], args["bank"], args["creditcard"]]
case pfbudget.Operation.RequisitionId:
keys = {"name", "country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {"name", "country"}, "argparser ill defined"
params = [args["name"][0], args["country"][0]]
case pfbudget.Operation.Download:
keys = {"all", "banks", "interval", "start", "end", "year", "dry_run"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {
"id",
"name",
"all",
"interval",
"start",
"end",
"year",
}, "argparser ill defined"
start, end = pfbudget.parse_args_period(args)
params = [start, end, args["dry_run"]]
if not args["all"]:
params.append(args["banks"])
else:
params.append([])
params = [start, end]
case pfbudget.Operation.BankAdd:
keys = {"bank", "bic", "type"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {
"bank",
"bic",
"type",
}, "argparser ill defined"
params = [
pfbudget.types.Bank(
@ -53,8 +53,12 @@ if __name__ == "__main__":
]
case pfbudget.Operation.BankMod:
keys = {"bank", "bic", "type", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {
"bank",
"bic",
"type",
"remove",
}, "argparser ill defined"
nargs_1 = ["bic", "type"]
@ -69,8 +73,12 @@ if __name__ == "__main__":
params = args["bank"]
case pfbudget.Operation.NordigenAdd:
keys = {"bank", "bank_id", "requisition_id", "invert"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {
"bank",
"bank_id",
"requisition_id",
"invert",
}, "argparser ill defined"
params = [
pfbudget.types.Nordigen(
@ -82,8 +90,13 @@ if __name__ == "__main__":
]
case pfbudget.Operation.NordigenMod:
keys = {"bank", "bank_id", "requisition_id", "invert", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {
"bank",
"bank_id",
"requisition_id",
"invert",
"remove",
}, "argparser ill defined"
nargs_1 = ["bank_id", "requisition_id"]
nargs_0 = ["invert"]
@ -99,24 +112,14 @@ if __name__ == "__main__":
assert len(args["bank"]) > 0, "argparser ill defined"
params = args["bank"]
case pfbudget.Operation.NordigenCountryBanks:
keys = {"country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["country"][0]]
case pfbudget.Operation.CategoryAdd:
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {"category", "group"}, "argparser ill defined"
params = [
pfbudget.types.Category(cat, args["group"]) for cat in args["category"]
]
case pfbudget.Operation.CategoryUpdate:
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {"category", "group"}, "argparser ill defined"
params = [pfbudget.types.Category(cat) for cat in args["category"]]
params.append(args["group"])
@ -125,8 +128,11 @@ if __name__ == "__main__":
params = [pfbudget.types.Category(cat) for cat in args["category"]]
case pfbudget.Operation.CategorySchedule:
keys = {"category", "period", "frequency"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {
"category",
"period",
"frequency",
}, "argparser ill defined"
params = [
pfbudget.types.CategorySchedule(
@ -136,8 +142,14 @@ if __name__ == "__main__":
]
case pfbudget.Operation.RuleAdd:
keys = {"category", "date", "description", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {
"category",
"date",
"description",
"bank",
"min",
"max",
}, "argparser ill defined"
params = [
pfbudget.types.CategoryRule(
@ -153,13 +165,11 @@ if __name__ == "__main__":
]
case pfbudget.Operation.RuleRemove | pfbudget.Operation.TagRuleRemove:
keys = {"id"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {"id"}, "argparser ill defined"
params = args["id"]
case pfbudget.Operation.RuleModify:
keys = {
assert args.keys() >= {
"id",
"category",
"date",
@ -168,8 +178,7 @@ if __name__ == "__main__":
"min",
"max",
"remove",
}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
}, "argparser ill defined"
nargs_1 = ["category", "date", "description", "regex", "bank", "min", "max"]
params = []
@ -181,14 +190,18 @@ if __name__ == "__main__":
params.append(param)
case pfbudget.Operation.TagAdd:
keys = {"tag"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {"tag"}, "argparser ill defined"
params = [pfbudget.types.Tag(tag) for tag in args["tag"]]
case pfbudget.Operation.TagRuleAdd:
keys = {"tag", "date", "description", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {
"tag",
"date",
"description",
"bank",
"min",
"max",
}, "argparser ill defined"
params = [
pfbudget.types.TagRule(
@ -204,8 +217,16 @@ if __name__ == "__main__":
]
case pfbudget.Operation.TagRuleModify:
keys = {"id", "tag", "date", "description", "bank", "min", "max", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {
"id",
"tag",
"date",
"description",
"bank",
"min",
"max",
"remove",
}, "argparser ill defined"
nargs_1 = ["tag", "date", "description", "regex", "bank", "min", "max"]
params = []
@ -225,27 +246,9 @@ if __name__ == "__main__":
params = [pfbudget.types.CategoryGroup(group) for group in args["group"]]
case pfbudget.Operation.Forge | pfbudget.Operation.Dismantle:
keys = {"original", "links"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
assert args.keys() >= {"original", "links"}, "argparser ill defined"
params = [
pfbudget.types.Link(args["original"][0], link) for link in args["links"]
]
case pfbudget.Operation.Export:
keys = {"interval", "start", "end", "year", "all", "banks", "file"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
start, end = pfbudget.parse_args_period(args)
params = [start, end]
if not args["all"]:
params.append(args["banks"])
params.append(args["file"][0])
case pfbudget.Operation.Import:
keys = {"file"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = args["file"]
pfbudget.Manager(db, verbosity).action(op, params)
pfbudget.Manager(db, verbosity, args).action(op, params)

View File

@ -1,20 +1,19 @@
from dotenv import load_dotenv
from pathlib import Path
import argparse
import datetime as dt
import decimal
import os
import re
from pfbudget.common.types import Operation
from pfbudget.db.model import AccountType, Period
from pfbudget.input.nordigen import NordigenInput
from pfbudget.db.sqlite import DatabaseClient
import pfbudget.reporting.graph
import pfbudget.reporting.report
import pfbudget.utils
load_dotenv()
DEFAULT_DB = os.environ.get("DEFAULT_DB")
DEFAULT_DB = "data.db"
class PfBudgetInitialized(Exception):
@ -30,6 +29,7 @@ class DataFileMissing(Exception):
def argparser() -> argparse.ArgumentParser:
universal = argparse.ArgumentParser(add_help=False)
universal.add_argument(
"-db",
@ -75,17 +75,16 @@ def argparser() -> argparse.ArgumentParser:
)
p_init.set_defaults(command=Operation.Init)
# Exports transactions to .csv file
export = subparsers.add_parser("export", parents=[period])
export.set_defaults(op=Operation.Export)
export.add_argument("file", nargs=1, type=str)
export_banks = export.add_mutually_exclusive_group()
export_banks.add_argument("--all", action="store_true")
export_banks.add_argument("--banks", nargs="+", type=str)
pimport = subparsers.add_parser("import")
pimport.set_defaults(op=Operation.Import)
pimport.add_argument("file", nargs=1, type=str)
"""
Exporting
"""
p_export = subparsers.add_parser(
"export",
description="Exports the selected database to a .csv file",
parents=[universal],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_export.set_defaults(func=lambda args: DatabaseClient(args.database).export())
# Parse from .csv
parse = subparsers.add_parser("parse")
@ -94,10 +93,15 @@ def argparser() -> argparse.ArgumentParser:
parse.add_argument("--bank", nargs=1, type=str)
parse.add_argument("--creditcard", nargs=1, type=str)
# Automatic/manual categorization
categorize = subparsers.add_parser("categorize").add_subparsers(required=True)
categorize.add_parser("auto").set_defaults(op=Operation.Categorize)
categorize.add_parser("manual").set_defaults(op=Operation.ManualCategorization)
"""
Categorizing
"""
categorize = subparsers.add_parser(
"categorize",
description="Categorizes the transactions in the selected database",
parents=[universal],
)
categorize.set_defaults(op=Operation.Categorize)
"""
Graph
@ -153,15 +157,37 @@ def argparser() -> argparse.ArgumentParser:
# Download through the Nordigen API
download = subparsers.add_parser("download", parents=[period])
download.set_defaults(op=Operation.Download)
download_banks = download.add_mutually_exclusive_group()
download_banks.add_argument("--all", action="store_true")
download_banks.add_argument("--banks", nargs="+", type=str)
download.add_argument("--dry-run", action="store_true")
download.add_argument("--id", nargs="+", type=str)
download.add_argument("--name", nargs="+", type=str)
download.add_argument("--all", action="store_true")
# List available banks in country C
banks = subparsers.add_parser("banks")
banks.set_defaults(op=Operation.NordigenCountryBanks)
banks.add_argument("country", nargs=1, type=str)
# """
# List available banks on Nordigen API
# """
# p_nordigen_list = subparsers.add_parser(
# "list",
# description="Lists banks in {country}",
# parents=[help],
# formatter_class=argparse.ArgumentDefaultsHelpFormatter,
# )
# p_nordigen_list.add_argument("country", nargs=1, type=str)
# p_nordigen_list.set_defaults(func=lambda args: nordigen_banks(manager, args))
# """
# Nordigen JSONs
# """
# p_nordigen_json = subparsers.add_parser(
# "json",
# description="",
# parents=[help],
# formatter_class=argparse.ArgumentDefaultsHelpFormatter,
# )
# p_nordigen_json.add_argument("json", nargs=1, type=str)
# p_nordigen_json.add_argument("bank", nargs=1, type=str)
# p_nordigen_json.add_argument("--invert", action=argparse.BooleanOptionalAction)
# p_nordigen_json.set_defaults(
# func=lambda args: manager.parser(JsonParser(vars(args)))
# )
# Categories
category(subparsers.add_parser("category"))
@ -377,3 +403,9 @@ def link(parser: argparse.ArgumentParser):
dismantle.set_defaults(op=Operation.Dismantle)
dismantle.add_argument("original", nargs=1, type=int)
dismantle.add_argument("links", nargs="+", type=int)
def run():
args = vars(argparser().parse_args())
assert "op" in args, "No operation selected"
return args["op"], args

View File

@ -9,7 +9,6 @@ class Operation(Enum):
Parse = auto()
Download = auto()
Categorize = auto()
ManualCategorization = auto()
Token = auto()
RequisitionId = auto()
CategoryAdd = auto()
@ -34,9 +33,6 @@ class Operation(Enum):
NordigenAdd = auto()
NordigenMod = auto()
NordigenDel = auto()
NordigenCountryBanks = auto()
Export = auto()
Import = auto()
class TransactionError(Exception):

View File

@ -8,11 +8,8 @@ from pfbudget.db.model import (
TransactionTag,
)
from codetiming import Timer
from datetime import timedelta
Transactions = list[Transaction]
class Categorizer:
options = {}
@ -20,45 +17,25 @@ class Categorizer:
def __init__(self):
self.options["null_days"] = 4
def rules(
def categorize(
self,
transactions: Transactions,
transactions: list[Transaction],
categories: list[Category],
tags: list[Tag],
):
"""Overarching categorization tool
Receives a list of transactions (by ref) and updates their category according
to the rules defined for each category
Receives a list of transactions (by ref) and updates their category
Args:
transactions (list[Transaction]): uncategorized transactions
categories (list[Category]): available categories
tags (list[Tag]): currently available tags
"""
self._nullify(transactions)
self._rule_based_categories(transactions, categories)
self._rule_based_tags(transactions, tags)
def manual(
self,
transactions: Transactions,
categories: list[Category],
tags: list[Tag],
):
"""Manual categorization input
Args:
transactions (list[Transaction]): uncategorized transactions
categories (list[Category]): available categories
tags (list[Tag]): currently available tags
"""
self._manual(transactions)
@Timer(name="nullify")
def _nullify(self, transactions: Transactions):
def _nullify(self, transactions: list[Transaction]):
count = 0
matching = []
for transaction in transactions:
@ -88,9 +65,8 @@ class Categorizer:
print(f"Nullified {count} transactions")
@Timer(name="categoryrules")
def _rule_based_categories(
self, transactions: Transactions, categories: list[Category]
self, transactions: list[Transaction], categories: list[Category]
):
d = {}
for category in [c for c in categories if c.rules]:
@ -105,17 +81,6 @@ class Categorizer:
continue
# passed all conditions, assign category
if (
transaction.category
and transaction.category.name == category.name
):
if (
input(f"Overwrite {transaction} with {category}? (y/n)")
== "y"
):
transaction.category.name = category.name
transaction.category.selector.selector = Selector.rules
else:
transaction.category = TransactionCategory(
category.name, CategorySelector(Selector.rules)
)
@ -128,8 +93,7 @@ class Categorizer:
for k, v in d.items():
print(f"{v}: {k}")
@Timer(name="tagrules")
def _rule_based_tags(self, transactions: Transactions, tags: list[Tag]):
def _rule_based_tags(self, transactions: list[Transaction], tags: list[Tag]):
d = {}
for tag in [t for t in tags if t.rules]:
for rule in tag.rules:
@ -155,20 +119,3 @@ class Categorizer:
for k, v in d.items():
print(f"{v}: {k}")
def _manual(self, transactions: Transactions):
uncategorized = [t for t in transactions if not t.category]
print(f"{len(uncategorized)} transactions left to categorize")
for transaction in uncategorized:
while True:
category = input(f"{transaction} category: ")
if category == "quit":
return
if not category:
print("{category} doesn't exist")
continue
transaction.category = TransactionCategory(
category, CategorySelector(Selector.manual)
)
break

View File

@ -1,28 +1,29 @@
from pathlib import Path
import webbrowser
from pfbudget.common.types import Operation
from pfbudget.core.categorizer import Categorizer
from pfbudget.input.input import Input
from pfbudget.input.nordigen import NordigenInput
from pfbudget.input.parsers import parse_data
from pfbudget.db.client import DbClient
from pfbudget.db.model import (
Bank,
Category,
CategoryGroup,
CategoryRule,
CategorySchedule,
Nordigen,
Rule,
Tag,
TagRule,
Transaction,
)
from pfbudget.input.nordigen import NordigenInput
from pfbudget.input.parsers import parse_data
from pfbudget.output.csv import CSV
from pfbudget.output.output import Output
from pfbudget.common.types import Operation
from pfbudget.core.categorizer import Categorizer
from pfbudget.utils import convert
class Manager:
def __init__(self, db: str, verbosity: int = 0):
def __init__(self, db: str, verbosity: int = 0, args: dict = {}):
self._args = args
self._db = db
self._verbosity = verbosity
@ -53,39 +54,18 @@ class Manager:
case Operation.Download:
client = NordigenInput()
with self.db.session() as session:
if len(params[3]) == 0:
client.banks = session.get(Bank, Bank.nordigen)
else:
client.banks = session.get(Bank, Bank.name, params[3])
session.expunge_all()
client.banks = self.get_banks()
client.start = params[0]
client.end = params[1]
transactions = client.parse()
# dry-run
if not params[2]:
self.add_transactions(transactions)
else:
print(transactions)
case Operation.Categorize:
with self.db.session() as session:
uncategorized = session.get(
Transaction, ~Transaction.category.has()
)
categories = session.get(Category)
tags = session.get(Tag)
Categorizer().rules(uncategorized, categories, tags)
case Operation.ManualCategorization:
with self.db.session() as session:
uncategorized = session.get(
Transaction, ~Transaction.category.has()
)
categories = session.get(Category)
tags = session.get(Tag)
Categorizer().manual(uncategorized, categories, tags)
uncategorized = session.uncategorized()
categories = session.categories()
tags = session.tags()
Categorizer().categorize(uncategorized, categories, tags)
case Operation.BankMod:
with self.db.session() as session:
@ -107,13 +87,7 @@ class Manager:
NordigenInput().token()
case Operation.RequisitionId:
link, _ = NordigenInput().requisition(params[0], params[1])
print(f"Opening {link} to request access to {params[0]}")
webbrowser.open(link)
case Operation.NordigenCountryBanks:
banks = NordigenInput().country_banks(params[0])
print(banks)
NordigenInput().requisition(params[0], params[1])
case Operation.BankAdd | Operation.CategoryAdd | Operation.NordigenAdd | Operation.RuleAdd | Operation.TagAdd | Operation.TagRuleAdd:
with self.db.session() as session:
@ -169,33 +143,6 @@ class Manager:
links = [link.link for link in params]
session.remove_links(original, links)
case Operation.Export:
with self.db.session() as session:
if len(params) < 4:
banks = [bank.name for bank in session.get(Bank)]
transactions = session.transactions(params[0], params[1], banks)
else:
transactions = session.transactions(
params[0], params[1], params[2]
)
csvwriter: Output = CSV(params[-1])
csvwriter.report(transactions)
case Operation.Import:
csvwriter: Output = CSV(params[0]) # Output is strange here
transactions = csvwriter.load()
if (
len(transactions) > 0
and input(
f"{transactions[:5]}\nDoes the import seem correct? (y/n)"
)
== "y"
):
with self.db.session() as session:
session.add(transactions)
# def init(self):
# client = DatabaseClient(self.__db)
# client.init()
@ -224,10 +171,17 @@ class Manager:
# bank = client.get_bank(key, value)
# return convert(bank)
def get_banks(self):
return self.db.get_nordigen_banks()
@property
def db(self) -> DbClient:
return DbClient(self._db, self._verbosity > 2)
return DbClient(self._db, self._verbosity > 0)
@db.setter
def db(self, url: str):
self._db = url
@property
def args(self) -> dict:
return self._args

View File

@ -1,10 +1,10 @@
from dataclasses import asdict
from datetime import date
from sqlalchemy import create_engine, delete, select, update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, joinedload, selectinload
from pfbudget.db.model import (
Bank,
Category,
CategoryGroup,
CategoryRule,
@ -31,6 +31,39 @@ class DbClient:
def __init__(self, url: str, echo=False) -> None:
self._engine = create_engine(url, echo=echo)
def get_transactions(self):
"""¿Non-optimized? get_transactions, will load the entire Transaction"""
with Session(self.engine) as session:
stmt = select(Transaction).options(
joinedload("*"), selectinload(Transaction.tags)
)
return session.scalars(stmt).all()
def get_uncategorized(self):
with Session(self.engine) as session:
stmt = select(Transaction).where(~Transaction.category.has())
return session.scalars(stmt).all()
def get_categorized(self):
with Session(self.engine) as session:
stmt = select(Transaction).where(Transaction.category.has())
return session.scalars(stmt).all()
def insert_transactions(self, input: list[Transaction]):
with Session(self.engine) as session:
session.add_all(input)
session.commit()
def get_banks(self):
with Session(self.engine) as session:
stmt = select(Bank)
return session.scalars(stmt).all()
def get_nordigen_banks(self):
with Session(self.engine) as session:
stmt = select(Bank).where(Bank.nordigen.has())
return session.scalars(stmt).all()
@property
def engine(self):
return self._engine
@ -50,20 +83,6 @@ class DbClient:
def commit(self):
self.__session.commit()
def expunge_all(self):
self.__session.expunge_all()
def get(self, type, column=None, values=None):
if column is not None:
if values:
stmt = select(type).where(column.in_(values))
else:
stmt = select(type).where(column)
else:
stmt = select(type)
return self.__session.scalars(stmt).all()
def add(self, rows: list):
self.__session.add_all(rows)
@ -105,12 +124,16 @@ class DbClient:
)
self.__session.execute(stmt)
def transactions(self, min: date, max: date, banks: list[str]):
stmt = select(Transaction).where(
Transaction.date >= min,
Transaction.date <= max,
Transaction.bank.in_(banks),
)
def uncategorized(self) -> list[Transaction]:
stmt = select(Transaction).where(~Transaction.category.has())
return self.__session.scalars(stmt).all()
def categories(self) -> list[Category]:
stmt = select(Category)
return self.__session.scalars(stmt).all()
def tags(self) -> list[Tag]:
stmt = select(Tag)
return self.__session.scalars(stmt).all()
def session(self) -> ClientSession:

View File

@ -74,17 +74,14 @@ class Transaction(Base):
id: Mapped[idpk] = mapped_column(init=False)
date: Mapped[dt.date]
description: Mapped[Optional[str]]
bank: Mapped[bankfk]
amount: Mapped[money]
type: Mapped[str] = mapped_column(init=False)
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(init=False)
tags: Mapped[Optional[set[TransactionTag]]] = relationship(init=False)
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
def __lt__(self, other: Transaction):
def __lt__(self, other):
return self.date < other.date
@ -93,26 +90,6 @@ idfk = Annotated[
]
class IsSplit:
split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
class BankTransaction(IsSplit, Transaction):
bank: Mapped[bankfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
class MoneyTransaction(IsSplit, Transaction):
__mapper_args__ = {"polymorphic_identity": "money"}
class SplitTransaction(Transaction):
original: Mapped[idfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
class CategoryGroup(Base):
__tablename__ = "categories_groups"

View File

@ -6,4 +6,4 @@ from pfbudget.db.model import Transaction
class Input(ABC):
@abstractmethod
def parse(self) -> list[Transaction]:
return NotImplementedError
return NotImplemented

30
pfbudget/input/json.py Normal file
View File

@ -0,0 +1,30 @@
import json
from .input import Input
from pfbudget.common.types import Transactions
from pfbudget.utils import convert, parse_decimal
class JsonParser(Input):
def __init__(self, manager, options):
super().__init__(manager)
self.options = options
def parse(self) -> Transactions:
try:
with open(self.options["json"][0], "r") as f:
return [
convert(
[
t["bookingDate"],
t["remittanceInformationUnstructured"],
self.options["bank"][0],
parse_decimal(t["transactionAmount"]["amount"])
if not self.options["invert"]
else -parse_decimal(t["transactionAmount"]["amount"]),
],
)
for t in json.load(f)["transactions"]["booked"]
]
except KeyError:
print("No json file defined")

View File

@ -6,18 +6,17 @@ from nordigen import NordigenClient
from uuid import uuid4
import json
import os
from pfbudget.db.model import BankTransaction
from pfbudget.utils import convert
import webbrowser
from .input import Input
from pfbudget.common.types import NoBankSelected
from pfbudget.db.model import Transaction
from pfbudget.utils import convert
load_dotenv()
class NordigenInput(Input):
redirect_url = "https://murta.dev"
def __init__(self):
super().__init__()
self._client = NordigenClient(
@ -25,11 +24,27 @@ class NordigenInput(Input):
secret_id=os.environ.get("SECRET_ID"),
)
self._client.token = self.__token()
self.client.token = self.__token()
# print(options)
# if "all" in options and options["all"]:
# self.__banks = self.manager.get_banks()
# elif "id" in options and options["id"]:
# self.__banks = [
# self.manager.get_bank_by("nordigen_id", b) for b in options["id"]
# ]
# elif "name" in options and options["name"]:
# self.__banks = [
# self.manager.get_bank_by("name", b) for b in options["name"]
# ]
# else:
# self.__banks = None
self._start = date.min
self._end = date.max
def parse(self) -> list[BankTransaction]:
def parse(self) -> list[Transaction]:
transactions = []
assert len(self._banks) > 0
@ -81,15 +96,11 @@ class NordigenInput(Input):
return token
def requisition(self, institution: str, country: str = "PT"):
id = self._client.institution.get_institution_id_by_name(country, institution)
return self._client.initialize_session(
redirect_uri=self.redirect_url,
institution_id=id,
reference_id=str(uuid4()),
)
link, _ = self.__requisition_id(institution, country)
webbrowser.open(link)
def country_banks(self, country: str):
return self._client.institution.get_institutions(country)
def list(self, country: str):
print(self._client.institution.get_institutions(country))
@property
def client(self):
@ -126,3 +137,16 @@ class NordigenInput(Input):
token = self._client.generate_token()
print(f"New access token: {token}")
return token
def __requisition_id(self, i: str, c: str):
id = self._client.institution.get_institution_id_by_name(
country=c, institution=i
)
init = self._client.initialize_session(
redirect_uri="https://murta.dev",
institution_id=id,
reference_id=str(uuid4()),
)
print(f"{i}({c}) link: {init.link} and requisition ID: {init.requisition_id}")
return (init.link, init.requisition_id)

View File

@ -1 +0,0 @@
__all__ = ["csv", "output"]

View File

@ -1,35 +0,0 @@
from csv import DictReader, writer
from pfbudget.db.model import (
BankTransaction,
MoneyTransaction,
Transaction,
)
from .output import Output
class CSV(Output):
def __init__(self, filename: str):
self.fn = filename
def load(self) -> list[Transaction]:
with open(self.fn, "r", newline="") as f:
r = DictReader(f)
return [
BankTransaction(
row["date"], row["description"], row["amount"], False, row["bank"]
)
if row["bank"]
else MoneyTransaction(
row["date"], row["description"], False, row["amount"]
)
for row in r
]
def report(self, transactions: list[Transaction]):
with open(self.fn, "w", newline="") as f:
w = writer(f, delimiter="\t")
w.writerows(
[(t.date, t.description, t.amount, t.bank) for t in transactions]
)

View File

@ -1,9 +0,0 @@
from abc import ABC, abstractmethod
from pfbudget.db.model import Transaction
class Output(ABC):
@abstractmethod
def report(self, transactions: list[Transaction]):
raise NotImplementedError

View File

@ -1,8 +1,9 @@
from datetime import date
from datetime import date, timedelta
from functools import singledispatch
from pfbudget.common.types import TransactionError
from pfbudget.db.model import Bank, BankTransaction
from pfbudget.db.model import Bank, Transaction
from pfbudget.db.schema import DbBank, DbTransaction
from .utils import parse_decimal
@ -12,11 +13,54 @@ def convert(t):
pass
# @convert.register
# def _(t: Transaction) -> DbTransaction:
# return DbTransaction(
# t.date,
# t.description,
# t.bank,
# t.value,
# t.category,
# t.original,
# t.additional_comment,
# )
# @convert.register
# def _(db: DbTransaction) -> Transaction:
# try:
# return Transaction(db)
# except TransactionError:
# print(f"{db} is in the wrong format")
# @convert.register
# def _(db: DbBank, key: str = "") -> Bank:
# bank = Bank(db.name, db.bic, db.requisition_id, db.invert, db.offset, key=key)
# if not bank.invert:
# bank.invert = False
# if not bank.offset:
# bank.offset = 0
# return bank
# @convert.register
# def _(bank: Bank) -> DbBank:
# bank = DbBank(
# bank.name, bank.bic, "", "", bank.requisition_id, bank.invert, bank.offset
# )
# if not bank.invert:
# bank.invert = False
# if not bank.offset:
# bank.offset = 0
# return bank
@convert.register
def _(json: dict, bank: Bank) -> BankTransaction:
def _(json: dict, bank: Bank) -> Transaction:
i = -1 if bank.nordigen.invert else 1
try:
transaction = BankTransaction(
transaction = Transaction(
date=date.fromisoformat(json["bookingDate"]),
description=json["remittanceInformationUnstructured"],
bank=bank.name,

View File

@ -61,6 +61,7 @@ def find_credit_institution(fn, banks, creditcards):
def parse_args_period(args: dict):
start, end = date.min, date.max
print(args)
if args["start"]:
start = datetime.strptime(args["start"][0], "%Y/%m/%d").date()