Compare commits

..

No commits in common. "e670d3ddeee39261cc35d0b6ede46369fd630eb2" and "fd24ac3318b0800e2f4fc565070c4dacf17e8782" have entirely different histories.

5 changed files with 17 additions and 116 deletions

View File

@ -1,6 +1,3 @@
from decimal import Decimal
from typing import Sequence
from pfbudget.cli.runnable import argparser
from pfbudget.common.types import Operation
from pfbudget.core.manager import Manager
@ -10,13 +7,10 @@ from pfbudget.utils.utils import parse_args_period
def interactive(manager: Manager):
with manager.db.session() as session:
categories = session.get(type.Category)
print(f"Available categories: {[c.name for c in categories]}")
tags = session.get(type.Tag)
print(f"Available tags: {[t.name for t in tags]}")
transactions = session.uncategorized()
print(f"Available categories: {categories}")
print(f"Available tags: {session.get(type.Tag)}")
transactions = session.get(type.Transaction, ~type.Transaction.category.has())
print(f"{len(transactions)} transactions left to categorize")
for transaction in sorted(transactions):
@ -24,91 +18,34 @@ def interactive(manager: Manager):
quit = False
next = True
while next:
match (input("(<category>(:tag)/split/note/skip/quit): ")):
case "skip":
next = False
continue
match (input("(<category>/split/tag/note/quit): ")):
case "quit" | "exit":
next = False
quit = True
case "split":
manager.action(Operation.Split, split(transaction, categories, tags))
next = False
case "tag":
tag = input("tag: ")
transaction.tags.add(type.TransactionTag(tag))
case "note":
note = input("note: ")
transaction.note = type.Note(note)
case other:
if len(li := other.split(":")) > 1:
_category = li[0]
_tags = li[1:]
else:
_category = other
_tags = []
if _category not in [c.name for c in categories]:
print(f"{other} doesn't have a valid category")
if other not in [c.name for c in categories]:
print(f"{other} is not a valid category")
continue
transaction.category = type.TransactionCategory(
_category,
other,
type.CategorySelector(type.Selector_T.manual),
)
for tag in _tags:
if tag not in [t.name for t in tags]:
session.add([type.Tag(tag)])
tags = session.get(type.Tag)
transaction.tags.add(type.TransactionTag(tag))
next = False
session.commit()
if quit:
break
def split(
original: type.Transaction,
categories: Sequence[type.Category],
tags: Sequence[type.Tag],
) -> list[type.Transaction]:
total = original.amount
splitted: list[type.Transaction] = []
while True:
if abs(sum(t.amount for t in splitted)) > abs(total):
print(
"The total amount from the splitted transactions exceeds the original transaction amount, please try again..."
)
splitted.clear()
if sum(t.amount for t in splitted) == total:
break
while (category := input("New transaction category: ")) not in [
c.name for c in categories
]:
print(f"{category} is not a valid category")
amount = input("amount: ")
split = type.Transaction(original.date, original.description, Decimal(amount))
split.category = type.TransactionCategory(
category, type.CategorySelector(type.Selector_T.manual)
)
splitted.append(split)
splitted.insert(0, original)
return splitted
if __name__ == "__main__":
argparser = argparser()
args = vars(argparser.parse_args())

View File

@ -35,7 +35,6 @@ class Categorizer:
@Timer(name="nullify")
def _nullify(self, transactions: Sequence[t.BankTransaction]):
print(f"Nullifying {len(transactions)} transactions")
count = 0
matching = []
for transaction in transactions:
@ -73,7 +72,6 @@ class Categorizer:
transactions: Sequence[t.BankTransaction],
categories: Sequence[t.Category],
):
print(f"Categorizing {len(transactions)} transactions")
d = {}
for category in [c for c in categories if c.rules]:
for rule in category.rules:
@ -114,7 +112,6 @@ class Categorizer:
def _rule_based_tags(
self, transactions: Sequence[t.BankTransaction], tags: Sequence[t.Tag]
):
print(f"Tagging {len(transactions)} transactions")
d = {}
for tag in [t for t in tags if len(t.rules) > 0]:
for rule in tag.rules:

View File

@ -116,14 +116,7 @@ class Manager:
banks = NordigenInput().country_banks(params[0])
print(banks)
case (
Operation.BankAdd
| Operation.CategoryAdd
| Operation.NordigenAdd
| Operation.RuleAdd
| Operation.TagAdd
| Operation.TagRuleAdd
):
case Operation.BankAdd | Operation.CategoryAdd | Operation.NordigenAdd | Operation.RuleAdd | Operation.TagAdd | Operation.TagRuleAdd:
with self.db.session() as session:
session.add(params)
@ -197,20 +190,12 @@ class Manager:
assert len(originals) == 1, ">1 transactions matched {original.id}!"
originals[0].split = True
transactions = []
for t in params[1:]:
if originals[0].date != t.date:
t.date = originals[0].date
print(
f"{t.date} is different from original date {originals[0].date}, using original"
transactions = [
SplitTransaction(
originals[0].date, t.description, t.amount, originals[0].id
)
splitted = SplitTransaction(
t.date, t.description, t.amount, originals[0].id
)
splitted.category = t.category
transactions.append(splitted)
for t in params[1:]
]
session.add(transactions)
case Operation.Export:

View File

@ -2,7 +2,6 @@ from dataclasses import asdict
from sqlalchemy import create_engine, delete, select, update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import false
from typing import Sequence, Type, TypeVar
from pfbudget.db.model import (
@ -10,7 +9,6 @@ from pfbudget.db.model import (
CategoryGroup,
CategorySchedule,
Link,
Transaction,
)
@ -59,22 +57,6 @@ class DbClient:
return self.__session.scalars(stmt).all()
def uncategorized(self) -> Sequence[Transaction]:
"""Selects all valid uncategorized transactions
At this moment that includes:
- Categories w/o category
- AND non-split categories
Returns:
Sequence[Transaction]: transactions left uncategorized
"""
stmt = (
select(Transaction)
.where(~Transaction.category.has())
.where(Transaction.split == false())
)
return self.__session.scalars(stmt).all()
def add(self, rows: list):
self.__session.add_all(rows)

View File

@ -91,7 +91,7 @@ class Transaction(Base, Export):
description: Mapped[Optional[str]]
amount: Mapped[money]
split: Mapped[bool] = mapped_column(init=False, default=False)
split: Mapped[bool] = mapped_column(init=False)
type: Mapped[str] = mapped_column(init=False)