Implemented open connection at each transaction

Also adds vacation and backups table.
This commit is contained in:
Luís Murta 2021-05-27 22:37:50 +01:00
parent 0898143f68
commit 1c2c64f9fe
Signed by: satprog
GPG Key ID: DDF2EFC6179009DC

View File

@ -22,6 +22,20 @@ CREATE TABLE IF NOT EXISTS transactions (
); );
""" """
CREATE_VACATIONS_TABLE = """
CREATE TABLE IF NOT EXISTS vacations (
start TEXT NOT NULL,
end TEXT NOT NULL
)
"""
CREATE_BACKUPS_TABLE = """
CREATE TABLE IF NOT EXISTS backups (
datetime TEXT NOT NULL,
file TEXT NOT NULL
)
"""
CREATE_BANKS_TABLE = """ CREATE_BANKS_TABLE = """
CREATE TABLE banks ( CREATE TABLE banks (
name TEXT NOT NULL PRIMARY KEY, name TEXT NOT NULL PRIMARY KEY,
@ -70,56 +84,56 @@ FROM transactions
""" """
class Connection: class DBManager:
"""SQLite DB connection manager""" """SQLite DB connection manager"""
def __init__(self, db_name): def __init__(self, db):
self.db_name = db_name self.db = db
self.connection = None
self.__open() self.__create_tables(
self.__create_tables((("transactions", CREATE_TRANSACTIONS_TABLE),)) (
("transactions", CREATE_TRANSACTIONS_TABLE),
def __open(self): ("vacations", CREATE_VACATIONS_TABLE),
try: ("backups", CREATE_BACKUPS_TABLE),
self.connection = sqlite3.connect(self.db_name) )
logger.debug(f"SQLite database {self.db_name} connection successful") )
except sqlite3.Error:
logger.exception(f"Error while connection to {self.db_name}")
def close(self):
try:
self.connection.close()
logger.debug(f"SQLite database {self.db_name} closed")
except sqlite3.Error:
logger.exception(f"Error while closing {self.db_name} database")
def __execute(self, query, params=None): def __execute(self, query, params=None):
ret = None
try: try:
with self.connection: con = sqlite3.connect(self.db)
with con:
if params: if params:
cur = self.connection.execute(query, params) ret = con.execute(query, params).fetchall()
logger.debug(f"[{self.db_name}] < {query}{params}") logger.debug(f"[{self.db}] < {query}{params}")
else: else:
cur = self.connection.execute(query) ret = con.execute(query).fetchall()
logger.debug(f"[{self.db_name}] < {query}") logger.debug(f"[{self.db}] < {query}")
if ret := cur.fetchall(): if ret:
logger.debug(f"[{self.db_name}] > {ret}") logger.debug(f"[{self.db}] > {ret}")
return ret
except sqlite3.Error: except sqlite3.Error:
logger.exception(f"Error while executing [{self.db_name}] < {query}") logger.exception(f"Error while executing [{self.db}] < {query}")
finally:
con.close()
return ret
def __executemany(self, query, list_of_params): def __executemany(self, query, list_of_params):
ret = None
try: try:
with self.connection: con = sqlite3.connect(self.db)
cur = self.connection.executemany(query, list_of_params) with con:
logger.debug(f"[{self.db_name}] < {query}{list_of_params}") ret = con.executemany(query, list_of_params).fetchall()
return cur.fetchall() logger.debug(f"[{self.db}] < {query}{list_of_params}")
except sqlite3.Error: except sqlite3.Error:
logger.exception( logger.exception(
f"Error while executing [{self.db_name}] < {query} {list_of_params}" f"Error while executing [{self.db}] < {query} {list_of_params}"
) )
finally:
con.close()
return ret
def __create_tables(self, tables): def __create_tables(self, tables):
for table_name, query in tables: for table_name, query in tables:
@ -127,15 +141,15 @@ class Connection:
self.__execute(query) self.__execute(query)
def select_all(self): def select_all(self):
logger.info(f"Reading all transactions from {self.db_name}") logger.info(f"Reading all transactions from {self.db}")
return self.__execute("SELECT * FROM transactions") return self.__execute("SELECT * FROM transactions")
def add_transaction(self, transaction): def add_transaction(self, transaction):
logger.info(f"Adding {transaction} into {self.db_name}") logger.info(f"Adding {transaction} into {self.db}")
self.__execute(ADD_TRANSACTION, transaction) self.__execute(ADD_TRANSACTION, transaction)
def add_transactions(self, transactions): def add_transactions(self, transactions):
logger.info(f"Adding {len(transactions)} into {self.db_name}") logger.info(f"Adding {len(transactions)} into {self.db}")
self.__executemany(ADD_TRANSACTION, transactions) self.__executemany(ADD_TRANSACTION, transactions)
def update_category(self, transaction): def update_category(self, transaction):