budget/pfbudget/tools.py
Luís Murta 88c1d9d5ca
Refactor argparser and PFState classes
Moves argparser functions to runnable.py and PFState to state.py

Main function will now only call run from runnable.py. Should make it
easier to run additional functions. Program can now be run from main.py,
or imported from pfbudget.
2021-03-07 03:14:25 +00:00

139 lines
4.2 KiB
Python

from pathlib import Path
import datetime as dt
import shutil
from .categories import Categories, Null, Travel, get_categories
from .parsers import parse_data
from .state import PFState
from .transactions import (
Transaction,
load_transactions,
read_transactions,
write_transactions,
)
# Relative working directory under which the state file and backups are kept.
DIR = ".pfbudget/"
# Location of the state file, directly under DIR.
STATE_FILE = f"{DIR}state"
# Directory under DIR that backup() and full_backup() write into.
BACKUP_DIR = f"{DIR}backup/"
def get_filename(t: Transaction):
    """Return the CSV file name a transaction belongs to.

    The name has the form ``<year>_<bank>.csv`` and is built from the
    transaction's ``year`` and ``bank`` attributes.
    """
    return f"{t.year}_{t.bank}.csv"
def backup(state: PFState):
    """Write the transactions of the data directory to a timestamped backup.

    Transactions are loaded from ``state.data_dir`` and written to
    ``transactions_<timestamp>.csv`` inside ``BACKUP_DIR``. The path of the
    written file is recorded, as a string, in ``state.last_backup``.
    """
    loaded = load_transactions(state.data_dir)
    # The timestamp is taken after loading, so it reflects the write time.
    stamp = dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
    target = f"{BACKUP_DIR}transactions_{stamp}.csv"
    write_transactions(Path(target), loaded)
    state.last_backup = target
def full_backup(state: PFState):
    """Copy the whole data directory into a new timestamped backup folder.

    The folder is created inside ``BACKUP_DIR`` and its path is recorded, as
    a string, in ``state.last_datadir_backup``.
    """
    stamp = dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
    destination = f"{BACKUP_DIR}{stamp}"
    shutil.copytree(state.data_dir, Path(destination))
    state.last_datadir_backup = destination
def restore(state: PFState):
    """Restore the data directory from the last data-directory backup.

    If ``state.last_datadir_backup`` is empty, a message is printed and
    nothing else happens. If the data directory already exists, the user is
    asked to confirm before it is deleted and replaced by the backup; any
    answer other than y/yes leaves it untouched. If the data directory does
    not exist, the backup is copied into place without asking.
    """
    if not state.last_datadir_backup:
        print("No data directory backup exists")
        return

    if Path(state.data_dir).is_dir():
        answer = input(
            f"A data directory already exists at {state.data_dir}/ . "
            "Are you sure you want to restore the last backup? (Y/N) "
        ).lower()
        if answer in ("n", "no"):
            return
        if answer not in ("y", "yes"):
            print("Invalid choice")
            return
        # Confirmed: remove the current directory first, because copytree
        # refuses to copy onto a destination that already exists.
        shutil.rmtree(state.data_dir)

    # Reached both after a confirmed overwrite and when the data directory is
    # missing altogether; the latter case used to restore nothing.
    shutil.copytree(state.last_datadir_backup, state.data_dir)
def parser(state: PFState, raw_dir=None, data_dir=None):
    """Parse raw files not seen before and merge them into the data files.

    Args:
        state: program state. Its ``raw_files`` and ``data_files`` lists are
            appended to in place and the state is saved at the end.
        raw_dir: directory holding the raw files; defaults to
            ``state.raw_dir`` when falsy.
        data_dir: directory holding the data files named by
            ``get_filename``; defaults to ``state.data_dir`` when falsy.
    """
    raw = Path(raw_dir) if raw_dir else Path(state.raw_dir)
    dat = Path(data_dir) if data_dir else Path(state.data_dir)

    # Parse every raw file whose name has not been recorded yet.
    new_transactions = {}
    for rf in raw.iterdir():
        if rf.name not in state.raw_files:
            new_transactions[rf.name] = parse_data(rf)
            state.raw_files.append(rf.name)

    # Group the new transactions by destination file so that each data file
    # is read, sorted and written once instead of once per transaction.
    by_file = {}
    for transactions in new_transactions.values():
        for transaction in transactions:
            by_file.setdefault(get_filename(transaction), []).append(transaction)

    for filename, additions in by_file.items():
        merged = read_transactions(dat / filename)
        merged.extend(additions)
        # list.sort is stable, so one sort after appending everything gives
        # the same order as sorting after every single append.
        merged.sort()
        write_transactions(dat / filename, merged)
        if filename not in state.data_files:
            state.data_files.append(filename)

    state._save()  # append to list doesn't trigger setter
def auto_categorization(state: PFState, transactions: list) -> bool:
    """Assign categories automatically and report whether any are missing.

    Transactions without a category are offered to the ``search`` method of
    every category; when several match, the last match wins. Transactions
    returned by the Travel search for one of ``state.vacations`` are set to
    the travel category unless their category is in ``not_in_travel``, and
    transactions returned by the Null search are set to the null category.

    Returns:
        True if at least one transaction still has no category, else False.
    """
    null_category = Null()
    null_matches = null_category.search_all(transactions)
    travel_category = Travel()

    # Collect the travel matches of every vacation; each vacation supplies
    # its first two items as extra arguments to the search.
    travel_matches = []
    for vacation in state.vacations:
        travel_matches += travel_category.search_all(
            transactions, vacation[0], vacation[1]
        )

    missing = False
    for entry in transactions:
        if not entry.category:
            # No break on purpose: a later matching category overrides an
            # earlier one.
            for candidate in [cls() for cls in Categories.get_categories()]:
                if candidate.search(entry):
                    entry.category = candidate.name

        # NOTE(review): the travel and null overrides below are applied to
        # every transaction, including already categorized ones - confirm
        # this nesting is the intended one.
        is_travel = (
            entry in travel_matches
            and entry.category not in travel_category.not_in_travel
        )
        if is_travel and entry.category != travel_category.name:
            entry.category = travel_category.name

        if entry in null_matches and entry.category != null_category.name:
            entry.category = null_category.name

        if not entry.category:
            missing = True

    return missing
def manual_categorization(state: PFState, transactions: list):
    """Ask the user for the category of every uncategorized transaction.

    The prompt for a transaction repeats until a name returned by
    ``get_categories`` is entered. Entering ``quit`` returns immediately and
    leaves the remaining transactions untouched. ``state`` is not used.
    """
    print(
        "Please categorize the following transactions. If you want to exit, write 'quit'"
    )
    for entry in transactions:
        while not entry.category:
            answer = input(f"{entry.desc()} category: ")
            if answer == "quit":
                return
            if answer in get_categories():
                entry.category = answer
            else:
                # Unknown name: list the valid ones and ask again.
                print(
                    f"Category {answer} doesn't exist. Please use one of {get_categories()}"
                )