[Interactive] Defines an Interactive class

Affords a cleaner coding over the function.
Renames the runnable.py into what it actually is, the argparser.py.
This commit is contained in:
Luís Murta 2023-01-30 22:24:23 +00:00
parent 7453ffbd3a
commit 23eb2c80bd
Signed by: satprog
GPG Key ID: 169EF1BBD7049F94
4 changed files with 130 additions and 105 deletions

View File

@ -1,114 +1,14 @@
from decimal import Decimal
from typing import Sequence
from pfbudget.cli.runnable import argparser
from pfbudget.cli.argparser import argparser
from pfbudget.cli.interactive import Interactive
from pfbudget.common.types import Operation
from pfbudget.core.manager import Manager
import pfbudget.db.model as type
from pfbudget.utils.utils import parse_args_period
def interactive(manager: Manager):
with manager.db.session() as session:
categories = session.get(type.Category)
print(f"Available categories: {[c.name for c in categories]}")
tags = session.get(type.Tag)
print(f"Available tags: {[t.name for t in tags]}")
transactions = session.uncategorized()
print(f"{len(transactions)} transactions left to categorize")
for transaction in sorted(transactions):
print(f"{transaction}")
quit = False
next = True
while next:
match (input("(<category>(:tag)/split/note/skip/quit): ")):
case "skip":
next = False
continue
case "quit" | "exit":
next = False
quit = True
case "split":
manager.action(Operation.Split, split(transaction, categories, tags))
next = False
case "note":
note = input("note: ")
transaction.note = type.Note(note)
case other:
if len(li := other.split(":")) > 1:
_category = li[0]
_tags = li[1:]
else:
_category = other
_tags = []
if _category not in [c.name for c in categories]:
print(f"{other} doesn't have a valid category")
continue
transaction.category = type.TransactionCategory(
_category,
type.CategorySelector(type.Selector_T.manual),
)
for tag in _tags:
if tag not in [t.name for t in tags]:
session.add([type.Tag(tag)])
tags = session.get(type.Tag)
transaction.tags.add(type.TransactionTag(tag))
next = False
session.commit()
if quit:
break
def split(
original: type.Transaction,
categories: Sequence[type.Category],
tags: Sequence[type.Tag],
) -> list[type.Transaction]:
total = original.amount
splitted: list[type.Transaction] = []
while True:
if abs(sum(t.amount for t in splitted)) > abs(total):
print(
"The total amount from the splitted transactions exceeds the original transaction amount, please try again..."
)
splitted.clear()
if sum(t.amount for t in splitted) == total:
break
while (category := input("New transaction category: ")) not in [
c.name for c in categories
]:
print(f"{category} is not a valid category")
amount = input("amount: ")
split = type.Transaction(original.date, original.description, Decimal(amount))
split.category = type.TransactionCategory(
category, type.CategorySelector(type.Selector_T.manual)
)
splitted.append(split)
splitted.insert(0, original)
return splitted
if __name__ == "__main__":
argparser = argparser()
args = vars(argparser.parse_args())
@ -125,7 +25,7 @@ if __name__ == "__main__":
params = []
match (op):
case Operation.ManualCategorization:
interactive(Manager(db, verbosity))
Interactive(Manager(db, verbosity)).start()
exit()
case Operation.Parse:

120
pfbudget/cli/interactive.py Normal file
View File

@ -0,0 +1,120 @@
import decimal
from ..core.manager import Manager
from ..db.model import (
Category,
CategorySelector,
Note,
Selector_T,
SplitTransaction,
Tag,
Transaction,
TransactionCategory,
TransactionTag,
)
class Interactive:
help = "category(:tag)/split/note:/skip/quit"
selector = Selector_T.manual
def __init__(self, manager: Manager) -> None:
self.manager = manager
with self.manager.db.session() as session:
self.categories = session.get(Category)
self.tags = session.get(Tag)
session.expunge_all()
def intro(self) -> None:
print(
f"Welcome! Available categories are {[c.name for c in self.categories]} and"
f" currently existing tags are {[t.name for t in self.tags]}"
)
def start(self) -> None:
self.intro()
with self.manager.db.session() as session:
uncategorized = session.uncategorized()
n = len(uncategorized)
print(f"{n} left to categorize")
i = 0
new = []
next = uncategorized[i]
print(next)
while (command := input("$ ")) != "quit":
match command:
case "help":
print(self.help)
case "skip":
i += 1
case "quit":
break
case "split":
new = self.split(next)
session.add(new)
case other:
if not other:
print(self.help)
continue
if other.startswith("note:"):
# TODO adding notes to a splitted transaction won't allow categorization
next.note = Note(other[len("note:") :].strip())
else:
ct = other.split(":")
if (category := ct[0]) not in [
c.name for c in self.categories
]:
print(self.help, self.categories)
tags = []
if len(ct) > 1:
tags = ct[1:]
next.category = TransactionCategory(
category, CategorySelector(self.selector)
)
for tag in tags:
if tag not in [t.name for t in self.tags]:
session.add([Tag(tag)])
self.tags = session.get(Tag)
next.tags.add(TransactionTag(tag))
i += 1
session.commit()
next = uncategorized[i] if len(new) == 0 else new.pop()
print(next)
def split(self, original: Transaction) -> list[SplitTransaction]:
total = original.amount
new = []
done = False
while not done:
if abs(sum(t.amount for t in new)) > abs(total):
print("Overflow, try again")
new.clear()
continue
if sum(t.amount for t in new) == total:
done = True
break
amount = decimal.Decimal(input("amount: "))
new.append(
SplitTransaction(
original.date, original.description, amount, original.id
)
)
return new

View File

@ -96,7 +96,9 @@ class Transaction(Base, Export):
type: Mapped[str] = mapped_column(init=False)
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(
cascade="all, delete-orphan", init=False, passive_deletes=True
)
tags: Mapped[set[TransactionTag]] = relationship(init=False)
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
@ -175,7 +177,10 @@ class Category(Base, Export):
)
def __repr__(self) -> str:
return f"Category(name={self.name}, group={self.group}, #rules={len(self.rules)}, schedule={self.schedule})"
return (
f"Category(name={self.name}, group={self.group}, #rules={len(self.rules)},"
f" schedule={self.schedule})"
)
@property
def format(self) -> dict[str, Any]: