Adds the import operation and a timer
to the categorization. We can now import transactions from a csv file, and later automatically categorize them all.
This commit is contained in:
parent
478bd25190
commit
c42a399d3d
@ -242,4 +242,10 @@ if __name__ == "__main__":
|
|||||||
params.append(args["banks"])
|
params.append(args["banks"])
|
||||||
params.append(args["file"][0])
|
params.append(args["file"][0])
|
||||||
|
|
||||||
|
case pfbudget.Operation.Import:
|
||||||
|
keys = {"file"}
|
||||||
|
assert args.keys() >= keys, f"missing {args.keys() - keys}"
|
||||||
|
|
||||||
|
params = args["file"]
|
||||||
|
|
||||||
pfbudget.Manager(db, verbosity).action(op, params)
|
pfbudget.Manager(db, verbosity).action(op, params)
|
||||||
|
|||||||
@ -83,6 +83,10 @@ def argparser() -> argparse.ArgumentParser:
|
|||||||
export_banks.add_argument("--all", action="store_true")
|
export_banks.add_argument("--all", action="store_true")
|
||||||
export_banks.add_argument("--banks", nargs="+", type=str)
|
export_banks.add_argument("--banks", nargs="+", type=str)
|
||||||
|
|
||||||
|
pimport = subparsers.add_parser("import")
|
||||||
|
pimport.set_defaults(op=Operation.Import)
|
||||||
|
pimport.add_argument("file", nargs=1, type=str)
|
||||||
|
|
||||||
# Parse from .csv
|
# Parse from .csv
|
||||||
parse = subparsers.add_parser("parse")
|
parse = subparsers.add_parser("parse")
|
||||||
parse.set_defaults(op=Operation.Parse)
|
parse.set_defaults(op=Operation.Parse)
|
||||||
|
|||||||
@ -36,6 +36,7 @@ class Operation(Enum):
|
|||||||
NordigenDel = auto()
|
NordigenDel = auto()
|
||||||
NordigenCountryBanks = auto()
|
NordigenCountryBanks = auto()
|
||||||
Export = auto()
|
Export = auto()
|
||||||
|
Import = auto()
|
||||||
|
|
||||||
|
|
||||||
class TransactionError(Exception):
|
class TransactionError(Exception):
|
||||||
|
|||||||
@ -8,6 +8,7 @@ from pfbudget.db.model import (
|
|||||||
TransactionTag,
|
TransactionTag,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
from codetiming import Timer
|
||||||
from datetime import timedelta
|
from datetime import timedelta
|
||||||
|
|
||||||
Transactions = list[Transaction]
|
Transactions = list[Transaction]
|
||||||
@ -56,6 +57,7 @@ class Categorizer:
|
|||||||
"""
|
"""
|
||||||
self._manual(transactions)
|
self._manual(transactions)
|
||||||
|
|
||||||
|
@Timer(name="nullify")
|
||||||
def _nullify(self, transactions: Transactions):
|
def _nullify(self, transactions: Transactions):
|
||||||
count = 0
|
count = 0
|
||||||
matching = []
|
matching = []
|
||||||
@ -86,6 +88,7 @@ class Categorizer:
|
|||||||
|
|
||||||
print(f"Nullified {count} transactions")
|
print(f"Nullified {count} transactions")
|
||||||
|
|
||||||
|
@Timer(name="categoryrules")
|
||||||
def _rule_based_categories(
|
def _rule_based_categories(
|
||||||
self, transactions: Transactions, categories: list[Category]
|
self, transactions: Transactions, categories: list[Category]
|
||||||
):
|
):
|
||||||
@ -102,7 +105,10 @@ class Categorizer:
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
# passed all conditions, assign category
|
# passed all conditions, assign category
|
||||||
if transaction.category:
|
if (
|
||||||
|
transaction.category
|
||||||
|
and transaction.category.name == category.name
|
||||||
|
):
|
||||||
if (
|
if (
|
||||||
input(f"Overwrite {transaction} with {category}? (y/n)")
|
input(f"Overwrite {transaction} with {category}? (y/n)")
|
||||||
== "y"
|
== "y"
|
||||||
@ -122,6 +128,7 @@ class Categorizer:
|
|||||||
for k, v in d.items():
|
for k, v in d.items():
|
||||||
print(f"{v}: {k}")
|
print(f"{v}: {k}")
|
||||||
|
|
||||||
|
@Timer(name="tagrules")
|
||||||
def _rule_based_tags(self, transactions: Transactions, tags: list[Tag]):
|
def _rule_based_tags(self, transactions: Transactions, tags: list[Tag]):
|
||||||
d = {}
|
d = {}
|
||||||
for tag in [t for t in tags if t.rules]:
|
for tag in [t for t in tags if t.rules]:
|
||||||
|
|||||||
@ -71,14 +71,18 @@ class Manager:
|
|||||||
|
|
||||||
case Operation.Categorize:
|
case Operation.Categorize:
|
||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
uncategorized = session.get(Transaction, ~Transaction.category.has())
|
uncategorized = session.get(
|
||||||
|
Transaction, ~Transaction.category.has()
|
||||||
|
)
|
||||||
categories = session.get(Category)
|
categories = session.get(Category)
|
||||||
tags = session.get(Tag)
|
tags = session.get(Tag)
|
||||||
Categorizer().rules(uncategorized, categories, tags)
|
Categorizer().rules(uncategorized, categories, tags)
|
||||||
|
|
||||||
case Operation.ManualCategorization:
|
case Operation.ManualCategorization:
|
||||||
with self.db.session() as session:
|
with self.db.session() as session:
|
||||||
uncategorized = session.get(Transaction, ~Transaction.category.has())
|
uncategorized = session.get(
|
||||||
|
Transaction, ~Transaction.category.has()
|
||||||
|
)
|
||||||
categories = session.get(Category)
|
categories = session.get(Category)
|
||||||
tags = session.get(Tag)
|
tags = session.get(Tag)
|
||||||
Categorizer().manual(uncategorized, categories, tags)
|
Categorizer().manual(uncategorized, categories, tags)
|
||||||
@ -178,6 +182,20 @@ class Manager:
|
|||||||
csvwriter: Output = CSV(params[-1])
|
csvwriter: Output = CSV(params[-1])
|
||||||
csvwriter.report(transactions)
|
csvwriter.report(transactions)
|
||||||
|
|
||||||
|
case Operation.Import:
|
||||||
|
csvwriter: Output = CSV(params[0]) # Output is strange here
|
||||||
|
transactions = csvwriter.load()
|
||||||
|
|
||||||
|
if (
|
||||||
|
len(transactions) > 0
|
||||||
|
and input(
|
||||||
|
f"{transactions[:5]}\nDoes the import seem correct? (y/n)"
|
||||||
|
)
|
||||||
|
== "y"
|
||||||
|
):
|
||||||
|
with self.db.session() as session:
|
||||||
|
session.add(transactions)
|
||||||
|
|
||||||
# def init(self):
|
# def init(self):
|
||||||
# client = DatabaseClient(self.__db)
|
# client = DatabaseClient(self.__db)
|
||||||
# client.init()
|
# client.init()
|
||||||
|
|||||||
@ -1,6 +1,10 @@
|
|||||||
from csv import writer
|
from csv import DictReader, writer
|
||||||
|
|
||||||
from pfbudget.db.model import Transaction
|
from pfbudget.db.model import (
|
||||||
|
BankTransaction,
|
||||||
|
MoneyTransaction,
|
||||||
|
Transaction,
|
||||||
|
)
|
||||||
|
|
||||||
from .output import Output
|
from .output import Output
|
||||||
|
|
||||||
@ -9,6 +13,20 @@ class CSV(Output):
|
|||||||
def __init__(self, filename: str):
|
def __init__(self, filename: str):
|
||||||
self.fn = filename
|
self.fn = filename
|
||||||
|
|
||||||
|
def load(self) -> list[Transaction]:
|
||||||
|
with open(self.fn, "r", newline="") as f:
|
||||||
|
r = DictReader(f)
|
||||||
|
return [
|
||||||
|
BankTransaction(
|
||||||
|
row["date"], row["description"], row["amount"], False, row["bank"]
|
||||||
|
)
|
||||||
|
if row["bank"]
|
||||||
|
else MoneyTransaction(
|
||||||
|
row["date"], row["description"], False, row["amount"]
|
||||||
|
)
|
||||||
|
for row in r
|
||||||
|
]
|
||||||
|
|
||||||
def report(self, transactions: list[Transaction]):
|
def report(self, transactions: list[Transaction]):
|
||||||
with open(self.fn, "w", newline="") as f:
|
with open(self.fn, "w", newline="") as f:
|
||||||
w = writer(f, delimiter="\t")
|
w = writer(f, delimiter="\t")
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user