Compare commits

..

4 Commits

Author SHA1 Message Date
92f8493f86
Move selector table back to transactions schema
Change table name originals to transactions and tags to tagged.
2023-01-23 00:43:05 +00:00
7f837b849f
Split tables per different schemas 2023-01-23 00:35:59 +00:00
8760f5a0a4
Export/Import categories and groups 2023-01-23 00:06:36 +00:00
dd0aaa01b8
Export/import for banks 2023-01-22 23:42:32 +00:00
8 changed files with 718 additions and 41 deletions

View File

@ -27,7 +27,7 @@ target_metadata = Base.metadata
def include_name(name, type_, parent_names):
    """Alembic ``include_name`` hook: limit autogenerate to app-owned schemas.

    Args:
        name: Name of the reflected object (schema, table, ...).
        type_: Kind of object being filtered (e.g. ``"schema"``).
        parent_names: Mapping of enclosing object names (unused here).

    Returns:
        True if the object should be considered by autogenerate.
    """
    if type_ == "schema":
        # Only the four application schemas are under migration control.
        return name in ["bank", "category", "tag", "transactions"]
    else:
        # Non-schema objects (tables, columns, ...) are always included.
        return True

View File

@ -0,0 +1,88 @@
"""Selector back to transaction
Revision ID: 28556ab17c56
Revises: e455c78df789
Create Date: 2023-01-23 00:34:39.062562+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "28556ab17c56"
down_revision = "e455c78df789"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Move the selector table back to the ``transactions`` schema.

    Renames ``transactions.tags`` -> ``transactions.tagged``, recreates the
    ``selector`` table under ``transactions`` (replacing the copy that the
    previous revision had placed in the ``category`` schema), and renames
    ``transactions.originals`` -> ``transactions.transactions``.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.rename_table("tags", "tagged", schema="transactions")
    # New selector table lives in the transactions schema; the enum type
    # "selector_t" is (re)created there as well (inherit_schema=True).
    op.create_table(
        "selector",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column(
            "selector",
            sa.Enum(
                "unknown",
                "nullifier",
                "vacations",
                "rules",
                "algorithm",
                "manual",
                name="selector_t",
                schema="transactions",
                inherit_schema=True,
            ),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.categorized.id"],
            name=op.f("fk_selector_id_categorized"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_selector")),
        schema="transactions",
    )
    # NOTE(review): dropping category.selector does not drop the
    # category.selector_t enum type, which is left orphaned — confirm whether
    # an explicit Enum(...).drop() is needed here. No data is copied from the
    # dropped table into the new one.
    op.drop_table("selector", schema="category")
    op.rename_table("originals", "transactions", schema="transactions")
    # ### end Alembic commands ###
def downgrade() -> None:
    """Revert: put the selector table back into the ``category`` schema.

    Mirrors :func:`upgrade`: renames the two transaction tables back and
    recreates ``category.selector`` before dropping the transactions-schema
    copy.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    op.rename_table("transactions", "originals", schema="transactions")
    # NOTE(review): postgresql.ENUM here assumes the category.selector_t type
    # still exists (upgrade did not drop it) — confirm, otherwise this
    # create_table fails. No data is copied from the dropped table.
    op.create_table(
        "selector",
        sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
        sa.Column(
            "selector",
            postgresql.ENUM(
                "unknown",
                "nullifier",
                "vacations",
                "rules",
                "algorithm",
                "manual",
                name="selector_t",
                schema="category",
            ),
            autoincrement=False,
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.categorized.id"],
            name="fk_selector_id_categorized",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name="pk_selector"),
        schema="category",
    )
    op.drop_table("selector", schema="transactions")
    op.rename_table("tagged", "tags", schema="transactions")
    # ### end Alembic commands ###

View File

@ -0,0 +1,452 @@
"""Divide by schemas
Revision ID: e455c78df789
Revises: 6b293f78cc97
Create Date: 2023-01-22 23:38:23.266906+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "e455c78df789"
down_revision = "6b293f78cc97"
branch_labels = None
depends_on = None
def upgrade() -> None:
    """Split the monolithic ``transactions`` schema into per-domain schemas.

    Creates the new ``bank``, ``category`` and ``tag`` schema tables,
    repoints the foreign keys of the remaining ``transactions`` tables at
    the new locations, then drops the old single-schema tables.

    NOTE(review): the new tables are created empty and the old ones dropped
    without any INSERT ... SELECT — existing rows are not migrated. Confirm
    that this is intended (e.g. dev-only data).
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # --- new per-schema tables (ordered so FK targets exist first) ---
    op.create_table(
        "banks",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("BIC", sa.String(length=8), nullable=False),
        sa.Column(
            "type",
            sa.Enum(
                "checking",
                "savings",
                "investment",
                "mealcard",
                "VISA",
                "MASTERCARD",
                name="accounttype",
                schema="bank",
                inherit_schema=True,
            ),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("BIC", "type", name=op.f("pk_banks")),
        sa.UniqueConstraint("name", name=op.f("uq_banks_name")),
        schema="bank",
    )
    op.create_table(
        "groups",
        sa.Column("name", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_groups")),
        schema="category",
    )
    op.create_table(
        "available",
        sa.Column("name", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_available")),
        schema="tag",
    )
    op.create_table(
        "nordigen",
        sa.Column("name", sa.Text(), nullable=False),
        sa.Column("bank_id", sa.String(), nullable=True),
        sa.Column("requisition_id", sa.String(), nullable=True),
        sa.Column("invert", sa.Boolean(), nullable=True),
        sa.ForeignKeyConstraint(
            ["name"], ["bank.banks.name"], name=op.f("fk_nordigen_name_banks")
        ),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_nordigen")),
        schema="bank",
    )
    op.create_table(
        "available",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("group", sa.String(), nullable=True),
        sa.ForeignKeyConstraint(
            ["group"], ["category.groups.name"], name=op.f("fk_available_group_groups")
        ),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_available")),
        schema="category",
    )
    op.create_table(
        "rules",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("tag", sa.String(), nullable=False),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.rules.id"],
            name=op.f("fk_rules_id_rules"),
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["tag"],
            ["tag.available.name"],
            name=op.f("fk_rules_tag_available"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
        schema="tag",
    )
    op.create_table(
        "rules",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("name", sa.String(), nullable=False),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.rules.id"],
            name=op.f("fk_rules_id_rules"),
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["name"],
            ["category.available.name"],
            name=op.f("fk_rules_name_available"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
        schema="category",
    )
    op.create_table(
        "schedules",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column(
            "period",
            sa.Enum(
                "daily",
                "weekly",
                "monthly",
                "yearly",
                name="period",
                schema="category",
                inherit_schema=True,
            ),
            nullable=True,
        ),
        sa.Column("period_multiplier", sa.Integer(), nullable=True),
        sa.Column("amount", sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(
            ["name"],
            ["category.available.name"],
            name=op.f("fk_schedules_name_available"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_schedules")),
        schema="category",
    )
    op.create_table(
        "selector",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column(
            "selector",
            sa.Enum(
                "unknown",
                "nullifier",
                "vacations",
                "rules",
                "algorithm",
                "manual",
                name="selector_t",
                schema="category",
                inherit_schema=True,
            ),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.categorized.id"],
            name=op.f("fk_selector_id_categorized"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_selector")),
        schema="category",
    )
    # --- repoint FKs of surviving transactions tables at the new schemas ---
    op.drop_constraint(
        "fk_categorized_name_categories_available",
        "categorized",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        op.f("fk_categorized_name_available"),
        "categorized",
        "available",
        ["name"],
        ["name"],
        source_schema="transactions",
        referent_schema="category",
        ondelete="CASCADE",
    )
    op.drop_constraint(
        "fk_originals_bank_banks",
        "originals",
        schema="transactions",
        type_="foreignkey",
    )
    # NOTE(review): recreated without ondelete, unlike the categorized FK —
    # confirm the original constraint had no ON DELETE action.
    op.create_foreign_key(
        op.f("fk_originals_bank_banks"),
        "originals",
        "banks",
        ["bank"],
        ["name"],
        source_schema="transactions",
        referent_schema="bank",
    )
    op.drop_constraint(
        "fk_tags_tag_tags_available", "tags", schema="transactions", type_="foreignkey"
    )
    op.create_foreign_key(
        op.f("fk_tags_tag_available"),
        "tags",
        "available",
        ["tag"],
        ["name"],
        source_schema="transactions",
        referent_schema="tag",
    )
    # --- drop the old single-schema copies (no longer referenced) ---
    op.drop_table("categories_schedules", schema="transactions")
    op.drop_table("categories_rules", schema="transactions")
    op.drop_table("categories_available", schema="transactions")
    op.drop_table("tag_rules", schema="transactions")
    op.drop_table("nordigen", schema="transactions")
    op.drop_table("tags_available", schema="transactions")
    op.drop_table("banks", schema="transactions")
    op.drop_table("categories_selector", schema="transactions")
    op.drop_table("categories_groups", schema="transactions")
    # ### end Alembic commands ###
def downgrade() -> None:
    """Revert the per-domain schema split.

    Recreates the old single-``transactions``-schema tables, repoints the
    foreign keys of the surviving transactions tables back at them, then
    drops the per-domain (``bank``/``category``/``tag``) tables.

    Fix over the autogenerated version: foreign keys were being created
    before their referenced tables existed (e.g. ``fk_tags_tag_tags_available``
    -> ``transactions.tags_available``, and ``categories_rules`` ->
    ``categories_available``), which PostgreSQL rejects. Operations are now
    ordered: recreate tables in dependency order, then repoint FKs, then drop.

    NOTE(review): tables are recreated empty — upgraded data is not restored.
    Also confirm the ``transactions``-schema enum types and the
    ``tag_rules_id_seq``/``categories_rules_id_seq`` sequences referenced by
    the server defaults still exist after the upgrade dropped their tables.
    """
    # ### commands auto generated by Alembic - please adjust! ###
    # --- 1. recreate old tables, dependency order (FK targets first) ---
    op.create_table(
        "categories_groups",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.PrimaryKeyConstraint("name", name="pk_categories_groups"),
        schema="transactions",
        postgresql_ignore_search_path=False,
    )
    # Referenced by categories_rules and categories_schedules below.
    op.create_table(
        "categories_available",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.Column("group", sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.ForeignKeyConstraint(
            ["group"],
            ["transactions.categories_groups.name"],
            name="fk_categories_available_group_categories_groups",
        ),
        sa.PrimaryKeyConstraint("name", name="pk_categories_available"),
        schema="transactions",
        postgresql_ignore_search_path=False,
    )
    op.create_table(
        "banks",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.Column("BIC", sa.VARCHAR(length=8), autoincrement=False, nullable=False),
        sa.Column(
            "type",
            postgresql.ENUM(
                "checking",
                "savings",
                "investment",
                "mealcard",
                "VISA",
                "MASTERCARD",
                name="accounttype",
                schema="transactions",
            ),
            autoincrement=False,
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("BIC", "type", name="pk_banks"),
        sa.UniqueConstraint("name", name="uq_banks_name"),
        schema="transactions",
        postgresql_ignore_search_path=False,
    )
    op.create_table(
        "tags_available",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.PrimaryKeyConstraint("name", name="pk_tags_available"),
        schema="transactions",
        postgresql_ignore_search_path=False,
    )
    op.create_table(
        "nordigen",
        sa.Column("name", sa.TEXT(), autoincrement=False, nullable=False),
        sa.Column("bank_id", sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.Column("requisition_id", sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.Column("invert", sa.BOOLEAN(), autoincrement=False, nullable=True),
        sa.ForeignKeyConstraint(
            ["name"], ["transactions.banks.name"], name="fk_nordigen_name_banks"
        ),
        sa.PrimaryKeyConstraint("name", name="pk_nordigen"),
        schema="transactions",
    )
    op.create_table(
        "tag_rules",
        sa.Column(
            "id",
            sa.BIGINT(),
            server_default=sa.text(
                "nextval('transactions.tag_rules_id_seq'::regclass)"
            ),
            autoincrement=True,
            nullable=False,
        ),
        sa.Column("tag", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.rules.id"],
            name="fk_tag_rules_id_rules",
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["tag"],
            ["transactions.tags_available.name"],
            name="fk_tag_rules_tag_tags_available",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name="pk_tag_rules"),
        schema="transactions",
    )
    op.create_table(
        "categories_rules",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.Column(
            "id",
            sa.BIGINT(),
            server_default=sa.text(
                "nextval('transactions.categories_rules_id_seq'::regclass)"
            ),
            autoincrement=True,
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.rules.id"],
            name="fk_categories_rules_id_rules",
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["name"],
            ["transactions.categories_available.name"],
            name="fk_categories_rules_name_categories_available",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name="pk_categories_rules"),
        schema="transactions",
    )
    op.create_table(
        "categories_schedules",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.Column(
            "period",
            postgresql.ENUM(
                "daily",
                "weekly",
                "monthly",
                "yearly",
                name="period",
                schema="transactions",
            ),
            autoincrement=False,
            nullable=True,
        ),
        sa.Column(
            "period_multiplier", sa.INTEGER(), autoincrement=False, nullable=True
        ),
        sa.Column("amount", sa.INTEGER(), autoincrement=False, nullable=True),
        sa.ForeignKeyConstraint(
            ["name"],
            ["transactions.categories_available.name"],
            name="fk_categories_schedules_name_categories_available",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("name", name="pk_categories_schedules"),
        schema="transactions",
    )
    op.create_table(
        "categories_selector",
        sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
        sa.Column(
            "selector",
            postgresql.ENUM(
                "unknown",
                "nullifier",
                "vacations",
                "rules",
                "algorithm",
                "manual",
                name="selector",
                schema="transactions",
            ),
            autoincrement=False,
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.categorized.id"],
            name="fk_categories_selector_id_categorized",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name="pk_categories_selector"),
        schema="transactions",
    )
    # --- 2. repoint surviving FKs back at the recreated tables ---
    op.drop_constraint(
        op.f("fk_tags_tag_available"), "tags", schema="transactions", type_="foreignkey"
    )
    op.create_foreign_key(
        "fk_tags_tag_tags_available",
        "tags",
        "tags_available",
        ["tag"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_originals_bank_banks"),
        "originals",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        "fk_originals_bank_banks",
        "originals",
        "banks",
        ["bank"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_categorized_name_available"),
        "categorized",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        "fk_categorized_name_categories_available",
        "categorized",
        "categories_available",
        ["name"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    # --- 3. drop the per-domain tables (no longer referenced) ---
    op.drop_table("selector", schema="category")
    op.drop_table("schedules", schema="category")
    op.drop_table("rules", schema="category")
    op.drop_table("rules", schema="tag")
    op.drop_table("available", schema="category")
    op.drop_table("nordigen", schema="bank")
    op.drop_table("available", schema="tag")
    op.drop_table("groups", schema="category")
    op.drop_table("banks", schema="bank")
    # ### end Alembic commands ###

View File

@ -232,7 +232,20 @@ if __name__ == "__main__":
pfbudget.t.Link(args["original"][0], link) for link in args["links"] pfbudget.t.Link(args["original"][0], link) for link in args["links"]
] ]
case pfbudget.Operation.Export | pfbudget.Operation.Import | pfbudget.Operation.ExportCategoryRules | pfbudget.Operation.ImportCategoryRules | pfbudget.Operation.ExportTagRules | pfbudget.Operation.ImportTagRules: case (
pfbudget.Operation.Export
| pfbudget.Operation.Import
| pfbudget.Operation.ExportBanks
| pfbudget.Operation.ImportBanks
| pfbudget.Operation.ExportCategoryRules
| pfbudget.Operation.ImportCategoryRules
| pfbudget.Operation.ExportTagRules
| pfbudget.Operation.ImportTagRules
| pfbudget.Operation.ExportCategories
| pfbudget.Operation.ImportCategories
| pfbudget.Operation.ExportCategoryGroups
| pfbudget.Operation.ImportCategoryGroups
):
keys = {"file"} keys = {"file"}
assert args.keys() >= keys, f"missing {args.keys() - keys}" assert args.keys() >= keys, f"missing {args.keys() - keys}"

View File

@ -225,6 +225,14 @@ def bank(parser: argparse.ArgumentParser):
nordigen(commands.add_parser("nordigen")) nordigen(commands.add_parser("nordigen"))
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportBanks)
export_args(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportBanks)
export_args(pimport)
def nordigen(parser: argparse.ArgumentParser): def nordigen(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True) commands = parser.add_subparsers(required=True)
@ -278,6 +286,14 @@ def category(parser: argparse.ArgumentParser):
group = commands.add_parser("group") group = commands.add_parser("group")
category_group(group) category_group(group)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategories)
export_args(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategories)
export_args(pimport)
def category_group(parser: argparse.ArgumentParser): def category_group(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True) commands = parser.add_subparsers(required=True)
@ -290,6 +306,14 @@ def category_group(parser: argparse.ArgumentParser):
remove.set_defaults(op=Operation.GroupRemove) remove.set_defaults(op=Operation.GroupRemove)
remove.add_argument("group", nargs="+", type=str) remove.add_argument("group", nargs="+", type=str)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryGroups)
export_args(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryGroups)
export_args(pimport)
def category_rule(parser: argparse.ArgumentParser): def category_rule(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True) commands = parser.add_subparsers(required=True)

View File

@ -37,10 +37,16 @@ class Operation(Enum):
NordigenCountryBanks = auto() NordigenCountryBanks = auto()
Export = auto() Export = auto()
Import = auto() Import = auto()
ExportBanks = auto()
ImportBanks = auto()
ExportCategoryRules = auto() ExportCategoryRules = auto()
ImportCategoryRules = auto() ImportCategoryRules = auto()
ExportTagRules = auto() ExportTagRules = auto()
ImportTagRules = auto() ImportTagRules = auto()
ExportCategories = auto()
ImportCategories = auto()
ExportCategoryGroups = auto()
ImportCategoryGroups = auto()
class TransactionError(Exception): class TransactionError(Exception):

View File

@ -11,6 +11,7 @@ from pfbudget.db.model import (
Category, Category,
CategoryGroup, CategoryGroup,
CategoryRule, CategoryRule,
CategorySchedule,
CategorySelector, CategorySelector,
Link, Link,
MoneyTransaction, MoneyTransaction,
@ -213,16 +214,26 @@ class Manager:
transactions.append(transaction) transactions.append(transaction)
if ( if self.certify(transactions):
len(transactions) > 0
and input(
f"{transactions[:5]}\nDoes the import seem correct? (y/n)"
)
== "y"
):
with self.db.session() as session: with self.db.session() as session:
session.add(transactions) session.add(transactions)
case Operation.ExportBanks:
with self.db.session() as session:
self.dump(params[0], session.get(Bank))
case Operation.ImportBanks:
banks = []
for row in self.load(params[0]):
bank = Bank(row["name"], row["BIC"], row["type"])
if row["nordigen"]:
bank.nordigen = Nordigen(**row["nordigen"])
banks.append(bank)
if self.certify(banks):
with self.db.session() as session:
session.add(banks)
case Operation.ExportCategoryRules: case Operation.ExportCategoryRules:
with self.db.session() as session: with self.db.session() as session:
self.dump(params[0], session.get(CategoryRule)) self.dump(params[0], session.get(CategoryRule))
@ -230,11 +241,7 @@ class Manager:
case Operation.ImportCategoryRules: case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0])] rules = [CategoryRule(**row) for row in self.load(params[0])]
if ( if self.certify(rules):
len(rules) > 0
and input(f"{rules[:5]}\nDoes the import seem correct? (y/n)")
== "y"
):
with self.db.session() as session: with self.db.session() as session:
session.add(rules) session.add(rules)
@ -245,25 +252,64 @@ class Manager:
case Operation.ImportTagRules: case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0])] rules = [TagRule(**row) for row in self.load(params[0])]
if ( if self.certify(rules):
len(rules) > 0
and input(f"{rules[:5]}\nDoes the import seem correct? (y/n)")
== "y"
):
with self.db.session() as session: with self.db.session() as session:
session.add(rules) session.add(rules)
case Operation.ExportCategories:
with self.db.session() as session:
self.dump(params[0], session.get(Category))
case Operation.ImportCategories:
# rules = [Category(**row) for row in self.load(params[0])]
categories = []
for row in self.load(params[0]):
category = Category(row["name"], row["group"])
if len(row["rules"]) > 0:
# Only category rules could have been created with a rule
rules = row["rules"]
for rule in rules:
del rule["type"]
category.rules = set(CategoryRule(**rule) for rule in rules)
if row["schedule"]:
category.schedule = CategorySchedule(**row["schedule"])
categories.append(category)
if self.certify(categories):
with self.db.session() as session:
session.add(categories)
case Operation.ExportCategoryGroups:
with self.db.session() as session:
self.dump(params[0], session.get(CategoryGroup))
case Operation.ImportCategoryGroups:
groups = [CategoryGroup(**row) for row in self.load(params[0])]
if self.certify(groups):
with self.db.session() as session:
session.add(groups)
def parse(self, filename: Path, args: dict): def parse(self, filename: Path, args: dict):
return parse_data(filename, args) return parse_data(filename, args)
def dump(self, fn, sequence): @staticmethod
def dump(fn, sequence):
with open(fn, "wb") as f: with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f) pickle.dump([e.format for e in sequence], f)
def load(self, fn): @staticmethod
def load(fn):
with open(fn, "rb") as f: with open(fn, "rb") as f:
return pickle.load(f) return pickle.load(f)
@staticmethod
def certify(imports: list) -> bool:
if input(f"{imports[:10]}\nDoes the import seem correct? (y/n)") == "y":
return True
return False
@property @property
def db(self) -> DbClient: def db(self) -> DbClient:
return DbClient(self._db, self._verbosity > 2) return DbClient(self._db, self._verbosity > 2)

View File

@ -24,15 +24,15 @@ from sqlalchemy.orm import (
class Base(MappedAsDataclass, DeclarativeBase): class Base(MappedAsDataclass, DeclarativeBase):
__table_args__ = {"schema": "transactions"}
metadata = MetaData( metadata = MetaData(
schema="transactions",
naming_convention={ naming_convention={
"ix": "ix_%(column_0_label)s", "ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s", "uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_`%(constraint_name)s`", "ck": "ck_%(table_name)s_`%(constraint_name)s`",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s", "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s", "pk": "pk_%(table_name)s",
} },
) )
@ -57,7 +57,8 @@ class Export:
raise NotImplementedError raise NotImplementedError
class Bank(Base): class Bank(Base, Export):
__table_args__ = {"schema": "bank"}
__tablename__ = "banks" __tablename__ = "banks"
name: Mapped[str] = mapped_column(unique=True) name: Mapped[str] = mapped_column(unique=True)
@ -66,6 +67,15 @@ class Bank(Base):
nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False) nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False)
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
BIC=self.BIC,
type=self.type,
nordigen=self.nordigen.format if self.nordigen else None,
)
bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))] bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]
@ -74,7 +84,7 @@ money = Annotated[decimal.Decimal, mapped_column(Numeric(16, 2))]
class Transaction(Base, Export): class Transaction(Base, Export):
__tablename__ = "originals" __tablename__ = "transactions"
id: Mapped[idpk] = mapped_column(init=False) id: Mapped[idpk] = mapped_column(init=False)
date: Mapped[dt.date] date: Mapped[dt.date]
@ -137,14 +147,20 @@ class SplitTransaction(Transaction):
return super().format | dict(original=self.original) return super().format | dict(original=self.original)
class CategoryGroup(Base): class CategoryGroup(Base, Export):
__tablename__ = "categories_groups" __table_args__ = {"schema": "category"}
__tablename__ = "groups"
name: Mapped[str] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(primary_key=True)
@property
def format(self) -> dict[str, Any]:
return dict(name=self.name)
class Category(Base):
__tablename__ = "categories_available" class Category(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "available"
name: Mapped[str] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(primary_key=True)
group: Mapped[Optional[str]] = mapped_column( group: Mapped[Optional[str]] = mapped_column(
@ -161,6 +177,15 @@ class Category(Base):
def __repr__(self) -> str: def __repr__(self) -> str:
return f"Category(name={self.name}, group={self.group}, #rules={len(self.rules)}, schedule={self.schedule})" return f"Category(name={self.name}, group={self.group}, #rules={len(self.rules)}, schedule={self.schedule})"
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
group=self.group if self.group else None,
rules=[rule.format for rule in self.rules],
schedule=self.schedule.format if self.schedule else None,
)
catfk = Annotated[ catfk = Annotated[
str, str,
@ -190,7 +215,8 @@ class Note(Base):
note: Mapped[str] note: Mapped[str]
class Nordigen(Base): class Nordigen(Base, Export):
__table_args__ = {"schema": "bank"}
__tablename__ = "nordigen" __tablename__ = "nordigen"
name: Mapped[bankfk] = mapped_column(primary_key=True) name: Mapped[bankfk] = mapped_column(primary_key=True)
@ -198,9 +224,19 @@ class Nordigen(Base):
requisition_id: Mapped[Optional[str]] requisition_id: Mapped[Optional[str]]
invert: Mapped[Optional[bool]] invert: Mapped[Optional[bool]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
bank_id=self.bank_id,
requisition_id=self.requisition_id,
invert=self.invert,
)
class Tag(Base): class Tag(Base):
__tablename__ = "tags_available" __table_args__ = {"schema": "tag"}
__tablename__ = "available"
name: Mapped[str] = mapped_column(primary_key=True) name: Mapped[str] = mapped_column(primary_key=True)
@ -210,7 +246,7 @@ class Tag(Base):
class TransactionTag(Base, Export): class TransactionTag(Base, Export):
__tablename__ = "tags" __tablename__ = "tagged"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False) id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True) tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
@ -223,7 +259,7 @@ class TransactionTag(Base, Export):
return hash(self.id) return hash(self.id)
class Selector(enum.Enum): class Selector_T(enum.Enum):
unknown = enum.auto() unknown = enum.auto()
nullifier = enum.auto() nullifier = enum.auto()
vacations = enum.auto() vacations = enum.auto()
@ -233,13 +269,13 @@ class Selector(enum.Enum):
categoryselector = Annotated[ categoryselector = Annotated[
Selector, Selector_T,
mapped_column(Enum(Selector, inherit_schema=True), default=Selector.unknown), mapped_column(Enum(Selector_T, inherit_schema=True), default=Selector_T.unknown),
] ]
class CategorySelector(Base, Export): class CategorySelector(Base, Export):
__tablename__ = "categories_selector" __tablename__ = "selector"
id: Mapped[int] = mapped_column( id: Mapped[int] = mapped_column(
BigInteger, BigInteger,
@ -261,17 +297,27 @@ class Period(enum.Enum):
yearly = "yearly" yearly = "yearly"
scheduleperiod = Annotated[Selector, mapped_column(Enum(Period, inherit_schema=True))] scheduleperiod = Annotated[Selector_T, mapped_column(Enum(Period, inherit_schema=True))]
class CategorySchedule(Base): class CategorySchedule(Base, Export):
__tablename__ = "categories_schedules" __table_args__ = {"schema": "category"}
__tablename__ = "schedules"
name: Mapped[catfk] = mapped_column(primary_key=True) name: Mapped[catfk] = mapped_column(primary_key=True)
period: Mapped[Optional[scheduleperiod]] period: Mapped[Optional[scheduleperiod]]
period_multiplier: Mapped[Optional[int]] period_multiplier: Mapped[Optional[int]]
amount: Mapped[Optional[int]] amount: Mapped[Optional[int]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
period=self.period,
period_multiplier=self.period_multiplier,
amount=self.amount,
)
class Link(Base): class Link(Base):
__tablename__ = "links" __tablename__ = "links"
@ -334,7 +380,8 @@ class Rule(Base, Export):
class CategoryRule(Rule): class CategoryRule(Rule):
__tablename__ = "categories_rules" __table_args__ = {"schema": "category"}
__tablename__ = "rules"
id: Mapped[int] = mapped_column( id: Mapped[int] = mapped_column(
BigInteger, BigInteger,
@ -357,7 +404,8 @@ class CategoryRule(Rule):
class TagRule(Rule): class TagRule(Rule):
__tablename__ = "tag_rules" __table_args__ = {"schema": "tag"}
__tablename__ = "rules"
id: Mapped[int] = mapped_column( id: Mapped[int] = mapped_column(
BigInteger, BigInteger,