Compare commits
No commits in common. "00bbceeba50de86c7715178e8ef3061d28d0fd51" and "a3d2d8215e0d2fd17c1976833a7b51e18a8f1c1a" have entirely different histories.
00bbceeba5
...
a3d2d8215e
154
pfbudget/core/categorizer.py
Normal file
154
pfbudget/core/categorizer.py
Normal file
@ -0,0 +1,154 @@
|
|||||||
|
from codetiming import Timer
|
||||||
|
from datetime import timedelta
|
||||||
|
from typing import Sequence
|
||||||
|
|
||||||
|
import pfbudget.db.model as t
|
||||||
|
|
||||||
|
|
||||||
|
class Categorizer:
|
||||||
|
options = {}
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.options["null_days"] = 3
|
||||||
|
|
||||||
|
def rules(
|
||||||
|
self,
|
||||||
|
transactions: Sequence[t.BankTransaction],
|
||||||
|
categories: Sequence[t.Category],
|
||||||
|
tags: Sequence[t.Tag],
|
||||||
|
nullify: bool = True
|
||||||
|
):
|
||||||
|
"""Overarching categorization tool
|
||||||
|
|
||||||
|
Receives a list of transactions (by ref) and updates their category according
|
||||||
|
to the rules defined for each category
|
||||||
|
|
||||||
|
Args:
|
||||||
|
transactions (Sequence[BankTransaction]): uncategorized transactions
|
||||||
|
categories (Sequence[Category]): available categories
|
||||||
|
tags (Sequence[Tag]): currently available tags
|
||||||
|
"""
|
||||||
|
|
||||||
|
if nullify:
|
||||||
|
try:
|
||||||
|
null = next(cat for cat in categories if cat.name == "null")
|
||||||
|
print("Nullifying")
|
||||||
|
self._nullify(transactions, null)
|
||||||
|
|
||||||
|
except StopIteration:
|
||||||
|
print("Null category not defined")
|
||||||
|
|
||||||
|
categories = [cat for cat in categories if cat.name != "null"]
|
||||||
|
|
||||||
|
self._rule_based_categories(transactions, categories)
|
||||||
|
self._rule_based_tags(transactions, tags)
|
||||||
|
|
||||||
|
@Timer(name="nullify")
|
||||||
|
def _nullify(self, transactions: Sequence[t.BankTransaction], null: t.Category):
|
||||||
|
count = 0
|
||||||
|
matching = []
|
||||||
|
for transaction in transactions:
|
||||||
|
for cancel in (
|
||||||
|
cancel
|
||||||
|
for cancel in transactions
|
||||||
|
if (
|
||||||
|
transaction.date - timedelta(days=self.options["null_days"])
|
||||||
|
<= cancel.date
|
||||||
|
<= transaction.date + timedelta(days=self.options["null_days"])
|
||||||
|
and cancel != transaction
|
||||||
|
and cancel.bank != transaction.bank
|
||||||
|
and cancel.amount == -transaction.amount
|
||||||
|
and transaction not in matching
|
||||||
|
and cancel not in matching
|
||||||
|
and all(r.matches(transaction) for r in null.rules)
|
||||||
|
and all(r.matches(cancel) for r in null.rules)
|
||||||
|
)
|
||||||
|
):
|
||||||
|
transaction.category = t.TransactionCategory(
|
||||||
|
name="null",
|
||||||
|
selector=t.CategorySelector(t.Selector_T.nullifier),
|
||||||
|
)
|
||||||
|
cancel.category = t.TransactionCategory(
|
||||||
|
name="null",
|
||||||
|
selector=t.CategorySelector(t.Selector_T.nullifier),
|
||||||
|
)
|
||||||
|
matching.extend([transaction, cancel])
|
||||||
|
count += 2
|
||||||
|
break
|
||||||
|
|
||||||
|
print(f"Nullified {count} of {len(transactions)} transactions")
|
||||||
|
|
||||||
|
@Timer(name="categoryrules")
|
||||||
|
def _rule_based_categories(
|
||||||
|
self,
|
||||||
|
transactions: Sequence[t.BankTransaction],
|
||||||
|
categories: Sequence[t.Category],
|
||||||
|
):
|
||||||
|
print(f"Categorizing {len(transactions)} transactions")
|
||||||
|
d = {}
|
||||||
|
for category in [c for c in categories if c.rules]:
|
||||||
|
for rule in category.rules:
|
||||||
|
# for transaction in [t for t in transactions if not t.category]:
|
||||||
|
for transaction in [
|
||||||
|
t
|
||||||
|
for t in transactions
|
||||||
|
if not t.category or t.category.name != "null"
|
||||||
|
]:
|
||||||
|
if not rule.matches(transaction):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# passed all conditions, assign category
|
||||||
|
if transaction.category:
|
||||||
|
if transaction.category.name == category.name:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if (
|
||||||
|
input(
|
||||||
|
f"Overwrite {transaction} with {category.name}? (y/n)"
|
||||||
|
)
|
||||||
|
== "y"
|
||||||
|
):
|
||||||
|
transaction.category.name = category.name
|
||||||
|
transaction.category.selector.selector = t.Selector_T.rules
|
||||||
|
else:
|
||||||
|
transaction.category = t.TransactionCategory(
|
||||||
|
category.name, t.CategorySelector(t.Selector_T.rules)
|
||||||
|
)
|
||||||
|
|
||||||
|
if rule in d:
|
||||||
|
d[rule] += 1
|
||||||
|
else:
|
||||||
|
d[rule] = 1
|
||||||
|
|
||||||
|
for k, v in d.items():
|
||||||
|
print(f"{v}: {k}")
|
||||||
|
|
||||||
|
@Timer(name="tagrules")
|
||||||
|
def _rule_based_tags(
|
||||||
|
self, transactions: Sequence[t.BankTransaction], tags: Sequence[t.Tag]
|
||||||
|
):
|
||||||
|
print(f"Tagging {len(transactions)} transactions")
|
||||||
|
d = {}
|
||||||
|
for tag in [t for t in tags if len(t.rules) > 0]:
|
||||||
|
for rule in tag.rules:
|
||||||
|
# for transaction in [t for t in transactions if not t.category]:
|
||||||
|
for transaction in [
|
||||||
|
t
|
||||||
|
for t in transactions
|
||||||
|
if tag.name not in [tag.tag for tag in t.tags]
|
||||||
|
]:
|
||||||
|
if not rule.matches(transaction):
|
||||||
|
continue
|
||||||
|
|
||||||
|
if not transaction.tags:
|
||||||
|
transaction.tags = {t.TransactionTag(tag.name)}
|
||||||
|
else:
|
||||||
|
transaction.tags.add(t.TransactionTag(tag.name))
|
||||||
|
|
||||||
|
if rule in d:
|
||||||
|
d[rule] += 1
|
||||||
|
else:
|
||||||
|
d[rule] = 1
|
||||||
|
|
||||||
|
for k, v in d.items():
|
||||||
|
print(f"{v}: {k}")
|
||||||
@ -1,9 +1,12 @@
|
|||||||
import csv
|
import csv
|
||||||
|
import dotenv
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
import pickle
|
import pickle
|
||||||
|
import os
|
||||||
import webbrowser
|
import webbrowser
|
||||||
|
|
||||||
from pfbudget.common.types import Operation
|
from pfbudget.common.types import Operation
|
||||||
|
from pfbudget.core.categorizer import Categorizer
|
||||||
from pfbudget.db.client import DbClient
|
from pfbudget.db.client import DbClient
|
||||||
from pfbudget.db.model import (
|
from pfbudget.db.model import (
|
||||||
Bank,
|
Bank,
|
||||||
@ -24,12 +27,12 @@ from pfbudget.db.model import (
|
|||||||
Transaction,
|
Transaction,
|
||||||
TransactionCategory,
|
TransactionCategory,
|
||||||
)
|
)
|
||||||
from pfbudget.extract.nordigen import NordigenClient, NordigenCredentialsManager
|
from pfbudget.extract.credentials import Credentials
|
||||||
|
from pfbudget.extract.extract import Extract
|
||||||
|
from pfbudget.extract.psd2 import PSD2Client
|
||||||
from pfbudget.extract.parsers import parse_data
|
from pfbudget.extract.parsers import parse_data
|
||||||
from pfbudget.extract.psd2 import PSD2Extractor
|
|
||||||
from pfbudget.transform.categorizer import Categorizer
|
dotenv.load_dotenv()
|
||||||
from pfbudget.transform.nullifier import Nullifier
|
|
||||||
from pfbudget.transform.tagger import Tagger
|
|
||||||
|
|
||||||
|
|
||||||
class Manager:
|
class Manager:
|
||||||
@ -82,18 +85,16 @@ class Manager:
|
|||||||
else:
|
else:
|
||||||
banks = session.get(Bank, Bank.name, params[3])
|
banks = session.get(Bank, Bank.name, params[3])
|
||||||
session.expunge_all()
|
session.expunge_all()
|
||||||
|
client.start = params[0]
|
||||||
extractor = PSD2Extractor(client)
|
client.end = params[1]
|
||||||
transactions = []
|
transactions = client.extract(banks)
|
||||||
for bank in banks:
|
|
||||||
transactions.extend(extractor.extract(bank, params[0], params[1]))
|
|
||||||
|
|
||||||
# dry-run
|
# dry-run
|
||||||
if not params[2]:
|
if not params[2]:
|
||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
session.add(sorted(transactions))
|
session.add(sorted(transactions))
|
||||||
else:
|
else:
|
||||||
print(sorted(transactions))
|
print(transactions)
|
||||||
|
|
||||||
case Operation.Categorize:
|
case Operation.Categorize:
|
||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
@ -102,15 +103,7 @@ class Manager:
|
|||||||
)
|
)
|
||||||
categories = session.get(Category)
|
categories = session.get(Category)
|
||||||
tags = session.get(Tag)
|
tags = session.get(Tag)
|
||||||
|
Categorizer().rules(uncategorized, categories, tags, params[0])
|
||||||
rules = [cat.rules for cat in categories if cat.name == "null"]
|
|
||||||
Nullifier(rules).transform_inplace(uncategorized)
|
|
||||||
|
|
||||||
rules = [rule for cat in categories for rule in cat.rules]
|
|
||||||
Categorizer(rules).transform_inplace(uncategorized)
|
|
||||||
|
|
||||||
rules = [rule for tag in tags for rule in tag.rules]
|
|
||||||
Tagger(rules).transform_inplace(uncategorized)
|
|
||||||
|
|
||||||
case Operation.BankMod:
|
case Operation.BankMod:
|
||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
@ -426,5 +419,10 @@ class Manager:
|
|||||||
self._db = url
|
self._db = url
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def nordigen_client() -> NordigenClient:
|
def nordigen_client() -> Extract:
|
||||||
return NordigenClient(NordigenCredentialsManager.default)
|
credentials = Credentials(
|
||||||
|
os.environ.get("SECRET_ID"),
|
||||||
|
os.environ.get("SECRET_KEY"),
|
||||||
|
os.environ.get("TOKEN"),
|
||||||
|
)
|
||||||
|
return PSD2Client(credentials)
|
||||||
|
|||||||
11
pfbudget/extract/credentials.py
Normal file
11
pfbudget/extract/credentials.py
Normal file
@ -0,0 +1,11 @@
|
|||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Credentials:
|
||||||
|
id: str
|
||||||
|
key: str
|
||||||
|
token: str = ""
|
||||||
|
|
||||||
|
def valid(self) -> bool:
|
||||||
|
return self.id and self.key
|
||||||
@ -6,13 +6,5 @@ class BankError(ExtractError):
|
|||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
class PSD2ClientError(ExtractError):
|
class CredentialsError(ExtractError):
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class CredentialsError(PSD2ClientError):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class DownloadError(PSD2ClientError):
|
|
||||||
pass
|
pass
|
||||||
|
|||||||
@ -1,114 +0,0 @@
|
|||||||
from dataclasses import dataclass
|
|
||||||
import dotenv
|
|
||||||
import json
|
|
||||||
import nordigen
|
|
||||||
import os
|
|
||||||
import requests
|
|
||||||
import time
|
|
||||||
import uuid
|
|
||||||
|
|
||||||
from .exceptions import CredentialsError, DownloadError
|
|
||||||
|
|
||||||
dotenv.load_dotenv()
|
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
|
||||||
class NordigenCredentials:
|
|
||||||
id: str
|
|
||||||
key: str
|
|
||||||
token: str = ""
|
|
||||||
|
|
||||||
def valid(self) -> bool:
|
|
||||||
return self.id and self.key
|
|
||||||
|
|
||||||
|
|
||||||
class NordigenClient:
|
|
||||||
redirect_url = "https://murta.dev"
|
|
||||||
|
|
||||||
def __init__(self, credentials: NordigenCredentials):
|
|
||||||
super().__init__()
|
|
||||||
|
|
||||||
if not credentials.valid():
|
|
||||||
raise CredentialsError
|
|
||||||
|
|
||||||
self._client = nordigen.NordigenClient(
|
|
||||||
secret_key=credentials.key, secret_id=credentials.id, timeout=5
|
|
||||||
)
|
|
||||||
|
|
||||||
if credentials.token:
|
|
||||||
self._client.token = credentials.token
|
|
||||||
|
|
||||||
def download(self, requisition_id):
|
|
||||||
try:
|
|
||||||
requisition = self._client.requisition.get_requisition_by_id(requisition_id)
|
|
||||||
print(requisition)
|
|
||||||
except requests.HTTPError as e:
|
|
||||||
raise DownloadError(e)
|
|
||||||
|
|
||||||
transactions = {}
|
|
||||||
for acc in requisition["accounts"]:
|
|
||||||
account = self._client.account_api(acc)
|
|
||||||
|
|
||||||
retries = 0
|
|
||||||
while retries < 3:
|
|
||||||
try:
|
|
||||||
downloaded = account.get_transactions()
|
|
||||||
break
|
|
||||||
except requests.ReadTimeout:
|
|
||||||
retries += 1
|
|
||||||
print(f"Request #{retries} timed-out, retrying in 1s")
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
if not downloaded:
|
|
||||||
print(f"Couldn't download transactions for {account}")
|
|
||||||
continue
|
|
||||||
|
|
||||||
transactions.update(downloaded)
|
|
||||||
|
|
||||||
return transactions
|
|
||||||
|
|
||||||
def dump(self, bank, downloaded):
|
|
||||||
with open("json/" + bank.name + ".json", "w") as f:
|
|
||||||
json.dump(downloaded, f)
|
|
||||||
|
|
||||||
def generate_token(self):
|
|
||||||
self.token = self._client.generate_token()
|
|
||||||
print(f"New access token: {self.token}")
|
|
||||||
return self.token
|
|
||||||
|
|
||||||
def requisition(self, id: str, country: str = "PT"):
|
|
||||||
requisition = self._client.initialize_session(
|
|
||||||
redirect_uri=self.redirect_url,
|
|
||||||
institution_id=id,
|
|
||||||
reference_id=str(uuid.uuid4()),
|
|
||||||
)
|
|
||||||
return requisition.link, requisition.requisition_id
|
|
||||||
|
|
||||||
def country_banks(self, country: str):
|
|
||||||
return self._client.institution.get_institutions(country)
|
|
||||||
|
|
||||||
# def __token(self):
|
|
||||||
# if token := os.environ.get("TOKEN"):
|
|
||||||
# return token
|
|
||||||
# else:
|
|
||||||
# token = self._client.generate_token()
|
|
||||||
# print(f"New access token: {token}")
|
|
||||||
# return token["access"]
|
|
||||||
|
|
||||||
@property
|
|
||||||
def token(self):
|
|
||||||
return self._token
|
|
||||||
|
|
||||||
@token.setter
|
|
||||||
def token(self, value):
|
|
||||||
if self._token:
|
|
||||||
print("Replacing existing token with {value}")
|
|
||||||
self._token = value
|
|
||||||
|
|
||||||
|
|
||||||
class NordigenCredentialsManager:
|
|
||||||
default = NordigenCredentials(
|
|
||||||
os.environ.get("SECRET_ID"),
|
|
||||||
os.environ.get("SECRET_KEY"),
|
|
||||||
os.environ.get("TOKEN"),
|
|
||||||
)
|
|
||||||
@ -1,39 +1,142 @@
|
|||||||
import datetime as dt
|
import datetime as dt
|
||||||
|
import json
|
||||||
|
import nordigen
|
||||||
|
import requests
|
||||||
|
import time
|
||||||
|
import uuid
|
||||||
|
|
||||||
from typing import Sequence
|
from typing import Sequence
|
||||||
|
|
||||||
from pfbudget.db.model import Bank, BankTransaction
|
import pfbudget.db.model as t
|
||||||
from pfbudget.utils.converters import convert
|
from pfbudget.utils.converters import convert
|
||||||
|
|
||||||
from .exceptions import BankError, DownloadError, ExtractError
|
from .credentials import Credentials
|
||||||
|
from .exceptions import BankError, CredentialsError, ExtractError
|
||||||
from .extract import Extract
|
from .extract import Extract
|
||||||
from .nordigen import NordigenClient
|
|
||||||
|
|
||||||
|
|
||||||
class PSD2Extractor(Extract):
|
class PSD2Client(Extract):
|
||||||
def __init__(self, client: NordigenClient):
|
redirect_url = "https://murta.dev"
|
||||||
self.__client = client
|
|
||||||
|
|
||||||
def extract(
|
def __init__(self, credentials: Credentials):
|
||||||
self, bank: Bank, start=dt.date.min, end=dt.date.max
|
super().__init__()
|
||||||
) -> Sequence[BankTransaction]:
|
|
||||||
if not bank.nordigen:
|
|
||||||
raise BankError("Bank doesn't have Nordigen info")
|
|
||||||
|
|
||||||
try:
|
if not credentials.valid():
|
||||||
print(f"Downloading from {bank}...")
|
raise CredentialsError
|
||||||
downloaded = self.__client.download(bank.nordigen.requisition_id)
|
|
||||||
except DownloadError as e:
|
|
||||||
print(f"There was an issue downloading from {bank.name}\n{e}")
|
|
||||||
raise ExtractError(e)
|
|
||||||
|
|
||||||
self.__client.dump(bank, downloaded)
|
self._client = nordigen.NordigenClient(
|
||||||
|
secret_key=credentials.key, secret_id=credentials.id, timeout=5
|
||||||
|
)
|
||||||
|
|
||||||
return [
|
if credentials.token:
|
||||||
t
|
self._client.token = credentials.token
|
||||||
for t in self.convert(bank, downloaded, start, end)
|
|
||||||
if start <= t.date <= end
|
|
||||||
]
|
|
||||||
|
|
||||||
def convert(self, bank, downloaded, start, end):
|
self._start = dt.date.min
|
||||||
return [convert(t, bank) for t in downloaded["transactions"]["booked"]]
|
self._end = dt.date.max
|
||||||
|
|
||||||
|
def extract(self, banks: Sequence[t.Bank]) -> list[t.BankTransaction]:
|
||||||
|
transactions = []
|
||||||
|
if not banks or any(not b.nordigen for b in banks):
|
||||||
|
raise BankError
|
||||||
|
|
||||||
|
for bank in banks:
|
||||||
|
downloaded = None
|
||||||
|
try:
|
||||||
|
print(f"Downloading from {bank}...")
|
||||||
|
downloaded = self.download(bank.nordigen.requisition_id)
|
||||||
|
except requests.HTTPError as e:
|
||||||
|
print(f"There was an issue downloading from {bank.name} -> {e}")
|
||||||
|
raise ExtractError(e)
|
||||||
|
|
||||||
|
if downloaded:
|
||||||
|
self.dump(bank, downloaded)
|
||||||
|
|
||||||
|
converted = [
|
||||||
|
convert(t, bank) for t in downloaded["transactions"]["booked"]
|
||||||
|
]
|
||||||
|
|
||||||
|
transactions.extend(
|
||||||
|
[t for t in converted if self._start <= t.date <= self._end]
|
||||||
|
)
|
||||||
|
|
||||||
|
return sorted(transactions)
|
||||||
|
|
||||||
|
def download(self, requisition_id):
|
||||||
|
requisition = self._client.requisition.get_requisition_by_id(requisition_id)
|
||||||
|
print(requisition)
|
||||||
|
|
||||||
|
transactions = {}
|
||||||
|
for acc in requisition["accounts"]:
|
||||||
|
account = self._client.account_api(acc)
|
||||||
|
|
||||||
|
retries = 0
|
||||||
|
while retries < 3:
|
||||||
|
try:
|
||||||
|
downloaded = account.get_transactions()
|
||||||
|
break
|
||||||
|
except requests.ReadTimeout:
|
||||||
|
retries += 1
|
||||||
|
print(f"Request #{retries} timed-out, retrying in 1s")
|
||||||
|
time.sleep(1)
|
||||||
|
|
||||||
|
if not downloaded:
|
||||||
|
print(f"Couldn't download transactions for {account}")
|
||||||
|
continue
|
||||||
|
|
||||||
|
transactions.update(downloaded)
|
||||||
|
|
||||||
|
return transactions
|
||||||
|
|
||||||
|
def dump(self, bank, downloaded):
|
||||||
|
with open("json/" + bank.name + ".json", "w") as f:
|
||||||
|
json.dump(downloaded, f)
|
||||||
|
|
||||||
|
def generate_token(self):
|
||||||
|
self.token = self._client.generate_token()
|
||||||
|
print(f"New access token: {self.token}")
|
||||||
|
return self.token
|
||||||
|
|
||||||
|
def requisition(self, id: str, country: str = "PT"):
|
||||||
|
requisition = self._client.initialize_session(
|
||||||
|
redirect_uri=self.redirect_url,
|
||||||
|
institution_id=id,
|
||||||
|
reference_id=str(uuid.uuid4()),
|
||||||
|
)
|
||||||
|
return requisition.link, requisition.requisition_id
|
||||||
|
|
||||||
|
def country_banks(self, country: str):
|
||||||
|
return self._client.institution.get_institutions(country)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def start(self):
|
||||||
|
return self._start
|
||||||
|
|
||||||
|
@start.setter
|
||||||
|
def start(self, value):
|
||||||
|
self._start = value
|
||||||
|
|
||||||
|
@property
|
||||||
|
def end(self):
|
||||||
|
return self._end
|
||||||
|
|
||||||
|
@end.setter
|
||||||
|
def end(self, value):
|
||||||
|
self._end = value
|
||||||
|
|
||||||
|
# def __token(self):
|
||||||
|
# if token := os.environ.get("TOKEN"):
|
||||||
|
# return token
|
||||||
|
# else:
|
||||||
|
# token = self._client.generate_token()
|
||||||
|
# print(f"New access token: {token}")
|
||||||
|
# return token["access"]
|
||||||
|
|
||||||
|
@property
|
||||||
|
def token(self):
|
||||||
|
return self._token
|
||||||
|
|
||||||
|
@token.setter
|
||||||
|
def token(self, value):
|
||||||
|
if self._token:
|
||||||
|
print("Replacing existing token with {value}")
|
||||||
|
self._token = value
|
||||||
|
|||||||
@ -1,36 +0,0 @@
|
|||||||
from copy import deepcopy
|
|
||||||
from typing import Sequence
|
|
||||||
|
|
||||||
from pfbudget.db.model import (
|
|
||||||
CategoryRule,
|
|
||||||
CategorySelector,
|
|
||||||
Selector_T,
|
|
||||||
Transaction,
|
|
||||||
TransactionCategory,
|
|
||||||
)
|
|
||||||
from .exceptions import TransactionCategorizedError
|
|
||||||
from .transform import Transformer
|
|
||||||
|
|
||||||
|
|
||||||
class Categorizer(Transformer):
|
|
||||||
def __init__(self, rules: Sequence[CategoryRule]):
|
|
||||||
self.rules = rules
|
|
||||||
|
|
||||||
def transform(self, transactions: Sequence[Transaction]) -> Sequence[Transaction]:
|
|
||||||
result = deepcopy(transactions)
|
|
||||||
self.transform_inplace(result)
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
def transform_inplace(self, transactions: Sequence[Transaction]) -> None:
|
|
||||||
for rule in self.rules:
|
|
||||||
for transaction in transactions:
|
|
||||||
if transaction.category:
|
|
||||||
raise TransactionCategorizedError(transaction)
|
|
||||||
|
|
||||||
if not rule.matches(transaction):
|
|
||||||
continue
|
|
||||||
|
|
||||||
transaction.category = TransactionCategory(
|
|
||||||
rule.name, CategorySelector(Selector_T.rules)
|
|
||||||
)
|
|
||||||
@ -1,6 +0,0 @@
|
|||||||
class MoreThanOneMatchError(Exception):
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class TransactionCategorizedError(Exception):
|
|
||||||
pass
|
|
||||||
@ -1,94 +0,0 @@
|
|||||||
from copy import deepcopy
|
|
||||||
import datetime as dt
|
|
||||||
from typing import Sequence
|
|
||||||
|
|
||||||
from .exceptions import MoreThanOneMatchError
|
|
||||||
from .transform import Transformer
|
|
||||||
from pfbudget.db.model import (
|
|
||||||
CategorySelector,
|
|
||||||
Selector_T,
|
|
||||||
Transaction,
|
|
||||||
TransactionCategory,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class Nullifier(Transformer):
|
|
||||||
NULL_DAYS = 4
|
|
||||||
|
|
||||||
def __init__(self, rules=None):
|
|
||||||
self.rules = rules
|
|
||||||
|
|
||||||
def transform(self, transactions: Sequence[Transaction]) -> Sequence[Transaction]:
|
|
||||||
"""transform
|
|
||||||
|
|
||||||
Find transactions that nullify each others, e.g. transfers between banks or
|
|
||||||
between bank and credit cards.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
transactions (Sequence[Transaction]): ordered sequence of transactions
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
MoreThanOneMatchError: if there is more than a match for a single transation
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Sequence[Transaction]: nullified sequence of transactions
|
|
||||||
"""
|
|
||||||
|
|
||||||
result = deepcopy(transactions)
|
|
||||||
|
|
||||||
for i, transaction in enumerate(result[:-1]):
|
|
||||||
if matches := [t for t in result[i + 1 :] if self._cancels(transaction, t)]:
|
|
||||||
if len(matches) > 1:
|
|
||||||
raise MoreThanOneMatchError(f"{transaction} -> {matches}")
|
|
||||||
|
|
||||||
match = matches[0]
|
|
||||||
|
|
||||||
transaction = self._nullify(transaction)
|
|
||||||
match = self._nullify(match)
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
def transform_inplace(self, transactions: Sequence[Transaction]) -> None:
|
|
||||||
"""_summary_
|
|
||||||
|
|
||||||
Find transactions that nullify each others, e.g. transfers between banks or
|
|
||||||
between bank and credit cards.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
transactions (Sequence[Transaction]): ordered sequence of transactions that
|
|
||||||
will be modified inplace
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
MoreThanOneMatchError: if there is more than a match for a single transation
|
|
||||||
"""
|
|
||||||
|
|
||||||
for transaction in transactions:
|
|
||||||
if matches := [t for t in transactions if self._cancels(transaction, t)]:
|
|
||||||
if len(matches) > 1:
|
|
||||||
raise MoreThanOneMatchError(f"{transaction} -> {matches}")
|
|
||||||
|
|
||||||
match = matches[0]
|
|
||||||
|
|
||||||
transaction = self._nullify(transaction)
|
|
||||||
match = self._nullify(match)
|
|
||||||
|
|
||||||
def _cancels(self, transaction: Transaction, cancel: Transaction):
|
|
||||||
return (
|
|
||||||
transaction.date
|
|
||||||
<= cancel.date
|
|
||||||
<= transaction.date + dt.timedelta(days=self.NULL_DAYS)
|
|
||||||
and cancel != transaction
|
|
||||||
and cancel.bank != transaction.bank
|
|
||||||
and cancel.amount == -transaction.amount
|
|
||||||
and (not cancel.category or cancel.category.name != "null")
|
|
||||||
and (
|
|
||||||
any(r.matches(transaction) for r in self.rules) if self.rules else True
|
|
||||||
)
|
|
||||||
and (any(r.matches(cancel) for r in self.rules) if self.rules else True)
|
|
||||||
)
|
|
||||||
|
|
||||||
def _nullify(self, transaction: Transaction) -> Transaction:
|
|
||||||
transaction.category = TransactionCategory(
|
|
||||||
"null", selector=CategorySelector(Selector_T.nullifier)
|
|
||||||
)
|
|
||||||
return transaction
|
|
||||||
@ -1,30 +0,0 @@
|
|||||||
from copy import deepcopy
|
|
||||||
from typing import Sequence
|
|
||||||
|
|
||||||
from pfbudget.db.model import TagRule, Transaction, TransactionTag
|
|
||||||
from .transform import Transformer
|
|
||||||
|
|
||||||
|
|
||||||
class Tagger(Transformer):
|
|
||||||
def __init__(self, rules: Sequence[TagRule]):
|
|
||||||
self.rules = rules
|
|
||||||
|
|
||||||
def transform(self, transactions: Sequence[Transaction]) -> Sequence[Transaction]:
|
|
||||||
result = deepcopy(transactions)
|
|
||||||
self.transform_inplace(result)
|
|
||||||
|
|
||||||
return result
|
|
||||||
|
|
||||||
def transform_inplace(self, transactions: Sequence[Transaction]) -> None:
|
|
||||||
for rule in self.rules:
|
|
||||||
for transaction in transactions:
|
|
||||||
if rule.tag in transaction.tags:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not rule.matches(transaction):
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not transaction.tags:
|
|
||||||
transaction.tags = {TransactionTag(rule.tag)}
|
|
||||||
else:
|
|
||||||
transaction.tags.add(TransactionTag(rule.tag))
|
|
||||||
@ -1,14 +0,0 @@
|
|||||||
from abc import ABC, abstractmethod
|
|
||||||
from typing import Sequence
|
|
||||||
|
|
||||||
from pfbudget.db.model import Transaction
|
|
||||||
|
|
||||||
|
|
||||||
class Transformer(ABC):
|
|
||||||
@abstractmethod
|
|
||||||
def transform(self, _: Sequence[Transaction]) -> Sequence[Transaction]:
|
|
||||||
raise NotImplementedError
|
|
||||||
|
|
||||||
@abstractmethod
|
|
||||||
def transform_inplace(self, _: Sequence[Transaction]) -> None:
|
|
||||||
raise NotImplementedError
|
|
||||||
@ -1,15 +0,0 @@
|
|||||||
from decimal import Decimal
|
|
||||||
|
|
||||||
from pfbudget.db.model import Category, CategoryRule, Tag, TagRule
|
|
||||||
|
|
||||||
category_null = Category("null", None, set())
|
|
||||||
|
|
||||||
category1 = Category(
|
|
||||||
"cat#1",
|
|
||||||
None,
|
|
||||||
{CategoryRule(None, None, "desc#1", None, None, None, Decimal(0), "cat#1")},
|
|
||||||
)
|
|
||||||
|
|
||||||
tag_1 = Tag(
|
|
||||||
"tag#1", {TagRule(None, None, "desc#1", None, None, None, Decimal(0), "tag#1")}
|
|
||||||
)
|
|
||||||
@ -6,9 +6,9 @@ import requests
|
|||||||
import mocks.nordigen as mock
|
import mocks.nordigen as mock
|
||||||
|
|
||||||
from pfbudget.db.model import Bank, BankTransaction, Nordigen
|
from pfbudget.db.model import Bank, BankTransaction, Nordigen
|
||||||
|
from pfbudget.extract.credentials import Credentials
|
||||||
from pfbudget.extract.exceptions import BankError, CredentialsError
|
from pfbudget.extract.exceptions import BankError, CredentialsError
|
||||||
from pfbudget.extract.nordigen import NordigenClient, NordigenCredentials
|
from pfbudget.extract.psd2 import PSD2Client
|
||||||
from pfbudget.extract.psd2 import PSD2Extractor
|
|
||||||
|
|
||||||
|
|
||||||
class MockGet:
|
class MockGet:
|
||||||
@ -55,37 +55,41 @@ def mock_requests(monkeypatch):
|
|||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def extractor() -> NordigenClient:
|
def client() -> PSD2Client:
|
||||||
credentials = NordigenCredentials("ID", "KEY", "TOKEN")
|
credentials = Credentials("ID", "KEY", "TOKEN")
|
||||||
return PSD2Extractor(NordigenClient(credentials))
|
return PSD2Client(credentials)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def bank() -> list[Bank]:
|
def banks() -> list[Bank]:
|
||||||
bank = Bank("Bank#1", "", "")
|
bank = Bank("Bank#1", "", "")
|
||||||
bank.nordigen = Nordigen("", "", mock.id, False)
|
bank.nordigen = Nordigen("", "", mock.id, False)
|
||||||
return bank
|
return [bank]
|
||||||
|
|
||||||
|
|
||||||
class TestExtractPSD2:
|
class TestExtractPSD2:
|
||||||
def test_empty_credentials(self):
|
def test_empty_credentials(self):
|
||||||
cred = NordigenCredentials("", "")
|
cred = Credentials("", "")
|
||||||
with pytest.raises(CredentialsError):
|
with pytest.raises(CredentialsError):
|
||||||
NordigenClient(cred)
|
PSD2Client(cred)
|
||||||
|
|
||||||
def test_no_psd2_bank(self, extractor):
|
def test_empty_banks(self, client):
|
||||||
with pytest.raises(BankError):
|
with pytest.raises(BankError):
|
||||||
extractor.extract(Bank("", "", ""))
|
client.extract([])
|
||||||
|
|
||||||
def test_timeout(self, monkeypatch, extractor, bank):
|
def test_no_psd2_bank(self, client):
|
||||||
|
with pytest.raises(BankError):
|
||||||
|
client.extract([Bank("", "", "")])
|
||||||
|
|
||||||
|
def test_timeout(self, monkeypatch, client, banks):
|
||||||
monkeypatch.setattr(
|
monkeypatch.setattr(
|
||||||
"requests.get", MockGet(mock_exception=requests.ReadTimeout)
|
"requests.get", MockGet(mock_exception=requests.ReadTimeout)
|
||||||
)
|
)
|
||||||
with pytest.raises(requests.Timeout):
|
with pytest.raises(requests.Timeout):
|
||||||
extractor.extract(bank)
|
client.extract(banks)
|
||||||
|
|
||||||
def test_extract(self, extractor, bank):
|
def test_extract(self, client, banks):
|
||||||
assert extractor.extract(bank) == [
|
assert client.extract(banks) == [
|
||||||
BankTransaction(
|
BankTransaction(
|
||||||
dt.date(2023, 1, 14), "string", Decimal("328.18"), "Bank#1"
|
dt.date(2023, 1, 14), "string", Decimal("328.18"), "Bank#1"
|
||||||
),
|
),
|
||||||
|
|||||||
@ -1,110 +0,0 @@
|
|||||||
from datetime import date
|
|
||||||
from decimal import Decimal
|
|
||||||
|
|
||||||
import mocks.categories as mock
|
|
||||||
|
|
||||||
from pfbudget.db.model import (
|
|
||||||
Bank,
|
|
||||||
BankTransaction,
|
|
||||||
CategoryRule,
|
|
||||||
CategorySelector,
|
|
||||||
Selector_T,
|
|
||||||
TransactionCategory,
|
|
||||||
TransactionTag,
|
|
||||||
)
|
|
||||||
from pfbudget.transform.categorizer import Categorizer
|
|
||||||
from pfbudget.transform.nullifier import Nullifier
|
|
||||||
from pfbudget.transform.tagger import Tagger
|
|
||||||
from pfbudget.transform.transform import Transformer
|
|
||||||
|
|
||||||
|
|
||||||
class TestTransform:
|
|
||||||
def test_nullifier(self):
|
|
||||||
transactions = [
|
|
||||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
|
|
||||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
|
|
||||||
]
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert not t.category
|
|
||||||
|
|
||||||
categorizer: Transformer = Nullifier()
|
|
||||||
transactions = categorizer.transform(transactions)
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert t.category == TransactionCategory(
|
|
||||||
"null", CategorySelector(Selector_T.nullifier)
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_nullifier_inplace(self):
|
|
||||||
transactions = [
|
|
||||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
|
|
||||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
|
|
||||||
]
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert not t.category
|
|
||||||
|
|
||||||
categorizer: Transformer = Nullifier()
|
|
||||||
categorizer.transform_inplace(transactions)
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert t.category == TransactionCategory(
|
|
||||||
"null", CategorySelector(Selector_T.nullifier)
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_nullifier_with_rules(self):
|
|
||||||
transactions = [
|
|
||||||
BankTransaction(date(2023, 1, 1), "", Decimal("-500"), "Bank#1"),
|
|
||||||
BankTransaction(date(2023, 1, 2), "", Decimal("500"), "Bank#2"),
|
|
||||||
]
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert not t.category
|
|
||||||
|
|
||||||
rules = [CategoryRule(None, None, None, None, "Bank#1", None, None, "null")]
|
|
||||||
|
|
||||||
categorizer: Transformer = Nullifier(rules)
|
|
||||||
transactions = categorizer.transform(transactions)
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert not t.category
|
|
||||||
|
|
||||||
rules.append(CategoryRule(None, None, None, None, "Bank#2", None, None, "null"))
|
|
||||||
categorizer = Nullifier(rules)
|
|
||||||
transactions = categorizer.transform(transactions)
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert t.category == TransactionCategory(
|
|
||||||
"null", CategorySelector(Selector_T.nullifier)
|
|
||||||
)
|
|
||||||
|
|
||||||
def test_tagger(self):
|
|
||||||
transactions = [
|
|
||||||
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), "Bank#1")
|
|
||||||
]
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert not t.category
|
|
||||||
|
|
||||||
categorizer: Transformer = Tagger(mock.tag_1.rules)
|
|
||||||
transactions = categorizer.transform(transactions)
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert TransactionTag("tag#1") in t.tags
|
|
||||||
|
|
||||||
def test_categorize(self):
|
|
||||||
transactions = [
|
|
||||||
BankTransaction(date(2023, 1, 1), "desc#1", Decimal("-10"), "Bank#1")
|
|
||||||
]
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert not t.category
|
|
||||||
|
|
||||||
categorizer: Transformer = Categorizer(mock.category1.rules)
|
|
||||||
transactions: Transformer = categorizer.transform(transactions)
|
|
||||||
|
|
||||||
for t in transactions:
|
|
||||||
assert t.category == TransactionCategory(
|
|
||||||
"cat#1", CategorySelector(Selector_T.rules)
|
|
||||||
)
|
|
||||||
Loading…
x
Reference in New Issue
Block a user