Compare commits


32 Commits

Author SHA1 Message Date
29c5206638
Remove JSON dump 2024-01-22 22:02:50 +00:00
f44c04d678
Ignore SQLite DB file 2024-01-22 22:02:43 +00:00
2267bb29b3
Update dependencies
Now using the PostgreSQL client library from the distribution package.
2024-01-22 22:02:38 +00:00
abf82176e5
Improve error logging on incorrect parsing 2024-01-22 22:02:31 +00:00
c6a13cc83a
Allow multi-category transactions
through the use of tags. Instead of failing when categorizing a
transaction which already has a category, add the new category as a tag
for that transaction.

Issue #2
2024-01-22 22:02:25 +00:00
a7b74237aa
Fix rule's values limits
They are supposed to also catch the exact boundary values, so the limits are
inclusive.
2024-01-22 22:02:19 +00:00
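The inclusive-limit behavior described above can be sketched as follows; `matches_amount` and its parameters are hypothetical stand-ins, not the project's actual rule API:

```python
from decimal import Decimal
from typing import Optional


def matches_amount(
    amount: Decimal,
    min_limit: Optional[Decimal] = None,
    max_limit: Optional[Decimal] = None,
) -> bool:
    """Inclusive bounds: a transaction exactly at min or max still matches."""
    if min_limit is not None and amount < min_limit:
        return False
    if max_limit is not None and amount > max_limit:
        return False
    return True
```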
95eff24418
Remove null rules from Categorizer
and allow for multiple null rules on the Nullifier.

Also create an empty list in the Nullifier constructor to simplify the
logic afterwards, in case no "null" rules exist.
2024-01-22 22:02:09 +00:00
f966868736
Update Interactive to new DB interface
Take the opportunity to improve the loop structure, fixing the current
out-of-bounds error.
2024-01-22 22:02:00 +00:00
2a68ddd152
[Refactor] Streamline the output of nordigen
Clean up the nordigen output to return a list of transactions instead of
its internal structure.
2024-01-22 22:01:51 +00:00
1cfc8dbe38
Create EUA and request user access permission
Move the token creation/renewal code into the NordigenClient and remove
external access. The NordigenClient now also takes the DB client in the
constructor.

While creating the unit tests, noticed that the nordigen mocks for
downloaded transactions could match the simple transaction mocks, thus
helping test the online download command.
Also created the nordigen.NordigenClient mock, with the used methods
mocked and raising NotImplementedError when a new endpoint is requested.
2024-01-22 22:01:38 +00:00
420a6cdfaa
Nordigen token generation/refresh logic
Adds a new table, nordigen, holding the access and refresh tokens along
with their validity.
The Export/Import test would raise an integrity error with the use of a
real DB and the export of the transaction IDs, so add a try-except block
to the database session to catch the error and re-raise an ImportError.
2024-01-22 21:59:24 +00:00
ea546fc2df
Fix typing warning on parsers.py 2024-01-22 21:57:16 +00:00
d11f753aa0
[Fix] Eager loading for subclasses
and CategoryGroup import/export.
2024-01-22 21:56:34 +00:00
e6622d1e19
Full backup creation and import commands
Using the same logic as the single Export/Import commands, implement the
entire backup command by exporting all the serializable classes into a
single json file.
To select the correct class upon import, save a new property on the
backup json, the class_, which contains the name of the class to be
imported.

Fix the note serialization.
2024-01-22 21:56:17 +00:00
2cf0ba4374
[Export] Remove pickle format
Pickling a DB object directly will also save DB-related attributes,
namely the state of the object. When exporting values from a DB select,
they will be in the detached state, which means that on import the
session will not insert the objects back into the DB; it assumes they
are already there.
2024-01-22 21:53:49 +00:00
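The problem can be illustrated without SQLAlchemy: pickle serializes every instance attribute, internal bookkeeping included, which is a rough sketch of why pickled ORM rows carry their session state (the `_sa_instance_state` attribute here is a stand-in, not the real ORM machinery):

```python
import pickle


class Record:
    """Stand-in for an ORM-mapped row; the underscored attribute
    mimics SQLAlchemy's per-instance bookkeeping."""

    def __init__(self, name: str):
        self.name = name
        self._sa_instance_state = "detached"  # hypothetical stand-in


# pickle round-trips *all* instance attributes, internal state included
restored = pickle.loads(pickle.dumps(Record("groceries")))
```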
6574165f8f
Complete export->import cycle
Add the transaction ID to the export of transactions and enable the
import of SplitTransactions. The ID should function as a stable
transaction identifier.
Fix tag import on transactions.
Fix rule date import for DBs that need the datetime.date format.
2024-01-22 21:52:00 +00:00
42d84b02f4
Removes unnecessary FK from Rules __init__
The FKs are correctly initialized when the rules are created as part of
the base category/tag.
Also removes the name from the CategorySchedule; the same logic applies.
2024-01-22 21:51:49 +00:00
729e15d4e8
Adds ImportFailedError for non-serializable types 2024-01-22 21:51:02 +00:00
638b833c74
ImportCommand and Serializable types
The new command ImportCommand takes a Serializable type, from which it
can call the deserialize method to generate a DB ORM type. The
Serializable interface also declares the serialize method.

(De)serialization moved to the ORM types, due to the inability to
properly use overloading.
A possible future improvement is to merge the serialization information
into JSONDecoder/Encoder classes.

Adds a MockClient with an in-memory SQLite DB which can be used by
tests.
Most types' export/import was functionally tested using two DBs and
comparing entries.
2024-01-22 21:49:56 +00:00
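A minimal sketch of the Serializable pattern described above, with a hypothetical `Tag` type standing in for the ORM classes; the real interface lives on the ORM types themselves:

```python
import json
from typing import Any, Mapping


class Serializable:
    def serialize(self) -> Mapping[str, Any]:
        # class_ lets a backup import pick the right type later
        return {"class_": type(self).__name__}

    @classmethod
    def deserialize(cls, data: Mapping[str, Any]) -> "Serializable":
        raise NotImplementedError


class Tag(Serializable):
    def __init__(self, name: str):
        self.name = name

    def serialize(self) -> Mapping[str, Any]:
        return super().serialize() | {"name": self.name}

    @classmethod
    def deserialize(cls, data: Mapping[str, Any]) -> "Tag":
        return cls(data["name"])


# export -> JSON -> import round-trip
payload = json.dumps([Tag("vacation").serialize()])
restored = [Tag.deserialize(v) for v in json.loads(payload)]
```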
21099909c9
Allow Nordigen parameter in Bank constructor
and remove the Nordigen FK to Bank from the Nordigen constructor. It is
auto-generated upon DB insertion.
2024-01-22 21:47:47 +00:00
3698654715
[DB missing] Remove SQLAlchemy Enum 2024-01-22 21:47:47 +00:00
271130b107
[DB] Rules collection using list instead of set
The pains of using a set as the aggregation of rules outweigh the
small advantage. While it would make detecting similar rules faster, the
use of a collection type not inherently supported by JSON brings some
issues in serialization.
2024-01-22 21:47:47 +00:00
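The JSON limitation mentioned above is easy to reproduce: Python's `json` module rejects sets but accepts lists:

```python
import json

rules = {"rule-a", "rule-b"}

try:
    json.dumps(rules)
    set_ok = True
except TypeError:  # sets are not natively JSON serializable
    set_ok = False

# a list round-trips cleanly (sorted here only for a stable order)
encoded = json.dumps(sorted(rules))
```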
ec22b5e5bd
[DB][Refactor] Compact the category selector
The `CategorySelector` was presumably added to be extended with other
attributes. However, since nothing other than the selector enum is used
at the moment, it only adds unnecessary clutter.
The category selector value is moved to the parent
`TransactionCategory`.
2024-01-22 21:47:47 +00:00
48fae5483d
[Fix] Select statement dangling session
The `Client` was never closing the `Session` in `DatabaseSession`, which
meant that tests comparing transactions and banks would pass, since
attributes loaded only on select could still be fetched from the open
session.
However, this was not the intended behavior. Calling select from the
client without using sessions should detach objects completely from a DB
session. Therefore, attributes on Transaction, TransactionCategory and
Bank are now loaded on a join (lazy parameter).
2024-01-22 21:47:47 +00:00
1628f6c1ea
Update to python 3.11
Add pythonpath option to pytest so that VSCode testing tool successfully
imports the mocks dir.
2024-01-22 21:47:47 +00:00
60c7028f0b
Expand serializer to more types and test 2024-01-22 21:47:47 +00:00
6057ec59b4
[Deprecate] CSV export/import removed
CSV is not a good format, since it does not support complex types well,
e.g. lists.
It served when the exported format was only a flat transaction, but it
is not worth the extra work around it anymore.
2024-01-22 21:47:47 +00:00
a527fa5e1d
[Fix] Export error on DB access
Export operations were using an old Client select method signature to
pass in the opened session.
A session is no longer needed for a simple select.
2024-01-22 21:47:47 +00:00
a355ec3642
Initial work on a full backup option
Creates the Backup command and a general serializer.
2024-01-22 21:47:47 +00:00
0a0bc1baa0
Fix possible null access
Detected through the typing warning.
2024-01-22 21:47:47 +00:00
ffc3d477e2
Fix typing for model.py and hash functions
Set unsafe_hash parameter of dataclasses to true on types that need to
be hashed.
The __hash__ method won't be automatically generated since the types
can't be declared as frozen.
2024-01-22 21:45:49 +00:00
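A short illustration of the behavior described above: a mutable dataclass with the default `eq=True` has `__hash__` set to `None`, while `unsafe_hash=True` generates one (`PlainRule`/`HashableRule` are illustrative names, not the project's types):

```python
from dataclasses import dataclass


@dataclass  # eq=True by default, which sets __hash__ = None on the class
class PlainRule:
    pattern: str


@dataclass(unsafe_hash=True)  # generate __hash__ even though the class is mutable
class HashableRule:
    pattern: str


# PlainRule instances cannot go into sets/dicts; HashableRule instances can
rules = {HashableRule("groceries"), HashableRule("groceries")}
```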
01df97ed46
back_populates option on category relationship
Due to the use of the dataclasses mixin on the SQLAlchemy types,
back_populates causes a RecursionError when comparing two instances. This
occurs because the dataclass overwrites the __eq__ operator, and it
doesn't know when to stop comparing relationships.

Removing the dataclasses isn't the best approach, since then __init__,
__eq__ and __repr__ methods would have to be added to all types. Thus
the solution was to exclude the child's relationship (in a one-to-one
relationship) from the __eq__ operation, with the use of the compare
parameter.

Took the opportunity to define more logical __init__ methods on the
`Rule` and child classes.
Also revised the parameter options on some DB types.
2024-01-22 21:45:49 +00:00
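The `compare` parameter trick can be sketched with plain dataclasses (the `Category`/`Schedule` classes here are simplified stand-ins for the ORM types):

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Schedule:
    period: str
    # excluded from __eq__ to break the parent <-> child comparison cycle
    category: Optional["Category"] = field(default=None, compare=False)


@dataclass
class Category:
    name: str
    schedule: Optional[Schedule] = None


# wire up a mutual reference, as back_populates would
a = Category("food", Schedule("monthly"))
b = Category("food", Schedule("monthly"))
a.schedule.category = a
b.schedule.category = b
# comparison terminates because Schedule.__eq__ ignores .category;
# without compare=False this would recurse infinitely
```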
32 changed files with 2281 additions and 1347 deletions

.gitignore vendored

@@ -174,3 +174,6 @@ poetry.toml
pyrightconfig.json
# End of https://www.toptal.com/developers/gitignore/api/python
# Project specific ignores
database.db


@@ -0,0 +1,35 @@
"""nordigen tokens
Revision ID: 325b901ac712
Revises: 60469d5dd2b0
Create Date: 2023-05-25 19:10:10.374008+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "325b901ac712"
down_revision = "60469d5dd2b0"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"nordigen",
sa.Column("type", sa.String(), nullable=False),
sa.Column("token", sa.String(), nullable=False),
sa.Column("expires", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("type", name=op.f("pk_nordigen")),
schema="pfbudget",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("nordigen", schema="pfbudget")
# ### end Alembic commands ###


@@ -0,0 +1,48 @@
"""Drop SQLAlchemy enum
Revision ID: 60469d5dd2b0
Revises: b599dafcf468
Create Date: 2023-05-15 19:24:07.911352+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "60469d5dd2b0"
down_revision = "b599dafcf468"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute(
"""
CREATE TYPE pfbudget.scheduleperiod
AS ENUM ('daily', 'weekly', 'monthly', 'yearly')
"""
)
op.execute(
"""ALTER TABLE pfbudget.category_schedules
ALTER COLUMN period TYPE pfbudget.scheduleperiod
USING period::text::pfbudget.scheduleperiod
"""
)
op.execute("DROP TYPE pfbudget.period")
def downgrade() -> None:
op.execute(
"""
CREATE TYPE pfbudget.period
AS ENUM ('daily', 'weekly', 'monthly', 'yearly')
"""
)
op.execute(
"""ALTER TABLE pfbudget.category_schedules
ALTER COLUMN period TYPE pfbudget.period
USING period::text::pfbudget.period
"""
)
op.execute("DROP TYPE pfbudget.scheduleperiod")


@@ -0,0 +1,74 @@
"""Compact category selector
Revision ID: 8623e709e111
Revises: ce68ee15e5d2
Create Date: 2023-05-08 19:00:51.063240+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "8623e709e111"
down_revision = "ce68ee15e5d2"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("category_selectors", schema="pfbudget")
op.add_column(
"transactions_categorized",
sa.Column(
"selector",
sa.Enum(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="pfbudget",
inherit_schema=True,
),
nullable=False,
),
schema="pfbudget",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("transactions_categorized", "selector", schema="pfbudget")
op.create_table(
"category_selectors",
sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column(
"selector",
postgresql.ENUM(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="pfbudget",
),
autoincrement=False,
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["pfbudget.transactions_categorized.id"],
name="fk_category_selectors_id_transactions_categorized",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_category_selectors"),
schema="pfbudget",
)
# ### end Alembic commands ###


@@ -0,0 +1,46 @@
"""Selector type name change
Revision ID: b599dafcf468
Revises: 8623e709e111
Create Date: 2023-05-08 19:46:20.661214+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "b599dafcf468"
down_revision = "8623e709e111"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute(
"""
CREATE TYPE pfbudget.categoryselector
AS ENUM ('unknown', 'nullifier', 'vacations', 'rules', 'algorithm', 'manual')
"""
)
op.execute(
"""ALTER TABLE pfbudget.transactions_categorized
ALTER COLUMN selector TYPE pfbudget.categoryselector
USING selector::text::pfbudget.categoryselector
"""
)
op.execute("DROP TYPE pfbudget.selector_t")
def downgrade() -> None:
op.execute(
"""
CREATE TYPE pfbudget.selector_t
AS ENUM ('unknown', 'nullifier', 'vacations', 'rules', 'algorithm', 'manual')
"""
)
op.execute(
"""ALTER TABLE pfbudget.transactions_categorized
ALTER COLUMN selector TYPE pfbudget.selector_t
USING selector::text::pfbudget.selector_t
"""
)
op.execute("DROP TYPE pfbudget.categoryselector")


@@ -38,10 +38,10 @@ if __name__ == "__main__":
params = [args["path"], args["bank"], args["creditcard"]]
case Operation.RequisitionId:
keys = {"name", "country"}
keys = {"bank"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["name"][0], args["country"][0]]
params = [args["bank"][0]]
case Operation.Download:
keys = {"all", "banks", "interval", "start", "end", "year", "dry_run"}
@@ -163,14 +163,14 @@ if __name__ == "__main__":
params = [
type.CategoryRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
cat,
start=args["start"][0] if args["start"] else None,
end=args["end"][0] if args["end"] else None,
description=args["description"][0] if args["description"] else None,
regex=args["regex"][0] if args["regex"] else None,
bank=args["bank"][0] if args["bank"] else None,
min=args["min"][0] if args["min"] else None,
max=args["max"][0] if args["max"] else None,
)
for cat in args["category"]
]
@@ -215,14 +215,14 @@ if __name__ == "__main__":
params = [
type.TagRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
tag,
start=args["start"][0] if args["start"] else None,
end=args["end"][0] if args["end"] else None,
description=args["description"][0] if args["description"] else None,
regex=args["regex"][0] if args["regex"] else None,
bank=args["bank"][0] if args["bank"] else None,
min=args["min"][0] if args["min"] else None,
max=args["max"][0] if args["max"] else None,
)
for tag in args["tag"]
]


@@ -6,7 +6,7 @@ import os
import re
from pfbudget.common.types import Operation
from pfbudget.db.model import AccountType, Period
from pfbudget.db.model import AccountType, SchedulePeriod
from pfbudget.db.sqlite import DatabaseClient
import pfbudget.reporting.graph
@@ -60,11 +60,12 @@ def argparser() -> argparse.ArgumentParser:
# init = subparsers.add_parser("init")
# init.set_defaults(op=Operation.Init)
# Exports transactions to .csv file
# Exports transactions to specified format and file
export = subparsers.add_parser("export")
export.set_defaults(op=Operation.Export)
file_options(export)
# Imports transactions from specified format and file
pimport = subparsers.add_parser("import")
pimport.set_defaults(op=Operation.Import)
file_options(pimport)
@@ -132,8 +133,7 @@ def argparser() -> argparse.ArgumentParser:
# PSD2 requisition id
requisition = subparsers.add_parser("eua")
requisition.set_defaults(op=Operation.RequisitionId)
requisition.add_argument("id", nargs=1, type=str)
requisition.add_argument("country", nargs=1, type=str)
requisition.add_argument("bank", nargs=1, type=str)
# Download through the PSD2 API
download = subparsers.add_parser("download", parents=[period])
@@ -268,7 +268,7 @@ def category(parser: argparse.ArgumentParser):
schedule = commands.add_parser("schedule")
schedule.set_defaults(op=Operation.CategorySchedule)
schedule.add_argument("category", nargs="+", type=str)
schedule.add_argument("period", nargs=1, choices=[e.value for e in Period])
schedule.add_argument("period", nargs=1, choices=[e.value for e in SchedulePeriod])
schedule.add_argument("--frequency", nargs=1, default=[1], type=int)
rule = commands.add_parser("rule")


@@ -3,9 +3,8 @@ import decimal
from ..core.manager import Manager
from ..db.model import (
Category,
CategorySelector,
Note,
Selector_T,
CategorySelector,
SplitTransaction,
Tag,
Transaction,
@@ -16,15 +15,13 @@ from ..db.model import (
class Interactive:
help = "category(:tag)/split/note:/skip/quit"
selector = Selector_T.manual
selector = CategorySelector.manual
def __init__(self, manager: Manager) -> None:
self.manager = manager
with self.manager.db.session() as session:
self.categories = session.get(Category)
self.tags = session.get(Tag)
session.expunge_all()
self.categories = self.manager.database.select(Category)
self.tags = self.manager.database.select(Tag)
def intro(self) -> None:
print(
@@ -35,28 +32,34 @@ class Interactive:
def start(self) -> None:
self.intro()
with self.manager.db.session() as session:
uncategorized = session.uncategorized()
with self.manager.database.session as session:
uncategorized = session.select(
Transaction, lambda: ~Transaction.category.has()
)
uncategorized.sort()
n = len(uncategorized)
print(f"{n} left to categorize")
i = 0
new = []
next = uncategorized[i]
print(next)
while (command := input("$ ")) != "quit":
while (command := input("$ ")) != "quit" and i < len(uncategorized):
current = uncategorized[i] if len(new) == 0 else new.pop()
print(current)
match command:
case "help":
print(self.help)
case "skip":
if len(uncategorized) == 0:
i += 1
case "quit":
break
case "split":
new = self.split(next)
new = self.split(current)
session.insert(new)
case other:
@@ -67,35 +70,32 @@ class Interactive:
if other.startswith("note:"):
# TODO adding notes to a splitted transaction won't allow
# categorization
next.note = Note(other[len("note:") :].strip())
current.note = Note(other[len("note:") :].strip())
else:
ct = other.split(":")
if (category := ct[0]) not in [
c.name for c in self.categories
]:
print(self.help, self.categories)
continue
tags = []
if len(ct) > 1:
tags = ct[1:]
next.category = TransactionCategory(
category, CategorySelector(self.selector)
current.category = TransactionCategory(
category, self.selector
)
for tag in tags:
if tag not in [t.name for t in self.tags]:
session.insert([Tag(tag)])
self.tags = session.get(Tag)
next.tags.add(TransactionTag(tag))
current.tags.add(TransactionTag(tag))
if len(new) == 0:
i += 1
session.commit()
next = uncategorized[i] if len(new) == 0 else new.pop()
print(next)
def split(self, original: Transaction) -> list[SplitTransaction]:
total = original.amount
new = []


@@ -51,6 +51,11 @@ class Operation(Enum):
ImportCategoryGroups = auto()
class ExportFormat(Enum):
JSON = auto()
pickle = auto()
class TransactionError(Exception):
pass

pfbudget/core/command.py Normal file

@@ -0,0 +1,128 @@
from abc import ABC, abstractmethod
import json
from pathlib import Path
import pickle
from typing import Type
from pfbudget.common.types import ExportFormat
from pfbudget.db.client import Client
from pfbudget.db.model import (
Bank,
Category,
CategoryGroup,
Serializable,
Tag,
Transaction,
)
# required for the backup import
import pfbudget.db.model
class Command(ABC):
@abstractmethod
def execute(self) -> None:
raise NotImplementedError
def undo(self) -> None:
raise NotImplementedError
class ExportCommand(Command):
def __init__(
self, client: Client, what: Type[Serializable], fn: Path, format: ExportFormat
):
self.__client = client
self.what = what
self.fn = fn
self.format = format
def execute(self) -> None:
values = self.__client.select(self.what)
match self.format:
case ExportFormat.JSON:
with open(self.fn, "w", newline="") as f:
json.dump([e.serialize() for e in values], f, indent=4)
case ExportFormat.pickle:
raise AttributeError("pickle export not working at the moment!")
with open(self.fn, "wb") as f:
pickle.dump(values, f)
class ImportCommand(Command):
def __init__(
self, client: Client, what: Type[Serializable], fn: Path, format: ExportFormat
):
self.__client = client
self.what = what
self.fn = fn
self.format = format
def execute(self) -> None:
match self.format:
case ExportFormat.JSON:
with open(self.fn, "r") as f:
try:
values = json.load(f)
values = [self.what.deserialize(v) for v in values]
except json.JSONDecodeError as e:
raise ImportFailedError(e)
case ExportFormat.pickle:
raise AttributeError("pickle import not working at the moment!")
with open(self.fn, "rb") as f:
values = pickle.load(f)
self.__client.insert(values)
class ImportFailedError(Exception):
pass
class BackupCommand(Command):
def __init__(self, client: Client, fn: Path, format: ExportFormat) -> None:
self.__client = client
self.fn = fn
self.format = format
def execute(self) -> None:
banks = self.__client.select(Bank)
groups = self.__client.select(CategoryGroup)
categories = self.__client.select(Category)
tags = self.__client.select(Tag)
transactions = self.__client.select(Transaction)
values = [*banks, *groups, *categories, *tags, *transactions]
match self.format:
case ExportFormat.JSON:
with open(self.fn, "w", newline="") as f:
json.dump([e.serialize() for e in values], f, indent=4)
case ExportFormat.pickle:
raise AttributeError("pickle export not working at the moment!")
class ImportBackupCommand(Command):
def __init__(self, client: Client, fn: Path, format: ExportFormat) -> None:
self.__client = client
self.fn = fn
self.format = format
def execute(self) -> None:
match self.format:
case ExportFormat.JSON:
with open(self.fn, "r") as f:
try:
values = json.load(f)
values = [
getattr(pfbudget.db.model, v["class_"]).deserialize(v)
for v in values
]
except json.JSONDecodeError as e:
raise ImportFailedError(e)
case ExportFormat.pickle:
raise AttributeError("pickle import not working at the moment!")
self.__client.insert(values)


@@ -1,4 +1,3 @@
import csv
import json
from pathlib import Path
import pickle
@@ -14,12 +13,11 @@ from pfbudget.db.model import (
CategoryGroup,
CategoryRule,
CategorySchedule,
CategorySelector,
Link,
MoneyTransaction,
Nordigen,
NordigenBank,
Rule,
Selector_T,
CategorySelector,
SplitTransaction,
Tag,
TagRule,
@@ -81,7 +79,7 @@ class Manager:
else:
banks = self.database.select(Bank, Bank.nordigen)
extractor = PSD2Extractor(Manager.nordigen_client())
extractor = PSD2Extractor(self.nordigen_client())
transactions = []
for bank in banks:
@@ -103,10 +101,20 @@
categories = session.select(Category)
tags = session.select(Tag)
rules = [cat.rules for cat in categories if cat.name == "null"]
rules = [
rule
for cat in categories
if cat.name == "null"
for rule in cat.rules
]
Nullifier(rules).transform_inplace(uncategorized)
rules = [rule for cat in categories for rule in cat.rules]
rules = [
rule
for cat in categories
if cat.name != "null"
for rule in cat.rules
]
Categorizer(rules).transform_inplace(uncategorized)
rules = [rule for tag in tags for rule in tag.rules]
@@ -116,24 +124,34 @@
self.database.update(Bank, params)
case Operation.PSD2Mod:
self.database.update(Nordigen, params)
self.database.update(NordigenBank, params)
case Operation.BankDel:
self.database.delete(Bank, Bank.name, params)
case Operation.PSD2Del:
self.database.delete(Nordigen, Nordigen.name, params)
case Operation.Token:
Manager.nordigen_client().generate_token()
self.database.delete(NordigenBank, NordigenBank.name, params)
case Operation.RequisitionId:
link, _ = Manager.nordigen_client().requisition(params[0], params[1])
print(f"Opening {link} to request access to {params[0]}")
bank_name = params[0]
bank = self.database.select(Bank, (lambda: Bank.name == bank_name))[0]
if not bank.nordigen or not bank.nordigen.bank_id:
raise ValueError(f"{bank} doesn't have a Nordigen ID")
link, req_id = self.nordigen_client().new_requisition(
bank.nordigen.bank_id
)
self.database.update(
NordigenBank,
[{"name": bank.nordigen.name, "requisition_id": req_id}],
)
webbrowser.open(link)
case Operation.PSD2CountryBanks:
banks = Manager.nordigen_client().country_banks(params[0])
banks = self.nordigen_client().country_banks(params[0])
print(banks)
case (
@@ -245,10 +263,7 @@
session.insert(transactions)
case Operation.Export:
with self.database.session as session:
self.dump(
params[0], params[1], self.database.select(Transaction, session)
)
self.dump(params[0], params[1], self.database.select(Transaction))
case Operation.Import:
transactions = []
@@ -274,8 +289,7 @@
if category := row.pop("category", None):
transaction.category = TransactionCategory(
category["name"],
CategorySelector(category["selector"]["selector"]),
category["name"], category["selector"]["selector"]
)
transactions.append(transaction)
@@ -284,27 +298,21 @@
self.database.insert(transactions)
case Operation.ExportBanks:
with self.database.session as session:
self.dump(params[0], params[1], self.database.select(Bank, session))
self.dump(params[0], params[1], self.database.select(Bank))
case Operation.ImportBanks:
banks = []
for row in self.load(params[0], params[1]):
bank = Bank(row["name"], row["BIC"], row["type"])
if row["nordigen"]:
bank.nordigen = Nordigen(**row["nordigen"])
bank.nordigen = NordigenBank(**row["nordigen"])
banks.append(bank)
if self.certify(banks):
self.database.insert(banks)
case Operation.ExportCategoryRules:
with self.database.session as session:
self.dump(
params[0],
params[1],
self.database.select(CategoryRule, session),
)
self.dump(params[0], params[1], self.database.select(CategoryRule))
case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
@@ -313,10 +321,7 @@
self.database.insert(rules)
case Operation.ExportTagRules:
with self.database.session as session:
self.dump(
params[0], params[1], self.database.select(TagRule, session)
)
self.dump(params[0], params[1], self.database.select(TagRule))
case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
@@ -325,10 +330,7 @@
self.database.insert(rules)
case Operation.ExportCategories:
with self.database.session as session:
self.dump(
params[0], params[1], self.database.select(Category, session)
)
self.dump(params[0], params[1], self.database.select(Category))
case Operation.ImportCategories:
# rules = [Category(**row) for row in self.load(params[0])]
@@ -341,7 +343,7 @@
for rule in rules:
del rule["type"]
category.rules = set(CategoryRule(**rule) for rule in rules)
category.rules = [CategoryRule(**rule) for rule in rules]
if row["schedule"]:
category.schedule = CategorySchedule(**row["schedule"])
categories.append(category)
@@ -350,12 +352,7 @@
self.database.insert(categories)
case Operation.ExportCategoryGroups:
with self.database.session as session:
self.dump(
params[0],
params[1],
self.database.select(CategoryGroup, session),
)
self.dump(params[0], params[1], self.database.select(CategoryGroup))
case Operation.ImportCategoryGroups:
groups = [
@@ -369,7 +366,7 @@
return parse_data(filename, args)
def askcategory(self, transaction: Transaction):
selector = CategorySelector(Selector_T.manual)
selector = CategorySelector.manual
categories = self.database.select(Category)
@@ -383,9 +380,6 @@
if format == "pickle":
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
elif format == "csv":
with open(fn, "w", newline="") as f:
csv.writer(f).writerows([e.format.values() for e in sequence])
elif format == "json":
with open(fn, "w", newline="") as f:
json.dump([e.format for e in sequence], f, indent=4, default=str)
@@ -397,8 +391,6 @@
if format == "pickle":
with open(fn, "rb") as f:
return pickle.load(f)
elif format == "csv":
raise Exception("CSV import not supported")
else:
print("format not well specified")
return []
@@ -415,6 +407,5 @@
self._database = Client(self._db, echo=self._verbosity > 2)
return self._database
@staticmethod
def nordigen_client() -> NordigenClient:
return NordigenClient(NordigenCredentialsManager.default)
def nordigen_client(self) -> NordigenClient:
return NordigenClient(NordigenCredentialsManager.default, self.database)


@@ -1,10 +1,11 @@
from collections.abc import Sequence
from copy import deepcopy
from sqlalchemy import Engine, create_engine, delete, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, sessionmaker
from typing import Any, Mapping, Optional, Type, TypeVar
# from pfbudget.db.exceptions import InsertError, SelectError
from pfbudget.db.exceptions import InsertError
class DatabaseSession:
@@ -16,10 +17,17 @@ class DatabaseSession:
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any):
try:
if exc_type:
self.__session.rollback()
else:
self.__session.commit()
except IntegrityError as e:
raise InsertError() from e
finally:
self.__session.close()
def close(self):
self.__session.close()
def insert(self, sequence: Sequence[Any]) -> None:
@@ -33,7 +41,10 @@
else:
stmt = select(what)
return self.__session.scalars(stmt).all()
return self.__session.scalars(stmt).unique().all()
def delete(self, obj: Any) -> None:
self.__session.delete(obj)
class Client:
@@ -50,13 +61,16 @@ class Client:
T = TypeVar("T")
def select(self, what: Type[T], exists: Optional[Any] = None) -> Sequence[T]:
return self.session.select(what, exists)
session = self.session
result = session.select(what, exists)
session.close()
return result
def update(self, what: Type[Any], values: Sequence[Mapping[str, Any]]) -> None:
with self._sessionmaker() as session, session.begin():
session.execute(update(what), values)
def delete(self, what: Type[Any], column: Any, values: Sequence[str]) -> None:
def delete(self, what: Type[Any], column: Any, values: Sequence[Any]) -> None:
with self._sessionmaker() as session, session.begin():
session.execute(delete(what).where(column.in_(values)))


@@ -1,9 +1,11 @@
from __future__ import annotations
from collections.abc import Mapping, MutableMapping, Sequence
from dataclasses import dataclass
import datetime as dt
import decimal
import enum
import re
from typing import Annotated, Any, Optional
from typing import Annotated, Any, Callable, Optional, Self, cast
from sqlalchemy import (
BigInteger,
@@ -36,6 +38,20 @@ class Base(MappedAsDataclass, DeclarativeBase):
},
)
type_annotation_map = {
enum.Enum: Enum(enum.Enum, create_constraint=True, inherit_schema=True),
}
@dataclass
class Serializable:
def serialize(self) -> Mapping[str, Any]:
return dict(class_=type(self).__name__)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
raise NotImplementedError
class AccountType(enum.Enum):
checking = enum.auto()
@@ -46,36 +62,38 @@ class AccountType(enum.Enum):
MASTERCARD = enum.auto()
accounttype = Annotated[
AccountType,
mapped_column(Enum(AccountType, inherit_schema=True)),
]
class Export:
@property
def format(self) -> dict[str, Any]:
raise NotImplementedError
class Bank(Base, Export):
class Bank(Base, Serializable):
__tablename__ = "banks"
name: Mapped[str] = mapped_column(primary_key=True)
BIC: Mapped[str] = mapped_column(String(8))
type: Mapped[accounttype]
type: Mapped[AccountType]
nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False)
nordigen: Mapped[Optional[NordigenBank]] = relationship(default=None, lazy="joined")
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
BIC=self.BIC,
type=self.type,
nordigen=self.nordigen.format if self.nordigen else None,
)
def serialize(self) -> Mapping[str, Any]:
nordigen = None
if self.nordigen:
nordigen = {
"bank_id": self.nordigen.bank_id,
"requisition_id": self.nordigen.requisition_id,
"invert": self.nordigen.invert,
}
return super().serialize() | dict(
name=self.name,
BIC=self.BIC,
type=self.type.name,
nordigen=nordigen,
)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
bank = cls(map["name"], map["BIC"], map["type"])
if map["nordigen"]:
bank.nordigen = NordigenBank(**map["nordigen"])
return bank
bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]
@ -90,7 +108,7 @@ idpk = Annotated[
money = Annotated[decimal.Decimal, mapped_column(Numeric(16, 2))]
class Transaction(Base, Export):
class Transaction(Base, Serializable):
__tablename__ = "transactions"
id: Mapped[idpk] = mapped_column(init=False)
@ -98,32 +116,83 @@ class Transaction(Base, Export):
description: Mapped[Optional[str]]
amount: Mapped[money]
split: Mapped[bool] = mapped_column(init=False, default=False)
split: Mapped[bool] = mapped_column(default=False)
category: Mapped[Optional[TransactionCategory]] = relationship(
back_populates="transaction", default=None, lazy="joined"
)
tags: Mapped[set[TransactionTag]] = relationship(default_factory=set, lazy="joined")
note: Mapped[Optional[Note]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default=None, lazy="joined"
)
type: Mapped[str] = mapped_column(init=False)
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(
cascade="all, delete-orphan", init=False, passive_deletes=True
)
tags: Mapped[set[TransactionTag]] = relationship(init=False)
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
@property
def format(self) -> dict[str, Any]:
return dict(
id=self.id,
date=self.date,
description=self.description,
amount=self.amount,
split=self.split,
type=self.type,
category=self.category.format if self.category else None,
# TODO note
tags=[tag.format for tag in self.tags] if self.tags else None,
)
def serialize(self) -> Mapping[str, Any]:
category = None
if self.category:
category = {
"name": self.category.name,
"selector": self.category.selector.name,
}
return super().serialize() | dict(
id=self.id,
date=self.date.isoformat(),
description=self.description,
amount=str(self.amount),
split=self.split,
category=category if category else None,
tags=[{"tag": tag.tag} for tag in self.tags],
note={"note": self.note.note} if self.note else None,
type=self.type,
)
@classmethod
def deserialize(
cls, map: Mapping[str, Any]
) -> Transaction | BankTransaction | MoneyTransaction | SplitTransaction:
match map["type"]:
case "bank":
return BankTransaction.deserialize(map)
case "money":
return MoneyTransaction.deserialize(map)
case "split":
return SplitTransaction.deserialize(map)
case _:
return cls._deserialize(map)
@classmethod
def _deserialize(cls, map: Mapping[str, Any]) -> Self:
category = None
if map["category"]:
category = TransactionCategory(map["category"]["name"])
if map["category"]["selector"]:
category.selector = map["category"]["selector"]
tags: set[TransactionTag] = set()
if map["tags"]:
tags = set(TransactionTag(t["tag"]) for t in map["tags"])
note = None
if map["note"]:
note = Note(map["note"]["note"])
result = cls(
dt.date.fromisoformat(map["date"]),
map["description"],
map["amount"],
map["split"],
category,
tags,
note,
)
if map["id"]:
result.id = map["id"]
return result
def __lt__(self, other: Transaction):
return self.date < other.date
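The serialize/deserialize pair above stores the date as ISO text and the amount as a string so the mapping survives a JSON dump; dt.date.fromisoformat and Decimal reverse both. A quick round-trip check of that convention:

```python
import datetime as dt
from decimal import Decimal

d, a = dt.date(2023, 1, 14), Decimal("328.18")

# serialize: JSON-friendly representations
m = {"date": d.isoformat(), "amount": str(a)}

# deserialize: exact reconstruction, no float rounding on the amount
restored = (dt.date.fromisoformat(m["date"]), Decimal(m["amount"]))
```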
@ -134,40 +203,64 @@ idfk = Annotated[
class BankTransaction(Transaction):
bank: Mapped[bankfk] = mapped_column(nullable=True)
bank: Mapped[Optional[bankfk]] = mapped_column(default=None)
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(bank=self.bank)
def serialize(self) -> Mapping[str, Any]:
map = cast(MutableMapping[str, Any], super().serialize())
map["bank"] = self.bank
return map
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
transaction = cls._deserialize(map)
transaction.bank = map["bank"]
return transaction
class MoneyTransaction(Transaction):
__mapper_args__ = {"polymorphic_identity": "money"}
def serialize(self) -> Mapping[str, Any]:
return super().serialize()
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
return cls._deserialize(map)
class SplitTransaction(Transaction):
original: Mapped[idfk] = mapped_column(nullable=True)
original: Mapped[Optional[idfk]] = mapped_column(default=None)
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(original=self.original)
def serialize(self) -> Mapping[str, Any]:
map = cast(MutableMapping[str, Any], super().serialize())
map["original"] = self.original
return map
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
transaction = cls._deserialize(map)
transaction.original = map["original"]
return transaction
class CategoryGroup(Base, Export):
class CategoryGroup(Base, Serializable):
__tablename__ = "category_groups"
name: Mapped[str] = mapped_column(primary_key=True)
@property
def format(self) -> dict[str, Any]:
return dict(name=self.name)
def serialize(self) -> Mapping[str, Any]:
return super().serialize() | dict(name=self.name)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
return cls(map["name"])
class Category(Base, Export):
class Category(Base, Serializable, repr=False):
__tablename__ = "categories"
name: Mapped[str] = mapped_column(primary_key=True)
@ -175,11 +268,67 @@ class Category(Base, Export):
ForeignKey(CategoryGroup.name), default=None
)
rules: Mapped[set[CategoryRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
rules: Mapped[list[CategoryRule]] = relationship(
cascade="all, delete-orphan",
passive_deletes=True,
default_factory=list,
lazy="joined",
)
schedule: Mapped[Optional[CategorySchedule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default=None
cascade="all, delete-orphan", passive_deletes=True, default=None, lazy="joined"
)
def serialize(self) -> Mapping[str, Any]:
rules: Sequence[Mapping[str, Any]] = []
for rule in self.rules:
rules.append(
{
"start": rule.start.isoformat() if rule.start else None,
"end": rule.end.isoformat() if rule.end else None,
"description": rule.description,
"regex": rule.regex,
"bank": rule.bank,
"min": str(rule.min) if rule.min is not None else None,
"max": str(rule.max) if rule.max is not None else None,
}
)
schedule = None
if self.schedule:
schedule = {
"period": self.schedule.period.name if self.schedule.period else None,
"period_multiplier": self.schedule.period_multiplier,
"amount": self.schedule.amount,
}
return super().serialize() | dict(
name=self.name,
group=self.group,
rules=rules,
schedule=schedule,
)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
rules: list[CategoryRule] = []
for rule in map["rules"]:
rules.append(
CategoryRule(
dt.date.fromisoformat(rule["start"]) if rule["start"] else None,
dt.date.fromisoformat(rule["end"]) if rule["end"] else None,
rule["description"],
rule["regex"],
rule["bank"],
rule["min"],
rule["max"],
)
)
return cls(
map["name"],
map["group"],
rules,
CategorySchedule(**map["schedule"]) if map["schedule"] else None,
)
def __repr__(self) -> str:
@ -188,15 +337,6 @@ class Category(Base, Export):
f" schedule={self.schedule})"
)
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
group=self.group if self.group else None,
rules=[rule.format for rule in self.rules],
schedule=self.schedule.format if self.schedule else None,
)
catfk = Annotated[
str,
@ -204,20 +344,25 @@ catfk = Annotated[
]
class TransactionCategory(Base, Export):
class CategorySelector(enum.Enum):
unknown = enum.auto()
nullifier = enum.auto()
vacations = enum.auto()
rules = enum.auto()
algorithm = enum.auto()
manual = enum.auto()
class TransactionCategory(Base):
__tablename__ = "transactions_categorized"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
name: Mapped[catfk]
selector: Mapped[CategorySelector] = relationship(
cascade="all, delete-orphan", lazy="joined"
)
selector: Mapped[CategorySelector] = mapped_column(default=CategorySelector.unknown)
@property
def format(self):
return dict(
name=self.name, selector=self.selector.format if self.selector else None
)
transaction: Mapped[Transaction] = relationship(
back_populates="category", init=False, compare=False
)
@ -228,106 +373,85 @@ class Note(Base):
note: Mapped[str]
class Nordigen(Base, Export):
class NordigenBank(Base):
__tablename__ = "banks_nordigen"
name: Mapped[bankfk] = mapped_column(primary_key=True)
name: Mapped[bankfk] = mapped_column(primary_key=True, init=False)
bank_id: Mapped[Optional[str]]
requisition_id: Mapped[Optional[str]]
invert: Mapped[Optional[bool]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
bank_id=self.bank_id,
requisition_id=self.requisition_id,
invert=self.invert,
)
invert: Mapped[Optional[bool]] = mapped_column(default=None)
class Tag(Base):
class Tag(Base, Serializable):
__tablename__ = "tags"
name: Mapped[str] = mapped_column(primary_key=True)
rules: Mapped[set[TagRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
rules: Mapped[list[TagRule]] = relationship(
cascade="all, delete-orphan",
passive_deletes=True,
default_factory=list,
lazy="joined",
)
def serialize(self) -> Mapping[str, Any]:
rules: Sequence[Mapping[str, Any]] = []
for rule in self.rules:
rules.append(
{
"start": rule.start,
"end": rule.end,
"description": rule.description,
"regex": rule.regex,
"bank": rule.bank,
"min": str(rule.min) if rule.min is not None else None,
"max": str(rule.max) if rule.max is not None else None,
}
)
return super().serialize() | dict(name=self.name, rules=rules)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
rules: list[TagRule] = []
for rule in map["rules"]:
rules.append(
TagRule(
dt.date.fromisoformat(rule["start"]) if rule["start"] else None,
dt.date.fromisoformat(rule["end"]) if rule["end"] else None,
rule["description"],
rule["regex"],
rule["bank"],
rule["min"],
rule["max"],
)
)
return cls(map["name"], rules)
class TransactionTag(Base, Export):
class TransactionTag(Base, unsafe_hash=True):
__tablename__ = "transactions_tagged"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
@property
def format(self):
return dict(tag=self.tag)
def __hash__(self):
return hash(self.id)
class SchedulePeriod(enum.Enum):
daily = enum.auto()
weekly = enum.auto()
monthly = enum.auto()
yearly = enum.auto()
class Selector_T(enum.Enum):
unknown = enum.auto()
nullifier = enum.auto()
vacations = enum.auto()
rules = enum.auto()
algorithm = enum.auto()
manual = enum.auto()
categoryselector = Annotated[
Selector_T,
mapped_column(Enum(Selector_T, inherit_schema=True), default=Selector_T.unknown),
]
class CategorySelector(Base, Export):
__tablename__ = "category_selectors"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(TransactionCategory.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
selector: Mapped[categoryselector]
@property
def format(self):
return dict(selector=self.selector)
class Period(enum.Enum):
daily = "daily"
weekly = "weekly"
monthly = "monthly"
yearly = "yearly"
scheduleperiod = Annotated[Selector_T, mapped_column(Enum(Period, inherit_schema=True))]
class CategorySchedule(Base, Export):
class CategorySchedule(Base):
__tablename__ = "category_schedules"
name: Mapped[catfk] = mapped_column(primary_key=True)
period: Mapped[Optional[scheduleperiod]]
name: Mapped[catfk] = mapped_column(primary_key=True, init=False)
period: Mapped[Optional[SchedulePeriod]]
period_multiplier: Mapped[Optional[int]]
amount: Mapped[Optional[int]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
period=self.period,
period_multiplier=self.period_multiplier,
amount=self.amount,
)
class Link(Base):
__tablename__ = "links"
@ -336,17 +460,17 @@ class Link(Base):
link: Mapped[idfk] = mapped_column(primary_key=True)
class Rule(Base, Export):
class Rule(Base):
__tablename__ = "rules"
id: Mapped[idpk] = mapped_column(init=False)
start: Mapped[Optional[dt.date]]
end: Mapped[Optional[dt.date]]
description: Mapped[Optional[str]]
regex: Mapped[Optional[str]]
bank: Mapped[Optional[str]]
min: Mapped[Optional[money]]
max: Mapped[Optional[money]]
start: Mapped[Optional[dt.date]] = mapped_column(default=None)
end: Mapped[Optional[dt.date]] = mapped_column(default=None)
description: Mapped[Optional[str]] = mapped_column(default=None)
regex: Mapped[Optional[str]] = mapped_column(default=None)
bank: Mapped[Optional[str]] = mapped_column(default=None)
min: Mapped[Optional[money]] = mapped_column(default=None)
max: Mapped[Optional[money]] = mapped_column(default=None)
type: Mapped[str] = mapped_column(init=False)
@ -361,16 +485,16 @@ class Rule(Base, Export):
valid = re.compile(self.regex, re.IGNORECASE)
ops = (
Rule.exists(self.start, lambda r: r < t.date),
Rule.exists(self.end, lambda r: r > t.date),
Rule.exists(self.start, lambda r: t.date >= r),
Rule.exists(self.end, lambda r: t.date <= r),
Rule.exists(self.description, lambda r: r == t.description),
Rule.exists(
valid,
lambda r: r.search(t.description) if t.description else False,
),
Rule.exists(self.bank, lambda r: r == t.bank),
Rule.exists(self.min, lambda r: r < t.amount),
Rule.exists(self.max, lambda r: r > t.amount),
Rule.exists(self.min, lambda r: t.amount >= r),
Rule.exists(self.max, lambda r: t.amount <= r),
)
if all(ops):
@ -378,21 +502,8 @@ class Rule(Base, Export):
return False
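The hunk above makes the rule's min/max limits inclusive (commit a7b74237): amounts equal to a bound now match, where the old strict comparisons rejected them. The predicate in isolation, with exists() treating an unset bound as a pass:

```python
from decimal import Decimal
from typing import Any, Callable, Optional


def exists(r: Optional[Any], op: Callable[[Any], bool]) -> bool:
    # unset criteria never veto a match
    return op(r) if r is not None else True


def in_range(amount: Decimal, min_: Optional[Decimal], max_: Optional[Decimal]) -> bool:
    # inclusive on both ends, per the fix
    return exists(min_, lambda r: amount >= r) and exists(max_, lambda r: amount <= r)
```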
@property
def format(self) -> dict[str, Any]:
return dict(
start=self.start,
end=self.end,
description=self.description,
regex=self.regex,
bank=self.bank,
min=self.min,
max=self.max,
type=self.type,
)
@staticmethod
def exists(r, op) -> bool:
def exists(r: Optional[Any], op: Callable[[Any], bool]) -> bool:
return op(r) if r is not None else True
@ -405,19 +516,13 @@ class CategoryRule(Rule):
primary_key=True,
init=False,
)
name: Mapped[catfk]
name: Mapped[catfk] = mapped_column(init=False)
__mapper_args__ = {
"polymorphic_identity": "category_rule",
"polymorphic_load": "selectin",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(name=self.name)
def __hash__(self):
return hash(self.id)
class TagRule(Rule):
__tablename__ = "tag_rules"
@ -428,15 +533,19 @@ class TagRule(Rule):
primary_key=True,
init=False,
)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))
tag: Mapped[str] = mapped_column(
ForeignKey(Tag.name, ondelete="CASCADE"), init=False
)
__mapper_args__ = {
"polymorphic_identity": "tag_rule",
"polymorphic_load": "selectin",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(tag=self.tag)
def __hash__(self):
return hash(self.id)
class Nordigen(Base):
__tablename__ = "nordigen"
type: Mapped[str] = mapped_column(primary_key=True)
token: Mapped[str]
expires: Mapped[dt.datetime]


@ -1,12 +1,16 @@
from dataclasses import dataclass
import datetime as dt
import dotenv
import json
import nordigen
import os
import requests
import time
from typing import Any, Optional, Sequence, Tuple
import uuid
from pfbudget.db.client import Client
from pfbudget.db.model import Nordigen
from .exceptions import CredentialsError, DownloadError
dotenv.load_dotenv()
@ -16,40 +20,38 @@ dotenv.load_dotenv()
class NordigenCredentials:
id: str
key: str
token: str = ""
def valid(self) -> bool:
return self.id and self.key
return len(self.id) != 0 and len(self.key) != 0
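valid() previously returned `self.id and self.key`, i.e. a string, not a bool; the rewrite returns a genuine bool. A self-contained copy of the updated dataclass (the token field is dropped, as in the diff):

```python
from dataclasses import dataclass


@dataclass
class NordigenCredentials:
    id: str
    key: str

    def valid(self) -> bool:
        # both secrets must be non-empty; result is a real bool
        return len(self.id) != 0 and len(self.key) != 0
```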
class NordigenClient:
redirect_url = "https://murta.dev"
def __init__(self, credentials: NordigenCredentials):
super().__init__()
def __init__(self, credentials: NordigenCredentials, client: Client):
if not credentials.valid():
raise CredentialsError
self._client = nordigen.NordigenClient(
self.__client = nordigen.NordigenClient(
secret_key=credentials.key, secret_id=credentials.id, timeout=5
)
self.__client.token = self.__token(client)
if credentials.token:
self._client.token = credentials.token
def download(self, requisition_id):
def download(self, requisition_id) -> Sequence[dict[str, Any]]:
try:
requisition = self._client.requisition.get_requisition_by_id(requisition_id)
requisition = self.__client.requisition.get_requisition_by_id(
requisition_id
)
print(requisition)
except requests.HTTPError as e:
raise DownloadError(e)
transactions = {}
transactions = []
for acc in requisition["accounts"]:
account = self._client.account_api(acc)
account = self.__client.account_api(acc)
retries = 0
downloaded = None
while retries < 3:
try:
downloaded = account.get_transactions()
@ -60,55 +62,93 @@ class NordigenClient:
time.sleep(1)
if not downloaded:
print(f"Couldn't download transactions for {account}")
print(f"Couldn't download transactions for {account.get_metadata()}")
continue
transactions.update(downloaded)
if (
"transactions" not in downloaded
or "booked" not in downloaded["transactions"]
):
print(f"{account} doesn't have transactions")
continue
transactions.extend(downloaded["transactions"]["booked"])
return transactions
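download() now returns a flat list of booked transactions instead of Nordigen's nested per-account structure (commit 2a68ddd), skipping payloads that lack the transactions/booked keys. A hedged sketch of just that reshaping step:

```python
from typing import Any


def booked(payloads: list[dict[str, Any]]) -> list[dict[str, Any]]:
    out: list[dict[str, Any]] = []
    for downloaded in payloads:
        # accounts without booked transactions are skipped, not an error
        if "transactions" not in downloaded or "booked" not in downloaded["transactions"]:
            continue
        out.extend(downloaded["transactions"]["booked"])
    return out
```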
def dump(self, bank, downloaded):
with open("json/" + bank.name + ".json", "w") as f:
json.dump(downloaded, f)
# @TODO log received JSON
pass
def generate_token(self):
self.token = self._client.generate_token()
print(f"New access token: {self.token}")
return self.token
def requisition(self, id: str, country: str = "PT"):
requisition = self._client.initialize_session(
redirect_uri=self.redirect_url,
institution_id=id,
reference_id=str(uuid.uuid4()),
)
return requisition.link, requisition.requisition_id
def new_requisition(
self,
institution_id: str,
max_historical_days: Optional[int] = None,
access_valid_for_days: Optional[int] = None,
) -> Tuple[str, str]:
kwargs = {
"max_historical_days": max_historical_days,
"access_valid_for_days": access_valid_for_days,
}
kwargs = {k: v for k, v in kwargs.items() if v is not None}
req = self.__client.initialize_session(
self.redirect_url, institution_id, str(uuid.uuid4()), **kwargs
)
return req.link, req.requisition_id
def country_banks(self, country: str):
return self._client.institution.get_institutions(country)
return self.__client.institution.get_institutions(country)
# def __token(self):
# if token := os.environ.get("TOKEN"):
# return token
# else:
# token = self._client.generate_token()
# print(f"New access token: {token}")
# return token["access"]
@property
def token(self):
return self._token
@token.setter
def token(self, value):
if self._token:
print("Replacing existing token with {value}")
self._token = value
def __token(self, client: Client) -> str:
with client.session as session:
token = session.select(Nordigen)
if not len(token):
print("First time nordigen token setup")
new = self.__client.generate_token()
session.insert(
[
Nordigen(
"access",
new["access"],
datetime(new["access_expires"]),
),
Nordigen(
"refresh",
new["refresh"],
datetime(new["refresh_expires"]),
),
]
)
return new["access"]
else:
access = next(t for t in token if t.type == "access")
refresh = next(t for t in token if t.type == "refresh")
if access.expires > dt.datetime.now():
pass
elif refresh.expires > dt.datetime.now():
new = self.__client.exchange_token(refresh.token)
access.token = new["access"]
access.expires = datetime(new["access_expires"])
else:
new = self.__client.generate_token()
access.token = new["access"]
access.expires = datetime(new["access_expires"])
refresh.token = new["refresh"]
refresh.expires = datetime(new["refresh_expires"])
return access.token
def datetime(seconds: int) -> dt.datetime:
return dt.datetime.now() + dt.timedelta(seconds=seconds)
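The decision logic inside __token() reduces to a three-way state machine: a live access token is reused, an expired access token with a live refresh token is exchanged, and if both have expired a fresh pair is generated. A pure-function sketch (names illustrative, not from the diff):

```python
import datetime as dt


def pick_action(access_expires: dt.datetime, refresh_expires: dt.datetime,
                now: dt.datetime) -> str:
    # live access token -> reuse it as-is
    if access_expires > now:
        return "reuse"
    # expired access, live refresh -> exchange for a new access token
    if refresh_expires > now:
        return "exchange"
    # both expired -> generate a brand-new token pair
    return "generate"


now = dt.datetime(2024, 1, 22)
```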
class NordigenCredentialsManager:
default = NordigenCredentials(
os.environ.get("SECRET_ID"),
os.environ.get("SECRET_KEY"),
os.environ.get("TOKEN"),
os.environ.get("SECRET_ID", ""),
os.environ.get("SECRET_KEY", ""),
)


@ -1,58 +1,45 @@
from collections import namedtuple
from __future__ import annotations
from decimal import Decimal
from importlib import import_module
from pathlib import Path
import datetime as dt
from typing import Any, Callable, NamedTuple, Optional
import yaml
from pfbudget.common.types import NoBankSelected
from pfbudget.db.model import Transaction
from pfbudget.db.model import BankTransaction
from pfbudget.utils import utils
Index = namedtuple(
"Index", ["date", "text", "value", "negate"], defaults=[-1, -1, -1, False]
)
Options = namedtuple(
"Options",
[
"encoding",
"separator",
"date_fmt",
"start",
"end",
"debit",
"credit",
"additional_parser",
"category",
"VISA",
"MasterCard",
"AmericanExpress",
],
defaults=[
"",
"",
"",
1,
None,
Index(),
Index(),
False,
None,
None,
None,
None,
],
)
class Index(NamedTuple):
date: int = -1
text: int = -1
value: int = -1
negate: bool = False
class Options(NamedTuple):
encoding: str
separator: str
date_fmt: str
start: int = 1
end: Optional[int] = None
debit: Index = Index()
credit: Index = Index()
additional_parser: bool = False
VISA: Optional[Options] = None
MasterCard: Optional[Options] = None
AmericanExpress: Optional[Options] = None
def parse_data(filename: Path, args: dict) -> list[Transaction]:
cfg: dict = yaml.safe_load(open("parsers.yaml"))
def parse_data(filename: Path, args: dict[str, Any]) -> list[BankTransaction]:
cfg: dict[str, Any] = yaml.safe_load(open("parsers.yaml"))
assert (
"Banks" in cfg
), "parsers.yaml is missing the Banks section with the list of available banks"
if not args["bank"]:
bank, creditcard = utils.find_credit_institution(
bank, creditcard = utils.find_credit_institution( # type: ignore
filename, cfg.get("Banks"), cfg.get("CreditCards")
)
else:
@ -60,7 +47,7 @@ def parse_data(filename: Path, args: dict) -> list[Transaction]:
creditcard = None if not args["creditcard"] else args["creditcard"][0]
try:
options: dict = cfg[bank]
options: dict[str, Any] = cfg[bank]
except KeyError as e:
banks = cfg["Banks"]
raise NoBankSelected(f"{e} not a valid bank, try one of {banks}")
@ -73,9 +60,6 @@ def parse_data(filename: Path, args: dict) -> list[Transaction]:
raise NoBankSelected(f"{e} not a valid bank, try one of {creditcards}")
bank += creditcard
if args["category"]:
options["category"] = args["category"][0]
if options.get("additional_parser"):
parser = getattr(import_module("pfbudget.extract.parsers"), bank)
transactions = parser(filename, bank, options).parse()
@ -86,7 +70,7 @@ def parse_data(filename: Path, args: dict) -> list[Transaction]:
class Parser:
def __init__(self, filename: Path, bank: str, options: dict):
def __init__(self, filename: Path, bank: str, options: dict[str, Any]):
self.filename = filename
self.bank = bank
@ -97,10 +81,10 @@ class Parser:
self.options = Options(**options)
def func(self, transaction: Transaction):
def func(self, transaction: BankTransaction):
pass
def parse(self) -> list[Transaction]:
def parse(self) -> list[BankTransaction]:
transactions = [
Parser.transaction(line, self.bank, self.options, self.func)
for line in list(open(self.filename, encoding=self.options.encoding))[
@ -111,7 +95,8 @@ class Parser:
return transactions
@staticmethod
def index(line: list, options: Options) -> Index:
def index(line: list[str], options: Options) -> Index:
index = None
if options.debit.date != -1 and options.credit.date != -1:
if options.debit.value != options.credit.value:
if line[options.debit.value]:
@ -138,49 +123,57 @@ class Parser:
else:
raise IndexError("No debit nor credit indexes available")
return index
return index if index else Index()
@staticmethod
def transaction(line: str, bank: str, options: Options, func) -> Transaction:
line = line.rstrip().split(options.separator)
index = Parser.index(line, options)
date = (
dt.datetime.strptime(line[index.date].strip(), options.date_fmt)
.date()
.isoformat()
)
text = line[index.text]
value = utils.parse_decimal(line[index.value])
if index.negate:
value = -value
if options.category:
category = line[options.category]
transaction = Transaction(date, text, bank, value, category)
else:
transaction = Transaction(date, text, bank, value)
if options.additional_parser:
func(transaction)
return transaction
@staticmethod
def transaction(
line_: str, bank: str, options: Options, func: Callable[[BankTransaction], None]
) -> BankTransaction:
line = line_.rstrip().split(options.separator)
index = Parser.index(line, options)
try:
date_str = line[index.date].strip()
date = dt.datetime.strptime(date_str, options.date_fmt).date()
text = line[index.text]
value = utils.parse_decimal(line[index.value])
if index.negate:
value = -value
transaction = BankTransaction(date, text, value, bank=bank)
if options.additional_parser:
func(transaction)
return transaction
except IndexError:
raise IndexError(line_)
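The rewritten transaction() splits each line on the configured separator, strips the date column, and parses it with the per-bank date format, keeping the result as dt.date instead of an ISO string. A standalone sketch of that field parsing; the separator and "%d-%m-%Y" format here are made-up example values, not taken from parsers.yaml:

```python
import datetime as dt

# hypothetical CSV line in a bank's export format
fields = "14-01-2023;coffee;-2.50".rstrip().split(";")

# date column -> dt.date via the configured strptime format
date = dt.datetime.strptime(fields[0].strip(), "%d-%m-%Y").date()
text, value = fields[1], fields[2]
```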
class Bank1(Parser):
def __init__(self, filename: str, bank: str, options: dict):
def __init__(self, filename: Path, bank: str, options: dict[str, Any]):
super().__init__(filename, bank, options)
self.transfers = []
self.transfers: list[dt.date] = []
self.transaction_cost = -Decimal("1")
def func(self, transaction: Transaction):
if "transf" in transaction.description.lower() and transaction.value < 0:
transaction.value -= self.transaction_cost
def func(self, transaction: BankTransaction):
if (
transaction.description
and "transf" in transaction.description.lower()
and transaction.amount < 0
):
transaction.amount -= self.transaction_cost
self.transfers.append(transaction.date)
def parse(self) -> list:
def parse(self) -> list[BankTransaction]:
transactions = super().parse()
for date in self.transfers:
transactions.append(
Transaction(date, "Transaction cost", self.bank, self.transaction_cost)
BankTransaction(
date, "Transaction cost", self.transaction_cost, bank=self.bank
)
)
return transactions


@ -35,4 +35,4 @@ class PSD2Extractor(Extractor):
]
def convert(self, bank, downloaded, start, end):
return [convert(t, bank) for t in downloaded["transactions"]["booked"]]
return [convert(t, bank) for t in downloaded]


@ -4,11 +4,10 @@ from typing import Iterable, Sequence
from pfbudget.db.model import (
CategoryRule,
CategorySelector,
Selector_T,
Transaction,
TransactionCategory,
TransactionTag,
)
from .exceptions import TransactionCategorizedError
from .transform import Transformer
@ -25,12 +24,15 @@ class Categorizer(Transformer):
def transform_inplace(self, transactions: Sequence[Transaction]) -> None:
for rule in self.rules:
for transaction in transactions:
if transaction.category:
raise TransactionCategorizedError(transaction)
if not rule.matches(transaction):
continue
if not transaction.category:
transaction.category = TransactionCategory(
rule.name, CategorySelector(Selector_T.rules)
rule.name, CategorySelector.rules
)
else:
if not transaction.tags:
transaction.tags = {TransactionTag(rule.name)}
else:
transaction.tags.add(TransactionTag(rule.name))
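The Categorizer change above implements the multi-category behaviour from issue #2: the first matching rule sets the transaction's category, and any later match is recorded as a tag instead of raising. A sketch of that policy with hypothetical stand-in classes (the real code uses TransactionCategory/TransactionTag):

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class Txn:  # hypothetical stand-in for Transaction
    category: Optional[str] = None
    tags: set = field(default_factory=set)


def apply_rule(txn: Txn, name: str) -> None:
    # first match wins the category slot; later matches become tags
    if not txn.category:
        txn.category = name
    else:
        txn.tags.add(name)


t = Txn()
apply_rule(t, "groceries")
apply_rule(t, "vacations")
```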


@ -1,6 +1,2 @@
class MoreThanOneMatchError(Exception):
pass
class TransactionCategorizedError(Exception):
pass


@ -6,7 +6,6 @@ from .exceptions import MoreThanOneMatchError
from .transform import Transformer
from pfbudget.db.model import (
CategorySelector,
Selector_T,
Transaction,
TransactionCategory,
)
@ -16,7 +15,7 @@ class Nullifier(Transformer):
NULL_DAYS = 4
def __init__(self, rules=None):
self.rules = rules
self.rules = rules if rules else []
def transform(self, transactions: Sequence[Transaction]) -> Sequence[Transaction]:
"""transform
@ -89,6 +88,6 @@ class Nullifier(Transformer):
def _nullify(self, transaction: Transaction) -> Transaction:
transaction.category = TransactionCategory(
"null", selector=CategorySelector(Selector_T.nullifier)
"null", selector=CategorySelector.nullifier
)
return transaction
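The Nullifier now falls back to an empty list in the constructor, which both removes None checks downstream and avoids the classic shared-mutable-default trap a plain `rules=[]` default would introduce:

```python
class Nullifier:
    def __init__(self, rules=None):
        # each instance gets its own list; a `rules=[]` default parameter
        # would be shared across every instance
        self.rules = rules if rules else []


a, b = Nullifier(), Nullifier()
a.rules.append("rule#1")
```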

poetry.lock generated

File diff suppressed because it is too large

@ -8,17 +8,16 @@ readme = "README.md"
packages = [{include = "pfbudget"}]
[tool.poetry.dependencies]
python = "^3.10"
python = "^3.11"
codetiming = "^1.4.0"
matplotlib = "^3.7.1"
nordigen = "^1.3.1"
psycopg2 = {extras = ["binary"], version = "^2.9.6"}
psycopg2 = "^2.9.6"
python-dateutil = "^2.8.2"
python-dotenv = "^1.0.0"
pyyaml = "^6.0"
sqlalchemy = "^2.0.9"
[tool.poetry.group.dev.dependencies]
alembic = "^1.10.3"
black = "^23.3.0"
@ -28,11 +27,15 @@ pytest = "^7.3.0"
pytest-cov = "^4.0.0"
pytest-mock = "^3.10.0"
sqlalchemy = {extras = ["mypy"], version = "^2.0.9"}
ruff = "^0.0.267"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
pythonpath = ". tests"
[pytest]
mock_use_standalone_module = true

tests/mocks/banks.py Normal file

@ -0,0 +1,8 @@
from pfbudget.db.model import AccountType, Bank, NordigenBank
checking = Bank(
"bank", "BANK", AccountType.checking, NordigenBank("bank_id", "requisition_id")
)
cc = Bank("cc", "CC", AccountType.MASTERCARD)


@ -1,15 +1,21 @@
from decimal import Decimal
from pfbudget.db.model import Category, CategoryRule, Tag, TagRule
from pfbudget.db.model import Category, CategoryGroup, CategoryRule, Tag, TagRule
category_null = Category("null", None, set())
category_null = Category("null")
categorygroup1 = CategoryGroup("group#1")
category1 = Category(
"cat#1",
None,
{CategoryRule(None, None, "desc#1", None, None, None, Decimal(0), "cat#1")},
"group#1",
rules=[CategoryRule(description="desc#1", max=Decimal(0))],
)
tag_1 = Tag(
"tag#1", {TagRule(None, None, "desc#1", None, None, None, Decimal(0), "tag#1")}
category2 = Category(
"cat#2",
"group#1",
rules=[CategoryRule(description="desc#1", max=Decimal(0))],
)
tag_1 = Tag("tag#1", rules=[TagRule(description="desc#1", max=Decimal(0))])

tests/mocks/client.py Normal file

@ -0,0 +1,22 @@
import datetime as dt
from pfbudget.db.client import Client
from pfbudget.db.model import Base, Nordigen
class MockClient(Client):
now = dt.datetime.now()
def __init__(self):
url = "sqlite://"
super().__init__(
url, execution_options={"schema_translate_map": {"pfbudget": None}}
)
Base.metadata.create_all(self.engine)
self.insert(
[
Nordigen("access", "token#1", self.now + dt.timedelta(days=1)),
Nordigen("refresh", "token#2", self.now + dt.timedelta(days=30)),
]
)


@ -1,3 +1,11 @@
from typing import Any, Dict, List, Optional
import nordigen
from nordigen.types.http_enums import HTTPMethod
from nordigen.types.types import RequisitionDto, TokenType
from pfbudget.extract.nordigen import NordigenCredentials
id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
accounts_id = {
@ -10,6 +18,7 @@ accounts_id = {
"owner_name": "string",
}
# The downloaded transactions match the simple and simple_transformed mocks
accounts_id_transactions = {
"transactions": {
"booked": [
@ -80,3 +89,58 @@ requisitions_id = {
"account_selection": False,
"redirect_immediate": False,
}
credentials = NordigenCredentials("ID", "KEY")
class MockNordigenClient(nordigen.NordigenClient):
def __init__(
self,
secret_key: str = "ID",
secret_id: str = "KEY",
timeout: int = 10,
base_url: str = "https://ob.nordigen.com/api/v2",
) -> None:
super().__init__(secret_key, secret_id, timeout, base_url)
def generate_token(self) -> TokenType:
return {
"access": "access_token",
"refresh": "refresh_token",
"access_expires": 86400,
"refresh_expires": 2592000,
}
def exchange_token(self, refresh_token: str) -> TokenType:
assert len(refresh_token) > 0, "invalid refresh token"
return {
"access": "access_token",
"refresh": "refresh_token",
"access_expires": 86400,
"refresh_expires": 2592000,
}
def request(
self,
method: HTTPMethod,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
) -> Any:
if endpoint == "requisitions/" + "requisition_id" + "/":
return requisitions_id
elif endpoint == "accounts/" + id + "/transactions/":
return accounts_id_transactions
else:
raise NotImplementedError(endpoint)
def initialize_session(
self,
redirect_uri: str,
institution_id: str,
reference_id: str,
max_historical_days: int = 90,
access_valid_for_days: int = 90,
access_scope: List[str] | None = None,
) -> RequisitionDto:
return RequisitionDto("http://random", "requisition_id")


@ -0,0 +1,73 @@
from datetime import date
from decimal import Decimal
from pfbudget.db.model import (
BankTransaction,
CategorySelector,
MoneyTransaction,
Note,
SplitTransaction,
Transaction,
TransactionCategory,
TransactionTag,
)
# The simple and simple_transformed match the nordigen mocks
simple = [
BankTransaction(date(2023, 1, 14), "string", Decimal("328.18"), bank="bank"),
BankTransaction(date(2023, 2, 14), "string", Decimal("947.26"), bank="bank"),
]
simple_transformed = [
BankTransaction(
date(2023, 1, 14),
"",
Decimal("328.18"),
bank="bank",
category=TransactionCategory("category#1", CategorySelector.algorithm),
),
BankTransaction(
date(2023, 2, 14),
"",
Decimal("947.26"),
bank="bank",
category=TransactionCategory("category#2", CategorySelector.algorithm),
),
]
bank = [
BankTransaction(date(2023, 1, 1), "", Decimal("-10"), bank="bank#1"),
BankTransaction(date(2023, 1, 1), "", Decimal("-10"), bank="bank#2"),
]
money = [
MoneyTransaction(date(2023, 1, 1), "", Decimal("-10")),
MoneyTransaction(date(2023, 1, 1), "", Decimal("-10")),
]
__original = Transaction(date(2023, 1, 1), "", Decimal("-10"), split=True)
__original.id = 9000
split = [
__original,
SplitTransaction(date(2023, 1, 1), "", Decimal("-5"), original=__original.id),
SplitTransaction(date(2023, 1, 1), "", Decimal("-5"), original=__original.id),
]
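The `split` mock above encodes an invariant worth stating explicitly: each `SplitTransaction` points back at the original's id, and the split amounts sum to the original amount. A minimal sketch with plain dicts standing in for the model classes (the real ones live in `pfbudget.db.model`):

```python
from decimal import Decimal

# Illustrative stand-ins for Transaction / SplitTransaction.
original = {"id": 9000, "amount": Decimal("-10")}
splits = [
    {"original": 9000, "amount": Decimal("-5")},
    {"original": 9000, "amount": Decimal("-5")},
]

# Every split references the original transaction's id...
assert all(s["original"] == original["id"] for s in splits)
# ...and the split amounts sum back to the original amount.
assert sum(s["amount"] for s in splits) == original["amount"]
```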
tagged = [
Transaction(
date(2023, 1, 1),
"",
Decimal("-10"),
tags={TransactionTag("tag#1"), TransactionTag("tag#2")},
)
]
noted = [
Transaction(
date(2023, 1, 1),
"",
Decimal("-10"),
note=Note("note#1"),
)
]

tests/test_backup.py Normal file (144 lines added)

@@ -0,0 +1,144 @@
from pathlib import Path
from typing import Any, Sequence, Type
import pytest
from mocks import banks, categories, transactions
from mocks.client import MockClient
from pfbudget.common.types import ExportFormat
from pfbudget.core.command import (
BackupCommand,
ExportCommand,
ImportBackupCommand,
ImportCommand,
ImportFailedError,
)
from pfbudget.db.client import Client
from pfbudget.db.model import (
Bank,
BankTransaction,
Base,
Category,
CategoryGroup,
MoneyTransaction,
Note,
SplitTransaction,
Tag,
Transaction,
TransactionCategory,
TransactionTag,
)
@pytest.fixture
def client() -> Client:
return MockClient()
params = [
(transactions.simple, Transaction),
(transactions.simple_transformed, Transaction),
(transactions.bank, Transaction),
(transactions.bank, BankTransaction),
(transactions.money, Transaction),
(transactions.money, MoneyTransaction),
(transactions.split, SplitTransaction),
([banks.checking, banks.cc], Bank),
([categories.category_null, categories.category1, categories.category2], Category),
(
[
categories.categorygroup1,
categories.category_null,
categories.category1,
categories.category2,
],
CategoryGroup,
),
([categories.tag_1], Tag),
]
not_serializable = [
(transactions.simple_transformed, TransactionCategory),
(transactions.tagged, TransactionTag),
(transactions.noted, Note),
]
class TestBackup:
@pytest.mark.parametrize("input, what", params)
def test_import(self, tmp_path: Path, input: Sequence[Any], what: Type[Any]):
file = tmp_path / "test.json"
client = MockClient()
client.insert(input)
originals = client.select(what)
assert originals
command = ExportCommand(client, what, file, ExportFormat.JSON)
command.execute()
other = MockClient()
command = ImportCommand(other, what, file, ExportFormat.JSON)
command.execute()
imported = other.select(what)
assert originals == imported
command = ExportCommand(client, what, file, ExportFormat.pickle)
with pytest.raises(AttributeError):
command.execute()
command = ImportCommand(other, what, file, ExportFormat.pickle)
with pytest.raises(AttributeError):
command.execute()
@pytest.mark.parametrize("input, what", not_serializable)
def test_try_backup_not_serializable(
self, tmp_path: Path, input: Sequence[Any], what: Type[Any]
):
file = tmp_path / "test.json"
client = MockClient()
client.insert(input)
originals = client.select(what)
assert originals
command = ExportCommand(client, what, file, ExportFormat.JSON)
with pytest.raises(AttributeError):
command.execute()
other = MockClient()
command = ImportCommand(other, what, file, ExportFormat.JSON)
with pytest.raises(ImportFailedError):
command.execute()
imported = other.select(what)
assert not imported
def test_full_backup(self, tmp_path: Path):
file = tmp_path / "test.json"
client = MockClient()
client.insert([e for t in params for e in t[0]])
command = BackupCommand(client, file, ExportFormat.JSON)
command.execute()
other = MockClient()
command = ImportBackupCommand(other, file, ExportFormat.JSON)
command.execute()
def subclasses(cls: Type[Any]) -> set[Type[Any]]:
return set(cls.__subclasses__()) | {
s for c in cls.__subclasses__() for s in subclasses(c)
}
for t in subclasses(Base):
originals = client.select(t)
imported = other.select(t)
assert originals == imported, f"{t}"
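The recursive `subclasses` helper is needed because `cls.__subclasses__()` only returns direct children, while the backup round-trip must cover every mapped class in the `Base` hierarchy, grandchildren included. The same pattern in isolation:

```python
from typing import Any, Type

def subclasses(cls: Type[Any]) -> set[Type[Any]]:
    # Direct subclasses plus, recursively, their subclasses.
    return set(cls.__subclasses__()) | {
        s for c in cls.__subclasses__() for s in subclasses(c)
    }

class A: ...
class B(A): ...
class C(B): ...
class D(A): ...

# A.__subclasses__() alone would miss C, the grandchild.
assert subclasses(A) == {B, C, D}
```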

tests/test_command.py Normal file (54 lines added)

@@ -0,0 +1,54 @@
import json
from pathlib import Path
import pytest
from mocks.client import MockClient
import mocks.transactions
from pfbudget.common.types import ExportFormat
from pfbudget.core.command import ExportCommand, ImportCommand
from pfbudget.db.client import Client
from pfbudget.db.exceptions import InsertError
from pfbudget.db.model import Transaction
@pytest.fixture
def client() -> Client:
return MockClient()
class TestCommand:
def test_export_json(self, tmp_path: Path, client: Client):
file = tmp_path / "test.json"
client.insert(mocks.transactions.simple)
command = ExportCommand(client, Transaction, file, ExportFormat.JSON)
command.execute()
with open(file, newline="") as f:
result = json.load(f)
assert result == [t.serialize() for t in client.select(Transaction)]
def test_export_pickle(self, tmp_path: Path, client: Client):
file = tmp_path / "test.pickle"
command = ExportCommand(client, Transaction, file, ExportFormat.pickle)
with pytest.raises(AttributeError):
command.execute()
def test_import_json(self, tmp_path: Path, client: Client):
file = tmp_path / "test"
client.insert(mocks.transactions.simple)
command = ExportCommand(client, Transaction, file, ExportFormat.JSON)
command.execute()
# Since the transactions are already in the DB, we expect an insert error
with pytest.raises(InsertError):
command = ImportCommand(client, Transaction, file, ExportFormat.JSON)
command.execute()
def test_import_pickle(self, tmp_path: Path, client: Client):
file = tmp_path / "test"
command = ExportCommand(client, Transaction, file, ExportFormat.pickle)
with pytest.raises(AttributeError):
command.execute()


@@ -2,14 +2,14 @@ from datetime import date
from decimal import Decimal
import pytest
from mocks.client import MockClient
from pfbudget.db.client import Client
from pfbudget.db.model import (
AccountType,
Bank,
Base,
NordigenBank,
CategorySelector,
Nordigen,
Selector_T,
Transaction,
TransactionCategory,
)
@@ -17,20 +17,21 @@ from pfbudget.db.model import (
@pytest.fixture
def client() -> Client:
url = "sqlite://"
client = Client(url, execution_options={"schema_translate_map": {"pfbudget": None}})
Base.metadata.create_all(client.engine)
return client
return MockClient()
@pytest.fixture
def banks(client: Client) -> list[Bank]:
banks = [
Bank("bank", "BANK", AccountType.checking),
Bank("bank", "BANK", AccountType.checking, NordigenBank(None, "req", None)),
Bank("broker", "BROKER", AccountType.investment),
Bank("creditcard", "CC", AccountType.MASTERCARD),
]
banks[0].nordigen = Nordigen("bank", None, "req", None)
# fix nordigen bank names which would be generated post DB insert
for bank in banks:
if bank.nordigen:
bank.nordigen.name = bank.name
client.insert(banks)
return banks
@@ -39,19 +40,22 @@ def banks(client: Client) -> list[Bank]:
@pytest.fixture
def transactions(client: Client) -> list[Transaction]:
transactions = [
Transaction(date(2023, 1, 1), "", Decimal("-10")),
Transaction(
date(2023, 1, 1),
"",
Decimal("-10"),
category=TransactionCategory("category", CategorySelector.algorithm),
),
Transaction(date(2023, 1, 2), "", Decimal("-50")),
]
transactions[0].category = TransactionCategory(
"name", CategorySelector(Selector_T.algorithm)
)
client.insert(transactions)
# fix ids which would be generated post DB insert
for i, transaction in enumerate(transactions):
transaction.id = i + 1
transaction.split = False # default
transactions[0].category.id = 1
transactions[0].category.selector.id = 1
if transaction.category:
transaction.category.id = 1
return transactions
@@ -121,13 +125,13 @@ class TestDatabase:
def test_update_nordigen(self, client: Client, banks: list[Bank]):
name = banks[0].name
result = client.select(Nordigen, lambda: Nordigen.name == name)
result = client.select(NordigenBank, lambda: NordigenBank.name == name)
assert result[0].requisition_id == "req"
update = {"name": name, "requisition_id": "anotherreq"}
client.update(Nordigen, [update])
client.update(NordigenBank, [update])
result = client.select(Nordigen, lambda: Nordigen.name == name)
result = client.select(NordigenBank, lambda: NordigenBank.name == name)
assert result[0].requisition_id == "anotherreq"
result = client.select(Bank, lambda: Bank.name == name)


@@ -31,8 +31,8 @@ class TestDatabaseLoad:
def test_insert(self, loader: Loader):
transactions = [
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
]
loader.load(transactions)


@@ -4,9 +4,10 @@ from typing import Any, Optional
import pytest
import requests
from mocks.client import MockClient
import mocks.nordigen as mock
from pfbudget.db.model import AccountType, Bank, BankTransaction, Nordigen
from pfbudget.db.model import AccountType, Bank, BankTransaction, NordigenBank
from pfbudget.extract.exceptions import BankError, CredentialsError
from pfbudget.extract.extract import Extractor
from pfbudget.extract.nordigen import NordigenClient, NordigenCredentials
@@ -58,14 +59,13 @@ def mock_requests(monkeypatch: pytest.MonkeyPatch):
@pytest.fixture
def extractor() -> Extractor:
credentials = NordigenCredentials("ID", "KEY", "TOKEN")
return PSD2Extractor(NordigenClient(credentials))
credentials = NordigenCredentials("ID", "KEY")
return PSD2Extractor(NordigenClient(credentials, MockClient()))
@pytest.fixture
def bank() -> Bank:
bank = Bank("Bank#1", "", AccountType.checking)
bank.nordigen = Nordigen("", "", mock.id, False)
bank = Bank("Bank#1", "", AccountType.checking, NordigenBank("", mock.id, False))
return bank
@@ -73,7 +73,7 @@ class TestExtractPSD2:
def test_empty_credentials(self):
cred = NordigenCredentials("", "")
with pytest.raises(CredentialsError):
NordigenClient(cred)
NordigenClient(cred, MockClient())
def test_no_psd2_bank(self, extractor: Extractor):
with pytest.raises(BankError):
@@ -88,12 +88,17 @@ class TestExtractPSD2:
with pytest.raises(requests.Timeout):
extractor.extract(bank)
def test_extract(self, extractor: Extractor, bank: Bank):
def test_extract(
self, monkeypatch: pytest.MonkeyPatch, extractor: Extractor, bank: Bank
):
monkeypatch.setattr(
"pfbudget.extract.nordigen.NordigenClient.dump", lambda *args: None
)
assert extractor.extract(bank) == [
BankTransaction(
dt.date(2023, 1, 14), "string", Decimal("328.18"), "Bank#1"
dt.date(2023, 1, 14), "string", Decimal("328.18"), bank="Bank#1"
),
BankTransaction(
dt.date(2023, 2, 14), "string", Decimal("947.26"), "Bank#1"
dt.date(2023, 2, 14), "string", Decimal("947.26"), bank="Bank#1"
),
]


@@ -5,9 +5,9 @@ import mocks.categories as mock
from pfbudget.db.model import (
BankTransaction,
Category,
CategoryRule,
CategorySelector,
Selector_T,
TransactionCategory,
TransactionTag,
)
@@ -20,8 +20,8 @@ from pfbudget.transform.transform import Transformer
class TestTransform:
def test_nullifier(self):
transactions = [
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
]
for t in transactions:
@@ -31,14 +31,12 @@ class TestTransform:
transactions = categorizer.transform(transactions)
for t in transactions:
assert t.category == TransactionCategory(
"null", CategorySelector(Selector_T.nullifier)
)
assert t.category == TransactionCategory("null", CategorySelector.nullifier)
def test_nullifier_inplace(self):
transactions = [
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
]
for t in transactions:
@@ -48,20 +46,20 @@ class TestTransform:
categorizer.transform_inplace(transactions)
for t in transactions:
assert t.category == TransactionCategory(
"null", CategorySelector(Selector_T.nullifier)
)
assert t.category == TransactionCategory("null", CategorySelector.nullifier)
def test_nullifier_with_rules(self):
transactions = [
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
]
for t in transactions:
assert not t.category
rules = [CategoryRule(None, None, None, None, "Bank#1", None, None, "null")]
rule = CategoryRule(bank="Bank#1")
rule.name = "null"
rules = [rule]
categorizer: Transformer = Nullifier(rules)
transactions = categorizer.transform(transactions)
@@ -69,24 +67,28 @@ class TestTransform:
for t in transactions:
assert not t.category
rules.append(CategoryRule(None, None, None, None, "Bank#2", None, None, "null"))
rule = CategoryRule(bank="Bank#2")
rule.name = "null"
rules.append(rule)
categorizer = Nullifier(rules)
transactions = categorizer.transform(transactions)
for t in transactions:
assert t.category == TransactionCategory(
"null", CategorySelector(Selector_T.nullifier)
)
assert t.category == TransactionCategory("null", CategorySelector.nullifier)
def test_tagger(self):
transactions = [
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), "Bank#1")
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), bank="Bank#1")
]
for t in transactions:
assert not t.category
categorizer: Transformer = Tagger(mock.tag_1.rules)
rules = mock.tag_1.rules
for rule in rules:
rule.tag = mock.tag_1.name
categorizer: Transformer = Tagger(rules)
transactions = categorizer.transform(transactions)
for t in transactions:
@@ -94,16 +96,32 @@ class TestTransform:
def test_categorize(self):
transactions = [
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), "Bank#1")
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), bank="Bank#1")
]
for t in transactions:
assert not t.category
categorizer: Transformer = Categorizer(mock.category1.rules)
rules = mock.category1.rules
for rule in rules:
rule.name = mock.category1.name
categorizer: Transformer = Categorizer(rules)
transactions = categorizer.transform(transactions)
for t in transactions:
assert t.category == TransactionCategory(
"cat#1", CategorySelector(Selector_T.rules)
)
assert t.category == TransactionCategory("cat#1", CategorySelector.rules)
def test_rule_limits(self):
transactions = [
BankTransaction(date.today(), "", Decimal("-60"), bank="Bank#1"),
BankTransaction(date.today(), "", Decimal("-120"), bank="Bank#1"),
]
cat = Category("cat")
cat.rules = [CategoryRule(min=-120, max=-60)]
for r in cat.rules:
r.name = cat.name
transactions = Categorizer(cat.rules).transform(transactions)
assert all(t.category.name == cat.name for t in transactions)
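`test_rule_limits` exercises the fix from the "Fix rule's values limits" commit: a rule with `min=-120, max=-60` must also catch transactions at exactly -120 and -60, since the limits are inclusive. A hypothetical predicate mirroring just that check (the real `CategoryRule` also matches date, bank, description, etc.):

```python
from decimal import Decimal

def amount_matches(amount: Decimal, min_: Decimal, max_: Decimal) -> bool:
    # Inclusive on both ends, so the explicit boundary values match too.
    return min_ <= amount <= max_

# Boundary amounts from the test above match a [-120, -60] rule...
assert amount_matches(Decimal("-60"), Decimal("-120"), Decimal("-60"))
assert amount_matches(Decimal("-120"), Decimal("-120"), Decimal("-60"))
# ...while an amount just outside the range does not.
assert not amount_matches(Decimal("-59.99"), Decimal("-120"), Decimal("-60"))
```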