Move from a direct access to DB by the parsers/categorizers to a middle layer, which will bring an easier way to have two input alternatives. This patch starts by instantiating the manager on the cli runnable and using it for the parser function. This patch also moves the queries to a different file, so that introducing new functions on the DB client becomes more manageable and clearer. Finally, the new manager will need converters to move from the code type Transaction to the DB types. This will eventually simplify the code data model by removing more of its method and leaving it a simple dataclass. Issue #14
187 lines
6.8 KiB
Python
187 lines
6.8 KiB
Python
from __future__ import annotations
|
|
from decimal import Decimal
|
|
import csv
|
|
import datetime
|
|
import logging
|
|
import logging.config
|
|
import pathlib
|
|
import sqlite3
|
|
|
|
from pfbudget.core.transactions import Transaction
|
|
import pfbudget.db.schema as Q
|
|
|
|
|
|
if not pathlib.Path("logs").is_dir():
|
|
pathlib.Path("logs").mkdir()
|
|
logging.config.fileConfig("logging.conf")
|
|
logger = logging.getLogger("pfbudget.transactions")
|
|
|
|
sqlite3.register_adapter(Decimal, lambda d: float(d))
|
|
|
|
__DB_NAME = "data.db"
|
|
|
|
|
|
class DatabaseClient:
|
|
"""SQLite DB connection manager"""
|
|
|
|
__EXPORT_DIR = "export"
|
|
|
|
def __init__(self, db: str):
|
|
self.db = db
|
|
|
|
def __execute(self, query: str, params: tuple = None) -> list | None:
|
|
ret = None
|
|
try:
|
|
con = sqlite3.connect(self.db)
|
|
with con:
|
|
if params:
|
|
ret = con.execute(query, params).fetchall()
|
|
logger.debug(f"[{self.db}] < {query}{params}")
|
|
else:
|
|
ret = con.execute(query).fetchall()
|
|
logger.debug(f"[{self.db}] < {query}")
|
|
|
|
if ret:
|
|
logger.debug(f"[{self.db}] > {ret}")
|
|
except sqlite3.Error:
|
|
logger.exception(f"Error while executing [{self.db}] < {query}")
|
|
finally:
|
|
con.close()
|
|
|
|
return ret
|
|
|
|
def __executemany(self, query: str, list_of_params: list[tuple]) -> list | None:
|
|
ret = None
|
|
try:
|
|
con = sqlite3.connect(self.db)
|
|
with con:
|
|
ret = con.executemany(query, list_of_params).fetchall()
|
|
logger.debug(f"[{self.db}] < {query}{list_of_params}")
|
|
except sqlite3.Error:
|
|
logger.exception(
|
|
f"Error while executing [{self.db}] < {query} {list_of_params}"
|
|
)
|
|
finally:
|
|
con.close()
|
|
|
|
return ret
|
|
|
|
def __create_tables(self, tables: tuple[tuple]):
|
|
for table_name, query in tables:
|
|
logger.info(f"Creating table {table_name} if it doesn't exist already")
|
|
self.__execute(query)
|
|
|
|
def init(self):
|
|
logging.info(f"Initializing {self.db} database")
|
|
self.__create_tables(
|
|
(
|
|
("transactions", Q.CREATE_TRANSACTIONS_TABLE),
|
|
("backups", Q.CREATE_BACKUPS_TABLE),
|
|
)
|
|
)
|
|
|
|
def select_all(self) -> list[Transaction] | None:
|
|
logger.info(f"Reading all transactions from {self.db}")
|
|
transactions = self.__execute("SELECT * FROM transactions")
|
|
if transactions:
|
|
return [Transaction(t) for t in transactions]
|
|
return None
|
|
|
|
def insert_transaction(self, transaction: Transaction):
|
|
logger.info(f"Adding {transaction} into {self.db}")
|
|
self.__execute(Q.ADD_TRANSACTION, (transaction.to_list(),))
|
|
|
|
def insert_transactions(self, transactions: list[list]):
|
|
logger.info(f"Adding {len(transactions)} into {self.db}")
|
|
self.__executemany(Q.ADD_TRANSACTION, transactions)
|
|
|
|
def update_category(self, transaction: Transaction):
|
|
logger.info(f"Update {transaction} category")
|
|
self.__execute(Q.UPDATE_CATEGORY, transaction.update_category())
|
|
|
|
def update_categories(self, transactions: list[Transaction]):
|
|
logger.info(f"Update {len(transactions)} transactions' categories")
|
|
self.__executemany(
|
|
Q.UPDATE_CATEGORY,
|
|
[transaction.update_category() for transaction in transactions],
|
|
)
|
|
|
|
def get_duplicated_transactions(self) -> list[Transaction] | None:
|
|
logger.info("Get duplicated transactions")
|
|
transactions = self.__execute(Q.DUPLICATED_TRANSACTIONS)
|
|
if transactions:
|
|
return [Transaction(t) for t in transactions]
|
|
return None
|
|
|
|
def get_sorted_transactions(self) -> list[Transaction] | None:
|
|
logger.info("Get transactions sorted by date")
|
|
transactions = self.__execute(Q.SORTED_TRANSACTIONS)
|
|
if transactions:
|
|
return [Transaction(t) for t in transactions]
|
|
return None
|
|
|
|
def get_daterange(self, start: datetime, end: datetime) -> list[Transaction] | None:
|
|
logger.info(f"Get transactions from {start} to {end}")
|
|
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BETWEEN_DATES, (start, end))
|
|
if transactions:
|
|
return [Transaction(t) for t in transactions]
|
|
return None
|
|
|
|
def get_category(self, value: str) -> list[Transaction] | None:
|
|
logger.info(f"Get transactions where category = {value}")
|
|
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BY_CATEGORY, (value,))
|
|
if transactions:
|
|
return [Transaction(t) for t in transactions]
|
|
return None
|
|
|
|
def get_daterange_category(
|
|
self, start: datetime, end: datetime, category: str
|
|
) -> list[Transaction] | None:
|
|
logger.info(
|
|
f"Get transactions from {start} to {end} where category = {category}"
|
|
)
|
|
transactions = self.__execute(
|
|
Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY, (start, end, category)
|
|
)
|
|
if transactions:
|
|
return [Transaction(t) for t in transactions]
|
|
return None
|
|
|
|
def get_by_period(self, period: str) -> list[Transaction] | None:
|
|
logger.info(f"Get transactions by {period}")
|
|
transactions = self.__execute(Q.SELECT_TRANSACTION_BY_PERIOD, period)
|
|
if transactions:
|
|
return [Transaction(t) for t in transactions]
|
|
return None
|
|
|
|
def get_uncategorized_transactions(self) -> list[Transaction] | None:
|
|
logger.debug("Get uncategorized transactions")
|
|
return self.get_category(None)
|
|
|
|
def get_daterange_uncategorized_transactions(self, start: datetime, end: datetime):
|
|
logger.debug("Get uncategorized transactions from {start} to {end}")
|
|
return self.get_daterange_category(start, end, None)
|
|
|
|
def get_daterage_without(
|
|
self, start: datetime, end: datetime, *categories: str
|
|
) -> list[Transaction] | None:
|
|
logger.info(f"Get transactions between {start} and {end} not in {categories}")
|
|
query = Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
|
|
"(" + ", ".join("?" for _ in categories) + ")"
|
|
)
|
|
transactions = self.__execute(query, (start, end, *categories))
|
|
if transactions:
|
|
return [Transaction(t) for t in transactions]
|
|
return None
|
|
|
|
def export(self):
|
|
filename = pathlib.Path(
|
|
"@".join([self.db, datetime.datetime.now().isoformat()])
|
|
).with_suffix(".csv")
|
|
transactions = self.select_all()
|
|
logger.info(f"Exporting {self.db} into {filename}")
|
|
if not (dir := pathlib.Path(self.__EXPORT_DIR)).is_dir():
|
|
dir.mkdir()
|
|
with open(dir / filename, "w", newline="") as f:
|
|
csv.writer(f, delimiter="\t").writerows(transactions)
|