Removes obsolete code
Removes `PFState` class and surrounding code. Removes load/save transactions methods, that were not used since database introduction. Removes status command from runnable.py.
This commit is contained in:
parent
37c97453a9
commit
c0cc8d5563
@ -95,10 +95,6 @@ def argparser() -> argparse.ArgumentParser:
|
|||||||
p_report = subparsers.add_parser("report", parents=[help, period])
|
p_report = subparsers.add_parser("report", parents=[help, period])
|
||||||
p_report.set_defaults(func=report)
|
p_report.set_defaults(func=report)
|
||||||
|
|
||||||
p_status = subparsers.add_parser("status", help="status help")
|
|
||||||
|
|
||||||
p_status.set_defaults(func=status)
|
|
||||||
|
|
||||||
return parser
|
return parser
|
||||||
|
|
||||||
|
|
||||||
@ -119,18 +115,6 @@ def parse(args):
|
|||||||
raise FileNotFoundError
|
raise FileNotFoundError
|
||||||
|
|
||||||
|
|
||||||
def status(state, args):
|
|
||||||
"""Status
|
|
||||||
|
|
||||||
Prints the state file.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
state (PFState): Internal state of the program
|
|
||||||
args (dict): argparse variables
|
|
||||||
"""
|
|
||||||
print(state)
|
|
||||||
|
|
||||||
|
|
||||||
def graph(args):
|
def graph(args):
|
||||||
"""Plots the transactions over a period of time.
|
"""Plots the transactions over a period of time.
|
||||||
|
|
||||||
|
|||||||
@ -1,142 +0,0 @@
|
|||||||
from pathlib import Path
|
|
||||||
import pickle
|
|
||||||
|
|
||||||
|
|
||||||
class PFState:
|
|
||||||
def __init__(self, filename: str, *args, **kwargs):
|
|
||||||
if Path(filename).is_file():
|
|
||||||
raise FileExistsError("PFState already exists")
|
|
||||||
|
|
||||||
if not Path(filename).parent.is_dir():
|
|
||||||
Path(filename).parent.mkdir(parents=True)
|
|
||||||
(Path(filename).parent / "backup/").mkdir(parents=True)
|
|
||||||
|
|
||||||
self.filename = filename
|
|
||||||
for d in args:
|
|
||||||
for k in d:
|
|
||||||
setattr(self, k, d[k])
|
|
||||||
for k in kwargs:
|
|
||||||
setattr(self, k, kwargs[k])
|
|
||||||
|
|
||||||
if not Path(self.raw_dir).is_dir():
|
|
||||||
Path(self.raw_dir).mkdir(parents=True)
|
|
||||||
|
|
||||||
if not Path(self.data_dir).is_dir():
|
|
||||||
Path(self.data_dir).mkdir(parents=True)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def filename(self):
|
|
||||||
return self._filename
|
|
||||||
|
|
||||||
@filename.setter
|
|
||||||
def filename(self, v):
|
|
||||||
if not isinstance(v, str):
|
|
||||||
raise TypeError("Expected string")
|
|
||||||
self._filename = v
|
|
||||||
self._save()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def raw_dir(self):
|
|
||||||
return self._raw_dir
|
|
||||||
|
|
||||||
@raw_dir.setter
|
|
||||||
def raw_dir(self, v):
|
|
||||||
if not isinstance(v, str):
|
|
||||||
raise TypeError("Expected string")
|
|
||||||
self._raw_dir = v
|
|
||||||
self._save()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def data_dir(self):
|
|
||||||
return self._data_dir
|
|
||||||
|
|
||||||
@data_dir.setter
|
|
||||||
def data_dir(self, v):
|
|
||||||
if not isinstance(v, str):
|
|
||||||
raise TypeError("Expected string")
|
|
||||||
self._data_dir = v
|
|
||||||
self._save()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def raw_files(self):
|
|
||||||
return self._raw_files
|
|
||||||
|
|
||||||
@raw_files.setter
|
|
||||||
def raw_files(self, v):
|
|
||||||
if not isinstance(v, list):
|
|
||||||
raise TypeError("Expected list")
|
|
||||||
self._raw_files = v
|
|
||||||
self._save()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def data_files(self):
|
|
||||||
return self._data_files
|
|
||||||
|
|
||||||
@data_files.setter
|
|
||||||
def data_files(self, v):
|
|
||||||
if not isinstance(v, list):
|
|
||||||
raise TypeError("Expected list")
|
|
||||||
self._data_files = v
|
|
||||||
self._save()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def vacations(self):
|
|
||||||
return self._vacations
|
|
||||||
|
|
||||||
@vacations.setter
|
|
||||||
def vacations(self, v):
|
|
||||||
if not isinstance(v, list):
|
|
||||||
raise TypeError("Expected list")
|
|
||||||
self._vacations = v
|
|
||||||
self._save()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def last_backup(self):
|
|
||||||
return self._last_backup
|
|
||||||
|
|
||||||
@last_backup.setter
|
|
||||||
def last_backup(self, v):
|
|
||||||
if not isinstance(v, str):
|
|
||||||
raise TypeError("Expected string")
|
|
||||||
self._last_backup = v
|
|
||||||
self._save()
|
|
||||||
|
|
||||||
@property
|
|
||||||
def last_datadir_backup(self):
|
|
||||||
return self._last_datadir_backup
|
|
||||||
|
|
||||||
@last_datadir_backup.setter
|
|
||||||
def last_datadir_backup(self, v):
|
|
||||||
if not isinstance(v, str):
|
|
||||||
raise TypeError("Expected string")
|
|
||||||
self._last_datadir_backup = v
|
|
||||||
self._save()
|
|
||||||
|
|
||||||
def _save(self):
|
|
||||||
pickle.dump(self, open(self.filename, "wb"))
|
|
||||||
|
|
||||||
def __repr__(self):
|
|
||||||
r = []
|
|
||||||
for attr, value in self.__dict__.items():
|
|
||||||
r.append(": ".join([str(attr), str(value)]))
|
|
||||||
return ", ".join(r)
|
|
||||||
|
|
||||||
|
|
||||||
def pfstate(filename, *args, **kwargs):
|
|
||||||
"""pfstate function
|
|
||||||
|
|
||||||
If it only receives a filename it return false or true depending if that file exists.
|
|
||||||
If it receives anything else, it will return a PFState.
|
|
||||||
"""
|
|
||||||
assert isinstance(filename, str), "filename is not string"
|
|
||||||
|
|
||||||
if Path(filename).is_file():
|
|
||||||
pfstate.state = pickle.load(open(filename, "rb"))
|
|
||||||
if not isinstance(pfstate.state, PFState):
|
|
||||||
raise TypeError("Unpickled object not of type PFState")
|
|
||||||
elif args or kwargs:
|
|
||||||
pfstate.state = PFState(filename, *args, **kwargs)
|
|
||||||
else:
|
|
||||||
pfstate.state = None
|
|
||||||
|
|
||||||
return pfstate.state
|
|
||||||
@ -1,7 +1,5 @@
|
|||||||
from csv import reader, writer
|
|
||||||
from datetime import date
|
from datetime import date
|
||||||
from decimal import Decimal, InvalidOperation
|
from decimal import Decimal, InvalidOperation
|
||||||
from pathlib import Path
|
|
||||||
|
|
||||||
COMMENT_TOKEN = "#"
|
COMMENT_TOKEN = "#"
|
||||||
|
|
||||||
@ -95,46 +93,3 @@ class Transaction:
|
|||||||
return "{} {} {}€ ({})".format(
|
return "{} {} {}€ ({})".format(
|
||||||
self.date.strftime("%d/%m/%y"), self.category, self.value, self.bank
|
self.date.strftime("%d/%m/%y"), self.category, self.value, self.bank
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def load_transactions(data_dir) -> list:
|
|
||||||
transactions = []
|
|
||||||
for df in Path(data_dir).iterdir():
|
|
||||||
try:
|
|
||||||
trs = read_transactions(df)
|
|
||||||
except TransactionError as e:
|
|
||||||
print(f"{e} -> datafile {df}")
|
|
||||||
raise TransactionError
|
|
||||||
transactions.extend(trs)
|
|
||||||
|
|
||||||
transactions.sort()
|
|
||||||
return transactions
|
|
||||||
|
|
||||||
|
|
||||||
def save_transactions(data_dir, transactions):
|
|
||||||
files2write = set(t.file if t.modified else None for t in transactions)
|
|
||||||
files2write.discard(None)
|
|
||||||
for f in files2write:
|
|
||||||
trs = [t for t in transactions if t.file == f]
|
|
||||||
write_transactions(f, trs)
|
|
||||||
|
|
||||||
|
|
||||||
def read_transactions(filename, encoding="utf-8") -> list:
|
|
||||||
try:
|
|
||||||
with open(filename, newline="", encoding=encoding) as f:
|
|
||||||
r = reader(f, delimiter="\t")
|
|
||||||
transactions = [
|
|
||||||
Transaction(row, file=filename)
|
|
||||||
for row in r
|
|
||||||
if row and row[0][0] != COMMENT_TOKEN
|
|
||||||
]
|
|
||||||
except FileNotFoundError:
|
|
||||||
transactions = []
|
|
||||||
|
|
||||||
return transactions
|
|
||||||
|
|
||||||
|
|
||||||
def write_transactions(file, transactions, append=False, encoding="utf-8"):
|
|
||||||
with open(file, "a" if append else "w", newline="", encoding=encoding) as f:
|
|
||||||
w = writer(f, delimiter="\t")
|
|
||||||
w.writerows([transaction.to_csv() for transaction in transactions])
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user