Merge branch 'feature/postgresql'

The graphs and reports are broken on this commit.
This commit is contained in:
Luís Murta 2023-04-12 18:25:16 +01:00
commit ddb02b33f9
Signed by: satprog
GPG Key ID: 169EF1BBD7049F94
56 changed files with 4311 additions and 741 deletions

1
.gitignore vendored
View File

@ -153,4 +153,5 @@ dmypy.json
### Default user directories
export/
tmp/
.pfbudget

105
alembic.ini Normal file
View File

@ -0,0 +1,105 @@
# A generic, single database configuration.
[alembic]
# path to migration scripts
script_location = alembic
# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .
# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
timezone = UTC
# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40
# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false
# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false
# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions
# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8
sqlalchemy.url = postgresql://pf-budget:muster-neutron-omega@database.lan/pf-budget
[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples
# format using "black" - use the console_scripts runner, against the "black" entrypoint
hooks = black
black.type = console_scripts
black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME
# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
alembic/README Normal file
View File

@ -0,0 +1 @@
Generic single-database configuration.

88
alembic/env.py Normal file
View File

@ -0,0 +1,88 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
from pfbudget.db.model import Base
target_metadata = Base.metadata
# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.
def include_name(name, type_, parent_names):
if type_ == "schema":
return name in ["bank", "category", "tag", "transactions"]
else:
return True
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
Calls to context.execute() here emit the given string to the
script output.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
In this scenario we need to create an Engine
and associate a connection with the context.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
include_name=include_name,
include_schemas=True,
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()

24
alembic/script.py.mako Normal file
View File

@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade() -> None:
${upgrades if upgrades else "pass"}
def downgrade() -> None:
${downgrades if downgrades else "pass"}

View File

@ -0,0 +1,32 @@
"""Regex rule
Revision ID: 0ce89e987770
Revises: 7adf89ec8d14
Create Date: 2022-12-10 14:00:49.418494+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "0ce89e987770"
down_revision = "7adf89ec8d14"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_rules",
sa.Column("regex", sa.String(), nullable=True),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_column("categories_rules", "regex", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,40 @@
"""Split member of base transaction
Revision ID: 18572111d9ff
Revises: 28556ab17c56
Create Date: 2023-01-23 20:09:37.892997+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "18572111d9ff"
down_revision = "28556ab17c56"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"transactions",
"split",
existing_type=sa.BOOLEAN(),
nullable=False,
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"transactions",
"split",
existing_type=sa.BOOLEAN(),
nullable=True,
schema="transactions",
)
# ### end Alembic commands ###

View File

@ -0,0 +1,88 @@
"""Selector back to transaction
Revision ID: 28556ab17c56
Revises: e455c78df789
Create Date: 2023-01-23 00:34:39.062562+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "28556ab17c56"
down_revision = "e455c78df789"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.rename_table("tags", "tagged", schema="transactions")
op.create_table(
"selector",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column(
"selector",
sa.Enum(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="transactions",
inherit_schema=True,
),
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name=op.f("fk_selector_id_categorized"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_selector")),
schema="transactions",
)
op.drop_table("selector", schema="category")
op.rename_table("originals", "transactions", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.rename_table("transactions", "originals", schema="transactions")
op.create_table(
"selector",
sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column(
"selector",
postgresql.ENUM(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="category",
),
autoincrement=False,
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name="fk_selector_id_categorized",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_selector"),
schema="category",
)
op.drop_table("selector", schema="transactions")
op.rename_table("tagged", "tags", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,109 @@
"""Add relationships
Revision ID: 287fe9e6682a
Revises: d3534f493239
Create Date: 2022-12-03 16:43:39.633382+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "287fe9e6682a"
down_revision = "d3534f493239"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"fk_categorized_id_originals",
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
op.f("fk_categorized_id_originals"),
"categorized",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.drop_constraint(
"fk_notes_id_originals", "notes", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
op.f("fk_notes_id_originals"),
"notes",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.drop_constraint(
"fk_tags_id_originals", "tags", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
op.f("fk_tags_id_originals"),
"tags",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_tags_id_originals"), "tags", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
"fk_tags_id_originals",
"tags",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
)
op.drop_constraint(
op.f("fk_notes_id_originals"),
"notes",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_notes_id_originals",
"notes",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
)
op.drop_constraint(
op.f("fk_categorized_id_originals"),
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_categorized_id_originals",
"categorized",
"originals",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
)
# ### end Alembic commands ###

View File

@ -0,0 +1,49 @@
"""Available categories and rules
Revision ID: 2d0891f1be11
Revises: 287fe9e6682a
Create Date: 2022-12-04 11:15:22.758487+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "2d0891f1be11"
down_revision = "287fe9e6682a"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_available",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_available")),
schema="transactions",
)
op.create_table(
"categories_rules",
sa.Column("name", sa.String(), nullable=False),
sa.Column("rule", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name=op.f("fk_categories_rules_name_categories_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", "rule", name=op.f("pk_categories_rules")),
schema="transactions",
)
op.alter_column("categorized", "category", new_column_name="name", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column("categorized", "name", new_column_name="category", schema="transactions")
op.drop_table("categories_rules", schema="transactions")
op.drop_table("categories_available", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,74 @@
"""Inheritance
Revision ID: 37d80de801a7
Revises: 8cc9870b0d74
Create Date: 2023-01-10 22:41:03.540108+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "37d80de801a7"
down_revision = "8cc9870b0d74"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"originals",
sa.Column("type", sa.String(), nullable=False),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("split", sa.Boolean(), nullable=True),
schema="transactions",
)
op.add_column(
"originals",
sa.Column("original", sa.BigInteger(), nullable=True),
schema="transactions",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=True,
schema="transactions",
)
op.create_foreign_key(
op.f("fk_originals_original_originals"),
"originals",
"originals",
["original"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_originals_original_originals"),
"originals",
schema="transactions",
type_="foreignkey",
)
op.alter_column(
"originals",
"bank",
existing_type=sa.TEXT(),
nullable=False,
schema="transactions",
)
op.drop_column("originals", "original", schema="transactions")
op.drop_column("originals", "split", schema="transactions")
op.drop_column("originals", "type", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,111 @@
"""Init
Revision ID: 50ff1fbb8a00
Revises:
Create Date: 2022-12-03 11:49:30.450115+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "50ff1fbb8a00"
down_revision = None
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"banks",
sa.Column("name", sa.String(), nullable=False),
sa.Column("BIC", sa.String(length=8), nullable=False),
sa.Column(
"type",
sa.Enum(
"checking",
"savings",
"investment",
"VISA",
"MASTERCARD",
name="accounttype",
schema="transactions",
inherit_schema=True,
),
nullable=False,
),
sa.PrimaryKeyConstraint("BIC", "type", name=op.f("pk_banks")),
sa.UniqueConstraint("name", name=op.f("uq_banks_name")),
schema="transactions",
)
op.create_table(
"nordigen",
sa.Column("name", sa.Text(), nullable=False),
sa.Column("bank_id", sa.String(), nullable=True),
sa.Column("requisition_id", sa.String(), nullable=True),
sa.Column("invert", sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(
["name"], ["transactions.banks.name"], name=op.f("fk_nordigen_name_banks")
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_nordigen")),
schema="transactions",
)
op.create_table(
"originals",
sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
sa.Column("date", sa.Date(), nullable=False),
sa.Column("description", sa.String(), nullable=True),
sa.Column("bank", sa.Text(), nullable=False),
sa.Column("amount", sa.Numeric(precision=16, scale=2), nullable=False),
sa.ForeignKeyConstraint(
["bank"], ["transactions.banks.name"], name=op.f("fk_originals_bank_banks")
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_originals")),
schema="transactions",
)
op.create_table(
"categorized",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("category", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"],
["transactions.originals.id"],
name=op.f("fk_categorized_id_originals"),
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_categorized")),
schema="transactions",
)
op.create_table(
"notes",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("note", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"], ["transactions.originals.id"], name=op.f("fk_notes_id_originals")
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_notes")),
schema="transactions",
)
op.create_table(
"tags",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("tag", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"], ["transactions.originals.id"], name=op.f("fk_tags_id_originals")
),
sa.PrimaryKeyConstraint("id", "tag", name=op.f("pk_tags")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("tags", schema="transactions")
op.drop_table("notes", schema="transactions")
op.drop_table("categorized", schema="transactions")
op.drop_table("originals", schema="transactions")
op.drop_table("nordigen", schema="transactions")
op.drop_table("banks", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,54 @@
"""Category selector
Revision ID: 6863dda76ea2
Revises: 83f4c9837f6e
Create Date: 2022-12-08 00:56:59.032641+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "6863dda76ea2"
down_revision = "83f4c9837f6e"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_selector",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column(
"selector",
sa.Enum(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector",
schema="transactions",
inherit_schema=True,
),
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name=op.f("fk_categories_selector_id_categorized"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_categories_selector")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("categories_selector", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,152 @@
"""Rule inheritance
Revision ID: 6b293f78cc97
Revises: 37d80de801a7
Create Date: 2023-01-22 20:05:32.887092+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "6b293f78cc97"
down_revision = "37d80de801a7"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"rules",
sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
sa.Column("date", sa.Date(), nullable=True),
sa.Column("description", sa.String(), nullable=True),
sa.Column("regex", sa.String(), nullable=True),
sa.Column("bank", sa.String(), nullable=True),
sa.Column("min", sa.Numeric(precision=16, scale=2), nullable=True),
sa.Column("max", sa.Numeric(precision=16, scale=2), nullable=True),
sa.Column("type", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
schema="transactions",
)
op.create_foreign_key(
op.f("fk_categories_rules_id_rules"),
"categories_rules",
"rules",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.drop_column("categories_rules", "bank", schema="transactions")
op.drop_column("categories_rules", "min", schema="transactions")
op.drop_column("categories_rules", "date", schema="transactions")
op.drop_column("categories_rules", "regex", schema="transactions")
op.drop_column("categories_rules", "description", schema="transactions")
op.drop_column("categories_rules", "max", schema="transactions")
op.create_foreign_key(
op.f("fk_tag_rules_id_rules"),
"tag_rules",
"rules",
["id"],
["id"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.drop_column("tag_rules", "bank", schema="transactions")
op.drop_column("tag_rules", "min", schema="transactions")
op.drop_column("tag_rules", "date", schema="transactions")
op.drop_column("tag_rules", "regex", schema="transactions")
op.drop_column("tag_rules", "description", schema="transactions")
op.drop_column("tag_rules", "max", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"tag_rules",
sa.Column(
"max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column(
"min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
),
schema="transactions",
)
op.add_column(
"tag_rules",
sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.drop_constraint(
op.f("fk_tag_rules_id_rules"),
"tag_rules",
schema="transactions",
type_="foreignkey",
)
op.add_column(
"categories_rules",
sa.Column(
"max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column(
"min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
schema="transactions",
)
op.drop_constraint(
op.f("fk_categories_rules_id_rules"),
"categories_rules",
schema="transactions",
type_="foreignkey",
)
op.drop_table("rules", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,49 @@
"""Rule min/max
Revision ID: 753c0bfb2062
Revises: e36e6321568e
Create Date: 2022-12-18 00:24:03.861461+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "753c0bfb2062"
down_revision = "e36e6321568e"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"min_amount",
new_column_name="min",
schema="transactions",
)
op.alter_column(
"categories_rules",
"max_amount",
new_column_name="max",
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"min",
new_column_name="min_amount",
schema="transactions",
)
op.alter_column(
"categories_rules",
"max",
new_column_name="max_amount",
schema="transactions",
)
# ### end Alembic commands ###

View File

@ -0,0 +1,43 @@
"""Category rule date format
Revision ID: 7adf89ec8d14
Revises: 83603bb7ef9c
Create Date: 2022-12-10 00:08:47.535765+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "7adf89ec8d14"
down_revision = "83603bb7ef9c"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"date",
existing_type=sa.VARCHAR(),
type_=sa.Date(),
existing_nullable=True,
schema="transactions",
postgresql_using="date::date"
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"date",
existing_type=sa.Date(),
type_=sa.VARCHAR(),
existing_nullable=True,
schema="transactions",
)
# ### end Alembic commands ###

View File

@ -0,0 +1,38 @@
"""Amount of transaction per period
Revision ID: 83603bb7ef9c
Revises: 8b5d5fbc8211
Create Date: 2022-12-09 23:12:15.644758+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "83603bb7ef9c"
down_revision = "8b5d5fbc8211"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_schedules",
sa.Column("amount", sa.Integer(), nullable=True),
schema="transactions",
)
op.drop_column("categories_schedules", "recurring", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_schedules",
sa.Column("recurring", sa.BOOLEAN(), autoincrement=False, nullable=False),
schema="transactions",
)
op.drop_column("categories_schedules", "amount", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,69 @@
"""Category groups and relationships
Revision ID: 83f4c9837f6e
Revises: 2d0891f1be11
Create Date: 2022-12-04 15:10:51.924875+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "83f4c9837f6e"
down_revision = "2d0891f1be11"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_groups",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_groups")),
schema="transactions",
)
op.add_column(
"categories_available",
sa.Column("group", sa.String(), nullable=True),
schema="transactions",
)
op.create_foreign_key(
op.f("fk_categories_available_group_categories_groups"),
"categories_available",
"categories_groups",
["group"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
op.create_foreign_key(
op.f("fk_categorized_name_categories_available"),
"categorized",
"categories_available",
["name"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_categorized_name_categories_available"),
"categorized",
schema="transactions",
type_="foreignkey",
)
op.drop_constraint(
op.f("fk_categories_available_group_categories_groups"),
"categories_available",
schema="transactions",
type_="foreignkey",
)
op.drop_column("categories_available", "group", schema="transactions")
op.drop_table("categories_groups", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,92 @@
"""Transaction based rules
Revision ID: 8b5d5fbc8211
Revises: e77395969585
Create Date: 2022-12-08 21:05:41.378466+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "8b5d5fbc8211"
down_revision = "e77395969585"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.drop_constraint("pk_categories_rules", "categories_rules", schema="transactions")
op.execute(sa.schema.CreateSequence(sa.schema.Sequence("categories_rules_id_seq", schema="transactions")))
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_rules",
sa.Column(
"id",
sa.BigInteger(),
autoincrement=True,
nullable=False,
server_default=sa.text(
"nextval('transactions.categories_rules_id_seq'::regclass)"
),
),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("date", sa.String(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("description", sa.String(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("bank", sa.String(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("min_amount", sa.Float(), nullable=True),
schema="transactions",
)
op.add_column(
"categories_rules",
sa.Column("max_amount", sa.Float(), nullable=True),
schema="transactions",
)
op.drop_column("categories_rules", "rule", schema="transactions")
# ### end Alembic commands ###
op.create_primary_key(
"pk_categories_rules",
"categories_rules",
["id"],
schema="transactions",
)
def downgrade() -> None:
op.drop_constraint("pk_categories_rules", "categories_rules", schema="transactions")
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"categories_rules",
sa.Column("rule", sa.String(), autoincrement=False, nullable=False),
schema="transactions",
)
op.drop_column("categories_rules", "max_amount", schema="transactions")
op.drop_column("categories_rules", "min_amount", schema="transactions")
op.drop_column("categories_rules", "bank", schema="transactions")
op.drop_column("categories_rules", "description", schema="transactions")
op.drop_column("categories_rules", "date", schema="transactions")
op.drop_column("categories_rules", "id", schema="transactions")
# ### end Alembic commands ###
op.execute(sa.schema.DropSequence(sa.schema.Sequence("categories_rules_id_seq", schema="transactions")))
op.create_primary_key(
"pk_categories_rules",
"categories_rules",
["name", "rule"],
schema="transactions",
)

View File

@ -0,0 +1,46 @@
"""Links
Revision ID: 8cc9870b0d74
Revises: a910e1b2214d
Create Date: 2022-12-19 22:10:25.136479+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "8cc9870b0d74"
down_revision = "a910e1b2214d"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"links",
sa.Column("original", sa.BigInteger(), nullable=False),
sa.Column("link", sa.BigInteger(), nullable=False),
sa.ForeignKeyConstraint(
["link"],
["transactions.originals.id"],
name=op.f("fk_links_link_originals"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["original"],
["transactions.originals.id"],
name=op.f("fk_links_original_originals"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("original", "link", name=op.f("pk_links")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("links", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,68 @@
"""Tag rules
Revision ID: 9028b0f3b985
Revises: 753c0bfb2062
Create Date: 2022-12-18 22:53:13.334046+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "9028b0f3b985"
down_revision = "753c0bfb2062"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"tags_available",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_tags_available")),
schema="transactions",
)
op.create_table(
"tag_rules",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("tag", sa.String(), nullable=False),
sa.Column("date", sa.Date(), nullable=True),
sa.Column("description", sa.String(), nullable=True),
sa.Column("regex", sa.String(), nullable=True),
sa.Column("bank", sa.String(), nullable=True),
sa.Column("min", sa.Numeric(precision=16, scale=2), nullable=True),
sa.Column("max", sa.Numeric(precision=16, scale=2), nullable=True),
sa.ForeignKeyConstraint(
["tag"],
["transactions.tags_available.name"],
name=op.f("fk_tag_rules_tag_tags_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_tag_rules")),
schema="transactions",
)
op.create_foreign_key(
op.f("fk_tags_tag_tags_available"),
"tags",
"tags_available",
["tag"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_tags_tag_tags_available"),
"tags",
schema="transactions",
type_="foreignkey",
)
op.drop_table("tag_rules", schema="transactions")
op.drop_table("tags_available", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,32 @@
"""Start/End date rule
Revision ID: 952de57a3c43
Revises: 18572111d9ff
Create Date: 2023-02-06 21:57:57.545327+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "952de57a3c43"
down_revision = "18572111d9ff"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.add_column(
"rules", sa.Column("start", sa.Date(), nullable=True), schema="transactions"
)
op.alter_column("rules", column_name="date", new_column_name="end", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column("rules", column_name="end", new_column_name="date", schema="transactions")
op.drop_column("rules", "start", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,56 @@
"""Rule inheritance
Revision ID: a910e1b2214d
Revises: 9028b0f3b985
Create Date: 2022-12-19 20:48:04.682812+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "a910e1b2214d"
down_revision = "9028b0f3b985"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
"fk_categorized_name_categories_available",
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
op.f("fk_categorized_name_categories_available"),
"categorized",
"categories_available",
["name"],
["name"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_categorized_name_categories_available"),
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_categorized_name_categories_available",
"categorized",
"categories_available",
["name"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
# ### end Alembic commands ###

View File

@ -0,0 +1,53 @@
"""Category schedule
Revision ID: d18cbd50f7c6
Revises: 6863dda76ea2
Create Date: 2022-12-08 13:30:29.048811+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "d18cbd50f7c6"
down_revision = "6863dda76ea2"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"categories_schedules",
sa.Column("name", sa.String(), nullable=False),
sa.Column("recurring", sa.Boolean(), nullable=False),
sa.Column(
"period",
sa.Enum(
"daily",
"monthly",
"yearly",
name="period",
schema="transactions",
inherit_schema=True,
),
nullable=True,
),
sa.Column("period_multiplier", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name=op.f("fk_categories_schedules_name_categories_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_schedules")),
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_table("categories_schedules", schema="transactions")
# ### end Alembic commands ###

View File

@ -0,0 +1,36 @@
"""Add meal card
Revision ID: d3534f493239
Revises: 50ff1fbb8a00
Create Date: 2022-12-03 12:18:33.519666+00:00
"""
from alembic import op
# revision identifiers, used by Alembic.
revision = "d3534f493239"
down_revision = "50ff1fbb8a00"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute("ALTER TYPE transactions.accounttype ADD VALUE 'mealcard' BEFORE 'VISA'")
def downgrade() -> None:
op.execute(
"""CREATE TYPE transactions.accounttype_new
AS ENUM ('checking', 'savings', 'investment', 'VISA', 'MASTERCARD')
"""
)
op.execute("UPDATE transactions.banks SET type = DEFAULT WHERE type = 'mealcard'")
op.execute(
"""ALTER TABLE transactions.banks
ALTER COLUMN type TYPE transactions.accounttype_new
USING type::text::transactions.accounttype_new
"""
)
op.execute("DROP TYPE transactions.accounttype")
op.execute("ALTER TYPE transactions.accounttype_new RENAME TO accounttype")

View File

@ -0,0 +1,58 @@
"""Rules min/max money
Revision ID: e36e6321568e
Revises: 0ce89e987770
Create Date: 2022-12-10 18:55:07.149010+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "e36e6321568e"
down_revision = "0ce89e987770"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"min_amount",
existing_type=sa.DOUBLE_PRECISION(precision=53),
type_=sa.Numeric(precision=16, scale=2),
existing_nullable=True,
schema="transactions",
)
op.alter_column(
"categories_rules",
"max_amount",
existing_type=sa.DOUBLE_PRECISION(precision=53),
type_=sa.Numeric(precision=16, scale=2),
existing_nullable=True,
schema="transactions",
)
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.alter_column(
"categories_rules",
"max_amount",
existing_type=sa.Numeric(precision=16, scale=2),
type_=sa.DOUBLE_PRECISION(precision=53),
existing_nullable=True,
schema="transactions",
)
op.alter_column(
"categories_rules",
"min_amount",
existing_type=sa.Numeric(precision=16, scale=2),
type_=sa.DOUBLE_PRECISION(precision=53),
existing_nullable=True,
schema="transactions",
)
# ### end Alembic commands ###

View File

@ -0,0 +1,452 @@
"""Divide by schemas
Revision ID: e455c78df789
Revises: 6b293f78cc97
Create Date: 2023-01-22 23:38:23.266906+00:00
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = "e455c78df789"
down_revision = "6b293f78cc97"
branch_labels = None
depends_on = None
def upgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.create_table(
"banks",
sa.Column("name", sa.String(), nullable=False),
sa.Column("BIC", sa.String(length=8), nullable=False),
sa.Column(
"type",
sa.Enum(
"checking",
"savings",
"investment",
"mealcard",
"VISA",
"MASTERCARD",
name="accounttype",
schema="bank",
inherit_schema=True,
),
nullable=False,
),
sa.PrimaryKeyConstraint("BIC", "type", name=op.f("pk_banks")),
sa.UniqueConstraint("name", name=op.f("uq_banks_name")),
schema="bank",
)
op.create_table(
"groups",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_groups")),
schema="category",
)
op.create_table(
"available",
sa.Column("name", sa.String(), nullable=False),
sa.PrimaryKeyConstraint("name", name=op.f("pk_available")),
schema="tag",
)
op.create_table(
"nordigen",
sa.Column("name", sa.Text(), nullable=False),
sa.Column("bank_id", sa.String(), nullable=True),
sa.Column("requisition_id", sa.String(), nullable=True),
sa.Column("invert", sa.Boolean(), nullable=True),
sa.ForeignKeyConstraint(
["name"], ["bank.banks.name"], name=op.f("fk_nordigen_name_banks")
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_nordigen")),
schema="bank",
)
op.create_table(
"available",
sa.Column("name", sa.String(), nullable=False),
sa.Column("group", sa.String(), nullable=True),
sa.ForeignKeyConstraint(
["group"], ["category.groups.name"], name=op.f("fk_available_group_groups")
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_available")),
schema="category",
)
op.create_table(
"rules",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("tag", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"],
["transactions.rules.id"],
name=op.f("fk_rules_id_rules"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["tag"],
["tag.available.name"],
name=op.f("fk_rules_tag_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
schema="tag",
)
op.create_table(
"rules",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column("name", sa.String(), nullable=False),
sa.ForeignKeyConstraint(
["id"],
["transactions.rules.id"],
name=op.f("fk_rules_id_rules"),
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["name"],
["category.available.name"],
name=op.f("fk_rules_name_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
schema="category",
)
op.create_table(
"schedules",
sa.Column("name", sa.String(), nullable=False),
sa.Column(
"period",
sa.Enum(
"daily",
"weekly",
"monthly",
"yearly",
name="period",
schema="category",
inherit_schema=True,
),
nullable=True,
),
sa.Column("period_multiplier", sa.Integer(), nullable=True),
sa.Column("amount", sa.Integer(), nullable=True),
sa.ForeignKeyConstraint(
["name"],
["category.available.name"],
name=op.f("fk_schedules_name_available"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", name=op.f("pk_schedules")),
schema="category",
)
op.create_table(
"selector",
sa.Column("id", sa.BigInteger(), nullable=False),
sa.Column(
"selector",
sa.Enum(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector_t",
schema="category",
inherit_schema=True,
),
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name=op.f("fk_selector_id_categorized"),
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name=op.f("pk_selector")),
schema="category",
)
op.drop_constraint(
"fk_categorized_name_categories_available",
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
op.f("fk_categorized_name_available"),
"categorized",
"available",
["name"],
["name"],
source_schema="transactions",
referent_schema="category",
ondelete="CASCADE",
)
op.drop_constraint(
"fk_originals_bank_banks",
"originals",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
op.f("fk_originals_bank_banks"),
"originals",
"banks",
["bank"],
["name"],
source_schema="transactions",
referent_schema="bank",
)
op.drop_constraint(
"fk_tags_tag_tags_available", "tags", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
op.f("fk_tags_tag_available"),
"tags",
"available",
["tag"],
["name"],
source_schema="transactions",
referent_schema="tag",
)
op.drop_table("categories_schedules", schema="transactions")
op.drop_table("categories_rules", schema="transactions")
op.drop_table("categories_available", schema="transactions")
op.drop_table("tag_rules", schema="transactions")
op.drop_table("nordigen", schema="transactions")
op.drop_table("tags_available", schema="transactions")
op.drop_table("banks", schema="transactions")
op.drop_table("categories_selector", schema="transactions")
op.drop_table("categories_groups", schema="transactions")
# ### end Alembic commands ###
def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.drop_constraint(
op.f("fk_tags_tag_available"), "tags", schema="transactions", type_="foreignkey"
)
op.create_foreign_key(
"fk_tags_tag_tags_available",
"tags",
"tags_available",
["tag"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
op.drop_constraint(
op.f("fk_originals_bank_banks"),
"originals",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_originals_bank_banks",
"originals",
"banks",
["bank"],
["name"],
source_schema="transactions",
referent_schema="transactions",
)
op.drop_constraint(
op.f("fk_categorized_name_available"),
"categorized",
schema="transactions",
type_="foreignkey",
)
op.create_foreign_key(
"fk_categorized_name_categories_available",
"categorized",
"categories_available",
["name"],
["name"],
source_schema="transactions",
referent_schema="transactions",
ondelete="CASCADE",
)
op.create_table(
"categories_groups",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.PrimaryKeyConstraint("name", name="pk_categories_groups"),
schema="transactions",
postgresql_ignore_search_path=False,
)
op.create_table(
"categories_selector",
sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column(
"selector",
postgresql.ENUM(
"unknown",
"nullifier",
"vacations",
"rules",
"algorithm",
"manual",
name="selector",
schema="transactions",
),
autoincrement=False,
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.categorized.id"],
name="fk_categories_selector_id_categorized",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_categories_selector"),
schema="transactions",
)
op.create_table(
"banks",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column("BIC", sa.VARCHAR(length=8), autoincrement=False, nullable=False),
sa.Column(
"type",
postgresql.ENUM(
"checking",
"savings",
"investment",
"mealcard",
"VISA",
"MASTERCARD",
name="accounttype",
schema="transactions",
),
autoincrement=False,
nullable=False,
),
sa.PrimaryKeyConstraint("BIC", "type", name="pk_banks"),
sa.UniqueConstraint("name", name="uq_banks_name"),
schema="transactions",
postgresql_ignore_search_path=False,
)
op.create_table(
"tags_available",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.PrimaryKeyConstraint("name", name="pk_tags_available"),
schema="transactions",
postgresql_ignore_search_path=False,
)
op.create_table(
"nordigen",
sa.Column("name", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("bank_id", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("requisition_id", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.Column("invert", sa.BOOLEAN(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(
["name"], ["transactions.banks.name"], name="fk_nordigen_name_banks"
),
sa.PrimaryKeyConstraint("name", name="pk_nordigen"),
schema="transactions",
)
op.create_table(
"tag_rules",
sa.Column(
"id",
sa.BIGINT(),
server_default=sa.text(
"nextval('transactions.tag_rules_id_seq'::regclass)"
),
autoincrement=True,
nullable=False,
),
sa.Column("tag", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.ForeignKeyConstraint(
["id"],
["transactions.rules.id"],
name="fk_tag_rules_id_rules",
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["tag"],
["transactions.tags_available.name"],
name="fk_tag_rules_tag_tags_available",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_tag_rules"),
schema="transactions",
)
op.create_table(
"categories_rules",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column(
"id",
sa.BIGINT(),
server_default=sa.text(
"nextval('transactions.categories_rules_id_seq'::regclass)"
),
autoincrement=True,
nullable=False,
),
sa.ForeignKeyConstraint(
["id"],
["transactions.rules.id"],
name="fk_categories_rules_id_rules",
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name="fk_categories_rules_name_categories_available",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("id", name="pk_categories_rules"),
schema="transactions",
)
op.create_table(
"categories_available",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column("group", sa.VARCHAR(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(
["group"],
["transactions.categories_groups.name"],
name="fk_categories_available_group_categories_groups",
),
sa.PrimaryKeyConstraint("name", name="pk_categories_available"),
schema="transactions",
postgresql_ignore_search_path=False,
)
op.create_table(
"categories_schedules",
sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
sa.Column(
"period",
postgresql.ENUM(
"daily",
"weekly",
"monthly",
"yearly",
name="period",
schema="transactions",
),
autoincrement=False,
nullable=True,
),
sa.Column(
"period_multiplier", sa.INTEGER(), autoincrement=False, nullable=True
),
sa.Column("amount", sa.INTEGER(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name="fk_categories_schedules_name_categories_available",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", name="pk_categories_schedules"),
schema="transactions",
)
op.drop_table("selector", schema="category")
op.drop_table("schedules", schema="category")
op.drop_table("rules", schema="category")
op.drop_table("rules", schema="tag")
op.drop_table("available", schema="category")
op.drop_table("nordigen", schema="bank")
op.drop_table("available", schema="tag")
op.drop_table("groups", schema="category")
op.drop_table("banks", schema="bank")
# ### end Alembic commands ###

View File

@ -0,0 +1,37 @@
"""Weekly period
Revision ID: e77395969585
Revises: d18cbd50f7c6
Create Date: 2022-12-08 16:35:27.506504+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "e77395969585"
down_revision = "d18cbd50f7c6"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute("ALTER TYPE transactions.period ADD VALUE 'weekly' AFTER 'daily'")
def downgrade() -> None:
op.execute(
"""CREATE TYPE transactions.period_new
AS ENUM ('daily', 'monthly', 'yearly')
"""
)
op.execute("UPDATE transactions.categories_schedules SET period = DEFAULT WHERE period = 'weekly'")
op.execute(
"""ALTER TABLE transactions.categories_schedules
ALTER COLUMN period TYPE transactions.period_new
USING period::text::transactions.period_new
"""
)
op.execute("DROP TYPE transactions.period")
op.execute("ALTER TYPE transactions.period_new RENAME TO period")

View File

@ -1,4 +0,0 @@
from pfbudget import run
if __name__ == "__main__":
run()

View File

@ -1,7 +1,2 @@
__all__ = ["run", "parse_data", "categorize_data"]
__author__ = "Luís Murta"
__version__ = "0.1"
from pfbudget.core.categories import categorize_data
from pfbudget.cli.runnable import run
from pfbudget.input.parsers import parse_data

View File

@ -1,4 +1,267 @@
from pfbudget.cli.runnable import run
from pfbudget.cli.argparser import argparser
from pfbudget.cli.interactive import Interactive
from pfbudget.common.types import Operation
from pfbudget.core.manager import Manager
import pfbudget.db.model as type
from pfbudget.utils.utils import parse_args_period
if __name__ == "__main__":
run()
argparser = argparser()
args = vars(argparser.parse_args())
assert "op" in args, "No Operation selected"
op: Operation = args.pop("op")
assert "database" in args, "No database selected"
db = args.pop("database")
assert "verbose" in args, "No verbose level specified"
verbosity = args.pop("verbose")
params = []
match (op):
case Operation.ManualCategorization:
Interactive(Manager(db, verbosity)).start()
exit()
case Operation.Categorize:
keys = {"no_nulls"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["no_nulls"]]
case Operation.Parse:
keys = {"path", "bank", "creditcard"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["path"], args["bank"], args["creditcard"]]
case Operation.RequisitionId:
keys = {"name", "country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["name"][0], args["country"][0]]
case Operation.Download:
keys = {"all", "banks", "interval", "start", "end", "year", "dry_run"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
start, end = parse_args_period(args)
params = [start, end, args["dry_run"]]
if not args["all"]:
params.append(args["banks"])
else:
params.append([])
case Operation.BankAdd:
keys = {"bank", "bic", "type"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.Bank(
args["bank"][0],
args["bic"][0],
args["type"][0],
)
]
case Operation.BankMod:
keys = {"bank", "bic", "type", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["bic", "type"]
param = {"name": args["bank"][0]}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: None for k in args["remove"] if k in nargs_1}
params = [param]
case Operation.BankDel:
assert len(args["bank"]) > 0, "argparser ill defined"
params = args["bank"]
case Operation.NordigenAdd:
keys = {"bank", "bank_id", "requisition_id", "invert"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.Nordigen(
args["bank"][0],
args["bank_id"][0] if args["bank_id"] else None,
args["requisition_id"][0] if args["requisition_id"] else None,
args["invert"] if args["invert"] else None,
)
]
case Operation.NordigenMod:
keys = {"bank", "bank_id", "requisition_id", "invert", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["bank_id", "requisition_id"]
nargs_0 = ["invert"]
param = {"name": args["bank"][0]}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: v for k, v in args.items() if k in nargs_0}
param |= {k: None for k in args["remove"] if k in nargs_1}
params = [param]
case Operation.NordigenDel:
assert len(args["bank"]) > 0, "argparser ill defined"
params = args["bank"]
case Operation.NordigenCountryBanks:
keys = {"country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["country"][0]]
case Operation.CategoryAdd:
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Category(cat, args["group"]) for cat in args["category"]]
case Operation.CategoryUpdate:
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Category(cat) for cat in args["category"]]
params.append(args["group"])
case Operation.CategoryRemove:
assert "category" in args, "argparser ill defined"
params = [type.Category(cat) for cat in args["category"]]
case Operation.CategorySchedule:
keys = {"category", "period", "frequency"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.CategorySchedule(
cat, args["period"][0], args["frequency"][0], None
)
for cat in args["category"]
]
case Operation.RuleAdd:
keys = {"category", "start", "end", "description", "regex", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.CategoryRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
cat,
)
for cat in args["category"]
]
case Operation.RuleRemove | Operation.TagRuleRemove:
keys = {"id"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = args["id"]
case Operation.RuleModify:
keys = {
"id",
"category",
"date",
"description",
"bank",
"min",
"max",
"remove",
}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["category", "date", "description", "regex", "bank", "min", "max"]
params = []
for id in args["id"]:
param = {"id": id}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: None for k in args["remove"] if k in nargs_1}
params.append(param)
case Operation.TagAdd:
keys = {"tag"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Tag(tag) for tag in args["tag"]]
case Operation.TagRuleAdd:
keys = {"tag", "start", "end", "description", "regex", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.TagRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
tag,
)
for tag in args["tag"]
]
case Operation.TagRuleModify:
keys = {"id", "tag", "date", "description", "bank", "min", "max", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["tag", "date", "description", "regex", "bank", "min", "max"]
params = []
for id in args["id"]:
param = {"id": id}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: None for k in args["remove"] if k in nargs_1}
params.append(param)
case Operation.GroupAdd:
assert "group" in args, "argparser ill defined"
params = [type.CategoryGroup(group) for group in args["group"]]
case Operation.GroupRemove:
assert "group" in args, "argparser ill defined"
params = [type.CategoryGroup(group) for group in args["group"]]
case Operation.Forge | Operation.Dismantle:
keys = {"original", "links"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["original"][0], args["links"]]
case (
Operation.Export
| Operation.Import
| Operation.ExportBanks
| Operation.ImportBanks
| Operation.ExportCategoryRules
| Operation.ImportCategoryRules
| Operation.ExportTagRules
| Operation.ImportTagRules
| Operation.ExportCategories
| Operation.ImportCategories
| Operation.ExportCategoryGroups
| Operation.ImportCategoryGroups
):
keys = {"file", "format"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["file"][0], args["format"][0]]
Manager(db, verbosity).action(op, params)

405
pfbudget/cli/argparser.py Normal file
View File

@ -0,0 +1,405 @@
import argparse
import datetime as dt
import decimal
from dotenv import load_dotenv
import os
import re
from pfbudget.common.types import Operation
from pfbudget.db.model import AccountType, Period
from pfbudget.db.sqlite import DatabaseClient
import pfbudget.reporting.graph
import pfbudget.reporting.report
import pfbudget.utils.utils
load_dotenv()
DEFAULT_DB = os.environ.get("DEFAULT_DB")
def argparser() -> argparse.ArgumentParser:
universal = argparse.ArgumentParser(add_help=False)
universal.add_argument(
"-db",
"--database",
nargs="?",
help="select current database",
default=DEFAULT_DB,
)
universal.add_argument("-v", "--verbose", action="count", default=0)
period = argparse.ArgumentParser(add_help=False)
period_group = period.add_mutually_exclusive_group()
period_group.add_argument(
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
)
period_group.add_argument("--start", type=str, nargs=1, help="graph start date")
period_group.add_argument("--end", type=str, nargs=1, help="graph end date")
period_group.add_argument("--year", type=str, nargs=1, help="graph year")
parser = argparse.ArgumentParser(
description="does cool finance stuff",
parents=[universal],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
if version := re.search(
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]', open("pfbudget/__init__.py").read()
):
parser.add_argument(
"--version",
action="version",
version=version.group(1),
)
subparsers = parser.add_subparsers(required=True)
# TODO Init
# init = subparsers.add_parser("init")
# init.set_defaults(op=Operation.Init)
# Exports transactions to .csv file
export = subparsers.add_parser("export")
export.set_defaults(op=Operation.Export)
file_options(export)
pimport = subparsers.add_parser("import")
pimport.set_defaults(op=Operation.Import)
pimport.add_argument("file", nargs=1, type=str)
# Parse from .csv
parse = subparsers.add_parser("parse")
parse.set_defaults(op=Operation.Parse)
parse.add_argument("path", nargs="+", type=str)
parse.add_argument("--bank", nargs=1, type=str)
parse.add_argument("--creditcard", nargs=1, type=str)
# Automatic/manual categorization
categorize = subparsers.add_parser("categorize").add_subparsers(required=True)
auto = categorize.add_parser("auto")
auto.set_defaults(op=Operation.Categorize)
auto.add_argument("--no-nulls", action="store_false")
categorize.add_parser("manual").set_defaults(op=Operation.ManualCategorization)
"""
Graph
"""
p_graph = subparsers.add_parser(
"graph",
description="Graph of the transactions",
parents=[universal, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_graph.add_argument(
"option",
type=str,
choices=["monthly", "discrete", "networth"],
nargs="?",
default="monthly",
help="graph option help",
)
p_graph.add_argument("--save", action="store_true")
p_graph.set_defaults(func=graph)
"""
Report
"""
p_report = subparsers.add_parser(
"report",
description="Prints report of transaction groups",
parents=[universal, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_report.add_argument(
"option",
type=str,
choices=["net", "detailed"],
nargs="?",
default="net",
help="report option help",
)
p_report.set_defaults(func=report)
# Banks
bank(subparsers.add_parser("bank"))
# Nordigen access token
subparsers.add_parser("token").set_defaults(op=Operation.Token)
# Nordigen requisition id
requisition = subparsers.add_parser("eua")
requisition.set_defaults(op=Operation.RequisitionId)
requisition.add_argument("id", nargs=1, type=str)
requisition.add_argument("country", nargs=1, type=str)
# Download through the Nordigen API
download = subparsers.add_parser("download", parents=[period])
download.set_defaults(op=Operation.Download)
download_banks = download.add_mutually_exclusive_group()
download_banks.add_argument("--all", action="store_true")
download_banks.add_argument("--banks", nargs="+", type=str)
download.add_argument("--dry-run", action="store_true")
# List available banks in country C
banks = subparsers.add_parser("banks")
banks.set_defaults(op=Operation.NordigenCountryBanks)
banks.add_argument("country", nargs=1, type=str)
# Categories
category(subparsers.add_parser("category"))
# Tag
tags(subparsers.add_parser("tag"))
# Link
link(subparsers.add_parser("link"))
return parser
def graph(args):
"""Plots the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.utils.parse_args_period(args)
if args.option == "monthly":
pfbudget.reporting.graph.monthly(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "discrete":
pfbudget.reporting.graph.discrete(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "networth":
pfbudget.reporting.graph.networth(
DatabaseClient(args.database), vars(args), start, end
)
def report(args):
"""Prints a detailed report of the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.utils.parse_args_period(args)
if args.option == "net":
pfbudget.reporting.report.net(DatabaseClient(args.database), start, end)
elif args.option == "detailed":
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
def bank(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.BankAdd)
add.add_argument("bank", nargs=1, type=str)
add.add_argument("bic", nargs=1, type=str)
add.add_argument("type", nargs=1, type=str, choices=[e.name for e in AccountType])
rem = commands.add_parser("del")
rem.set_defaults(op=Operation.BankDel)
rem.add_argument("bank", nargs="+", type=str)
mod = commands.add_parser("mod")
mod.set_defaults(op=Operation.BankMod)
mod.add_argument("bank", nargs=1, type=str)
mod.add_argument("--bic", nargs=1, type=str)
mod.add_argument("--type", nargs=1, type=str, choices=[e.name for e in AccountType])
mod.add_argument("--remove", nargs="*", default=[], type=str)
nordigen(commands.add_parser("nordigen"))
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportBanks)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportBanks)
file_options(pimport)
def nordigen(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.NordigenAdd)
add.add_argument("bank", nargs=1, type=str)
add.add_argument("--bank_id", nargs=1, type=str)
add.add_argument("--requisition_id", nargs=1, type=str)
add.add_argument("--invert", action="store_true")
rem = commands.add_parser("del")
rem.set_defaults(op=Operation.NordigenDel)
rem.add_argument("bank", nargs="+", type=str)
mod = commands.add_parser("mod")
mod.set_defaults(op=Operation.NordigenMod)
mod.add_argument("bank", nargs=1, type=str)
mod.add_argument("--bank_id", nargs=1, type=str)
mod.add_argument("--requisition_id", nargs=1, type=str)
mod.add_argument("--invert", action="store_true")
mod.add_argument("--remove", nargs="*", default=[], type=str)
def category(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.CategoryAdd)
add.add_argument("category", nargs="+", type=str)
add.add_argument("--group", nargs="?", type=str)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.CategoryRemove)
remove.add_argument("category", nargs="+", type=str)
update = commands.add_parser("update")
update.set_defaults(op=Operation.CategoryUpdate)
update.add_argument("category", nargs="+", type=str)
update.add_argument("--group", nargs="?", type=str)
schedule = commands.add_parser("schedule")
schedule.set_defaults(op=Operation.CategorySchedule)
schedule.add_argument("category", nargs="+", type=str)
schedule.add_argument("period", nargs=1, choices=[e.value for e in Period])
schedule.add_argument("--frequency", nargs=1, default=[1], type=int)
rule = commands.add_parser("rule")
category_rule(rule)
group = commands.add_parser("group")
category_group(group)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategories)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategories)
file_options(pimport)
def category_group(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.GroupAdd)
add.add_argument("group", nargs="+", type=str)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.GroupRemove)
remove.add_argument("group", nargs="+", type=str)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryGroups)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryGroups)
file_options(pimport)
def category_rule(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.RuleAdd)
add.add_argument("category", nargs="+", type=str)
rules(add)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.RuleRemove)
remove.add_argument("id", nargs="+", type=int)
modify = commands.add_parser("modify")
modify.set_defaults(op=Operation.RuleModify)
modify.add_argument("id", nargs="+", type=int)
modify.add_argument("--category", nargs=1, type=str)
rules(modify)
modify.add_argument("--remove", nargs="*", default=[], type=str)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryRules)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryRules)
file_options(pimport)
def tags(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.TagAdd)
add.add_argument("tag", nargs="+", type=str)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.TagRemove)
remove.add_argument("tag", nargs="+", type=str)
rule = commands.add_parser("rule")
tag_rule(rule)
def tag_rule(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.TagRuleAdd)
add.add_argument("tag", nargs="+", type=str)
rules(add)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.TagRuleRemove)
remove.add_argument("id", nargs="+", type=int)
modify = commands.add_parser("modify")
modify.set_defaults(op=Operation.TagRuleModify)
modify.add_argument("id", nargs="+", type=int)
modify.add_argument("--tag", nargs=1, type=str)
rules(modify)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportTagRules)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportTagRules)
file_options(pimport)
def rules(parser: argparse.ArgumentParser):
parser.add_argument("--start", nargs=1, type=dt.date.fromisoformat)
parser.add_argument("--end", nargs=1, type=dt.date.fromisoformat)
parser.add_argument("--description", nargs=1, type=str)
parser.add_argument("--regex", nargs=1, type=str)
parser.add_argument("--bank", nargs=1, type=str)
parser.add_argument("--min", nargs=1, type=decimal.Decimal)
parser.add_argument("--max", nargs=1, type=decimal.Decimal)
def link(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
forge = commands.add_parser("forge")
forge.set_defaults(op=Operation.Forge)
forge.add_argument("original", nargs=1, type=int)
forge.add_argument("links", nargs="+", type=int)
dismantle = commands.add_parser("dismantle")
dismantle.set_defaults(op=Operation.Dismantle)
dismantle.add_argument("original", nargs=1, type=int)
dismantle.add_argument("links", nargs="+", type=int)
def file_options(parser: argparse.ArgumentParser):
parser.add_argument("file", nargs=1, type=str)
parser.add_argument("format", nargs=1, default="pickle")

120
pfbudget/cli/interactive.py Normal file
View File

@ -0,0 +1,120 @@
import decimal
from ..core.manager import Manager
from ..db.model import (
Category,
CategorySelector,
Note,
Selector_T,
SplitTransaction,
Tag,
Transaction,
TransactionCategory,
TransactionTag,
)
class Interactive:
help = "category(:tag)/split/note:/skip/quit"
selector = Selector_T.manual
def __init__(self, manager: Manager) -> None:
self.manager = manager
with self.manager.db.session() as session:
self.categories = session.get(Category)
self.tags = session.get(Tag)
session.expunge_all()
def intro(self) -> None:
print(
f"Welcome! Available categories are {[c.name for c in self.categories]} and"
f" currently existing tags are {[t.name for t in self.tags]}"
)
def start(self) -> None:
self.intro()
with self.manager.db.session() as session:
uncategorized = session.uncategorized()
n = len(uncategorized)
print(f"{n} left to categorize")
i = 0
new = []
next = uncategorized[i]
print(next)
while (command := input("$ ")) != "quit":
match command:
case "help":
print(self.help)
case "skip":
i += 1
case "quit":
break
case "split":
new = self.split(next)
session.add(new)
case other:
if not other:
print(self.help)
continue
if other.startswith("note:"):
# TODO adding notes to a splitted transaction won't allow categorization
next.note = Note(other[len("note:") :].strip())
else:
ct = other.split(":")
if (category := ct[0]) not in [
c.name for c in self.categories
]:
print(self.help, self.categories)
tags = []
if len(ct) > 1:
tags = ct[1:]
next.category = TransactionCategory(
category, CategorySelector(self.selector)
)
for tag in tags:
if tag not in [t.name for t in self.tags]:
session.add([Tag(tag)])
self.tags = session.get(Tag)
next.tags.add(TransactionTag(tag))
i += 1
session.commit()
next = uncategorized[i] if len(new) == 0 else new.pop()
print(next)
def split(self, original: Transaction) -> list[SplitTransaction]:
total = original.amount
new = []
done = False
while not done:
if abs(sum(t.amount for t in new)) > abs(total):
print("Overflow, try again")
new.clear()
continue
if sum(t.amount for t in new) == total:
done = True
break
amount = decimal.Decimal(input("amount: "))
new.append(
SplitTransaction(
original.date, original.description, amount, original.id
)
)
return new

View File

@ -1,321 +0,0 @@
from pathlib import Path
import argparse
import re
from pfbudget.core.categories import categorize_data
from pfbudget.core.manager import Manager
from pfbudget.input.json import JsonParser
from pfbudget.input.nordigen import NordigenInput
from pfbudget.db.client import DatabaseClient
import pfbudget.reporting.graph
import pfbudget.reporting.report
import pfbudget.utils
DEFAULT_DB = "data.db"
class PfBudgetInitialized(Exception):
pass
class PfBudgetNotInitialized(Exception):
pass
class DataFileMissing(Exception):
pass
def argparser(manager: Manager) -> argparse.ArgumentParser:
help = argparse.ArgumentParser(add_help=False)
help.add_argument(
"-db",
"--database",
nargs="?",
help="select current database",
default=DEFAULT_DB,
)
help.add_argument(
"-q", "--quiet", action="store_true", help="reduces the amount of verbose"
)
period = argparse.ArgumentParser(add_help=False).add_mutually_exclusive_group()
period.add_argument(
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
)
period.add_argument("--start", type=str, nargs=1, help="graph start date")
period.add_argument("--end", type=str, nargs=1, help="graph end date")
period.add_argument("--year", type=str, nargs=1, help="graph year")
parser = argparse.ArgumentParser(
description="does cool finance stuff",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--version",
action="version",
version=re.search(
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
open("pfbudget/__init__.py").read(),
).group(1),
)
subparsers = parser.add_subparsers(dest="command", required=True)
"""
Init
"""
p_init = subparsers.add_parser(
"init",
description="Initializes the SQLite3 database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_init.set_defaults(func=lambda args: manager.init())
"""
Exporting
"""
p_export = subparsers.add_parser(
"export",
description="Exports the selected database to a .csv file",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_export.set_defaults(func=lambda args: DatabaseClient(args.database).export())
"""
Parsing
"""
p_parse = subparsers.add_parser(
"parse",
description="Parses and adds the requested transactions into the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_parse.add_argument("path", nargs="+", type=str)
p_parse.add_argument("--bank", nargs=1, type=str)
p_parse.add_argument("--creditcard", nargs=1, type=str)
p_parse.add_argument("--category", nargs=1, type=int)
p_parse.set_defaults(func=lambda args: parse(manager, args))
"""
Categorizing
"""
p_categorize = subparsers.add_parser(
"categorize",
description="Categorizes the transactions in the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_categorize.set_defaults(
func=lambda args: categorize_data(DatabaseClient(args.database))
)
"""
Graph
"""
p_graph = subparsers.add_parser(
"graph",
description="Graph of the transactions",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_graph.add_argument(
"option",
type=str,
choices=["monthly", "discrete", "networth"],
nargs="?",
default="monthly",
help="graph option help",
)
p_graph.add_argument("--save", action="store_true")
p_graph.set_defaults(func=graph)
"""
Report
"""
p_report = subparsers.add_parser(
"report",
description="Prints report of transaction groups",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_report.add_argument(
"option",
type=str,
choices=["net", "detailed"],
nargs="?",
default="net",
help="report option help",
)
p_report.set_defaults(func=report)
"""
Register bank
"""
p_register = subparsers.add_parser(
"register",
description="Register a bank",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
p_register.add_argument(
"--requisition", type=str, nargs=1, help="requisition option help"
)
p_register.add_argument("--invert", action="store_true")
p_register.set_defaults(func=lambda args: manager.register(vars(args)))
"""
Unregister bank
"""
p_register = subparsers.add_parser(
"unregister",
description="Unregister a bank",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
p_register.set_defaults(func=lambda args: manager.unregister(vars(args)))
"""
Nordigen API
"""
p_nordigen_access = subparsers.add_parser(
"token",
description="Get new access token",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_access.set_defaults(func=lambda args: NordigenInput(manager).token())
"""
(Re)new bank requisition ID
"""
p_nordigen_access = subparsers.add_parser(
"renew",
description="(Re)new the Bank requisition ID",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_access.add_argument("name", nargs=1, type=str)
p_nordigen_access.add_argument("country", nargs=1, type=str)
p_nordigen_access.set_defaults(
func=lambda args: NordigenInput(manager).requisition(
args.name[0], args.country[0]
)
)
"""
Downloading through Nordigen API
"""
p_nordigen_download = subparsers.add_parser(
"download",
description="Downloads transactions using Nordigen API",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_download.add_argument("--id", nargs="+", type=str)
p_nordigen_download.add_argument("--name", nargs="+", type=str)
p_nordigen_download.add_argument("--all", action="store_true")
p_nordigen_download.set_defaults(func=lambda args: download(manager, args))
"""
List available banks on Nordigen API
"""
p_nordigen_list = subparsers.add_parser(
"list",
description="Lists banks in {country}",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_list.add_argument("country", nargs=1, type=str)
p_nordigen_list.set_defaults(func=lambda args: nordigen_banks(manager, args))
"""
Nordigen JSONs
"""
p_nordigen_json = subparsers.add_parser(
"json",
description="",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_json.add_argument("json", nargs=1, type=str)
p_nordigen_json.add_argument("bank", nargs=1, type=str)
p_nordigen_json.add_argument("--invert", action=argparse.BooleanOptionalAction)
p_nordigen_json.set_defaults(
func=lambda args: manager.parser(JsonParser(vars(args)))
)
return parser
def parse(manager: Manager, args):
"""Parses the contents of the path in args to the selected database.
Args:
args (dict): argparse variables
"""
for path in args.path:
if (dir := Path(path)).is_dir():
for file in dir.iterdir():
manager.parse(file, vars(args))
elif Path(path).is_file():
manager.parse(path, vars(args))
else:
raise FileNotFoundError
def graph(args):
"""Plots the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "monthly":
pfbudget.reporting.graph.monthly(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "discrete":
pfbudget.reporting.graph.discrete(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "networth":
pfbudget.reporting.graph.networth(
DatabaseClient(args.database), vars(args), start, end
)
def report(args):
"""Prints a detailed report of the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "net":
pfbudget.reporting.report.net(DatabaseClient(args.database), start, end)
elif args.option == "detailed":
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
def nordigen_banks(manager: Manager, args):
input = NordigenInput(manager)
input.list(vars(args)["country"][0])
def download(manager: Manager, args):
start, end = pfbudget.utils.parse_args_period(args)
manager.parser(NordigenInput(manager, vars(args), start, end))
def run():
manager = Manager(DEFAULT_DB)
args = argparser(manager).parse_args()
args.func(args)

View File

@ -4,6 +4,53 @@ from decimal import Decimal, InvalidOperation
from enum import Enum, auto
class Operation(Enum):
Init = auto()
Transactions = auto()
Parse = auto()
Download = auto()
Categorize = auto()
ManualCategorization = auto()
Token = auto()
RequisitionId = auto()
CategoryAdd = auto()
CategoryUpdate = auto()
CategoryRemove = auto()
CategorySchedule = auto()
RuleAdd = auto()
RuleRemove = auto()
RuleModify = auto()
GroupAdd = auto()
GroupRemove = auto()
TagAdd = auto()
TagRemove = auto()
TagRuleAdd = auto()
TagRuleRemove = auto()
TagRuleModify = auto()
Forge = auto()
Dismantle = auto()
Split = auto()
BankAdd = auto()
BankMod = auto()
BankDel = auto()
NordigenAdd = auto()
NordigenMod = auto()
NordigenDel = auto()
NordigenCountryBanks = auto()
Export = auto()
Import = auto()
ExportBanks = auto()
ImportBanks = auto()
ExportCategoryRules = auto()
ImportCategoryRules = auto()
ExportTagRules = auto()
ImportTagRules = auto()
ExportCategories = auto()
ImportCategories = auto()
ExportCategoryGroups = auto()
ImportCategoryGroups = auto()
class TransactionError(Exception):
pass

View File

@ -9,7 +9,7 @@ import yaml
if TYPE_CHECKING:
from pfbudget.common.types import Transaction
from pfbudget.db.client import DatabaseClient
from pfbudget.db.sqlite import DatabaseClient
Options = namedtuple(

View File

@ -0,0 +1,154 @@
from codetiming import Timer
from datetime import timedelta
from typing import Sequence
import pfbudget.db.model as t
class Categorizer:
options = {}
def __init__(self):
self.options["null_days"] = 3
def rules(
self,
transactions: Sequence[t.BankTransaction],
categories: Sequence[t.Category],
tags: Sequence[t.Tag],
nullify: bool = True
):
"""Overarching categorization tool
Receives a list of transactions (by ref) and updates their category according
to the rules defined for each category
Args:
transactions (Sequence[BankTransaction]): uncategorized transactions
categories (Sequence[Category]): available categories
tags (Sequence[Tag]): currently available tags
"""
if nullify:
try:
null = next(cat for cat in categories if cat.name == "null")
print("Nullifying")
self._nullify(transactions, null)
except StopIteration:
print("Null category not defined")
categories = [cat for cat in categories if cat.name != "null"]
self._rule_based_categories(transactions, categories)
self._rule_based_tags(transactions, tags)
@Timer(name="nullify")
def _nullify(self, transactions: Sequence[t.BankTransaction], null: t.Category):
count = 0
matching = []
for transaction in transactions:
for cancel in (
cancel
for cancel in transactions
if (
transaction.date - timedelta(days=self.options["null_days"])
<= cancel.date
<= transaction.date + timedelta(days=self.options["null_days"])
and cancel != transaction
and cancel.bank != transaction.bank
and cancel.amount == -transaction.amount
and transaction not in matching
and cancel not in matching
and all(r.matches(transaction) for r in null.rules)
and all(r.matches(cancel) for r in null.rules)
)
):
transaction.category = t.TransactionCategory(
name="null",
selector=t.CategorySelector(t.Selector_T.nullifier),
)
cancel.category = t.TransactionCategory(
name="null",
selector=t.CategorySelector(t.Selector_T.nullifier),
)
matching.extend([transaction, cancel])
count += 2
break
print(f"Nullified {count} of {len(transactions)} transactions")
@Timer(name="categoryrules")
def _rule_based_categories(
self,
transactions: Sequence[t.BankTransaction],
categories: Sequence[t.Category],
):
print(f"Categorizing {len(transactions)} transactions")
d = {}
for category in [c for c in categories if c.rules]:
for rule in category.rules:
# for transaction in [t for t in transactions if not t.category]:
for transaction in [
t
for t in transactions
if not t.category or t.category.name != "null"
]:
if not rule.matches(transaction):
continue
# passed all conditions, assign category
if transaction.category:
if transaction.category.name == category.name:
continue
if (
input(
f"Overwrite {transaction} with {category.name}? (y/n)"
)
== "y"
):
transaction.category.name = category.name
transaction.category.selector.selector = t.Selector_T.rules
else:
transaction.category = t.TransactionCategory(
category.name, t.CategorySelector(t.Selector_T.rules)
)
if rule in d:
d[rule] += 1
else:
d[rule] = 1
for k, v in d.items():
print(f"{v}: {k}")
@Timer(name="tagrules")
def _rule_based_tags(
self, transactions: Sequence[t.BankTransaction], tags: Sequence[t.Tag]
):
print(f"Tagging {len(transactions)} transactions")
d = {}
for tag in [t for t in tags if len(t.rules) > 0]:
for rule in tag.rules:
# for transaction in [t for t in transactions if not t.category]:
for transaction in [
t
for t in transactions
if tag.name not in [tag.tag for tag in t.tags]
]:
if not rule.matches(transaction):
continue
if not transaction.tags:
transaction.tags = {t.TransactionTag(tag.name)}
else:
transaction.tags.add(t.TransactionTag(tag.name))
if rule in d:
d[rule] += 1
else:
d[rule] = 1
for k, v in d.items():
print(f"{v}: {k}")

View File

@ -1,47 +1,412 @@
from pfbudget.input.input import Input
import csv
from pathlib import Path
import pickle
import webbrowser
from pfbudget.common.types import Operation
from pfbudget.core.categorizer import Categorizer
from pfbudget.db.client import DbClient
from pfbudget.db.model import (
Bank,
BankTransaction,
Category,
CategoryGroup,
CategoryRule,
CategorySchedule,
CategorySelector,
Link,
MoneyTransaction,
Nordigen,
Rule,
Selector_T,
SplitTransaction,
Tag,
TagRule,
Transaction,
TransactionCategory,
)
from pfbudget.input.nordigen import NordigenInput
from pfbudget.input.parsers import parse_data
from pfbudget.common.types import Bank, Banks, Transaction, Transactions
from pfbudget.db.client import DatabaseClient
from pfbudget.utils import convert
class Manager:
def __init__(self, db: str):
self.__db = db
def __init__(self, db: str, verbosity: int = 0):
self._db = db
self._verbosity = verbosity
def init(self):
client = DatabaseClient(self.__db)
client.init()
def action(self, op: Operation, params=None):
if self._verbosity > 0:
print(f"op={op}, params={params}")
def register(self, args: dict):
bank = Bank(args["bank"][0], "", args["requisition"][0], args["invert"])
client = DatabaseClient(self.__db)
client.register_bank(convert(bank))
if params is None:
params = []
def unregister(self, args: dict):
client = DatabaseClient(self.__db)
client.unregister_bank(args["bank"][0])
def parser(self, parser: Input):
transactions = parser.parse()
self.add_transactions(transactions)
def parse(self, filename: str, args: dict):
transactions = parse_data(filename, args)
self.add_transactions(transactions)
def transactions() -> list[Transaction]:
match (op):
case Operation.Init:
pass
def add_transactions(self, transactions: Transactions):
client = DatabaseClient(self.__db)
client.insert_transactions([convert(t) for t in transactions])
case Operation.Transactions:
with self.db.session() as session:
transactions = session.get(Transaction)
ret = [t.format for t in transactions]
return ret
def get_bank_by(self, key: str, value: str) -> Bank:
client = DatabaseClient(self.__db)
bank = client.get_bank(key, value)
return convert(bank)
case Operation.Parse:
# Adapter for the parse_data method. Can be refactored.
args = {"bank": params[1], "creditcard": params[2], "category": None}
transactions = []
for path in [Path(p) for p in params[0]]:
if path.is_dir():
for file in path.iterdir():
transactions.extend(self.parse(file, args))
elif path.is_file():
transactions.extend(self.parse(path, args))
else:
raise FileNotFoundError(path)
def get_banks(self) -> Banks:
client = DatabaseClient(self.__db)
return [convert(bank) for bank in client.get_banks()]
if (
len(transactions) > 0
and input(f"{transactions[:5]}\nCommit? (y/n)") == "y"
):
with self.db.session() as session:
session.add(sorted(transactions))
case Operation.Download:
client = NordigenInput()
with self.db.session() as session:
if len(params[3]) == 0:
client.banks = session.get(Bank, Bank.nordigen)
else:
client.banks = session.get(Bank, Bank.name, params[3])
session.expunge_all()
client.start = params[0]
client.end = params[1]
transactions = client.parse()
# dry-run
if not params[2]:
with self.db.session() as session:
session.add(sorted(transactions))
else:
print(transactions)
case Operation.Categorize:
with self.db.session() as session:
uncategorized = session.get(
BankTransaction, ~BankTransaction.category.has()
)
categories = session.get(Category)
tags = session.get(Tag)
Categorizer().rules(uncategorized, categories, tags, params[0])
case Operation.BankMod:
with self.db.session() as session:
session.update(Bank, params)
case Operation.NordigenMod:
with self.db.session() as session:
session.update(Nordigen, params)
case Operation.BankDel:
with self.db.session() as session:
session.remove_by_name(Bank, params)
case Operation.NordigenDel:
with self.db.session() as session:
session.remove_by_name(Nordigen, params)
case Operation.Token:
NordigenInput().token()
case Operation.RequisitionId:
link, _ = NordigenInput().requisition(params[0], params[1])
print(f"Opening {link} to request access to {params[0]}")
webbrowser.open(link)
case Operation.NordigenCountryBanks:
banks = NordigenInput().country_banks(params[0])
print(banks)
case (
Operation.BankAdd
| Operation.CategoryAdd
| Operation.NordigenAdd
| Operation.RuleAdd
| Operation.TagAdd
| Operation.TagRuleAdd
):
with self.db.session() as session:
session.add(params)
case Operation.CategoryUpdate:
with self.db.session() as session:
session.updategroup(*params)
case Operation.CategoryRemove:
with self.db.session() as session:
session.remove_by_name(Category, params)
case Operation.CategorySchedule:
with self.db.session() as session:
session.updateschedules(params)
case Operation.RuleRemove:
assert all(isinstance(param, int) for param in params)
with self.db.session() as session:
session.remove_by_id(CategoryRule, params)
case Operation.TagRemove:
with self.db.session() as session:
session.remove_by_name(Tag, params)
case Operation.TagRuleRemove:
assert all(isinstance(param, int) for param in params)
with self.db.session() as session:
session.remove_by_id(TagRule, params)
case Operation.RuleModify | Operation.TagRuleModify:
assert all(isinstance(param, dict) for param in params)
with self.db.session() as session:
session.update(Rule, params)
case Operation.GroupAdd:
with self.db.session() as session:
session.add(params)
case Operation.GroupRemove:
assert all(isinstance(param, CategoryGroup) for param in params)
with self.db.session() as session:
session.remove_by_name(CategoryGroup, params)
case Operation.Forge:
if not (
isinstance(params[0], int)
and all(isinstance(p, int) for p in params[1])
):
raise TypeError("f{params} are not transaction ids")
with self.db.session() as session:
original = session.get(Transaction, Transaction.id, params[0])[0]
links = session.get(Transaction, Transaction.id, params[1])
if not original.category:
original.category = self.askcategory(original)
for link in links:
if (
not link.category
or link.category.name != original.category.name
):
print(
f"{link} category will change to"
f" {original.category.name}"
)
link.category = original.category
tobelinked = [Link(original.id, link.id) for link in links]
session.add(tobelinked)
case Operation.Dismantle:
assert all(isinstance(param, Link) for param in params)
with self.db.session() as session:
original = params[0].original
links = [link.link for link in params]
session.remove_links(original, links)
case Operation.Split:
if len(params) < 1 and not all(
isinstance(p, Transaction) for p in params
):
raise TypeError(f"{params} are not transactions")
# t -> t1, t2, t3; t.value == Σti.value
original: Transaction = params[0]
if not original.amount == sum(t.amount for t in params[1:]):
raise ValueError(
f"{original.amount}€ != {sum(v for v, _ in params[1:])}"
)
with self.db.session() as session:
originals = session.get(Transaction, Transaction.id, [original.id])
assert len(originals) == 1, ">1 transactions matched {original.id}!"
originals[0].split = True
transactions = []
for t in params[1:]:
if originals[0].date != t.date:
t.date = originals[0].date
print(
f"{t.date} is different from original date"
f" {originals[0].date}, using original"
)
splitted = SplitTransaction(
t.date, t.description, t.amount, originals[0].id
)
splitted.category = t.category
transactions.append(splitted)
session.add(transactions)
case Operation.Export:
with self.db.session() as session:
self.dump(params[0], params[1], sorted(session.get(Transaction)))
case Operation.Import:
transactions = []
for row in self.load(params[0], params[1]):
match row["type"]:
case "bank":
transaction = BankTransaction(
row["date"],
row["description"],
row["amount"],
row["bank"],
)
case "money":
transaction = MoneyTransaction(
row["date"], row["description"], row["amount"]
)
# TODO case "split" how to match to original transaction?? also save ids?
case _:
continue
if category := row.pop("category", None):
transaction.category = TransactionCategory(
category["name"],
CategorySelector(category["selector"]["selector"]),
)
transactions.append(transaction)
if self.certify(transactions):
with self.db.session() as session:
session.add(transactions)
case Operation.ExportBanks:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(Bank))
case Operation.ImportBanks:
banks = []
for row in self.load(params[0], params[1]):
bank = Bank(row["name"], row["BIC"], row["type"])
if row["nordigen"]:
bank.nordigen = Nordigen(**row["nordigen"])
banks.append(bank)
if self.certify(banks):
with self.db.session() as session:
session.add(banks)
case Operation.ExportCategoryRules:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(CategoryRule))
case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
session.add(rules)
case Operation.ExportTagRules:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(TagRule))
case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
session.add(rules)
case Operation.ExportCategories:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(Category))
case Operation.ImportCategories:
# rules = [Category(**row) for row in self.load(params[0])]
categories = []
for row in self.load(params[0], params[1]):
category = Category(row["name"], row["group"])
if len(row["rules"]) > 0:
# Only category rules could have been created with a rule
rules = row["rules"]
for rule in rules:
del rule["type"]
category.rules = set(CategoryRule(**rule) for rule in rules)
if row["schedule"]:
category.schedule = CategorySchedule(**row["schedule"])
categories.append(category)
if self.certify(categories):
with self.db.session() as session:
session.add(categories)
case Operation.ExportCategoryGroups:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(CategoryGroup))
case Operation.ImportCategoryGroups:
groups = [
CategoryGroup(**row) for row in self.load(params[0], params[1])
]
if self.certify(groups):
with self.db.session() as session:
session.add(groups)
def parse(self, filename: Path, args: dict):
return parse_data(filename, args)
def askcategory(self, transaction: Transaction):
selector = CategorySelector(Selector_T.manual)
with self.db.session() as session:
categories = session.get(Category)
while True:
category = input(f"{transaction}: ")
if category in [c.name for c in categories]:
return TransactionCategory(category, selector)
@staticmethod
def dump(fn, format, sequence):
if format == "pickle":
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
elif format == "csv":
with open(fn, "w", newline="") as f:
csv.writer(f).writerows([e.format.values() for e in sequence])
else:
print("format not well specified")
@staticmethod
def load(fn, format):
if format == "pickle":
with open(fn, "rb") as f:
return pickle.load(f)
elif format == "csv":
raise Exception("CSV import not supported")
else:
print("format not well specified")
return []
@staticmethod
def certify(imports: list) -> bool:
if input(f"{imports[:10]}\nDoes the import seem correct? (y/n)") == "y":
return True
return False
@property
def db(self) -> DbClient:
return DbClient(self._db, self._verbosity > 2)
@db.setter
def db(self, url: str):
self._db = url

View File

@ -1,212 +1,123 @@
from __future__ import annotations
from decimal import Decimal
import csv
import datetime
import logging
import logging.config
import pathlib
import sqlite3
from dataclasses import asdict
from sqlalchemy import create_engine, delete, select, update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import false
from typing import Sequence, Type, TypeVar
from pfbudget.common.types import Transaction
import pfbudget.db.schema as Q
from pfbudget.db.model import (
Category,
CategoryGroup,
CategorySchedule,
Link,
Transaction,
)
if not pathlib.Path("logs").is_dir():
pathlib.Path("logs").mkdir()
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("pfbudget.transactions")
class DbClient:
"""
General database client using sqlalchemy
"""
sqlite3.register_adapter(Decimal, lambda d: float(d))
__sessions: list[Session]
__DB_NAME = "data.db"
def __init__(self, url: str, echo=False) -> None:
self._engine = create_engine(url, echo=echo)
@property
def engine(self):
return self._engine
class DatabaseClient:
"""SQLite DB connection manager"""
class ClientSession:
def __init__(self, engine):
self.__engine = engine
__EXPORT_DIR = "export"
def __enter__(self):
self.__session = Session(self.__engine)
return self
def __init__(self, db: str):
self.db = db
def __exit__(self, exc_type, exc_value, exc_tb):
self.commit()
self.__session.close()
def __execute(self, query: str, params: tuple = None) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
if params:
ret = con.execute(query, params).fetchall()
logger.debug(f"[{self.db}] < {query}{params}")
def commit(self):
self.__session.commit()
def expunge_all(self):
self.__session.expunge_all()
T = TypeVar("T")
def get(self, type: Type[T], column=None, values=None) -> Sequence[T]:
if column is not None:
if values:
if isinstance(values, Sequence):
stmt = select(type).where(column.in_(values))
else:
ret = con.execute(query).fetchall()
logger.debug(f"[{self.db}] < {query}")
stmt = select(type).where(column == values)
else:
stmt = select(type).where(column)
else:
stmt = select(type)
if ret:
logger.debug(f"[{self.db}] > {ret}")
except sqlite3.Error:
logger.exception(f"Error while executing [{self.db}] < {query}")
finally:
con.close()
return self.__session.scalars(stmt).all()
return ret
def uncategorized(self) -> Sequence[Transaction]:
"""Selects all valid uncategorized transactions
At this moment that includes:
- Categories w/o category
- AND non-split categories
def __executemany(self, query: str, list_of_params: list[tuple]) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
ret = con.executemany(query, list_of_params).fetchall()
logger.debug(f"[{self.db}] < {query}{list_of_params}")
except sqlite3.Error:
logger.exception(
f"Error while executing [{self.db}] < {query} {list_of_params}"
Returns:
Sequence[Transaction]: transactions left uncategorized
"""
stmt = (
select(Transaction)
.where(~Transaction.category.has())
.where(Transaction.split == false())
)
finally:
con.close()
return self.__session.scalars(stmt).all()
return ret
def add(self, rows: list):
self.__session.add_all(rows)
def __create_tables(self, tables: tuple[tuple]):
for table_name, query in tables:
logger.info(f"Creating table {table_name} if it doesn't exist already")
self.__execute(query)
def remove_by_name(self, type, rows: list):
stmt = delete(type).where(type.name.in_([row.name for row in rows]))
self.__session.execute(stmt)
def init(self):
logging.info(f"Initializing {self.db} database")
self.__create_tables(
(
("transactions", Q.CREATE_TRANSACTIONS_TABLE),
("backups", Q.CREATE_BACKUPS_TABLE),
("banks", Q.CREATE_BANKS_TABLE),
def updategroup(self, categories: list[Category], group: CategoryGroup):
stmt = (
update(Category)
.where(Category.name.in_([cat.name for cat in categories]))
.values(group=group)
)
self.__session.execute(stmt)
def updateschedules(self, schedules: list[CategorySchedule]):
stmt = insert(CategorySchedule).values([asdict(s) for s in schedules])
stmt = stmt.on_conflict_do_update(
index_elements=[CategorySchedule.name],
set_=dict(
recurring=stmt.excluded.recurring,
period=stmt.excluded.period,
period_multiplier=stmt.excluded.period_multiplier,
),
)
self.__session.execute(stmt)
"""Transaction table methods"""
def remove_by_id(self, type, ids: list[int]):
stmt = delete(type).where(type.id.in_(ids))
self.__session.execute(stmt)
def select_all(self) -> list[Transaction] | None:
logger.info(f"Reading all transactions from {self.db}")
transactions = self.__execute("SELECT * FROM transactions")
if transactions:
return [Transaction(t) for t in transactions]
return None
def update(self, type, values: list[dict]):
print(type, values)
self.__session.execute(update(type), values)
def insert_transaction(self, transaction: Transaction):
logger.info(f"Adding {transaction} into {self.db}")
self.__execute(Q.ADD_TRANSACTION, (transaction.to_list(),))
def insert_transactions(self, transactions: Q.DbTransactions):
logger.info(f"Adding {len(transactions)} into {self.db}")
self.__executemany(Q.ADD_TRANSACTION, [t.tuple() for t in transactions])
def update_category(self, transaction: Transaction):
logger.info(f"Update {transaction} category")
self.__execute(Q.UPDATE_CATEGORY, transaction.update_category())
def update_categories(self, transactions: list[Transaction]):
logger.info(f"Update {len(transactions)} transactions' categories")
self.__executemany(
Q.UPDATE_CATEGORY,
[transaction.update_category() for transaction in transactions],
def remove_links(self, original: int, links: list[int]):
stmt = delete(Link).where(
Link.original == original, Link.link.in_(link for link in links)
)
self.__session.execute(stmt)
def get_duplicated_transactions(self) -> list[Transaction] | None:
logger.info("Get duplicated transactions")
transactions = self.__execute(Q.DUPLICATED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_sorted_transactions(self) -> list[Transaction] | None:
logger.info("Get transactions sorted by date")
transactions = self.__execute(Q.SORTED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange(self, start: datetime, end: datetime) -> list[Transaction] | None:
logger.info(f"Get transactions from {start} to {end}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BETWEEN_DATES, (start, end))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_category(self, value: str) -> list[Transaction] | None:
logger.info(f"Get transactions where category = {value}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BY_CATEGORY, (value,))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange_category(
self, start: datetime, end: datetime, category: str
) -> list[Transaction] | None:
logger.info(
f"Get transactions from {start} to {end} where category = {category}"
)
transactions = self.__execute(
Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY, (start, end, category)
)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_by_period(self, period: str) -> list[Transaction] | None:
logger.info(f"Get transactions by {period}")
transactions = self.__execute(Q.SELECT_TRANSACTION_BY_PERIOD, period)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_uncategorized_transactions(self) -> list[Transaction] | None:
logger.debug("Get uncategorized transactions")
return self.get_category(None)
def get_daterange_uncategorized_transactions(self, start: datetime, end: datetime):
logger.debug("Get uncategorized transactions from {start} to {end}")
return self.get_daterange_category(start, end, None)
def get_daterage_without(
self, start: datetime, end: datetime, *categories: str
) -> list[Transaction] | None:
logger.info(f"Get transactions between {start} and {end} not in {categories}")
query = Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
"(" + ", ".join("?" for _ in categories) + ")"
)
transactions = self.__execute(query, (start, end, *categories))
if transactions:
return [Transaction(t) for t in transactions]
return None
def export(self):
filename = pathlib.Path(
"@".join([self.db, datetime.datetime.now().isoformat()])
).with_suffix(".csv")
transactions = self.select_all()
logger.info(f"Exporting {self.db} into {filename}")
if not (dir := pathlib.Path(self.__EXPORT_DIR)).is_dir():
dir.mkdir()
with open(dir / filename, "w", newline="") as f:
csv.writer(f, delimiter="\t").writerows(transactions)
"""Banks table methods"""
def register_bank(self, bank: Q.DbBank):
logger.info(f"Registering {bank}")
self.__execute(Q.ADD_BANK, bank.tuple())
def unregister_bank(self, bank: str):
logger.info(f"Unregistering {bank}")
self.__execute(Q.DELETE_BANK, (bank,))
def get_bank(self, key: str, value: str) -> Q.DbBank | None:
logger.info(f"Get bank with {key} = {value}")
bank = self.__execute(Q.SELECT_BANK.format(key), (value, ))
if bank:
return Q.DbBank(*bank[0])
def get_banks(self) -> Q.DbBanks:
logger.info("Get all banks")
banks = self.__execute(Q.SELECT_BANKS)
if banks:
return [Q.DbBank(*bank) for bank in banks]
return []
def session(self) -> ClientSession:
return self.ClientSession(self.engine)

440
pfbudget/db/model.py Normal file
View File

@ -0,0 +1,440 @@
from __future__ import annotations
import datetime as dt
import decimal
import enum
import re
from typing import Annotated, Any, Optional
from sqlalchemy import (
BigInteger,
Enum,
ForeignKey,
MetaData,
Numeric,
String,
Text,
)
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
MappedAsDataclass,
relationship,
)
class Base(MappedAsDataclass, DeclarativeBase):
metadata = MetaData(
schema="transactions",
naming_convention={
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_`%(constraint_name)s`",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
},
)
class AccountType(enum.Enum):
checking = enum.auto()
savings = enum.auto()
investment = enum.auto()
mealcard = enum.auto()
VISA = enum.auto()
MASTERCARD = enum.auto()
accounttype = Annotated[
AccountType,
mapped_column(Enum(AccountType, inherit_schema=True)),
]
class Export:
@property
def format(self) -> dict[str, Any]:
raise NotImplementedError
class Bank(Base, Export):
__table_args__ = {"schema": "bank"}
__tablename__ = "banks"
name: Mapped[str] = mapped_column(unique=True)
BIC: Mapped[str] = mapped_column(String(8), primary_key=True)
type: Mapped[accounttype] = mapped_column(primary_key=True)
nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False)
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
BIC=self.BIC,
type=self.type,
nordigen=self.nordigen.format if self.nordigen else None,
)
bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]
idpk = Annotated[int, mapped_column(BigInteger, primary_key=True, autoincrement=True)]
money = Annotated[decimal.Decimal, mapped_column(Numeric(16, 2))]
class Transaction(Base, Export):
__tablename__ = "transactions"
id: Mapped[idpk] = mapped_column(init=False)
date: Mapped[dt.date]
description: Mapped[Optional[str]]
amount: Mapped[money]
split: Mapped[bool] = mapped_column(init=False, default=False)
type: Mapped[str] = mapped_column(init=False)
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(
cascade="all, delete-orphan", init=False, passive_deletes=True
)
tags: Mapped[set[TransactionTag]] = relationship(init=False)
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
@property
def format(self) -> dict[str, Any]:
return dict(
id=self.id,
date=self.date,
description=self.description,
amount=self.amount,
split=self.split,
type=self.type,
category=self.category.format if self.category else None,
# TODO note
tags=[tag.format for tag in self.tags] if self.tags else None,
)
def __lt__(self, other: Transaction):
return self.date < other.date
idfk = Annotated[
int, mapped_column(BigInteger, ForeignKey(Transaction.id, ondelete="CASCADE"))
]
class BankTransaction(Transaction):
bank: Mapped[bankfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(bank=self.bank)
class MoneyTransaction(Transaction):
__mapper_args__ = {"polymorphic_identity": "money"}
class SplitTransaction(Transaction):
original: Mapped[idfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(original=self.original)
class CategoryGroup(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "groups"
name: Mapped[str] = mapped_column(primary_key=True)
@property
def format(self) -> dict[str, Any]:
return dict(name=self.name)
class Category(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "available"
name: Mapped[str] = mapped_column(primary_key=True)
group: Mapped[Optional[str]] = mapped_column(
ForeignKey(CategoryGroup.name), default=None
)
rules: Mapped[set[CategoryRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
)
schedule: Mapped[Optional[CategorySchedule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default=None
)
def __repr__(self) -> str:
return (
f"Category(name={self.name}, group={self.group}, #rules={len(self.rules)},"
f" schedule={self.schedule})"
)
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
group=self.group if self.group else None,
rules=[rule.format for rule in self.rules],
schedule=self.schedule.format if self.schedule else None,
)
catfk = Annotated[
str,
mapped_column(ForeignKey(Category.name, ondelete="CASCADE")),
]
class TransactionCategory(Base, Export):
__tablename__ = "categorized"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
name: Mapped[catfk]
selector: Mapped[CategorySelector] = relationship(
cascade="all, delete-orphan", lazy="joined"
)
@property
def format(self):
return dict(name=self.name, selector=self.selector.format)
class Note(Base):
__tablename__ = "notes"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
note: Mapped[str]
class Nordigen(Base, Export):
__table_args__ = {"schema": "bank"}
__tablename__ = "nordigen"
name: Mapped[bankfk] = mapped_column(primary_key=True)
bank_id: Mapped[Optional[str]]
requisition_id: Mapped[Optional[str]]
invert: Mapped[Optional[bool]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
bank_id=self.bank_id,
requisition_id=self.requisition_id,
invert=self.invert,
)
class Tag(Base):
__table_args__ = {"schema": "tag"}
__tablename__ = "available"
name: Mapped[str] = mapped_column(primary_key=True)
rules: Mapped[set[TagRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
)
class TransactionTag(Base, Export):
__tablename__ = "tagged"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
@property
def format(self):
return dict(tag=self.tag)
def __hash__(self):
return hash(self.id)
class Selector_T(enum.Enum):
unknown = enum.auto()
nullifier = enum.auto()
vacations = enum.auto()
rules = enum.auto()
algorithm = enum.auto()
manual = enum.auto()
categoryselector = Annotated[
Selector_T,
mapped_column(Enum(Selector_T, inherit_schema=True), default=Selector_T.unknown),
]
class CategorySelector(Base, Export):
__tablename__ = "selector"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(TransactionCategory.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
selector: Mapped[categoryselector]
@property
def format(self):
return dict(selector=self.selector)
class Period(enum.Enum):
daily = "daily"
weekly = "weekly"
monthly = "monthly"
yearly = "yearly"
scheduleperiod = Annotated[Selector_T, mapped_column(Enum(Period, inherit_schema=True))]
class CategorySchedule(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "schedules"
name: Mapped[catfk] = mapped_column(primary_key=True)
period: Mapped[Optional[scheduleperiod]]
period_multiplier: Mapped[Optional[int]]
amount: Mapped[Optional[int]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
period=self.period,
period_multiplier=self.period_multiplier,
amount=self.amount,
)
class Link(Base):
__tablename__ = "links"
original: Mapped[idfk] = mapped_column(primary_key=True)
link: Mapped[idfk] = mapped_column(primary_key=True)
class Rule(Base, Export):
__tablename__ = "rules"
id: Mapped[idpk] = mapped_column(init=False)
start: Mapped[Optional[dt.date]]
end: Mapped[Optional[dt.date]]
description: Mapped[Optional[str]]
regex: Mapped[Optional[str]]
bank: Mapped[Optional[str]]
min: Mapped[Optional[money]]
max: Mapped[Optional[money]]
type: Mapped[str] = mapped_column(init=False)
__mapper_args__ = {
"polymorphic_identity": "rule",
"polymorphic_on": "type",
}
def matches(self, t: BankTransaction) -> bool:
valid = None
if self.regex:
valid = re.compile(self.regex, re.IGNORECASE)
ops = (
Rule.exists(self.start, lambda r: r < t.date),
Rule.exists(self.end, lambda r: r > t.date),
Rule.exists(self.description, lambda r: r == t.description),
Rule.exists(
valid,
lambda r: r.search(t.description) if t.description else False,
),
Rule.exists(self.bank, lambda r: r == t.bank),
Rule.exists(self.min, lambda r: r < t.amount),
Rule.exists(self.max, lambda r: r > t.amount),
)
if all(ops):
return True
return False
@property
def format(self) -> dict[str, Any]:
return dict(
start=self.start,
end=self.end,
description=self.description,
regex=self.regex,
bank=self.bank,
min=self.min,
max=self.max,
type=self.type,
)
@staticmethod
def exists(r, op) -> bool:
return op(r) if r is not None else True
class CategoryRule(Rule):
__table_args__ = {"schema": "category"}
__tablename__ = "rules"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(Rule.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
name: Mapped[catfk]
__mapper_args__ = {
"polymorphic_identity": "category_rule",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(name=self.name)
def __hash__(self):
return hash(self.id)
class TagRule(Rule):
__table_args__ = {"schema": "tag"}
__tablename__ = "rules"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(Rule.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))
__mapper_args__ = {
"polymorphic_identity": "tag_rule",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(tag=self.tag)
def __hash__(self):
return hash(self.id)

212
pfbudget/db/sqlite.py Normal file
View File

@ -0,0 +1,212 @@
from __future__ import annotations
from decimal import Decimal
import csv
import datetime
import logging
import logging.config
import pathlib
import sqlite3
from pfbudget.common.types import Transaction
import pfbudget.db.schema as Q
if not pathlib.Path("logs").is_dir():
pathlib.Path("logs").mkdir()
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("pfbudget.transactions")
sqlite3.register_adapter(Decimal, lambda d: float(d))
__DB_NAME = "data.db"
class DatabaseClient:
"""SQLite DB connection manager"""
__EXPORT_DIR = "export"
def __init__(self, db: str):
self.db = db
def __execute(self, query: str, params: tuple = None) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
if params:
ret = con.execute(query, params).fetchall()
logger.debug(f"[{self.db}] < {query}{params}")
else:
ret = con.execute(query).fetchall()
logger.debug(f"[{self.db}] < {query}")
if ret:
logger.debug(f"[{self.db}] > {ret}")
except sqlite3.Error:
logger.exception(f"Error while executing [{self.db}] < {query}")
finally:
con.close()
return ret
def __executemany(self, query: str, list_of_params: list[tuple]) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
ret = con.executemany(query, list_of_params).fetchall()
logger.debug(f"[{self.db}] < {query}{list_of_params}")
except sqlite3.Error:
logger.exception(
f"Error while executing [{self.db}] < {query} {list_of_params}"
)
finally:
con.close()
return ret
def __create_tables(self, tables: tuple[tuple]):
for table_name, query in tables:
logger.info(f"Creating table {table_name} if it doesn't exist already")
self.__execute(query)
def init(self):
logging.info(f"Initializing {self.db} database")
self.__create_tables(
(
("transactions", Q.CREATE_TRANSACTIONS_TABLE),
("backups", Q.CREATE_BACKUPS_TABLE),
("banks", Q.CREATE_BANKS_TABLE),
)
)
"""Transaction table methods"""
def select_all(self) -> list[Transaction] | None:
logger.info(f"Reading all transactions from {self.db}")
transactions = self.__execute("SELECT * FROM transactions")
if transactions:
return [Transaction(t) for t in transactions]
return None
def insert_transaction(self, transaction: Transaction):
logger.info(f"Adding {transaction} into {self.db}")
self.__execute(Q.ADD_TRANSACTION, (transaction.to_list(),))
def insert_transactions(self, transactions: Q.DbTransactions):
logger.info(f"Adding {len(transactions)} into {self.db}")
self.__executemany(Q.ADD_TRANSACTION, [t.tuple() for t in transactions])
def update_category(self, transaction: Transaction):
logger.info(f"Update {transaction} category")
self.__execute(Q.UPDATE_CATEGORY, transaction.update_category())
def update_categories(self, transactions: list[Transaction]):
logger.info(f"Update {len(transactions)} transactions' categories")
self.__executemany(
Q.UPDATE_CATEGORY,
[transaction.update_category() for transaction in transactions],
)
def get_duplicated_transactions(self) -> list[Transaction] | None:
logger.info("Get duplicated transactions")
transactions = self.__execute(Q.DUPLICATED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_sorted_transactions(self) -> list[Transaction] | None:
logger.info("Get transactions sorted by date")
transactions = self.__execute(Q.SORTED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange(self, start: datetime, end: datetime) -> list[Transaction] | None:
logger.info(f"Get transactions from {start} to {end}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BETWEEN_DATES, (start, end))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_category(self, value: str) -> list[Transaction] | None:
logger.info(f"Get transactions where category = {value}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BY_CATEGORY, (value,))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange_category(
self, start: datetime, end: datetime, category: str
) -> list[Transaction] | None:
logger.info(
f"Get transactions from {start} to {end} where category = {category}"
)
transactions = self.__execute(
Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY, (start, end, category)
)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_by_period(self, period: str) -> list[Transaction] | None:
logger.info(f"Get transactions by {period}")
transactions = self.__execute(Q.SELECT_TRANSACTION_BY_PERIOD, period)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_uncategorized_transactions(self) -> list[Transaction] | None:
logger.debug("Get uncategorized transactions")
return self.get_category(None)
def get_daterange_uncategorized_transactions(self, start: datetime, end: datetime):
logger.debug("Get uncategorized transactions from {start} to {end}")
return self.get_daterange_category(start, end, None)
def get_daterage_without(
self, start: datetime, end: datetime, *categories: str
) -> list[Transaction] | None:
logger.info(f"Get transactions between {start} and {end} not in {categories}")
query = Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
"(" + ", ".join("?" for _ in categories) + ")"
)
transactions = self.__execute(query, (start, end, *categories))
if transactions:
return [Transaction(t) for t in transactions]
return None
def export(self):
filename = pathlib.Path(
"@".join([self.db, datetime.datetime.now().isoformat()])
).with_suffix(".csv")
transactions = self.select_all()
logger.info(f"Exporting {self.db} into {filename}")
if not (dir := pathlib.Path(self.__EXPORT_DIR)).is_dir():
dir.mkdir()
with open(dir / filename, "w", newline="") as f:
csv.writer(f, delimiter="\t").writerows(transactions)
"""Banks table methods"""
def register_bank(self, bank: Q.DbBank):
logger.info(f"Registering {bank}")
self.__execute(Q.ADD_BANK, bank.tuple())
def unregister_bank(self, bank: str):
logger.info(f"Unregistering {bank}")
self.__execute(Q.DELETE_BANK, (bank,))
def get_bank(self, key: str, value: str) -> Q.DbBank | None:
logger.info(f"Get bank with {key} = {value}")
bank = self.__execute(Q.SELECT_BANK.format(key), (value, ))
if bank:
return Q.DbBank(*bank[0])
def get_banks(self) -> Q.DbBanks:
logger.info("Get all banks")
banks = self.__execute(Q.SELECT_BANKS)
if banks:
return [Q.DbBank(*bank) for bank in banks]
return []

View File

@ -1,21 +1,9 @@
from __future__ import annotations
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING
from pfbudget.common.types import Transactions
if TYPE_CHECKING:
from pfbudget.core.manager import Manager
from pfbudget.db.model import Transaction
class Input(ABC):
def __init__(self, manager: Manager):
self._manager = manager
@abstractmethod
def parse(self) -> Transactions:
return NotImplemented
@property
def manager(self):
return self._manager
def parse(self) -> list[Transaction]:
return NotImplementedError

View File

@ -1,30 +0,0 @@
import json
from .input import Input
from pfbudget.common.types import Transactions
from pfbudget.utils import convert, parse_decimal
class JsonParser(Input):
def __init__(self, manager, options):
super().__init__(manager)
self.options = options
def parse(self) -> Transactions:
try:
with open(self.options["json"][0], "r") as f:
return [
convert(
[
t["bookingDate"],
t["remittanceInformationUnstructured"],
self.options["bank"][0],
parse_decimal(t["transactionAmount"]["amount"])
if not self.options["invert"]
else -parse_decimal(t["transactionAmount"]["amount"]),
],
)
for t in json.load(f)["transactions"]["booked"]
]
except KeyError:
print("No json file defined")

View File

@ -1,59 +1,51 @@
from datetime import date
from time import sleep
from requests import HTTPError, ReadTimeout
from dotenv import load_dotenv
from nordigen import NordigenClient
from uuid import uuid4
import datetime as dt
import dotenv
import json
import nordigen
import os
import webbrowser
import requests
import time
import uuid
import pfbudget.db.model as t
from pfbudget.utils.converters import convert
from .input import Input
from pfbudget.common.types import NoBankSelected, Transactions
from pfbudget.utils import convert
load_dotenv()
dotenv.load_dotenv()
class NordigenInput(Input):
def __init__(self, manager, options: dict = {}, start=date.min, end=date.max):
super().__init__(manager)
self._client = NordigenClient(
secret_key=os.environ.get("SECRET_KEY"),
secret_id=os.environ.get("SECRET_ID"),
redirect_url = "https://murta.dev"
def __init__(self):
super().__init__()
if not (key := os.environ.get("SECRET_KEY")) or not (
id := os.environ.get("SECRET_ID")
):
raise
self._client = nordigen.NordigenClient(
secret_key=key,
secret_id=id,
)
self.client.token = self.__token()
self._client.token = self.__token()
self._start = dt.date.min
self._end = dt.date.max
# print(options)
if "all" in options and options["all"]:
self.__banks = self.manager.get_banks()
elif "id" in options and options["id"]:
self.__banks = [
self.manager.get_bank_by("nordigen_id", b) for b in options["id"]
]
elif "name" in options and options["name"]:
self.__banks = [
self.manager.get_bank_by("name", b) for b in options["name"]
]
else:
self.__banks = None
self.__from = start
self.__to = end
def parse(self) -> Transactions:
def parse(self) -> list[t.BankTransaction]:
transactions = []
if not self.__banks:
raise NoBankSelected
assert len(self._banks) > 0
for bank in self.__banks:
for bank in self._banks:
print(f"Downloading from {bank}...")
requisition = self.client.requisition.get_requisition_by_id(
bank.requisition_id
bank.nordigen.requisition_id
)
print(requisition)
for acc in requisition["accounts"]:
account = self._client.account_api(acc)
@ -63,14 +55,14 @@ class NordigenInput(Input):
try:
downloaded = account.get_transactions()
break
except ReadTimeout:
except requests.ReadTimeout:
retries += 1
print(f"Request #{retries} timed-out, retrying in 1s")
sleep(1)
except HTTPError as e:
time.sleep(1)
except requests.HTTPError as e:
retries += 1
print(f"Request #{retries} failed with {e}, retrying in 1s")
sleep(1)
time.sleep(1)
if not downloaded:
print(f"Couldn't download transactions for {account}")
@ -84,44 +76,59 @@ class NordigenInput(Input):
]
transactions.extend(
[t for t in converted if self.__from <= t.date <= self.__to]
[t for t in converted if self._start <= t.date <= self._end]
)
return transactions
return sorted(transactions)
def token(self):
token = self._client.generate_token()
print(f"New access token: {token}")
return token
def requisition(self, institution: str, country: str = "PT"):
link, _ = self.__requisition_id(institution, country)
webbrowser.open(link)
def requisition(self, id: str, country: str = "PT"):
requisition = self._client.initialize_session(
redirect_uri=self.redirect_url,
institution_id=id,
reference_id=str(uuid.uuid4()),
)
return requisition.link, requisition.requisition_id
def list(self, country: str):
print(self._client.institution.get_institutions(country))
def country_banks(self, country: str):
return self._client.institution.get_institutions(country)
@property
def client(self):
return self._client
@property
def banks(self):
return self._banks
@banks.setter
def banks(self, value):
self._banks = value
@property
def start(self):
return self._start
@start.setter
def start(self, value):
self._start = value
@property
def end(self):
return self._end
@end.setter
def end(self, value):
self._end = value
def __token(self):
if token := os.environ.get("TOKEN"):
return token
else:
token = self._client.generate_token()
print(f"New access token: {token}")
return token
def __requisition_id(self, i: str, c: str):
id = self._client.institution.get_institution_id_by_name(
country=c, institution=i
)
init = self._client.initialize_session(
redirect_uri="https://murta.dev",
institution_id=id,
reference_id=str(uuid4()),
)
print(f"{i}({c}) link: {init.link} and requisition ID: {init.requisition_id}")
return (init.link, init.requisition_id)
return token["access"]

View File

@ -1,10 +1,12 @@
from collections import namedtuple
from decimal import Decimal
from importlib import import_module
from pathlib import Path
import datetime as dt
import yaml
from pfbudget.common.types import NoBankSelected, Transaction, Transactions
from pfbudget.common.types import NoBankSelected
from pfbudget.db.model import Transaction
from pfbudget.utils import utils
Index = namedtuple(
@ -43,7 +45,7 @@ Options = namedtuple(
)
def parse_data(filename: str, args: dict) -> Transactions:
def parse_data(filename: Path, args: dict) -> list[Transaction]:
cfg: dict = yaml.safe_load(open("parsers.yaml"))
assert (
"Banks" in cfg
@ -84,7 +86,7 @@ def parse_data(filename: str, args: dict) -> Transactions:
class Parser:
def __init__(self, filename: str, bank: str, options: dict):
def __init__(self, filename: Path, bank: str, options: dict):
self.filename = filename
self.bank = bank
@ -157,7 +159,7 @@ class Parser:
category = line[options.category]
transaction = Transaction(date, text, bank, value, category)
else:
transaction = Transaction(date, text, bank, value, options.category)
transaction = Transaction(date, text, bank, value)
if options.additional_parser:
func(transaction)

View File

@ -9,7 +9,7 @@ import pfbudget.core.categories
if TYPE_CHECKING:
from pfbudget.db.client import DatabaseClient
from pfbudget.db.sqlite import DatabaseClient
groups = pfbudget.core.categories.cfg["Groups"]

View File

@ -6,7 +6,7 @@ import datetime as dt
import pfbudget.core.categories
if TYPE_CHECKING:
from pfbudget.db.client import DatabaseClient
from pfbudget.db.sqlite import DatabaseClient
def net(db: DatabaseClient, start: dt.date = dt.date.min, end: dt.date = dt.date.max):

View File

@ -1,2 +0,0 @@
from .converters import convert
from .utils import *

View File

@ -1,71 +1,30 @@
from datetime import timedelta
from functools import singledispatch
import datetime as dt
import functools
from typing import Any
from pfbudget.common.types import TransactionError
import pfbudget.db.model as t
from pfbudget.common.types import Bank, Transaction, TransactionError
from pfbudget.db.schema import DbBank, DbTransaction
from .utils import parse_decimal
@singledispatch
def convert(t):
print("No converter as been found")
@functools.singledispatch
def convert(t) -> Any:
print("No converter has been found")
pass
@convert.register
def _(t: Transaction) -> DbTransaction:
return DbTransaction(
t.date,
t.description,
t.bank,
t.value,
t.category,
t.original,
t.additional_comment,
)
@convert.register
def _(db: DbTransaction) -> Transaction:
def _(json: dict, bank: t.Bank) -> t.BankTransaction | None:
i = -1 if bank.nordigen and bank.nordigen.invert else 1
try:
return Transaction(db)
except TransactionError:
print(f"{db} is in the wrong format")
@convert.register
def _(db: DbBank, key: str = "") -> Bank:
bank = Bank(db.name, db.bic, db.requisition_id, db.invert, db.offset, key=key)
if not bank.invert:
bank.invert = False
if not bank.offset:
bank.offset = 0
return bank
@convert.register
def _(bank: Bank) -> DbBank:
bank = DbBank(
bank.name, bank.bic, "", "", bank.requisition_id, bank.invert, bank.offset
transaction = t.BankTransaction(
date=dt.date.fromisoformat(json["bookingDate"]),
description=json["remittanceInformationUnstructured"],
bank=bank.name,
amount=i * parse_decimal(json["transactionAmount"]["amount"]),
)
if not bank.invert:
bank.invert = False
if not bank.offset:
bank.offset = 0
return bank
@convert.register
def _(json: dict, bank: Bank) -> Transaction:
i = -1 if bank.invert else 1
try:
transaction = Transaction(
json["bookingDate"],
json["remittanceInformationUnstructured"],
bank.name,
i * parse_decimal(json["transactionAmount"]["amount"]),
)
transaction.date += timedelta(days=bank.offset)
# transaction.date += timedelta(days=bank.offset)
return transaction
except TransactionError:

View File

@ -59,21 +59,21 @@ def find_credit_institution(fn, banks, creditcards):
return bank, cc
def parse_args_period(args):
def parse_args_period(args: dict):
start, end = date.min, date.max
if args.start:
start = datetime.strptime(args.start[0], "%Y/%m/%d").date()
if args["start"]:
start = datetime.strptime(args["start"][0], "%Y/%m/%d").date()
if args.end:
end = datetime.strptime(args.end[0], "%Y/%m/%d").date()
if args["end"]:
end = datetime.strptime(args["end"][0], "%Y/%m/%d").date()
if args.interval:
start = datetime.strptime(args.interval[0], "%Y/%m/%d").date()
end = datetime.strptime(args.interval[1], "%Y/%m/%d").date()
if args["interval"]:
start = datetime.strptime(args["interval"][0], "%Y/%m/%d").date()
end = datetime.strptime(args["interval"][1], "%Y/%m/%d").date()
if args.year:
start = datetime.strptime(args.year[0], "%Y").date()
end = datetime.strptime(str(int(args.year[0]) + 1), "%Y").date() - timedelta(
if args["year"]:
start = datetime.strptime(args["year"][0], "%Y").date()
end = datetime.strptime(str(int(args["year"][0]) + 1), "%Y").date() - timedelta(
days=1
)

View File

@ -1,5 +1,7 @@
codetiming==1.4.0
matplotlib==3.6.1
nordigen==1.3.0
python-dateutil==2.8.2
python-dotenv==0.21.0
PyYAML==6.0
SQLAlchemy==2.0.0rc2