budget/pfbudget/tools.py
Luís Murta fe840b622c
Adds graph subcommand
Moves all graph and matplotlib dependencies to graph.py.
Moves Transactions and Categories methods outside the class for better
visibility.
Variable expenses renamed to required.
transactions.py now has by_year, by_month, by_categories in global scope
and slightly improved performance.
2021-01-29 22:48:55 +00:00

263 lines
7.2 KiB
Python

from pathlib import Path
import csv
import datetime as dt
import pickle
import shutil
from .categories import Categories, Null, Travel, get_categories
from .transactions import (
Transaction,
load_transactions,
read_transactions,
write_transactions,
)
from .parsers import parse_data
def get_filename(t: Transaction):
    """Return the name of the CSV file that stores ``t``: ``<year>_<bank>.csv``."""
    return f"{t.year}_{t.bank}.csv"
def _validated_property(name, expected_type, error_message):
    """Build a property backed by the instance attribute ``_<name>``.

    Reading returns the backing attribute (``AttributeError`` if it was never
    assigned). Assigning raises ``TypeError(error_message)`` unless the value
    is an instance of ``expected_type``; otherwise the value is stored and the
    owning object is persisted through its ``_save`` method.
    """
    backing = "_" + name

    def getter(self):
        return getattr(self, backing)

    def setter(self, value):
        if not isinstance(value, expected_type):
            raise TypeError(error_message)
        setattr(self, backing, value)
        self._save()

    return property(getter, setter)


class PFState:
    """Program state that pickles itself to ``filename`` on every assignment.

    Every public attribute is a type-checked property; assigning to one
    stores the value and immediately rewrites the pickle file. Mutating a
    list attribute in place (e.g. ``state.raw_files.append(x)``) does NOT go
    through a setter, so callers must call ``_save()`` themselves afterwards.

    Properties (type enforced on assignment):
        filename, raw_dir, data_dir, last_backup, last_datadir_backup: str
        raw_files, data_files, vacations: list
    """

    def __init__(self, filename, *args, **kwargs):
        """Create a brand-new state file.

        Args:
            filename: Path of the pickle file to create.
            *args: Mappings whose items are assigned as attributes.
            **kwargs: Further attributes to assign.

        Raises:
            FileExistsError: If ``filename`` already exists as a file.
            TypeError: If a value fails its property's type check.
        """
        if Path(filename).is_file():
            raise FileExistsError("PFState already exists")
        self.filename = filename
        for mapping in args:
            for key in mapping:
                setattr(self, key, mapping[key])
        for key, value in kwargs.items():
            setattr(self, key, value)
        self._save()

    filename = _validated_property("filename", str, "Expected string")
    raw_dir = _validated_property("raw_dir", str, "Expected string")
    data_dir = _validated_property("data_dir", str, "Expected string")
    raw_files = _validated_property("raw_files", list, "Expected list")
    data_files = _validated_property("data_files", list, "Expected list")
    vacations = _validated_property("vacations", list, "Expected list")
    last_backup = _validated_property("last_backup", str, "Expected string")
    last_datadir_backup = _validated_property(
        "last_datadir_backup", str, "Expected string"
    )

    def _save(self):
        """Pickle this object to ``self.filename``, overwriting the file."""
        # The context manager closes the handle deterministically; a bare
        # open() here would leak one file object per save.
        with open(self.filename, "wb") as state_file:
            pickle.dump(self, state_file)

    def __repr__(self):
        """Return ``"attr: value"`` pairs for every instance attribute."""
        return ", ".join(
            f"{attr!s}: {value!s}" for attr, value in self.__dict__.items()
        )
def pfstate(filename, *args, **kwargs):
    """Load the persisted state at ``filename``, or create it.

    * If ``filename`` exists, it is unpickled and returned.
    * Otherwise, if any extra arguments are given, a new ``PFState`` is
      created from them (which also writes the file) and returned.
    * Otherwise ``None`` is returned.

    The result is also stored on the function itself as ``pfstate.state``.

    Args:
        filename: Path of the state file; must be a ``str``.
        *args: Forwarded to ``PFState`` when a new state is created.
        **kwargs: Forwarded to ``PFState`` when a new state is created.

    Returns:
        The loaded or newly created ``PFState``, or ``None``.

    Raises:
        AssertionError: If ``filename`` is not a string.
        TypeError: If the file unpickles to something other than a
            ``PFState``; ``pfstate.state`` is left untouched in that case.
    """
    # Kept as an assert so the raised exception type stays the same for
    # callers; note that it is skipped when Python runs with -O.
    assert isinstance(filename, str), "filename is not string"
    if Path(filename).is_file():
        # NOTE(security): unpickling executes arbitrary code embedded in the
        # file. Only ever point this at state files written by this program.
        with open(filename, "rb") as state_file:
            loaded = pickle.load(state_file)
        if not isinstance(loaded, PFState):
            raise TypeError("Unpickled object not of type PFState")
        pfstate.state = loaded
    elif args or kwargs:
        pfstate.state = PFState(filename, *args, **kwargs)
    else:
        pfstate.state = None
    return pfstate.state
def backup(state: PFState):
    """Write every transaction under ``state.data_dir`` to one timestamped CSV.

    The file is ``.pfbudget/backups/transactions_<timestamp>.csv`` (relative
    to the working directory); its path is recorded in ``state.last_backup``.
    """
    everything = load_transactions(state.data_dir)
    # Timestamp is taken after loading, right before the file is written.
    stamp = dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
    filename = f".pfbudget/backups/transactions_{stamp}.csv"
    write_transactions(Path(filename), everything)
    state.last_backup = filename
def full_backup(state: PFState):
    """Copy the whole data directory to ``.pfbudget/backups/<timestamp>``.

    The destination path (relative to the working directory) is recorded in
    ``state.last_datadir_backup``.
    """
    stamp = dt.datetime.now().strftime("%Y-%m-%d_%Hh%Mm%Ss")
    destination = f".pfbudget/backups/{stamp}"
    shutil.copytree(state.data_dir, Path(destination))
    state.last_datadir_backup = destination
def restore(state: PFState):
    """Replace the data directory with the last full data-directory backup.

    If no backup has been recorded, a message is printed and nothing is
    changed. If the data directory currently exists, the user must confirm
    before it is deleted and replaced; answering no, or anything
    unrecognised, leaves it untouched. If the data directory does not exist,
    the backup is copied into place without asking, since there is nothing
    to overwrite.
    """
    # getattr: the attribute is absent until the first full backup is made,
    # which must be reported as "no backup" rather than an AttributeError.
    backup_dir = getattr(state, "last_datadir_backup", None)
    if not backup_dir:
        print("No data directory backup exists")
        return

    if Path(state.data_dir).is_dir():
        option = input(
            "A data directory already exists at {}/ . Are you sure you want to restore the last backup? (Y/N) ".format(
                state.data_dir
            )
        ).lower()
        if option in ("n", "no"):
            return
        if option not in ("y", "yes"):
            print("Invalid choice")
            return
        # copytree refuses to write into an existing directory.
        shutil.rmtree(state.data_dir)

    shutil.copytree(backup_dir, state.data_dir)
def parser(state: PFState, raw_dir=None, data_dir=None):
    """Parse raw files not seen before and merge them into the data files.

    Every entry of the raw directory whose name is not yet in
    ``state.raw_files`` is parsed and its name recorded. The resulting
    transactions are grouped by destination file (see ``get_filename``);
    each destination is then read, extended, sorted and written exactly
    once, and added to ``state.data_files`` if new.

    Args:
        state: Persisted state; its lists are updated in place and saved.
        raw_dir: Directory of raw files; defaults to ``state.raw_dir``.
        data_dir: Directory of data files; defaults to ``state.data_dir``.
    """
    raw = Path(raw_dir) if raw_dir else Path(state.raw_dir)
    dat = Path(data_dir) if data_dir else Path(state.data_dir)

    new_transactions = {}
    for rf in raw.iterdir():
        if rf.name not in state.raw_files:
            new_transactions[rf.name] = parse_data(rf)
            state.raw_files.append(rf.name)

    # Group by destination so each data file is rewritten once instead of
    # once per transaction. Insertion order is kept, and list.sort() is
    # stable, so the final ordering matches appending one at a time.
    by_file = {}
    for transactions in new_transactions.values():
        for transaction in transactions:
            by_file.setdefault(get_filename(transaction), []).append(transaction)

    for filename, additions in by_file.items():
        merged = read_transactions(dat / filename)
        merged.extend(additions)
        merged.sort()
        write_transactions(dat / filename, merged)
        if filename not in state.data_files:
            state.data_files.append(filename)

    state._save()  # in-place list appends do not go through the setters
def auto_categorization(state: PFState, transactions: list) -> bool:
    """Assign categories to ``transactions`` in place from the category rules.

    Uncategorized transactions are offered to every category returned by
    ``Categories.get_categories()``. Transactions that ``Travel`` matches for
    one of ``state.vacations`` are set to the travel category unless their
    current category is listed in ``travel.not_in_travel``, and transactions
    that ``Null`` matches are set to the null category.

    Args:
        state: Provides ``vacations``, the entries used for the travel search.
        transactions: Transactions to categorize; modified in place.

    Returns:
        True if at least one transaction still has no category afterwards,
        False otherwise.
    """
    null = Null()
    # Result of Null.search_all; any transaction found in it gets null.name below.
    nulls = null.search_all(transactions)
    travel = Travel()
    travels = []
    missing = False
    # Gather everything Travel matches for each vacation entry.
    # presumably vacation[0] / vacation[1] delimit the vacation period -- TODO confirm
    for vacation in state.vacations:
        t = travel.search_all(transactions, vacation[0], vacation[1])
        travels.extend(t)
    # NOTE(review): the nesting below was reconstructed from source that had
    # lost its indentation. Confirm whether the travel/null overrides are
    # meant to apply to already-categorized transactions (as written here)
    # or only to those that were uncategorized on entry.
    for transaction in transactions:
        if not transaction.category:
            # No break: every category is tried, so the last one whose
            # search() matches is the one that sticks.
            for category in [category() for category in Categories.get_categories()]:
                if category.search(transaction):
                    transaction.category = category.name
        # Travel override, skipped for categories listed in not_in_travel.
        if (
            transaction in travels
            and transaction.category not in travel.not_in_travel
        ):
            if transaction.category != travel.name:
                transaction.category = travel.name
        # Null override runs last, so it wins over the travel override.
        if transaction in nulls:
            if transaction.category != null.name:
                transaction.category = null.name
        if not transaction.category:
            missing = True
    return missing
def manual_categorization(state: PFState, transactions: list):
    """Prompt the user for a category for each uncategorized transaction.

    Each transaction without a category is prompted for repeatedly until the
    answer is one of ``get_categories()``. Entering ``quit`` stops the whole
    process immediately; transactions categorized so far keep their category.
    ``state`` is not used.
    """
    for entry in transactions:
        while not entry.category:
            answer = input(f"{entry.desc()} category: ")
            if answer == "quit":
                return
            if answer in get_categories():
                entry.category = answer
            else:
                print("category doesn't exist")