Merge branch 'feature/nordigen'

This commit is contained in:
Luís Murta 2022-11-22 22:12:32 +00:00
commit 5957242b83
Signed by: satprog
GPG Key ID: 169EF1BBD7049F94
24 changed files with 929 additions and 374 deletions

View File

@ -2,6 +2,6 @@ __all__ = ["run", "parse_data", "categorize_data"]
__author__ = "Luís Murta"
__version__ = "0.1"
from .categories import categorize_data
from .parsers import parse_data
from .runnable import run
from pfbudget.core.categories import categorize_data
from pfbudget.cli.runnable import run
from pfbudget.input.parsers import parse_data

4
pfbudget/__main__.py Normal file
View File

@ -0,0 +1,4 @@
from pfbudget.cli.runnable import run
if __name__ == "__main__":
run()

0
pfbudget/cli/__init__.py Normal file
View File

321
pfbudget/cli/runnable.py Normal file
View File

@ -0,0 +1,321 @@
from pathlib import Path
import argparse
import re
from pfbudget.core.categories import categorize_data
from pfbudget.core.manager import Manager
from pfbudget.input.json import JsonParser
from pfbudget.input.nordigen import NordigenInput
from pfbudget.db.client import DatabaseClient
import pfbudget.reporting.graph
import pfbudget.reporting.report
import pfbudget.utils
DEFAULT_DB = "data.db"
class PfBudgetInitialized(Exception):
pass
class PfBudgetNotInitialized(Exception):
pass
class DataFileMissing(Exception):
pass
def argparser(manager: Manager) -> argparse.ArgumentParser:
help = argparse.ArgumentParser(add_help=False)
help.add_argument(
"-db",
"--database",
nargs="?",
help="select current database",
default=DEFAULT_DB,
)
help.add_argument(
"-q", "--quiet", action="store_true", help="reduces the amount of verbose"
)
period = argparse.ArgumentParser(add_help=False).add_mutually_exclusive_group()
period.add_argument(
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
)
period.add_argument("--start", type=str, nargs=1, help="graph start date")
period.add_argument("--end", type=str, nargs=1, help="graph end date")
period.add_argument("--year", type=str, nargs=1, help="graph year")
parser = argparse.ArgumentParser(
description="does cool finance stuff",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--version",
action="version",
version=re.search(
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
open("pfbudget/__init__.py").read(),
).group(1),
)
subparsers = parser.add_subparsers(dest="command", required=True)
"""
Init
"""
p_init = subparsers.add_parser(
"init",
description="Initializes the SQLite3 database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_init.set_defaults(func=lambda args: manager.init())
"""
Exporting
"""
p_export = subparsers.add_parser(
"export",
description="Exports the selected database to a .csv file",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_export.set_defaults(func=lambda args: DatabaseClient(args.database).export())
"""
Parsing
"""
p_parse = subparsers.add_parser(
"parse",
description="Parses and adds the requested transactions into the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_parse.add_argument("path", nargs="+", type=str)
p_parse.add_argument("--bank", nargs=1, type=str)
p_parse.add_argument("--creditcard", nargs=1, type=str)
p_parse.add_argument("--category", nargs=1, type=int)
p_parse.set_defaults(func=lambda args: parse(manager, args))
"""
Categorizing
"""
p_categorize = subparsers.add_parser(
"categorize",
description="Categorizes the transactions in the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_categorize.set_defaults(
func=lambda args: categorize_data(DatabaseClient(args.database))
)
"""
Graph
"""
p_graph = subparsers.add_parser(
"graph",
description="Graph of the transactions",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_graph.add_argument(
"option",
type=str,
choices=["monthly", "discrete", "networth"],
nargs="?",
default="monthly",
help="graph option help",
)
p_graph.add_argument("--save", action="store_true")
p_graph.set_defaults(func=graph)
"""
Report
"""
p_report = subparsers.add_parser(
"report",
description="Prints report of transaction groups",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_report.add_argument(
"option",
type=str,
choices=["net", "detailed"],
nargs="?",
default="net",
help="report option help",
)
p_report.set_defaults(func=report)
"""
Register bank
"""
p_register = subparsers.add_parser(
"register",
description="Register a bank",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
p_register.add_argument(
"--requisition", type=str, nargs=1, help="requisition option help"
)
p_register.add_argument("--invert", action="store_true")
p_register.set_defaults(func=lambda args: manager.register(vars(args)))
"""
Unregister bank
"""
p_register = subparsers.add_parser(
"unregister",
description="Unregister a bank",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
p_register.set_defaults(func=lambda args: manager.unregister(vars(args)))
"""
Nordigen API
"""
p_nordigen_access = subparsers.add_parser(
"token",
description="Get new access token",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_access.set_defaults(func=lambda args: NordigenInput(manager).token())
"""
(Re)new bank requisition ID
"""
p_nordigen_access = subparsers.add_parser(
"renew",
description="(Re)new the Bank requisition ID",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_access.add_argument("name", nargs=1, type=str)
p_nordigen_access.add_argument("country", nargs=1, type=str)
p_nordigen_access.set_defaults(
func=lambda args: NordigenInput(manager).requisition(
args.name[0], args.country[0]
)
)
"""
Downloading through Nordigen API
"""
p_nordigen_download = subparsers.add_parser(
"download",
description="Downloads transactions using Nordigen API",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_download.add_argument("--id", nargs="+", type=str)
p_nordigen_download.add_argument("--name", nargs="+", type=str)
p_nordigen_download.add_argument("--all", action="store_true")
p_nordigen_download.set_defaults(func=lambda args: download(manager, args))
"""
List available banks on Nordigen API
"""
p_nordigen_list = subparsers.add_parser(
"list",
description="Lists banks in {country}",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_list.add_argument("country", nargs=1, type=str)
p_nordigen_list.set_defaults(func=lambda args: nordigen_banks(manager, args))
"""
Nordigen JSONs
"""
p_nordigen_json = subparsers.add_parser(
"json",
description="",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_json.add_argument("json", nargs=1, type=str)
p_nordigen_json.add_argument("bank", nargs=1, type=str)
p_nordigen_json.add_argument("--invert", action=argparse.BooleanOptionalAction)
p_nordigen_json.set_defaults(
func=lambda args: manager.parser(JsonParser(vars(args)))
)
return parser
def parse(manager: Manager, args):
"""Parses the contents of the path in args to the selected database.
Args:
args (dict): argparse variables
"""
for path in args.path:
if (dir := Path(path)).is_dir():
for file in dir.iterdir():
manager.parse(file, vars(args))
elif Path(path).is_file():
manager.parse(path, vars(args))
else:
raise FileNotFoundError
def graph(args):
"""Plots the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "monthly":
pfbudget.reporting.graph.monthly(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "discrete":
pfbudget.reporting.graph.discrete(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "networth":
pfbudget.reporting.graph.networth(
DatabaseClient(args.database), vars(args), start, end
)
def report(args):
"""Prints a detailed report of the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "net":
pfbudget.reporting.report.net(DatabaseClient(args.database), start, end)
elif args.option == "detailed":
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
def nordigen_banks(manager: Manager, args):
input = NordigenInput(manager)
input.list(vars(args)["country"][0])
def download(manager: Manager, args):
start, end = pfbudget.utils.parse_args_period(args)
manager.parser(NordigenInput(manager, vars(args), start, end))
def run():
manager = Manager(DEFAULT_DB)
args = argparser(manager).parse_args()
args.func(args)

View File

View File

@ -1,7 +1,7 @@
from dataclasses import dataclass
from datetime import date
from decimal import Decimal, InvalidOperation
COMMENT_TOKEN = "#"
from enum import Enum, auto
class TransactionError(Exception):
@ -27,7 +27,7 @@ class Transaction:
self.description = " ".join(arg[1].split())
self.bank = arg[2]
if type(arg[3]) is float:
self.value = arg[3]
self.value = Decimal(str(arg[3]))
else:
self.value = Decimal(args[3])
self.category = arg[4]
@ -101,3 +101,29 @@ class Transaction:
return "{} {} {}€ at {}".format(
self.date.strftime("%d/%m/%y"), self.category, self.value, self.bank
)
Transactions = list[Transaction]
class PrimaryKey(Enum):
ID = auto()
NAME = auto()
BIC = auto()
@dataclass
class Bank:
name: str
bic: str
requisition_id: str
invert: bool
offset: int
key: PrimaryKey = PrimaryKey.ID
Banks = list[Bank]
class NoBankSelected(Exception):
pass

View File

View File

@ -8,8 +8,8 @@ import yaml
if TYPE_CHECKING:
from pfbudget.database import DBManager
from pfbudget.transactions import Transaction
from pfbudget.common.types import Transaction
from pfbudget.db.client import DatabaseClient
Options = namedtuple(
@ -53,7 +53,7 @@ groups = {
}
def categorize_data(db: DBManager):
def categorize_data(db: DatabaseClient):
# 1st) Classifying null transactions, i.e. transfers between banks.
# Will not overwrite previous categories
@ -96,7 +96,7 @@ def categorize_data(db: DBManager):
break
def vacations(db: DBManager) -> None:
def vacations(db: DatabaseClient) -> None:
try:
date_fmt = categories["Travel"].date_fmt
for start, end in categories["Travel"].vacations:
@ -134,7 +134,7 @@ def vacations(db: DBManager) -> None:
logging.exception(e)
def nulls(db: DBManager) -> None:
def nulls(db: DatabaseClient) -> None:
null = categories.get("Null", Options())
transactions = db.get_uncategorized_transactions()
if not transactions:

47
pfbudget/core/manager.py Normal file
View File

@ -0,0 +1,47 @@
from pfbudget.input.input import Input
from pfbudget.input.parsers import parse_data
from pfbudget.common.types import Bank, Banks, Transaction, Transactions
from pfbudget.db.client import DatabaseClient
from pfbudget.utils import convert
class Manager:
def __init__(self, db: str):
self.__db = db
def init(self):
client = DatabaseClient(self.__db)
client.init()
def register(self, args: dict):
bank = Bank(args["bank"][0], "", args["requisition"][0], args["invert"])
client = DatabaseClient(self.__db)
client.register_bank(convert(bank))
def unregister(self, args: dict):
client = DatabaseClient(self.__db)
client.unregister_bank(args["bank"][0])
def parser(self, parser: Input):
transactions = parser.parse()
self.add_transactions(transactions)
def parse(self, filename: str, args: dict):
transactions = parse_data(filename, args)
self.add_transactions(transactions)
def transactions() -> list[Transaction]:
pass
def add_transactions(self, transactions: Transactions):
client = DatabaseClient(self.__db)
client.insert_transactions([convert(t) for t in transactions])
def get_bank_by(self, key: str, value: str) -> Bank:
client = DatabaseClient(self.__db)
bank = client.get_bank(key, value)
return convert(bank)
def get_banks(self) -> Banks:
client = DatabaseClient(self.__db)
return [convert(bank) for bank in client.get_banks()]

View File

@ -7,7 +7,8 @@ import logging.config
import pathlib
import sqlite3
from .transactions import Transaction
from pfbudget.common.types import Transaction
import pfbudget.db.schema as Q
if not pathlib.Path("logs").is_dir():
@ -19,94 +20,8 @@ sqlite3.register_adapter(Decimal, lambda d: float(d))
__DB_NAME = "data.db"
CREATE_TRANSACTIONS_TABLE = """
CREATE TABLE IF NOT EXISTS "transactions" (
"date" TEXT NOT NULL,
"description" TEXT,
"bank" TEXT NOT NULL,
"value" REAL NOT NULL,
"category" TEXT,
"original" TEXT,
"additional comments" TEXT
);
"""
CREATE_BACKUPS_TABLE = """
CREATE TABLE IF NOT EXISTS backups (
datetime TEXT NOT NULL,
file TEXT NOT NULL
)
"""
CREATE_BANKS_TABLE = """
CREATE TABLE banks (
name TEXT NOT NULL PRIMARY KEY,
url TEXT
)
"""
ADD_TRANSACTION = """
INSERT INTO transactions (date, description, bank, value, category) values (?,?,?,?,?)
"""
UPDATE_CATEGORY = """
UPDATE transactions
SET category = (?)
WHERE date = (?) AND description = (?) AND bank = (?) AND value = (?)
"""
DUPLICATED_TRANSACTIONS = """
SELECT COUNT(*), date, description, bank, value
FROM transactions
GROUP BY date, description, bank, value
HAVING COUNT(*) > 1
ORDER BY date ASC
"""
SORTED_TRANSACTIONS = """
SELECT *
FROM transactions
ORDER BY date ASC
"""
SELECT_TRANSACTIONS_BETWEEN_DATES = """
SELECT *
FROM transactions
WHERE date BETWEEN (?) AND (?)
ORDER BY date ASC
"""
SELECT_TRANSACTIONS_BY_CATEGORY = """
SELECT *
FROM transactions
WHERE category IS (?)
ORDER BY date ASC
"""
SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY = """
SELECT *
FROM transactions
WHERE date BETWEEN (?) AND (?)
AND category IS (?)
ORDER BY date ASC
"""
SELECT_TRANSACTION_BY_PERIOD = """
SELECT EXTRACT((?) FROM date) AS (?), date, description, bank, value
FROM transactions
ORDER BY date ASC
"""
SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES = """
SELECT *
FROM transactions
WHERE date BETWEEN (?) AND (?)
AND category NOT IN {}
ORDER BY date ASC
"""
class DBManager:
class DatabaseClient:
"""SQLite DB connection manager"""
__EXPORT_DIR = "export"
@ -160,11 +75,14 @@ class DBManager:
logging.info(f"Initializing {self.db} database")
self.__create_tables(
(
("transactions", CREATE_TRANSACTIONS_TABLE),
("backups", CREATE_BACKUPS_TABLE),
("transactions", Q.CREATE_TRANSACTIONS_TABLE),
("backups", Q.CREATE_BACKUPS_TABLE),
("banks", Q.CREATE_BANKS_TABLE),
)
)
"""Transaction table methods"""
def select_all(self) -> list[Transaction] | None:
logger.info(f"Reading all transactions from {self.db}")
transactions = self.__execute("SELECT * FROM transactions")
@ -174,48 +92,47 @@ class DBManager:
def insert_transaction(self, transaction: Transaction):
logger.info(f"Adding {transaction} into {self.db}")
self.__execute(ADD_TRANSACTION, (transaction.to_list(),))
self.__execute(Q.ADD_TRANSACTION, (transaction.to_list(),))
def insert_transactions(self, transactions: list[Transaction]):
def insert_transactions(self, transactions: Q.DbTransactions):
logger.info(f"Adding {len(transactions)} into {self.db}")
transactions = [t.to_list() for t in transactions]
self.__executemany(ADD_TRANSACTION, transactions)
self.__executemany(Q.ADD_TRANSACTION, [t.tuple() for t in transactions])
def update_category(self, transaction: Transaction):
logger.info(f"Update {transaction} category")
self.__execute(UPDATE_CATEGORY, transaction.update_category())
self.__execute(Q.UPDATE_CATEGORY, transaction.update_category())
def update_categories(self, transactions: list[Transaction]):
logger.info(f"Update {len(transactions)} transactions' categories")
self.__executemany(
UPDATE_CATEGORY,
Q.UPDATE_CATEGORY,
[transaction.update_category() for transaction in transactions],
)
def get_duplicated_transactions(self) -> list[Transaction] | None:
logger.info("Get duplicated transactions")
transactions = self.__execute(DUPLICATED_TRANSACTIONS)
transactions = self.__execute(Q.DUPLICATED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_sorted_transactions(self) -> list[Transaction] | None:
logger.info("Get transactions sorted by date")
transactions = self.__execute(SORTED_TRANSACTIONS)
transactions = self.__execute(Q.SORTED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange(self, start: datetime, end: datetime) -> list[Transaction] | None:
logger.info(f"Get transactions from {start} to {end}")
transactions = self.__execute(SELECT_TRANSACTIONS_BETWEEN_DATES, (start, end))
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BETWEEN_DATES, (start, end))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_category(self, value: str) -> list[Transaction] | None:
logger.info(f"Get transactions where category = {value}")
transactions = self.__execute(SELECT_TRANSACTIONS_BY_CATEGORY, (value,))
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BY_CATEGORY, (value,))
if transactions:
return [Transaction(t) for t in transactions]
return None
@ -227,7 +144,7 @@ class DBManager:
f"Get transactions from {start} to {end} where category = {category}"
)
transactions = self.__execute(
SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY, (start, end, category)
Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY, (start, end, category)
)
if transactions:
return [Transaction(t) for t in transactions]
@ -235,7 +152,7 @@ class DBManager:
def get_by_period(self, period: str) -> list[Transaction] | None:
logger.info(f"Get transactions by {period}")
transactions = self.__execute(SELECT_TRANSACTION_BY_PERIOD, period)
transactions = self.__execute(Q.SELECT_TRANSACTION_BY_PERIOD, period)
if transactions:
return [Transaction(t) for t in transactions]
return None
@ -252,7 +169,7 @@ class DBManager:
self, start: datetime, end: datetime, *categories: str
) -> list[Transaction] | None:
logger.info(f"Get transactions between {start} and {end} not in {categories}")
query = SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
query = Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
"(" + ", ".join("?" for _ in categories) + ")"
)
transactions = self.__execute(query, (start, end, *categories))
@ -270,3 +187,26 @@ class DBManager:
dir.mkdir()
with open(dir / filename, "w", newline="") as f:
csv.writer(f, delimiter="\t").writerows(transactions)
"""Banks table methods"""
def register_bank(self, bank: Q.DbBank):
logger.info(f"Registering {bank}")
self.__execute(Q.ADD_BANK, bank.tuple())
def unregister_bank(self, bank: str):
logger.info(f"Unregistering {bank}")
self.__execute(Q.DELETE_BANK, (bank,))
def get_bank(self, key: str, value: str) -> Q.DbBank | None:
logger.info(f"Get bank with {key} = {value}")
bank = self.__execute(Q.SELECT_BANK.format(key), (value, ))
if bank:
return Q.DbBank(*bank[0])
def get_banks(self) -> Q.DbBanks:
logger.info("Get all banks")
banks = self.__execute(Q.SELECT_BANKS)
if banks:
return [Q.DbBank(*bank) for bank in banks]
return []

161
pfbudget/db/schema.py Normal file
View File

@ -0,0 +1,161 @@
from dataclasses import dataclass
from decimal import Decimal
CREATE_TRANSACTIONS_TABLE = """
CREATE TABLE IF NOT EXISTS "transactions" (
"date" TEXT NOT NULL,
"description" TEXT,
"bank" TEXT NOT NULL,
"value" REAL NOT NULL,
"category" TEXT,
"original" TEXT,
"additional comments" TEXT
);
"""
@dataclass
class DbTransaction:
date: str
description: str
bank: str
value: Decimal
category: str
original: str
comments: str
def tuple(self) -> tuple:
return (
self.date,
self.description,
self.bank,
self.value,
self.category,
)
DbTransactions = list[DbTransaction]
CREATE_BACKUPS_TABLE = """
CREATE TABLE IF NOT EXISTS backups (
datetime TEXT NOT NULL,
file TEXT NOT NULL
)
"""
CREATE_BANKS_TABLE = """
CREATE TABLE IF NOT EXISTS banks (
name TEXT NOT NULL PRIMARY KEY,
bic TEXT,
nordigen_id TEXT,
nordigen_name TEXT,
requisition_id TEXT,
invert INTEGER,
offset INTEGER
)
"""
@dataclass
class DbBank:
name: str
bic: str
nordigen_id: str
nordigen_name: str
requisition_id: str
invert: bool = False
offset: int = 0
def tuple(self):
return (
self.name,
self.bic,
self.nordigen_id,
self.nordigen_name,
self.requisition_id,
int(self.invert),
self.offset,
)
DbBanks = list[DbBank]
ADD_TRANSACTION = """
INSERT INTO transactions (date, description, bank, value, category) values (?,?,?,?,?)
"""
UPDATE_CATEGORY = """
UPDATE transactions
SET category = (?)
WHERE date = (?) AND description = (?) AND bank = (?) AND value = (?)
"""
DUPLICATED_TRANSACTIONS = """
SELECT COUNT(*), date, description, bank, value
FROM transactions
GROUP BY date, description, bank, value
HAVING COUNT(*) > 1
ORDER BY date ASC
"""
SORTED_TRANSACTIONS = """
SELECT *
FROM transactions
ORDER BY date ASC
"""
SELECT_TRANSACTIONS_BETWEEN_DATES = """
SELECT *
FROM transactions
WHERE date BETWEEN (?) AND (?)
ORDER BY date ASC
"""
SELECT_TRANSACTIONS_BY_CATEGORY = """
SELECT *
FROM transactions
WHERE category IS (?)
ORDER BY date ASC
"""
SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY = """
SELECT *
FROM transactions
WHERE date BETWEEN (?) AND (?)
AND category IS (?)
ORDER BY date ASC
"""
SELECT_TRANSACTION_BY_PERIOD = """
SELECT EXTRACT((?) FROM date) AS (?), date, description, bank, value
FROM transactions
ORDER BY date ASC
"""
SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES = """
SELECT *
FROM transactions
WHERE date BETWEEN (?) AND (?)
AND category NOT IN {}
ORDER BY date ASC
"""
ADD_BANK = """
INSERT INTO banks (name, bic, nordigen_id, nordigen_name, requisition_id, invert) values (?,?,?,?,?,?)
"""
DELETE_BANK = """
DELETE FROM banks
WHERE name = (?)
"""
SELECT_BANK = """
SELECT *
FROM banks
WHERE {} = (?)
"""
SELECT_BANKS = """
SELECT *
FROM banks
"""

View File

21
pfbudget/input/input.py Normal file
View File

@ -0,0 +1,21 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from pfbudget.common.types import Transactions
if TYPE_CHECKING:
from pfbudget.core.manager import Manager
class Input(ABC):
def __init__(self, manager: Manager):
self._manager = manager
@abstractmethod
def parse(self) -> Transactions:
return NotImplemented
@property
def manager(self):
return self._manager

30
pfbudget/input/json.py Normal file
View File

@ -0,0 +1,30 @@
import json
from .input import Input
from pfbudget.common.types import Transactions
from pfbudget.utils import convert, parse_decimal
class JsonParser(Input):
def __init__(self, manager, options):
super().__init__(manager)
self.options = options
def parse(self) -> Transactions:
try:
with open(self.options["json"][0], "r") as f:
return [
convert(
[
t["bookingDate"],
t["remittanceInformationUnstructured"],
self.options["bank"][0],
parse_decimal(t["transactionAmount"]["amount"])
if not self.options["invert"]
else -parse_decimal(t["transactionAmount"]["amount"]),
],
)
for t in json.load(f)["transactions"]["booked"]
]
except KeyError:
print("No json file defined")

127
pfbudget/input/nordigen.py Normal file
View File

@ -0,0 +1,127 @@
from datetime import date
from time import sleep
from requests import HTTPError, ReadTimeout
from dotenv import load_dotenv
from nordigen import NordigenClient
from uuid import uuid4
import json
import os
import webbrowser
from .input import Input
from pfbudget.common.types import NoBankSelected, Transactions
from pfbudget.utils import convert
load_dotenv()
class NordigenInput(Input):
def __init__(self, manager, options: dict = {}, start=date.min, end=date.max):
super().__init__(manager)
self._client = NordigenClient(
secret_key=os.environ.get("SECRET_KEY"),
secret_id=os.environ.get("SECRET_ID"),
)
self.client.token = self.__token()
# print(options)
if "all" in options and options["all"]:
self.__banks = self.manager.get_banks()
elif "id" in options and options["id"]:
self.__banks = [
self.manager.get_bank_by("nordigen_id", b) for b in options["id"]
]
elif "name" in options and options["name"]:
self.__banks = [
self.manager.get_bank_by("name", b) for b in options["name"]
]
else:
self.__banks = None
self.__from = start
self.__to = end
def parse(self) -> Transactions:
transactions = []
if not self.__banks:
raise NoBankSelected
for bank in self.__banks:
print(f"Downloading from {bank}...")
requisition = self.client.requisition.get_requisition_by_id(
bank.requisition_id
)
for acc in requisition["accounts"]:
account = self._client.account_api(acc)
retries = 0
downloaded = {}
while retries < 3:
try:
downloaded = account.get_transactions()
break
except ReadTimeout:
retries += 1
print(f"Request #{retries} timed-out, retrying in 1s")
sleep(1)
except HTTPError as e:
retries += 1
print(f"Request #{retries} failed with {e}, retrying in 1s")
sleep(1)
if not downloaded:
print(f"Couldn't download transactions for {account}")
continue
with open("json/" + bank.name + ".json", "w") as f:
json.dump(downloaded, f)
converted = [
convert(t, bank) for t in downloaded["transactions"]["booked"]
]
transactions.extend(
[t for t in converted if self.__from <= t.date <= self.__to]
)
return transactions
def token(self):
token = self._client.generate_token()
print(f"New access token: {token}")
return token
def requisition(self, institution: str, country: str = "PT"):
link, _ = self.__requisition_id(institution, country)
webbrowser.open(link)
def list(self, country: str):
print(self._client.institution.get_institutions(country))
@property
def client(self):
return self._client
def __token(self):
if token := os.environ.get("TOKEN"):
return token
else:
token = self._client.generate_token()
print(f"New access token: {token}")
return token
def __requisition_id(self, i: str, c: str):
id = self._client.institution.get_institution_id_by_name(
country=c, institution=i
)
init = self._client.initialize_session(
redirect_uri="https://murta.dev",
institution_id=id,
reference_id=str(uuid4()),
)
print(f"{i}({c}) link: {init.link} and requisition ID: {init.requisition_id}")
return (init.link, init.requisition_id)

View File

@ -1,16 +1,11 @@
from __future__ import annotations
from collections import namedtuple
from decimal import Decimal
from importlib import import_module
from typing import TYPE_CHECKING
import datetime as dt
import yaml
from .transactions import Transaction
from . import utils
if TYPE_CHECKING:
from .database import DBManager
from pfbudget.common.types import NoBankSelected, Transaction, Transactions
from pfbudget.utils import utils
Index = namedtuple(
"Index", ["date", "text", "value", "negate"], defaults=[-1, -1, -1, False]
@ -48,7 +43,7 @@ Options = namedtuple(
)
def parse_data(db: DBManager, filename: str, args: dict) -> None:
def parse_data(filename: str, args: dict) -> Transactions:
cfg: dict = yaml.safe_load(open("parsers.yaml"))
assert (
"Banks" in cfg
@ -62,22 +57,30 @@ def parse_data(db: DBManager, filename: str, args: dict) -> None:
bank = args["bank"][0]
creditcard = None if not args["creditcard"] else args["creditcard"][0]
if not creditcard:
try:
options: dict = cfg[bank]
else:
options: dict = cfg[bank][creditcard]
except KeyError as e:
banks = cfg["Banks"]
raise NoBankSelected(f"{e} not a valid bank, try one of {banks}")
if creditcard:
try:
options = options[creditcard]
except KeyError as e:
creditcards = cfg["CreditCards"]
raise NoBankSelected(f"{e} not a valid bank, try one of {creditcards}")
bank += creditcard
if args["category"]:
options["category"] = args["category"][0]
if options.get("additional_parser"):
parser = getattr(import_module("pfbudget.parsers"), bank)
parser = getattr(import_module("pfbudget.input.parsers"), bank)
transactions = parser(filename, bank, options).parse()
else:
transactions = Parser(filename, bank, options).parse()
db.insert_transactions(transactions)
return transactions
class Parser:

View File

View File

@ -5,18 +5,18 @@ from typing import TYPE_CHECKING
import datetime as dt
import matplotlib.pyplot as plt
import pfbudget.categories
import pfbudget.core.categories
if TYPE_CHECKING:
from pfbudget.database import DBManager
from pfbudget.db.client import DatabaseClient
groups = pfbudget.categories.cfg["Groups"]
groups = pfbudget.core.categories.cfg["Groups"]
def monthly(
db: DBManager, args: dict, start: dt.date = dt.date.min, end: dt.date = dt.date.max
db: DatabaseClient, args: dict, start: dt.date = dt.date.min, end: dt.date = dt.date.max
):
transactions = db.get_daterange(start, end)
start, end = transactions[0].date, transactions[-1].date
@ -33,7 +33,7 @@ def monthly(
<= month
+ dt.timedelta(days=monthrange(month.year, month.month)[1] - 1)
)
for group, categories in pfbudget.categories.groups.items()
for group, categories in pfbudget.core.categories.groups.items()
},
)
for month in [
@ -68,21 +68,21 @@ def monthly(
list(rrule(MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1))),
[
[-groups[group] for _, groups in monthly_transactions]
for group in pfbudget.categories.groups
for group in pfbudget.core.categories.groups
if group != "income-fixed"
and group != "income-extra"
and group != "investment"
],
labels=[
group
for group in pfbudget.categories.groups
for group in pfbudget.core.categories.groups
if group != "income-fixed"
and group != "income-extra"
and group != "investment"
],
colors=[
groups.get(group, {"color": "gray"})["color"]
for group in pfbudget.categories.groups
for group in pfbudget.core.categories.groups
if group != "income-fixed"
and group != "income-extra"
and group != "investment"
@ -96,7 +96,7 @@ def monthly(
def discrete(
db: DBManager, args: dict, start: dt.date = dt.date.min, end: dt.date = dt.date.max
db: DatabaseClient, args: dict, start: dt.date = dt.date.min, end: dt.date = dt.date.max
):
transactions = db.get_daterange(start, end)
start, end = transactions[0].date, transactions[-1].date
@ -113,7 +113,7 @@ def discrete(
<= month
+ dt.timedelta(days=monthrange(month.year, month.month)[1] - 1)
)
for category in pfbudget.categories.categories
for category in pfbudget.core.categories.categories
},
)
for month in [
@ -131,8 +131,8 @@ def discrete(
sum(
value
for category, value in categories.items()
if category in pfbudget.categories.groups["income-fixed"]
or category in pfbudget.categories.groups["income-extra"]
if category in pfbudget.core.categories.groups["income-fixed"]
or category in pfbudget.core.categories.groups["income-extra"]
)
for _, categories in monthly_transactions
],
@ -145,7 +145,7 @@ def discrete(
sum(
value
for category, value in categories.items()
if category in pfbudget.categories.groups["income-fixed"]
if category in pfbudget.core.categories.groups["income-fixed"]
)
for _, categories in monthly_transactions
],
@ -156,18 +156,18 @@ def discrete(
list(rrule(MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1))),
[
[-categories[category] for _, categories in monthly_transactions]
for category in pfbudget.categories.categories
if category not in pfbudget.categories.groups["income-fixed"]
and category not in pfbudget.categories.groups["income-extra"]
and category not in pfbudget.categories.groups["investment"]
for category in pfbudget.core.categories.categories
if category not in pfbudget.core.categories.groups["income-fixed"]
and category not in pfbudget.core.categories.groups["income-extra"]
and category not in pfbudget.core.categories.groups["investment"]
and category != "Null"
],
labels=[
category
for category in pfbudget.categories.categories
if category not in pfbudget.categories.groups["income-fixed"]
and category not in pfbudget.categories.groups["income-extra"]
and category not in pfbudget.categories.groups["investment"]
for category in pfbudget.core.categories.categories
if category not in pfbudget.core.categories.groups["income-fixed"]
and category not in pfbudget.core.categories.groups["income-extra"]
and category not in pfbudget.core.categories.groups["investment"]
and category != "Null"
],
)
@ -180,7 +180,7 @@ def discrete(
def networth(
db: DBManager, args: dict, start: dt.date = dt.date.min, end: dt.date = dt.date.max
db: DatabaseClient, args: dict, start: dt.date = dt.date.min, end: dt.date = dt.date.max
):
transactions = db.get_daterange(start, end)
start, end = transactions[0].date, transactions[-1].date
@ -193,11 +193,12 @@ def networth(
transaction.value
for transaction in transactions
if transaction.original != "No"
and transaction.category not in pfbudget.categories.groups["investment"]
and transaction.category not in pfbudget.core.categories.groups["investment"]
and month
<= transaction.date
<= month + dt.timedelta(days=monthrange(month.year, month.month)[1] - 1)
) + accum
)
+ accum,
)
for month in [
month.date()
@ -210,10 +211,8 @@ def networth(
plt.figure(tight_layout=True)
plt.plot(
list(rrule(MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1))),
[
value for _, value in monthly_networth
],
label="Total networth"
[value for _, value in monthly_networth],
label="Total networth",
)
plt.grid()
plt.legend(loc="upper left")

View File

@ -3,13 +3,13 @@ from dateutil.rrule import rrule, YEARLY
from typing import TYPE_CHECKING
import datetime as dt
import pfbudget.categories
import pfbudget.core.categories
if TYPE_CHECKING:
from pfbudget.database import DBManager
from pfbudget.db.client import DatabaseClient
def net(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date.max):
def net(db: DatabaseClient, start: dt.date = dt.date.min, end: dt.date = dt.date.max):
transactions = db.get_daterange(start, end)
start, end = transactions[0].date, transactions[-1].date
@ -23,7 +23,7 @@ def net(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date.max)
if transaction.category in categories
and year <= transaction.date <= year.replace(month=12, day=31)
)
for group, categories in pfbudget.categories.groups.items()
for group, categories in pfbudget.core.categories.groups.items()
},
)
for year in [
@ -62,7 +62,8 @@ def net(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date.max)
print(f"Invested: {investments:.2f}\n")
def detailed(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date.max):
def detailed(db: DatabaseClient, start: dt.date = dt.date.min, end: dt.date = dt.date.max):
transactions = db.get_daterange(start, end)
start, end = transactions[0].date, transactions[-1].date
@ -76,7 +77,7 @@ def detailed(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date
if transaction.category == category
and year <= transaction.date <= year.replace(month=12, day=31)
)
for category in pfbudget.categories.categories
for category in pfbudget.core.categories.categories
},
)
for year in [
@ -93,23 +94,23 @@ def detailed(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date
income = sum(
sum
for category, sum in categories.items()
if category in pfbudget.categories.groups["income-fixed"]
or category in pfbudget.categories.groups["income-extra"]
if category in pfbudget.core.categories.groups["income-fixed"]
or category in pfbudget.core.categories.groups["income-extra"]
)
print(f"Income: {income:.2f}\n")
investments = -sum(
sum
for category, sum in categories.items()
if category in pfbudget.categories.groups["investment"]
if category in pfbudget.core.categories.groups["investment"]
)
expenses = 0
for category, value in categories.items():
if (
category not in pfbudget.categories.groups["income-fixed"]
and category not in pfbudget.categories.groups["income-extra"]
and category not in pfbudget.categories.groups["investment"]
category not in pfbudget.core.categories.groups["income-fixed"]
and category not in pfbudget.core.categories.groups["income-extra"]
and category not in pfbudget.core.categories.groups["investment"]
):
if category == "Null":
if value != 0:

View File

@ -1,203 +0,0 @@
from pathlib import Path
import argparse
import re
from .categories import categorize_data
from .database import DBManager
from .parsers import parse_data
import pfbudget.graph
import pfbudget.report
import pfbudget.utils
DEFAULT_DB = "data.db"
class PfBudgetInitialized(Exception):
pass
class PfBudgetNotInitialized(Exception):
pass
class DataFileMissing(Exception):
pass
def argparser() -> argparse.ArgumentParser:
help = argparse.ArgumentParser(add_help=False)
help.add_argument(
"-db",
"--database",
nargs="?",
help="select current database",
default=DEFAULT_DB,
)
help.add_argument(
"-q", "--quiet", action="store_true", help="reduces the amount of verbose"
)
period = argparse.ArgumentParser(add_help=False).add_mutually_exclusive_group()
period.add_argument(
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
)
period.add_argument("--start", type=str, nargs=1, help="graph start date")
period.add_argument("--end", type=str, nargs=1, help="graph end date")
period.add_argument("--year", type=str, nargs=1, help="graph year")
parser = argparse.ArgumentParser(
description="does cool finance stuff",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--version",
action="version",
version=re.search(
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
open("pfbudget/__init__.py").read(),
).group(1),
)
subparsers = parser.add_subparsers(dest="command", required=True)
"""
Init
"""
p_init = subparsers.add_parser(
"init",
description="Initializes the SQLite3 database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_init.set_defaults(func=lambda args: DBManager(args.database).init())
"""
Exporting
"""
p_export = subparsers.add_parser(
"export",
description="Exports the selected database to a .csv file",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_export.set_defaults(func=lambda args: DBManager(args.database).export())
"""
Parsing
"""
p_parse = subparsers.add_parser(
"parse",
description="Parses and adds the requested transactions into the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_parse.add_argument("path", nargs="+", type=str)
p_parse.add_argument("--bank", nargs=1, type=str)
p_parse.add_argument("--creditcard", nargs=1, type=str)
p_parse.add_argument("--category", nargs=1, type=int)
p_parse.set_defaults(func=parse)
"""
Categorizing
"""
p_categorize = subparsers.add_parser(
"categorize",
description="Categorizes the transactions in the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_categorize.set_defaults(
func=lambda args: categorize_data(DBManager(args.database))
)
"""
Graph
"""
p_graph = subparsers.add_parser(
"graph",
description="Graph of the transactions",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_graph.add_argument(
"option",
type=str,
choices=["monthly", "discrete", "networth"],
nargs="?",
default="monthly",
help="graph option help",
)
p_graph.add_argument("--save", action="store_true")
p_graph.set_defaults(func=graph)
"""
Report
"""
p_report = subparsers.add_parser(
"report",
description="Prints report of transaction groups",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_report.add_argument(
"option",
type=str,
choices=["net", "detailed"],
nargs="?",
default="net",
help="report option help",
)
p_report.set_defaults(func=report)
return parser
def parse(args):
"""Parses the contents of the path in args to the selected database.
Args:
args (dict): argparse variables
"""
db = DBManager(args.database)
for path in args.path:
if (dir := Path(path)).is_dir():
for file in dir.iterdir():
parse_data(db, file, vars(args))
elif Path(path).is_file():
parse_data(db, path, vars(args))
else:
raise FileNotFoundError
def graph(args):
"""Plots the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "monthly":
pfbudget.graph.monthly(DBManager(args.database), vars(args), start, end)
elif args.option == "discrete":
pfbudget.graph.discrete(DBManager(args.database), vars(args), start, end)
elif args.option == "networth":
pfbudget.graph.networth(DBManager(args.database), vars(args), start, end)
def report(args):
"""Prints a detailed report of the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "net":
pfbudget.report.net(DBManager(args.database), start, end)
elif args.option == "detailed":
pfbudget.report.detailed(DBManager(args.database), start, end)
def run():
args = argparser().parse_args()
args.func(args)

View File

@ -0,0 +1,2 @@
from .converters import convert
from .utils import *

View File

@ -0,0 +1,72 @@
from datetime import timedelta
from functools import singledispatch
from pfbudget.common.types import Bank, Transaction, TransactionError
from pfbudget.db.schema import DbBank, DbTransaction
from .utils import parse_decimal
@singledispatch
def convert(t):
print("No converter as been found")
pass
@convert.register
def _(t: Transaction) -> DbTransaction:
return DbTransaction(
t.date,
t.description,
t.bank,
t.value,
t.category,
t.original,
t.additional_comment,
)
@convert.register
def _(db: DbTransaction) -> Transaction:
try:
return Transaction(db)
except TransactionError:
print(f"{db} is in the wrong format")
@convert.register
def _(db: DbBank, key: str = "") -> Bank:
bank = Bank(db.name, db.bic, db.requisition_id, db.invert, db.offset, key=key)
if not bank.invert:
bank.invert = False
if not bank.offset:
bank.offset = 0
return bank
@convert.register
def _(bank: Bank) -> DbBank:
bank = DbBank(
bank.name, bank.bic, "", "", bank.requisition_id, bank.invert, bank.offset
)
if not bank.invert:
bank.invert = False
if not bank.offset:
bank.offset = 0
return bank
@convert.register
def _(json: dict, bank: Bank) -> Transaction:
i = -1 if bank.invert else 1
try:
transaction = Transaction(
json["bookingDate"],
json["remittanceInformationUnstructured"],
bank.name,
i * parse_decimal(json["transactionAmount"]["amount"]),
)
transaction.date += timedelta(days=bank.offset)
return transaction
except TransactionError:
print(f"{json} is in the wrong format")

View File

@ -51,9 +51,10 @@ def find_credit_institution(fn, banks, creditcards):
raise WrongFilenameError
if bank.lower() not in [bank.lower() for bank in banks]:
raise BankNotAvailableError(f"{fn}: {banks}")
raise BankNotAvailableError(f"{fn} -> {bank}: {banks}")
if cc and cc.lower() not in [cc.lower() for cc in creditcards]:
raise CreditCardNotAvailableError(f"{fn}: {banks}")
print(f"{fn} -> {cc} not in {creditcards}, using {bank} parser")
cc = None
return bank, cc

View File

@ -1,2 +1,5 @@
matplotlib==3.3.4
PyYAML==5.4.1
matplotlib==3.6.1
nordigen==1.3.0
python-dateutil==2.8.2
python-dotenv==0.21.0
PyYAML==6.0