Compare commits
No commits in common. "29c5206638d13ef42e70d9bbbfff0435624101a8" and "bdd7cac4be597cba3232ffc60c9e539bb13fcbd5" have entirely different histories.
29c5206638
...
bdd7cac4be
3
.gitignore
vendored
3
.gitignore
vendored
@ -174,6 +174,3 @@ poetry.toml
|
||||
pyrightconfig.json
|
||||
|
||||
# End of https://www.toptal.com/developers/gitignore/api/python
|
||||
|
||||
# Project specific ignores
|
||||
database.db
|
||||
|
||||
@ -1,35 +0,0 @@
|
||||
"""nordigen tokens
|
||||
|
||||
Revision ID: 325b901ac712
|
||||
Revises: 60469d5dd2b0
|
||||
Create Date: 2023-05-25 19:10:10.374008+00:00
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "325b901ac712"
|
||||
down_revision = "60469d5dd2b0"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.create_table(
|
||||
"nordigen",
|
||||
sa.Column("type", sa.String(), nullable=False),
|
||||
sa.Column("token", sa.String(), nullable=False),
|
||||
sa.Column("expires", sa.DateTime(), nullable=False),
|
||||
sa.PrimaryKeyConstraint("type", name=op.f("pk_nordigen")),
|
||||
schema="pfbudget",
|
||||
)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table("nordigen", schema="pfbudget")
|
||||
# ### end Alembic commands ###
|
||||
@ -1,48 +0,0 @@
|
||||
"""Drop SQLAlchemy enum
|
||||
|
||||
Revision ID: 60469d5dd2b0
|
||||
Revises: b599dafcf468
|
||||
Create Date: 2023-05-15 19:24:07.911352+00:00
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "60469d5dd2b0"
|
||||
down_revision = "b599dafcf468"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.execute(
|
||||
"""
|
||||
CREATE TYPE pfbudget.scheduleperiod
|
||||
AS ENUM ('daily', 'weekly', 'monthly', 'yearly')
|
||||
"""
|
||||
)
|
||||
op.execute(
|
||||
"""ALTER TABLE pfbudget.category_schedules
|
||||
ALTER COLUMN period TYPE pfbudget.scheduleperiod
|
||||
USING period::text::pfbudget.scheduleperiod
|
||||
"""
|
||||
)
|
||||
op.execute("DROP TYPE pfbudget.period")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.execute(
|
||||
"""
|
||||
CREATE TYPE pfbudget.period
|
||||
AS ENUM ('daily', 'weekly', 'monthly', 'yearly')
|
||||
"""
|
||||
)
|
||||
op.execute(
|
||||
"""ALTER TABLE pfbudget.category_schedules
|
||||
ALTER COLUMN period TYPE pfbudget.period
|
||||
USING period::text::pfbudget.period
|
||||
"""
|
||||
)
|
||||
op.execute("DROP TYPE pfbudget.scheduleperiod")
|
||||
@ -1,74 +0,0 @@
|
||||
"""Compact category selector
|
||||
|
||||
Revision ID: 8623e709e111
|
||||
Revises: ce68ee15e5d2
|
||||
Create Date: 2023-05-08 19:00:51.063240+00:00
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects import postgresql
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "8623e709e111"
|
||||
down_revision = "ce68ee15e5d2"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_table("category_selectors", schema="pfbudget")
|
||||
op.add_column(
|
||||
"transactions_categorized",
|
||||
sa.Column(
|
||||
"selector",
|
||||
sa.Enum(
|
||||
"unknown",
|
||||
"nullifier",
|
||||
"vacations",
|
||||
"rules",
|
||||
"algorithm",
|
||||
"manual",
|
||||
name="selector_t",
|
||||
schema="pfbudget",
|
||||
inherit_schema=True,
|
||||
),
|
||||
nullable=False,
|
||||
),
|
||||
schema="pfbudget",
|
||||
)
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_column("transactions_categorized", "selector", schema="pfbudget")
|
||||
op.create_table(
|
||||
"category_selectors",
|
||||
sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
|
||||
sa.Column(
|
||||
"selector",
|
||||
postgresql.ENUM(
|
||||
"unknown",
|
||||
"nullifier",
|
||||
"vacations",
|
||||
"rules",
|
||||
"algorithm",
|
||||
"manual",
|
||||
name="selector_t",
|
||||
schema="pfbudget",
|
||||
),
|
||||
autoincrement=False,
|
||||
nullable=False,
|
||||
),
|
||||
sa.ForeignKeyConstraint(
|
||||
["id"],
|
||||
["pfbudget.transactions_categorized.id"],
|
||||
name="fk_category_selectors_id_transactions_categorized",
|
||||
ondelete="CASCADE",
|
||||
),
|
||||
sa.PrimaryKeyConstraint("id", name="pk_category_selectors"),
|
||||
schema="pfbudget",
|
||||
)
|
||||
# ### end Alembic commands ###
|
||||
@ -1,46 +0,0 @@
|
||||
"""Selector type name change
|
||||
|
||||
Revision ID: b599dafcf468
|
||||
Revises: 8623e709e111
|
||||
Create Date: 2023-05-08 19:46:20.661214+00:00
|
||||
|
||||
"""
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = "b599dafcf468"
|
||||
down_revision = "8623e709e111"
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
op.execute(
|
||||
"""
|
||||
CREATE TYPE pfbudget.categoryselector
|
||||
AS ENUM ('unknown', 'nullifier', 'vacations', 'rules', 'algorithm', 'manual')
|
||||
"""
|
||||
)
|
||||
op.execute(
|
||||
"""ALTER TABLE pfbudget.transactions_categorized
|
||||
ALTER COLUMN selector TYPE pfbudget.categoryselector
|
||||
USING selector::text::pfbudget.categoryselector
|
||||
"""
|
||||
)
|
||||
op.execute("DROP TYPE pfbudget.selector_t")
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
op.execute(
|
||||
"""
|
||||
CREATE TYPE pfbudget.selector_t
|
||||
AS ENUM ('unknown', 'nullifier', 'vacations', 'rules', 'algorithm', 'manual')
|
||||
"""
|
||||
)
|
||||
op.execute(
|
||||
"""ALTER TABLE pfbudget.transactions_categorized
|
||||
ALTER COLUMN selector TYPE pfbudget.selector_t
|
||||
USING selector::text::pfbudget.selector_t
|
||||
"""
|
||||
)
|
||||
op.execute("DROP TYPE pfbudget.categoryselector")
|
||||
@ -38,10 +38,10 @@ if __name__ == "__main__":
|
||||
params = [args["path"], args["bank"], args["creditcard"]]
|
||||
|
||||
case Operation.RequisitionId:
|
||||
keys = {"bank"}
|
||||
keys = {"name", "country"}
|
||||
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||
|
||||
params = [args["bank"][0]]
|
||||
params = [args["name"][0], args["country"][0]]
|
||||
|
||||
case Operation.Download:
|
||||
keys = {"all", "banks", "interval", "start", "end", "year", "dry_run"}
|
||||
@ -163,14 +163,14 @@ if __name__ == "__main__":
|
||||
|
||||
params = [
|
||||
type.CategoryRule(
|
||||
args["start"][0] if args["start"] else None,
|
||||
args["end"][0] if args["end"] else None,
|
||||
args["description"][0] if args["description"] else None,
|
||||
args["regex"][0] if args["regex"] else None,
|
||||
args["bank"][0] if args["bank"] else None,
|
||||
args["min"][0] if args["min"] else None,
|
||||
args["max"][0] if args["max"] else None,
|
||||
cat,
|
||||
start=args["start"][0] if args["start"] else None,
|
||||
end=args["end"][0] if args["end"] else None,
|
||||
description=args["description"][0] if args["description"] else None,
|
||||
regex=args["regex"][0] if args["regex"] else None,
|
||||
bank=args["bank"][0] if args["bank"] else None,
|
||||
min=args["min"][0] if args["min"] else None,
|
||||
max=args["max"][0] if args["max"] else None,
|
||||
)
|
||||
for cat in args["category"]
|
||||
]
|
||||
@ -215,14 +215,14 @@ if __name__ == "__main__":
|
||||
|
||||
params = [
|
||||
type.TagRule(
|
||||
args["start"][0] if args["start"] else None,
|
||||
args["end"][0] if args["end"] else None,
|
||||
args["description"][0] if args["description"] else None,
|
||||
args["regex"][0] if args["regex"] else None,
|
||||
args["bank"][0] if args["bank"] else None,
|
||||
args["min"][0] if args["min"] else None,
|
||||
args["max"][0] if args["max"] else None,
|
||||
tag,
|
||||
start=args["start"][0] if args["start"] else None,
|
||||
end=args["end"][0] if args["end"] else None,
|
||||
description=args["description"][0] if args["description"] else None,
|
||||
regex=args["regex"][0] if args["regex"] else None,
|
||||
bank=args["bank"][0] if args["bank"] else None,
|
||||
min=args["min"][0] if args["min"] else None,
|
||||
max=args["max"][0] if args["max"] else None,
|
||||
)
|
||||
for tag in args["tag"]
|
||||
]
|
||||
|
||||
@ -6,7 +6,7 @@ import os
|
||||
import re
|
||||
|
||||
from pfbudget.common.types import Operation
|
||||
from pfbudget.db.model import AccountType, SchedulePeriod
|
||||
from pfbudget.db.model import AccountType, Period
|
||||
|
||||
from pfbudget.db.sqlite import DatabaseClient
|
||||
import pfbudget.reporting.graph
|
||||
@ -60,12 +60,11 @@ def argparser() -> argparse.ArgumentParser:
|
||||
# init = subparsers.add_parser("init")
|
||||
# init.set_defaults(op=Operation.Init)
|
||||
|
||||
# Exports transactions to specified format and file
|
||||
# Exports transactions to .csv file
|
||||
export = subparsers.add_parser("export")
|
||||
export.set_defaults(op=Operation.Export)
|
||||
file_options(export)
|
||||
|
||||
# Imports transactions from specified format and file
|
||||
pimport = subparsers.add_parser("import")
|
||||
pimport.set_defaults(op=Operation.Import)
|
||||
file_options(pimport)
|
||||
@ -133,7 +132,8 @@ def argparser() -> argparse.ArgumentParser:
|
||||
# PSD2 requisition id
|
||||
requisition = subparsers.add_parser("eua")
|
||||
requisition.set_defaults(op=Operation.RequisitionId)
|
||||
requisition.add_argument("bank", nargs=1, type=str)
|
||||
requisition.add_argument("id", nargs=1, type=str)
|
||||
requisition.add_argument("country", nargs=1, type=str)
|
||||
|
||||
# Download through the PSD2 API
|
||||
download = subparsers.add_parser("download", parents=[period])
|
||||
@ -268,7 +268,7 @@ def category(parser: argparse.ArgumentParser):
|
||||
schedule = commands.add_parser("schedule")
|
||||
schedule.set_defaults(op=Operation.CategorySchedule)
|
||||
schedule.add_argument("category", nargs="+", type=str)
|
||||
schedule.add_argument("period", nargs=1, choices=[e.value for e in SchedulePeriod])
|
||||
schedule.add_argument("period", nargs=1, choices=[e.value for e in Period])
|
||||
schedule.add_argument("--frequency", nargs=1, default=[1], type=int)
|
||||
|
||||
rule = commands.add_parser("rule")
|
||||
|
||||
@ -3,8 +3,9 @@ import decimal
|
||||
from ..core.manager import Manager
|
||||
from ..db.model import (
|
||||
Category,
|
||||
Note,
|
||||
CategorySelector,
|
||||
Note,
|
||||
Selector_T,
|
||||
SplitTransaction,
|
||||
Tag,
|
||||
Transaction,
|
||||
@ -15,13 +16,15 @@ from ..db.model import (
|
||||
|
||||
class Interactive:
|
||||
help = "category(:tag)/split/note:/skip/quit"
|
||||
selector = CategorySelector.manual
|
||||
selector = Selector_T.manual
|
||||
|
||||
def __init__(self, manager: Manager) -> None:
|
||||
self.manager = manager
|
||||
|
||||
self.categories = self.manager.database.select(Category)
|
||||
self.tags = self.manager.database.select(Tag)
|
||||
with self.manager.db.session() as session:
|
||||
self.categories = session.get(Category)
|
||||
self.tags = session.get(Tag)
|
||||
session.expunge_all()
|
||||
|
||||
def intro(self) -> None:
|
||||
print(
|
||||
@ -32,34 +35,28 @@ class Interactive:
|
||||
def start(self) -> None:
|
||||
self.intro()
|
||||
|
||||
with self.manager.database.session as session:
|
||||
uncategorized = session.select(
|
||||
Transaction, lambda: ~Transaction.category.has()
|
||||
)
|
||||
uncategorized.sort()
|
||||
with self.manager.db.session() as session:
|
||||
uncategorized = session.uncategorized()
|
||||
n = len(uncategorized)
|
||||
print(f"{n} left to categorize")
|
||||
|
||||
i = 0
|
||||
new = []
|
||||
|
||||
while (command := input("$ ")) != "quit" and i < len(uncategorized):
|
||||
current = uncategorized[i] if len(new) == 0 else new.pop()
|
||||
print(current)
|
||||
|
||||
next = uncategorized[i]
|
||||
print(next)
|
||||
while (command := input("$ ")) != "quit":
|
||||
match command:
|
||||
case "help":
|
||||
print(self.help)
|
||||
|
||||
case "skip":
|
||||
if len(uncategorized) == 0:
|
||||
i += 1
|
||||
|
||||
case "quit":
|
||||
break
|
||||
|
||||
case "split":
|
||||
new = self.split(current)
|
||||
new = self.split(next)
|
||||
session.insert(new)
|
||||
|
||||
case other:
|
||||
@ -70,32 +67,35 @@ class Interactive:
|
||||
if other.startswith("note:"):
|
||||
# TODO adding notes to a splitted transaction won't allow
|
||||
# categorization
|
||||
current.note = Note(other[len("note:") :].strip())
|
||||
next.note = Note(other[len("note:") :].strip())
|
||||
else:
|
||||
ct = other.split(":")
|
||||
if (category := ct[0]) not in [
|
||||
c.name for c in self.categories
|
||||
]:
|
||||
print(self.help, self.categories)
|
||||
continue
|
||||
|
||||
tags = []
|
||||
if len(ct) > 1:
|
||||
tags = ct[1:]
|
||||
|
||||
current.category = TransactionCategory(
|
||||
category, self.selector
|
||||
next.category = TransactionCategory(
|
||||
category, CategorySelector(self.selector)
|
||||
)
|
||||
for tag in tags:
|
||||
if tag not in [t.name for t in self.tags]:
|
||||
session.insert([Tag(tag)])
|
||||
self.tags = session.get(Tag)
|
||||
|
||||
current.tags.add(TransactionTag(tag))
|
||||
next.tags.add(TransactionTag(tag))
|
||||
|
||||
if len(new) == 0:
|
||||
i += 1
|
||||
|
||||
session.commit()
|
||||
|
||||
next = uncategorized[i] if len(new) == 0 else new.pop()
|
||||
print(next)
|
||||
|
||||
def split(self, original: Transaction) -> list[SplitTransaction]:
|
||||
total = original.amount
|
||||
new = []
|
||||
|
||||
@ -51,11 +51,6 @@ class Operation(Enum):
|
||||
ImportCategoryGroups = auto()
|
||||
|
||||
|
||||
class ExportFormat(Enum):
|
||||
JSON = auto()
|
||||
pickle = auto()
|
||||
|
||||
|
||||
class TransactionError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
@ -1,128 +0,0 @@
|
||||
from abc import ABC, abstractmethod
|
||||
import json
|
||||
from pathlib import Path
|
||||
import pickle
|
||||
from typing import Type
|
||||
|
||||
from pfbudget.common.types import ExportFormat
|
||||
from pfbudget.db.client import Client
|
||||
from pfbudget.db.model import (
|
||||
Bank,
|
||||
Category,
|
||||
CategoryGroup,
|
||||
Serializable,
|
||||
Tag,
|
||||
Transaction,
|
||||
)
|
||||
|
||||
# required for the backup import
|
||||
import pfbudget.db.model
|
||||
|
||||
|
||||
class Command(ABC):
|
||||
@abstractmethod
|
||||
def execute(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
def undo(self) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class ExportCommand(Command):
|
||||
def __init__(
|
||||
self, client: Client, what: Type[Serializable], fn: Path, format: ExportFormat
|
||||
):
|
||||
self.__client = client
|
||||
self.what = what
|
||||
self.fn = fn
|
||||
self.format = format
|
||||
|
||||
def execute(self) -> None:
|
||||
values = self.__client.select(self.what)
|
||||
match self.format:
|
||||
case ExportFormat.JSON:
|
||||
with open(self.fn, "w", newline="") as f:
|
||||
json.dump([e.serialize() for e in values], f, indent=4)
|
||||
case ExportFormat.pickle:
|
||||
raise AttributeError("pickle export not working at the moment!")
|
||||
with open(self.fn, "wb") as f:
|
||||
pickle.dump(values, f)
|
||||
|
||||
|
||||
class ImportCommand(Command):
|
||||
def __init__(
|
||||
self, client: Client, what: Type[Serializable], fn: Path, format: ExportFormat
|
||||
):
|
||||
self.__client = client
|
||||
self.what = what
|
||||
self.fn = fn
|
||||
self.format = format
|
||||
|
||||
def execute(self) -> None:
|
||||
match self.format:
|
||||
case ExportFormat.JSON:
|
||||
with open(self.fn, "r") as f:
|
||||
try:
|
||||
values = json.load(f)
|
||||
values = [self.what.deserialize(v) for v in values]
|
||||
except json.JSONDecodeError as e:
|
||||
raise ImportFailedError(e)
|
||||
|
||||
case ExportFormat.pickle:
|
||||
raise AttributeError("pickle import not working at the moment!")
|
||||
with open(self.fn, "rb") as f:
|
||||
values = pickle.load(f)
|
||||
|
||||
self.__client.insert(values)
|
||||
|
||||
|
||||
class ImportFailedError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class BackupCommand(Command):
|
||||
def __init__(self, client: Client, fn: Path, format: ExportFormat) -> None:
|
||||
self.__client = client
|
||||
self.fn = fn
|
||||
self.format = format
|
||||
|
||||
def execute(self) -> None:
|
||||
banks = self.__client.select(Bank)
|
||||
groups = self.__client.select(CategoryGroup)
|
||||
categories = self.__client.select(Category)
|
||||
tags = self.__client.select(Tag)
|
||||
transactions = self.__client.select(Transaction)
|
||||
|
||||
values = [*banks, *groups, *categories, *tags, *transactions]
|
||||
|
||||
match self.format:
|
||||
case ExportFormat.JSON:
|
||||
with open(self.fn, "w", newline="") as f:
|
||||
json.dump([e.serialize() for e in values], f, indent=4)
|
||||
case ExportFormat.pickle:
|
||||
raise AttributeError("pickle export not working at the moment!")
|
||||
|
||||
|
||||
class ImportBackupCommand(Command):
|
||||
def __init__(self, client: Client, fn: Path, format: ExportFormat) -> None:
|
||||
self.__client = client
|
||||
self.fn = fn
|
||||
self.format = format
|
||||
|
||||
def execute(self) -> None:
|
||||
match self.format:
|
||||
case ExportFormat.JSON:
|
||||
with open(self.fn, "r") as f:
|
||||
try:
|
||||
values = json.load(f)
|
||||
values = [
|
||||
getattr(pfbudget.db.model, v["class_"]).deserialize(v)
|
||||
for v in values
|
||||
]
|
||||
except json.JSONDecodeError as e:
|
||||
raise ImportFailedError(e)
|
||||
|
||||
case ExportFormat.pickle:
|
||||
raise AttributeError("pickle import not working at the moment!")
|
||||
|
||||
self.__client.insert(values)
|
||||
@ -1,3 +1,4 @@
|
||||
import csv
|
||||
import json
|
||||
from pathlib import Path
|
||||
import pickle
|
||||
@ -13,11 +14,12 @@ from pfbudget.db.model import (
|
||||
CategoryGroup,
|
||||
CategoryRule,
|
||||
CategorySchedule,
|
||||
CategorySelector,
|
||||
Link,
|
||||
MoneyTransaction,
|
||||
NordigenBank,
|
||||
Nordigen,
|
||||
Rule,
|
||||
CategorySelector,
|
||||
Selector_T,
|
||||
SplitTransaction,
|
||||
Tag,
|
||||
TagRule,
|
||||
@ -79,7 +81,7 @@ class Manager:
|
||||
else:
|
||||
banks = self.database.select(Bank, Bank.nordigen)
|
||||
|
||||
extractor = PSD2Extractor(self.nordigen_client())
|
||||
extractor = PSD2Extractor(Manager.nordigen_client())
|
||||
|
||||
transactions = []
|
||||
for bank in banks:
|
||||
@ -101,20 +103,10 @@ class Manager:
|
||||
categories = session.select(Category)
|
||||
tags = session.select(Tag)
|
||||
|
||||
rules = [
|
||||
rule
|
||||
for cat in categories
|
||||
if cat.name == "null"
|
||||
for rule in cat.rules
|
||||
]
|
||||
rules = [cat.rules for cat in categories if cat.name == "null"]
|
||||
Nullifier(rules).transform_inplace(uncategorized)
|
||||
|
||||
rules = [
|
||||
rule
|
||||
for cat in categories
|
||||
if cat.name != "null"
|
||||
for rule in cat.rules
|
||||
]
|
||||
rules = [rule for cat in categories for rule in cat.rules]
|
||||
Categorizer(rules).transform_inplace(uncategorized)
|
||||
|
||||
rules = [rule for tag in tags for rule in tag.rules]
|
||||
@ -124,34 +116,24 @@ class Manager:
|
||||
self.database.update(Bank, params)
|
||||
|
||||
case Operation.PSD2Mod:
|
||||
self.database.update(NordigenBank, params)
|
||||
self.database.update(Nordigen, params)
|
||||
|
||||
case Operation.BankDel:
|
||||
self.database.delete(Bank, Bank.name, params)
|
||||
|
||||
case Operation.PSD2Del:
|
||||
self.database.delete(NordigenBank, NordigenBank.name, params)
|
||||
self.database.delete(Nordigen, Nordigen.name, params)
|
||||
|
||||
case Operation.Token:
|
||||
Manager.nordigen_client().generate_token()
|
||||
|
||||
case Operation.RequisitionId:
|
||||
bank_name = params[0]
|
||||
bank = self.database.select(Bank, (lambda: Bank.name == bank_name))[0]
|
||||
|
||||
if not bank.nordigen or not bank.nordigen.bank_id:
|
||||
raise ValueError(f"{bank} doesn't have a Nordigen ID")
|
||||
|
||||
link, req_id = self.nordigen_client().new_requisition(
|
||||
bank.nordigen.bank_id
|
||||
)
|
||||
|
||||
self.database.update(
|
||||
NordigenBank,
|
||||
[{"name": bank.nordigen.name, "requisition_id": req_id}],
|
||||
)
|
||||
|
||||
link, _ = Manager.nordigen_client().requisition(params[0], params[1])
|
||||
print(f"Opening {link} to request access to {params[0]}")
|
||||
webbrowser.open(link)
|
||||
|
||||
case Operation.PSD2CountryBanks:
|
||||
banks = self.nordigen_client().country_banks(params[0])
|
||||
banks = Manager.nordigen_client().country_banks(params[0])
|
||||
print(banks)
|
||||
|
||||
case (
|
||||
@ -263,7 +245,10 @@ class Manager:
|
||||
session.insert(transactions)
|
||||
|
||||
case Operation.Export:
|
||||
self.dump(params[0], params[1], self.database.select(Transaction))
|
||||
with self.database.session as session:
|
||||
self.dump(
|
||||
params[0], params[1], self.database.select(Transaction, session)
|
||||
)
|
||||
|
||||
case Operation.Import:
|
||||
transactions = []
|
||||
@ -289,7 +274,8 @@ class Manager:
|
||||
|
||||
if category := row.pop("category", None):
|
||||
transaction.category = TransactionCategory(
|
||||
category["name"], category["selector"]["selector"]
|
||||
category["name"],
|
||||
CategorySelector(category["selector"]["selector"]),
|
||||
)
|
||||
|
||||
transactions.append(transaction)
|
||||
@ -298,21 +284,27 @@ class Manager:
|
||||
self.database.insert(transactions)
|
||||
|
||||
case Operation.ExportBanks:
|
||||
self.dump(params[0], params[1], self.database.select(Bank))
|
||||
with self.database.session as session:
|
||||
self.dump(params[0], params[1], self.database.select(Bank, session))
|
||||
|
||||
case Operation.ImportBanks:
|
||||
banks = []
|
||||
for row in self.load(params[0], params[1]):
|
||||
bank = Bank(row["name"], row["BIC"], row["type"])
|
||||
if row["nordigen"]:
|
||||
bank.nordigen = NordigenBank(**row["nordigen"])
|
||||
bank.nordigen = Nordigen(**row["nordigen"])
|
||||
banks.append(bank)
|
||||
|
||||
if self.certify(banks):
|
||||
self.database.insert(banks)
|
||||
|
||||
case Operation.ExportCategoryRules:
|
||||
self.dump(params[0], params[1], self.database.select(CategoryRule))
|
||||
with self.database.session as session:
|
||||
self.dump(
|
||||
params[0],
|
||||
params[1],
|
||||
self.database.select(CategoryRule, session),
|
||||
)
|
||||
|
||||
case Operation.ImportCategoryRules:
|
||||
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
|
||||
@ -321,7 +313,10 @@ class Manager:
|
||||
self.database.insert(rules)
|
||||
|
||||
case Operation.ExportTagRules:
|
||||
self.dump(params[0], params[1], self.database.select(TagRule))
|
||||
with self.database.session as session:
|
||||
self.dump(
|
||||
params[0], params[1], self.database.select(TagRule, session)
|
||||
)
|
||||
|
||||
case Operation.ImportTagRules:
|
||||
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
|
||||
@ -330,7 +325,10 @@ class Manager:
|
||||
self.database.insert(rules)
|
||||
|
||||
case Operation.ExportCategories:
|
||||
self.dump(params[0], params[1], self.database.select(Category))
|
||||
with self.database.session as session:
|
||||
self.dump(
|
||||
params[0], params[1], self.database.select(Category, session)
|
||||
)
|
||||
|
||||
case Operation.ImportCategories:
|
||||
# rules = [Category(**row) for row in self.load(params[0])]
|
||||
@ -343,7 +341,7 @@ class Manager:
|
||||
for rule in rules:
|
||||
del rule["type"]
|
||||
|
||||
category.rules = [CategoryRule(**rule) for rule in rules]
|
||||
category.rules = set(CategoryRule(**rule) for rule in rules)
|
||||
if row["schedule"]:
|
||||
category.schedule = CategorySchedule(**row["schedule"])
|
||||
categories.append(category)
|
||||
@ -352,7 +350,12 @@ class Manager:
|
||||
self.database.insert(categories)
|
||||
|
||||
case Operation.ExportCategoryGroups:
|
||||
self.dump(params[0], params[1], self.database.select(CategoryGroup))
|
||||
with self.database.session as session:
|
||||
self.dump(
|
||||
params[0],
|
||||
params[1],
|
||||
self.database.select(CategoryGroup, session),
|
||||
)
|
||||
|
||||
case Operation.ImportCategoryGroups:
|
||||
groups = [
|
||||
@ -366,7 +369,7 @@ class Manager:
|
||||
return parse_data(filename, args)
|
||||
|
||||
def askcategory(self, transaction: Transaction):
|
||||
selector = CategorySelector.manual
|
||||
selector = CategorySelector(Selector_T.manual)
|
||||
|
||||
categories = self.database.select(Category)
|
||||
|
||||
@ -380,6 +383,9 @@ class Manager:
|
||||
if format == "pickle":
|
||||
with open(fn, "wb") as f:
|
||||
pickle.dump([e.format for e in sequence], f)
|
||||
elif format == "csv":
|
||||
with open(fn, "w", newline="") as f:
|
||||
csv.writer(f).writerows([e.format.values() for e in sequence])
|
||||
elif format == "json":
|
||||
with open(fn, "w", newline="") as f:
|
||||
json.dump([e.format for e in sequence], f, indent=4, default=str)
|
||||
@ -391,6 +397,8 @@ class Manager:
|
||||
if format == "pickle":
|
||||
with open(fn, "rb") as f:
|
||||
return pickle.load(f)
|
||||
elif format == "csv":
|
||||
raise Exception("CSV import not supported")
|
||||
else:
|
||||
print("format not well specified")
|
||||
return []
|
||||
@ -407,5 +415,6 @@ class Manager:
|
||||
self._database = Client(self._db, echo=self._verbosity > 2)
|
||||
return self._database
|
||||
|
||||
def nordigen_client(self) -> NordigenClient:
|
||||
return NordigenClient(NordigenCredentialsManager.default, self.database)
|
||||
@staticmethod
|
||||
def nordigen_client() -> NordigenClient:
|
||||
return NordigenClient(NordigenCredentialsManager.default)
|
||||
|
||||
@ -1,11 +1,10 @@
|
||||
from collections.abc import Sequence
|
||||
from copy import deepcopy
|
||||
from sqlalchemy import Engine, create_engine, delete, select, update
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.orm import Session, sessionmaker
|
||||
from typing import Any, Mapping, Optional, Type, TypeVar
|
||||
|
||||
from pfbudget.db.exceptions import InsertError
|
||||
# from pfbudget.db.exceptions import InsertError, SelectError
|
||||
|
||||
|
||||
class DatabaseSession:
|
||||
@ -17,17 +16,10 @@ class DatabaseSession:
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any):
|
||||
try:
|
||||
if exc_type:
|
||||
self.__session.rollback()
|
||||
else:
|
||||
self.__session.commit()
|
||||
except IntegrityError as e:
|
||||
raise InsertError() from e
|
||||
finally:
|
||||
self.__session.close()
|
||||
|
||||
def close(self):
|
||||
self.__session.close()
|
||||
|
||||
def insert(self, sequence: Sequence[Any]) -> None:
|
||||
@ -41,10 +33,7 @@ class DatabaseSession:
|
||||
else:
|
||||
stmt = select(what)
|
||||
|
||||
return self.__session.scalars(stmt).unique().all()
|
||||
|
||||
def delete(self, obj: Any) -> None:
|
||||
self.__session.delete(obj)
|
||||
return self.__session.scalars(stmt).all()
|
||||
|
||||
|
||||
class Client:
|
||||
@ -61,16 +50,13 @@ class Client:
|
||||
T = TypeVar("T")
|
||||
|
||||
def select(self, what: Type[T], exists: Optional[Any] = None) -> Sequence[T]:
|
||||
session = self.session
|
||||
result = session.select(what, exists)
|
||||
session.close()
|
||||
return result
|
||||
return self.session.select(what, exists)
|
||||
|
||||
def update(self, what: Type[Any], values: Sequence[Mapping[str, Any]]) -> None:
|
||||
with self._sessionmaker() as session, session.begin():
|
||||
session.execute(update(what), values)
|
||||
|
||||
def delete(self, what: Type[Any], column: Any, values: Sequence[Any]) -> None:
|
||||
def delete(self, what: Type[Any], column: Any, values: Sequence[str]) -> None:
|
||||
with self._sessionmaker() as session, session.begin():
|
||||
session.execute(delete(what).where(column.in_(values)))
|
||||
|
||||
|
||||
@ -1,11 +1,9 @@
|
||||
from __future__ import annotations
|
||||
from collections.abc import Mapping, MutableMapping, Sequence
|
||||
from dataclasses import dataclass
|
||||
import datetime as dt
|
||||
import decimal
|
||||
import enum
|
||||
import re
|
||||
from typing import Annotated, Any, Callable, Optional, Self, cast
|
||||
from typing import Annotated, Any, Optional
|
||||
|
||||
from sqlalchemy import (
|
||||
BigInteger,
|
||||
@ -38,20 +36,6 @@ class Base(MappedAsDataclass, DeclarativeBase):
|
||||
},
|
||||
)
|
||||
|
||||
type_annotation_map = {
|
||||
enum.Enum: Enum(enum.Enum, create_constraint=True, inherit_schema=True),
|
||||
}
|
||||
|
||||
|
||||
@dataclass
|
||||
class Serializable:
|
||||
def serialize(self) -> Mapping[str, Any]:
|
||||
return dict(class_=type(self).__name__)
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, map: Mapping[str, Any]) -> Self:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class AccountType(enum.Enum):
|
||||
checking = enum.auto()
|
||||
@ -62,38 +46,36 @@ class AccountType(enum.Enum):
|
||||
MASTERCARD = enum.auto()
|
||||
|
||||
|
||||
class Bank(Base, Serializable):
|
||||
accounttype = Annotated[
|
||||
AccountType,
|
||||
mapped_column(Enum(AccountType, inherit_schema=True)),
|
||||
]
|
||||
|
||||
|
||||
class Export:
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
class Bank(Base, Export):
|
||||
__tablename__ = "banks"
|
||||
|
||||
name: Mapped[str] = mapped_column(primary_key=True)
|
||||
BIC: Mapped[str] = mapped_column(String(8))
|
||||
type: Mapped[AccountType]
|
||||
type: Mapped[accounttype]
|
||||
|
||||
nordigen: Mapped[Optional[NordigenBank]] = relationship(default=None, lazy="joined")
|
||||
nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False)
|
||||
|
||||
def serialize(self) -> Mapping[str, Any]:
|
||||
nordigen = None
|
||||
if self.nordigen:
|
||||
nordigen = {
|
||||
"bank_id": self.nordigen.bank_id,
|
||||
"requisition_id": self.nordigen.requisition_id,
|
||||
"invert": self.nordigen.invert,
|
||||
}
|
||||
|
||||
return super().serialize() | dict(
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return dict(
|
||||
name=self.name,
|
||||
BIC=self.BIC,
|
||||
type=self.type.name,
|
||||
nordigen=nordigen,
|
||||
type=self.type,
|
||||
nordigen=self.nordigen.format if self.nordigen else None,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, map: Mapping[str, Any]) -> Self:
|
||||
bank = cls(map["name"], map["BIC"], map["type"])
|
||||
if map["nordigen"]:
|
||||
bank.nordigen = NordigenBank(**map["nordigen"])
|
||||
return bank
|
||||
|
||||
|
||||
bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]
|
||||
|
||||
@ -108,7 +90,7 @@ idpk = Annotated[
|
||||
money = Annotated[decimal.Decimal, mapped_column(Numeric(16, 2))]
|
||||
|
||||
|
||||
class Transaction(Base, Serializable):
|
||||
class Transaction(Base, Export):
|
||||
__tablename__ = "transactions"
|
||||
|
||||
id: Mapped[idpk] = mapped_column(init=False)
|
||||
@ -116,83 +98,32 @@ class Transaction(Base, Serializable):
|
||||
description: Mapped[Optional[str]]
|
||||
amount: Mapped[money]
|
||||
|
||||
split: Mapped[bool] = mapped_column(default=False)
|
||||
|
||||
category: Mapped[Optional[TransactionCategory]] = relationship(
|
||||
back_populates="transaction", default=None, lazy="joined"
|
||||
)
|
||||
tags: Mapped[set[TransactionTag]] = relationship(default_factory=set, lazy="joined")
|
||||
note: Mapped[Optional[Note]] = relationship(
|
||||
cascade="all, delete-orphan", passive_deletes=True, default=None, lazy="joined"
|
||||
)
|
||||
split: Mapped[bool] = mapped_column(init=False, default=False)
|
||||
|
||||
type: Mapped[str] = mapped_column(init=False)
|
||||
|
||||
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
|
||||
note: Mapped[Optional[Note]] = relationship(
|
||||
cascade="all, delete-orphan", init=False, passive_deletes=True
|
||||
)
|
||||
tags: Mapped[set[TransactionTag]] = relationship(init=False)
|
||||
|
||||
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
|
||||
|
||||
def serialize(self) -> Mapping[str, Any]:
|
||||
category = None
|
||||
if self.category:
|
||||
category = {
|
||||
"name": self.category.name,
|
||||
"selector": self.category.selector.name,
|
||||
}
|
||||
|
||||
return super().serialize() | dict(
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return dict(
|
||||
id=self.id,
|
||||
date=self.date.isoformat(),
|
||||
date=self.date,
|
||||
description=self.description,
|
||||
amount=str(self.amount),
|
||||
amount=self.amount,
|
||||
split=self.split,
|
||||
category=category if category else None,
|
||||
tags=[{"tag": tag.tag} for tag in self.tags],
|
||||
note={"note": self.note.note} if self.note else None,
|
||||
type=self.type,
|
||||
category=self.category.format if self.category else None,
|
||||
# TODO note
|
||||
tags=[tag.format for tag in self.tags] if self.tags else None,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def deserialize(
|
||||
cls, map: Mapping[str, Any]
|
||||
) -> Transaction | BankTransaction | MoneyTransaction | SplitTransaction:
|
||||
match map["type"]:
|
||||
case "bank":
|
||||
return BankTransaction.deserialize(map)
|
||||
case "money":
|
||||
return MoneyTransaction.deserialize(map)
|
||||
case "split":
|
||||
return SplitTransaction.deserialize(map)
|
||||
case _:
|
||||
return cls._deserialize(map)
|
||||
|
||||
@classmethod
|
||||
def _deserialize(cls, map: Mapping[str, Any]) -> Self:
|
||||
category = None
|
||||
if map["category"]:
|
||||
category = TransactionCategory(map["category"]["name"])
|
||||
if map["category"]["selector"]:
|
||||
category.selector = map["category"]["selector"]
|
||||
|
||||
tags: set[TransactionTag] = set()
|
||||
if map["tags"]:
|
||||
tags = set(TransactionTag(t["tag"]) for t in map["tags"])
|
||||
|
||||
note = None
|
||||
if map["note"]:
|
||||
note = Note(map["note"]["note"])
|
||||
|
||||
result = cls(
|
||||
dt.date.fromisoformat(map["date"]),
|
||||
map["description"],
|
||||
map["amount"],
|
||||
map["split"],
|
||||
category,
|
||||
tags,
|
||||
note,
|
||||
)
|
||||
|
||||
if map["id"]:
|
||||
result.id = map["id"]
|
||||
return result
|
||||
|
||||
def __lt__(self, other: Transaction):
|
||||
return self.date < other.date
|
||||
|
||||
@ -203,64 +134,40 @@ idfk = Annotated[
|
||||
|
||||
|
||||
class BankTransaction(Transaction):
|
||||
bank: Mapped[Optional[bankfk]] = mapped_column(default=None)
|
||||
bank: Mapped[bankfk] = mapped_column(nullable=True)
|
||||
|
||||
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
|
||||
|
||||
def serialize(self) -> Mapping[str, Any]:
|
||||
map = cast(MutableMapping[str, Any], super().serialize())
|
||||
map["bank"] = self.bank
|
||||
return map
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, map: Mapping[str, Any]) -> Self:
|
||||
transaction = cls._deserialize(map)
|
||||
transaction.bank = map["bank"]
|
||||
return transaction
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return super().format | dict(bank=self.bank)
|
||||
|
||||
|
||||
class MoneyTransaction(Transaction):
|
||||
__mapper_args__ = {"polymorphic_identity": "money"}
|
||||
|
||||
def serialize(self) -> Mapping[str, Any]:
|
||||
return super().serialize()
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, map: Mapping[str, Any]) -> Self:
|
||||
return cls._deserialize(map)
|
||||
|
||||
|
||||
class SplitTransaction(Transaction):
|
||||
original: Mapped[Optional[idfk]] = mapped_column(default=None)
|
||||
original: Mapped[idfk] = mapped_column(nullable=True)
|
||||
|
||||
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
|
||||
|
||||
def serialize(self) -> Mapping[str, Any]:
|
||||
map = cast(MutableMapping[str, Any], super().serialize())
|
||||
map["original"] = self.original
|
||||
return map
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, map: Mapping[str, Any]) -> Self:
|
||||
transaction = cls._deserialize(map)
|
||||
transaction.original = map["original"]
|
||||
return transaction
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return super().format | dict(original=self.original)
|
||||
|
||||
|
||||
class CategoryGroup(Base, Serializable):
|
||||
class CategoryGroup(Base, Export):
|
||||
__tablename__ = "category_groups"
|
||||
|
||||
name: Mapped[str] = mapped_column(primary_key=True)
|
||||
|
||||
def serialize(self) -> Mapping[str, Any]:
|
||||
return super().serialize() | dict(name=self.name)
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, map: Mapping[str, Any]) -> Self:
|
||||
return cls(map["name"])
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return dict(name=self.name)
|
||||
|
||||
|
||||
class Category(Base, Serializable, repr=False):
|
||||
class Category(Base, Export):
|
||||
__tablename__ = "categories"
|
||||
|
||||
name: Mapped[str] = mapped_column(primary_key=True)
|
||||
@ -268,67 +175,11 @@ class Category(Base, Serializable, repr=False):
|
||||
ForeignKey(CategoryGroup.name), default=None
|
||||
)
|
||||
|
||||
rules: Mapped[list[CategoryRule]] = relationship(
|
||||
cascade="all, delete-orphan",
|
||||
passive_deletes=True,
|
||||
default_factory=list,
|
||||
lazy="joined",
|
||||
rules: Mapped[set[CategoryRule]] = relationship(
|
||||
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
|
||||
)
|
||||
schedule: Mapped[Optional[CategorySchedule]] = relationship(
|
||||
cascade="all, delete-orphan", passive_deletes=True, default=None, lazy="joined"
|
||||
)
|
||||
|
||||
def serialize(self) -> Mapping[str, Any]:
|
||||
rules: Sequence[Mapping[str, Any]] = []
|
||||
for rule in self.rules:
|
||||
rules.append(
|
||||
{
|
||||
"start": rule.start.isoformat() if rule.start else None,
|
||||
"end": rule.end.isoformat() if rule.end else None,
|
||||
"description": rule.description,
|
||||
"regex": rule.regex,
|
||||
"bank": rule.bank,
|
||||
"min": str(rule.min) if rule.min is not None else None,
|
||||
"max": str(rule.max) if rule.max is not None else None,
|
||||
}
|
||||
)
|
||||
|
||||
schedule = None
|
||||
if self.schedule:
|
||||
schedule = {
|
||||
"period": self.schedule.period.name if self.schedule.period else None,
|
||||
"period_multiplier": self.schedule.period_multiplier,
|
||||
"amount": self.schedule.amount,
|
||||
}
|
||||
|
||||
return super().serialize() | dict(
|
||||
name=self.name,
|
||||
group=self.group,
|
||||
rules=rules,
|
||||
schedule=schedule,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, map: Mapping[str, Any]) -> Self:
|
||||
rules: list[CategoryRule] = []
|
||||
for rule in map["rules"]:
|
||||
rules.append(
|
||||
CategoryRule(
|
||||
dt.date.fromisoformat(rule["start"]) if rule["start"] else None,
|
||||
dt.date.fromisoformat(rule["end"]) if rule["end"] else None,
|
||||
rule["description"],
|
||||
rule["regex"],
|
||||
rule["bank"],
|
||||
rule["min"],
|
||||
rule["max"],
|
||||
)
|
||||
)
|
||||
|
||||
return cls(
|
||||
map["name"],
|
||||
map["group"],
|
||||
rules,
|
||||
CategorySchedule(**map["schedule"]) if map["schedule"] else None,
|
||||
cascade="all, delete-orphan", passive_deletes=True, default=None
|
||||
)
|
||||
|
||||
def __repr__(self) -> str:
|
||||
@ -337,6 +188,15 @@ class Category(Base, Serializable, repr=False):
|
||||
f" schedule={self.schedule})"
|
||||
)
|
||||
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return dict(
|
||||
name=self.name,
|
||||
group=self.group if self.group else None,
|
||||
rules=[rule.format for rule in self.rules],
|
||||
schedule=self.schedule.format if self.schedule else None,
|
||||
)
|
||||
|
||||
|
||||
catfk = Annotated[
|
||||
str,
|
||||
@ -344,25 +204,20 @@ catfk = Annotated[
|
||||
]
|
||||
|
||||
|
||||
class CategorySelector(enum.Enum):
|
||||
unknown = enum.auto()
|
||||
nullifier = enum.auto()
|
||||
vacations = enum.auto()
|
||||
rules = enum.auto()
|
||||
algorithm = enum.auto()
|
||||
manual = enum.auto()
|
||||
|
||||
|
||||
class TransactionCategory(Base):
|
||||
class TransactionCategory(Base, Export):
|
||||
__tablename__ = "transactions_categorized"
|
||||
|
||||
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
|
||||
name: Mapped[catfk]
|
||||
|
||||
selector: Mapped[CategorySelector] = mapped_column(default=CategorySelector.unknown)
|
||||
selector: Mapped[CategorySelector] = relationship(
|
||||
cascade="all, delete-orphan", lazy="joined"
|
||||
)
|
||||
|
||||
transaction: Mapped[Transaction] = relationship(
|
||||
back_populates="category", init=False, compare=False
|
||||
@property
|
||||
def format(self):
|
||||
return dict(
|
||||
name=self.name, selector=self.selector.format if self.selector else None
|
||||
)
|
||||
|
||||
|
||||
@ -373,85 +228,106 @@ class Note(Base):
|
||||
note: Mapped[str]
|
||||
|
||||
|
||||
class NordigenBank(Base):
|
||||
class Nordigen(Base, Export):
|
||||
__tablename__ = "banks_nordigen"
|
||||
|
||||
name: Mapped[bankfk] = mapped_column(primary_key=True, init=False)
|
||||
name: Mapped[bankfk] = mapped_column(primary_key=True)
|
||||
bank_id: Mapped[Optional[str]]
|
||||
requisition_id: Mapped[Optional[str]]
|
||||
invert: Mapped[Optional[bool]] = mapped_column(default=None)
|
||||
invert: Mapped[Optional[bool]]
|
||||
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return dict(
|
||||
name=self.name,
|
||||
bank_id=self.bank_id,
|
||||
requisition_id=self.requisition_id,
|
||||
invert=self.invert,
|
||||
)
|
||||
|
||||
|
||||
class Tag(Base, Serializable):
|
||||
class Tag(Base):
|
||||
__tablename__ = "tags"
|
||||
|
||||
name: Mapped[str] = mapped_column(primary_key=True)
|
||||
|
||||
rules: Mapped[list[TagRule]] = relationship(
|
||||
cascade="all, delete-orphan",
|
||||
passive_deletes=True,
|
||||
default_factory=list,
|
||||
lazy="joined",
|
||||
rules: Mapped[set[TagRule]] = relationship(
|
||||
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
|
||||
)
|
||||
|
||||
def serialize(self) -> Mapping[str, Any]:
|
||||
rules: Sequence[Mapping[str, Any]] = []
|
||||
for rule in self.rules:
|
||||
rules.append(
|
||||
{
|
||||
"start": rule.start,
|
||||
"end": rule.end,
|
||||
"description": rule.description,
|
||||
"regex": rule.regex,
|
||||
"bank": rule.bank,
|
||||
"min": str(rule.min) if rule.min is not None else None,
|
||||
"max": str(rule.max) if rule.max is not None else None,
|
||||
}
|
||||
)
|
||||
|
||||
return super().serialize() | dict(name=self.name, rules=rules)
|
||||
|
||||
@classmethod
|
||||
def deserialize(cls, map: Mapping[str, Any]) -> Self:
|
||||
rules: list[TagRule] = []
|
||||
for rule in map["rules"]:
|
||||
rules.append(
|
||||
TagRule(
|
||||
dt.date.fromisoformat(rule["start"]) if rule["start"] else None,
|
||||
dt.date.fromisoformat(rule["end"]) if rule["end"] else None,
|
||||
rule["description"],
|
||||
rule["regex"],
|
||||
rule["bank"],
|
||||
rule["min"],
|
||||
rule["max"],
|
||||
)
|
||||
)
|
||||
|
||||
return cls(map["name"], rules)
|
||||
|
||||
|
||||
class TransactionTag(Base, unsafe_hash=True):
|
||||
class TransactionTag(Base, Export):
|
||||
__tablename__ = "transactions_tagged"
|
||||
|
||||
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
|
||||
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
|
||||
|
||||
@property
|
||||
def format(self):
|
||||
return dict(tag=self.tag)
|
||||
|
||||
class SchedulePeriod(enum.Enum):
|
||||
daily = enum.auto()
|
||||
weekly = enum.auto()
|
||||
monthly = enum.auto()
|
||||
yearly = enum.auto()
|
||||
def __hash__(self):
|
||||
return hash(self.id)
|
||||
|
||||
|
||||
class CategorySchedule(Base):
|
||||
class Selector_T(enum.Enum):
|
||||
unknown = enum.auto()
|
||||
nullifier = enum.auto()
|
||||
vacations = enum.auto()
|
||||
rules = enum.auto()
|
||||
algorithm = enum.auto()
|
||||
manual = enum.auto()
|
||||
|
||||
|
||||
categoryselector = Annotated[
|
||||
Selector_T,
|
||||
mapped_column(Enum(Selector_T, inherit_schema=True), default=Selector_T.unknown),
|
||||
]
|
||||
|
||||
|
||||
class CategorySelector(Base, Export):
|
||||
__tablename__ = "category_selectors"
|
||||
|
||||
id: Mapped[int] = mapped_column(
|
||||
BigInteger,
|
||||
ForeignKey(TransactionCategory.id, ondelete="CASCADE"),
|
||||
primary_key=True,
|
||||
init=False,
|
||||
)
|
||||
selector: Mapped[categoryselector]
|
||||
|
||||
@property
|
||||
def format(self):
|
||||
return dict(selector=self.selector)
|
||||
|
||||
|
||||
class Period(enum.Enum):
|
||||
daily = "daily"
|
||||
weekly = "weekly"
|
||||
monthly = "monthly"
|
||||
yearly = "yearly"
|
||||
|
||||
|
||||
scheduleperiod = Annotated[Selector_T, mapped_column(Enum(Period, inherit_schema=True))]
|
||||
|
||||
|
||||
class CategorySchedule(Base, Export):
|
||||
__tablename__ = "category_schedules"
|
||||
|
||||
name: Mapped[catfk] = mapped_column(primary_key=True, init=False)
|
||||
period: Mapped[Optional[SchedulePeriod]]
|
||||
name: Mapped[catfk] = mapped_column(primary_key=True)
|
||||
period: Mapped[Optional[scheduleperiod]]
|
||||
period_multiplier: Mapped[Optional[int]]
|
||||
amount: Mapped[Optional[int]]
|
||||
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return dict(
|
||||
name=self.name,
|
||||
period=self.period,
|
||||
period_multiplier=self.period_multiplier,
|
||||
amount=self.amount,
|
||||
)
|
||||
|
||||
|
||||
class Link(Base):
|
||||
__tablename__ = "links"
|
||||
@ -460,17 +336,17 @@ class Link(Base):
|
||||
link: Mapped[idfk] = mapped_column(primary_key=True)
|
||||
|
||||
|
||||
class Rule(Base):
|
||||
class Rule(Base, Export):
|
||||
__tablename__ = "rules"
|
||||
|
||||
id: Mapped[idpk] = mapped_column(init=False)
|
||||
start: Mapped[Optional[dt.date]] = mapped_column(default=None)
|
||||
end: Mapped[Optional[dt.date]] = mapped_column(default=None)
|
||||
description: Mapped[Optional[str]] = mapped_column(default=None)
|
||||
regex: Mapped[Optional[str]] = mapped_column(default=None)
|
||||
bank: Mapped[Optional[str]] = mapped_column(default=None)
|
||||
min: Mapped[Optional[money]] = mapped_column(default=None)
|
||||
max: Mapped[Optional[money]] = mapped_column(default=None)
|
||||
start: Mapped[Optional[dt.date]]
|
||||
end: Mapped[Optional[dt.date]]
|
||||
description: Mapped[Optional[str]]
|
||||
regex: Mapped[Optional[str]]
|
||||
bank: Mapped[Optional[str]]
|
||||
min: Mapped[Optional[money]]
|
||||
max: Mapped[Optional[money]]
|
||||
|
||||
type: Mapped[str] = mapped_column(init=False)
|
||||
|
||||
@ -485,16 +361,16 @@ class Rule(Base):
|
||||
valid = re.compile(self.regex, re.IGNORECASE)
|
||||
|
||||
ops = (
|
||||
Rule.exists(self.start, lambda r: t.date >= r),
|
||||
Rule.exists(self.end, lambda r: t.date <= r),
|
||||
Rule.exists(self.start, lambda r: r < t.date),
|
||||
Rule.exists(self.end, lambda r: r > t.date),
|
||||
Rule.exists(self.description, lambda r: r == t.description),
|
||||
Rule.exists(
|
||||
valid,
|
||||
lambda r: r.search(t.description) if t.description else False,
|
||||
),
|
||||
Rule.exists(self.bank, lambda r: r == t.bank),
|
||||
Rule.exists(self.min, lambda r: t.amount >= r),
|
||||
Rule.exists(self.max, lambda r: t.amount <= r),
|
||||
Rule.exists(self.min, lambda r: r < t.amount),
|
||||
Rule.exists(self.max, lambda r: r > t.amount),
|
||||
)
|
||||
|
||||
if all(ops):
|
||||
@ -502,8 +378,21 @@ class Rule(Base):
|
||||
|
||||
return False
|
||||
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return dict(
|
||||
start=self.start,
|
||||
end=self.end,
|
||||
description=self.description,
|
||||
regex=self.regex,
|
||||
bank=self.bank,
|
||||
min=self.min,
|
||||
max=self.max,
|
||||
type=self.type,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def exists(r: Optional[Any], op: Callable[[Any], bool]) -> bool:
|
||||
def exists(r, op) -> bool:
|
||||
return op(r) if r is not None else True
|
||||
|
||||
|
||||
@ -516,13 +405,19 @@ class CategoryRule(Rule):
|
||||
primary_key=True,
|
||||
init=False,
|
||||
)
|
||||
name: Mapped[catfk] = mapped_column(init=False)
|
||||
name: Mapped[catfk]
|
||||
|
||||
__mapper_args__ = {
|
||||
"polymorphic_identity": "category_rule",
|
||||
"polymorphic_load": "selectin",
|
||||
}
|
||||
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return super().format | dict(name=self.name)
|
||||
|
||||
def __hash__(self):
|
||||
return hash(self.id)
|
||||
|
||||
|
||||
class TagRule(Rule):
|
||||
__tablename__ = "tag_rules"
|
||||
@ -533,19 +428,15 @@ class TagRule(Rule):
|
||||
primary_key=True,
|
||||
init=False,
|
||||
)
|
||||
tag: Mapped[str] = mapped_column(
|
||||
ForeignKey(Tag.name, ondelete="CASCADE"), init=False
|
||||
)
|
||||
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))
|
||||
|
||||
__mapper_args__ = {
|
||||
"polymorphic_identity": "tag_rule",
|
||||
"polymorphic_load": "selectin",
|
||||
}
|
||||
|
||||
@property
|
||||
def format(self) -> dict[str, Any]:
|
||||
return super().format | dict(tag=self.tag)
|
||||
|
||||
class Nordigen(Base):
|
||||
__tablename__ = "nordigen"
|
||||
|
||||
type: Mapped[str] = mapped_column(primary_key=True)
|
||||
token: Mapped[str]
|
||||
expires: Mapped[dt.datetime]
|
||||
def __hash__(self):
|
||||
return hash(self.id)
|
||||
|
||||
@ -1,16 +1,12 @@
|
||||
from dataclasses import dataclass
|
||||
import datetime as dt
|
||||
import dotenv
|
||||
import json
|
||||
import nordigen
|
||||
import os
|
||||
import requests
|
||||
import time
|
||||
from typing import Any, Optional, Sequence, Tuple
|
||||
import uuid
|
||||
|
||||
from pfbudget.db.client import Client
|
||||
from pfbudget.db.model import Nordigen
|
||||
|
||||
from .exceptions import CredentialsError, DownloadError
|
||||
|
||||
dotenv.load_dotenv()
|
||||
@ -20,38 +16,40 @@ dotenv.load_dotenv()
|
||||
class NordigenCredentials:
|
||||
id: str
|
||||
key: str
|
||||
token: str = ""
|
||||
|
||||
def valid(self) -> bool:
|
||||
return len(self.id) != 0 and len(self.key) != 0
|
||||
return self.id and self.key
|
||||
|
||||
|
||||
class NordigenClient:
|
||||
redirect_url = "https://murta.dev"
|
||||
|
||||
def __init__(self, credentials: NordigenCredentials, client: Client):
|
||||
def __init__(self, credentials: NordigenCredentials):
|
||||
super().__init__()
|
||||
|
||||
if not credentials.valid():
|
||||
raise CredentialsError
|
||||
|
||||
self.__client = nordigen.NordigenClient(
|
||||
self._client = nordigen.NordigenClient(
|
||||
secret_key=credentials.key, secret_id=credentials.id, timeout=5
|
||||
)
|
||||
self.__client.token = self.__token(client)
|
||||
|
||||
def download(self, requisition_id) -> Sequence[dict[str, Any]]:
|
||||
if credentials.token:
|
||||
self._client.token = credentials.token
|
||||
|
||||
def download(self, requisition_id):
|
||||
try:
|
||||
requisition = self.__client.requisition.get_requisition_by_id(
|
||||
requisition_id
|
||||
)
|
||||
requisition = self._client.requisition.get_requisition_by_id(requisition_id)
|
||||
print(requisition)
|
||||
except requests.HTTPError as e:
|
||||
raise DownloadError(e)
|
||||
|
||||
transactions = []
|
||||
transactions = {}
|
||||
for acc in requisition["accounts"]:
|
||||
account = self.__client.account_api(acc)
|
||||
account = self._client.account_api(acc)
|
||||
|
||||
retries = 0
|
||||
downloaded = None
|
||||
while retries < 3:
|
||||
try:
|
||||
downloaded = account.get_transactions()
|
||||
@ -62,93 +60,55 @@ class NordigenClient:
|
||||
time.sleep(1)
|
||||
|
||||
if not downloaded:
|
||||
print(f"Couldn't download transactions for {account.get_metadata()}")
|
||||
print(f"Couldn't download transactions for {account}")
|
||||
continue
|
||||
|
||||
if (
|
||||
"transactions" not in downloaded
|
||||
or "booked" not in downloaded["transactions"]
|
||||
):
|
||||
print(f"{account} doesn't have transactions")
|
||||
continue
|
||||
|
||||
transactions.extend(downloaded["transactions"]["booked"])
|
||||
transactions.update(downloaded)
|
||||
|
||||
return transactions
|
||||
|
||||
def dump(self, bank, downloaded):
|
||||
# @TODO log received JSON
|
||||
pass
|
||||
with open("json/" + bank.name + ".json", "w") as f:
|
||||
json.dump(downloaded, f)
|
||||
|
||||
def new_requisition(
|
||||
self,
|
||||
institution_id: str,
|
||||
max_historical_days: Optional[int] = None,
|
||||
access_valid_for_days: Optional[int] = None,
|
||||
) -> Tuple[str, str]:
|
||||
kwargs = {
|
||||
"max_historical_days": max_historical_days,
|
||||
"access_valid_for_days": access_valid_for_days,
|
||||
}
|
||||
kwargs = {k: v for k, v in kwargs.items() if v is not None}
|
||||
def generate_token(self):
|
||||
self.token = self._client.generate_token()
|
||||
print(f"New access token: {self.token}")
|
||||
return self.token
|
||||
|
||||
req = self.__client.initialize_session(
|
||||
self.redirect_url, institution_id, str(uuid.uuid4()), **kwargs
|
||||
def requisition(self, id: str, country: str = "PT"):
|
||||
requisition = self._client.initialize_session(
|
||||
redirect_uri=self.redirect_url,
|
||||
institution_id=id,
|
||||
reference_id=str(uuid.uuid4()),
|
||||
)
|
||||
return req.link, req.requisition_id
|
||||
return requisition.link, requisition.requisition_id
|
||||
|
||||
def country_banks(self, country: str):
|
||||
return self.__client.institution.get_institutions(country)
|
||||
return self._client.institution.get_institutions(country)
|
||||
|
||||
def __token(self, client: Client) -> str:
|
||||
with client.session as session:
|
||||
token = session.select(Nordigen)
|
||||
# def __token(self):
|
||||
# if token := os.environ.get("TOKEN"):
|
||||
# return token
|
||||
# else:
|
||||
# token = self._client.generate_token()
|
||||
# print(f"New access token: {token}")
|
||||
# return token["access"]
|
||||
|
||||
def datetime(seconds: int) -> dt.datetime:
|
||||
return dt.datetime.now() + dt.timedelta(seconds=seconds)
|
||||
@property
|
||||
def token(self):
|
||||
return self._token
|
||||
|
||||
if not len(token):
|
||||
print("First time nordigen token setup")
|
||||
new = self.__client.generate_token()
|
||||
session.insert(
|
||||
[
|
||||
Nordigen(
|
||||
"access",
|
||||
new["access"],
|
||||
datetime(new["access_expires"]),
|
||||
),
|
||||
Nordigen(
|
||||
"refresh",
|
||||
new["refresh"],
|
||||
datetime(new["refresh_expires"]),
|
||||
),
|
||||
]
|
||||
)
|
||||
|
||||
return new["access"]
|
||||
|
||||
else:
|
||||
access = next(t for t in token if t.type == "access")
|
||||
refresh = next(t for t in token if t.type == "refresh")
|
||||
|
||||
if access.expires > dt.datetime.now():
|
||||
pass
|
||||
elif refresh.expires > dt.datetime.now():
|
||||
new = self.__client.exchange_token(refresh.token)
|
||||
access.token = new["access"]
|
||||
access.expires = datetime(new["access_expires"])
|
||||
else:
|
||||
new = self.__client.generate_token()
|
||||
access.token = new["access"]
|
||||
access.expires = datetime(new["access_expires"])
|
||||
refresh.token = new["refresh"]
|
||||
refresh.expires = datetime(new["refresh_expires"])
|
||||
|
||||
return access.token
|
||||
@token.setter
|
||||
def token(self, value):
|
||||
if self._token:
|
||||
print("Replacing existing token with {value}")
|
||||
self._token = value
|
||||
|
||||
|
||||
class NordigenCredentialsManager:
|
||||
default = NordigenCredentials(
|
||||
os.environ.get("SECRET_ID", ""),
|
||||
os.environ.get("SECRET_KEY", ""),
|
||||
os.environ.get("SECRET_ID"),
|
||||
os.environ.get("SECRET_KEY"),
|
||||
os.environ.get("TOKEN"),
|
||||
)
|
||||
|
||||
@ -1,45 +1,58 @@
|
||||
from __future__ import annotations
|
||||
from collections import namedtuple
|
||||
from decimal import Decimal
|
||||
from importlib import import_module
|
||||
from pathlib import Path
|
||||
import datetime as dt
|
||||
from typing import Any, Callable, NamedTuple, Optional
|
||||
import yaml
|
||||
|
||||
from pfbudget.common.types import NoBankSelected
|
||||
from pfbudget.db.model import BankTransaction
|
||||
from pfbudget.db.model import Transaction
|
||||
from pfbudget.utils import utils
|
||||
|
||||
|
||||
class Index(NamedTuple):
|
||||
date: int = -1
|
||||
text: int = -1
|
||||
value: int = -1
|
||||
negate: bool = False
|
||||
Index = namedtuple(
|
||||
"Index", ["date", "text", "value", "negate"], defaults=[-1, -1, -1, False]
|
||||
)
|
||||
Options = namedtuple(
|
||||
"Options",
|
||||
[
|
||||
"encoding",
|
||||
"separator",
|
||||
"date_fmt",
|
||||
"start",
|
||||
"end",
|
||||
"debit",
|
||||
"credit",
|
||||
"additional_parser",
|
||||
"category",
|
||||
"VISA",
|
||||
"MasterCard",
|
||||
"AmericanExpress",
|
||||
],
|
||||
defaults=[
|
||||
"",
|
||||
"",
|
||||
"",
|
||||
1,
|
||||
None,
|
||||
Index(),
|
||||
Index(),
|
||||
False,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
None,
|
||||
],
|
||||
)
|
||||
|
||||
|
||||
class Options(NamedTuple):
|
||||
encoding: str
|
||||
separator: str
|
||||
date_fmt: str
|
||||
start: int = 1
|
||||
end: Optional[int] = None
|
||||
debit: Index = Index()
|
||||
credit: Index = Index()
|
||||
additional_parser: bool = False
|
||||
VISA: Optional[Options] = None
|
||||
MasterCard: Optional[Options] = None
|
||||
AmericanExpress: Optional[Options] = None
|
||||
|
||||
|
||||
def parse_data(filename: Path, args: dict[str, Any]) -> list[BankTransaction]:
|
||||
cfg: dict[str, Any] = yaml.safe_load(open("parsers.yaml"))
|
||||
def parse_data(filename: Path, args: dict) -> list[Transaction]:
|
||||
cfg: dict = yaml.safe_load(open("parsers.yaml"))
|
||||
assert (
|
||||
"Banks" in cfg
|
||||
), "parsers.yaml is missing the Banks section with the list of available banks"
|
||||
|
||||
if not args["bank"]:
|
||||
bank, creditcard = utils.find_credit_institution( # type: ignore
|
||||
bank, creditcard = utils.find_credit_institution(
|
||||
filename, cfg.get("Banks"), cfg.get("CreditCards")
|
||||
)
|
||||
else:
|
||||
@ -47,7 +60,7 @@ def parse_data(filename: Path, args: dict[str, Any]) -> list[BankTransaction]:
|
||||
creditcard = None if not args["creditcard"] else args["creditcard"][0]
|
||||
|
||||
try:
|
||||
options: dict[str, Any] = cfg[bank]
|
||||
options: dict = cfg[bank]
|
||||
except KeyError as e:
|
||||
banks = cfg["Banks"]
|
||||
raise NoBankSelected(f"{e} not a valid bank, try one of {banks}")
|
||||
@ -60,6 +73,9 @@ def parse_data(filename: Path, args: dict[str, Any]) -> list[BankTransaction]:
|
||||
raise NoBankSelected(f"{e} not a valid bank, try one of {creditcards}")
|
||||
bank += creditcard
|
||||
|
||||
if args["category"]:
|
||||
options["category"] = args["category"][0]
|
||||
|
||||
if options.get("additional_parser"):
|
||||
parser = getattr(import_module("pfbudget.extract.parsers"), bank)
|
||||
transactions = parser(filename, bank, options).parse()
|
||||
@ -70,7 +86,7 @@ def parse_data(filename: Path, args: dict[str, Any]) -> list[BankTransaction]:
|
||||
|
||||
|
||||
class Parser:
|
||||
def __init__(self, filename: Path, bank: str, options: dict[str, Any]):
|
||||
def __init__(self, filename: Path, bank: str, options: dict):
|
||||
self.filename = filename
|
||||
self.bank = bank
|
||||
|
||||
@ -81,10 +97,10 @@ class Parser:
|
||||
|
||||
self.options = Options(**options)
|
||||
|
||||
def func(self, transaction: BankTransaction):
|
||||
def func(self, transaction: Transaction):
|
||||
pass
|
||||
|
||||
def parse(self) -> list[BankTransaction]:
|
||||
def parse(self) -> list[Transaction]:
|
||||
transactions = [
|
||||
Parser.transaction(line, self.bank, self.options, self.func)
|
||||
for line in list(open(self.filename, encoding=self.options.encoding))[
|
||||
@ -95,8 +111,7 @@ class Parser:
|
||||
return transactions
|
||||
|
||||
@staticmethod
|
||||
def index(line: list[str], options: Options) -> Index:
|
||||
index = None
|
||||
def index(line: list, options: Options) -> Index:
|
||||
if options.debit.date != -1 and options.credit.date != -1:
|
||||
if options.debit.value != options.credit.value:
|
||||
if line[options.debit.value]:
|
||||
@ -123,57 +138,49 @@ class Parser:
|
||||
else:
|
||||
raise IndexError("No debit not credit indexes available")
|
||||
|
||||
return index if index else Index()
|
||||
return index
|
||||
|
||||
@staticmethod
|
||||
def transaction(
|
||||
line_: str, bank: str, options: Options, func: Callable[[BankTransaction], None]
|
||||
) -> BankTransaction:
|
||||
line = line_.rstrip().split(options.separator)
|
||||
def transaction(line: str, bank: str, options: Options, func) -> Transaction:
|
||||
line = line.rstrip().split(options.separator)
|
||||
index = Parser.index(line, options)
|
||||
|
||||
try:
|
||||
date_str = line[index.date].strip()
|
||||
date = dt.datetime.strptime(date_str, options.date_fmt).date()
|
||||
|
||||
date = (
|
||||
dt.datetime.strptime(line[index.date].strip(), options.date_fmt)
|
||||
.date()
|
||||
.isoformat()
|
||||
)
|
||||
text = line[index.text]
|
||||
|
||||
value = utils.parse_decimal(line[index.value])
|
||||
if index.negate:
|
||||
value = -value
|
||||
|
||||
transaction = BankTransaction(date, text, value, bank=bank)
|
||||
if options.category:
|
||||
category = line[options.category]
|
||||
transaction = Transaction(date, text, bank, value, category)
|
||||
else:
|
||||
transaction = Transaction(date, text, bank, value)
|
||||
|
||||
if options.additional_parser:
|
||||
func(transaction)
|
||||
|
||||
return transaction
|
||||
|
||||
except IndexError:
|
||||
raise IndexError(line_)
|
||||
|
||||
|
||||
class Bank1(Parser):
|
||||
def __init__(self, filename: Path, bank: str, options: dict[str, Any]):
|
||||
def __init__(self, filename: str, bank: str, options: dict):
|
||||
super().__init__(filename, bank, options)
|
||||
self.transfers: list[dt.date] = []
|
||||
self.transfers = []
|
||||
self.transaction_cost = -Decimal("1")
|
||||
|
||||
def func(self, transaction: BankTransaction):
|
||||
if (
|
||||
transaction.description
|
||||
and "transf" in transaction.description.lower()
|
||||
and transaction.amount < 0
|
||||
):
|
||||
transaction.amount -= self.transaction_cost
|
||||
def func(self, transaction: Transaction):
|
||||
if "transf" in transaction.description.lower() and transaction.value < 0:
|
||||
transaction.value -= self.transaction_cost
|
||||
self.transfers.append(transaction.date)
|
||||
|
||||
def parse(self) -> list[BankTransaction]:
|
||||
def parse(self) -> list:
|
||||
transactions = super().parse()
|
||||
for date in self.transfers:
|
||||
transactions.append(
|
||||
BankTransaction(
|
||||
date, "Transaction cost", self.transaction_cost, bank=self.bank
|
||||
)
|
||||
Transaction(date, "Transaction cost", self.bank, self.transaction_cost)
|
||||
)
|
||||
return transactions
|
||||
|
||||
@ -35,4 +35,4 @@ class PSD2Extractor(Extractor):
|
||||
]
|
||||
|
||||
def convert(self, bank, downloaded, start, end):
|
||||
return [convert(t, bank) for t in downloaded]
|
||||
return [convert(t, bank) for t in downloaded["transactions"]["booked"]]
|
||||
|
||||
@ -4,10 +4,11 @@ from typing import Iterable, Sequence
|
||||
from pfbudget.db.model import (
|
||||
CategoryRule,
|
||||
CategorySelector,
|
||||
Selector_T,
|
||||
Transaction,
|
||||
TransactionCategory,
|
||||
TransactionTag,
|
||||
)
|
||||
from .exceptions import TransactionCategorizedError
|
||||
from .transform import Transformer
|
||||
|
||||
|
||||
@ -24,15 +25,12 @@ class Categorizer(Transformer):
|
||||
def transform_inplace(self, transactions: Sequence[Transaction]) -> None:
|
||||
for rule in self.rules:
|
||||
for transaction in transactions:
|
||||
if transaction.category:
|
||||
raise TransactionCategorizedError(transaction)
|
||||
|
||||
if not rule.matches(transaction):
|
||||
continue
|
||||
|
||||
if not transaction.category:
|
||||
transaction.category = TransactionCategory(
|
||||
rule.name, CategorySelector.rules
|
||||
rule.name, CategorySelector(Selector_T.rules)
|
||||
)
|
||||
else:
|
||||
if not transaction.tags:
|
||||
transaction.tags = {TransactionTag(rule.name)}
|
||||
else:
|
||||
transaction.tags.add(TransactionTag(rule.name))
|
||||
|
||||
@ -1,2 +1,6 @@
|
||||
class MoreThanOneMatchError(Exception):
|
||||
pass
|
||||
|
||||
|
||||
class TransactionCategorizedError(Exception):
|
||||
pass
|
||||
|
||||
@ -6,6 +6,7 @@ from .exceptions import MoreThanOneMatchError
|
||||
from .transform import Transformer
|
||||
from pfbudget.db.model import (
|
||||
CategorySelector,
|
||||
Selector_T,
|
||||
Transaction,
|
||||
TransactionCategory,
|
||||
)
|
||||
@ -15,7 +16,7 @@ class Nullifier(Transformer):
|
||||
NULL_DAYS = 4
|
||||
|
||||
def __init__(self, rules=None):
|
||||
self.rules = rules if rules else []
|
||||
self.rules = rules
|
||||
|
||||
def transform(self, transactions: Sequence[Transaction]) -> Sequence[Transaction]:
|
||||
"""transform
|
||||
@ -88,6 +89,6 @@ class Nullifier(Transformer):
|
||||
|
||||
def _nullify(self, transaction: Transaction) -> Transaction:
|
||||
transaction.category = TransactionCategory(
|
||||
"null", selector=CategorySelector.nullifier
|
||||
"null", selector=CategorySelector(Selector_T.nullifier)
|
||||
)
|
||||
return transaction
|
||||
|
||||
1776
poetry.lock
generated
1776
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@ -8,16 +8,17 @@ readme = "README.md"
|
||||
packages = [{include = "pfbudget"}]
|
||||
|
||||
[tool.poetry.dependencies]
|
||||
python = "^3.11"
|
||||
python = "^3.10"
|
||||
codetiming = "^1.4.0"
|
||||
matplotlib = "^3.7.1"
|
||||
nordigen = "^1.3.1"
|
||||
psycopg2 = "^2.9.6"
|
||||
psycopg2 = {extras = ["binary"], version = "^2.9.6"}
|
||||
python-dateutil = "^2.8.2"
|
||||
python-dotenv = "^1.0.0"
|
||||
pyyaml = "^6.0"
|
||||
sqlalchemy = "^2.0.9"
|
||||
|
||||
|
||||
[tool.poetry.group.dev.dependencies]
|
||||
alembic = "^1.10.3"
|
||||
black = "^23.3.0"
|
||||
@ -27,15 +28,11 @@ pytest = "^7.3.0"
|
||||
pytest-cov = "^4.0.0"
|
||||
pytest-mock = "^3.10.0"
|
||||
sqlalchemy = {extras = ["mypy"], version = "^2.0.9"}
|
||||
ruff = "^0.0.267"
|
||||
|
||||
[build-system]
|
||||
requires = ["poetry-core"]
|
||||
build-backend = "poetry.core.masonry.api"
|
||||
|
||||
[tool.pytest.ini_options]
|
||||
pythonpath = ". tests"
|
||||
|
||||
[pytest]
|
||||
mock_use_standalone_module = true
|
||||
|
||||
|
||||
@ -1,8 +0,0 @@
|
||||
from pfbudget.db.model import AccountType, Bank, NordigenBank
|
||||
|
||||
|
||||
checking = Bank(
|
||||
"bank", "BANK", AccountType.checking, NordigenBank("bank_id", "requisition_id")
|
||||
)
|
||||
|
||||
cc = Bank("cc", "CC", AccountType.MASTERCARD)
|
||||
@ -1,21 +1,15 @@
|
||||
from decimal import Decimal
|
||||
|
||||
from pfbudget.db.model import Category, CategoryGroup, CategoryRule, Tag, TagRule
|
||||
from pfbudget.db.model import Category, CategoryRule, Tag, TagRule
|
||||
|
||||
category_null = Category("null")
|
||||
|
||||
categorygroup1 = CategoryGroup("group#1")
|
||||
category_null = Category("null", None, set())
|
||||
|
||||
category1 = Category(
|
||||
"cat#1",
|
||||
"group#1",
|
||||
rules=[CategoryRule(description="desc#1", max=Decimal(0))],
|
||||
None,
|
||||
{CategoryRule(None, None, "desc#1", None, None, None, Decimal(0), "cat#1")},
|
||||
)
|
||||
|
||||
category2 = Category(
|
||||
"cat#2",
|
||||
"group#1",
|
||||
rules=[CategoryRule(description="desc#1", max=Decimal(0))],
|
||||
tag_1 = Tag(
|
||||
"tag#1", {TagRule(None, None, "desc#1", None, None, None, Decimal(0), "tag#1")}
|
||||
)
|
||||
|
||||
tag_1 = Tag("tag#1", rules=[TagRule(description="desc#1", max=Decimal(0))])
|
||||
|
||||
@ -1,22 +0,0 @@
|
||||
import datetime as dt
|
||||
|
||||
from pfbudget.db.client import Client
|
||||
from pfbudget.db.model import Base, Nordigen
|
||||
|
||||
|
||||
class MockClient(Client):
|
||||
now = dt.datetime.now()
|
||||
|
||||
def __init__(self):
|
||||
url = "sqlite://"
|
||||
super().__init__(
|
||||
url, execution_options={"schema_translate_map": {"pfbudget": None}}
|
||||
)
|
||||
Base.metadata.create_all(self.engine)
|
||||
|
||||
self.insert(
|
||||
[
|
||||
Nordigen("access", "token#1", self.now + dt.timedelta(days=1)),
|
||||
Nordigen("refresh", "token#2", self.now + dt.timedelta(days=30)),
|
||||
]
|
||||
)
|
||||
@ -1,11 +1,3 @@
|
||||
from typing import Any, Dict, List, Optional
|
||||
import nordigen
|
||||
from nordigen.types.http_enums import HTTPMethod
|
||||
from nordigen.types.types import RequisitionDto, TokenType
|
||||
|
||||
from pfbudget.extract.nordigen import NordigenCredentials
|
||||
|
||||
|
||||
id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
|
||||
|
||||
accounts_id = {
|
||||
@ -18,7 +10,6 @@ accounts_id = {
|
||||
"owner_name": "string",
|
||||
}
|
||||
|
||||
# The downloaded transactions match the simple and simple_transformed mocks
|
||||
accounts_id_transactions = {
|
||||
"transactions": {
|
||||
"booked": [
|
||||
@ -89,58 +80,3 @@ requisitions_id = {
|
||||
"account_selection": False,
|
||||
"redirect_immediate": False,
|
||||
}
|
||||
|
||||
credentials = NordigenCredentials("ID", "KEY")
|
||||
|
||||
|
||||
class MockNordigenClient(nordigen.NordigenClient):
|
||||
def __init__(
|
||||
self,
|
||||
secret_key: str = "ID",
|
||||
secret_id: str = "KEY",
|
||||
timeout: int = 10,
|
||||
base_url: str = "https://ob.nordigen.com/api/v2",
|
||||
) -> None:
|
||||
super().__init__(secret_key, secret_id, timeout, base_url)
|
||||
|
||||
def generate_token(self) -> TokenType:
|
||||
return {
|
||||
"access": "access_token",
|
||||
"refresh": "refresh_token",
|
||||
"access_expires": 86400,
|
||||
"refresh_expires": 2592000,
|
||||
}
|
||||
|
||||
def exchange_token(self, refresh_token: str) -> TokenType:
|
||||
assert len(refresh_token) > 0, "invalid refresh token"
|
||||
return {
|
||||
"access": "access_token",
|
||||
"refresh": "refresh_token",
|
||||
"access_expires": 86400,
|
||||
"refresh_expires": 2592000,
|
||||
}
|
||||
|
||||
def request(
|
||||
self,
|
||||
method: HTTPMethod,
|
||||
endpoint: str,
|
||||
data: Optional[Dict[str, Any]] = None,
|
||||
headers: Optional[Dict[str, Any]] = None,
|
||||
) -> Any:
|
||||
if endpoint == "requisitions/" + "requisition_id" + "/":
|
||||
return requisitions_id
|
||||
elif endpoint == "accounts/" + id + "/transactions/":
|
||||
return accounts_id_transactions
|
||||
else:
|
||||
raise NotImplementedError(endpoint)
|
||||
|
||||
def initialize_session(
|
||||
self,
|
||||
redirect_uri: str,
|
||||
institution_id: str,
|
||||
reference_id: str,
|
||||
max_historical_days: int = 90,
|
||||
access_valid_for_days: int = 90,
|
||||
access_scope: List[str] | None = None,
|
||||
) -> RequisitionDto:
|
||||
return RequisitionDto("http://random", "requisition_id")
|
||||
|
||||
@ -1,73 +0,0 @@
|
||||
from datetime import date
|
||||
from decimal import Decimal
|
||||
|
||||
from pfbudget.db.model import (
|
||||
BankTransaction,
|
||||
CategorySelector,
|
||||
MoneyTransaction,
|
||||
Note,
|
||||
SplitTransaction,
|
||||
Transaction,
|
||||
TransactionCategory,
|
||||
TransactionTag,
|
||||
)
|
||||
|
||||
# The simple and simple_transformed match the nordigen mocks
|
||||
simple = [
|
||||
BankTransaction(date(2023, 1, 14), "string", Decimal("328.18"), bank="bank"),
|
||||
BankTransaction(date(2023, 2, 14), "string", Decimal("947.26"), bank="bank"),
|
||||
]
|
||||
|
||||
simple_transformed = [
|
||||
BankTransaction(
|
||||
date(2023, 1, 14),
|
||||
"",
|
||||
Decimal("328.18"),
|
||||
bank="bank",
|
||||
category=TransactionCategory("category#1", CategorySelector.algorithm),
|
||||
),
|
||||
BankTransaction(
|
||||
date(2023, 2, 14),
|
||||
"",
|
||||
Decimal("947.26"),
|
||||
bank="bank",
|
||||
category=TransactionCategory("category#2", CategorySelector.algorithm),
|
||||
),
|
||||
]
|
||||
|
||||
bank = [
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-10"), bank="bank#1"),
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-10"), bank="bank#2"),
|
||||
]
|
||||
|
||||
money = [
|
||||
MoneyTransaction(date(2023, 1, 1), "", Decimal("-10")),
|
||||
MoneyTransaction(date(2023, 1, 1), "", Decimal("-10")),
|
||||
]
|
||||
|
||||
__original = Transaction(date(2023, 1, 1), "", Decimal("-10"), split=True)
|
||||
__original.id = 9000
|
||||
|
||||
split = [
|
||||
__original,
|
||||
SplitTransaction(date(2023, 1, 1), "", Decimal("-5"), original=__original.id),
|
||||
SplitTransaction(date(2023, 1, 1), "", Decimal("-5"), original=__original.id),
|
||||
]
|
||||
|
||||
tagged = [
|
||||
Transaction(
|
||||
date(2023, 1, 1),
|
||||
"",
|
||||
Decimal("-10"),
|
||||
tags={TransactionTag("tag#1"), TransactionTag("tag#1")},
|
||||
)
|
||||
]
|
||||
|
||||
noted = [
|
||||
Transaction(
|
||||
date(2023, 1, 1),
|
||||
"",
|
||||
Decimal("-10"),
|
||||
note=Note("note#1"),
|
||||
)
|
||||
]
|
||||
@ -1,144 +0,0 @@
|
||||
from pathlib import Path
|
||||
from typing import Any, Sequence, Type
|
||||
import pytest
|
||||
|
||||
from mocks import banks, categories, transactions
|
||||
from mocks.client import MockClient
|
||||
|
||||
from pfbudget.common.types import ExportFormat
|
||||
from pfbudget.core.command import (
|
||||
BackupCommand,
|
||||
ExportCommand,
|
||||
ImportBackupCommand,
|
||||
ImportCommand,
|
||||
ImportFailedError,
|
||||
)
|
||||
from pfbudget.db.client import Client
|
||||
from pfbudget.db.model import (
|
||||
Bank,
|
||||
BankTransaction,
|
||||
Base,
|
||||
Category,
|
||||
CategoryGroup,
|
||||
MoneyTransaction,
|
||||
Note,
|
||||
SplitTransaction,
|
||||
Tag,
|
||||
Transaction,
|
||||
TransactionCategory,
|
||||
TransactionTag,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client() -> Client:
|
||||
return MockClient()
|
||||
|
||||
|
||||
params = [
|
||||
(transactions.simple, Transaction),
|
||||
(transactions.simple_transformed, Transaction),
|
||||
(transactions.bank, Transaction),
|
||||
(transactions.bank, BankTransaction),
|
||||
(transactions.money, Transaction),
|
||||
(transactions.money, MoneyTransaction),
|
||||
(transactions.split, SplitTransaction),
|
||||
([banks.checking, banks.cc], Bank),
|
||||
([categories.category_null, categories.category1, categories.category2], Category),
|
||||
(
|
||||
[
|
||||
categories.categorygroup1,
|
||||
categories.category_null,
|
||||
categories.category1,
|
||||
categories.category2,
|
||||
],
|
||||
CategoryGroup,
|
||||
),
|
||||
([categories.tag_1], Tag),
|
||||
]
|
||||
|
||||
not_serializable = [
|
||||
(transactions.simple_transformed, TransactionCategory),
|
||||
(transactions.tagged, TransactionTag),
|
||||
(transactions.noted, Note),
|
||||
]
|
||||
|
||||
|
||||
class TestBackup:
|
||||
@pytest.mark.parametrize("input, what", params)
|
||||
def test_import(self, tmp_path: Path, input: Sequence[Any], what: Type[Any]):
|
||||
file = tmp_path / "test.json"
|
||||
|
||||
client = MockClient()
|
||||
client.insert(input)
|
||||
originals = client.select(what)
|
||||
|
||||
assert originals
|
||||
|
||||
command = ExportCommand(client, what, file, ExportFormat.JSON)
|
||||
command.execute()
|
||||
|
||||
other = MockClient()
|
||||
command = ImportCommand(other, what, file, ExportFormat.JSON)
|
||||
command.execute()
|
||||
|
||||
imported = other.select(what)
|
||||
|
||||
assert originals == imported
|
||||
|
||||
command = ExportCommand(client, what, file, ExportFormat.pickle)
|
||||
with pytest.raises(AttributeError):
|
||||
command.execute()
|
||||
|
||||
command = ImportCommand(other, what, file, ExportFormat.pickle)
|
||||
with pytest.raises(AttributeError):
|
||||
command.execute()
|
||||
|
||||
@pytest.mark.parametrize("input, what", not_serializable)
|
||||
def test_try_backup_not_serializable(
|
||||
self, tmp_path: Path, input: Sequence[Any], what: Type[Any]
|
||||
):
|
||||
file = tmp_path / "test.json"
|
||||
|
||||
client = MockClient()
|
||||
client.insert(input)
|
||||
originals = client.select(what)
|
||||
assert originals
|
||||
|
||||
command = ExportCommand(client, what, file, ExportFormat.JSON)
|
||||
|
||||
with pytest.raises(AttributeError):
|
||||
command.execute()
|
||||
|
||||
other = MockClient()
|
||||
command = ImportCommand(other, what, file, ExportFormat.JSON)
|
||||
|
||||
with pytest.raises(ImportFailedError):
|
||||
command.execute()
|
||||
|
||||
imported = other.select(what)
|
||||
assert not imported
|
||||
|
||||
def test_full_backup(self, tmp_path: Path):
|
||||
file = tmp_path / "test.json"
|
||||
|
||||
client = MockClient()
|
||||
client.insert([e for t in params for e in t[0]])
|
||||
|
||||
command = BackupCommand(client, file, ExportFormat.JSON)
|
||||
command.execute()
|
||||
|
||||
other = MockClient()
|
||||
command = ImportBackupCommand(other, file, ExportFormat.JSON)
|
||||
command.execute()
|
||||
|
||||
def subclasses(cls: Type[Any]) -> set[Type[Any]]:
|
||||
return set(cls.__subclasses__()) | {
|
||||
s for c in cls.__subclasses__() for s in subclasses(c)
|
||||
}
|
||||
|
||||
for t in [cls for cls in subclasses(Base)]:
|
||||
originals = client.select(t)
|
||||
imported = other.select(t)
|
||||
|
||||
assert originals == imported, f"{t}"
|
||||
@ -1,54 +0,0 @@
|
||||
import json
|
||||
from pathlib import Path
|
||||
import pytest
|
||||
|
||||
from mocks.client import MockClient
|
||||
import mocks.transactions
|
||||
|
||||
from pfbudget.common.types import ExportFormat
|
||||
from pfbudget.core.command import ExportCommand, ImportCommand
|
||||
from pfbudget.db.client import Client
|
||||
from pfbudget.db.exceptions import InsertError
|
||||
from pfbudget.db.model import Transaction
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def client() -> Client:
|
||||
return MockClient()
|
||||
|
||||
|
||||
class TestCommand:
|
||||
def test_export_json(self, tmp_path: Path, client: Client):
|
||||
file = tmp_path / "test.json"
|
||||
client.insert(mocks.transactions.simple)
|
||||
|
||||
command = ExportCommand(client, Transaction, file, ExportFormat.JSON)
|
||||
command.execute()
|
||||
|
||||
with open(file, newline="") as f:
|
||||
result = json.load(f)
|
||||
assert result == [t.serialize() for t in client.select(Transaction)]
|
||||
|
||||
def test_export_pickle(self, tmp_path: Path, client: Client):
|
||||
file = tmp_path / "test.pickle"
|
||||
command = ExportCommand(client, Transaction, file, ExportFormat.pickle)
|
||||
with pytest.raises(AttributeError):
|
||||
command.execute()
|
||||
|
||||
def test_import_json(self, tmp_path: Path, client: Client):
|
||||
file = tmp_path / "test"
|
||||
client.insert(mocks.transactions.simple)
|
||||
|
||||
command = ExportCommand(client, Transaction, file, ExportFormat.JSON)
|
||||
command.execute()
|
||||
|
||||
# Since the transactions are already in the DB, we expect an insert error
|
||||
with pytest.raises(InsertError):
|
||||
command = ImportCommand(client, Transaction, file, ExportFormat.JSON)
|
||||
command.execute()
|
||||
|
||||
def test_import_pickle(self, tmp_path: Path, client: Client):
|
||||
file = tmp_path / "test"
|
||||
command = ExportCommand(client, Transaction, file, ExportFormat.pickle)
|
||||
with pytest.raises(AttributeError):
|
||||
command.execute()
|
||||
@ -2,14 +2,14 @@ from datetime import date
|
||||
from decimal import Decimal
|
||||
import pytest
|
||||
|
||||
from mocks.client import MockClient
|
||||
|
||||
from pfbudget.db.client import Client
|
||||
from pfbudget.db.model import (
|
||||
AccountType,
|
||||
Bank,
|
||||
NordigenBank,
|
||||
Base,
|
||||
CategorySelector,
|
||||
Nordigen,
|
||||
Selector_T,
|
||||
Transaction,
|
||||
TransactionCategory,
|
||||
)
|
||||
@ -17,21 +17,20 @@ from pfbudget.db.model import (
|
||||
|
||||
@pytest.fixture
|
||||
def client() -> Client:
|
||||
return MockClient()
|
||||
url = "sqlite://"
|
||||
client = Client(url, execution_options={"schema_translate_map": {"pfbudget": None}})
|
||||
Base.metadata.create_all(client.engine)
|
||||
return client
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def banks(client: Client) -> list[Bank]:
|
||||
banks = [
|
||||
Bank("bank", "BANK", AccountType.checking, NordigenBank(None, "req", None)),
|
||||
Bank("bank", "BANK", AccountType.checking),
|
||||
Bank("broker", "BROKER", AccountType.investment),
|
||||
Bank("creditcard", "CC", AccountType.MASTERCARD),
|
||||
]
|
||||
|
||||
# fix nordigen bank names which would be generated post DB insert
|
||||
for bank in banks:
|
||||
if bank.nordigen:
|
||||
bank.nordigen.name = bank.name
|
||||
banks[0].nordigen = Nordigen("bank", None, "req", None)
|
||||
|
||||
client.insert(banks)
|
||||
return banks
|
||||
@ -40,22 +39,19 @@ def banks(client: Client) -> list[Bank]:
|
||||
@pytest.fixture
|
||||
def transactions(client: Client) -> list[Transaction]:
|
||||
transactions = [
|
||||
Transaction(
|
||||
date(2023, 1, 1),
|
||||
"",
|
||||
Decimal("-10"),
|
||||
category=TransactionCategory("category", CategorySelector.algorithm),
|
||||
),
|
||||
Transaction(date(2023, 1, 1), "", Decimal("-10")),
|
||||
Transaction(date(2023, 1, 2), "", Decimal("-50")),
|
||||
]
|
||||
transactions[0].category = TransactionCategory(
|
||||
"name", CategorySelector(Selector_T.algorithm)
|
||||
)
|
||||
|
||||
client.insert(transactions)
|
||||
|
||||
# fix ids which would be generated post DB insert
|
||||
for i, transaction in enumerate(transactions):
|
||||
transaction.id = i + 1
|
||||
if transaction.category:
|
||||
transaction.category.id = 1
|
||||
transaction.split = False # default
|
||||
transactions[0].category.id = 1
|
||||
transactions[0].category.selector.id = 1
|
||||
|
||||
return transactions
|
||||
|
||||
@ -125,13 +121,13 @@ class TestDatabase:
|
||||
def test_update_nordigen(self, client: Client, banks: list[Bank]):
|
||||
name = banks[0].name
|
||||
|
||||
result = client.select(NordigenBank, lambda: NordigenBank.name == name)
|
||||
result = client.select(Nordigen, lambda: Nordigen.name == name)
|
||||
assert result[0].requisition_id == "req"
|
||||
|
||||
update = {"name": name, "requisition_id": "anotherreq"}
|
||||
client.update(NordigenBank, [update])
|
||||
client.update(Nordigen, [update])
|
||||
|
||||
result = client.select(NordigenBank, lambda: NordigenBank.name == name)
|
||||
result = client.select(Nordigen, lambda: Nordigen.name == name)
|
||||
assert result[0].requisition_id == "anotherreq"
|
||||
|
||||
result = client.select(Bank, lambda: Bank.name == name)
|
||||
|
||||
@ -31,8 +31,8 @@ class TestDatabaseLoad:
|
||||
|
||||
def test_insert(self, loader: Loader):
|
||||
transactions = [
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
|
||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
|
||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
|
||||
]
|
||||
|
||||
loader.load(transactions)
|
||||
|
||||
@ -4,10 +4,9 @@ from typing import Any, Optional
|
||||
import pytest
|
||||
import requests
|
||||
|
||||
from mocks.client import MockClient
|
||||
import mocks.nordigen as mock
|
||||
|
||||
from pfbudget.db.model import AccountType, Bank, BankTransaction, NordigenBank
|
||||
from pfbudget.db.model import AccountType, Bank, BankTransaction, Nordigen
|
||||
from pfbudget.extract.exceptions import BankError, CredentialsError
|
||||
from pfbudget.extract.extract import Extractor
|
||||
from pfbudget.extract.nordigen import NordigenClient, NordigenCredentials
|
||||
@ -59,13 +58,14 @@ def mock_requests(monkeypatch: pytest.MonkeyPatch):
|
||||
|
||||
@pytest.fixture
|
||||
def extractor() -> Extractor:
|
||||
credentials = NordigenCredentials("ID", "KEY")
|
||||
return PSD2Extractor(NordigenClient(credentials, MockClient()))
|
||||
credentials = NordigenCredentials("ID", "KEY", "TOKEN")
|
||||
return PSD2Extractor(NordigenClient(credentials))
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def bank() -> Bank:
|
||||
bank = Bank("Bank#1", "", AccountType.checking, NordigenBank("", mock.id, False))
|
||||
bank = Bank("Bank#1", "", AccountType.checking)
|
||||
bank.nordigen = Nordigen("", "", mock.id, False)
|
||||
return bank
|
||||
|
||||
|
||||
@ -73,7 +73,7 @@ class TestExtractPSD2:
|
||||
def test_empty_credentials(self):
|
||||
cred = NordigenCredentials("", "")
|
||||
with pytest.raises(CredentialsError):
|
||||
NordigenClient(cred, MockClient())
|
||||
NordigenClient(cred)
|
||||
|
||||
def test_no_psd2_bank(self, extractor: Extractor):
|
||||
with pytest.raises(BankError):
|
||||
@ -88,17 +88,12 @@ class TestExtractPSD2:
|
||||
with pytest.raises(requests.Timeout):
|
||||
extractor.extract(bank)
|
||||
|
||||
def test_extract(
|
||||
self, monkeypatch: pytest.MonkeyPatch, extractor: Extractor, bank: Bank
|
||||
):
|
||||
monkeypatch.setattr(
|
||||
"pfbudget.extract.nordigen.NordigenClient.dump", lambda *args: None
|
||||
)
|
||||
def test_extract(self, extractor: Extractor, bank: Bank):
|
||||
assert extractor.extract(bank) == [
|
||||
BankTransaction(
|
||||
dt.date(2023, 1, 14), "string", Decimal("328.18"), bank="Bank#1"
|
||||
dt.date(2023, 1, 14), "string", Decimal("328.18"), "Bank#1"
|
||||
),
|
||||
BankTransaction(
|
||||
dt.date(2023, 2, 14), "string", Decimal("947.26"), bank="Bank#1"
|
||||
dt.date(2023, 2, 14), "string", Decimal("947.26"), "Bank#1"
|
||||
),
|
||||
]
|
||||
|
||||
@ -5,9 +5,9 @@ import mocks.categories as mock
|
||||
|
||||
from pfbudget.db.model import (
|
||||
BankTransaction,
|
||||
Category,
|
||||
CategoryRule,
|
||||
CategorySelector,
|
||||
Selector_T,
|
||||
TransactionCategory,
|
||||
TransactionTag,
|
||||
)
|
||||
@ -20,8 +20,8 @@ from pfbudget.transform.transform import Transformer
|
||||
class TestTransform:
|
||||
def test_nullifier(self):
|
||||
transactions = [
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
|
||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
|
||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
|
||||
]
|
||||
|
||||
for t in transactions:
|
||||
@ -31,12 +31,14 @@ class TestTransform:
|
||||
transactions = categorizer.transform(transactions)
|
||||
|
||||
for t in transactions:
|
||||
assert t.category == TransactionCategory("null", CategorySelector.nullifier)
|
||||
assert t.category == TransactionCategory(
|
||||
"null", CategorySelector(Selector_T.nullifier)
|
||||
)
|
||||
|
||||
def test_nullifier_inplace(self):
|
||||
transactions = [
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
|
||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
|
||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
|
||||
]
|
||||
|
||||
for t in transactions:
|
||||
@ -46,20 +48,20 @@ class TestTransform:
|
||||
categorizer.transform_inplace(transactions)
|
||||
|
||||
for t in transactions:
|
||||
assert t.category == TransactionCategory("null", CategorySelector.nullifier)
|
||||
assert t.category == TransactionCategory(
|
||||
"null", CategorySelector(Selector_T.nullifier)
|
||||
)
|
||||
|
||||
def test_nullifier_with_rules(self):
|
||||
transactions = [
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
|
||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
|
||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
|
||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
|
||||
]
|
||||
|
||||
for t in transactions:
|
||||
assert not t.category
|
||||
|
||||
rule = CategoryRule(bank="Bank#1")
|
||||
rule.name = "null"
|
||||
rules = [rule]
|
||||
rules = [CategoryRule(None, None, None, None, "Bank#1", None, None, "null")]
|
||||
|
||||
categorizer: Transformer = Nullifier(rules)
|
||||
transactions = categorizer.transform(transactions)
|
||||
@ -67,28 +69,24 @@ class TestTransform:
|
||||
for t in transactions:
|
||||
assert not t.category
|
||||
|
||||
rule = CategoryRule(bank="Bank#2")
|
||||
rule.name = "null"
|
||||
rules.append(rule)
|
||||
rules.append(CategoryRule(None, None, None, None, "Bank#2", None, None, "null"))
|
||||
categorizer = Nullifier(rules)
|
||||
transactions = categorizer.transform(transactions)
|
||||
|
||||
for t in transactions:
|
||||
assert t.category == TransactionCategory("null", CategorySelector.nullifier)
|
||||
assert t.category == TransactionCategory(
|
||||
"null", CategorySelector(Selector_T.nullifier)
|
||||
)
|
||||
|
||||
def test_tagger(self):
|
||||
transactions = [
|
||||
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), bank="Bank#1")
|
||||
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), "Bank#1")
|
||||
]
|
||||
|
||||
for t in transactions:
|
||||
assert not t.category
|
||||
|
||||
rules = mock.tag_1.rules
|
||||
for rule in rules:
|
||||
rule.tag = mock.tag_1.name
|
||||
|
||||
categorizer: Transformer = Tagger(rules)
|
||||
categorizer: Transformer = Tagger(mock.tag_1.rules)
|
||||
transactions = categorizer.transform(transactions)
|
||||
|
||||
for t in transactions:
|
||||
@ -96,32 +94,16 @@ class TestTransform:
|
||||
|
||||
def test_categorize(self):
|
||||
transactions = [
|
||||
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), bank="Bank#1")
|
||||
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), "Bank#1")
|
||||
]
|
||||
|
||||
for t in transactions:
|
||||
assert not t.category
|
||||
|
||||
rules = mock.category1.rules
|
||||
for rule in rules:
|
||||
rule.name = mock.category1.name
|
||||
|
||||
categorizer: Transformer = Categorizer(rules)
|
||||
categorizer: Transformer = Categorizer(mock.category1.rules)
|
||||
transactions = categorizer.transform(transactions)
|
||||
|
||||
for t in transactions:
|
||||
assert t.category == TransactionCategory("cat#1", CategorySelector.rules)
|
||||
|
||||
def test_rule_limits(self):
|
||||
transactions = [
|
||||
BankTransaction(date.today(), "", Decimal("-60"), bank="Bank#1"),
|
||||
BankTransaction(date.today(), "", Decimal("-120"), bank="Bank#1"),
|
||||
]
|
||||
|
||||
cat = Category("cat")
|
||||
cat.rules = [CategoryRule(min=-120, max=-60)]
|
||||
for r in cat.rules:
|
||||
r.name = cat.name
|
||||
|
||||
transactions = Categorizer(cat.rules).transform(transactions)
|
||||
assert all(t.category.name == cat.name for t in transactions)
|
||||
assert t.category == TransactionCategory(
|
||||
"cat#1", CategorySelector(Selector_T.rules)
|
||||
)
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user