Compare commits

..

No commits in common. "ddb02b33f9d3bab1e841efcbeffe76f69cab5b7d" and "5957242b838f4345f607186035b921fd6610093d" have entirely different histories.

56 changed files with 741 additions and 4311 deletions

1
.gitignore vendored
View File

@ -153,5 +153,4 @@ dmypy.json
### Default user directories
export/
tmp/
.pfbudget

View File

@ -1,105 +0,0 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
timezone = UTC
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = postgresql://pf-budget:muster-neutron-omega@database.lan/pf-budget
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
hooks = black
black.type = console_scripts
black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

View File

@ -1 +0,0 @@
Generic single-database configuration.

View File

@ -1,88 +0,0 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
from pfbudget.db.model import Base
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def include_name(name, type_, parent_names):
if type_ == "schema":
return name in ["bank", "category", "tag", "transactions"]
else:
return True
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
include_name=include_name,
include_schemas=True,
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

View File

@ -1,24 +0,0 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -1,32 +0,0 @@
"""Regex rule
Revision ID: 0ce89e987770
Revises: 7adf89ec8d14
Create Date: 2022-12-10 14:00:49.418494+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "0ce89e987770"
down_revision = "7adf89ec8d14"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_rules",
sa.Column("regex", sa.String(), nullable=True),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("categories_rules", "regex", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,40 +0,0 @@
"""Split member of base transaction
Revision ID: 18572111d9ff
Revises: 28556ab17c56
Create Date: 2023-01-23 20:09:37.892997+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "18572111d9ff"
down_revision = "28556ab17c56"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"transactions",
"split",
existing_type=sa.BOOLEAN(),
nullable=False,
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"transactions",
"split",
existing_type=sa.BOOLEAN(),
nullable=True,
schema="transactions",
)
# ### end Alembic commands ###

View File

@ -1,88 +0,0 @@
"""Selector back to transaction
Revision ID: 28556ab17c56
Revises: e455c78df789
Create Date: 2023-01-23 00:34:39.062562+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "28556ab17c56"
down_revision = "e455c78df789"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.rename_table("tags", "tagged", schema="transactions")
op.create_table(
"selector",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column(
"selector",
sa.Enum(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="transactions",
inherit_schema=True,
),
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name=op.f("fk_selector_id_categorized"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_selector")),
schema="transactions",
)
op.drop_table("selector", schema="category")
op.rename_table("originals", "transactions", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.rename_table("transactions", "originals", schema="transactions")
op.create_table(
"selector",
sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column(
"selector",
postgresql.ENUM(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="category",
),
autoincrement=False,
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name="fk_selector_id_categorized",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_selector"),
schema="category",
)
op.drop_table("selector", schema="transactions")
op.rename_table("tagged", "tags", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,109 +0,0 @@
"""Add relationships
Revision ID: 287fe9e6682a
Revises: d3534f493239
Create Date: 2022-12-03 16:43:39.633382+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "287fe9e6682a"
down_revision = "d3534f493239"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"fk_categorized_id_originals",
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
op.f("fk_categorized_id_originals"),
"categorized",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.drop_constraint(
"fk_notes_id_originals", "notes", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
op.f("fk_notes_id_originals"),
"notes",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.drop_constraint(
"fk_tags_id_originals", "tags", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
op.f("fk_tags_id_originals"),
"tags",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_tags_id_originals"), "tags", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
"fk_tags_id_originals",
"tags",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
)
op.drop_constraint(
op.f("fk_notes_id_originals"),
"notes",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_notes_id_originals",
"notes",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
)
op.drop_constraint(
op.f("fk_categorized_id_originals"),
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_categorized_id_originals",
"categorized",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
)
# ### end Alembic commands ###

View File

@ -1,49 +0,0 @@
"""Available categories and rules
Revision ID: 2d0891f1be11
Revises: 287fe9e6682a
Create Date: 2022-12-04 11:15:22.758487+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "2d0891f1be11"
down_revision = "287fe9e6682a"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_available",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_available")),
schema="transactions",
)
op.create_table(
"categories_rules",
sa.Column("name", sa.String(), nullable=False),
sa.Column("rule", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name=op.f("fk_categories_rules_name_categories_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", "rule", name=op.f("pk_categories_rules")),
schema="transactions",
)
op.alter_column("categorized", "category", new_column_name="name", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column("categorized", "name", new_column_name="category", schema="transactions")
op.drop_table("categories_rules", schema="transactions")
op.drop_table("categories_available", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,74 +0,0 @@
"""Inheritance
Revision ID: 37d80de801a7
Revises: 8cc9870b0d74
Create Date: 2023-01-10 22:41:03.540108+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "37d80de801a7"
down_revision = "8cc9870b0d74"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"originals",
sa.Column("type", sa.String(), nullable=False),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("split", sa.Boolean(), nullable=True),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("original", sa.BigInteger(), nullable=True),
schema="transactions",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=True,
schema="transactions",
)
op.create_foreign_key(
op.f("fk_originals_original_originals"),
"originals",
"originals",
["original"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_originals_original_originals"),
"originals",
schema="transactions",
type_="foreignkey",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=False,
schema="transactions",
)
op.drop_column("originals", "original", schema="transactions")
op.drop_column("originals", "split", schema="transactions")
op.drop_column("originals", "type", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,111 +0,0 @@
"""Init
Revision ID: 50ff1fbb8a00
Revises:
Create Date: 2022-12-03 11:49:30.450115+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "50ff1fbb8a00"
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"banks",
sa.Column("name", sa.String(), nullable=False),
sa.Column("BIC", sa.String(length=8), nullable=False),
sa.Column(
"type",
sa.Enum(
"checking",
"savings",
"investment",
"VISA",
"MASTERCARD",
name="accounttype",
schema="transactions",
inherit_schema=True,
),
nullable=False,
),
sa.PrimaryKeyConstraint("BIC", "type", name=op.f("pk_banks")),
sa.UniqueConstraint("name", name=op.f("uq_banks_name")),
schema="transactions",
)
op.create_table(
"nordigen",
sa.Column("name", sa.Text(), nullable=False),
sa.Column("bank_id", sa.String(), nullable=True),
sa.Column("requisition_id", sa.String(), nullable=True),
sa.Column("invert", sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(
["name"], ["transactions.banks.name"], name=op.f("fk_nordigen_name_banks")
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_nordigen")),
schema="transactions",
)
op.create_table(
"originals",
sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
sa.Column("date", sa.Date(), nullable=False),
sa.Column("description", sa.String(), nullable=True),
sa.Column("bank", sa.Text(), nullable=False),
sa.Column("amount", sa.Numeric(precision=16, scale=2), nullable=False),
sa.ForeignKeyConstraint(
["bank"], ["transactions.banks.name"], name=op.f("fk_originals_bank_banks")
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_originals")),
schema="transactions",
)
op.create_table(
"categorized",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("category", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"],
["transactions.originals.id"],
name=op.f("fk_categorized_id_originals"),
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_categorized")),
schema="transactions",
)
op.create_table(
"notes",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("note", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"], ["transactions.originals.id"], name=op.f("fk_notes_id_originals")
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_notes")),
schema="transactions",
)
op.create_table(
"tags",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("tag", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"], ["transactions.originals.id"], name=op.f("fk_tags_id_originals")
),
sa.PrimaryKeyConstraint("id", "tag", name=op.f("pk_tags")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("tags", schema="transactions")
op.drop_table("notes", schema="transactions")
op.drop_table("categorized", schema="transactions")
op.drop_table("originals", schema="transactions")
op.drop_table("nordigen", schema="transactions")
op.drop_table("banks", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,54 +0,0 @@
"""Category selector
Revision ID: 6863dda76ea2
Revises: 83f4c9837f6e
Create Date: 2022-12-08 00:56:59.032641+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "6863dda76ea2"
down_revision = "83f4c9837f6e"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_selector",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column(
"selector",
sa.Enum(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector",
schema="transactions",
inherit_schema=True,
),
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name=op.f("fk_categories_selector_id_categorized"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_categories_selector")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("categories_selector", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,152 +0,0 @@
"""Rule inheritance
Revision ID: 6b293f78cc97
Revises: 37d80de801a7
Create Date: 2023-01-22 20:05:32.887092+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "6b293f78cc97"
down_revision = "37d80de801a7"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"rules",
sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
sa.Column("date", sa.Date(), nullable=True),
sa.Column("description", sa.String(), nullable=True),
sa.Column("regex", sa.String(), nullable=True),
sa.Column("bank", sa.String(), nullable=True),
sa.Column("min", sa.Numeric(precision=16, scale=2), nullable=True),
sa.Column("max", sa.Numeric(precision=16, scale=2), nullable=True),
sa.Column("type", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
schema="transactions",
)
op.create_foreign_key(
op.f("fk_categories_rules_id_rules"),
"categories_rules",
"rules",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.drop_column("categories_rules", "bank", schema="transactions")
op.drop_column("categories_rules", "min", schema="transactions")
op.drop_column("categories_rules", "date", schema="transactions")
op.drop_column("categories_rules", "regex", schema="transactions")
op.drop_column("categories_rules", "description", schema="transactions")
op.drop_column("categories_rules", "max", schema="transactions")
op.create_foreign_key(
op.f("fk_tag_rules_id_rules"),
"tag_rules",
"rules",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.drop_column("tag_rules", "bank", schema="transactions")
op.drop_column("tag_rules", "min", schema="transactions")
op.drop_column("tag_rules", "date", schema="transactions")
op.drop_column("tag_rules", "regex", schema="transactions")
op.drop_column("tag_rules", "description", schema="transactions")
op.drop_column("tag_rules", "max", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"tag_rules",
sa.Column(
"max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column(
"min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.drop_constraint(
op.f("fk_tag_rules_id_rules"),
"tag_rules",
schema="transactions",
type_="foreignkey",
)
op.add_column(
"categories_rules",
sa.Column(
"max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column(
"min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.drop_constraint(
op.f("fk_categories_rules_id_rules"),
"categories_rules",
schema="transactions",
type_="foreignkey",
)
op.drop_table("rules", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,49 +0,0 @@
"""Rule min/max
Revision ID: 753c0bfb2062
Revises: e36e6321568e
Create Date: 2022-12-18 00:24:03.861461+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "753c0bfb2062"
down_revision = "e36e6321568e"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"min_amount",
new_column_name="min",
schema="transactions",
)
op.alter_column(
"categories_rules",
"max_amount",
new_column_name="max",
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"min",
new_column_name="min_amount",
schema="transactions",
)
op.alter_column(
"categories_rules",
"max",
new_column_name="max_amount",
schema="transactions",
)
# ### end Alembic commands ###

View File

@ -1,43 +0,0 @@
"""Category rule date format
Revision ID: 7adf89ec8d14
Revises: 83603bb7ef9c
Create Date: 2022-12-10 00:08:47.535765+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "7adf89ec8d14"
down_revision = "83603bb7ef9c"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"date",
existing_type=sa.VARCHAR(),
type_=sa.Date(),
existing_nullable=True,
schema="transactions",
postgresql_using="date::date"
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"date",
existing_type=sa.Date(),
type_=sa.VARCHAR(),
existing_nullable=True,
schema="transactions",
)
# ### end Alembic commands ###

View File

@ -1,38 +0,0 @@
"""Amount of transaction per period
Revision ID: 83603bb7ef9c
Revises: 8b5d5fbc8211
Create Date: 2022-12-09 23:12:15.644758+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "83603bb7ef9c"
down_revision = "8b5d5fbc8211"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_schedules",
sa.Column("amount", sa.Integer(), nullable=True),
schema="transactions",
)
op.drop_column("categories_schedules", "recurring", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_schedules",
sa.Column("recurring", sa.BOOLEAN(), autoincrement=False, nullable=False),
schema="transactions",
)
op.drop_column("categories_schedules", "amount", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,69 +0,0 @@
"""Category groups and relationships
Revision ID: 83f4c9837f6e
Revises: 2d0891f1be11
Create Date: 2022-12-04 15:10:51.924875+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "83f4c9837f6e"
down_revision = "2d0891f1be11"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_groups",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_groups")),
schema="transactions",
)
op.add_column(
"categories_available",
sa.Column("group", sa.String(), nullable=True),
schema="transactions",
)
op.create_foreign_key(
op.f("fk_categories_available_group_categories_groups"),
"categories_available",
"categories_groups",
["group"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
op.create_foreign_key(
op.f("fk_categorized_name_categories_available"),
"categorized",
"categories_available",
["name"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_categorized_name_categories_available"),
"categorized",
schema="transactions",
type_="foreignkey",
)
op.drop_constraint(
op.f("fk_categories_available_group_categories_groups"),
"categories_available",
schema="transactions",
type_="foreignkey",
)
op.drop_column("categories_available", "group", schema="transactions")
op.drop_table("categories_groups", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,92 +0,0 @@
"""Transaction based rules
Revision ID: 8b5d5fbc8211
Revises: e77395969585
Create Date: 2022-12-08 21:05:41.378466+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "8b5d5fbc8211"
down_revision = "e77395969585"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.drop_constraint("pk_categories_rules", "categories_rules", schema="transactions")
op.execute(sa.schema.CreateSequence(sa.schema.Sequence("categories_rules_id_seq", schema="transactions")))
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_rules",
sa.Column(
"id",
sa.BigInteger(),
autoincrement=True,
nullable=False,
server_default=sa.text(
"nextval('transactions.categories_rules_id_seq'::regclass)"
),
),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("date", sa.String(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("description", sa.String(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("bank", sa.String(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("min_amount", sa.Float(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("max_amount", sa.Float(), nullable=True),
schema="transactions",
)
op.drop_column("categories_rules", "rule", schema="transactions")
# ### end Alembic commands ###
op.create_primary_key(
"pk_categories_rules",
"categories_rules",
["id"],
schema="transactions",
)
def downgrade() -> None:
op.drop_constraint("pk_categories_rules", "categories_rules", schema="transactions")
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_rules",
sa.Column("rule", sa.String(), autoincrement=False, nullable=False),
schema="transactions",
)
op.drop_column("categories_rules", "max_amount", schema="transactions")
op.drop_column("categories_rules", "min_amount", schema="transactions")
op.drop_column("categories_rules", "bank", schema="transactions")
op.drop_column("categories_rules", "description", schema="transactions")
op.drop_column("categories_rules", "date", schema="transactions")
op.drop_column("categories_rules", "id", schema="transactions")
# ### end Alembic commands ###
op.execute(sa.schema.DropSequence(sa.schema.Sequence("categories_rules_id_seq", schema="transactions")))
op.create_primary_key(
"pk_categories_rules",
"categories_rules",
["name", "rule"],
schema="transactions",
)

View File

@ -1,46 +0,0 @@
"""Links
Revision ID: 8cc9870b0d74
Revises: a910e1b2214d
Create Date: 2022-12-19 22:10:25.136479+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "8cc9870b0d74"
down_revision = "a910e1b2214d"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"links",
sa.Column("original", sa.BigInteger(), nullable=False),
sa.Column("link", sa.BigInteger(), nullable=False),
sa.ForeignKeyConstraint(
["link"],
["transactions.originals.id"],
name=op.f("fk_links_link_originals"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["original"],
["transactions.originals.id"],
name=op.f("fk_links_original_originals"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("original", "link", name=op.f("pk_links")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("links", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,68 +0,0 @@
"""Tag rules
Revision ID: 9028b0f3b985
Revises: 753c0bfb2062
Create Date: 2022-12-18 22:53:13.334046+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "9028b0f3b985"
down_revision = "753c0bfb2062"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"tags_available",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_tags_available")),
schema="transactions",
)
op.create_table(
"tag_rules",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("tag", sa.String(), nullable=False),
sa.Column("date", sa.Date(), nullable=True),
sa.Column("description", sa.String(), nullable=True),
sa.Column("regex", sa.String(), nullable=True),
sa.Column("bank", sa.String(), nullable=True),
sa.Column("min", sa.Numeric(precision=16, scale=2), nullable=True),
sa.Column("max", sa.Numeric(precision=16, scale=2), nullable=True),
sa.ForeignKeyConstraint(
["tag"],
["transactions.tags_available.name"],
name=op.f("fk_tag_rules_tag_tags_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_tag_rules")),
schema="transactions",
)
op.create_foreign_key(
op.f("fk_tags_tag_tags_available"),
"tags",
"tags_available",
["tag"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_tags_tag_tags_available"),
"tags",
schema="transactions",
type_="foreignkey",
)
op.drop_table("tag_rules", schema="transactions")
op.drop_table("tags_available", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,32 +0,0 @@
"""Start/End date rule
Revision ID: 952de57a3c43
Revises: 18572111d9ff
Create Date: 2023-02-06 21:57:57.545327+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "952de57a3c43"
down_revision = "18572111d9ff"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"rules", sa.Column("start", sa.Date(), nullable=True), schema="transactions"
)
op.alter_column("rules", column_name="date", new_column_name="end", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column("rules", column_name="end", new_column_name="date", schema="transactions")
op.drop_column("rules", "start", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,56 +0,0 @@
"""Rule inheritance
Revision ID: a910e1b2214d
Revises: 9028b0f3b985
Create Date: 2022-12-19 20:48:04.682812+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "a910e1b2214d"
down_revision = "9028b0f3b985"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"fk_categorized_name_categories_available",
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
op.f("fk_categorized_name_categories_available"),
"categorized",
"categories_available",
["name"],
["name"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_categorized_name_categories_available"),
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_categorized_name_categories_available",
"categorized",
"categories_available",
["name"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
# ### end Alembic commands ###

View File

@ -1,53 +0,0 @@
"""Category schedule
Revision ID: d18cbd50f7c6
Revises: 6863dda76ea2
Create Date: 2022-12-08 13:30:29.048811+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "d18cbd50f7c6"
down_revision = "6863dda76ea2"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_schedules",
sa.Column("name", sa.String(), nullable=False),
sa.Column("recurring", sa.Boolean(), nullable=False),
sa.Column(
"period",
sa.Enum(
"daily",
"monthly",
"yearly",
name="period",
schema="transactions",
inherit_schema=True,
),
nullable=True,
),
sa.Column("period_multiplier", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name=op.f("fk_categories_schedules_name_categories_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_schedules")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("categories_schedules", schema="transactions")
# ### end Alembic commands ###

View File

@ -1,36 +0,0 @@
"""Add meal card
Revision ID: d3534f493239
Revises: 50ff1fbb8a00
Create Date: 2022-12-03 12:18:33.519666+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "d3534f493239"
down_revision = "50ff1fbb8a00"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute("ALTER TYPE transactions.accounttype ADD VALUE 'mealcard' BEFORE 'VISA'")
def downgrade() -> None:
op.execute(
"""CREATE TYPE transactions.accounttype_new
AS ENUM ('checking', 'savings', 'investment', 'VISA', 'MASTERCARD')
"""
)
op.execute("UPDATE transactions.banks SET type = DEFAULT WHERE type = 'mealcard'")
op.execute(
"""ALTER TABLE transactions.banks
ALTER COLUMN type TYPE transactions.accounttype_new
USING type::text::transactions.accounttype_new
"""
)
op.execute("DROP TYPE transactions.accounttype")
op.execute("ALTER TYPE transactions.accounttype_new RENAME TO accounttype")

View File

@ -1,58 +0,0 @@
"""Rules min/max money
Revision ID: e36e6321568e
Revises: 0ce89e987770
Create Date: 2022-12-10 18:55:07.149010+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "e36e6321568e"
down_revision = "0ce89e987770"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"min_amount",
existing_type=sa.DOUBLE_PRECISION(precision=53),
type_=sa.Numeric(precision=16, scale=2),
existing_nullable=True,
schema="transactions",
)
op.alter_column(
"categories_rules",
"max_amount",
existing_type=sa.DOUBLE_PRECISION(precision=53),
type_=sa.Numeric(precision=16, scale=2),
existing_nullable=True,
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"max_amount",
existing_type=sa.Numeric(precision=16, scale=2),
type_=sa.DOUBLE_PRECISION(precision=53),
existing_nullable=True,
schema="transactions",
)
op.alter_column(
"categories_rules",
"min_amount",
existing_type=sa.Numeric(precision=16, scale=2),
type_=sa.DOUBLE_PRECISION(precision=53),
existing_nullable=True,
schema="transactions",
)
# ### end Alembic commands ###

View File

@ -1,452 +0,0 @@
"""Divide by schemas
Revision ID: e455c78df789
Revises: 6b293f78cc97
Create Date: 2023-01-22 23:38:23.266906+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "e455c78df789"
down_revision = "6b293f78cc97"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"banks",
sa.Column("name", sa.String(), nullable=False),
sa.Column("BIC", sa.String(length=8), nullable=False),
sa.Column(
"type",
sa.Enum(
"checking",
"savings",
"investment",
"mealcard",
"VISA",
"MASTERCARD",
name="accounttype",
schema="bank",
inherit_schema=True,
),
nullable=False,
),
sa.PrimaryKeyConstraint("BIC", "type", name=op.f("pk_banks")),
sa.UniqueConstraint("name", name=op.f("uq_banks_name")),
schema="bank",
)
op.create_table(
"groups",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_groups")),
schema="category",
)
op.create_table(
"available",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_available")),
schema="tag",
)
op.create_table(
"nordigen",
sa.Column("name", sa.Text(), nullable=False),
sa.Column("bank_id", sa.String(), nullable=True),
sa.Column("requisition_id", sa.String(), nullable=True),
sa.Column("invert", sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(
["name"], ["bank.banks.name"], name=op.f("fk_nordigen_name_banks")
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_nordigen")),
schema="bank",
)
op.create_table(
"available",
sa.Column("name", sa.String(), nullable=False),
sa.Column("group", sa.String(), nullable=True),
sa.ForeignKeyConstraint(
["group"], ["category.groups.name"], name=op.f("fk_available_group_groups")
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_available")),
schema="category",
)
op.create_table(
"rules",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("tag", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"],
["transactions.rules.id"],
name=op.f("fk_rules_id_rules"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["tag"],
["tag.available.name"],
name=op.f("fk_rules_tag_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
schema="tag",
)
op.create_table(
"rules",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"],
["transactions.rules.id"],
name=op.f("fk_rules_id_rules"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["name"],
["category.available.name"],
name=op.f("fk_rules_name_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
schema="category",
)
op.create_table(
"schedules",
sa.Column("name", sa.String(), nullable=False),
sa.Column(
"period",
sa.Enum(
"daily",
"weekly",
"monthly",
"yearly",
name="period",
schema="category",
inherit_schema=True,
),
nullable=True,
),
sa.Column("period_multiplier", sa.Integer(), nullable=True),
sa.Column("amount", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(
["name"],
["category.available.name"],
name=op.f("fk_schedules_name_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_schedules")),
schema="category",
)
op.create_table(
"selector",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column(
"selector",
sa.Enum(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="category",
inherit_schema=True,
),
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name=op.f("fk_selector_id_categorized"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_selector")),
schema="category",
)
op.drop_constraint(
"fk_categorized_name_categories_available",
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
op.f("fk_categorized_name_available"),
"categorized",
"available",
["name"],
["name"],
source_schema="transactions",
referent_schema="category",
ondelete="CASCADE",
)
op.drop_constraint(
"fk_originals_bank_banks",
"originals",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
op.f("fk_originals_bank_banks"),
"originals",
"banks",
["bank"],
["name"],
source_schema="transactions",
referent_schema="bank",
)
op.drop_constraint(
"fk_tags_tag_tags_available", "tags", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
op.f("fk_tags_tag_available"),
"tags",
"available",
["tag"],
["name"],
source_schema="transactions",
referent_schema="tag",
)
op.drop_table("categories_schedules", schema="transactions")
op.drop_table("categories_rules", schema="transactions")
op.drop_table("categories_available", schema="transactions")
op.drop_table("tag_rules", schema="transactions")
op.drop_table("nordigen", schema="transactions")
op.drop_table("tags_available", schema="transactions")
op.drop_table("banks", schema="transactions")
op.drop_table("categories_selector", schema="transactions")
op.drop_table("categories_groups", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_tags_tag_available"), "tags", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
"fk_tags_tag_tags_available",
"tags",
"tags_available",
["tag"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
op.drop_constraint(
op.f("fk_originals_bank_banks"),
"originals",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_originals_bank_banks",
"originals",
"banks",
["bank"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
op.drop_constraint(
op.f("fk_categorized_name_available"),
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_categorized_name_categories_available",
"categorized",
"categories_available",
["name"],
["name"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.create_table(
"categories_groups",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.PrimaryKeyConstraint("name", name="pk_categories_groups"),
schema="transactions",
postgresql_ignore_search_path=False,
)
op.create_table(
"categories_selector",
sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column(
"selector",
postgresql.ENUM(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector",
schema="transactions",
),
autoincrement=False,
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name="fk_categories_selector_id_categorized",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_categories_selector"),
schema="transactions",
)
op.create_table(
"banks",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column("BIC", sa.VARCHAR(length=8), autoincrement=False, nullable=False),
sa.Column(
"type",
postgresql.ENUM(
"checking",
"savings",
"investment",
"mealcard",
"VISA",
"MASTERCARD",
name="accounttype",
schema="transactions",
),
autoincrement=False,
nullable=False,
),
sa.PrimaryKeyConstraint("BIC", "type", name="pk_banks"),
sa.UniqueConstraint("name", name="uq_banks_name"),
schema="transactions",
postgresql_ignore_search_path=False,
)
op.create_table(
"tags_available",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.PrimaryKeyConstraint("name", name="pk_tags_available"),
schema="transactions",
postgresql_ignore_search_path=False,
)
op.create_table(
"nordigen",
sa.Column("name", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("bank_id", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("requisition_id", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("invert", sa.BOOLEAN(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(
["name"], ["transactions.banks.name"], name="fk_nordigen_name_banks"
),
sa.PrimaryKeyConstraint("name", name="pk_nordigen"),
schema="transactions",
)
op.create_table(
"tag_rules",
sa.Column(
"id",
sa.BIGINT(),
server_default=sa.text(
"nextval('transactions.tag_rules_id_seq'::regclass)"
),
autoincrement=True,
nullable=False,
),
sa.Column("tag", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(
["id"],
["transactions.rules.id"],
name="fk_tag_rules_id_rules",
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["tag"],
["transactions.tags_available.name"],
name="fk_tag_rules_tag_tags_available",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_tag_rules"),
schema="transactions",
)
op.create_table(
"categories_rules",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column(
"id",
sa.BIGINT(),
server_default=sa.text(
"nextval('transactions.categories_rules_id_seq'::regclass)"
),
autoincrement=True,
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.rules.id"],
name="fk_categories_rules_id_rules",
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name="fk_categories_rules_name_categories_available",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_categories_rules"),
schema="transactions",
)
op.create_table(
"categories_available",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column("group", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(
["group"],
["transactions.categories_groups.name"],
name="fk_categories_available_group_categories_groups",
),
sa.PrimaryKeyConstraint("name", name="pk_categories_available"),
schema="transactions",
postgresql_ignore_search_path=False,
)
op.create_table(
"categories_schedules",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column(
"period",
postgresql.ENUM(
"daily",
"weekly",
"monthly",
"yearly",
name="period",
schema="transactions",
),
autoincrement=False,
nullable=True,
),
sa.Column(
"period_multiplier", sa.INTEGER(), autoincrement=False, nullable=True
),
sa.Column("amount", sa.INTEGER(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name="fk_categories_schedules_name_categories_available",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", name="pk_categories_schedules"),
schema="transactions",
)
op.drop_table("selector", schema="category")
op.drop_table("schedules", schema="category")
op.drop_table("rules", schema="category")
op.drop_table("rules", schema="tag")
op.drop_table("available", schema="category")
op.drop_table("nordigen", schema="bank")
op.drop_table("available", schema="tag")
op.drop_table("groups", schema="category")
op.drop_table("banks", schema="bank")
# ### end Alembic commands ###

View File

@ -1,37 +0,0 @@
"""Weekly period
Revision ID: e77395969585
Revises: d18cbd50f7c6
Create Date: 2022-12-08 16:35:27.506504+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "e77395969585"
down_revision = "d18cbd50f7c6"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute("ALTER TYPE transactions.period ADD VALUE 'weekly' AFTER 'daily'")
def downgrade() -> None:
op.execute(
"""CREATE TYPE transactions.period_new
AS ENUM ('daily', 'monthly', 'yearly')
"""
)
op.execute("UPDATE transactions.categories_schedules SET period = DEFAULT WHERE period = 'weekly'")
op.execute(
"""ALTER TABLE transactions.categories_schedules
ALTER COLUMN period TYPE transactions.period_new
USING period::text::transactions.period_new
"""
)
op.execute("DROP TYPE transactions.period")
op.execute("ALTER TYPE transactions.period_new RENAME TO period")

4
main.py Normal file
View File

@ -0,0 +1,4 @@
from pfbudget import run
if __name__ == "__main__":
run()

View File

@ -1,2 +1,7 @@
__all__ = ["run", "parse_data", "categorize_data"]
__author__ = "Luís Murta"
__version__ = "0.1"
from pfbudget.core.categories import categorize_data
from pfbudget.cli.runnable import run
from pfbudget.input.parsers import parse_data

View File

@ -1,267 +1,4 @@
from pfbudget.cli.argparser import argparser
from pfbudget.cli.interactive import Interactive
from pfbudget.common.types import Operation
from pfbudget.core.manager import Manager
import pfbudget.db.model as type
from pfbudget.utils.utils import parse_args_period
from pfbudget.cli.runnable import run
if __name__ == "__main__":
argparser = argparser()
args = vars(argparser.parse_args())
assert "op" in args, "No Operation selected"
op: Operation = args.pop("op")
assert "database" in args, "No database selected"
db = args.pop("database")
assert "verbose" in args, "No verbose level specified"
verbosity = args.pop("verbose")
params = []
match (op):
case Operation.ManualCategorization:
Interactive(Manager(db, verbosity)).start()
exit()
case Operation.Categorize:
keys = {"no_nulls"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["no_nulls"]]
case Operation.Parse:
keys = {"path", "bank", "creditcard"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["path"], args["bank"], args["creditcard"]]
case Operation.RequisitionId:
keys = {"name", "country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["name"][0], args["country"][0]]
case Operation.Download:
keys = {"all", "banks", "interval", "start", "end", "year", "dry_run"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
start, end = parse_args_period(args)
params = [start, end, args["dry_run"]]
if not args["all"]:
params.append(args["banks"])
else:
params.append([])
case Operation.BankAdd:
keys = {"bank", "bic", "type"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.Bank(
args["bank"][0],
args["bic"][0],
args["type"][0],
)
]
case Operation.BankMod:
keys = {"bank", "bic", "type", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["bic", "type"]
param = {"name": args["bank"][0]}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: None for k in args["remove"] if k in nargs_1}
params = [param]
case Operation.BankDel:
assert len(args["bank"]) > 0, "argparser ill defined"
params = args["bank"]
case Operation.NordigenAdd:
keys = {"bank", "bank_id", "requisition_id", "invert"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.Nordigen(
args["bank"][0],
args["bank_id"][0] if args["bank_id"] else None,
args["requisition_id"][0] if args["requisition_id"] else None,
args["invert"] if args["invert"] else None,
)
]
case Operation.NordigenMod:
keys = {"bank", "bank_id", "requisition_id", "invert", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["bank_id", "requisition_id"]
nargs_0 = ["invert"]
param = {"name": args["bank"][0]}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: v for k, v in args.items() if k in nargs_0}
param |= {k: None for k in args["remove"] if k in nargs_1}
params = [param]
case Operation.NordigenDel:
assert len(args["bank"]) > 0, "argparser ill defined"
params = args["bank"]
case Operation.NordigenCountryBanks:
keys = {"country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["country"][0]]
case Operation.CategoryAdd:
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Category(cat, args["group"]) for cat in args["category"]]
case Operation.CategoryUpdate:
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Category(cat) for cat in args["category"]]
params.append(args["group"])
case Operation.CategoryRemove:
assert "category" in args, "argparser ill defined"
params = [type.Category(cat) for cat in args["category"]]
case Operation.CategorySchedule:
keys = {"category", "period", "frequency"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.CategorySchedule(
cat, args["period"][0], args["frequency"][0], None
)
for cat in args["category"]
]
case Operation.RuleAdd:
keys = {"category", "start", "end", "description", "regex", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.CategoryRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
cat,
)
for cat in args["category"]
]
case Operation.RuleRemove | Operation.TagRuleRemove:
keys = {"id"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = args["id"]
case Operation.RuleModify:
keys = {
"id",
"category",
"date",
"description",
"bank",
"min",
"max",
"remove",
}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["category", "date", "description", "regex", "bank", "min", "max"]
params = []
for id in args["id"]:
param = {"id": id}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: None for k in args["remove"] if k in nargs_1}
params.append(param)
case Operation.TagAdd:
keys = {"tag"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Tag(tag) for tag in args["tag"]]
case Operation.TagRuleAdd:
keys = {"tag", "start", "end", "description", "regex", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.TagRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
tag,
)
for tag in args["tag"]
]
case Operation.TagRuleModify:
keys = {"id", "tag", "date", "description", "bank", "min", "max", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["tag", "date", "description", "regex", "bank", "min", "max"]
params = []
for id in args["id"]:
param = {"id": id}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: None for k in args["remove"] if k in nargs_1}
params.append(param)
case Operation.GroupAdd:
assert "group" in args, "argparser ill defined"
params = [type.CategoryGroup(group) for group in args["group"]]
case Operation.GroupRemove:
assert "group" in args, "argparser ill defined"
params = [type.CategoryGroup(group) for group in args["group"]]
case Operation.Forge | Operation.Dismantle:
keys = {"original", "links"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["original"][0], args["links"]]
case (
Operation.Export
| Operation.Import
| Operation.ExportBanks
| Operation.ImportBanks
| Operation.ExportCategoryRules
| Operation.ImportCategoryRules
| Operation.ExportTagRules
| Operation.ImportTagRules
| Operation.ExportCategories
| Operation.ImportCategories
| Operation.ExportCategoryGroups
| Operation.ImportCategoryGroups
):
keys = {"file", "format"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["file"][0], args["format"][0]]
Manager(db, verbosity).action(op, params)
run()

0
pfbudget/cli/__init__.py Normal file
View File

View File

@ -1,405 +0,0 @@
import argparse
import datetime as dt
import decimal
from dotenv import load_dotenv
import os
import re
from pfbudget.common.types import Operation
from pfbudget.db.model import AccountType, Period
from pfbudget.db.sqlite import DatabaseClient
import pfbudget.reporting.graph
import pfbudget.reporting.report
import pfbudget.utils.utils
load_dotenv()
DEFAULT_DB = os.environ.get("DEFAULT_DB")
def argparser() -> argparse.ArgumentParser:
universal = argparse.ArgumentParser(add_help=False)
universal.add_argument(
"-db",
"--database",
nargs="?",
help="select current database",
default=DEFAULT_DB,
)
universal.add_argument("-v", "--verbose", action="count", default=0)
period = argparse.ArgumentParser(add_help=False)
period_group = period.add_mutually_exclusive_group()
period_group.add_argument(
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
)
period_group.add_argument("--start", type=str, nargs=1, help="graph start date")
period_group.add_argument("--end", type=str, nargs=1, help="graph end date")
period_group.add_argument("--year", type=str, nargs=1, help="graph year")
parser = argparse.ArgumentParser(
description="does cool finance stuff",
parents=[universal],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
if version := re.search(
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]', open("pfbudget/__init__.py").read()
):
parser.add_argument(
"--version",
action="version",
version=version.group(1),
)
subparsers = parser.add_subparsers(required=True)
# TODO Init
# init = subparsers.add_parser("init")
# init.set_defaults(op=Operation.Init)
# Exports transactions to .csv file
export = subparsers.add_parser("export")
export.set_defaults(op=Operation.Export)
file_options(export)
pimport = subparsers.add_parser("import")
pimport.set_defaults(op=Operation.Import)
pimport.add_argument("file", nargs=1, type=str)
# Parse from .csv
parse = subparsers.add_parser("parse")
parse.set_defaults(op=Operation.Parse)
parse.add_argument("path", nargs="+", type=str)
parse.add_argument("--bank", nargs=1, type=str)
parse.add_argument("--creditcard", nargs=1, type=str)
# Automatic/manual categorization
categorize = subparsers.add_parser("categorize").add_subparsers(required=True)
auto = categorize.add_parser("auto")
auto.set_defaults(op=Operation.Categorize)
auto.add_argument("--no-nulls", action="store_false")
categorize.add_parser("manual").set_defaults(op=Operation.ManualCategorization)
"""
Graph
"""
p_graph = subparsers.add_parser(
"graph",
description="Graph of the transactions",
parents=[universal, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_graph.add_argument(
"option",
type=str,
choices=["monthly", "discrete", "networth"],
nargs="?",
default="monthly",
help="graph option help",
)
p_graph.add_argument("--save", action="store_true")
p_graph.set_defaults(func=graph)
"""
Report
"""
p_report = subparsers.add_parser(
"report",
description="Prints report of transaction groups",
parents=[universal, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_report.add_argument(
"option",
type=str,
choices=["net", "detailed"],
nargs="?",
default="net",
help="report option help",
)
p_report.set_defaults(func=report)
# Banks
bank(subparsers.add_parser("bank"))
# Nordigen access token
subparsers.add_parser("token").set_defaults(op=Operation.Token)
# Nordigen requisition id
requisition = subparsers.add_parser("eua")
requisition.set_defaults(op=Operation.RequisitionId)
requisition.add_argument("id", nargs=1, type=str)
requisition.add_argument("country", nargs=1, type=str)
# Download through the Nordigen API
download = subparsers.add_parser("download", parents=[period])
download.set_defaults(op=Operation.Download)
download_banks = download.add_mutually_exclusive_group()
download_banks.add_argument("--all", action="store_true")
download_banks.add_argument("--banks", nargs="+", type=str)
download.add_argument("--dry-run", action="store_true")
# List available banks in country C
banks = subparsers.add_parser("banks")
banks.set_defaults(op=Operation.NordigenCountryBanks)
banks.add_argument("country", nargs=1, type=str)
# Categories
category(subparsers.add_parser("category"))
# Tag
tags(subparsers.add_parser("tag"))
# Link
link(subparsers.add_parser("link"))
return parser
def graph(args):
"""Plots the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.utils.parse_args_period(args)
if args.option == "monthly":
pfbudget.reporting.graph.monthly(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "discrete":
pfbudget.reporting.graph.discrete(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "networth":
pfbudget.reporting.graph.networth(
DatabaseClient(args.database), vars(args), start, end
)
def report(args):
"""Prints a detailed report of the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.utils.parse_args_period(args)
if args.option == "net":
pfbudget.reporting.report.net(DatabaseClient(args.database), start, end)
elif args.option == "detailed":
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
def bank(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.BankAdd)
add.add_argument("bank", nargs=1, type=str)
add.add_argument("bic", nargs=1, type=str)
add.add_argument("type", nargs=1, type=str, choices=[e.name for e in AccountType])
rem = commands.add_parser("del")
rem.set_defaults(op=Operation.BankDel)
rem.add_argument("bank", nargs="+", type=str)
mod = commands.add_parser("mod")
mod.set_defaults(op=Operation.BankMod)
mod.add_argument("bank", nargs=1, type=str)
mod.add_argument("--bic", nargs=1, type=str)
mod.add_argument("--type", nargs=1, type=str, choices=[e.name for e in AccountType])
mod.add_argument("--remove", nargs="*", default=[], type=str)
nordigen(commands.add_parser("nordigen"))
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportBanks)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportBanks)
file_options(pimport)
def nordigen(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.NordigenAdd)
add.add_argument("bank", nargs=1, type=str)
add.add_argument("--bank_id", nargs=1, type=str)
add.add_argument("--requisition_id", nargs=1, type=str)
add.add_argument("--invert", action="store_true")
rem = commands.add_parser("del")
rem.set_defaults(op=Operation.NordigenDel)
rem.add_argument("bank", nargs="+", type=str)
mod = commands.add_parser("mod")
mod.set_defaults(op=Operation.NordigenMod)
mod.add_argument("bank", nargs=1, type=str)
mod.add_argument("--bank_id", nargs=1, type=str)
mod.add_argument("--requisition_id", nargs=1, type=str)
mod.add_argument("--invert", action="store_true")
mod.add_argument("--remove", nargs="*", default=[], type=str)
def category(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.CategoryAdd)
add.add_argument("category", nargs="+", type=str)
add.add_argument("--group", nargs="?", type=str)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.CategoryRemove)
remove.add_argument("category", nargs="+", type=str)
update = commands.add_parser("update")
update.set_defaults(op=Operation.CategoryUpdate)
update.add_argument("category", nargs="+", type=str)
update.add_argument("--group", nargs="?", type=str)
schedule = commands.add_parser("schedule")
schedule.set_defaults(op=Operation.CategorySchedule)
schedule.add_argument("category", nargs="+", type=str)
schedule.add_argument("period", nargs=1, choices=[e.value for e in Period])
schedule.add_argument("--frequency", nargs=1, default=[1], type=int)
rule = commands.add_parser("rule")
category_rule(rule)
group = commands.add_parser("group")
category_group(group)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategories)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategories)
file_options(pimport)
def category_group(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.GroupAdd)
add.add_argument("group", nargs="+", type=str)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.GroupRemove)
remove.add_argument("group", nargs="+", type=str)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryGroups)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryGroups)
file_options(pimport)
def category_rule(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.RuleAdd)
add.add_argument("category", nargs="+", type=str)
rules(add)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.RuleRemove)
remove.add_argument("id", nargs="+", type=int)
modify = commands.add_parser("modify")
modify.set_defaults(op=Operation.RuleModify)
modify.add_argument("id", nargs="+", type=int)
modify.add_argument("--category", nargs=1, type=str)
rules(modify)
modify.add_argument("--remove", nargs="*", default=[], type=str)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryRules)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryRules)
file_options(pimport)
def tags(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.TagAdd)
add.add_argument("tag", nargs="+", type=str)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.TagRemove)
remove.add_argument("tag", nargs="+", type=str)
rule = commands.add_parser("rule")
tag_rule(rule)
def tag_rule(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.TagRuleAdd)
add.add_argument("tag", nargs="+", type=str)
rules(add)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.TagRuleRemove)
remove.add_argument("id", nargs="+", type=int)
modify = commands.add_parser("modify")
modify.set_defaults(op=Operation.TagRuleModify)
modify.add_argument("id", nargs="+", type=int)
modify.add_argument("--tag", nargs=1, type=str)
rules(modify)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportTagRules)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportTagRules)
file_options(pimport)
def rules(parser: argparse.ArgumentParser):
parser.add_argument("--start", nargs=1, type=dt.date.fromisoformat)
parser.add_argument("--end", nargs=1, type=dt.date.fromisoformat)
parser.add_argument("--description", nargs=1, type=str)
parser.add_argument("--regex", nargs=1, type=str)
parser.add_argument("--bank", nargs=1, type=str)
parser.add_argument("--min", nargs=1, type=decimal.Decimal)
parser.add_argument("--max", nargs=1, type=decimal.Decimal)
def link(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
forge = commands.add_parser("forge")
forge.set_defaults(op=Operation.Forge)
forge.add_argument("original", nargs=1, type=int)
forge.add_argument("links", nargs="+", type=int)
dismantle = commands.add_parser("dismantle")
dismantle.set_defaults(op=Operation.Dismantle)
dismantle.add_argument("original", nargs=1, type=int)
dismantle.add_argument("links", nargs="+", type=int)
def file_options(parser: argparse.ArgumentParser):
parser.add_argument("file", nargs=1, type=str)
parser.add_argument("format", nargs=1, default="pickle")

View File

@ -1,120 +0,0 @@
import decimal
from ..core.manager import Manager
from ..db.model import (
Category,
CategorySelector,
Note,
Selector_T,
SplitTransaction,
Tag,
Transaction,
TransactionCategory,
TransactionTag,
)
class Interactive:
help = "category(:tag)/split/note:/skip/quit"
selector = Selector_T.manual
def __init__(self, manager: Manager) -> None:
self.manager = manager
with self.manager.db.session() as session:
self.categories = session.get(Category)
self.tags = session.get(Tag)
session.expunge_all()
def intro(self) -> None:
print(
f"Welcome! Available categories are {[c.name for c in self.categories]} and"
f" currently existing tags are {[t.name for t in self.tags]}"
)
def start(self) -> None:
self.intro()
with self.manager.db.session() as session:
uncategorized = session.uncategorized()
n = len(uncategorized)
print(f"{n} left to categorize")
i = 0
new = []
next = uncategorized[i]
print(next)
while (command := input("$ ")) != "quit":
match command:
case "help":
print(self.help)
case "skip":
i += 1
case "quit":
break
case "split":
new = self.split(next)
session.add(new)
case other:
if not other:
print(self.help)
continue
if other.startswith("note:"):
# TODO adding notes to a splitted transaction won't allow categorization
next.note = Note(other[len("note:") :].strip())
else:
ct = other.split(":")
if (category := ct[0]) not in [
c.name for c in self.categories
]:
print(self.help, self.categories)
tags = []
if len(ct) > 1:
tags = ct[1:]
next.category = TransactionCategory(
category, CategorySelector(self.selector)
)
for tag in tags:
if tag not in [t.name for t in self.tags]:
session.add([Tag(tag)])
self.tags = session.get(Tag)
next.tags.add(TransactionTag(tag))
i += 1
session.commit()
next = uncategorized[i] if len(new) == 0 else new.pop()
print(next)
def split(self, original: Transaction) -> list[SplitTransaction]:
total = original.amount
new = []
done = False
while not done:
if abs(sum(t.amount for t in new)) > abs(total):
print("Overflow, try again")
new.clear()
continue
if sum(t.amount for t in new) == total:
done = True
break
amount = decimal.Decimal(input("amount: "))
new.append(
SplitTransaction(
original.date, original.description, amount, original.id
)
)
return new

321
pfbudget/cli/runnable.py Normal file
View File

@ -0,0 +1,321 @@
from pathlib import Path
import argparse
import re
from pfbudget.core.categories import categorize_data
from pfbudget.core.manager import Manager
from pfbudget.input.json import JsonParser
from pfbudget.input.nordigen import NordigenInput
from pfbudget.db.client import DatabaseClient
import pfbudget.reporting.graph
import pfbudget.reporting.report
import pfbudget.utils
DEFAULT_DB = "data.db"
class PfBudgetInitialized(Exception):
pass
class PfBudgetNotInitialized(Exception):
pass
class DataFileMissing(Exception):
pass
def argparser(manager: Manager) -> argparse.ArgumentParser:
help = argparse.ArgumentParser(add_help=False)
help.add_argument(
"-db",
"--database",
nargs="?",
help="select current database",
default=DEFAULT_DB,
)
help.add_argument(
"-q", "--quiet", action="store_true", help="reduces the amount of verbose"
)
period = argparse.ArgumentParser(add_help=False).add_mutually_exclusive_group()
period.add_argument(
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
)
period.add_argument("--start", type=str, nargs=1, help="graph start date")
period.add_argument("--end", type=str, nargs=1, help="graph end date")
period.add_argument("--year", type=str, nargs=1, help="graph year")
parser = argparse.ArgumentParser(
description="does cool finance stuff",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--version",
action="version",
version=re.search(
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
open("pfbudget/__init__.py").read(),
).group(1),
)
subparsers = parser.add_subparsers(dest="command", required=True)
"""
Init
"""
p_init = subparsers.add_parser(
"init",
description="Initializes the SQLite3 database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_init.set_defaults(func=lambda args: manager.init())
"""
Exporting
"""
p_export = subparsers.add_parser(
"export",
description="Exports the selected database to a .csv file",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_export.set_defaults(func=lambda args: DatabaseClient(args.database).export())
"""
Parsing
"""
p_parse = subparsers.add_parser(
"parse",
description="Parses and adds the requested transactions into the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_parse.add_argument("path", nargs="+", type=str)
p_parse.add_argument("--bank", nargs=1, type=str)
p_parse.add_argument("--creditcard", nargs=1, type=str)
p_parse.add_argument("--category", nargs=1, type=int)
p_parse.set_defaults(func=lambda args: parse(manager, args))
"""
Categorizing
"""
p_categorize = subparsers.add_parser(
"categorize",
description="Categorizes the transactions in the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_categorize.set_defaults(
func=lambda args: categorize_data(DatabaseClient(args.database))
)
"""
Graph
"""
p_graph = subparsers.add_parser(
"graph",
description="Graph of the transactions",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_graph.add_argument(
"option",
type=str,
choices=["monthly", "discrete", "networth"],
nargs="?",
default="monthly",
help="graph option help",
)
p_graph.add_argument("--save", action="store_true")
p_graph.set_defaults(func=graph)
"""
Report
"""
p_report = subparsers.add_parser(
"report",
description="Prints report of transaction groups",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_report.add_argument(
"option",
type=str,
choices=["net", "detailed"],
nargs="?",
default="net",
help="report option help",
)
p_report.set_defaults(func=report)
"""
Register bank
"""
p_register = subparsers.add_parser(
"register",
description="Register a bank",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
p_register.add_argument(
"--requisition", type=str, nargs=1, help="requisition option help"
)
p_register.add_argument("--invert", action="store_true")
p_register.set_defaults(func=lambda args: manager.register(vars(args)))
"""
Unregister bank
"""
p_register = subparsers.add_parser(
"unregister",
description="Unregister a bank",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
p_register.set_defaults(func=lambda args: manager.unregister(vars(args)))
"""
Nordigen API
"""
p_nordigen_access = subparsers.add_parser(
"token",
description="Get new access token",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_access.set_defaults(func=lambda args: NordigenInput(manager).token())
"""
(Re)new bank requisition ID
"""
p_nordigen_access = subparsers.add_parser(
"renew",
description="(Re)new the Bank requisition ID",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_access.add_argument("name", nargs=1, type=str)
p_nordigen_access.add_argument("country", nargs=1, type=str)
p_nordigen_access.set_defaults(
func=lambda args: NordigenInput(manager).requisition(
args.name[0], args.country[0]
)
)
"""
Downloading through Nordigen API
"""
p_nordigen_download = subparsers.add_parser(
"download",
description="Downloads transactions using Nordigen API",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_download.add_argument("--id", nargs="+", type=str)
p_nordigen_download.add_argument("--name", nargs="+", type=str)
p_nordigen_download.add_argument("--all", action="store_true")
p_nordigen_download.set_defaults(func=lambda args: download(manager, args))
"""
List available banks on Nordigen API
"""
p_nordigen_list = subparsers.add_parser(
"list",
description="Lists banks in {country}",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_list.add_argument("country", nargs=1, type=str)
p_nordigen_list.set_defaults(func=lambda args: nordigen_banks(manager, args))
"""
Nordigen JSONs
"""
p_nordigen_json = subparsers.add_parser(
"json",
description="",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_json.add_argument("json", nargs=1, type=str)
p_nordigen_json.add_argument("bank", nargs=1, type=str)
p_nordigen_json.add_argument("--invert", action=argparse.BooleanOptionalAction)
p_nordigen_json.set_defaults(
func=lambda args: manager.parser(JsonParser(vars(args)))
)
return parser
def parse(manager: Manager, args):
"""Parses the contents of the path in args to the selected database.
Args:
args (dict): argparse variables
"""
for path in args.path:
if (dir := Path(path)).is_dir():
for file in dir.iterdir():
manager.parse(file, vars(args))
elif Path(path).is_file():
manager.parse(path, vars(args))
else:
raise FileNotFoundError
def graph(args):
"""Plots the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "monthly":
pfbudget.reporting.graph.monthly(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "discrete":
pfbudget.reporting.graph.discrete(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "networth":
pfbudget.reporting.graph.networth(
DatabaseClient(args.database), vars(args), start, end
)
def report(args):
"""Prints a detailed report of the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "net":
pfbudget.reporting.report.net(DatabaseClient(args.database), start, end)
elif args.option == "detailed":
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
def nordigen_banks(manager: Manager, args):
input = NordigenInput(manager)
input.list(vars(args)["country"][0])
def download(manager: Manager, args):
start, end = pfbudget.utils.parse_args_period(args)
manager.parser(NordigenInput(manager, vars(args), start, end))
def run():
manager = Manager(DEFAULT_DB)
args = argparser(manager).parse_args()
args.func(args)

View File

View File

@ -4,53 +4,6 @@ from decimal import Decimal, InvalidOperation
from enum import Enum, auto
class Operation(Enum):
Init = auto()
Transactions = auto()
Parse = auto()
Download = auto()
Categorize = auto()
ManualCategorization = auto()
Token = auto()
RequisitionId = auto()
CategoryAdd = auto()
CategoryUpdate = auto()
CategoryRemove = auto()
CategorySchedule = auto()
RuleAdd = auto()
RuleRemove = auto()
RuleModify = auto()
GroupAdd = auto()
GroupRemove = auto()
TagAdd = auto()
TagRemove = auto()
TagRuleAdd = auto()
TagRuleRemove = auto()
TagRuleModify = auto()
Forge = auto()
Dismantle = auto()
Split = auto()
BankAdd = auto()
BankMod = auto()
BankDel = auto()
NordigenAdd = auto()
NordigenMod = auto()
NordigenDel = auto()
NordigenCountryBanks = auto()
Export = auto()
Import = auto()
ExportBanks = auto()
ImportBanks = auto()
ExportCategoryRules = auto()
ImportCategoryRules = auto()
ExportTagRules = auto()
ImportTagRules = auto()
ExportCategories = auto()
ImportCategories = auto()
ExportCategoryGroups = auto()
ImportCategoryGroups = auto()
class TransactionError(Exception):
pass

View File

View File

@ -9,7 +9,7 @@ import yaml
if TYPE_CHECKING:
from pfbudget.common.types import Transaction
from pfbudget.db.sqlite import DatabaseClient
from pfbudget.db.client import DatabaseClient
Options = namedtuple(

View File

@ -1,154 +0,0 @@
from codetiming import Timer
from datetime import timedelta
from typing import Sequence
import pfbudget.db.model as t
class Categorizer:
options = {}
def __init__(self):
self.options["null_days"] = 3
def rules(
self,
transactions: Sequence[t.BankTransaction],
categories: Sequence[t.Category],
tags: Sequence[t.Tag],
nullify: bool = True
):
"""Overarching categorization tool
Receives a list of transactions (by ref) and updates their category according
to the rules defined for each category
Args:
transactions (Sequence[BankTransaction]): uncategorized transactions
categories (Sequence[Category]): available categories
tags (Sequence[Tag]): currently available tags
"""
if nullify:
try:
null = next(cat for cat in categories if cat.name == "null")
print("Nullifying")
self._nullify(transactions, null)
except StopIteration:
print("Null category not defined")
categories = [cat for cat in categories if cat.name != "null"]
self._rule_based_categories(transactions, categories)
self._rule_based_tags(transactions, tags)
@Timer(name="nullify")
def _nullify(self, transactions: Sequence[t.BankTransaction], null: t.Category):
count = 0
matching = []
for transaction in transactions:
for cancel in (
cancel
for cancel in transactions
if (
transaction.date - timedelta(days=self.options["null_days"])
<= cancel.date
<= transaction.date + timedelta(days=self.options["null_days"])
and cancel != transaction
and cancel.bank != transaction.bank
and cancel.amount == -transaction.amount
and transaction not in matching
and cancel not in matching
and all(r.matches(transaction) for r in null.rules)
and all(r.matches(cancel) for r in null.rules)
)
):
transaction.category = t.TransactionCategory(
name="null",
selector=t.CategorySelector(t.Selector_T.nullifier),
)
cancel.category = t.TransactionCategory(
name="null",
selector=t.CategorySelector(t.Selector_T.nullifier),
)
matching.extend([transaction, cancel])
count += 2
break
print(f"Nullified {count} of {len(transactions)} transactions")
@Timer(name="categoryrules")
def _rule_based_categories(
self,
transactions: Sequence[t.BankTransaction],
categories: Sequence[t.Category],
):
print(f"Categorizing {len(transactions)} transactions")
d = {}
for category in [c for c in categories if c.rules]:
for rule in category.rules:
# for transaction in [t for t in transactions if not t.category]:
for transaction in [
t
for t in transactions
if not t.category or t.category.name != "null"
]:
if not rule.matches(transaction):
continue
# passed all conditions, assign category
if transaction.category:
if transaction.category.name == category.name:
continue
if (
input(
f"Overwrite {transaction} with {category.name}? (y/n)"
)
== "y"
):
transaction.category.name = category.name
transaction.category.selector.selector = t.Selector_T.rules
else:
transaction.category = t.TransactionCategory(
category.name, t.CategorySelector(t.Selector_T.rules)
)
if rule in d:
d[rule] += 1
else:
d[rule] = 1
for k, v in d.items():
print(f"{v}: {k}")
@Timer(name="tagrules")
def _rule_based_tags(
self, transactions: Sequence[t.BankTransaction], tags: Sequence[t.Tag]
):
print(f"Tagging {len(transactions)} transactions")
d = {}
for tag in [t for t in tags if len(t.rules) > 0]:
for rule in tag.rules:
# for transaction in [t for t in transactions if not t.category]:
for transaction in [
t
for t in transactions
if tag.name not in [tag.tag for tag in t.tags]
]:
if not rule.matches(transaction):
continue
if not transaction.tags:
transaction.tags = {t.TransactionTag(tag.name)}
else:
transaction.tags.add(t.TransactionTag(tag.name))
if rule in d:
d[rule] += 1
else:
d[rule] = 1
for k, v in d.items():
print(f"{v}: {k}")

View File

@ -1,412 +1,47 @@
import csv
from pathlib import Path
import pickle
import webbrowser
from pfbudget.common.types import Operation
from pfbudget.core.categorizer import Categorizer
from pfbudget.db.client import DbClient
from pfbudget.db.model import (
Bank,
BankTransaction,
Category,
CategoryGroup,
CategoryRule,
CategorySchedule,
CategorySelector,
Link,
MoneyTransaction,
Nordigen,
Rule,
Selector_T,
SplitTransaction,
Tag,
TagRule,
Transaction,
TransactionCategory,
)
from pfbudget.input.nordigen import NordigenInput
from pfbudget.input.input import Input
from pfbudget.input.parsers import parse_data
from pfbudget.common.types import Bank, Banks, Transaction, Transactions
from pfbudget.db.client import DatabaseClient
from pfbudget.utils import convert
class Manager:
def __init__(self, db: str, verbosity: int = 0):
self._db = db
self._verbosity = verbosity
def __init__(self, db: str):
self.__db = db
def action(self, op: Operation, params=None):
if self._verbosity > 0:
print(f"op={op}, params={params}")
def init(self):
client = DatabaseClient(self.__db)
client.init()
if params is None:
params = []
def register(self, args: dict):
bank = Bank(args["bank"][0], "", args["requisition"][0], args["invert"])
client = DatabaseClient(self.__db)
client.register_bank(convert(bank))
match (op):
case Operation.Init:
pass
def unregister(self, args: dict):
client = DatabaseClient(self.__db)
client.unregister_bank(args["bank"][0])
case Operation.Transactions:
with self.db.session() as session:
transactions = session.get(Transaction)
ret = [t.format for t in transactions]
return ret
def parser(self, parser: Input):
transactions = parser.parse()
self.add_transactions(transactions)
case Operation.Parse:
# Adapter for the parse_data method. Can be refactored.
args = {"bank": params[1], "creditcard": params[2], "category": None}
transactions = []
for path in [Path(p) for p in params[0]]:
if path.is_dir():
for file in path.iterdir():
transactions.extend(self.parse(file, args))
elif path.is_file():
transactions.extend(self.parse(path, args))
else:
raise FileNotFoundError(path)
def parse(self, filename: str, args: dict):
transactions = parse_data(filename, args)
self.add_transactions(transactions)
if (
len(transactions) > 0
and input(f"{transactions[:5]}\nCommit? (y/n)") == "y"
):
with self.db.session() as session:
session.add(sorted(transactions))
def transactions() -> list[Transaction]:
pass
case Operation.Download:
client = NordigenInput()
with self.db.session() as session:
if len(params[3]) == 0:
client.banks = session.get(Bank, Bank.nordigen)
else:
client.banks = session.get(Bank, Bank.name, params[3])
session.expunge_all()
client.start = params[0]
client.end = params[1]
transactions = client.parse()
def add_transactions(self, transactions: Transactions):
client = DatabaseClient(self.__db)
client.insert_transactions([convert(t) for t in transactions])
# dry-run
if not params[2]:
with self.db.session() as session:
session.add(sorted(transactions))
else:
print(transactions)
def get_bank_by(self, key: str, value: str) -> Bank:
client = DatabaseClient(self.__db)
bank = client.get_bank(key, value)
return convert(bank)
case Operation.Categorize:
with self.db.session() as session:
uncategorized = session.get(
BankTransaction, ~BankTransaction.category.has()
)
categories = session.get(Category)
tags = session.get(Tag)
Categorizer().rules(uncategorized, categories, tags, params[0])
case Operation.BankMod:
with self.db.session() as session:
session.update(Bank, params)
case Operation.NordigenMod:
with self.db.session() as session:
session.update(Nordigen, params)
case Operation.BankDel:
with self.db.session() as session:
session.remove_by_name(Bank, params)
case Operation.NordigenDel:
with self.db.session() as session:
session.remove_by_name(Nordigen, params)
case Operation.Token:
NordigenInput().token()
case Operation.RequisitionId:
link, _ = NordigenInput().requisition(params[0], params[1])
print(f"Opening {link} to request access to {params[0]}")
webbrowser.open(link)
case Operation.NordigenCountryBanks:
banks = NordigenInput().country_banks(params[0])
print(banks)
case (
Operation.BankAdd
| Operation.CategoryAdd
| Operation.NordigenAdd
| Operation.RuleAdd
| Operation.TagAdd
| Operation.TagRuleAdd
):
with self.db.session() as session:
session.add(params)
case Operation.CategoryUpdate:
with self.db.session() as session:
session.updategroup(*params)
case Operation.CategoryRemove:
with self.db.session() as session:
session.remove_by_name(Category, params)
case Operation.CategorySchedule:
with self.db.session() as session:
session.updateschedules(params)
case Operation.RuleRemove:
assert all(isinstance(param, int) for param in params)
with self.db.session() as session:
session.remove_by_id(CategoryRule, params)
case Operation.TagRemove:
with self.db.session() as session:
session.remove_by_name(Tag, params)
case Operation.TagRuleRemove:
assert all(isinstance(param, int) for param in params)
with self.db.session() as session:
session.remove_by_id(TagRule, params)
case Operation.RuleModify | Operation.TagRuleModify:
assert all(isinstance(param, dict) for param in params)
with self.db.session() as session:
session.update(Rule, params)
case Operation.GroupAdd:
with self.db.session() as session:
session.add(params)
case Operation.GroupRemove:
assert all(isinstance(param, CategoryGroup) for param in params)
with self.db.session() as session:
session.remove_by_name(CategoryGroup, params)
case Operation.Forge:
if not (
isinstance(params[0], int)
and all(isinstance(p, int) for p in params[1])
):
raise TypeError("f{params} are not transaction ids")
with self.db.session() as session:
original = session.get(Transaction, Transaction.id, params[0])[0]
links = session.get(Transaction, Transaction.id, params[1])
if not original.category:
original.category = self.askcategory(original)
for link in links:
if (
not link.category
or link.category.name != original.category.name
):
print(
f"{link} category will change to"
f" {original.category.name}"
)
link.category = original.category
tobelinked = [Link(original.id, link.id) for link in links]
session.add(tobelinked)
case Operation.Dismantle:
assert all(isinstance(param, Link) for param in params)
with self.db.session() as session:
original = params[0].original
links = [link.link for link in params]
session.remove_links(original, links)
case Operation.Split:
if len(params) < 1 and not all(
isinstance(p, Transaction) for p in params
):
raise TypeError(f"{params} are not transactions")
# t -> t1, t2, t3; t.value == Σti.value
original: Transaction = params[0]
if not original.amount == sum(t.amount for t in params[1:]):
raise ValueError(
f"{original.amount}€ != {sum(v for v, _ in params[1:])}"
)
with self.db.session() as session:
originals = session.get(Transaction, Transaction.id, [original.id])
assert len(originals) == 1, ">1 transactions matched {original.id}!"
originals[0].split = True
transactions = []
for t in params[1:]:
if originals[0].date != t.date:
t.date = originals[0].date
print(
f"{t.date} is different from original date"
f" {originals[0].date}, using original"
)
splitted = SplitTransaction(
t.date, t.description, t.amount, originals[0].id
)
splitted.category = t.category
transactions.append(splitted)
session.add(transactions)
case Operation.Export:
with self.db.session() as session:
self.dump(params[0], params[1], sorted(session.get(Transaction)))
case Operation.Import:
transactions = []
for row in self.load(params[0], params[1]):
match row["type"]:
case "bank":
transaction = BankTransaction(
row["date"],
row["description"],
row["amount"],
row["bank"],
)
case "money":
transaction = MoneyTransaction(
row["date"], row["description"], row["amount"]
)
# TODO case "split" how to match to original transaction?? also save ids?
case _:
continue
if category := row.pop("category", None):
transaction.category = TransactionCategory(
category["name"],
CategorySelector(category["selector"]["selector"]),
)
transactions.append(transaction)
if self.certify(transactions):
with self.db.session() as session:
session.add(transactions)
case Operation.ExportBanks:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(Bank))
case Operation.ImportBanks:
banks = []
for row in self.load(params[0], params[1]):
bank = Bank(row["name"], row["BIC"], row["type"])
if row["nordigen"]:
bank.nordigen = Nordigen(**row["nordigen"])
banks.append(bank)
if self.certify(banks):
with self.db.session() as session:
session.add(banks)
case Operation.ExportCategoryRules:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(CategoryRule))
case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
session.add(rules)
case Operation.ExportTagRules:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(TagRule))
case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
session.add(rules)
case Operation.ExportCategories:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(Category))
case Operation.ImportCategories:
# rules = [Category(**row) for row in self.load(params[0])]
categories = []
for row in self.load(params[0], params[1]):
category = Category(row["name"], row["group"])
if len(row["rules"]) > 0:
# Only category rules could have been created with a rule
rules = row["rules"]
for rule in rules:
del rule["type"]
category.rules = set(CategoryRule(**rule) for rule in rules)
if row["schedule"]:
category.schedule = CategorySchedule(**row["schedule"])
categories.append(category)
if self.certify(categories):
with self.db.session() as session:
session.add(categories)
case Operation.ExportCategoryGroups:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(CategoryGroup))
case Operation.ImportCategoryGroups:
groups = [
CategoryGroup(**row) for row in self.load(params[0], params[1])
]
if self.certify(groups):
with self.db.session() as session:
session.add(groups)
def parse(self, filename: Path, args: dict):
return parse_data(filename, args)
def askcategory(self, transaction: Transaction):
selector = CategorySelector(Selector_T.manual)
with self.db.session() as session:
categories = session.get(Category)
while True:
category = input(f"{transaction}: ")
if category in [c.name for c in categories]:
return TransactionCategory(category, selector)
@staticmethod
def dump(fn, format, sequence):
if format == "pickle":
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
elif format == "csv":
with open(fn, "w", newline="") as f:
csv.writer(f).writerows([e.format.values() for e in sequence])
else:
print("format not well specified")
@staticmethod
def load(fn, format):
if format == "pickle":
with open(fn, "rb") as f:
return pickle.load(f)
elif format == "csv":
raise Exception("CSV import not supported")
else:
print("format not well specified")
return []
@staticmethod
def certify(imports: list) -> bool:
if input(f"{imports[:10]}\nDoes the import seem correct? (y/n)") == "y":
return True
return False
@property
def db(self) -> DbClient:
return DbClient(self._db, self._verbosity > 2)
@db.setter
def db(self, url: str):
self._db = url
def get_banks(self) -> Banks:
client = DatabaseClient(self.__db)
return [convert(bank) for bank in client.get_banks()]

View File

@ -1,123 +1,212 @@
from dataclasses import asdict
from sqlalchemy import create_engine, delete, select, update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import false
from typing import Sequence, Type, TypeVar
from __future__ import annotations
from decimal import Decimal
import csv
import datetime
import logging
import logging.config
import pathlib
import sqlite3
from pfbudget.db.model import (
Category,
CategoryGroup,
CategorySchedule,
Link,
Transaction,
)
from pfbudget.common.types import Transaction
import pfbudget.db.schema as Q
class DbClient:
"""
General database client using sqlalchemy
"""
if not pathlib.Path("logs").is_dir():
pathlib.Path("logs").mkdir()
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("pfbudget.transactions")
__sessions: list[Session]
sqlite3.register_adapter(Decimal, lambda d: float(d))
def __init__(self, url: str, echo=False) -> None:
self._engine = create_engine(url, echo=echo)
__DB_NAME = "data.db"
@property
def engine(self):
return self._engine
class ClientSession:
def __init__(self, engine):
self.__engine = engine
class DatabaseClient:
"""SQLite DB connection manager"""
def __enter__(self):
self.__session = Session(self.__engine)
return self
__EXPORT_DIR = "export"
def __exit__(self, exc_type, exc_value, exc_tb):
self.commit()
self.__session.close()
def __init__(self, db: str):
self.db = db
def commit(self):
self.__session.commit()
def expunge_all(self):
self.__session.expunge_all()
T = TypeVar("T")
def get(self, type: Type[T], column=None, values=None) -> Sequence[T]:
if column is not None:
if values:
if isinstance(values, Sequence):
stmt = select(type).where(column.in_(values))
else:
stmt = select(type).where(column == values)
def __execute(self, query: str, params: tuple = None) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
if params:
ret = con.execute(query, params).fetchall()
logger.debug(f"[{self.db}] < {query}{params}")
else:
stmt = select(type).where(column)
else:
stmt = select(type)
ret = con.execute(query).fetchall()
logger.debug(f"[{self.db}] < {query}")
return self.__session.scalars(stmt).all()
if ret:
logger.debug(f"[{self.db}] > {ret}")
except sqlite3.Error:
logger.exception(f"Error while executing [{self.db}] < {query}")
finally:
con.close()
def uncategorized(self) -> Sequence[Transaction]:
"""Selects all valid uncategorized transactions
At this moment that includes:
- Categories w/o category
- AND non-split categories
return ret
Returns:
Sequence[Transaction]: transactions left uncategorized
"""
stmt = (
select(Transaction)
.where(~Transaction.category.has())
.where(Transaction.split == false())
def __executemany(self, query: str, list_of_params: list[tuple]) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
ret = con.executemany(query, list_of_params).fetchall()
logger.debug(f"[{self.db}] < {query}{list_of_params}")
except sqlite3.Error:
logger.exception(
f"Error while executing [{self.db}] < {query} {list_of_params}"
)
return self.__session.scalars(stmt).all()
finally:
con.close()
def add(self, rows: list):
self.__session.add_all(rows)
return ret
def remove_by_name(self, type, rows: list):
stmt = delete(type).where(type.name.in_([row.name for row in rows]))
self.__session.execute(stmt)
def __create_tables(self, tables: tuple[tuple]):
for table_name, query in tables:
logger.info(f"Creating table {table_name} if it doesn't exist already")
self.__execute(query)
def updategroup(self, categories: list[Category], group: CategoryGroup):
stmt = (
update(Category)
.where(Category.name.in_([cat.name for cat in categories]))
.values(group=group)
def init(self):
logging.info(f"Initializing {self.db} database")
self.__create_tables(
(
("transactions", Q.CREATE_TRANSACTIONS_TABLE),
("backups", Q.CREATE_BACKUPS_TABLE),
("banks", Q.CREATE_BANKS_TABLE),
)
self.__session.execute(stmt)
)
def updateschedules(self, schedules: list[CategorySchedule]):
stmt = insert(CategorySchedule).values([asdict(s) for s in schedules])
stmt = stmt.on_conflict_do_update(
index_elements=[CategorySchedule.name],
set_=dict(
recurring=stmt.excluded.recurring,
period=stmt.excluded.period,
period_multiplier=stmt.excluded.period_multiplier,
),
)
self.__session.execute(stmt)
"""Transaction table methods"""
def remove_by_id(self, type, ids: list[int]):
stmt = delete(type).where(type.id.in_(ids))
self.__session.execute(stmt)
def select_all(self) -> list[Transaction] | None:
logger.info(f"Reading all transactions from {self.db}")
transactions = self.__execute("SELECT * FROM transactions")
if transactions:
return [Transaction(t) for t in transactions]
return None
def update(self, type, values: list[dict]):
print(type, values)
self.__session.execute(update(type), values)
def insert_transaction(self, transaction: Transaction):
logger.info(f"Adding {transaction} into {self.db}")
self.__execute(Q.ADD_TRANSACTION, (transaction.to_list(),))
def remove_links(self, original: int, links: list[int]):
stmt = delete(Link).where(
Link.original == original, Link.link.in_(link for link in links)
)
self.__session.execute(stmt)
def insert_transactions(self, transactions: Q.DbTransactions):
logger.info(f"Adding {len(transactions)} into {self.db}")
self.__executemany(Q.ADD_TRANSACTION, [t.tuple() for t in transactions])
def session(self) -> ClientSession:
return self.ClientSession(self.engine)
def update_category(self, transaction: Transaction):
logger.info(f"Update {transaction} category")
self.__execute(Q.UPDATE_CATEGORY, transaction.update_category())
def update_categories(self, transactions: list[Transaction]):
logger.info(f"Update {len(transactions)} transactions' categories")
self.__executemany(
Q.UPDATE_CATEGORY,
[transaction.update_category() for transaction in transactions],
)
def get_duplicated_transactions(self) -> list[Transaction] | None:
logger.info("Get duplicated transactions")
transactions = self.__execute(Q.DUPLICATED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_sorted_transactions(self) -> list[Transaction] | None:
logger.info("Get transactions sorted by date")
transactions = self.__execute(Q.SORTED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange(self, start: datetime, end: datetime) -> list[Transaction] | None:
logger.info(f"Get transactions from {start} to {end}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BETWEEN_DATES, (start, end))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_category(self, value: str) -> list[Transaction] | None:
logger.info(f"Get transactions where category = {value}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BY_CATEGORY, (value,))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange_category(
self, start: datetime, end: datetime, category: str
) -> list[Transaction] | None:
logger.info(
f"Get transactions from {start} to {end} where category = {category}"
)
transactions = self.__execute(
Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY, (start, end, category)
)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_by_period(self, period: str) -> list[Transaction] | None:
logger.info(f"Get transactions by {period}")
transactions = self.__execute(Q.SELECT_TRANSACTION_BY_PERIOD, period)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_uncategorized_transactions(self) -> list[Transaction] | None:
logger.debug("Get uncategorized transactions")
return self.get_category(None)
def get_daterange_uncategorized_transactions(self, start: datetime, end: datetime):
logger.debug("Get uncategorized transactions from {start} to {end}")
return self.get_daterange_category(start, end, None)
def get_daterage_without(
self, start: datetime, end: datetime, *categories: str
) -> list[Transaction] | None:
logger.info(f"Get transactions between {start} and {end} not in {categories}")
query = Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
"(" + ", ".join("?" for _ in categories) + ")"
)
transactions = self.__execute(query, (start, end, *categories))
if transactions:
return [Transaction(t) for t in transactions]
return None
def export(self):
filename = pathlib.Path(
"@".join([self.db, datetime.datetime.now().isoformat()])
).with_suffix(".csv")
transactions = self.select_all()
logger.info(f"Exporting {self.db} into {filename}")
if not (dir := pathlib.Path(self.__EXPORT_DIR)).is_dir():
dir.mkdir()
with open(dir / filename, "w", newline="") as f:
csv.writer(f, delimiter="\t").writerows(transactions)
"""Banks table methods"""
def register_bank(self, bank: Q.DbBank):
logger.info(f"Registering {bank}")
self.__execute(Q.ADD_BANK, bank.tuple())
def unregister_bank(self, bank: str):
logger.info(f"Unregistering {bank}")
self.__execute(Q.DELETE_BANK, (bank,))
def get_bank(self, key: str, value: str) -> Q.DbBank | None:
logger.info(f"Get bank with {key} = {value}")
bank = self.__execute(Q.SELECT_BANK.format(key), (value, ))
if bank:
return Q.DbBank(*bank[0])
def get_banks(self) -> Q.DbBanks:
logger.info("Get all banks")
banks = self.__execute(Q.SELECT_BANKS)
if banks:
return [Q.DbBank(*bank) for bank in banks]
return []

View File

@ -1,440 +0,0 @@
from __future__ import annotations
import datetime as dt
import decimal
import enum
import re
from typing import Annotated, Any, Optional
from sqlalchemy import (
BigInteger,
Enum,
ForeignKey,
MetaData,
Numeric,
String,
Text,
)
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
MappedAsDataclass,
relationship,
)
class Base(MappedAsDataclass, DeclarativeBase):
metadata = MetaData(
schema="transactions",
naming_convention={
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_`%(constraint_name)s`",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
},
)
class AccountType(enum.Enum):
checking = enum.auto()
savings = enum.auto()
investment = enum.auto()
mealcard = enum.auto()
VISA = enum.auto()
MASTERCARD = enum.auto()
accounttype = Annotated[
AccountType,
mapped_column(Enum(AccountType, inherit_schema=True)),
]
class Export:
@property
def format(self) -> dict[str, Any]:
raise NotImplementedError
class Bank(Base, Export):
__table_args__ = {"schema": "bank"}
__tablename__ = "banks"
name: Mapped[str] = mapped_column(unique=True)
BIC: Mapped[str] = mapped_column(String(8), primary_key=True)
type: Mapped[accounttype] = mapped_column(primary_key=True)
nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False)
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
BIC=self.BIC,
type=self.type,
nordigen=self.nordigen.format if self.nordigen else None,
)
bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]
idpk = Annotated[int, mapped_column(BigInteger, primary_key=True, autoincrement=True)]
money = Annotated[decimal.Decimal, mapped_column(Numeric(16, 2))]
class Transaction(Base, Export):
__tablename__ = "transactions"
id: Mapped[idpk] = mapped_column(init=False)
date: Mapped[dt.date]
description: Mapped[Optional[str]]
amount: Mapped[money]
split: Mapped[bool] = mapped_column(init=False, default=False)
type: Mapped[str] = mapped_column(init=False)
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(
cascade="all, delete-orphan", init=False, passive_deletes=True
)
tags: Mapped[set[TransactionTag]] = relationship(init=False)
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
@property
def format(self) -> dict[str, Any]:
return dict(
id=self.id,
date=self.date,
description=self.description,
amount=self.amount,
split=self.split,
type=self.type,
category=self.category.format if self.category else None,
# TODO note
tags=[tag.format for tag in self.tags] if self.tags else None,
)
def __lt__(self, other: Transaction):
return self.date < other.date
idfk = Annotated[
int, mapped_column(BigInteger, ForeignKey(Transaction.id, ondelete="CASCADE"))
]
class BankTransaction(Transaction):
bank: Mapped[bankfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(bank=self.bank)
class MoneyTransaction(Transaction):
__mapper_args__ = {"polymorphic_identity": "money"}
class SplitTransaction(Transaction):
original: Mapped[idfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(original=self.original)
class CategoryGroup(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "groups"
name: Mapped[str] = mapped_column(primary_key=True)
@property
def format(self) -> dict[str, Any]:
return dict(name=self.name)
class Category(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "available"
name: Mapped[str] = mapped_column(primary_key=True)
group: Mapped[Optional[str]] = mapped_column(
ForeignKey(CategoryGroup.name), default=None
)
rules: Mapped[set[CategoryRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
)
schedule: Mapped[Optional[CategorySchedule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default=None
)
def __repr__(self) -> str:
return (
f"Category(name={self.name}, group={self.group}, #rules={len(self.rules)},"
f" schedule={self.schedule})"
)
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
group=self.group if self.group else None,
rules=[rule.format for rule in self.rules],
schedule=self.schedule.format if self.schedule else None,
)
catfk = Annotated[
str,
mapped_column(ForeignKey(Category.name, ondelete="CASCADE")),
]
class TransactionCategory(Base, Export):
__tablename__ = "categorized"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
name: Mapped[catfk]
selector: Mapped[CategorySelector] = relationship(
cascade="all, delete-orphan", lazy="joined"
)
@property
def format(self):
return dict(name=self.name, selector=self.selector.format)
class Note(Base):
__tablename__ = "notes"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
note: Mapped[str]
class Nordigen(Base, Export):
__table_args__ = {"schema": "bank"}
__tablename__ = "nordigen"
name: Mapped[bankfk] = mapped_column(primary_key=True)
bank_id: Mapped[Optional[str]]
requisition_id: Mapped[Optional[str]]
invert: Mapped[Optional[bool]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
bank_id=self.bank_id,
requisition_id=self.requisition_id,
invert=self.invert,
)
class Tag(Base):
__table_args__ = {"schema": "tag"}
__tablename__ = "available"
name: Mapped[str] = mapped_column(primary_key=True)
rules: Mapped[set[TagRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
)
class TransactionTag(Base, Export):
__tablename__ = "tagged"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
@property
def format(self):
return dict(tag=self.tag)
def __hash__(self):
return hash(self.id)
class Selector_T(enum.Enum):
unknown = enum.auto()
nullifier = enum.auto()
vacations = enum.auto()
rules = enum.auto()
algorithm = enum.auto()
manual = enum.auto()
categoryselector = Annotated[
Selector_T,
mapped_column(Enum(Selector_T, inherit_schema=True), default=Selector_T.unknown),
]
class CategorySelector(Base, Export):
__tablename__ = "selector"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(TransactionCategory.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
selector: Mapped[categoryselector]
@property
def format(self):
return dict(selector=self.selector)
class Period(enum.Enum):
daily = "daily"
weekly = "weekly"
monthly = "monthly"
yearly = "yearly"
scheduleperiod = Annotated[Selector_T, mapped_column(Enum(Period, inherit_schema=True))]
class CategorySchedule(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "schedules"
name: Mapped[catfk] = mapped_column(primary_key=True)
period: Mapped[Optional[scheduleperiod]]
period_multiplier: Mapped[Optional[int]]
amount: Mapped[Optional[int]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
period=self.period,
period_multiplier=self.period_multiplier,
amount=self.amount,
)
class Link(Base):
__tablename__ = "links"
original: Mapped[idfk] = mapped_column(primary_key=True)
link: Mapped[idfk] = mapped_column(primary_key=True)
class Rule(Base, Export):
__tablename__ = "rules"
id: Mapped[idpk] = mapped_column(init=False)
start: Mapped[Optional[dt.date]]
end: Mapped[Optional[dt.date]]
description: Mapped[Optional[str]]
regex: Mapped[Optional[str]]
bank: Mapped[Optional[str]]
min: Mapped[Optional[money]]
max: Mapped[Optional[money]]
type: Mapped[str] = mapped_column(init=False)
__mapper_args__ = {
"polymorphic_identity": "rule",
"polymorphic_on": "type",
}
def matches(self, t: BankTransaction) -> bool:
valid = None
if self.regex:
valid = re.compile(self.regex, re.IGNORECASE)
ops = (
Rule.exists(self.start, lambda r: r < t.date),
Rule.exists(self.end, lambda r: r > t.date),
Rule.exists(self.description, lambda r: r == t.description),
Rule.exists(
valid,
lambda r: r.search(t.description) if t.description else False,
),
Rule.exists(self.bank, lambda r: r == t.bank),
Rule.exists(self.min, lambda r: r < t.amount),
Rule.exists(self.max, lambda r: r > t.amount),
)
if all(ops):
return True
return False
@property
def format(self) -> dict[str, Any]:
return dict(
start=self.start,
end=self.end,
description=self.description,
regex=self.regex,
bank=self.bank,
min=self.min,
max=self.max,
type=self.type,
)
@staticmethod
def exists(r, op) -> bool:
return op(r) if r is not None else True
class CategoryRule(Rule):
__table_args__ = {"schema": "category"}
__tablename__ = "rules"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(Rule.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
name: Mapped[catfk]
__mapper_args__ = {
"polymorphic_identity": "category_rule",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(name=self.name)
def __hash__(self):
return hash(self.id)
class TagRule(Rule):
__table_args__ = {"schema": "tag"}
__tablename__ = "rules"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(Rule.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))
__mapper_args__ = {
"polymorphic_identity": "tag_rule",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(tag=self.tag)
def __hash__(self):
return hash(self.id)

View File

@ -1,212 +0,0 @@
from __future__ import annotations
from decimal import Decimal
import csv
import datetime
import logging
import logging.config
import pathlib
import sqlite3
from pfbudget.common.types import Transaction
import pfbudget.db.schema as Q
if not pathlib.Path("logs").is_dir():
pathlib.Path("logs").mkdir()
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("pfbudget.transactions")
sqlite3.register_adapter(Decimal, lambda d: float(d))
__DB_NAME = "data.db"
class DatabaseClient:
"""SQLite DB connection manager"""
__EXPORT_DIR = "export"
def __init__(self, db: str):
self.db = db
def __execute(self, query: str, params: tuple = None) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
if params:
ret = con.execute(query, params).fetchall()
logger.debug(f"[{self.db}] < {query}{params}")
else:
ret = con.execute(query).fetchall()
logger.debug(f"[{self.db}] < {query}")
if ret:
logger.debug(f"[{self.db}] > {ret}")
except sqlite3.Error:
logger.exception(f"Error while executing [{self.db}] < {query}")
finally:
con.close()
return ret
def __executemany(self, query: str, list_of_params: list[tuple]) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
ret = con.executemany(query, list_of_params).fetchall()
logger.debug(f"[{self.db}] < {query}{list_of_params}")
except sqlite3.Error:
logger.exception(
f"Error while executing [{self.db}] < {query} {list_of_params}"
)
finally:
con.close()
return ret
def __create_tables(self, tables: tuple[tuple]):
for table_name, query in tables:
logger.info(f"Creating table {table_name} if it doesn't exist already")
self.__execute(query)
def init(self):
logging.info(f"Initializing {self.db} database")
self.__create_tables(
(
("transactions", Q.CREATE_TRANSACTIONS_TABLE),
("backups", Q.CREATE_BACKUPS_TABLE),
("banks", Q.CREATE_BANKS_TABLE),
)
)
"""Transaction table methods"""
def select_all(self) -> list[Transaction] | None:
logger.info(f"Reading all transactions from {self.db}")
transactions = self.__execute("SELECT * FROM transactions")
if transactions:
return [Transaction(t) for t in transactions]
return None
def insert_transaction(self, transaction: Transaction):
logger.info(f"Adding {transaction} into {self.db}")
self.__execute(Q.ADD_TRANSACTION, (transaction.to_list(),))
def insert_transactions(self, transactions: Q.DbTransactions):
logger.info(f"Adding {len(transactions)} into {self.db}")
self.__executemany(Q.ADD_TRANSACTION, [t.tuple() for t in transactions])
def update_category(self, transaction: Transaction):
logger.info(f"Update {transaction} category")
self.__execute(Q.UPDATE_CATEGORY, transaction.update_category())
def update_categories(self, transactions: list[Transaction]):
logger.info(f"Update {len(transactions)} transactions' categories")
self.__executemany(
Q.UPDATE_CATEGORY,
[transaction.update_category() for transaction in transactions],
)
def get_duplicated_transactions(self) -> list[Transaction] | None:
logger.info("Get duplicated transactions")
transactions = self.__execute(Q.DUPLICATED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_sorted_transactions(self) -> list[Transaction] | None:
logger.info("Get transactions sorted by date")
transactions = self.__execute(Q.SORTED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange(self, start: datetime, end: datetime) -> list[Transaction] | None:
logger.info(f"Get transactions from {start} to {end}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BETWEEN_DATES, (start, end))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_category(self, value: str) -> list[Transaction] | None:
logger.info(f"Get transactions where category = {value}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BY_CATEGORY, (value,))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange_category(
self, start: datetime, end: datetime, category: str
) -> list[Transaction] | None:
logger.info(
f"Get transactions from {start} to {end} where category = {category}"
)
transactions = self.__execute(
Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY, (start, end, category)
)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_by_period(self, period: str) -> list[Transaction] | None:
logger.info(f"Get transactions by {period}")
transactions = self.__execute(Q.SELECT_TRANSACTION_BY_PERIOD, period)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_uncategorized_transactions(self) -> list[Transaction] | None:
logger.debug("Get uncategorized transactions")
return self.get_category(None)
def get_daterange_uncategorized_transactions(self, start: datetime, end: datetime):
logger.debug("Get uncategorized transactions from {start} to {end}")
return self.get_daterange_category(start, end, None)
def get_daterage_without(
self, start: datetime, end: datetime, *categories: str
) -> list[Transaction] | None:
logger.info(f"Get transactions between {start} and {end} not in {categories}")
query = Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
"(" + ", ".join("?" for _ in categories) + ")"
)
transactions = self.__execute(query, (start, end, *categories))
if transactions:
return [Transaction(t) for t in transactions]
return None
def export(self):
filename = pathlib.Path(
"@".join([self.db, datetime.datetime.now().isoformat()])
).with_suffix(".csv")
transactions = self.select_all()
logger.info(f"Exporting {self.db} into {filename}")
if not (dir := pathlib.Path(self.__EXPORT_DIR)).is_dir():
dir.mkdir()
with open(dir / filename, "w", newline="") as f:
csv.writer(f, delimiter="\t").writerows(transactions)
"""Banks table methods"""
def register_bank(self, bank: Q.DbBank):
logger.info(f"Registering {bank}")
self.__execute(Q.ADD_BANK, bank.tuple())
def unregister_bank(self, bank: str):
logger.info(f"Unregistering {bank}")
self.__execute(Q.DELETE_BANK, (bank,))
def get_bank(self, key: str, value: str) -> Q.DbBank | None:
logger.info(f"Get bank with {key} = {value}")
bank = self.__execute(Q.SELECT_BANK.format(key), (value, ))
if bank:
return Q.DbBank(*bank[0])
def get_banks(self) -> Q.DbBanks:
logger.info("Get all banks")
banks = self.__execute(Q.SELECT_BANKS)
if banks:
return [Q.DbBank(*bank) for bank in banks]
return []

View File

View File

@ -1,9 +1,21 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from pfbudget.db.model import Transaction
from pfbudget.common.types import Transactions
if TYPE_CHECKING:
from pfbudget.core.manager import Manager
class Input(ABC):
def __init__(self, manager: Manager):
self._manager = manager
@abstractmethod
def parse(self) -> list[Transaction]:
return NotImplementedError
def parse(self) -> Transactions:
return NotImplemented
@property
def manager(self):
return self._manager

30
pfbudget/input/json.py Normal file
View File

@ -0,0 +1,30 @@
import json
from .input import Input
from pfbudget.common.types import Transactions
from pfbudget.utils import convert, parse_decimal
class JsonParser(Input):
def __init__(self, manager, options):
super().__init__(manager)
self.options = options
def parse(self) -> Transactions:
try:
with open(self.options["json"][0], "r") as f:
return [
convert(
[
t["bookingDate"],
t["remittanceInformationUnstructured"],
self.options["bank"][0],
parse_decimal(t["transactionAmount"]["amount"])
if not self.options["invert"]
else -parse_decimal(t["transactionAmount"]["amount"]),
],
)
for t in json.load(f)["transactions"]["booked"]
]
except KeyError:
print("No json file defined")

View File

@ -1,51 +1,59 @@
import datetime as dt
import dotenv
from datetime import date
from time import sleep
from requests import HTTPError, ReadTimeout
from dotenv import load_dotenv
from nordigen import NordigenClient
from uuid import uuid4
import json
import nordigen
import os
import requests
import time
import uuid
import pfbudget.db.model as t
from pfbudget.utils.converters import convert
import webbrowser
from .input import Input
from pfbudget.common.types import NoBankSelected, Transactions
from pfbudget.utils import convert
dotenv.load_dotenv()
load_dotenv()
class NordigenInput(Input):
redirect_url = "https://murta.dev"
def __init__(self):
super().__init__()
if not (key := os.environ.get("SECRET_KEY")) or not (
id := os.environ.get("SECRET_ID")
):
raise
self._client = nordigen.NordigenClient(
secret_key=key,
secret_id=id,
def __init__(self, manager, options: dict = {}, start=date.min, end=date.max):
super().__init__(manager)
self._client = NordigenClient(
secret_key=os.environ.get("SECRET_KEY"),
secret_id=os.environ.get("SECRET_ID"),
)
self._client.token = self.__token()
self._start = dt.date.min
self._end = dt.date.max
self.client.token = self.__token()
def parse(self) -> list[t.BankTransaction]:
# print(options)
if "all" in options and options["all"]:
self.__banks = self.manager.get_banks()
elif "id" in options and options["id"]:
self.__banks = [
self.manager.get_bank_by("nordigen_id", b) for b in options["id"]
]
elif "name" in options and options["name"]:
self.__banks = [
self.manager.get_bank_by("name", b) for b in options["name"]
]
else:
self.__banks = None
self.__from = start
self.__to = end
def parse(self) -> Transactions:
transactions = []
assert len(self._banks) > 0
if not self.__banks:
raise NoBankSelected
for bank in self._banks:
for bank in self.__banks:
print(f"Downloading from {bank}...")
requisition = self.client.requisition.get_requisition_by_id(
bank.nordigen.requisition_id
bank.requisition_id
)
print(requisition)
for acc in requisition["accounts"]:
account = self._client.account_api(acc)
@ -55,14 +63,14 @@ class NordigenInput(Input):
try:
downloaded = account.get_transactions()
break
except requests.ReadTimeout:
except ReadTimeout:
retries += 1
print(f"Request #{retries} timed-out, retrying in 1s")
time.sleep(1)
except requests.HTTPError as e:
sleep(1)
except HTTPError as e:
retries += 1
print(f"Request #{retries} failed with {e}, retrying in 1s")
time.sleep(1)
sleep(1)
if not downloaded:
print(f"Couldn't download transactions for {account}")
@ -76,59 +84,44 @@ class NordigenInput(Input):
]
transactions.extend(
[t for t in converted if self._start <= t.date <= self._end]
[t for t in converted if self.__from <= t.date <= self.__to]
)
return sorted(transactions)
return transactions
def token(self):
token = self._client.generate_token()
print(f"New access token: {token}")
return token
def requisition(self, id: str, country: str = "PT"):
requisition = self._client.initialize_session(
redirect_uri=self.redirect_url,
institution_id=id,
reference_id=str(uuid.uuid4()),
)
return requisition.link, requisition.requisition_id
def requisition(self, institution: str, country: str = "PT"):
link, _ = self.__requisition_id(institution, country)
webbrowser.open(link)
def country_banks(self, country: str):
return self._client.institution.get_institutions(country)
def list(self, country: str):
print(self._client.institution.get_institutions(country))
@property
def client(self):
return self._client
@property
def banks(self):
return self._banks
@banks.setter
def banks(self, value):
self._banks = value
@property
def start(self):
return self._start
@start.setter
def start(self, value):
self._start = value
@property
def end(self):
return self._end
@end.setter
def end(self, value):
self._end = value
def __token(self):
if token := os.environ.get("TOKEN"):
return token
else:
token = self._client.generate_token()
print(f"New access token: {token}")
return token["access"]
return token
def __requisition_id(self, i: str, c: str):
id = self._client.institution.get_institution_id_by_name(
country=c, institution=i
)
init = self._client.initialize_session(
redirect_uri="https://murta.dev",
institution_id=id,
reference_id=str(uuid4()),
)
print(f"{i}({c}) link: {init.link} and requisition ID: {init.requisition_id}")
return (init.link, init.requisition_id)

View File

@ -1,12 +1,10 @@
from collections import namedtuple
from decimal import Decimal
from importlib import import_module
from pathlib import Path
import datetime as dt
import yaml
from pfbudget.common.types import NoBankSelected
from pfbudget.db.model import Transaction
from pfbudget.common.types import NoBankSelected, Transaction, Transactions
from pfbudget.utils import utils
Index = namedtuple(
@ -45,7 +43,7 @@ Options = namedtuple(
)
def parse_data(filename: Path, args: dict) -> list[Transaction]:
def parse_data(filename: str, args: dict) -> Transactions:
cfg: dict = yaml.safe_load(open("parsers.yaml"))
assert (
"Banks" in cfg
@ -86,7 +84,7 @@ def parse_data(filename: Path, args: dict) -> list[Transaction]:
class Parser:
def __init__(self, filename: Path, bank: str, options: dict):
def __init__(self, filename: str, bank: str, options: dict):
self.filename = filename
self.bank = bank
@ -159,7 +157,7 @@ class Parser:
category = line[options.category]
transaction = Transaction(date, text, bank, value, category)
else:
transaction = Transaction(date, text, bank, value)
transaction = Transaction(date, text, bank, value, options.category)
if options.additional_parser:
func(transaction)

View File

View File

@ -9,7 +9,7 @@ import pfbudget.core.categories
if TYPE_CHECKING:
from pfbudget.db.sqlite import DatabaseClient
from pfbudget.db.client import DatabaseClient
groups = pfbudget.core.categories.cfg["Groups"]

View File

@ -6,7 +6,7 @@ import datetime as dt
import pfbudget.core.categories
if TYPE_CHECKING:
from pfbudget.db.sqlite import DatabaseClient
from pfbudget.db.client import DatabaseClient
def net(db: DatabaseClient, start: dt.date = dt.date.min, end: dt.date = dt.date.max):

View File

@ -0,0 +1,2 @@
from .converters import convert
from .utils import *

View File

@ -1,30 +1,71 @@
import datetime as dt
import functools
from typing import Any
from pfbudget.common.types import TransactionError
import pfbudget.db.model as t
from datetime import timedelta
from functools import singledispatch
from pfbudget.common.types import Bank, Transaction, TransactionError
from pfbudget.db.schema import DbBank, DbTransaction
from .utils import parse_decimal
@functools.singledispatch
def convert(t) -> Any:
print("No converter has been found")
@singledispatch
def convert(t):
print("No converter as been found")
pass
@convert.register
def _(json: dict, bank: t.Bank) -> t.BankTransaction | None:
i = -1 if bank.nordigen and bank.nordigen.invert else 1
def _(t: Transaction) -> DbTransaction:
return DbTransaction(
t.date,
t.description,
t.bank,
t.value,
t.category,
t.original,
t.additional_comment,
)
@convert.register
def _(db: DbTransaction) -> Transaction:
try:
transaction = t.BankTransaction(
date=dt.date.fromisoformat(json["bookingDate"]),
description=json["remittanceInformationUnstructured"],
bank=bank.name,
amount=i * parse_decimal(json["transactionAmount"]["amount"]),
return Transaction(db)
except TransactionError:
print(f"{db} is in the wrong format")
@convert.register
def _(db: DbBank, key: str = "") -> Bank:
bank = Bank(db.name, db.bic, db.requisition_id, db.invert, db.offset, key=key)
if not bank.invert:
bank.invert = False
if not bank.offset:
bank.offset = 0
return bank
@convert.register
def _(bank: Bank) -> DbBank:
bank = DbBank(
bank.name, bank.bic, "", "", bank.requisition_id, bank.invert, bank.offset
)
if not bank.invert:
bank.invert = False
if not bank.offset:
bank.offset = 0
return bank
@convert.register
def _(json: dict, bank: Bank) -> Transaction:
i = -1 if bank.invert else 1
try:
transaction = Transaction(
json["bookingDate"],
json["remittanceInformationUnstructured"],
bank.name,
i * parse_decimal(json["transactionAmount"]["amount"]),
)
# transaction.date += timedelta(days=bank.offset)
transaction.date += timedelta(days=bank.offset)
return transaction
except TransactionError:

View File

@ -59,21 +59,21 @@ def find_credit_institution(fn, banks, creditcards):
return bank, cc
def parse_args_period(args: dict):
def parse_args_period(args):
start, end = date.min, date.max
if args["start"]:
start = datetime.strptime(args["start"][0], "%Y/%m/%d").date()
if args.start:
start = datetime.strptime(args.start[0], "%Y/%m/%d").date()
if args["end"]:
end = datetime.strptime(args["end"][0], "%Y/%m/%d").date()
if args.end:
end = datetime.strptime(args.end[0], "%Y/%m/%d").date()
if args["interval"]:
start = datetime.strptime(args["interval"][0], "%Y/%m/%d").date()
end = datetime.strptime(args["interval"][1], "%Y/%m/%d").date()
if args.interval:
start = datetime.strptime(args.interval[0], "%Y/%m/%d").date()
end = datetime.strptime(args.interval[1], "%Y/%m/%d").date()
if args["year"]:
start = datetime.strptime(args["year"][0], "%Y").date()
end = datetime.strptime(str(int(args["year"][0]) + 1), "%Y").date() - timedelta(
if args.year:
start = datetime.strptime(args.year[0], "%Y").date()
end = datetime.strptime(str(int(args.year[0]) + 1), "%Y").date() - timedelta(
days=1
)

View File

@ -1,7 +1,5 @@
codetiming==1.4.0
matplotlib==3.6.1
nordigen==1.3.0
python-dateutil==2.8.2
python-dotenv==0.21.0
PyYAML==6.0
SQLAlchemy==2.0.0rc2