Extend export/import to rules

Removes additional bank/all options from the transactions export command
line.
Deletes the brief lived CSV class.
This patch start using pickle for simple export/import, other options
can be added later. An issue with the .csv is the lack of a Null field.
Moves logic to Manager, it is simple enough.
This commit is contained in:
Luís Murta 2023-01-15 23:06:20 +00:00
parent 1cce7d421e
commit 6110858d48
Signed by: satprog
GPG Key ID: 169EF1BBD7049F94
8 changed files with 159 additions and 90 deletions

View File

@ -232,17 +232,7 @@ if __name__ == "__main__":
pfbudget.types.Link(args["original"][0], link) for link in args["links"] pfbudget.types.Link(args["original"][0], link) for link in args["links"]
] ]
case pfbudget.Operation.Export: case pfbudget.Operation.Export | pfbudget.Operation.Import | pfbudget.Operation.ExportCategoryRules | pfbudget.Operation.ImportCategoryRules | pfbudget.Operation.ExportTagRules | pfbudget.Operation.ImportTagRules:
keys = {"interval", "start", "end", "year", "all", "banks", "file"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
start, end = pfbudget.parse_args_period(args)
params = [start, end]
if not args["all"]:
params.append(args["banks"])
params.append(args["file"][0])
case pfbudget.Operation.Import:
keys = {"file"} keys = {"file"}
assert args.keys() >= keys, f"missing {args.keys() - keys}" assert args.keys() >= keys, f"missing {args.keys() - keys}"

View File

@ -76,12 +76,9 @@ def argparser() -> argparse.ArgumentParser:
p_init.set_defaults(command=Operation.Init) p_init.set_defaults(command=Operation.Init)
# Exports transactions to .csv file # Exports transactions to .csv file
export = subparsers.add_parser("export", parents=[period]) export = subparsers.add_parser("export")
export.set_defaults(op=Operation.Export) export.set_defaults(op=Operation.Export)
export.add_argument("file", nargs=1, type=str) export_args(export)
export_banks = export.add_mutually_exclusive_group()
export_banks.add_argument("--all", action="store_true")
export_banks.add_argument("--banks", nargs="+", type=str)
pimport = subparsers.add_parser("import") pimport = subparsers.add_parser("import")
pimport.set_defaults(op=Operation.Import) pimport.set_defaults(op=Operation.Import)
@ -321,6 +318,14 @@ def category_rule(parser: argparse.ArgumentParser):
rules(modify) rules(modify)
modify.add_argument("--remove", nargs="*", default=[], type=str) modify.add_argument("--remove", nargs="*", default=[], type=str)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryRules)
export_args(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryRules)
export_args(pimport)
def tags(parser: argparse.ArgumentParser): def tags(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True) commands = parser.add_subparsers(required=True)
@ -355,6 +360,14 @@ def tag_rule(parser: argparse.ArgumentParser):
modify.add_argument("--tag", nargs=1, type=str) modify.add_argument("--tag", nargs=1, type=str)
rules(modify) rules(modify)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportTagRules)
export_args(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportTagRules)
export_args(pimport)
def rules(parser: argparse.ArgumentParser): def rules(parser: argparse.ArgumentParser):
parser.add_argument("--date", nargs=1, type=dt.date.fromisoformat) parser.add_argument("--date", nargs=1, type=dt.date.fromisoformat)
@ -377,3 +390,7 @@ def link(parser: argparse.ArgumentParser):
dismantle.set_defaults(op=Operation.Dismantle) dismantle.set_defaults(op=Operation.Dismantle)
dismantle.add_argument("original", nargs=1, type=int) dismantle.add_argument("original", nargs=1, type=int)
dismantle.add_argument("links", nargs="+", type=int) dismantle.add_argument("links", nargs="+", type=int)
def export_args(parser: argparse.ArgumentParser):
parser.add_argument("file", nargs=1, type=str)

View File

@ -37,6 +37,10 @@ class Operation(Enum):
NordigenCountryBanks = auto() NordigenCountryBanks = auto()
Export = auto() Export = auto()
Import = auto() Import = auto()
ExportCategoryRules = auto()
ImportCategoryRules = auto()
ExportTagRules = auto()
ImportTagRules = auto()
class TransactionError(Exception): class TransactionError(Exception):

View File

@ -1,4 +1,5 @@
from pathlib import Path from pathlib import Path
import pickle
import webbrowser import webbrowser
from pfbudget.common.types import Operation from pfbudget.common.types import Operation
@ -6,19 +7,21 @@ from pfbudget.core.categorizer import Categorizer
from pfbudget.db.client import DbClient from pfbudget.db.client import DbClient
from pfbudget.db.model import ( from pfbudget.db.model import (
Bank, Bank,
BankTransaction,
Category, Category,
CategoryGroup, CategoryGroup,
CategoryRule, CategoryRule,
CategorySelector,
MoneyTransaction,
Nordigen, Nordigen,
Rule, Rule,
Tag, Tag,
TagRule, TagRule,
Transaction, Transaction,
TransactionCategory,
) )
from pfbudget.input.nordigen import NordigenInput from pfbudget.input.nordigen import NordigenInput
from pfbudget.input.parsers import parse_data from pfbudget.input.parsers import parse_data
from pfbudget.output.csv import CSV
from pfbudget.output.output import Output
class Manager: class Manager:
@ -171,20 +174,37 @@ class Manager:
case Operation.Export: case Operation.Export:
with self.db.session() as session: with self.db.session() as session:
if len(params) < 4: self.dump(params[0], session.get(Transaction))
banks = [bank.name for bank in session.get(Bank)]
transactions = session.transactions(params[0], params[1], banks)
else:
transactions = session.transactions(
params[0], params[1], params[2]
)
csvwriter: Output = CSV(params[-1])
csvwriter.report(transactions)
case Operation.Import: case Operation.Import:
csvwriter: Output = CSV(params[0]) # Output is strange here transactions = []
transactions = csvwriter.load() for row in self.load(params[0]):
match row["type"]:
case "bank":
transaction = BankTransaction(
row["date"],
row["description"],
row["amount"],
row["bank"],
False,
)
case "money":
transaction = MoneyTransaction(
row["date"], row["description"], row["amount"], False
)
# TODO case "split" how to match to original transaction?? also save ids?
case _:
continue
if category := row.pop("category", None):
transaction.category = TransactionCategory(
category["name"],
CategorySelector(category["selector"]["selector"]),
)
transactions.append(transaction)
if ( if (
len(transactions) > 0 len(transactions) > 0
@ -196,6 +216,26 @@ class Manager:
with self.db.session() as session: with self.db.session() as session:
session.add(transactions) session.add(transactions)
case Operation.ExportCategoryRules:
with self.db.session() as session:
self.dump(params[0], session.get(CategoryRule))
case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0])]
with self.db.session() as session:
session.add(rules)
case Operation.ExportTagRules:
with self.db.session() as session:
self.dump(params[0], session.get(TagRule))
case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0])]
with self.db.session() as session:
session.add(rules)
# def init(self): # def init(self):
# client = DatabaseClient(self.__db) # client = DatabaseClient(self.__db)
# client.init() # client.init()
@ -224,6 +264,14 @@ class Manager:
# bank = client.get_bank(key, value) # bank = client.get_bank(key, value)
# return convert(bank) # return convert(bank)
def dump(self, fn, sequence):
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
def load(self, fn):
with open(fn, "rb") as f:
return pickle.load(f)
@property @property
def db(self) -> DbClient: def db(self) -> DbClient:
return DbClient(self._db, self._verbosity > 2) return DbClient(self._db, self._verbosity > 2)

View File

@ -52,6 +52,12 @@ accounttype = Annotated[
] ]
class Export:
@property
def format(self) -> dict[str, Any]:
raise NotImplementedError
class Bank(Base): class Bank(Base):
__tablename__ = "banks" __tablename__ = "banks"
@ -68,7 +74,7 @@ idpk = Annotated[int, mapped_column(BigInteger, primary_key=True, autoincrement=
money = Annotated[Decimal, mapped_column(Numeric(16, 2))] money = Annotated[Decimal, mapped_column(Numeric(16, 2))]
class Transaction(Base): class Transaction(Base, Export):
__tablename__ = "originals" __tablename__ = "originals"
id: Mapped[idpk] = mapped_column(init=False) id: Mapped[idpk] = mapped_column(init=False)
@ -84,6 +90,18 @@ class Transaction(Base):
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"} __mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
@property
def format(self) -> dict[str, Any]:
return dict(
date=self.date,
description=self.description,
amount=self.amount,
type=self.type,
category=self.category.format if self.category else None,
# TODO note
tags=[tag.format for tag in self.tags] if self.tags else None,
)
def __lt__(self, other: Transaction): def __lt__(self, other: Transaction):
return self.date < other.date return self.date < other.date
@ -93,17 +111,20 @@ idfk = Annotated[
] ]
class IsSplit: class BankTransaction(Transaction):
split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
class BankTransaction(IsSplit, Transaction):
bank: Mapped[bankfk] = mapped_column(nullable=True) bank: Mapped[bankfk] = mapped_column(nullable=True)
split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"} __mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(bank=self.bank)
class MoneyTransaction(Transaction):
split: Mapped[bool] = mapped_column(use_existing_column=True, nullable=True)
class MoneyTransaction(IsSplit, Transaction):
__mapper_args__ = {"polymorphic_identity": "money"} __mapper_args__ = {"polymorphic_identity": "money"}
@ -112,6 +133,10 @@ class SplitTransaction(Transaction):
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"} __mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(original=self.original)
class CategoryGroup(Base): class CategoryGroup(Base):
__tablename__ = "categories_groups" __tablename__ = "categories_groups"
@ -144,16 +169,19 @@ catfk = Annotated[
] ]
class TransactionCategory(Base): class TransactionCategory(Base, Export):
__tablename__ = "categorized" __tablename__ = "categorized"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False) id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
name: Mapped[catfk] name: Mapped[catfk]
selector: Mapped[CategorySelector] = relationship(cascade="all, delete-orphan") selector: Mapped[CategorySelector] = relationship(
cascade="all, delete-orphan", lazy="joined"
)
def __repr__(self) -> str: @property
return f"Category({self.name})" def format(self):
return dict(name=self.name, selector=self.selector.format)
class Note(Base): class Note(Base):
@ -182,12 +210,16 @@ class Tag(Base):
) )
class TransactionTag(Base): class TransactionTag(Base, Export):
__tablename__ = "tags" __tablename__ = "tags"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False) id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True) tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
@property
def format(self):
return dict(tag=self.tag)
def __hash__(self): def __hash__(self):
return hash(self.id) return hash(self.id)
@ -207,7 +239,7 @@ categoryselector = Annotated[
] ]
class CategorySelector(Base): class CategorySelector(Base, Export):
__tablename__ = "categories_selector" __tablename__ = "categories_selector"
id: Mapped[int] = mapped_column( id: Mapped[int] = mapped_column(
@ -218,6 +250,10 @@ class CategorySelector(Base):
) )
selector: Mapped[categoryselector] selector: Mapped[categoryselector]
@property
def format(self):
return dict(selector=self.selector)
class Period(enum.Enum): class Period(enum.Enum):
daily = "daily" daily = "daily"
@ -247,7 +283,7 @@ class Link(Base):
link: Mapped[idfk] = mapped_column(primary_key=True) link: Mapped[idfk] = mapped_column(primary_key=True)
class Rule: class Rule(Export):
date: Mapped[Optional[dt.date]] date: Mapped[Optional[dt.date]]
description: Mapped[Optional[str]] description: Mapped[Optional[str]]
regex: Mapped[Optional[str]] regex: Mapped[Optional[str]]
@ -255,7 +291,7 @@ class Rule:
min: Mapped[Optional[money]] min: Mapped[Optional[money]]
max: Mapped[Optional[money]] max: Mapped[Optional[money]]
def matches(self, transaction: Transaction) -> bool: def matches(self, transaction: BankTransaction) -> bool:
if ( if (
(self.date and self.date < transaction.date) (self.date and self.date < transaction.date)
or ( or (
@ -277,6 +313,17 @@ class Rule:
return False return False
return True return True
@property
def format(self) -> dict[str, Any]:
return dict(
date=self.date,
description=self.description,
regex=self.regex,
bank=self.bank,
min=self.min,
max=self.max,
)
class CategoryRule(Base, Rule): class CategoryRule(Base, Rule):
__tablename__ = "categories_rules" __tablename__ = "categories_rules"
@ -284,6 +331,10 @@ class CategoryRule(Base, Rule):
id: Mapped[idpk] = mapped_column(init=False) id: Mapped[idpk] = mapped_column(init=False)
name: Mapped[catfk] name: Mapped[catfk]
@property
def format(self) -> dict[str, Any]:
return super().format | dict(name=self.name)
def __hash__(self): def __hash__(self):
return hash(self.id) return hash(self.id)
@ -294,5 +345,9 @@ class TagRule(Base, Rule):
id: Mapped[idpk] = mapped_column(init=False) id: Mapped[idpk] = mapped_column(init=False)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE")) tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))
@property
def format(self) -> dict[str, Any]:
return super().format | dict(tag=self.tag)
def __hash__(self): def __hash__(self):
return hash(self.id) return hash(self.id)

View File

@ -1 +0,0 @@
__all__ = ["csv", "output"]

View File

@ -1,35 +0,0 @@
from csv import DictReader, writer
from pfbudget.db.model import (
BankTransaction,
MoneyTransaction,
Transaction,
)
from .output import Output
class CSV(Output):
def __init__(self, filename: str):
self.fn = filename
def load(self) -> list[Transaction]:
with open(self.fn, "r", newline="") as f:
r = DictReader(f)
return [
BankTransaction(
row["date"], row["description"], row["amount"], False, row["bank"]
)
if row["bank"]
else MoneyTransaction(
row["date"], row["description"], False, row["amount"]
)
for row in r
]
def report(self, transactions: list[Transaction]):
with open(self.fn, "w", newline="") as f:
w = writer(f, delimiter="\t")
w.writerows(
[(t.date, t.description, t.amount, t.bank) for t in transactions]
)

View File

@ -1,9 +0,0 @@
from abc import ABC, abstractmethod
from pfbudget.db.model import Transaction
class Output(ABC):
@abstractmethod
def report(self, transactions: list[Transaction]):
raise NotImplementedError