Compare commits

..

No commits in common. "29c5206638d13ef42e70d9bbbfff0435624101a8" and "bdd7cac4be597cba3232ffc60c9e539bb13fcbd5" have entirely different histories.

32 changed files with 1357 additions and 2291 deletions

3
.gitignore vendored
View File

@ -174,6 +174,3 @@ poetry.toml
pyrightconfig.json
# End of https://www.toptal.com/developers/gitignore/api/python
# Project specific ignores
database.db

View File

@ -1,35 +0,0 @@
"""nordigen tokens
Revision ID: 325b901ac712
Revises: 60469d5dd2b0
Create Date: 2023-05-25 19:10:10.374008+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "325b901ac712"
down_revision = "60469d5dd2b0"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"nordigen",
sa.Column("type", sa.String(), nullable=False),
sa.Column("token", sa.String(), nullable=False),
sa.Column("expires", sa.DateTime(), nullable=False),
sa.PrimaryKeyConstraint("type", name=op.f("pk_nordigen")),
schema="pfbudget",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("nordigen", schema="pfbudget")
# ### end Alembic commands ###

View File

@ -1,48 +0,0 @@
"""Drop SQLAlchemy enum
Revision ID: 60469d5dd2b0
Revises: b599dafcf468
Create Date: 2023-05-15 19:24:07.911352+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "60469d5dd2b0"
down_revision = "b599dafcf468"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute(
"""
CREATE TYPE pfbudget.scheduleperiod
AS ENUM ('daily', 'weekly', 'monthly', 'yearly')
"""
)
op.execute(
"""ALTER TABLE pfbudget.category_schedules
ALTER COLUMN period TYPE pfbudget.scheduleperiod
USING period::text::pfbudget.scheduleperiod
"""
)
op.execute("DROP TYPE pfbudget.period")
def downgrade() -> None:
op.execute(
"""
CREATE TYPE pfbudget.period
AS ENUM ('daily', 'weekly', 'monthly', 'yearly')
"""
)
op.execute(
"""ALTER TABLE pfbudget.category_schedules
ALTER COLUMN period TYPE pfbudget.period
USING period::text::pfbudget.period
"""
)
op.execute("DROP TYPE pfbudget.scheduleperiod")

View File

@ -1,74 +0,0 @@
"""Compact category selector
Revision ID: 8623e709e111
Revises: ce68ee15e5d2
Create Date: 2023-05-08 19:00:51.063240+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "8623e709e111"
down_revision = "ce68ee15e5d2"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("category_selectors", schema="pfbudget")
op.add_column(
"transactions_categorized",
sa.Column(
"selector",
sa.Enum(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="pfbudget",
inherit_schema=True,
),
nullable=False,
),
schema="pfbudget",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("transactions_categorized", "selector", schema="pfbudget")
op.create_table(
"category_selectors",
sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column(
"selector",
postgresql.ENUM(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="pfbudget",
),
autoincrement=False,
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["pfbudget.transactions_categorized.id"],
name="fk_category_selectors_id_transactions_categorized",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_category_selectors"),
schema="pfbudget",
)
# ### end Alembic commands ###

View File

@ -1,46 +0,0 @@
"""Selector type name change
Revision ID: b599dafcf468
Revises: 8623e709e111
Create Date: 2023-05-08 19:46:20.661214+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "b599dafcf468"
down_revision = "8623e709e111"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute(
"""
CREATE TYPE pfbudget.categoryselector
AS ENUM ('unknown', 'nullifier', 'vacations', 'rules', 'algorithm', 'manual')
"""
)
op.execute(
"""ALTER TABLE pfbudget.transactions_categorized
ALTER COLUMN selector TYPE pfbudget.categoryselector
USING selector::text::pfbudget.categoryselector
"""
)
op.execute("DROP TYPE pfbudget.selector_t")
def downgrade() -> None:
op.execute(
"""
CREATE TYPE pfbudget.selector_t
AS ENUM ('unknown', 'nullifier', 'vacations', 'rules', 'algorithm', 'manual')
"""
)
op.execute(
"""ALTER TABLE pfbudget.transactions_categorized
ALTER COLUMN selector TYPE pfbudget.selector_t
USING selector::text::pfbudget.selector_t
"""
)
op.execute("DROP TYPE pfbudget.categoryselector")

View File

@ -38,10 +38,10 @@ if __name__ == "__main__":
params = [args["path"], args["bank"], args["creditcard"]]
case Operation.RequisitionId:
keys = {"bank"}
keys = {"name", "country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["bank"][0]]
params = [args["name"][0], args["country"][0]]
case Operation.Download:
keys = {"all", "banks", "interval", "start", "end", "year", "dry_run"}
@ -163,14 +163,14 @@ if __name__ == "__main__":
params = [
type.CategoryRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
cat,
start=args["start"][0] if args["start"] else None,
end=args["end"][0] if args["end"] else None,
description=args["description"][0] if args["description"] else None,
regex=args["regex"][0] if args["regex"] else None,
bank=args["bank"][0] if args["bank"] else None,
min=args["min"][0] if args["min"] else None,
max=args["max"][0] if args["max"] else None,
)
for cat in args["category"]
]
@ -215,14 +215,14 @@ if __name__ == "__main__":
params = [
type.TagRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
tag,
start=args["start"][0] if args["start"] else None,
end=args["end"][0] if args["end"] else None,
description=args["description"][0] if args["description"] else None,
regex=args["regex"][0] if args["regex"] else None,
bank=args["bank"][0] if args["bank"] else None,
min=args["min"][0] if args["min"] else None,
max=args["max"][0] if args["max"] else None,
)
for tag in args["tag"]
]

View File

@ -6,7 +6,7 @@ import os
import re
from pfbudget.common.types import Operation
from pfbudget.db.model import AccountType, SchedulePeriod
from pfbudget.db.model import AccountType, Period
from pfbudget.db.sqlite import DatabaseClient
import pfbudget.reporting.graph
@ -60,12 +60,11 @@ def argparser() -> argparse.ArgumentParser:
# init = subparsers.add_parser("init")
# init.set_defaults(op=Operation.Init)
# Exports transactions to specified format and file
# Exports transactions to .csv file
export = subparsers.add_parser("export")
export.set_defaults(op=Operation.Export)
file_options(export)
# Imports transactions from specified format and file
pimport = subparsers.add_parser("import")
pimport.set_defaults(op=Operation.Import)
file_options(pimport)
@ -133,7 +132,8 @@ def argparser() -> argparse.ArgumentParser:
# PSD2 requisition id
requisition = subparsers.add_parser("eua")
requisition.set_defaults(op=Operation.RequisitionId)
requisition.add_argument("bank", nargs=1, type=str)
requisition.add_argument("id", nargs=1, type=str)
requisition.add_argument("country", nargs=1, type=str)
# Download through the PSD2 API
download = subparsers.add_parser("download", parents=[period])
@ -268,7 +268,7 @@ def category(parser: argparse.ArgumentParser):
schedule = commands.add_parser("schedule")
schedule.set_defaults(op=Operation.CategorySchedule)
schedule.add_argument("category", nargs="+", type=str)
schedule.add_argument("period", nargs=1, choices=[e.value for e in SchedulePeriod])
schedule.add_argument("period", nargs=1, choices=[e.value for e in Period])
schedule.add_argument("--frequency", nargs=1, default=[1], type=int)
rule = commands.add_parser("rule")

View File

@ -3,8 +3,9 @@ import decimal
from ..core.manager import Manager
from ..db.model import (
Category,
Note,
CategorySelector,
Note,
Selector_T,
SplitTransaction,
Tag,
Transaction,
@ -15,13 +16,15 @@ from ..db.model import (
class Interactive:
help = "category(:tag)/split/note:/skip/quit"
selector = CategorySelector.manual
selector = Selector_T.manual
def __init__(self, manager: Manager) -> None:
self.manager = manager
self.categories = self.manager.database.select(Category)
self.tags = self.manager.database.select(Tag)
with self.manager.db.session() as session:
self.categories = session.get(Category)
self.tags = session.get(Tag)
session.expunge_all()
def intro(self) -> None:
print(
@ -32,34 +35,28 @@ class Interactive:
def start(self) -> None:
self.intro()
with self.manager.database.session as session:
uncategorized = session.select(
Transaction, lambda: ~Transaction.category.has()
)
uncategorized.sort()
with self.manager.db.session() as session:
uncategorized = session.uncategorized()
n = len(uncategorized)
print(f"{n} left to categorize")
i = 0
new = []
while (command := input("$ ")) != "quit" and i < len(uncategorized):
current = uncategorized[i] if len(new) == 0 else new.pop()
print(current)
next = uncategorized[i]
print(next)
while (command := input("$ ")) != "quit":
match command:
case "help":
print(self.help)
case "skip":
if len(uncategorized) == 0:
i += 1
case "quit":
break
case "split":
new = self.split(current)
new = self.split(next)
session.insert(new)
case other:
@ -70,32 +67,35 @@ class Interactive:
if other.startswith("note:"):
# TODO adding notes to a splitted transaction won't allow
# categorization
current.note = Note(other[len("note:") :].strip())
next.note = Note(other[len("note:") :].strip())
else:
ct = other.split(":")
if (category := ct[0]) not in [
c.name for c in self.categories
]:
print(self.help, self.categories)
continue
tags = []
if len(ct) > 1:
tags = ct[1:]
current.category = TransactionCategory(
category, self.selector
next.category = TransactionCategory(
category, CategorySelector(self.selector)
)
for tag in tags:
if tag not in [t.name for t in self.tags]:
session.insert([Tag(tag)])
self.tags = session.get(Tag)
current.tags.add(TransactionTag(tag))
next.tags.add(TransactionTag(tag))
if len(new) == 0:
i += 1
session.commit()
next = uncategorized[i] if len(new) == 0 else new.pop()
print(next)
def split(self, original: Transaction) -> list[SplitTransaction]:
total = original.amount
new = []

View File

@ -51,11 +51,6 @@ class Operation(Enum):
ImportCategoryGroups = auto()
class ExportFormat(Enum):
JSON = auto()
pickle = auto()
class TransactionError(Exception):
pass

View File

@ -1,128 +0,0 @@
from abc import ABC, abstractmethod
import json
from pathlib import Path
import pickle
from typing import Type
from pfbudget.common.types import ExportFormat
from pfbudget.db.client import Client
from pfbudget.db.model import (
Bank,
Category,
CategoryGroup,
Serializable,
Tag,
Transaction,
)
# required for the backup import
import pfbudget.db.model
class Command(ABC):
@abstractmethod
def execute(self) -> None:
raise NotImplementedError
def undo(self) -> None:
raise NotImplementedError
class ExportCommand(Command):
def __init__(
self, client: Client, what: Type[Serializable], fn: Path, format: ExportFormat
):
self.__client = client
self.what = what
self.fn = fn
self.format = format
def execute(self) -> None:
values = self.__client.select(self.what)
match self.format:
case ExportFormat.JSON:
with open(self.fn, "w", newline="") as f:
json.dump([e.serialize() for e in values], f, indent=4)
case ExportFormat.pickle:
raise AttributeError("pickle export not working at the moment!")
with open(self.fn, "wb") as f:
pickle.dump(values, f)
class ImportCommand(Command):
def __init__(
self, client: Client, what: Type[Serializable], fn: Path, format: ExportFormat
):
self.__client = client
self.what = what
self.fn = fn
self.format = format
def execute(self) -> None:
match self.format:
case ExportFormat.JSON:
with open(self.fn, "r") as f:
try:
values = json.load(f)
values = [self.what.deserialize(v) for v in values]
except json.JSONDecodeError as e:
raise ImportFailedError(e)
case ExportFormat.pickle:
raise AttributeError("pickle import not working at the moment!")
with open(self.fn, "rb") as f:
values = pickle.load(f)
self.__client.insert(values)
class ImportFailedError(Exception):
pass
class BackupCommand(Command):
def __init__(self, client: Client, fn: Path, format: ExportFormat) -> None:
self.__client = client
self.fn = fn
self.format = format
def execute(self) -> None:
banks = self.__client.select(Bank)
groups = self.__client.select(CategoryGroup)
categories = self.__client.select(Category)
tags = self.__client.select(Tag)
transactions = self.__client.select(Transaction)
values = [*banks, *groups, *categories, *tags, *transactions]
match self.format:
case ExportFormat.JSON:
with open(self.fn, "w", newline="") as f:
json.dump([e.serialize() for e in values], f, indent=4)
case ExportFormat.pickle:
raise AttributeError("pickle export not working at the moment!")
class ImportBackupCommand(Command):
def __init__(self, client: Client, fn: Path, format: ExportFormat) -> None:
self.__client = client
self.fn = fn
self.format = format
def execute(self) -> None:
match self.format:
case ExportFormat.JSON:
with open(self.fn, "r") as f:
try:
values = json.load(f)
values = [
getattr(pfbudget.db.model, v["class_"]).deserialize(v)
for v in values
]
except json.JSONDecodeError as e:
raise ImportFailedError(e)
case ExportFormat.pickle:
raise AttributeError("pickle import not working at the moment!")
self.__client.insert(values)

View File

@ -1,3 +1,4 @@
import csv
import json
from pathlib import Path
import pickle
@ -13,11 +14,12 @@ from pfbudget.db.model import (
CategoryGroup,
CategoryRule,
CategorySchedule,
CategorySelector,
Link,
MoneyTransaction,
NordigenBank,
Nordigen,
Rule,
CategorySelector,
Selector_T,
SplitTransaction,
Tag,
TagRule,
@ -79,7 +81,7 @@ class Manager:
else:
banks = self.database.select(Bank, Bank.nordigen)
extractor = PSD2Extractor(self.nordigen_client())
extractor = PSD2Extractor(Manager.nordigen_client())
transactions = []
for bank in banks:
@ -101,20 +103,10 @@ class Manager:
categories = session.select(Category)
tags = session.select(Tag)
rules = [
rule
for cat in categories
if cat.name == "null"
for rule in cat.rules
]
rules = [cat.rules for cat in categories if cat.name == "null"]
Nullifier(rules).transform_inplace(uncategorized)
rules = [
rule
for cat in categories
if cat.name != "null"
for rule in cat.rules
]
rules = [rule for cat in categories for rule in cat.rules]
Categorizer(rules).transform_inplace(uncategorized)
rules = [rule for tag in tags for rule in tag.rules]
@ -124,34 +116,24 @@ class Manager:
self.database.update(Bank, params)
case Operation.PSD2Mod:
self.database.update(NordigenBank, params)
self.database.update(Nordigen, params)
case Operation.BankDel:
self.database.delete(Bank, Bank.name, params)
case Operation.PSD2Del:
self.database.delete(NordigenBank, NordigenBank.name, params)
self.database.delete(Nordigen, Nordigen.name, params)
case Operation.Token:
Manager.nordigen_client().generate_token()
case Operation.RequisitionId:
bank_name = params[0]
bank = self.database.select(Bank, (lambda: Bank.name == bank_name))[0]
if not bank.nordigen or not bank.nordigen.bank_id:
raise ValueError(f"{bank} doesn't have a Nordigen ID")
link, req_id = self.nordigen_client().new_requisition(
bank.nordigen.bank_id
)
self.database.update(
NordigenBank,
[{"name": bank.nordigen.name, "requisition_id": req_id}],
)
link, _ = Manager.nordigen_client().requisition(params[0], params[1])
print(f"Opening {link} to request access to {params[0]}")
webbrowser.open(link)
case Operation.PSD2CountryBanks:
banks = self.nordigen_client().country_banks(params[0])
banks = Manager.nordigen_client().country_banks(params[0])
print(banks)
case (
@ -263,7 +245,10 @@ class Manager:
session.insert(transactions)
case Operation.Export:
self.dump(params[0], params[1], self.database.select(Transaction))
with self.database.session as session:
self.dump(
params[0], params[1], self.database.select(Transaction, session)
)
case Operation.Import:
transactions = []
@ -289,7 +274,8 @@ class Manager:
if category := row.pop("category", None):
transaction.category = TransactionCategory(
category["name"], category["selector"]["selector"]
category["name"],
CategorySelector(category["selector"]["selector"]),
)
transactions.append(transaction)
@ -298,21 +284,27 @@ class Manager:
self.database.insert(transactions)
case Operation.ExportBanks:
self.dump(params[0], params[1], self.database.select(Bank))
with self.database.session as session:
self.dump(params[0], params[1], self.database.select(Bank, session))
case Operation.ImportBanks:
banks = []
for row in self.load(params[0], params[1]):
bank = Bank(row["name"], row["BIC"], row["type"])
if row["nordigen"]:
bank.nordigen = NordigenBank(**row["nordigen"])
bank.nordigen = Nordigen(**row["nordigen"])
banks.append(bank)
if self.certify(banks):
self.database.insert(banks)
case Operation.ExportCategoryRules:
self.dump(params[0], params[1], self.database.select(CategoryRule))
with self.database.session as session:
self.dump(
params[0],
params[1],
self.database.select(CategoryRule, session),
)
case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
@ -321,7 +313,10 @@ class Manager:
self.database.insert(rules)
case Operation.ExportTagRules:
self.dump(params[0], params[1], self.database.select(TagRule))
with self.database.session as session:
self.dump(
params[0], params[1], self.database.select(TagRule, session)
)
case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
@ -330,7 +325,10 @@ class Manager:
self.database.insert(rules)
case Operation.ExportCategories:
self.dump(params[0], params[1], self.database.select(Category))
with self.database.session as session:
self.dump(
params[0], params[1], self.database.select(Category, session)
)
case Operation.ImportCategories:
# rules = [Category(**row) for row in self.load(params[0])]
@ -343,7 +341,7 @@ class Manager:
for rule in rules:
del rule["type"]
category.rules = [CategoryRule(**rule) for rule in rules]
category.rules = set(CategoryRule(**rule) for rule in rules)
if row["schedule"]:
category.schedule = CategorySchedule(**row["schedule"])
categories.append(category)
@ -352,7 +350,12 @@ class Manager:
self.database.insert(categories)
case Operation.ExportCategoryGroups:
self.dump(params[0], params[1], self.database.select(CategoryGroup))
with self.database.session as session:
self.dump(
params[0],
params[1],
self.database.select(CategoryGroup, session),
)
case Operation.ImportCategoryGroups:
groups = [
@ -366,7 +369,7 @@ class Manager:
return parse_data(filename, args)
def askcategory(self, transaction: Transaction):
selector = CategorySelector.manual
selector = CategorySelector(Selector_T.manual)
categories = self.database.select(Category)
@ -380,6 +383,9 @@ class Manager:
if format == "pickle":
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
elif format == "csv":
with open(fn, "w", newline="") as f:
csv.writer(f).writerows([e.format.values() for e in sequence])
elif format == "json":
with open(fn, "w", newline="") as f:
json.dump([e.format for e in sequence], f, indent=4, default=str)
@ -391,6 +397,8 @@ class Manager:
if format == "pickle":
with open(fn, "rb") as f:
return pickle.load(f)
elif format == "csv":
raise Exception("CSV import not supported")
else:
print("format not well specified")
return []
@ -407,5 +415,6 @@ class Manager:
self._database = Client(self._db, echo=self._verbosity > 2)
return self._database
def nordigen_client(self) -> NordigenClient:
return NordigenClient(NordigenCredentialsManager.default, self.database)
@staticmethod
def nordigen_client() -> NordigenClient:
return NordigenClient(NordigenCredentialsManager.default)

View File

@ -1,11 +1,10 @@
from collections.abc import Sequence
from copy import deepcopy
from sqlalchemy import Engine, create_engine, delete, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session, sessionmaker
from typing import Any, Mapping, Optional, Type, TypeVar
from pfbudget.db.exceptions import InsertError
# from pfbudget.db.exceptions import InsertError, SelectError
class DatabaseSession:
@ -17,17 +16,10 @@ class DatabaseSession:
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any):
try:
if exc_type:
self.__session.rollback()
else:
self.__session.commit()
except IntegrityError as e:
raise InsertError() from e
finally:
self.__session.close()
def close(self):
self.__session.close()
def insert(self, sequence: Sequence[Any]) -> None:
@ -41,10 +33,7 @@ class DatabaseSession:
else:
stmt = select(what)
return self.__session.scalars(stmt).unique().all()
def delete(self, obj: Any) -> None:
self.__session.delete(obj)
return self.__session.scalars(stmt).all()
class Client:
@ -61,16 +50,13 @@ class Client:
T = TypeVar("T")
def select(self, what: Type[T], exists: Optional[Any] = None) -> Sequence[T]:
session = self.session
result = session.select(what, exists)
session.close()
return result
return self.session.select(what, exists)
def update(self, what: Type[Any], values: Sequence[Mapping[str, Any]]) -> None:
with self._sessionmaker() as session, session.begin():
session.execute(update(what), values)
def delete(self, what: Type[Any], column: Any, values: Sequence[Any]) -> None:
def delete(self, what: Type[Any], column: Any, values: Sequence[str]) -> None:
with self._sessionmaker() as session, session.begin():
session.execute(delete(what).where(column.in_(values)))

View File

@ -1,11 +1,9 @@
from __future__ import annotations
from collections.abc import Mapping, MutableMapping, Sequence
from dataclasses import dataclass
import datetime as dt
import decimal
import enum
import re
from typing import Annotated, Any, Callable, Optional, Self, cast
from typing import Annotated, Any, Optional
from sqlalchemy import (
BigInteger,
@ -38,20 +36,6 @@ class Base(MappedAsDataclass, DeclarativeBase):
},
)
type_annotation_map = {
enum.Enum: Enum(enum.Enum, create_constraint=True, inherit_schema=True),
}
@dataclass
class Serializable:
def serialize(self) -> Mapping[str, Any]:
return dict(class_=type(self).__name__)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
raise NotImplementedError
class AccountType(enum.Enum):
checking = enum.auto()
@ -62,38 +46,36 @@ class AccountType(enum.Enum):
MASTERCARD = enum.auto()
class Bank(Base, Serializable):
accounttype = Annotated[
AccountType,
mapped_column(Enum(AccountType, inherit_schema=True)),
]
class Export:
@property
def format(self) -> dict[str, Any]:
raise NotImplementedError
class Bank(Base, Export):
__tablename__ = "banks"
name: Mapped[str] = mapped_column(primary_key=True)
BIC: Mapped[str] = mapped_column(String(8))
type: Mapped[AccountType]
type: Mapped[accounttype]
nordigen: Mapped[Optional[NordigenBank]] = relationship(default=None, lazy="joined")
nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False)
def serialize(self) -> Mapping[str, Any]:
nordigen = None
if self.nordigen:
nordigen = {
"bank_id": self.nordigen.bank_id,
"requisition_id": self.nordigen.requisition_id,
"invert": self.nordigen.invert,
}
return super().serialize() | dict(
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
BIC=self.BIC,
type=self.type.name,
nordigen=nordigen,
type=self.type,
nordigen=self.nordigen.format if self.nordigen else None,
)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
bank = cls(map["name"], map["BIC"], map["type"])
if map["nordigen"]:
bank.nordigen = NordigenBank(**map["nordigen"])
return bank
bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]
@ -108,7 +90,7 @@ idpk = Annotated[
money = Annotated[decimal.Decimal, mapped_column(Numeric(16, 2))]
class Transaction(Base, Serializable):
class Transaction(Base, Export):
__tablename__ = "transactions"
id: Mapped[idpk] = mapped_column(init=False)
@ -116,83 +98,32 @@ class Transaction(Base, Serializable):
description: Mapped[Optional[str]]
amount: Mapped[money]
split: Mapped[bool] = mapped_column(default=False)
category: Mapped[Optional[TransactionCategory]] = relationship(
back_populates="transaction", default=None, lazy="joined"
)
tags: Mapped[set[TransactionTag]] = relationship(default_factory=set, lazy="joined")
note: Mapped[Optional[Note]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default=None, lazy="joined"
)
split: Mapped[bool] = mapped_column(init=False, default=False)
type: Mapped[str] = mapped_column(init=False)
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(
cascade="all, delete-orphan", init=False, passive_deletes=True
)
tags: Mapped[set[TransactionTag]] = relationship(init=False)
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
def serialize(self) -> Mapping[str, Any]:
category = None
if self.category:
category = {
"name": self.category.name,
"selector": self.category.selector.name,
}
return super().serialize() | dict(
@property
def format(self) -> dict[str, Any]:
return dict(
id=self.id,
date=self.date.isoformat(),
date=self.date,
description=self.description,
amount=str(self.amount),
amount=self.amount,
split=self.split,
category=category if category else None,
tags=[{"tag": tag.tag} for tag in self.tags],
note={"note": self.note.note} if self.note else None,
type=self.type,
category=self.category.format if self.category else None,
# TODO note
tags=[tag.format for tag in self.tags] if self.tags else None,
)
@classmethod
def deserialize(
cls, map: Mapping[str, Any]
) -> Transaction | BankTransaction | MoneyTransaction | SplitTransaction:
match map["type"]:
case "bank":
return BankTransaction.deserialize(map)
case "money":
return MoneyTransaction.deserialize(map)
case "split":
return SplitTransaction.deserialize(map)
case _:
return cls._deserialize(map)
@classmethod
def _deserialize(cls, map: Mapping[str, Any]) -> Self:
category = None
if map["category"]:
category = TransactionCategory(map["category"]["name"])
if map["category"]["selector"]:
category.selector = map["category"]["selector"]
tags: set[TransactionTag] = set()
if map["tags"]:
tags = set(TransactionTag(t["tag"]) for t in map["tags"])
note = None
if map["note"]:
note = Note(map["note"]["note"])
result = cls(
dt.date.fromisoformat(map["date"]),
map["description"],
map["amount"],
map["split"],
category,
tags,
note,
)
if map["id"]:
result.id = map["id"]
return result
def __lt__(self, other: Transaction):
return self.date < other.date
@ -203,64 +134,40 @@ idfk = Annotated[
class BankTransaction(Transaction):
bank: Mapped[Optional[bankfk]] = mapped_column(default=None)
bank: Mapped[bankfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
def serialize(self) -> Mapping[str, Any]:
map = cast(MutableMapping[str, Any], super().serialize())
map["bank"] = self.bank
return map
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
transaction = cls._deserialize(map)
transaction.bank = map["bank"]
return transaction
@property
def format(self) -> dict[str, Any]:
return super().format | dict(bank=self.bank)
class MoneyTransaction(Transaction):
__mapper_args__ = {"polymorphic_identity": "money"}
def serialize(self) -> Mapping[str, Any]:
return super().serialize()
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
return cls._deserialize(map)
class SplitTransaction(Transaction):
original: Mapped[Optional[idfk]] = mapped_column(default=None)
original: Mapped[idfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
def serialize(self) -> Mapping[str, Any]:
map = cast(MutableMapping[str, Any], super().serialize())
map["original"] = self.original
return map
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
transaction = cls._deserialize(map)
transaction.original = map["original"]
return transaction
@property
def format(self) -> dict[str, Any]:
return super().format | dict(original=self.original)
class CategoryGroup(Base, Serializable):
class CategoryGroup(Base, Export):
__tablename__ = "category_groups"
name: Mapped[str] = mapped_column(primary_key=True)
def serialize(self) -> Mapping[str, Any]:
return super().serialize() | dict(name=self.name)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
return cls(map["name"])
@property
def format(self) -> dict[str, Any]:
return dict(name=self.name)
class Category(Base, Serializable, repr=False):
class Category(Base, Export):
__tablename__ = "categories"
name: Mapped[str] = mapped_column(primary_key=True)
@ -268,67 +175,11 @@ class Category(Base, Serializable, repr=False):
ForeignKey(CategoryGroup.name), default=None
)
rules: Mapped[list[CategoryRule]] = relationship(
cascade="all, delete-orphan",
passive_deletes=True,
default_factory=list,
lazy="joined",
rules: Mapped[set[CategoryRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
)
schedule: Mapped[Optional[CategorySchedule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default=None, lazy="joined"
)
def serialize(self) -> Mapping[str, Any]:
rules: Sequence[Mapping[str, Any]] = []
for rule in self.rules:
rules.append(
{
"start": rule.start.isoformat() if rule.start else None,
"end": rule.end.isoformat() if rule.end else None,
"description": rule.description,
"regex": rule.regex,
"bank": rule.bank,
"min": str(rule.min) if rule.min is not None else None,
"max": str(rule.max) if rule.max is not None else None,
}
)
schedule = None
if self.schedule:
schedule = {
"period": self.schedule.period.name if self.schedule.period else None,
"period_multiplier": self.schedule.period_multiplier,
"amount": self.schedule.amount,
}
return super().serialize() | dict(
name=self.name,
group=self.group,
rules=rules,
schedule=schedule,
)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
rules: list[CategoryRule] = []
for rule in map["rules"]:
rules.append(
CategoryRule(
dt.date.fromisoformat(rule["start"]) if rule["start"] else None,
dt.date.fromisoformat(rule["end"]) if rule["end"] else None,
rule["description"],
rule["regex"],
rule["bank"],
rule["min"],
rule["max"],
)
)
return cls(
map["name"],
map["group"],
rules,
CategorySchedule(**map["schedule"]) if map["schedule"] else None,
cascade="all, delete-orphan", passive_deletes=True, default=None
)
def __repr__(self) -> str:
@ -337,6 +188,15 @@ class Category(Base, Serializable, repr=False):
f" schedule={self.schedule})"
)
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
group=self.group if self.group else None,
rules=[rule.format for rule in self.rules],
schedule=self.schedule.format if self.schedule else None,
)
catfk = Annotated[
str,
@ -344,25 +204,20 @@ catfk = Annotated[
]
class CategorySelector(enum.Enum):
unknown = enum.auto()
nullifier = enum.auto()
vacations = enum.auto()
rules = enum.auto()
algorithm = enum.auto()
manual = enum.auto()
class TransactionCategory(Base):
class TransactionCategory(Base, Export):
__tablename__ = "transactions_categorized"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
name: Mapped[catfk]
selector: Mapped[CategorySelector] = mapped_column(default=CategorySelector.unknown)
selector: Mapped[CategorySelector] = relationship(
cascade="all, delete-orphan", lazy="joined"
)
transaction: Mapped[Transaction] = relationship(
back_populates="category", init=False, compare=False
@property
def format(self):
return dict(
name=self.name, selector=self.selector.format if self.selector else None
)
@ -373,85 +228,106 @@ class Note(Base):
note: Mapped[str]
class NordigenBank(Base):
class Nordigen(Base, Export):
__tablename__ = "banks_nordigen"
name: Mapped[bankfk] = mapped_column(primary_key=True, init=False)
name: Mapped[bankfk] = mapped_column(primary_key=True)
bank_id: Mapped[Optional[str]]
requisition_id: Mapped[Optional[str]]
invert: Mapped[Optional[bool]] = mapped_column(default=None)
invert: Mapped[Optional[bool]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
bank_id=self.bank_id,
requisition_id=self.requisition_id,
invert=self.invert,
)
class Tag(Base, Serializable):
class Tag(Base):
__tablename__ = "tags"
name: Mapped[str] = mapped_column(primary_key=True)
rules: Mapped[list[TagRule]] = relationship(
cascade="all, delete-orphan",
passive_deletes=True,
default_factory=list,
lazy="joined",
rules: Mapped[set[TagRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
)
def serialize(self) -> Mapping[str, Any]:
rules: Sequence[Mapping[str, Any]] = []
for rule in self.rules:
rules.append(
{
"start": rule.start,
"end": rule.end,
"description": rule.description,
"regex": rule.regex,
"bank": rule.bank,
"min": str(rule.min) if rule.min is not None else None,
"max": str(rule.max) if rule.max is not None else None,
}
)
return super().serialize() | dict(name=self.name, rules=rules)
@classmethod
def deserialize(cls, map: Mapping[str, Any]) -> Self:
rules: list[TagRule] = []
for rule in map["rules"]:
rules.append(
TagRule(
dt.date.fromisoformat(rule["start"]) if rule["start"] else None,
dt.date.fromisoformat(rule["end"]) if rule["end"] else None,
rule["description"],
rule["regex"],
rule["bank"],
rule["min"],
rule["max"],
)
)
return cls(map["name"], rules)
class TransactionTag(Base, unsafe_hash=True):
class TransactionTag(Base, Export):
__tablename__ = "transactions_tagged"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
@property
def format(self):
return dict(tag=self.tag)
class SchedulePeriod(enum.Enum):
daily = enum.auto()
weekly = enum.auto()
monthly = enum.auto()
yearly = enum.auto()
def __hash__(self):
return hash(self.id)
class CategorySchedule(Base):
class Selector_T(enum.Enum):
unknown = enum.auto()
nullifier = enum.auto()
vacations = enum.auto()
rules = enum.auto()
algorithm = enum.auto()
manual = enum.auto()
categoryselector = Annotated[
Selector_T,
mapped_column(Enum(Selector_T, inherit_schema=True), default=Selector_T.unknown),
]
class CategorySelector(Base, Export):
__tablename__ = "category_selectors"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(TransactionCategory.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
selector: Mapped[categoryselector]
@property
def format(self):
return dict(selector=self.selector)
class Period(enum.Enum):
daily = "daily"
weekly = "weekly"
monthly = "monthly"
yearly = "yearly"
scheduleperiod = Annotated[Selector_T, mapped_column(Enum(Period, inherit_schema=True))]
class CategorySchedule(Base, Export):
__tablename__ = "category_schedules"
name: Mapped[catfk] = mapped_column(primary_key=True, init=False)
period: Mapped[Optional[SchedulePeriod]]
name: Mapped[catfk] = mapped_column(primary_key=True)
period: Mapped[Optional[scheduleperiod]]
period_multiplier: Mapped[Optional[int]]
amount: Mapped[Optional[int]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
period=self.period,
period_multiplier=self.period_multiplier,
amount=self.amount,
)
class Link(Base):
__tablename__ = "links"
@ -460,17 +336,17 @@ class Link(Base):
link: Mapped[idfk] = mapped_column(primary_key=True)
class Rule(Base):
class Rule(Base, Export):
__tablename__ = "rules"
id: Mapped[idpk] = mapped_column(init=False)
start: Mapped[Optional[dt.date]] = mapped_column(default=None)
end: Mapped[Optional[dt.date]] = mapped_column(default=None)
description: Mapped[Optional[str]] = mapped_column(default=None)
regex: Mapped[Optional[str]] = mapped_column(default=None)
bank: Mapped[Optional[str]] = mapped_column(default=None)
min: Mapped[Optional[money]] = mapped_column(default=None)
max: Mapped[Optional[money]] = mapped_column(default=None)
start: Mapped[Optional[dt.date]]
end: Mapped[Optional[dt.date]]
description: Mapped[Optional[str]]
regex: Mapped[Optional[str]]
bank: Mapped[Optional[str]]
min: Mapped[Optional[money]]
max: Mapped[Optional[money]]
type: Mapped[str] = mapped_column(init=False)
@ -485,16 +361,16 @@ class Rule(Base):
valid = re.compile(self.regex, re.IGNORECASE)
ops = (
Rule.exists(self.start, lambda r: t.date >= r),
Rule.exists(self.end, lambda r: t.date <= r),
Rule.exists(self.start, lambda r: r < t.date),
Rule.exists(self.end, lambda r: r > t.date),
Rule.exists(self.description, lambda r: r == t.description),
Rule.exists(
valid,
lambda r: r.search(t.description) if t.description else False,
),
Rule.exists(self.bank, lambda r: r == t.bank),
Rule.exists(self.min, lambda r: t.amount >= r),
Rule.exists(self.max, lambda r: t.amount <= r),
Rule.exists(self.min, lambda r: r < t.amount),
Rule.exists(self.max, lambda r: r > t.amount),
)
if all(ops):
@ -502,8 +378,21 @@ class Rule(Base):
return False
@property
def format(self) -> dict[str, Any]:
return dict(
start=self.start,
end=self.end,
description=self.description,
regex=self.regex,
bank=self.bank,
min=self.min,
max=self.max,
type=self.type,
)
@staticmethod
def exists(r: Optional[Any], op: Callable[[Any], bool]) -> bool:
def exists(r, op) -> bool:
return op(r) if r is not None else True
@ -516,13 +405,19 @@ class CategoryRule(Rule):
primary_key=True,
init=False,
)
name: Mapped[catfk] = mapped_column(init=False)
name: Mapped[catfk]
__mapper_args__ = {
"polymorphic_identity": "category_rule",
"polymorphic_load": "selectin",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(name=self.name)
def __hash__(self):
return hash(self.id)
class TagRule(Rule):
__tablename__ = "tag_rules"
@ -533,19 +428,15 @@ class TagRule(Rule):
primary_key=True,
init=False,
)
tag: Mapped[str] = mapped_column(
ForeignKey(Tag.name, ondelete="CASCADE"), init=False
)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))
__mapper_args__ = {
"polymorphic_identity": "tag_rule",
"polymorphic_load": "selectin",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(tag=self.tag)
class Nordigen(Base):
__tablename__ = "nordigen"
type: Mapped[str] = mapped_column(primary_key=True)
token: Mapped[str]
expires: Mapped[dt.datetime]
def __hash__(self):
return hash(self.id)

View File

@ -1,16 +1,12 @@
from dataclasses import dataclass
import datetime as dt
import dotenv
import json
import nordigen
import os
import requests
import time
from typing import Any, Optional, Sequence, Tuple
import uuid
from pfbudget.db.client import Client
from pfbudget.db.model import Nordigen
from .exceptions import CredentialsError, DownloadError
dotenv.load_dotenv()
@ -20,38 +16,40 @@ dotenv.load_dotenv()
class NordigenCredentials:
id: str
key: str
token: str = ""
def valid(self) -> bool:
return len(self.id) != 0 and len(self.key) != 0
return self.id and self.key
class NordigenClient:
redirect_url = "https://murta.dev"
def __init__(self, credentials: NordigenCredentials, client: Client):
def __init__(self, credentials: NordigenCredentials):
super().__init__()
if not credentials.valid():
raise CredentialsError
self.__client = nordigen.NordigenClient(
self._client = nordigen.NordigenClient(
secret_key=credentials.key, secret_id=credentials.id, timeout=5
)
self.__client.token = self.__token(client)
def download(self, requisition_id) -> Sequence[dict[str, Any]]:
if credentials.token:
self._client.token = credentials.token
def download(self, requisition_id):
try:
requisition = self.__client.requisition.get_requisition_by_id(
requisition_id
)
requisition = self._client.requisition.get_requisition_by_id(requisition_id)
print(requisition)
except requests.HTTPError as e:
raise DownloadError(e)
transactions = []
transactions = {}
for acc in requisition["accounts"]:
account = self.__client.account_api(acc)
account = self._client.account_api(acc)
retries = 0
downloaded = None
while retries < 3:
try:
downloaded = account.get_transactions()
@ -62,93 +60,55 @@ class NordigenClient:
time.sleep(1)
if not downloaded:
print(f"Couldn't download transactions for {account.get_metadata()}")
print(f"Couldn't download transactions for {account}")
continue
if (
"transactions" not in downloaded
or "booked" not in downloaded["transactions"]
):
print(f"{account} doesn't have transactions")
continue
transactions.extend(downloaded["transactions"]["booked"])
transactions.update(downloaded)
return transactions
def dump(self, bank, downloaded):
# @TODO log received JSON
pass
with open("json/" + bank.name + ".json", "w") as f:
json.dump(downloaded, f)
def new_requisition(
self,
institution_id: str,
max_historical_days: Optional[int] = None,
access_valid_for_days: Optional[int] = None,
) -> Tuple[str, str]:
kwargs = {
"max_historical_days": max_historical_days,
"access_valid_for_days": access_valid_for_days,
}
kwargs = {k: v for k, v in kwargs.items() if v is not None}
def generate_token(self):
self.token = self._client.generate_token()
print(f"New access token: {self.token}")
return self.token
req = self.__client.initialize_session(
self.redirect_url, institution_id, str(uuid.uuid4()), **kwargs
def requisition(self, id: str, country: str = "PT"):
requisition = self._client.initialize_session(
redirect_uri=self.redirect_url,
institution_id=id,
reference_id=str(uuid.uuid4()),
)
return req.link, req.requisition_id
return requisition.link, requisition.requisition_id
def country_banks(self, country: str):
return self.__client.institution.get_institutions(country)
return self._client.institution.get_institutions(country)
def __token(self, client: Client) -> str:
with client.session as session:
token = session.select(Nordigen)
# def __token(self):
# if token := os.environ.get("TOKEN"):
# return token
# else:
# token = self._client.generate_token()
# print(f"New access token: {token}")
# return token["access"]
def datetime(seconds: int) -> dt.datetime:
return dt.datetime.now() + dt.timedelta(seconds=seconds)
@property
def token(self):
return self._token
if not len(token):
print("First time nordigen token setup")
new = self.__client.generate_token()
session.insert(
[
Nordigen(
"access",
new["access"],
datetime(new["access_expires"]),
),
Nordigen(
"refresh",
new["refresh"],
datetime(new["refresh_expires"]),
),
]
)
return new["access"]
else:
access = next(t for t in token if t.type == "access")
refresh = next(t for t in token if t.type == "refresh")
if access.expires > dt.datetime.now():
pass
elif refresh.expires > dt.datetime.now():
new = self.__client.exchange_token(refresh.token)
access.token = new["access"]
access.expires = datetime(new["access_expires"])
else:
new = self.__client.generate_token()
access.token = new["access"]
access.expires = datetime(new["access_expires"])
refresh.token = new["refresh"]
refresh.expires = datetime(new["refresh_expires"])
return access.token
@token.setter
def token(self, value):
if self._token:
print("Replacing existing token with {value}")
self._token = value
class NordigenCredentialsManager:
default = NordigenCredentials(
os.environ.get("SECRET_ID", ""),
os.environ.get("SECRET_KEY", ""),
os.environ.get("SECRET_ID"),
os.environ.get("SECRET_KEY"),
os.environ.get("TOKEN"),
)

View File

@ -1,45 +1,58 @@
from __future__ import annotations
from collections import namedtuple
from decimal import Decimal
from importlib import import_module
from pathlib import Path
import datetime as dt
from typing import Any, Callable, NamedTuple, Optional
import yaml
from pfbudget.common.types import NoBankSelected
from pfbudget.db.model import BankTransaction
from pfbudget.db.model import Transaction
from pfbudget.utils import utils
class Index(NamedTuple):
date: int = -1
text: int = -1
value: int = -1
negate: bool = False
Index = namedtuple(
"Index", ["date", "text", "value", "negate"], defaults=[-1, -1, -1, False]
)
Options = namedtuple(
"Options",
[
"encoding",
"separator",
"date_fmt",
"start",
"end",
"debit",
"credit",
"additional_parser",
"category",
"VISA",
"MasterCard",
"AmericanExpress",
],
defaults=[
"",
"",
"",
1,
None,
Index(),
Index(),
False,
None,
None,
None,
None,
],
)
class Options(NamedTuple):
encoding: str
separator: str
date_fmt: str
start: int = 1
end: Optional[int] = None
debit: Index = Index()
credit: Index = Index()
additional_parser: bool = False
VISA: Optional[Options] = None
MasterCard: Optional[Options] = None
AmericanExpress: Optional[Options] = None
def parse_data(filename: Path, args: dict[str, Any]) -> list[BankTransaction]:
cfg: dict[str, Any] = yaml.safe_load(open("parsers.yaml"))
def parse_data(filename: Path, args: dict) -> list[Transaction]:
cfg: dict = yaml.safe_load(open("parsers.yaml"))
assert (
"Banks" in cfg
), "parsers.yaml is missing the Banks section with the list of available banks"
if not args["bank"]:
bank, creditcard = utils.find_credit_institution( # type: ignore
bank, creditcard = utils.find_credit_institution(
filename, cfg.get("Banks"), cfg.get("CreditCards")
)
else:
@ -47,7 +60,7 @@ def parse_data(filename: Path, args: dict[str, Any]) -> list[BankTransaction]:
creditcard = None if not args["creditcard"] else args["creditcard"][0]
try:
options: dict[str, Any] = cfg[bank]
options: dict = cfg[bank]
except KeyError as e:
banks = cfg["Banks"]
raise NoBankSelected(f"{e} not a valid bank, try one of {banks}")
@ -60,6 +73,9 @@ def parse_data(filename: Path, args: dict[str, Any]) -> list[BankTransaction]:
raise NoBankSelected(f"{e} not a valid bank, try one of {creditcards}")
bank += creditcard
if args["category"]:
options["category"] = args["category"][0]
if options.get("additional_parser"):
parser = getattr(import_module("pfbudget.extract.parsers"), bank)
transactions = parser(filename, bank, options).parse()
@ -70,7 +86,7 @@ def parse_data(filename: Path, args: dict[str, Any]) -> list[BankTransaction]:
class Parser:
def __init__(self, filename: Path, bank: str, options: dict[str, Any]):
def __init__(self, filename: Path, bank: str, options: dict):
self.filename = filename
self.bank = bank
@ -81,10 +97,10 @@ class Parser:
self.options = Options(**options)
def func(self, transaction: BankTransaction):
def func(self, transaction: Transaction):
pass
def parse(self) -> list[BankTransaction]:
def parse(self) -> list[Transaction]:
transactions = [
Parser.transaction(line, self.bank, self.options, self.func)
for line in list(open(self.filename, encoding=self.options.encoding))[
@ -95,8 +111,7 @@ class Parser:
return transactions
@staticmethod
def index(line: list[str], options: Options) -> Index:
index = None
def index(line: list, options: Options) -> Index:
if options.debit.date != -1 and options.credit.date != -1:
if options.debit.value != options.credit.value:
if line[options.debit.value]:
@ -123,57 +138,49 @@ class Parser:
else:
raise IndexError("No debit not credit indexes available")
return index if index else Index()
return index
@staticmethod
def transaction(
line_: str, bank: str, options: Options, func: Callable[[BankTransaction], None]
) -> BankTransaction:
line = line_.rstrip().split(options.separator)
def transaction(line: str, bank: str, options: Options, func) -> Transaction:
line = line.rstrip().split(options.separator)
index = Parser.index(line, options)
try:
date_str = line[index.date].strip()
date = dt.datetime.strptime(date_str, options.date_fmt).date()
date = (
dt.datetime.strptime(line[index.date].strip(), options.date_fmt)
.date()
.isoformat()
)
text = line[index.text]
value = utils.parse_decimal(line[index.value])
if index.negate:
value = -value
transaction = BankTransaction(date, text, value, bank=bank)
if options.category:
category = line[options.category]
transaction = Transaction(date, text, bank, value, category)
else:
transaction = Transaction(date, text, bank, value)
if options.additional_parser:
func(transaction)
return transaction
except IndexError:
raise IndexError(line_)
class Bank1(Parser):
def __init__(self, filename: Path, bank: str, options: dict[str, Any]):
def __init__(self, filename: str, bank: str, options: dict):
super().__init__(filename, bank, options)
self.transfers: list[dt.date] = []
self.transfers = []
self.transaction_cost = -Decimal("1")
def func(self, transaction: BankTransaction):
if (
transaction.description
and "transf" in transaction.description.lower()
and transaction.amount < 0
):
transaction.amount -= self.transaction_cost
def func(self, transaction: Transaction):
if "transf" in transaction.description.lower() and transaction.value < 0:
transaction.value -= self.transaction_cost
self.transfers.append(transaction.date)
def parse(self) -> list[BankTransaction]:
def parse(self) -> list:
transactions = super().parse()
for date in self.transfers:
transactions.append(
BankTransaction(
date, "Transaction cost", self.transaction_cost, bank=self.bank
)
Transaction(date, "Transaction cost", self.bank, self.transaction_cost)
)
return transactions

View File

@ -35,4 +35,4 @@ class PSD2Extractor(Extractor):
]
def convert(self, bank, downloaded, start, end):
return [convert(t, bank) for t in downloaded]
return [convert(t, bank) for t in downloaded["transactions"]["booked"]]

View File

@ -4,10 +4,11 @@ from typing import Iterable, Sequence
from pfbudget.db.model import (
CategoryRule,
CategorySelector,
Selector_T,
Transaction,
TransactionCategory,
TransactionTag,
)
from .exceptions import TransactionCategorizedError
from .transform import Transformer
@ -24,15 +25,12 @@ class Categorizer(Transformer):
def transform_inplace(self, transactions: Sequence[Transaction]) -> None:
for rule in self.rules:
for transaction in transactions:
if transaction.category:
raise TransactionCategorizedError(transaction)
if not rule.matches(transaction):
continue
if not transaction.category:
transaction.category = TransactionCategory(
rule.name, CategorySelector.rules
rule.name, CategorySelector(Selector_T.rules)
)
else:
if not transaction.tags:
transaction.tags = {TransactionTag(rule.name)}
else:
transaction.tags.add(TransactionTag(rule.name))

View File

@ -1,2 +1,6 @@
class MoreThanOneMatchError(Exception):
pass
class TransactionCategorizedError(Exception):
pass

View File

@ -6,6 +6,7 @@ from .exceptions import MoreThanOneMatchError
from .transform import Transformer
from pfbudget.db.model import (
CategorySelector,
Selector_T,
Transaction,
TransactionCategory,
)
@ -15,7 +16,7 @@ class Nullifier(Transformer):
NULL_DAYS = 4
def __init__(self, rules=None):
self.rules = rules if rules else []
self.rules = rules
def transform(self, transactions: Sequence[Transaction]) -> Sequence[Transaction]:
"""transform
@ -88,6 +89,6 @@ class Nullifier(Transformer):
def _nullify(self, transaction: Transaction) -> Transaction:
transaction.category = TransactionCategory(
"null", selector=CategorySelector.nullifier
"null", selector=CategorySelector(Selector_T.nullifier)
)
return transaction

1776
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,16 +8,17 @@ readme = "README.md"
packages = [{include = "pfbudget"}]
[tool.poetry.dependencies]
python = "^3.11"
python = "^3.10"
codetiming = "^1.4.0"
matplotlib = "^3.7.1"
nordigen = "^1.3.1"
psycopg2 = "^2.9.6"
psycopg2 = {extras = ["binary"], version = "^2.9.6"}
python-dateutil = "^2.8.2"
python-dotenv = "^1.0.0"
pyyaml = "^6.0"
sqlalchemy = "^2.0.9"
[tool.poetry.group.dev.dependencies]
alembic = "^1.10.3"
black = "^23.3.0"
@ -27,15 +28,11 @@ pytest = "^7.3.0"
pytest-cov = "^4.0.0"
pytest-mock = "^3.10.0"
sqlalchemy = {extras = ["mypy"], version = "^2.0.9"}
ruff = "^0.0.267"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.pytest.ini_options]
pythonpath = ". tests"
[pytest]
mock_use_standalone_module = true

View File

@ -1,8 +0,0 @@
from pfbudget.db.model import AccountType, Bank, NordigenBank
checking = Bank(
"bank", "BANK", AccountType.checking, NordigenBank("bank_id", "requisition_id")
)
cc = Bank("cc", "CC", AccountType.MASTERCARD)

View File

@ -1,21 +1,15 @@
from decimal import Decimal
from pfbudget.db.model import Category, CategoryGroup, CategoryRule, Tag, TagRule
from pfbudget.db.model import Category, CategoryRule, Tag, TagRule
category_null = Category("null")
categorygroup1 = CategoryGroup("group#1")
category_null = Category("null", None, set())
category1 = Category(
"cat#1",
"group#1",
rules=[CategoryRule(description="desc#1", max=Decimal(0))],
None,
{CategoryRule(None, None, "desc#1", None, None, None, Decimal(0), "cat#1")},
)
category2 = Category(
"cat#2",
"group#1",
rules=[CategoryRule(description="desc#1", max=Decimal(0))],
tag_1 = Tag(
"tag#1", {TagRule(None, None, "desc#1", None, None, None, Decimal(0), "tag#1")}
)
tag_1 = Tag("tag#1", rules=[TagRule(description="desc#1", max=Decimal(0))])

View File

@ -1,22 +0,0 @@
import datetime as dt
from pfbudget.db.client import Client
from pfbudget.db.model import Base, Nordigen
class MockClient(Client):
now = dt.datetime.now()
def __init__(self):
url = "sqlite://"
super().__init__(
url, execution_options={"schema_translate_map": {"pfbudget": None}}
)
Base.metadata.create_all(self.engine)
self.insert(
[
Nordigen("access", "token#1", self.now + dt.timedelta(days=1)),
Nordigen("refresh", "token#2", self.now + dt.timedelta(days=30)),
]
)

View File

@ -1,11 +1,3 @@
from typing import Any, Dict, List, Optional
import nordigen
from nordigen.types.http_enums import HTTPMethod
from nordigen.types.types import RequisitionDto, TokenType
from pfbudget.extract.nordigen import NordigenCredentials
id = "3fa85f64-5717-4562-b3fc-2c963f66afa6"
accounts_id = {
@ -18,7 +10,6 @@ accounts_id = {
"owner_name": "string",
}
# The downloaded transactions match the simple and simple_transformed mocks
accounts_id_transactions = {
"transactions": {
"booked": [
@ -89,58 +80,3 @@ requisitions_id = {
"account_selection": False,
"redirect_immediate": False,
}
credentials = NordigenCredentials("ID", "KEY")
class MockNordigenClient(nordigen.NordigenClient):
def __init__(
self,
secret_key: str = "ID",
secret_id: str = "KEY",
timeout: int = 10,
base_url: str = "https://ob.nordigen.com/api/v2",
) -> None:
super().__init__(secret_key, secret_id, timeout, base_url)
def generate_token(self) -> TokenType:
return {
"access": "access_token",
"refresh": "refresh_token",
"access_expires": 86400,
"refresh_expires": 2592000,
}
def exchange_token(self, refresh_token: str) -> TokenType:
assert len(refresh_token) > 0, "invalid refresh token"
return {
"access": "access_token",
"refresh": "refresh_token",
"access_expires": 86400,
"refresh_expires": 2592000,
}
def request(
self,
method: HTTPMethod,
endpoint: str,
data: Optional[Dict[str, Any]] = None,
headers: Optional[Dict[str, Any]] = None,
) -> Any:
if endpoint == "requisitions/" + "requisition_id" + "/":
return requisitions_id
elif endpoint == "accounts/" + id + "/transactions/":
return accounts_id_transactions
else:
raise NotImplementedError(endpoint)
def initialize_session(
self,
redirect_uri: str,
institution_id: str,
reference_id: str,
max_historical_days: int = 90,
access_valid_for_days: int = 90,
access_scope: List[str] | None = None,
) -> RequisitionDto:
return RequisitionDto("http://random", "requisition_id")

View File

@ -1,73 +0,0 @@
from datetime import date
from decimal import Decimal
from pfbudget.db.model import (
BankTransaction,
CategorySelector,
MoneyTransaction,
Note,
SplitTransaction,
Transaction,
TransactionCategory,
TransactionTag,
)
# The simple and simple_transformed match the nordigen mocks
simple = [
BankTransaction(date(2023, 1, 14), "string", Decimal("328.18"), bank="bank"),
BankTransaction(date(2023, 2, 14), "string", Decimal("947.26"), bank="bank"),
]
simple_transformed = [
BankTransaction(
date(2023, 1, 14),
"",
Decimal("328.18"),
bank="bank",
category=TransactionCategory("category#1", CategorySelector.algorithm),
),
BankTransaction(
date(2023, 2, 14),
"",
Decimal("947.26"),
bank="bank",
category=TransactionCategory("category#2", CategorySelector.algorithm),
),
]
bank = [
BankTransaction(date(2023, 1, 1), "", Decimal("-10"), bank="bank#1"),
BankTransaction(date(2023, 1, 1), "", Decimal("-10"), bank="bank#2"),
]
money = [
MoneyTransaction(date(2023, 1, 1), "", Decimal("-10")),
MoneyTransaction(date(2023, 1, 1), "", Decimal("-10")),
]
__original = Transaction(date(2023, 1, 1), "", Decimal("-10"), split=True)
__original.id = 9000
split = [
__original,
SplitTransaction(date(2023, 1, 1), "", Decimal("-5"), original=__original.id),
SplitTransaction(date(2023, 1, 1), "", Decimal("-5"), original=__original.id),
]
tagged = [
Transaction(
date(2023, 1, 1),
"",
Decimal("-10"),
tags={TransactionTag("tag#1"), TransactionTag("tag#1")},
)
]
noted = [
Transaction(
date(2023, 1, 1),
"",
Decimal("-10"),
note=Note("note#1"),
)
]

View File

@ -1,144 +0,0 @@
from pathlib import Path
from typing import Any, Sequence, Type
import pytest
from mocks import banks, categories, transactions
from mocks.client import MockClient
from pfbudget.common.types import ExportFormat
from pfbudget.core.command import (
BackupCommand,
ExportCommand,
ImportBackupCommand,
ImportCommand,
ImportFailedError,
)
from pfbudget.db.client import Client
from pfbudget.db.model import (
Bank,
BankTransaction,
Base,
Category,
CategoryGroup,
MoneyTransaction,
Note,
SplitTransaction,
Tag,
Transaction,
TransactionCategory,
TransactionTag,
)
@pytest.fixture
def client() -> Client:
return MockClient()
params = [
(transactions.simple, Transaction),
(transactions.simple_transformed, Transaction),
(transactions.bank, Transaction),
(transactions.bank, BankTransaction),
(transactions.money, Transaction),
(transactions.money, MoneyTransaction),
(transactions.split, SplitTransaction),
([banks.checking, banks.cc], Bank),
([categories.category_null, categories.category1, categories.category2], Category),
(
[
categories.categorygroup1,
categories.category_null,
categories.category1,
categories.category2,
],
CategoryGroup,
),
([categories.tag_1], Tag),
]
not_serializable = [
(transactions.simple_transformed, TransactionCategory),
(transactions.tagged, TransactionTag),
(transactions.noted, Note),
]
class TestBackup:
@pytest.mark.parametrize("input, what", params)
def test_import(self, tmp_path: Path, input: Sequence[Any], what: Type[Any]):
file = tmp_path / "test.json"
client = MockClient()
client.insert(input)
originals = client.select(what)
assert originals
command = ExportCommand(client, what, file, ExportFormat.JSON)
command.execute()
other = MockClient()
command = ImportCommand(other, what, file, ExportFormat.JSON)
command.execute()
imported = other.select(what)
assert originals == imported
command = ExportCommand(client, what, file, ExportFormat.pickle)
with pytest.raises(AttributeError):
command.execute()
command = ImportCommand(other, what, file, ExportFormat.pickle)
with pytest.raises(AttributeError):
command.execute()
@pytest.mark.parametrize("input, what", not_serializable)
def test_try_backup_not_serializable(
self, tmp_path: Path, input: Sequence[Any], what: Type[Any]
):
file = tmp_path / "test.json"
client = MockClient()
client.insert(input)
originals = client.select(what)
assert originals
command = ExportCommand(client, what, file, ExportFormat.JSON)
with pytest.raises(AttributeError):
command.execute()
other = MockClient()
command = ImportCommand(other, what, file, ExportFormat.JSON)
with pytest.raises(ImportFailedError):
command.execute()
imported = other.select(what)
assert not imported
def test_full_backup(self, tmp_path: Path):
file = tmp_path / "test.json"
client = MockClient()
client.insert([e for t in params for e in t[0]])
command = BackupCommand(client, file, ExportFormat.JSON)
command.execute()
other = MockClient()
command = ImportBackupCommand(other, file, ExportFormat.JSON)
command.execute()
def subclasses(cls: Type[Any]) -> set[Type[Any]]:
return set(cls.__subclasses__()) | {
s for c in cls.__subclasses__() for s in subclasses(c)
}
for t in [cls for cls in subclasses(Base)]:
originals = client.select(t)
imported = other.select(t)
assert originals == imported, f"{t}"

View File

@ -1,54 +0,0 @@
import json
from pathlib import Path
import pytest
from mocks.client import MockClient
import mocks.transactions
from pfbudget.common.types import ExportFormat
from pfbudget.core.command import ExportCommand, ImportCommand
from pfbudget.db.client import Client
from pfbudget.db.exceptions import InsertError
from pfbudget.db.model import Transaction
@pytest.fixture
def client() -> Client:
return MockClient()
class TestCommand:
def test_export_json(self, tmp_path: Path, client: Client):
file = tmp_path / "test.json"
client.insert(mocks.transactions.simple)
command = ExportCommand(client, Transaction, file, ExportFormat.JSON)
command.execute()
with open(file, newline="") as f:
result = json.load(f)
assert result == [t.serialize() for t in client.select(Transaction)]
def test_export_pickle(self, tmp_path: Path, client: Client):
file = tmp_path / "test.pickle"
command = ExportCommand(client, Transaction, file, ExportFormat.pickle)
with pytest.raises(AttributeError):
command.execute()
def test_import_json(self, tmp_path: Path, client: Client):
file = tmp_path / "test"
client.insert(mocks.transactions.simple)
command = ExportCommand(client, Transaction, file, ExportFormat.JSON)
command.execute()
# Since the transactions are already in the DB, we expect an insert error
with pytest.raises(InsertError):
command = ImportCommand(client, Transaction, file, ExportFormat.JSON)
command.execute()
def test_import_pickle(self, tmp_path: Path, client: Client):
file = tmp_path / "test"
command = ExportCommand(client, Transaction, file, ExportFormat.pickle)
with pytest.raises(AttributeError):
command.execute()

View File

@ -2,14 +2,14 @@ from datetime import date
from decimal import Decimal
import pytest
from mocks.client import MockClient
from pfbudget.db.client import Client
from pfbudget.db.model import (
AccountType,
Bank,
NordigenBank,
Base,
CategorySelector,
Nordigen,
Selector_T,
Transaction,
TransactionCategory,
)
@ -17,21 +17,20 @@ from pfbudget.db.model import (
@pytest.fixture
def client() -> Client:
return MockClient()
url = "sqlite://"
client = Client(url, execution_options={"schema_translate_map": {"pfbudget": None}})
Base.metadata.create_all(client.engine)
return client
@pytest.fixture
def banks(client: Client) -> list[Bank]:
banks = [
Bank("bank", "BANK", AccountType.checking, NordigenBank(None, "req", None)),
Bank("bank", "BANK", AccountType.checking),
Bank("broker", "BROKER", AccountType.investment),
Bank("creditcard", "CC", AccountType.MASTERCARD),
]
# fix nordigen bank names which would be generated post DB insert
for bank in banks:
if bank.nordigen:
bank.nordigen.name = bank.name
banks[0].nordigen = Nordigen("bank", None, "req", None)
client.insert(banks)
return banks
@ -40,22 +39,19 @@ def banks(client: Client) -> list[Bank]:
@pytest.fixture
def transactions(client: Client) -> list[Transaction]:
transactions = [
Transaction(
date(2023, 1, 1),
"",
Decimal("-10"),
category=TransactionCategory("category", CategorySelector.algorithm),
),
Transaction(date(2023, 1, 1), "", Decimal("-10")),
Transaction(date(2023, 1, 2), "", Decimal("-50")),
]
transactions[0].category = TransactionCategory(
"name", CategorySelector(Selector_T.algorithm)
)
client.insert(transactions)
# fix ids which would be generated post DB insert
for i, transaction in enumerate(transactions):
transaction.id = i + 1
if transaction.category:
transaction.category.id = 1
transaction.split = False # default
transactions[0].category.id = 1
transactions[0].category.selector.id = 1
return transactions
@ -125,13 +121,13 @@ class TestDatabase:
def test_update_nordigen(self, client: Client, banks: list[Bank]):
name = banks[0].name
result = client.select(NordigenBank, lambda: NordigenBank.name == name)
result = client.select(Nordigen, lambda: Nordigen.name == name)
assert result[0].requisition_id == "req"
update = {"name": name, "requisition_id": "anotherreq"}
client.update(NordigenBank, [update])
client.update(Nordigen, [update])
result = client.select(NordigenBank, lambda: NordigenBank.name == name)
result = client.select(Nordigen, lambda: Nordigen.name == name)
assert result[0].requisition_id == "anotherreq"
result = client.select(Bank, lambda: Bank.name == name)

View File

@ -31,8 +31,8 @@ class TestDatabaseLoad:
def test_insert(self, loader: Loader):
transactions = [
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
]
loader.load(transactions)

View File

@ -4,10 +4,9 @@ from typing import Any, Optional
import pytest
import requests
from mocks.client import MockClient
import mocks.nordigen as mock
from pfbudget.db.model import AccountType, Bank, BankTransaction, NordigenBank
from pfbudget.db.model import AccountType, Bank, BankTransaction, Nordigen
from pfbudget.extract.exceptions import BankError, CredentialsError
from pfbudget.extract.extract import Extractor
from pfbudget.extract.nordigen import NordigenClient, NordigenCredentials
@ -59,13 +58,14 @@ def mock_requests(monkeypatch: pytest.MonkeyPatch):
@pytest.fixture
def extractor() -> Extractor:
credentials = NordigenCredentials("ID", "KEY")
return PSD2Extractor(NordigenClient(credentials, MockClient()))
credentials = NordigenCredentials("ID", "KEY", "TOKEN")
return PSD2Extractor(NordigenClient(credentials))
@pytest.fixture
def bank() -> Bank:
bank = Bank("Bank#1", "", AccountType.checking, NordigenBank("", mock.id, False))
bank = Bank("Bank#1", "", AccountType.checking)
bank.nordigen = Nordigen("", "", mock.id, False)
return bank
@ -73,7 +73,7 @@ class TestExtractPSD2:
def test_empty_credentials(self):
cred = NordigenCredentials("", "")
with pytest.raises(CredentialsError):
NordigenClient(cred, MockClient())
NordigenClient(cred)
def test_no_psd2_bank(self, extractor: Extractor):
with pytest.raises(BankError):
@ -88,17 +88,12 @@ class TestExtractPSD2:
with pytest.raises(requests.Timeout):
extractor.extract(bank)
def test_extract(
self, monkeypatch: pytest.MonkeyPatch, extractor: Extractor, bank: Bank
):
monkeypatch.setattr(
"pfbudget.extract.nordigen.NordigenClient.dump", lambda *args: None
)
def test_extract(self, extractor: Extractor, bank: Bank):
assert extractor.extract(bank) == [
BankTransaction(
dt.date(2023, 1, 14), "string", Decimal("328.18"), bank="Bank#1"
dt.date(2023, 1, 14), "string", Decimal("328.18"), "Bank#1"
),
BankTransaction(
dt.date(2023, 2, 14), "string", Decimal("947.26"), bank="Bank#1"
dt.date(2023, 2, 14), "string", Decimal("947.26"), "Bank#1"
),
]

View File

@ -5,9 +5,9 @@ import mocks.categories as mock
from pfbudget.db.model import (
BankTransaction,
Category,
CategoryRule,
CategorySelector,
Selector_T,
TransactionCategory,
TransactionTag,
)
@ -20,8 +20,8 @@ from pfbudget.transform.transform import Transformer
class TestTransform:
def test_nullifier(self):
transactions = [
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
]
for t in transactions:
@ -31,12 +31,14 @@ class TestTransform:
transactions = categorizer.transform(transactions)
for t in transactions:
assert t.category == TransactionCategory("null", CategorySelector.nullifier)
assert t.category == TransactionCategory(
"null", CategorySelector(Selector_T.nullifier)
)
def test_nullifier_inplace(self):
transactions = [
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
]
for t in transactions:
@ -46,20 +48,20 @@ class TestTransform:
categorizer.transform_inplace(transactions)
for t in transactions:
assert t.category == TransactionCategory("null", CategorySelector.nullifier)
assert t.category == TransactionCategory(
"null", CategorySelector(Selector_T.nullifier)
)
def test_nullifier_with_rules(self):
transactions = [
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), bank="Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), bank="Bank#2"),
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
]
for t in transactions:
assert not t.category
rule = CategoryRule(bank="Bank#1")
rule.name = "null"
rules = [rule]
rules = [CategoryRule(None, None, None, None, "Bank#1", None, None, "null")]
categorizer: Transformer = Nullifier(rules)
transactions = categorizer.transform(transactions)
@ -67,28 +69,24 @@ class TestTransform:
for t in transactions:
assert not t.category
rule = CategoryRule(bank="Bank#2")
rule.name = "null"
rules.append(rule)
rules.append(CategoryRule(None, None, None, None, "Bank#2", None, None, "null"))
categorizer = Nullifier(rules)
transactions = categorizer.transform(transactions)
for t in transactions:
assert t.category == TransactionCategory("null", CategorySelector.nullifier)
assert t.category == TransactionCategory(
"null", CategorySelector(Selector_T.nullifier)
)
def test_tagger(self):
transactions = [
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), bank="Bank#1")
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), "Bank#1")
]
for t in transactions:
assert not t.category
rules = mock.tag_1.rules
for rule in rules:
rule.tag = mock.tag_1.name
categorizer: Transformer = Tagger(rules)
categorizer: Transformer = Tagger(mock.tag_1.rules)
transactions = categorizer.transform(transactions)
for t in transactions:
@ -96,32 +94,16 @@ class TestTransform:
def test_categorize(self):
transactions = [
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), bank="Bank#1")
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), "Bank#1")
]
for t in transactions:
assert not t.category
rules = mock.category1.rules
for rule in rules:
rule.name = mock.category1.name
categorizer: Transformer = Categorizer(rules)
categorizer: Transformer = Categorizer(mock.category1.rules)
transactions = categorizer.transform(transactions)
for t in transactions:
assert t.category == TransactionCategory("cat#1", CategorySelector.rules)
def test_rule_limits(self):
transactions = [
BankTransaction(date.today(), "", Decimal("-60"), bank="Bank#1"),
BankTransaction(date.today(), "", Decimal("-120"), bank="Bank#1"),
]
cat = Category("cat")
cat.rules = [CategoryRule(min=-120, max=-60)]
for r in cat.rules:
r.name = cat.name
transactions = Categorizer(cat.rules).transform(transactions)
assert all(t.category.name == cat.name for t in transactions)
assert t.category == TransactionCategory(
"cat#1", CategorySelector(Selector_T.rules)
)