Compare commits

...

67 Commits

Author SHA1 Message Date
ddb02b33f9
Merge branch 'feature/postgresql'
The graphs and reports are broken on this commit.
2023-04-12 18:25:16 +01:00
5af05c2fa4
Print the transaction ID 2023-04-12 18:23:24 +01:00
2cbf00a939
CLI now allows --no-nulls flag on categorize
Null categorization, which is a major time sink, may now be disabled.
2023-04-12 18:20:57 +01:00
6b26651d22
Option eua now takes id instead of bank name 2023-04-12 18:19:24 +01:00
ed2dda63e9
Allows using rules for the nullifying step 2023-02-23 23:24:01 +00:00
1a774e3769
Adds get all transactions operation 2023-02-23 23:23:19 +00:00
dd724b6c28
Export in .csv
Importing is not supported, since there's no way to represent a Null
field in .csv
2023-02-23 23:21:54 +00:00
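A minimal sketch of the csv limitation described above (field names hypothetical, not from the repo):

    import csv
    from datetime import date

    # A transaction whose category is Null in the DB.
    rows = [{"date": date(2023, 2, 1), "description": "coffee", "amount": "-2.50", "category": None}]

    with open("transactions.csv", "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["date", "description", "amount", "category"])
        writer.writeheader()
        # None is written as "", so a re-import can't tell a Null
        # category from an empty string -- hence export only.
        writer.writerows(rows)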
6f68d971ee
Clear up forge/dismantle logic 2023-02-11 22:48:04 +00:00
f7df033d58
Add start date rule
Rename date to end.
2023-02-11 22:46:41 +00:00
23eb2c80bd
[Interactive] Defines an Interactive class
Affords cleaner code than the bare function approach.
Renames runnable.py to what it actually is: argparser.py.
2023-01-30 22:24:23 +00:00
7453ffbd3a
[Interactive] Adds new format for tags
Tags can now be defined alongside categories by starting with ':'.
E.g. eating out:work:today will classify the transaction with the eating
out category and the work and today tags.
2023-01-29 23:48:48 +00:00
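A rough sketch of the parsing this implies (function name hypothetical):

    def parse_selection(text: str) -> tuple[str, list[str]]:
        """Split 'eating out:work:today' into a category and its tags."""
        category, *tags = text.split(":")
        return category, tags

    assert parse_selection("eating out:work:today") == ("eating out", ["work", "today"])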
da348c4ffb
[Fix] Split categories were not being committed 2023-01-29 23:48:27 +00:00
2da721d53c
[Interactive] Adds skip command 2023-01-29 23:48:22 +00:00
f943374ade
[Fix] Split transactions do not have a category
Adds an uncategorized method to the DB client to retrieve transactions w/o
a category AND not split.
2023-01-29 23:39:46 +00:00
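A hedged sketch of such a query in SQLAlchemy 2.0 style; `model` stands in for the repo's Transaction ORM class (assumed here):

    from sqlalchemy import select

    def uncategorized(session, model):
        # Transactions with no category that are not split children.
        stmt = select(model).where(
            model.category == None,  # noqa: E711 -- renders as IS NULL
            model.split == False,  # noqa: E712 -- renders as = false
        )
        return session.scalars(stmt).all()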
abff76ad4e
[Interactive] Finish split command 2023-01-29 21:26:22 +00:00
e0fc310ef6
[Fix] Add split default 2023-01-29 21:16:05 +00:00
13709b7a04
[Interactive] Adds new tag 2023-01-29 21:14:49 +00:00
fd24ac3318
Remove unnecessary __init__.pys
and import using submodules in __main__.py
2023-01-23 23:52:20 +00:00
36e7f84bd9
Interactive categorization moved to __main__.py
It didn't make sense to have it inside the manager, which should only be
used to process commands and their parameters.
2023-01-23 23:31:38 +00:00
5235fcdfc3
Adds business logic for split transaction
Moves split member to be part of parent class and removes it from the
init.
2023-01-23 23:03:42 +00:00
ace5195164
Move selector table back to transactions schema
Change table name originals to transactions and tags to tagged.
2023-01-23 19:36:24 +00:00
ae0a195069
Split tables per different schemas 2023-01-23 19:36:07 +00:00
8760f5a0a4
Export/Import categories and groups 2023-01-23 00:06:36 +00:00
dd0aaa01b8
Export/import for banks 2023-01-22 23:42:32 +00:00
fd6793b4f4
Turned on type checking
and as a result, had to fix a LOT of minor potential future issues.
It also reorders and clears unused imports.

When exporting transactions, it will sort by date.
2023-01-22 20:44:05 +00:00
d4b5f1f11a
Rule inheritance
Both rules, categories and tags, now derive from the rule base type.
This clears up some type definitions.
2023-01-22 20:22:46 +00:00
6110858d48
Extend export/import to rules
Removes additional bank/all options from the transactions export command
line.
Deletes the brief lived CSV class.
This patch starts using pickle for simple export/import; other options
can be added later. An issue with .csv is the lack of a Null field.
Moves logic to Manager, it is simple enough.
2023-01-15 23:06:20 +00:00
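A minimal sketch of pickle-based export/import (function names assumed); unlike .csv, pickle round-trips None:

    import pickle

    def export_rules(rules, path):
        with open(path, "wb") as f:
            pickle.dump(rules, f)

    def import_rules(path):
        with open(path, "rb") as f:
            return pickle.load(f)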
1cce7d421e
Update requirements.txt with the SQL ORM 2023-01-10 23:49:06 +00:00
c42a399d3d
Adds the import operation and a timer
to the categorization. We can now import transactions from a csv file,
and later automatically categorize them all.
2023-01-10 23:45:09 +00:00
478bd25190
Subclass the Transaction with multiple children
Each child is essentially a type of transaction. We currently have:
- bank transactions
- money transactions
- split transactions

The table inheritance is implemented as a single table, with a
polymorphic type and Null columns.

Adds an IsSplit interface, which will later be used for the category
views, so as to not repeat transactions.
2023-01-10 23:42:37 +00:00
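An assumed shape of the single-table inheritance this describes, in SQLAlchemy 2.0 declarative style (class and column names are illustrative):

    from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

    class Base(DeclarativeBase):
        pass

    class Transaction(Base):
        __tablename__ = "transactions"
        id: Mapped[int] = mapped_column(primary_key=True)
        type: Mapped[str]  # polymorphic discriminator column
        __mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}

    class BankTransaction(Transaction):
        # No __tablename__: rows share the parent table, unused columns stay Null.
        __mapper_args__ = {"polymorphic_identity": "bank"}

    class SplitTransaction(Transaction):
        __mapper_args__ = {"polymorphic_identity": "split"}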
0d287624c4
Load the default DB from the .env file 2023-01-10 21:35:43 +00:00
c37e7eb37c
Re-adds manual categorization
Also fixes a categorization bug in the Manager, in the DB client method.
2023-01-10 21:32:08 +00:00
86afa99217
Finish the remaining Nordigen operations
from the Manager's POV, and updates the argparsers.
Also clears unnecessary methods from the DB client interface.
Better assert information in __main__.py
2023-01-08 19:41:07 +00:00
9b45ee4817
Update the export operation
to work with the Manager.
Also removes the run method from the runnable.py, since everything is
done in the __main__.py file of the pfbudget module.
2023-01-08 19:41:07 +00:00
9500e808de
Update the parse operation
to coordinate with the manager
2023-01-06 22:05:01 +00:00
55a5b09c45
Fix download, bank, token and renew->eua ops
- Update the download, token and eua cli with the new operations
- Change the bank and nordigen cli to be more in line with the other
  add/modify/remove operations. Also update manager logic.
- Fix some model.py leftovers
- Add __lt__ to Transaction to enable sorting
- Remove universal from child argparsers
2023-01-01 18:46:04 +00:00
071711dbdb
Fix CategoryRule and TagRule initializations
and re-adds the remove option to the modify category rule cli.
2022-12-30 16:09:40 +00:00
7fe5b6bd32
Adds links between transactions
Sometimes transactions are directly related to one another w/o being of
an equal value, e.g. someone pays for the meal w/ the CC, and everyone
pays him/her.

Clear leftover __repr__ methods in the model classes; the dataclass
decorator will create those automatically.
2022-12-19 22:55:04 +00:00
f20cf685ad
Update Foreign Key delete cascade for categories 2022-12-19 22:10:06 +00:00
e57859f601
Updates verbosity options
Verbosity is now set by adding -v to the called command. There is no verbosity=quiet.
2022-12-19 22:02:11 +00:00
e27f2f08cf
Adds rule based tagging of transactions
Tags will work as additional categories to filter/organize by. It makes
sense they can also be rule based.
Since rules are common to both categories and tags, reorganize the
classes in the model. It doesn't affect the DB.
2022-12-19 21:59:41 +00:00
058d0cc05d
Adds Tags rules
Creates an available tags table to cross-reference against.
Clarifies some logic in the DbClient, since all adds/removes are the
same.
2022-12-19 20:35:18 +00:00
6c33a94a5f
Change min_amount/max_amount columns for min/max
Simplifies the action creation.
2022-12-19 20:35:18 +00:00
a2b2f2c1d1
Adds modify rule operation 2022-12-18 00:48:45 +00:00
7986fe8e5b
[Fix] argparse group option not an array 2022-12-17 20:59:40 +00:00
17d8d5d813
[Fix] Changes rule's money type to Decimal
Also fixes comparison w/ max amount.
2022-12-17 20:48:16 +00:00
72a8995fe6
Adds regex rule and remove rule option
Categorization rules can now search using a regex pattern.
2022-12-10 18:54:16 +00:00
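A sketch of what regex-based matching amounts to (names assumed):

    import re

    def rule_matches(pattern: str, description: str) -> bool:
        # re.search matches anywhere in the description, not just at the start.
        return re.search(pattern, description) is not None

    assert rule_matches(r"(?i)coffee|starbucks", "STARBUCKS 1234 LISBOA")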
d321481e29
Rule based categorizing
Uses the rules defined for each category to classify each transaction.
Fixes the categorize command, which was broken from previous refactors.
Swaps str type on the categories_rules date to date.
2022-12-10 00:24:44 +00:00
fed007ff89
Changes recurring categories boolean to int
to indicate the number of expected transactions per time period.
2022-12-10 00:20:41 +00:00
f721e6a910
[Refactor] Decouple CLI arguments from Manager
The Manager doesn't need to know that it was called from the CLI, so it
is now initialized with the database and performs an action based on the
operation it receives and its parameters.

The work isn't finished, some Manager actions are still based on the CLI
arguments.

The CLI logic and creation of parameters to pass to the manager have
been moved to the __main__.py file, which brings it in line with the
program being called as a package from the command line.
2022-12-09 19:43:09 +00:00
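A minimal sketch of the decoupling described above (names hypothetical, not the repo's actual API):

    from enum import Enum, auto

    class Operation(Enum):
        EXPORT = auto()
        CATEGORIZE = auto()

    class Manager:
        def __init__(self, database: str):
            self.database = database  # no CLI knowledge, only the DB

        def action(self, op: Operation, params: dict):
            if op is Operation.CATEGORIZE:
                ...  # business logic lives here, not in the argparser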
471331ffc9
Categories rules for rule-based categorization
This patch extends the categories_rules with a rule for each field of a
transaction.

It also changes the ORM classes to behave as dataclasses again.
2022-12-09 19:38:12 +00:00
63d6a2aab7
Finishes category schedule implementation 2022-12-08 20:15:48 +00:00
f09b328c66
Package now run as python3 -m pfbudget
Moves in line with the normal way Python packages are run.
https://docs.python.org/3/library/__main__.html
2022-12-08 20:11:33 +00:00
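For reference, the pattern this follows (a hypothetical minimal __main__.py, not the repo's actual file):

    # pfbudget/__main__.py -- executed by `python3 -m pfbudget`
    import argparse

    def main() -> None:
        parser = argparse.ArgumentParser(prog="pfbudget")
        parser.add_argument("-v", "--verbose", action="store_true")
        args = parser.parse_args()
        print(args)

    if __name__ == "__main__":
        main()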
6f97b883fb
Add weekly option on the schedule period 2022-12-08 16:44:07 +00:00
72974c90aa
Operations refactor
Unify the manager operations under one enum.
Start cleaning up the argparser structure to easily add new operations.
2022-12-08 16:15:53 +00:00
d11bc6df1d
Adds category recurring possibility 2022-12-08 13:31:31 +00:00
d409038072
Adds category selector column
This column indicates where the category came from.
2022-12-08 01:22:26 +00:00
9d33df78a8
Adds command line option to add/remove categories
Implements the argument parser, the manager logic and the DB client
methods.
Encapsulates the DbClient connection under the _db attribute on the
manager.

Adds verbose option to enable ORM increased logging.
2022-12-08 00:25:56 +00:00
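One way the verbose flag could map to increased ORM logging (an assumption; echo=True makes the engine log every emitted SQL statement):

    from sqlalchemy import create_engine

    # sqlite keeps the sketch self-contained; the repo targets PostgreSQL.
    engine = create_engine("sqlite:///pfbudget.db", echo=True)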
882a77d24c
[Refactor] CLI argparser passes options to Manager
Move all business logic to the manager. The ArgParser now only parses the
CLI arguments and creates a command object recording which command was run.
This information is then passed to the manager, which runs the appropriate
business logic.

This will make it easier to add new options, separating the parsing of
the CLI options from the implementation of the logic. It also simplifies
any future effort in adding a different input (e.g. GUI).

Warning: some functions were commented out; this is only a tracer bullet.
2022-12-04 17:48:28 +00:00
be67612f67
Introduces categorizer that works on ORM classes
Categorizer will work directly on ORM classes, which will clean up the
code, since changes are automatically persisted when the objects are
modified.

Adds wrapper session class inside the DbClient for the manager to use.
The manager will have to have some DB session knowledge, which adds some
unfortunate coupling.

Removes some unnecessary relations between tables that were added by
mistake.

category CLI option now uses the manager.
2022-12-04 16:13:05 +00:00
78e545589d
Adds category groups 2022-12-04 15:14:24 +00:00
b8142f4f99
Add available categories table 2022-12-04 11:19:21 +00:00
e379d77995
Nordigen -> PostgreSQL path completed
Can now download from all banks registered on the banks/nordigen table
and write to the PostgreSQL DB (or any DB, since we're now agnostic).

Commented out most of the manager's functions until the integration with
the new DB client is complete.
Set Optional relationships. Remove the DB types as dataclasses; it only
increased the verbosity of the types w/o much benefit.
Change the name Original to Transaction, since the type is the
placeholder for the rest of the transaction information.
2022-12-04 00:02:45 +00:00
91514f71b1
Adds relationships and CASCADE on deletes 2022-12-03 16:57:27 +00:00
246c948d76
Add meal card to account type enum 2022-12-03 16:13:26 +00:00
0d22b02b3f
Adds alembic tool for DB versioning
Alembic is a lightweight database migration tool for usage with the
SQLAlchemy Database Toolkit for Python.
https://alembic.sqlalchemy.org/en/latest/index.html

Adds first version of DB schema.
2022-12-03 16:13:23 +00:00
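Typical usage once alembic is wired up; alembic.config.main is the same entry point the `alembic` console script calls:

    from alembic.config import main as alembic_main

    # Equivalent to `alembic upgrade head`; expects alembic.ini in the cwd.
    alembic_main(argv=["upgrade", "head"])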
395576d73f
Move SQLite client to separate file 2022-11-25 19:02:34 +00:00
56 changed files with 4311 additions and 741 deletions

1
.gitignore vendored

@@ -153,4 +153,5 @@ dmypy.json
 ### Default user directories
 export/
+tmp/
 .pfbudget

105
alembic.ini Normal file

@@ -0,0 +1,105 @@
# A generic, single database configuration.

[alembic]
# path to migration scripts
script_location = alembic

# template used to generate migration file names; The default value is %%(rev)s_%%(slug)s
# Uncomment the line below if you want the files to be prepended with date and time
# see https://alembic.sqlalchemy.org/en/latest/tutorial.html#editing-the-ini-file
# for all available tokens
# file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s

# sys.path path, will be prepended to sys.path if present.
# defaults to the current working directory.
prepend_sys_path = .

# timezone to use when rendering the date within the migration file
# as well as the filename.
# If specified, requires the python-dateutil library that can be
# installed by adding `alembic[tz]` to the pip requirements
# string value is passed to dateutil.tz.gettz()
# leave blank for localtime
timezone = UTC

# max length of characters to apply to the
# "slug" field
# truncate_slug_length = 40

# set to 'true' to run the environment during
# the 'revision' command, regardless of autogenerate
# revision_environment = false

# set to 'true' to allow .pyc and .pyo files without
# a source .py file to be detected as revisions in the
# versions/ directory
# sourceless = false

# version location specification; This defaults
# to alembic/versions. When using multiple version
# directories, initial revisions must be specified with --version-path.
# The path separator used here should be the separator specified by "version_path_separator" below.
# version_locations = %(here)s/bar:%(here)s/bat:alembic/versions

# version path separator; As mentioned above, this is the character used to split
# version_locations. The default within new alembic.ini files is "os", which uses os.pathsep.
# If this key is omitted entirely, it falls back to the legacy behavior of splitting on spaces and/or commas.
# Valid values for version_path_separator are:
#
# version_path_separator = :
# version_path_separator = ;
# version_path_separator = space
version_path_separator = os  # Use os.pathsep. Default configuration used for new projects.

# the output encoding used when revision files
# are written from script.py.mako
# output_encoding = utf-8

sqlalchemy.url = postgresql://pf-budget:muster-neutron-omega@database.lan/pf-budget

[post_write_hooks]
# post_write_hooks defines scripts or Python functions that are run
# on newly generated revision scripts. See the documentation for further
# detail and examples

# format using "black" - use the console_scripts runner, against the "black" entrypoint
hooks = black
black.type = console_scripts
black.entrypoint = black
# black.options = -l 79 REVISION_SCRIPT_FILENAME

# Logging configuration
[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

1
alembic/README Normal file

@@ -0,0 +1 @@
Generic single-database configuration.

88
alembic/env.py Normal file

@@ -0,0 +1,88 @@
from logging.config import fileConfig

from sqlalchemy import engine_from_config
from sqlalchemy import pool

from alembic import context

# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config

# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# add your model's MetaData object here
# for 'autogenerate' support
from pfbudget.db.model import Base

target_metadata = Base.metadata

# other values from the config, defined by the needs of env.py,
# can be acquired:
# my_important_option = config.get_main_option("my_important_option")
# ... etc.


def include_name(name, type_, parent_names):
    if type_ == "schema":
        return name in ["bank", "category", "tag", "transactions"]
    else:
        return True


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            include_name=include_name,
            include_schemas=True,
            compare_type=True,
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

24
alembic/script.py.mako Normal file

@@ -0,0 +1,24 @@
"""${message}

Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}

"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}

# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}


def upgrade() -> None:
    ${upgrades if upgrades else "pass"}


def downgrade() -> None:
    ${downgrades if downgrades else "pass"}


@@ -0,0 +1,32 @@
"""Regex rule

Revision ID: 0ce89e987770
Revises: 7adf89ec8d14
Create Date: 2022-12-10 14:00:49.418494+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "0ce89e987770"
down_revision = "7adf89ec8d14"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "categories_rules",
        sa.Column("regex", sa.String(), nullable=True),
        schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_column("categories_rules", "regex", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,40 @@
"""Split member of base transaction

Revision ID: 18572111d9ff
Revises: 28556ab17c56
Create Date: 2023-01-23 20:09:37.892997+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "18572111d9ff"
down_revision = "28556ab17c56"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "transactions",
        "split",
        existing_type=sa.BOOLEAN(),
        nullable=False,
        schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "transactions",
        "split",
        existing_type=sa.BOOLEAN(),
        nullable=True,
        schema="transactions",
    )
    # ### end Alembic commands ###


@@ -0,0 +1,88 @@
"""Selector back to transaction

Revision ID: 28556ab17c56
Revises: e455c78df789
Create Date: 2023-01-23 00:34:39.062562+00:00

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "28556ab17c56"
down_revision = "e455c78df789"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.rename_table("tags", "tagged", schema="transactions")
    op.create_table(
        "selector",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column(
            "selector",
            sa.Enum(
                "unknown",
                "nullifier",
                "vacations",
                "rules",
                "algorithm",
                "manual",
                name="selector_t",
                schema="transactions",
                inherit_schema=True,
            ),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.categorized.id"],
            name=op.f("fk_selector_id_categorized"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_selector")),
        schema="transactions",
    )
    op.drop_table("selector", schema="category")
    op.rename_table("originals", "transactions", schema="transactions")
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.rename_table("transactions", "originals", schema="transactions")
    op.create_table(
        "selector",
        sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
        sa.Column(
            "selector",
            postgresql.ENUM(
                "unknown",
                "nullifier",
                "vacations",
                "rules",
                "algorithm",
                "manual",
                name="selector_t",
                schema="category",
            ),
            autoincrement=False,
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.categorized.id"],
            name="fk_selector_id_categorized",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name="pk_selector"),
        schema="category",
    )
    op.drop_table("selector", schema="transactions")
    op.rename_table("tagged", "tags", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,109 @@
"""Add relationships

Revision ID: 287fe9e6682a
Revises: d3534f493239
Create Date: 2022-12-03 16:43:39.633382+00:00

"""
from alembic import op

# revision identifiers, used by Alembic.
revision = "287fe9e6682a"
down_revision = "d3534f493239"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        "fk_categorized_id_originals",
        "categorized",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        op.f("fk_categorized_id_originals"),
        "categorized",
        "originals",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    op.drop_constraint(
        "fk_notes_id_originals", "notes", schema="transactions", type_="foreignkey"
    )
    op.create_foreign_key(
        op.f("fk_notes_id_originals"),
        "notes",
        "originals",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    op.drop_constraint(
        "fk_tags_id_originals", "tags", schema="transactions", type_="foreignkey"
    )
    op.create_foreign_key(
        op.f("fk_tags_id_originals"),
        "tags",
        "originals",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        op.f("fk_tags_id_originals"), "tags", schema="transactions", type_="foreignkey"
    )
    op.create_foreign_key(
        "fk_tags_id_originals",
        "tags",
        "originals",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_notes_id_originals"),
        "notes",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        "fk_notes_id_originals",
        "notes",
        "originals",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_categorized_id_originals"),
        "categorized",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        "fk_categorized_id_originals",
        "categorized",
        "originals",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    # ### end Alembic commands ###


@@ -0,0 +1,49 @@
"""Available categories and rules

Revision ID: 2d0891f1be11
Revises: 287fe9e6682a
Create Date: 2022-12-04 11:15:22.758487+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "2d0891f1be11"
down_revision = "287fe9e6682a"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "categories_available",
        sa.Column("name", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_available")),
        schema="transactions",
    )
    op.create_table(
        "categories_rules",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("rule", sa.String(), nullable=False),
        sa.ForeignKeyConstraint(
            ["name"],
            ["transactions.categories_available.name"],
            name=op.f("fk_categories_rules_name_categories_available"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("name", "rule", name=op.f("pk_categories_rules")),
        schema="transactions",
    )
    op.alter_column(
        "categorized", "category", new_column_name="name", schema="transactions"
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "categorized", "name", new_column_name="category", schema="transactions"
    )
    op.drop_table("categories_rules", schema="transactions")
    op.drop_table("categories_available", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,74 @@
"""Inheritance

Revision ID: 37d80de801a7
Revises: 8cc9870b0d74
Create Date: 2023-01-10 22:41:03.540108+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "37d80de801a7"
down_revision = "8cc9870b0d74"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "originals",
        sa.Column("type", sa.String(), nullable=False),
        schema="transactions",
    )
    op.add_column(
        "originals",
        sa.Column("split", sa.Boolean(), nullable=True),
        schema="transactions",
    )
    op.add_column(
        "originals",
        sa.Column("original", sa.BigInteger(), nullable=True),
        schema="transactions",
    )
    op.alter_column(
        "originals",
        "bank",
        existing_type=sa.TEXT(),
        nullable=True,
        schema="transactions",
    )
    op.create_foreign_key(
        op.f("fk_originals_original_originals"),
        "originals",
        "originals",
        ["original"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        op.f("fk_originals_original_originals"),
        "originals",
        schema="transactions",
        type_="foreignkey",
    )
    op.alter_column(
        "originals",
        "bank",
        existing_type=sa.TEXT(),
        nullable=False,
        schema="transactions",
    )
    op.drop_column("originals", "original", schema="transactions")
    op.drop_column("originals", "split", schema="transactions")
    op.drop_column("originals", "type", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,111 @@
"""Init

Revision ID: 50ff1fbb8a00
Revises:
Create Date: 2022-12-03 11:49:30.450115+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "50ff1fbb8a00"
down_revision = None
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "banks",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("BIC", sa.String(length=8), nullable=False),
        sa.Column(
            "type",
            sa.Enum(
                "checking",
                "savings",
                "investment",
                "VISA",
                "MASTERCARD",
                name="accounttype",
                schema="transactions",
                inherit_schema=True,
            ),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("BIC", "type", name=op.f("pk_banks")),
        sa.UniqueConstraint("name", name=op.f("uq_banks_name")),
        schema="transactions",
    )
    op.create_table(
        "nordigen",
        sa.Column("name", sa.Text(), nullable=False),
        sa.Column("bank_id", sa.String(), nullable=True),
        sa.Column("requisition_id", sa.String(), nullable=True),
        sa.Column("invert", sa.Boolean(), nullable=True),
        sa.ForeignKeyConstraint(
            ["name"], ["transactions.banks.name"], name=op.f("fk_nordigen_name_banks")
        ),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_nordigen")),
        schema="transactions",
    )
    op.create_table(
        "originals",
        sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
        sa.Column("date", sa.Date(), nullable=False),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("bank", sa.Text(), nullable=False),
        sa.Column("amount", sa.Numeric(precision=16, scale=2), nullable=False),
        sa.ForeignKeyConstraint(
            ["bank"], ["transactions.banks.name"], name=op.f("fk_originals_bank_banks")
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_originals")),
        schema="transactions",
    )
    op.create_table(
        "categorized",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("category", sa.String(), nullable=False),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.originals.id"],
            name=op.f("fk_categorized_id_originals"),
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_categorized")),
        schema="transactions",
    )
    op.create_table(
        "notes",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("note", sa.String(), nullable=False),
        sa.ForeignKeyConstraint(
            ["id"], ["transactions.originals.id"], name=op.f("fk_notes_id_originals")
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_notes")),
        schema="transactions",
    )
    op.create_table(
        "tags",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("tag", sa.String(), nullable=False),
        sa.ForeignKeyConstraint(
            ["id"], ["transactions.originals.id"], name=op.f("fk_tags_id_originals")
        ),
        sa.PrimaryKeyConstraint("id", "tag", name=op.f("pk_tags")),
        schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("tags", schema="transactions")
    op.drop_table("notes", schema="transactions")
    op.drop_table("categorized", schema="transactions")
    op.drop_table("originals", schema="transactions")
    op.drop_table("nordigen", schema="transactions")
    op.drop_table("banks", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,54 @@
"""Category selector

Revision ID: 6863dda76ea2
Revises: 83f4c9837f6e
Create Date: 2022-12-08 00:56:59.032641+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "6863dda76ea2"
down_revision = "83f4c9837f6e"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "categories_selector",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column(
            "selector",
            sa.Enum(
                "unknown",
                "nullifier",
                "vacations",
                "rules",
                "algorithm",
                "manual",
                name="selector",
                schema="transactions",
                inherit_schema=True,
            ),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.categorized.id"],
            name=op.f("fk_categories_selector_id_categorized"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_categories_selector")),
        schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("categories_selector", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,152 @@
"""Rule inheritance

Revision ID: 6b293f78cc97
Revises: 37d80de801a7
Create Date: 2023-01-22 20:05:32.887092+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "6b293f78cc97"
down_revision = "37d80de801a7"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "rules",
        sa.Column("id", sa.BigInteger(), autoincrement=True, nullable=False),
        sa.Column("date", sa.Date(), nullable=True),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("regex", sa.String(), nullable=True),
        sa.Column("bank", sa.String(), nullable=True),
        sa.Column("min", sa.Numeric(precision=16, scale=2), nullable=True),
        sa.Column("max", sa.Numeric(precision=16, scale=2), nullable=True),
        sa.Column("type", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
        schema="transactions",
    )
    op.create_foreign_key(
        op.f("fk_categories_rules_id_rules"),
        "categories_rules",
        "rules",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    op.drop_column("categories_rules", "bank", schema="transactions")
    op.drop_column("categories_rules", "min", schema="transactions")
    op.drop_column("categories_rules", "date", schema="transactions")
    op.drop_column("categories_rules", "regex", schema="transactions")
    op.drop_column("categories_rules", "description", schema="transactions")
    op.drop_column("categories_rules", "max", schema="transactions")
    op.create_foreign_key(
        op.f("fk_tag_rules_id_rules"),
        "tag_rules",
        "rules",
        ["id"],
        ["id"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    op.drop_column("tag_rules", "bank", schema="transactions")
    op.drop_column("tag_rules", "min", schema="transactions")
    op.drop_column("tag_rules", "date", schema="transactions")
    op.drop_column("tag_rules", "regex", schema="transactions")
    op.drop_column("tag_rules", "description", schema="transactions")
    op.drop_column("tag_rules", "max", schema="transactions")
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "tag_rules",
        sa.Column(
            "max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
        ),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column(
            "min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
        ),
        schema="transactions",
    )
    op.add_column(
        "tag_rules",
        sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_tag_rules_id_rules"),
        "tag_rules",
        schema="transactions",
        type_="foreignkey",
    )
    op.add_column(
        "categories_rules",
        sa.Column(
            "max", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
        ),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("description", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("regex", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("date", sa.DATE(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column(
            "min", sa.NUMERIC(precision=16, scale=2), autoincrement=False, nullable=True
        ),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("bank", sa.VARCHAR(), autoincrement=False, nullable=True),
        schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_categories_rules_id_rules"),
        "categories_rules",
        schema="transactions",
        type_="foreignkey",
    )
    op.drop_table("rules", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,49 @@
"""Rule min/max

Revision ID: 753c0bfb2062
Revises: e36e6321568e
Create Date: 2022-12-18 00:24:03.861461+00:00

"""
from alembic import op

# revision identifiers, used by Alembic.
revision = "753c0bfb2062"
down_revision = "e36e6321568e"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "categories_rules",
        "min_amount",
        new_column_name="min",
        schema="transactions",
    )
    op.alter_column(
        "categories_rules",
        "max_amount",
        new_column_name="max",
        schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "categories_rules",
        "min",
        new_column_name="min_amount",
        schema="transactions",
    )
    op.alter_column(
        "categories_rules",
        "max",
        new_column_name="max_amount",
        schema="transactions",
    )
    # ### end Alembic commands ###


@@ -0,0 +1,43 @@
"""Category rule date format

Revision ID: 7adf89ec8d14
Revises: 83603bb7ef9c
Create Date: 2022-12-10 00:08:47.535765+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "7adf89ec8d14"
down_revision = "83603bb7ef9c"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "categories_rules",
        "date",
        existing_type=sa.VARCHAR(),
        type_=sa.Date(),
        existing_nullable=True,
        schema="transactions",
        postgresql_using="date::date",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "categories_rules",
        "date",
        existing_type=sa.Date(),
        type_=sa.VARCHAR(),
        existing_nullable=True,
        schema="transactions",
    )
    # ### end Alembic commands ###


@@ -0,0 +1,38 @@
"""Amount of transaction per period

Revision ID: 83603bb7ef9c
Revises: 8b5d5fbc8211
Create Date: 2022-12-09 23:12:15.644758+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "83603bb7ef9c"
down_revision = "8b5d5fbc8211"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "categories_schedules",
        sa.Column("amount", sa.Integer(), nullable=True),
        schema="transactions",
    )
    op.drop_column("categories_schedules", "recurring", schema="transactions")
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "categories_schedules",
        sa.Column("recurring", sa.BOOLEAN(), autoincrement=False, nullable=False),
        schema="transactions",
    )
    op.drop_column("categories_schedules", "amount", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,69 @@
"""Category groups and relationships

Revision ID: 83f4c9837f6e
Revises: 2d0891f1be11
Create Date: 2022-12-04 15:10:51.924875+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "83f4c9837f6e"
down_revision = "2d0891f1be11"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "categories_groups",
        sa.Column("name", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_groups")),
        schema="transactions",
    )
    op.add_column(
        "categories_available",
        sa.Column("group", sa.String(), nullable=True),
        schema="transactions",
    )
    op.create_foreign_key(
        op.f("fk_categories_available_group_categories_groups"),
        "categories_available",
        "categories_groups",
        ["group"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    op.create_foreign_key(
        op.f("fk_categorized_name_categories_available"),
        "categorized",
        "categories_available",
        ["name"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        op.f("fk_categorized_name_categories_available"),
        "categorized",
        schema="transactions",
        type_="foreignkey",
    )
    op.drop_constraint(
        op.f("fk_categories_available_group_categories_groups"),
        "categories_available",
        schema="transactions",
        type_="foreignkey",
    )
    op.drop_column("categories_available", "group", schema="transactions")
    op.drop_table("categories_groups", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,92 @@
"""Transaction based rules

Revision ID: 8b5d5fbc8211
Revises: e77395969585
Create Date: 2022-12-08 21:05:41.378466+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "8b5d5fbc8211"
down_revision = "e77395969585"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.drop_constraint(
        "pk_categories_rules", "categories_rules", schema="transactions"
    )
    op.execute(
        sa.schema.CreateSequence(
            sa.schema.Sequence("categories_rules_id_seq", schema="transactions")
        )
    )
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "categories_rules",
        sa.Column(
            "id",
            sa.BigInteger(),
            autoincrement=True,
            nullable=False,
            server_default=sa.text(
                "nextval('transactions.categories_rules_id_seq'::regclass)"
            ),
        ),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("date", sa.String(), nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("description", sa.String(), nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("bank", sa.String(), nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("min_amount", sa.Float(), nullable=True),
        schema="transactions",
    )
    op.add_column(
        "categories_rules",
        sa.Column("max_amount", sa.Float(), nullable=True),
        schema="transactions",
    )
    op.drop_column("categories_rules", "rule", schema="transactions")
    # ### end Alembic commands ###
    op.create_primary_key(
        "pk_categories_rules",
        "categories_rules",
        ["id"],
        schema="transactions",
    )


def downgrade() -> None:
    op.drop_constraint(
        "pk_categories_rules", "categories_rules", schema="transactions"
    )
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "categories_rules",
        sa.Column("rule", sa.String(), autoincrement=False, nullable=False),
        schema="transactions",
    )
    op.drop_column("categories_rules", "max_amount", schema="transactions")
    op.drop_column("categories_rules", "min_amount", schema="transactions")
    op.drop_column("categories_rules", "bank", schema="transactions")
    op.drop_column("categories_rules", "description", schema="transactions")
    op.drop_column("categories_rules", "date", schema="transactions")
    op.drop_column("categories_rules", "id", schema="transactions")
    # ### end Alembic commands ###
    op.execute(
        sa.schema.DropSequence(
            sa.schema.Sequence("categories_rules_id_seq", schema="transactions")
        )
    )
    op.create_primary_key(
        "pk_categories_rules",
        "categories_rules",
        ["name", "rule"],
        schema="transactions",
    )


@@ -0,0 +1,46 @@
"""Links

Revision ID: 8cc9870b0d74
Revises: a910e1b2214d
Create Date: 2022-12-19 22:10:25.136479+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "8cc9870b0d74"
down_revision = "a910e1b2214d"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "links",
        sa.Column("original", sa.BigInteger(), nullable=False),
        sa.Column("link", sa.BigInteger(), nullable=False),
        sa.ForeignKeyConstraint(
            ["link"],
            ["transactions.originals.id"],
            name=op.f("fk_links_link_originals"),
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["original"],
            ["transactions.originals.id"],
            name=op.f("fk_links_original_originals"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("original", "link", name=op.f("pk_links")),
        schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("links", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,68 @@
"""Tag rules

Revision ID: 9028b0f3b985
Revises: 753c0bfb2062
Create Date: 2022-12-18 22:53:13.334046+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "9028b0f3b985"
down_revision = "753c0bfb2062"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "tags_available",
        sa.Column("name", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_tags_available")),
        schema="transactions",
    )
    op.create_table(
        "tag_rules",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("tag", sa.String(), nullable=False),
        sa.Column("date", sa.Date(), nullable=True),
        sa.Column("description", sa.String(), nullable=True),
        sa.Column("regex", sa.String(), nullable=True),
        sa.Column("bank", sa.String(), nullable=True),
        sa.Column("min", sa.Numeric(precision=16, scale=2), nullable=True),
        sa.Column("max", sa.Numeric(precision=16, scale=2), nullable=True),
        sa.ForeignKeyConstraint(
            ["tag"],
            ["transactions.tags_available.name"],
            name=op.f("fk_tag_rules_tag_tags_available"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_tag_rules")),
        schema="transactions",
    )
    op.create_foreign_key(
        op.f("fk_tags_tag_tags_available"),
        "tags",
        "tags_available",
        ["tag"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        op.f("fk_tags_tag_tags_available"),
        "tags",
        schema="transactions",
        type_="foreignkey",
    )
    op.drop_table("tag_rules", schema="transactions")
    op.drop_table("tags_available", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,32 @@
"""Start/End date rule

Revision ID: 952de57a3c43
Revises: 18572111d9ff
Create Date: 2023-02-06 21:57:57.545327+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "952de57a3c43"
down_revision = "18572111d9ff"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.add_column(
        "rules", sa.Column("start", sa.Date(), nullable=True), schema="transactions"
    )
    op.alter_column(
        "rules", column_name="date", new_column_name="end", schema="transactions"
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "rules", column_name="end", new_column_name="date", schema="transactions"
    )
    op.drop_column("rules", "start", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,56 @@
"""Rule inheritance

Revision ID: a910e1b2214d
Revises: 9028b0f3b985
Create Date: 2022-12-19 20:48:04.682812+00:00

"""
from alembic import op

# revision identifiers, used by Alembic.
revision = "a910e1b2214d"
down_revision = "9028b0f3b985"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        "fk_categorized_name_categories_available",
        "categorized",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        op.f("fk_categorized_name_categories_available"),
        "categorized",
        "categories_available",
        ["name"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        op.f("fk_categorized_name_categories_available"),
        "categorized",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        "fk_categorized_name_categories_available",
        "categorized",
        "categories_available",
        ["name"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    # ### end Alembic commands ###


@@ -0,0 +1,53 @@
"""Category schedule

Revision ID: d18cbd50f7c6
Revises: 6863dda76ea2
Create Date: 2022-12-08 13:30:29.048811+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "d18cbd50f7c6"
down_revision = "6863dda76ea2"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "categories_schedules",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("recurring", sa.Boolean(), nullable=False),
        sa.Column(
            "period",
            sa.Enum(
                "daily",
                "monthly",
                "yearly",
                name="period",
                schema="transactions",
                inherit_schema=True,
            ),
            nullable=True,
        ),
        sa.Column("period_multiplier", sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(
            ["name"],
            ["transactions.categories_available.name"],
            name=op.f("fk_categories_schedules_name_categories_available"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_categories_schedules")),
        schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_table("categories_schedules", schema="transactions")
    # ### end Alembic commands ###


@@ -0,0 +1,36 @@
"""Add meal card

Revision ID: d3534f493239
Revises: 50ff1fbb8a00
Create Date: 2022-12-03 12:18:33.519666+00:00

"""
from alembic import op

# revision identifiers, used by Alembic.
revision = "d3534f493239"
down_revision = "50ff1fbb8a00"
branch_labels = None
depends_on = None


def upgrade() -> None:
    op.execute("ALTER TYPE transactions.accounttype ADD VALUE 'mealcard' BEFORE 'VISA'")


def downgrade() -> None:
    op.execute(
        """CREATE TYPE transactions.accounttype_new
        AS ENUM ('checking', 'savings', 'investment', 'VISA', 'MASTERCARD')
        """
    )
    op.execute("UPDATE transactions.banks SET type = DEFAULT WHERE type = 'mealcard'")
    op.execute(
        """ALTER TABLE transactions.banks
        ALTER COLUMN type TYPE transactions.accounttype_new
        USING type::text::transactions.accounttype_new
        """
    )
    op.execute("DROP TYPE transactions.accounttype")
    op.execute("ALTER TYPE transactions.accounttype_new RENAME TO accounttype")


@@ -0,0 +1,58 @@
"""Rules min/max money

Revision ID: e36e6321568e
Revises: 0ce89e987770
Create Date: 2022-12-10 18:55:07.149010+00:00

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = "e36e6321568e"
down_revision = "0ce89e987770"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "categories_rules",
        "min_amount",
        existing_type=sa.DOUBLE_PRECISION(precision=53),
        type_=sa.Numeric(precision=16, scale=2),
        existing_nullable=True,
        schema="transactions",
    )
    op.alter_column(
        "categories_rules",
        "max_amount",
        existing_type=sa.DOUBLE_PRECISION(precision=53),
        type_=sa.Numeric(precision=16, scale=2),
        existing_nullable=True,
        schema="transactions",
    )
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.alter_column(
        "categories_rules",
        "max_amount",
        existing_type=sa.Numeric(precision=16, scale=2),
        type_=sa.DOUBLE_PRECISION(precision=53),
        existing_nullable=True,
        schema="transactions",
    )
    op.alter_column(
        "categories_rules",
        "min_amount",
        existing_type=sa.Numeric(precision=16, scale=2),
        type_=sa.DOUBLE_PRECISION(precision=53),
        existing_nullable=True,
        schema="transactions",
    )
    # ### end Alembic commands ###


@@ -0,0 +1,452 @@
"""Divide by schemas

Revision ID: e455c78df789
Revises: 6b293f78cc97
Create Date: 2023-01-22 23:38:23.266906+00:00

"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql

# revision identifiers, used by Alembic.
revision = "e455c78df789"
down_revision = "6b293f78cc97"
branch_labels = None
depends_on = None


def upgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.create_table(
        "banks",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("BIC", sa.String(length=8), nullable=False),
        sa.Column(
            "type",
            sa.Enum(
                "checking",
                "savings",
                "investment",
                "mealcard",
                "VISA",
                "MASTERCARD",
                name="accounttype",
                schema="bank",
                inherit_schema=True,
            ),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("BIC", "type", name=op.f("pk_banks")),
        sa.UniqueConstraint("name", name=op.f("uq_banks_name")),
        schema="bank",
    )
    op.create_table(
        "groups",
        sa.Column("name", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_groups")),
        schema="category",
    )
    op.create_table(
        "available",
        sa.Column("name", sa.String(), nullable=False),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_available")),
        schema="tag",
    )
    op.create_table(
        "nordigen",
        sa.Column("name", sa.Text(), nullable=False),
        sa.Column("bank_id", sa.String(), nullable=True),
        sa.Column("requisition_id", sa.String(), nullable=True),
        sa.Column("invert", sa.Boolean(), nullable=True),
        sa.ForeignKeyConstraint(
            ["name"], ["bank.banks.name"], name=op.f("fk_nordigen_name_banks")
        ),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_nordigen")),
        schema="bank",
    )
    op.create_table(
        "available",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column("group", sa.String(), nullable=True),
        sa.ForeignKeyConstraint(
            ["group"], ["category.groups.name"], name=op.f("fk_available_group_groups")
        ),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_available")),
        schema="category",
    )
    op.create_table(
        "rules",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("tag", sa.String(), nullable=False),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.rules.id"],
            name=op.f("fk_rules_id_rules"),
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["tag"],
            ["tag.available.name"],
            name=op.f("fk_rules_tag_available"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
        schema="tag",
    )
    op.create_table(
        "rules",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column("name", sa.String(), nullable=False),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.rules.id"],
            name=op.f("fk_rules_id_rules"),
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["name"],
            ["category.available.name"],
            name=op.f("fk_rules_name_available"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_rules")),
        schema="category",
    )
    op.create_table(
        "schedules",
        sa.Column("name", sa.String(), nullable=False),
        sa.Column(
            "period",
            sa.Enum(
                "daily",
                "weekly",
                "monthly",
                "yearly",
                name="period",
                schema="category",
                inherit_schema=True,
            ),
            nullable=True,
        ),
        sa.Column("period_multiplier", sa.Integer(), nullable=True),
        sa.Column("amount", sa.Integer(), nullable=True),
        sa.ForeignKeyConstraint(
            ["name"],
            ["category.available.name"],
            name=op.f("fk_schedules_name_available"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("name", name=op.f("pk_schedules")),
        schema="category",
    )
    op.create_table(
        "selector",
        sa.Column("id", sa.BigInteger(), nullable=False),
        sa.Column(
            "selector",
            sa.Enum(
                "unknown",
                "nullifier",
                "vacations",
                "rules",
                "algorithm",
                "manual",
                name="selector_t",
                schema="category",
                inherit_schema=True,
            ),
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.categorized.id"],
            name=op.f("fk_selector_id_categorized"),
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name=op.f("pk_selector")),
        schema="category",
    )
    op.drop_constraint(
        "fk_categorized_name_categories_available",
        "categorized",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        op.f("fk_categorized_name_available"),
        "categorized",
        "available",
        ["name"],
        ["name"],
        source_schema="transactions",
        referent_schema="category",
        ondelete="CASCADE",
    )
    op.drop_constraint(
        "fk_originals_bank_banks",
        "originals",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        op.f("fk_originals_bank_banks"),
        "originals",
        "banks",
        ["bank"],
        ["name"],
        source_schema="transactions",
        referent_schema="bank",
    )
    op.drop_constraint(
        "fk_tags_tag_tags_available", "tags", schema="transactions", type_="foreignkey"
    )
    op.create_foreign_key(
        op.f("fk_tags_tag_available"),
        "tags",
        "available",
        ["tag"],
        ["name"],
        source_schema="transactions",
        referent_schema="tag",
    )
    op.drop_table("categories_schedules", schema="transactions")
    op.drop_table("categories_rules", schema="transactions")
    op.drop_table("categories_available", schema="transactions")
    op.drop_table("tag_rules", schema="transactions")
    op.drop_table("nordigen", schema="transactions")
    op.drop_table("tags_available", schema="transactions")
    op.drop_table("banks", schema="transactions")
    op.drop_table("categories_selector", schema="transactions")
    op.drop_table("categories_groups", schema="transactions")
    # ### end Alembic commands ###


def downgrade() -> None:
    # ### commands auto generated by Alembic - please adjust! ###
    op.drop_constraint(
        op.f("fk_tags_tag_available"), "tags", schema="transactions", type_="foreignkey"
    )
    op.create_foreign_key(
        "fk_tags_tag_tags_available",
        "tags",
        "tags_available",
        ["tag"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_originals_bank_banks"),
        "originals",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        "fk_originals_bank_banks",
        "originals",
        "banks",
        ["bank"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
    )
    op.drop_constraint(
        op.f("fk_categorized_name_available"),
        "categorized",
        schema="transactions",
        type_="foreignkey",
    )
    op.create_foreign_key(
        "fk_categorized_name_categories_available",
        "categorized",
        "categories_available",
        ["name"],
        ["name"],
        source_schema="transactions",
        referent_schema="transactions",
        ondelete="CASCADE",
    )
    op.create_table(
        "categories_groups",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.PrimaryKeyConstraint("name", name="pk_categories_groups"),
        schema="transactions",
        postgresql_ignore_search_path=False,
    )
    op.create_table(
        "categories_selector",
        sa.Column("id", sa.BIGINT(), autoincrement=False, nullable=False),
        sa.Column(
            "selector",
            postgresql.ENUM(
                "unknown",
                "nullifier",
                "vacations",
                "rules",
                "algorithm",
                "manual",
                name="selector",
                schema="transactions",
            ),
            autoincrement=False,
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.categorized.id"],
            name="fk_categories_selector_id_categorized",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name="pk_categories_selector"),
        schema="transactions",
    )
    op.create_table(
        "banks",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.Column("BIC", sa.VARCHAR(length=8), autoincrement=False, nullable=False),
        sa.Column(
            "type",
            postgresql.ENUM(
                "checking",
                "savings",
                "investment",
                "mealcard",
                "VISA",
                "MASTERCARD",
                name="accounttype",
                schema="transactions",
            ),
            autoincrement=False,
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("BIC", "type", name="pk_banks"),
        sa.UniqueConstraint("name", name="uq_banks_name"),
        schema="transactions",
        postgresql_ignore_search_path=False,
    )
    op.create_table(
        "tags_available",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.PrimaryKeyConstraint("name", name="pk_tags_available"),
        schema="transactions",
        postgresql_ignore_search_path=False,
    )
    op.create_table(
        "nordigen",
        sa.Column("name", sa.TEXT(), autoincrement=False, nullable=False),
        sa.Column("bank_id", sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.Column("requisition_id", sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.Column("invert", sa.BOOLEAN(), autoincrement=False, nullable=True),
        sa.ForeignKeyConstraint(
            ["name"], ["transactions.banks.name"], name="fk_nordigen_name_banks"
        ),
        sa.PrimaryKeyConstraint("name", name="pk_nordigen"),
        schema="transactions",
    )
    op.create_table(
        "tag_rules",
        sa.Column(
            "id",
            sa.BIGINT(),
            server_default=sa.text(
                "nextval('transactions.tag_rules_id_seq'::regclass)"
            ),
            autoincrement=True,
            nullable=False,
        ),
        sa.Column("tag", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.rules.id"],
            name="fk_tag_rules_id_rules",
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["tag"],
            ["transactions.tags_available.name"],
            name="fk_tag_rules_tag_tags_available",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name="pk_tag_rules"),
        schema="transactions",
    )
    op.create_table(
        "categories_rules",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.Column(
            "id",
            sa.BIGINT(),
            server_default=sa.text(
                "nextval('transactions.categories_rules_id_seq'::regclass)"
            ),
            autoincrement=True,
            nullable=False,
        ),
        sa.ForeignKeyConstraint(
            ["id"],
            ["transactions.rules.id"],
            name="fk_categories_rules_id_rules",
            ondelete="CASCADE",
        ),
        sa.ForeignKeyConstraint(
            ["name"],
            ["transactions.categories_available.name"],
            name="fk_categories_rules_name_categories_available",
            ondelete="CASCADE",
        ),
        sa.PrimaryKeyConstraint("id", name="pk_categories_rules"),
        schema="transactions",
    )
    op.create_table(
        "categories_available",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.Column("group", sa.VARCHAR(), autoincrement=False, nullable=True),
        sa.ForeignKeyConstraint(
            ["group"],
            ["transactions.categories_groups.name"],
            name="fk_categories_available_group_categories_groups",
        ),
        sa.PrimaryKeyConstraint("name", name="pk_categories_available"),
        schema="transactions",
        postgresql_ignore_search_path=False,
    )
    op.create_table(
        "categories_schedules",
        sa.Column("name", sa.VARCHAR(), autoincrement=False, nullable=False),
        sa.Column(
            "period",
            postgresql.ENUM(
                "daily",
                "weekly",
                "monthly",
                "yearly",
                name="period",
                schema="transactions",
            ),
            autoincrement=False,
            nullable=True,
        ),
        sa.Column(
            "period_multiplier", sa.INTEGER(), autoincrement=False, nullable=True
),
sa.Column("amount", sa.INTEGER(), autoincrement=False, nullable=True),
sa.ForeignKeyConstraint(
["name"],
["transactions.categories_available.name"],
name="fk_categories_schedules_name_categories_available",
ondelete="CASCADE",
),
sa.PrimaryKeyConstraint("name", name="pk_categories_schedules"),
schema="transactions",
)
op.drop_table("selector", schema="category")
op.drop_table("schedules", schema="category")
op.drop_table("rules", schema="category")
op.drop_table("rules", schema="tag")
op.drop_table("available", schema="category")
op.drop_table("nordigen", schema="bank")
op.drop_table("available", schema="tag")
op.drop_table("groups", schema="category")
op.drop_table("banks", schema="bank")
# ### end Alembic commands ###
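
Either direction of this migration can also be driven programmatically instead of through the Alembic CLI; a minimal sketch, assuming the usual alembic.ini at the repository root:

from alembic import command
from alembic.config import Config

cfg = Config("alembic.ini")  # assumption: standard Alembic project layout
command.upgrade(cfg, "head")  # runs upgrade(): tables move into the bank/category/tag schemas
# command.downgrade(cfg, "-1")  # runs downgrade(): restores the transactions-schema tables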

View File

@@ -0,0 +1,37 @@
"""Weekly period
Revision ID: e77395969585
Revises: d18cbd50f7c6
Create Date: 2022-12-08 16:35:27.506504+00:00
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "e77395969585"
down_revision = "d18cbd50f7c6"
branch_labels = None
depends_on = None
def upgrade() -> None:
op.execute("ALTER TYPE transactions.period ADD VALUE 'weekly' AFTER 'daily'")
def downgrade() -> None:
op.execute(
"""CREATE TYPE transactions.period_new
AS ENUM ('daily', 'monthly', 'yearly')
"""
)
op.execute("UPDATE transactions.categories_schedules SET period = DEFAULT WHERE period = 'weekly'")
op.execute(
"""ALTER TABLE transactions.categories_schedules
ALTER COLUMN period TYPE transactions.period_new
USING period::text::transactions.period_new
"""
)
op.execute("DROP TYPE transactions.period")
op.execute("ALTER TYPE transactions.period_new RENAME TO period")

View File

@@ -1,4 +0,0 @@
from pfbudget import run
if __name__ == "__main__":
run()

View File

@@ -1,7 +1,2 @@
__all__ = ["run", "parse_data", "categorize_data"]
__author__ = "Luís Murta" __author__ = "Luís Murta"
__version__ = "0.1" __version__ = "0.1"
from pfbudget.core.categories import categorize_data
from pfbudget.cli.runnable import run
from pfbudget.input.parsers import parse_data

View File

@@ -1,4 +1,267 @@
-from pfbudget.cli.runnable import run
from pfbudget.cli.argparser import argparser
from pfbudget.cli.interactive import Interactive
from pfbudget.common.types import Operation
from pfbudget.core.manager import Manager
import pfbudget.db.model as type
from pfbudget.utils.utils import parse_args_period

if __name__ == "__main__":
-    run()
    argparser = argparser()
args = vars(argparser.parse_args())
assert "op" in args, "No Operation selected"
op: Operation = args.pop("op")
assert "database" in args, "No database selected"
db = args.pop("database")
assert "verbose" in args, "No verbose level specified"
verbosity = args.pop("verbose")
params = []
match (op):
case Operation.ManualCategorization:
Interactive(Manager(db, verbosity)).start()
exit()
case Operation.Categorize:
keys = {"no_nulls"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["no_nulls"]]
case Operation.Parse:
keys = {"path", "bank", "creditcard"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["path"], args["bank"], args["creditcard"]]
case Operation.RequisitionId:
keys = {"name", "country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["name"][0], args["country"][0]]
case Operation.Download:
keys = {"all", "banks", "interval", "start", "end", "year", "dry_run"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
start, end = parse_args_period(args)
params = [start, end, args["dry_run"]]
if not args["all"]:
params.append(args["banks"])
else:
params.append([])
case Operation.BankAdd:
keys = {"bank", "bic", "type"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.Bank(
args["bank"][0],
args["bic"][0],
args["type"][0],
)
]
case Operation.BankMod:
keys = {"bank", "bic", "type", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["bic", "type"]
param = {"name": args["bank"][0]}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: None for k in args["remove"] if k in nargs_1}
params = [param]
case Operation.BankDel:
assert len(args["bank"]) > 0, "argparser ill defined"
params = args["bank"]
case Operation.NordigenAdd:
keys = {"bank", "bank_id", "requisition_id", "invert"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.Nordigen(
args["bank"][0],
args["bank_id"][0] if args["bank_id"] else None,
args["requisition_id"][0] if args["requisition_id"] else None,
args["invert"] if args["invert"] else None,
)
]
case Operation.NordigenMod:
keys = {"bank", "bank_id", "requisition_id", "invert", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["bank_id", "requisition_id"]
nargs_0 = ["invert"]
param = {"name": args["bank"][0]}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: v for k, v in args.items() if k in nargs_0}
param |= {k: None for k in args["remove"] if k in nargs_1}
params = [param]
case Operation.NordigenDel:
assert len(args["bank"]) > 0, "argparser ill defined"
params = args["bank"]
case Operation.NordigenCountryBanks:
keys = {"country"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["country"][0]]
case Operation.CategoryAdd:
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Category(cat, args["group"]) for cat in args["category"]]
case Operation.CategoryUpdate:
keys = {"category", "group"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Category(cat) for cat in args["category"]]
params.append(args["group"])
case Operation.CategoryRemove:
assert "category" in args, "argparser ill defined"
params = [type.Category(cat) for cat in args["category"]]
case Operation.CategorySchedule:
keys = {"category", "period", "frequency"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.CategorySchedule(
cat, args["period"][0], args["frequency"][0], None
)
for cat in args["category"]
]
case Operation.RuleAdd:
keys = {"category", "start", "end", "description", "regex", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.CategoryRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
cat,
)
for cat in args["category"]
]
case Operation.RuleRemove | Operation.TagRuleRemove:
keys = {"id"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = args["id"]
case Operation.RuleModify:
keys = {
"id",
"category",
"date",
"description",
"bank",
"min",
"max",
"remove",
}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["category", "date", "description", "regex", "bank", "min", "max"]
params = []
for id in args["id"]:
param = {"id": id}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: None for k in args["remove"] if k in nargs_1}
params.append(param)
case Operation.TagAdd:
keys = {"tag"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [type.Tag(tag) for tag in args["tag"]]
case Operation.TagRuleAdd:
keys = {"tag", "start", "end", "description", "regex", "bank", "min", "max"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [
type.TagRule(
args["start"][0] if args["start"] else None,
args["end"][0] if args["end"] else None,
args["description"][0] if args["description"] else None,
args["regex"][0] if args["regex"] else None,
args["bank"][0] if args["bank"] else None,
args["min"][0] if args["min"] else None,
args["max"][0] if args["max"] else None,
tag,
)
for tag in args["tag"]
]
case Operation.TagRuleModify:
keys = {"id", "tag", "date", "description", "bank", "min", "max", "remove"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
nargs_1 = ["tag", "date", "description", "regex", "bank", "min", "max"]
params = []
for id in args["id"]:
param = {"id": id}
param |= {k: v[0] for k, v in args.items() if k in nargs_1 and args[k]}
param |= {k: None for k in args["remove"] if k in nargs_1}
params.append(param)
case Operation.GroupAdd:
assert "group" in args, "argparser ill defined"
params = [type.CategoryGroup(group) for group in args["group"]]
case Operation.GroupRemove:
assert "group" in args, "argparser ill defined"
params = [type.CategoryGroup(group) for group in args["group"]]
case Operation.Forge | Operation.Dismantle:
keys = {"original", "links"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["original"][0], args["links"]]
case (
Operation.Export
| Operation.Import
| Operation.ExportBanks
| Operation.ImportBanks
| Operation.ExportCategoryRules
| Operation.ImportCategoryRules
| Operation.ExportTagRules
| Operation.ImportTagRules
| Operation.ExportCategories
| Operation.ImportCategories
| Operation.ExportCategoryGroups
| Operation.ImportCategoryGroups
):
keys = {"file", "format"}
assert args.keys() >= keys, f"missing {args.keys() - keys}"
params = [args["file"][0], args["format"][0]]
Manager(db, verbosity).action(op, params)
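
To trace one command through the dispatch above, here is a minimal sketch of a parse; the category name and regex are made-up example values, and it assumes the interpreter runs from the repository root so argparser() can read pfbudget/__init__.py for the version:

from pfbudget.cli.argparser import argparser
from pfbudget.common.types import Operation

args = vars(argparser().parse_args(
    ["category", "rule", "add", "groceries", "--regex", "LIDL|ALDI"]
))
assert args.pop("op") == Operation.RuleAdd
# the remaining keys (database, verbose, category, regex, ...) feed Manager.action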

pfbudget/cli/argparser.py Normal file (405 lines)
View File

@@ -0,0 +1,405 @@
import argparse
import datetime as dt
import decimal
from dotenv import load_dotenv
import os
import re
from pfbudget.common.types import Operation
from pfbudget.db.model import AccountType, Period
from pfbudget.db.sqlite import DatabaseClient
import pfbudget.reporting.graph
import pfbudget.reporting.report
import pfbudget.utils.utils
load_dotenv()
DEFAULT_DB = os.environ.get("DEFAULT_DB")
def argparser() -> argparse.ArgumentParser:
universal = argparse.ArgumentParser(add_help=False)
universal.add_argument(
"-db",
"--database",
nargs="?",
help="select current database",
default=DEFAULT_DB,
)
universal.add_argument("-v", "--verbose", action="count", default=0)
period = argparse.ArgumentParser(add_help=False)
period_group = period.add_mutually_exclusive_group()
period_group.add_argument(
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
)
period_group.add_argument("--start", type=str, nargs=1, help="graph start date")
period_group.add_argument("--end", type=str, nargs=1, help="graph end date")
period_group.add_argument("--year", type=str, nargs=1, help="graph year")
parser = argparse.ArgumentParser(
description="does cool finance stuff",
parents=[universal],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
if version := re.search(
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]', open("pfbudget/__init__.py").read()
):
parser.add_argument(
"--version",
action="version",
version=version.group(1),
)
subparsers = parser.add_subparsers(required=True)
# TODO Init
# init = subparsers.add_parser("init")
# init.set_defaults(op=Operation.Init)
# Exports transactions to .csv file
export = subparsers.add_parser("export")
export.set_defaults(op=Operation.Export)
file_options(export)
pimport = subparsers.add_parser("import")
pimport.set_defaults(op=Operation.Import)
pimport.add_argument("file", nargs=1, type=str)
# Parse from .csv
parse = subparsers.add_parser("parse")
parse.set_defaults(op=Operation.Parse)
parse.add_argument("path", nargs="+", type=str)
parse.add_argument("--bank", nargs=1, type=str)
parse.add_argument("--creditcard", nargs=1, type=str)
# Automatic/manual categorization
categorize = subparsers.add_parser("categorize").add_subparsers(required=True)
auto = categorize.add_parser("auto")
auto.set_defaults(op=Operation.Categorize)
auto.add_argument("--no-nulls", action="store_false")
categorize.add_parser("manual").set_defaults(op=Operation.ManualCategorization)
"""
Graph
"""
p_graph = subparsers.add_parser(
"graph",
description="Graph of the transactions",
parents=[universal, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_graph.add_argument(
"option",
type=str,
choices=["monthly", "discrete", "networth"],
nargs="?",
default="monthly",
help="graph option help",
)
p_graph.add_argument("--save", action="store_true")
p_graph.set_defaults(func=graph)
"""
Report
"""
p_report = subparsers.add_parser(
"report",
description="Prints report of transaction groups",
parents=[universal, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_report.add_argument(
"option",
type=str,
choices=["net", "detailed"],
nargs="?",
default="net",
help="report option help",
)
p_report.set_defaults(func=report)
# Banks
bank(subparsers.add_parser("bank"))
# Nordigen access token
subparsers.add_parser("token").set_defaults(op=Operation.Token)
# Nordigen requisition id
requisition = subparsers.add_parser("eua")
requisition.set_defaults(op=Operation.RequisitionId)
requisition.add_argument("id", nargs=1, type=str)
requisition.add_argument("country", nargs=1, type=str)
# Download through the Nordigen API
download = subparsers.add_parser("download", parents=[period])
download.set_defaults(op=Operation.Download)
download_banks = download.add_mutually_exclusive_group()
download_banks.add_argument("--all", action="store_true")
download_banks.add_argument("--banks", nargs="+", type=str)
download.add_argument("--dry-run", action="store_true")
# List available banks in country C
banks = subparsers.add_parser("banks")
banks.set_defaults(op=Operation.NordigenCountryBanks)
banks.add_argument("country", nargs=1, type=str)
# Categories
category(subparsers.add_parser("category"))
# Tag
tags(subparsers.add_parser("tag"))
# Link
link(subparsers.add_parser("link"))
return parser
def graph(args):
"""Plots the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.utils.parse_args_period(args)
if args.option == "monthly":
pfbudget.reporting.graph.monthly(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "discrete":
pfbudget.reporting.graph.discrete(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "networth":
pfbudget.reporting.graph.networth(
DatabaseClient(args.database), vars(args), start, end
)
def report(args):
"""Prints a detailed report of the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.utils.parse_args_period(args)
if args.option == "net":
pfbudget.reporting.report.net(DatabaseClient(args.database), start, end)
elif args.option == "detailed":
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
def bank(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.BankAdd)
add.add_argument("bank", nargs=1, type=str)
add.add_argument("bic", nargs=1, type=str)
add.add_argument("type", nargs=1, type=str, choices=[e.name for e in AccountType])
rem = commands.add_parser("del")
rem.set_defaults(op=Operation.BankDel)
rem.add_argument("bank", nargs="+", type=str)
mod = commands.add_parser("mod")
mod.set_defaults(op=Operation.BankMod)
mod.add_argument("bank", nargs=1, type=str)
mod.add_argument("--bic", nargs=1, type=str)
mod.add_argument("--type", nargs=1, type=str, choices=[e.name for e in AccountType])
mod.add_argument("--remove", nargs="*", default=[], type=str)
nordigen(commands.add_parser("nordigen"))
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportBanks)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportBanks)
file_options(pimport)
def nordigen(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.NordigenAdd)
add.add_argument("bank", nargs=1, type=str)
add.add_argument("--bank_id", nargs=1, type=str)
add.add_argument("--requisition_id", nargs=1, type=str)
add.add_argument("--invert", action="store_true")
rem = commands.add_parser("del")
rem.set_defaults(op=Operation.NordigenDel)
rem.add_argument("bank", nargs="+", type=str)
mod = commands.add_parser("mod")
mod.set_defaults(op=Operation.NordigenMod)
mod.add_argument("bank", nargs=1, type=str)
mod.add_argument("--bank_id", nargs=1, type=str)
mod.add_argument("--requisition_id", nargs=1, type=str)
mod.add_argument("--invert", action="store_true")
mod.add_argument("--remove", nargs="*", default=[], type=str)
def category(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.CategoryAdd)
add.add_argument("category", nargs="+", type=str)
add.add_argument("--group", nargs="?", type=str)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.CategoryRemove)
remove.add_argument("category", nargs="+", type=str)
update = commands.add_parser("update")
update.set_defaults(op=Operation.CategoryUpdate)
update.add_argument("category", nargs="+", type=str)
update.add_argument("--group", nargs="?", type=str)
schedule = commands.add_parser("schedule")
schedule.set_defaults(op=Operation.CategorySchedule)
schedule.add_argument("category", nargs="+", type=str)
schedule.add_argument("period", nargs=1, choices=[e.value for e in Period])
schedule.add_argument("--frequency", nargs=1, default=[1], type=int)
rule = commands.add_parser("rule")
category_rule(rule)
group = commands.add_parser("group")
category_group(group)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategories)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategories)
file_options(pimport)
def category_group(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.GroupAdd)
add.add_argument("group", nargs="+", type=str)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.GroupRemove)
remove.add_argument("group", nargs="+", type=str)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryGroups)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryGroups)
file_options(pimport)
def category_rule(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.RuleAdd)
add.add_argument("category", nargs="+", type=str)
rules(add)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.RuleRemove)
remove.add_argument("id", nargs="+", type=int)
modify = commands.add_parser("modify")
modify.set_defaults(op=Operation.RuleModify)
modify.add_argument("id", nargs="+", type=int)
modify.add_argument("--category", nargs=1, type=str)
rules(modify)
modify.add_argument("--remove", nargs="*", default=[], type=str)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportCategoryRules)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportCategoryRules)
file_options(pimport)
def tags(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.TagAdd)
add.add_argument("tag", nargs="+", type=str)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.TagRemove)
remove.add_argument("tag", nargs="+", type=str)
rule = commands.add_parser("rule")
tag_rule(rule)
def tag_rule(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
add = commands.add_parser("add")
add.set_defaults(op=Operation.TagRuleAdd)
add.add_argument("tag", nargs="+", type=str)
rules(add)
remove = commands.add_parser("remove")
remove.set_defaults(op=Operation.TagRuleRemove)
remove.add_argument("id", nargs="+", type=int)
modify = commands.add_parser("modify")
modify.set_defaults(op=Operation.TagRuleModify)
modify.add_argument("id", nargs="+", type=int)
modify.add_argument("--tag", nargs=1, type=str)
rules(modify)
export = commands.add_parser("export")
export.set_defaults(op=Operation.ExportTagRules)
file_options(export)
pimport = commands.add_parser("import")
pimport.set_defaults(op=Operation.ImportTagRules)
file_options(pimport)
def rules(parser: argparse.ArgumentParser):
parser.add_argument("--start", nargs=1, type=dt.date.fromisoformat)
parser.add_argument("--end", nargs=1, type=dt.date.fromisoformat)
parser.add_argument("--description", nargs=1, type=str)
parser.add_argument("--regex", nargs=1, type=str)
parser.add_argument("--bank", nargs=1, type=str)
parser.add_argument("--min", nargs=1, type=decimal.Decimal)
parser.add_argument("--max", nargs=1, type=decimal.Decimal)
def link(parser: argparse.ArgumentParser):
commands = parser.add_subparsers(required=True)
forge = commands.add_parser("forge")
forge.set_defaults(op=Operation.Forge)
forge.add_argument("original", nargs=1, type=int)
forge.add_argument("links", nargs="+", type=int)
dismantle = commands.add_parser("dismantle")
dismantle.set_defaults(op=Operation.Dismantle)
dismantle.add_argument("original", nargs=1, type=int)
dismantle.add_argument("links", nargs="+", type=int)
def file_options(parser: argparse.ArgumentParser):
parser.add_argument("file", nargs=1, type=str)
parser.add_argument("format", nargs=1, default="pickle")

pfbudget/cli/interactive.py Normal file (120 lines)
View File

@@ -0,0 +1,120 @@
import decimal
from ..core.manager import Manager
from ..db.model import (
Category,
CategorySelector,
Note,
Selector_T,
SplitTransaction,
Tag,
Transaction,
TransactionCategory,
TransactionTag,
)
class Interactive:
help = "category(:tag)/split/note:/skip/quit"
selector = Selector_T.manual
def __init__(self, manager: Manager) -> None:
self.manager = manager
with self.manager.db.session() as session:
self.categories = session.get(Category)
self.tags = session.get(Tag)
session.expunge_all()
def intro(self) -> None:
print(
f"Welcome! Available categories are {[c.name for c in self.categories]} and"
f" currently existing tags are {[t.name for t in self.tags]}"
)
def start(self) -> None:
self.intro()
with self.manager.db.session() as session:
uncategorized = session.uncategorized()
n = len(uncategorized)
print(f"{n} left to categorize")
i = 0
new = []
next = uncategorized[i]
print(next)
while (command := input("$ ")) != "quit":
match command:
case "help":
print(self.help)
case "skip":
i += 1
case "quit":
break
case "split":
new = self.split(next)
session.add(new)
case other:
if not other:
print(self.help)
continue
if other.startswith("note:"):
                            # TODO adding notes to a split transaction won't allow categorization
next.note = Note(other[len("note:") :].strip())
else:
ct = other.split(":")
if (category := ct[0]) not in [
c.name for c in self.categories
]:
print(self.help, self.categories)
tags = []
if len(ct) > 1:
tags = ct[1:]
next.category = TransactionCategory(
category, CategorySelector(self.selector)
)
for tag in tags:
if tag not in [t.name for t in self.tags]:
session.add([Tag(tag)])
self.tags = session.get(Tag)
next.tags.add(TransactionTag(tag))
i += 1
session.commit()
next = uncategorized[i] if len(new) == 0 else new.pop()
print(next)
def split(self, original: Transaction) -> list[SplitTransaction]:
total = original.amount
new = []
done = False
while not done:
if abs(sum(t.amount for t in new)) > abs(total):
print("Overflow, try again")
new.clear()
continue
if sum(t.amount for t in new) == total:
done = True
break
amount = decimal.Decimal(input("amount: "))
new.append(
SplitTransaction(
original.date, original.description, amount, original.id
)
)
return new
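
A minimal sketch of wiring the session up, assuming a reachable database URL (made up here) and an existing "eating out" category; the prompt accepts a category, optional :tags, note:, split, skip or quit:

from pfbudget.cli.interactive import Interactive
from pfbudget.core.manager import Manager

Interactive(Manager("postgresql://localhost/pfbudget")).start()
# $ eating out:work    -> assigns the category and attaches the "work" tag
# $ split              -> prompts for amounts until they sum to the original
# $ quit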

View File

@@ -1,321 +0,0 @@
from pathlib import Path
import argparse
import re
from pfbudget.core.categories import categorize_data
from pfbudget.core.manager import Manager
from pfbudget.input.json import JsonParser
from pfbudget.input.nordigen import NordigenInput
from pfbudget.db.client import DatabaseClient
import pfbudget.reporting.graph
import pfbudget.reporting.report
import pfbudget.utils
DEFAULT_DB = "data.db"
class PfBudgetInitialized(Exception):
pass
class PfBudgetNotInitialized(Exception):
pass
class DataFileMissing(Exception):
pass
def argparser(manager: Manager) -> argparse.ArgumentParser:
help = argparse.ArgumentParser(add_help=False)
help.add_argument(
"-db",
"--database",
nargs="?",
help="select current database",
default=DEFAULT_DB,
)
help.add_argument(
"-q", "--quiet", action="store_true", help="reduces the amount of verbose"
)
period = argparse.ArgumentParser(add_help=False).add_mutually_exclusive_group()
period.add_argument(
"--interval", type=str, nargs=2, help="graph interval", metavar=("START", "END")
)
period.add_argument("--start", type=str, nargs=1, help="graph start date")
period.add_argument("--end", type=str, nargs=1, help="graph end date")
period.add_argument("--year", type=str, nargs=1, help="graph year")
parser = argparse.ArgumentParser(
description="does cool finance stuff",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
parser.add_argument(
"--version",
action="version",
version=re.search(
r'__version__\s*=\s*[\'"]([^\'"]*)[\'"]',
open("pfbudget/__init__.py").read(),
).group(1),
)
subparsers = parser.add_subparsers(dest="command", required=True)
"""
Init
"""
p_init = subparsers.add_parser(
"init",
description="Initializes the SQLite3 database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_init.set_defaults(func=lambda args: manager.init())
"""
Exporting
"""
p_export = subparsers.add_parser(
"export",
description="Exports the selected database to a .csv file",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_export.set_defaults(func=lambda args: DatabaseClient(args.database).export())
"""
Parsing
"""
p_parse = subparsers.add_parser(
"parse",
description="Parses and adds the requested transactions into the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_parse.add_argument("path", nargs="+", type=str)
p_parse.add_argument("--bank", nargs=1, type=str)
p_parse.add_argument("--creditcard", nargs=1, type=str)
p_parse.add_argument("--category", nargs=1, type=int)
p_parse.set_defaults(func=lambda args: parse(manager, args))
"""
Categorizing
"""
p_categorize = subparsers.add_parser(
"categorize",
description="Categorizes the transactions in the selected database",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_categorize.set_defaults(
func=lambda args: categorize_data(DatabaseClient(args.database))
)
"""
Graph
"""
p_graph = subparsers.add_parser(
"graph",
description="Graph of the transactions",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_graph.add_argument(
"option",
type=str,
choices=["monthly", "discrete", "networth"],
nargs="?",
default="monthly",
help="graph option help",
)
p_graph.add_argument("--save", action="store_true")
p_graph.set_defaults(func=graph)
"""
Report
"""
p_report = subparsers.add_parser(
"report",
description="Prints report of transaction groups",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_report.add_argument(
"option",
type=str,
choices=["net", "detailed"],
nargs="?",
default="net",
help="report option help",
)
p_report.set_defaults(func=report)
"""
Register bank
"""
p_register = subparsers.add_parser(
"register",
description="Register a bank",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
p_register.add_argument(
"--requisition", type=str, nargs=1, help="requisition option help"
)
p_register.add_argument("--invert", action="store_true")
p_register.set_defaults(func=lambda args: manager.register(vars(args)))
"""
Unregister bank
"""
p_register = subparsers.add_parser(
"unregister",
description="Unregister a bank",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_register.add_argument("bank", type=str, nargs=1, help="bank option help")
p_register.set_defaults(func=lambda args: manager.unregister(vars(args)))
"""
Nordigen API
"""
p_nordigen_access = subparsers.add_parser(
"token",
description="Get new access token",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_access.set_defaults(func=lambda args: NordigenInput(manager).token())
"""
(Re)new bank requisition ID
"""
p_nordigen_access = subparsers.add_parser(
"renew",
description="(Re)new the Bank requisition ID",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_access.add_argument("name", nargs=1, type=str)
p_nordigen_access.add_argument("country", nargs=1, type=str)
p_nordigen_access.set_defaults(
func=lambda args: NordigenInput(manager).requisition(
args.name[0], args.country[0]
)
)
"""
Downloading through Nordigen API
"""
p_nordigen_download = subparsers.add_parser(
"download",
description="Downloads transactions using Nordigen API",
parents=[help, period],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_download.add_argument("--id", nargs="+", type=str)
p_nordigen_download.add_argument("--name", nargs="+", type=str)
p_nordigen_download.add_argument("--all", action="store_true")
p_nordigen_download.set_defaults(func=lambda args: download(manager, args))
"""
List available banks on Nordigen API
"""
p_nordigen_list = subparsers.add_parser(
"list",
description="Lists banks in {country}",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_list.add_argument("country", nargs=1, type=str)
p_nordigen_list.set_defaults(func=lambda args: nordigen_banks(manager, args))
"""
Nordigen JSONs
"""
p_nordigen_json = subparsers.add_parser(
"json",
description="",
parents=[help],
formatter_class=argparse.ArgumentDefaultsHelpFormatter,
)
p_nordigen_json.add_argument("json", nargs=1, type=str)
p_nordigen_json.add_argument("bank", nargs=1, type=str)
p_nordigen_json.add_argument("--invert", action=argparse.BooleanOptionalAction)
p_nordigen_json.set_defaults(
func=lambda args: manager.parser(JsonParser(vars(args)))
)
return parser
def parse(manager: Manager, args):
"""Parses the contents of the path in args to the selected database.
Args:
args (dict): argparse variables
"""
for path in args.path:
if (dir := Path(path)).is_dir():
for file in dir.iterdir():
manager.parse(file, vars(args))
elif Path(path).is_file():
manager.parse(path, vars(args))
else:
raise FileNotFoundError
def graph(args):
"""Plots the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "monthly":
pfbudget.reporting.graph.monthly(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "discrete":
pfbudget.reporting.graph.discrete(
DatabaseClient(args.database), vars(args), start, end
)
elif args.option == "networth":
pfbudget.reporting.graph.networth(
DatabaseClient(args.database), vars(args), start, end
)
def report(args):
"""Prints a detailed report of the transactions over a period of time.
Args:
args (dict): argparse variables
"""
start, end = pfbudget.utils.parse_args_period(args)
if args.option == "net":
pfbudget.reporting.report.net(DatabaseClient(args.database), start, end)
elif args.option == "detailed":
pfbudget.reporting.report.detailed(DatabaseClient(args.database), start, end)
def nordigen_banks(manager: Manager, args):
input = NordigenInput(manager)
input.list(vars(args)["country"][0])
def download(manager: Manager, args):
start, end = pfbudget.utils.parse_args_period(args)
manager.parser(NordigenInput(manager, vars(args), start, end))
def run():
manager = Manager(DEFAULT_DB)
args = argparser(manager).parse_args()
args.func(args)

View File

@@ -4,6 +4,53 @@ from decimal import Decimal, InvalidOperation
from enum import Enum, auto
class Operation(Enum):
Init = auto()
Transactions = auto()
Parse = auto()
Download = auto()
Categorize = auto()
ManualCategorization = auto()
Token = auto()
RequisitionId = auto()
CategoryAdd = auto()
CategoryUpdate = auto()
CategoryRemove = auto()
CategorySchedule = auto()
RuleAdd = auto()
RuleRemove = auto()
RuleModify = auto()
GroupAdd = auto()
GroupRemove = auto()
TagAdd = auto()
TagRemove = auto()
TagRuleAdd = auto()
TagRuleRemove = auto()
TagRuleModify = auto()
Forge = auto()
Dismantle = auto()
Split = auto()
BankAdd = auto()
BankMod = auto()
BankDel = auto()
NordigenAdd = auto()
NordigenMod = auto()
NordigenDel = auto()
NordigenCountryBanks = auto()
Export = auto()
Import = auto()
ExportBanks = auto()
ImportBanks = auto()
ExportCategoryRules = auto()
ImportCategoryRules = auto()
ExportTagRules = auto()
ImportTagRules = auto()
ExportCategories = auto()
ImportCategories = auto()
ExportCategoryGroups = auto()
ImportCategoryGroups = auto()
class TransactionError(Exception):
    pass

View File

@@ -9,7 +9,7 @@ import yaml
if TYPE_CHECKING:
    from pfbudget.common.types import Transaction
-    from pfbudget.db.client import DatabaseClient
    from pfbudget.db.sqlite import DatabaseClient

Options = namedtuple(

View File

@@ -0,0 +1,154 @@
from codetiming import Timer
from datetime import timedelta
from typing import Sequence
import pfbudget.db.model as t
class Categorizer:
options = {}
def __init__(self):
self.options["null_days"] = 3
def rules(
self,
transactions: Sequence[t.BankTransaction],
categories: Sequence[t.Category],
tags: Sequence[t.Tag],
nullify: bool = True
):
"""Overarching categorization tool
Receives a list of transactions (by ref) and updates their category according
to the rules defined for each category
Args:
transactions (Sequence[BankTransaction]): uncategorized transactions
categories (Sequence[Category]): available categories
tags (Sequence[Tag]): currently available tags
"""
if nullify:
try:
null = next(cat for cat in categories if cat.name == "null")
print("Nullifying")
self._nullify(transactions, null)
except StopIteration:
print("Null category not defined")
categories = [cat for cat in categories if cat.name != "null"]
self._rule_based_categories(transactions, categories)
self._rule_based_tags(transactions, tags)
@Timer(name="nullify")
def _nullify(self, transactions: Sequence[t.BankTransaction], null: t.Category):
count = 0
matching = []
for transaction in transactions:
for cancel in (
cancel
for cancel in transactions
if (
transaction.date - timedelta(days=self.options["null_days"])
<= cancel.date
<= transaction.date + timedelta(days=self.options["null_days"])
and cancel != transaction
and cancel.bank != transaction.bank
and cancel.amount == -transaction.amount
and transaction not in matching
and cancel not in matching
and all(r.matches(transaction) for r in null.rules)
and all(r.matches(cancel) for r in null.rules)
)
):
transaction.category = t.TransactionCategory(
name="null",
selector=t.CategorySelector(t.Selector_T.nullifier),
)
cancel.category = t.TransactionCategory(
name="null",
selector=t.CategorySelector(t.Selector_T.nullifier),
)
matching.extend([transaction, cancel])
count += 2
break
print(f"Nullified {count} of {len(transactions)} transactions")
@Timer(name="categoryrules")
def _rule_based_categories(
self,
transactions: Sequence[t.BankTransaction],
categories: Sequence[t.Category],
):
print(f"Categorizing {len(transactions)} transactions")
d = {}
for category in [c for c in categories if c.rules]:
for rule in category.rules:
# for transaction in [t for t in transactions if not t.category]:
for transaction in [
t
for t in transactions
if not t.category or t.category.name != "null"
]:
if not rule.matches(transaction):
continue
# passed all conditions, assign category
if transaction.category:
if transaction.category.name == category.name:
continue
if (
input(
f"Overwrite {transaction} with {category.name}? (y/n)"
)
== "y"
):
transaction.category.name = category.name
transaction.category.selector.selector = t.Selector_T.rules
else:
transaction.category = t.TransactionCategory(
category.name, t.CategorySelector(t.Selector_T.rules)
)
if rule in d:
d[rule] += 1
else:
d[rule] = 1
for k, v in d.items():
print(f"{v}: {k}")
@Timer(name="tagrules")
def _rule_based_tags(
self, transactions: Sequence[t.BankTransaction], tags: Sequence[t.Tag]
):
print(f"Tagging {len(transactions)} transactions")
d = {}
for tag in [t for t in tags if len(t.rules) > 0]:
for rule in tag.rules:
# for transaction in [t for t in transactions if not t.category]:
for transaction in [
t
for t in transactions
if tag.name not in [tag.tag for tag in t.tags]
]:
if not rule.matches(transaction):
continue
if not transaction.tags:
transaction.tags = {t.TransactionTag(tag.name)}
else:
transaction.tags.add(t.TransactionTag(tag.name))
if rule in d:
d[rule] += 1
else:
d[rule] = 1
for k, v in d.items():
print(f"{v}: {k}")

View File

@@ -1,47 +1,412 @@
-from pfbudget.input.input import Input
import csv
from pathlib import Path
import pickle
import webbrowser
from pfbudget.common.types import Operation
from pfbudget.core.categorizer import Categorizer
from pfbudget.db.client import DbClient
from pfbudget.db.model import (
    Bank,
    BankTransaction,
    Category,
    CategoryGroup,
    CategoryRule,
    CategorySchedule,
    CategorySelector,
    Link,
    MoneyTransaction,
    Nordigen,
    Rule,
    Selector_T,
    SplitTransaction,
    Tag,
    TagRule,
    Transaction,
    TransactionCategory,
)
from pfbudget.input.nordigen import NordigenInput
from pfbudget.input.parsers import parse_data
-from pfbudget.common.types import Bank, Banks, Transaction, Transactions
-from pfbudget.db.client import DatabaseClient
-from pfbudget.utils import convert
class Manager:
-    def __init__(self, db: str):
-        self.__db = db
    def __init__(self, db: str, verbosity: int = 0):
        self._db = db
        self._verbosity = verbosity
-    def init(self):
-        client = DatabaseClient(self.__db)
-        client.init()
-    def register(self, args: dict):
-        bank = Bank(args["bank"][0], "", args["requisition"][0], args["invert"])
-        client = DatabaseClient(self.__db)
-        client.register_bank(convert(bank))
-    def unregister(self, args: dict):
-        client = DatabaseClient(self.__db)
-        client.unregister_bank(args["bank"][0])
-    def parser(self, parser: Input):
-        transactions = parser.parse()
-        self.add_transactions(transactions)
-    def parse(self, filename: str, args: dict):
-        transactions = parse_data(filename, args)
-        self.add_transactions(transactions)
-    def transactions() -> list[Transaction]:
-        pass
-    def add_transactions(self, transactions: Transactions):
-        client = DatabaseClient(self.__db)
-        client.insert_transactions([convert(t) for t in transactions])
-    def get_bank_by(self, key: str, value: str) -> Bank:
-        client = DatabaseClient(self.__db)
-        bank = client.get_bank(key, value)
-        return convert(bank)
-    def get_banks(self) -> Banks:
-        client = DatabaseClient(self.__db)
-        return [convert(bank) for bank in client.get_banks()]
    def action(self, op: Operation, params=None):
        if self._verbosity > 0:
            print(f"op={op}, params={params}")
        if params is None:
            params = []
        match (op):
            case Operation.Init:
                pass
            case Operation.Transactions:
                with self.db.session() as session:
                    transactions = session.get(Transaction)
                    ret = [t.format for t in transactions]
                    return ret
            case Operation.Parse:
                # Adapter for the parse_data method. Can be refactored.
                args = {"bank": params[1], "creditcard": params[2], "category": None}
                transactions = []
                for path in [Path(p) for p in params[0]]:
                    if path.is_dir():
                        for file in path.iterdir():
                            transactions.extend(self.parse(file, args))
                    elif path.is_file():
                        transactions.extend(self.parse(path, args))
                    else:
                        raise FileNotFoundError(path)
                if (
                    len(transactions) > 0
                    and input(f"{transactions[:5]}\nCommit? (y/n)") == "y"
):
with self.db.session() as session:
session.add(sorted(transactions))
case Operation.Download:
client = NordigenInput()
with self.db.session() as session:
if len(params[3]) == 0:
client.banks = session.get(Bank, Bank.nordigen)
else:
client.banks = session.get(Bank, Bank.name, params[3])
session.expunge_all()
client.start = params[0]
client.end = params[1]
transactions = client.parse()
# dry-run
if not params[2]:
with self.db.session() as session:
session.add(sorted(transactions))
else:
print(transactions)
case Operation.Categorize:
with self.db.session() as session:
uncategorized = session.get(
BankTransaction, ~BankTransaction.category.has()
)
categories = session.get(Category)
tags = session.get(Tag)
Categorizer().rules(uncategorized, categories, tags, params[0])
case Operation.BankMod:
with self.db.session() as session:
session.update(Bank, params)
case Operation.NordigenMod:
with self.db.session() as session:
session.update(Nordigen, params)
case Operation.BankDel:
with self.db.session() as session:
session.remove_by_name(Bank, params)
case Operation.NordigenDel:
with self.db.session() as session:
session.remove_by_name(Nordigen, params)
case Operation.Token:
NordigenInput().token()
case Operation.RequisitionId:
link, _ = NordigenInput().requisition(params[0], params[1])
print(f"Opening {link} to request access to {params[0]}")
webbrowser.open(link)
case Operation.NordigenCountryBanks:
banks = NordigenInput().country_banks(params[0])
print(banks)
case (
Operation.BankAdd
| Operation.CategoryAdd
| Operation.NordigenAdd
| Operation.RuleAdd
| Operation.TagAdd
| Operation.TagRuleAdd
):
with self.db.session() as session:
session.add(params)
case Operation.CategoryUpdate:
with self.db.session() as session:
session.updategroup(*params)
case Operation.CategoryRemove:
with self.db.session() as session:
session.remove_by_name(Category, params)
case Operation.CategorySchedule:
with self.db.session() as session:
session.updateschedules(params)
case Operation.RuleRemove:
assert all(isinstance(param, int) for param in params)
with self.db.session() as session:
session.remove_by_id(CategoryRule, params)
case Operation.TagRemove:
with self.db.session() as session:
session.remove_by_name(Tag, params)
case Operation.TagRuleRemove:
assert all(isinstance(param, int) for param in params)
with self.db.session() as session:
session.remove_by_id(TagRule, params)
case Operation.RuleModify | Operation.TagRuleModify:
assert all(isinstance(param, dict) for param in params)
with self.db.session() as session:
session.update(Rule, params)
case Operation.GroupAdd:
with self.db.session() as session:
session.add(params)
case Operation.GroupRemove:
assert all(isinstance(param, CategoryGroup) for param in params)
with self.db.session() as session:
session.remove_by_name(CategoryGroup, params)
case Operation.Forge:
if not (
isinstance(params[0], int)
and all(isinstance(p, int) for p in params[1])
):
raise TypeError("f{params} are not transaction ids")
with self.db.session() as session:
original = session.get(Transaction, Transaction.id, params[0])[0]
links = session.get(Transaction, Transaction.id, params[1])
if not original.category:
original.category = self.askcategory(original)
for link in links:
if (
not link.category
or link.category.name != original.category.name
):
print(
f"{link} category will change to"
f" {original.category.name}"
)
link.category = original.category
tobelinked = [Link(original.id, link.id) for link in links]
session.add(tobelinked)
case Operation.Dismantle:
assert all(isinstance(param, Link) for param in params)
with self.db.session() as session:
original = params[0].original
links = [link.link for link in params]
session.remove_links(original, links)
case Operation.Split:
                if len(params) < 1 or not all(
                    isinstance(p, Transaction) for p in params
                ):
raise TypeError(f"{params} are not transactions")
# t -> t1, t2, t3; t.value == Σti.value
original: Transaction = params[0]
if not original.amount == sum(t.amount for t in params[1:]):
raise ValueError(
f"{original.amount}€ != {sum(v for v, _ in params[1:])}"
)
with self.db.session() as session:
originals = session.get(Transaction, Transaction.id, [original.id])
                    assert len(originals) == 1, f">1 transactions matched {original.id}!"
originals[0].split = True
transactions = []
for t in params[1:]:
if originals[0].date != t.date:
t.date = originals[0].date
print(
f"{t.date} is different from original date"
f" {originals[0].date}, using original"
)
splitted = SplitTransaction(
t.date, t.description, t.amount, originals[0].id
)
splitted.category = t.category
transactions.append(splitted)
session.add(transactions)
case Operation.Export:
with self.db.session() as session:
self.dump(params[0], params[1], sorted(session.get(Transaction)))
case Operation.Import:
transactions = []
for row in self.load(params[0], params[1]):
match row["type"]:
case "bank":
transaction = BankTransaction(
row["date"],
row["description"],
row["amount"],
row["bank"],
)
case "money":
transaction = MoneyTransaction(
row["date"], row["description"], row["amount"]
)
# TODO case "split" how to match to original transaction?? also save ids?
case _:
continue
if category := row.pop("category", None):
transaction.category = TransactionCategory(
category["name"],
CategorySelector(category["selector"]["selector"]),
)
transactions.append(transaction)
if self.certify(transactions):
with self.db.session() as session:
session.add(transactions)
case Operation.ExportBanks:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(Bank))
case Operation.ImportBanks:
banks = []
for row in self.load(params[0], params[1]):
bank = Bank(row["name"], row["BIC"], row["type"])
if row["nordigen"]:
bank.nordigen = Nordigen(**row["nordigen"])
banks.append(bank)
if self.certify(banks):
with self.db.session() as session:
session.add(banks)
case Operation.ExportCategoryRules:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(CategoryRule))
case Operation.ImportCategoryRules:
rules = [CategoryRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
session.add(rules)
case Operation.ExportTagRules:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(TagRule))
case Operation.ImportTagRules:
rules = [TagRule(**row) for row in self.load(params[0], params[1])]
if self.certify(rules):
with self.db.session() as session:
session.add(rules)
case Operation.ExportCategories:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(Category))
case Operation.ImportCategories:
# rules = [Category(**row) for row in self.load(params[0])]
categories = []
for row in self.load(params[0], params[1]):
category = Category(row["name"], row["group"])
if len(row["rules"]) > 0:
# Only category rules could have been created with a rule
rules = row["rules"]
for rule in rules:
del rule["type"]
category.rules = set(CategoryRule(**rule) for rule in rules)
if row["schedule"]:
category.schedule = CategorySchedule(**row["schedule"])
categories.append(category)
if self.certify(categories):
with self.db.session() as session:
session.add(categories)
case Operation.ExportCategoryGroups:
with self.db.session() as session:
self.dump(params[0], params[1], session.get(CategoryGroup))
case Operation.ImportCategoryGroups:
groups = [
CategoryGroup(**row) for row in self.load(params[0], params[1])
]
if self.certify(groups):
with self.db.session() as session:
session.add(groups)
def parse(self, filename: Path, args: dict):
return parse_data(filename, args)
def askcategory(self, transaction: Transaction):
selector = CategorySelector(Selector_T.manual)
with self.db.session() as session:
categories = session.get(Category)
while True:
category = input(f"{transaction}: ")
if category in [c.name for c in categories]:
return TransactionCategory(category, selector)
@staticmethod
def dump(fn, format, sequence):
if format == "pickle":
with open(fn, "wb") as f:
pickle.dump([e.format for e in sequence], f)
elif format == "csv":
with open(fn, "w", newline="") as f:
csv.writer(f).writerows([e.format.values() for e in sequence])
else:
print("format not well specified")
@staticmethod
def load(fn, format):
if format == "pickle":
with open(fn, "rb") as f:
return pickle.load(f)
elif format == "csv":
raise Exception("CSV import not supported")
else:
print("format not well specified")
return []
@staticmethod
def certify(imports: list) -> bool:
if input(f"{imports[:10]}\nDoes the import seem correct? (y/n)") == "y":
return True
return False
@property
def db(self) -> DbClient:
return DbClient(self._db, self._verbosity > 2)
@db.setter
def db(self, url: str):
self._db = url
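
A short sketch of driving the Manager directly, assuming a reachable database URL (made up) and a writable path; every import goes through the certify() confirmation prompt:

from pfbudget.common.types import Operation
from pfbudget.core.manager import Manager

mgr = Manager("postgresql://localhost/pfbudget", verbosity=1)
mgr.action(Operation.ExportCategoryGroups, ["groups.pickle", "pickle"])
mgr.action(Operation.ImportCategoryGroups, ["groups.pickle", "pickle"])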

View File

@@ -1,212 +1,123 @@
from dataclasses import asdict
from sqlalchemy import create_engine, delete, select, update
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from sqlalchemy.sql.expression import false
from typing import Sequence, Type, TypeVar
from pfbudget.db.model import (
    Category,
    CategoryGroup,
    CategorySchedule,
    Link,
    Transaction,
)
class DbClient:
    """
    General database client using sqlalchemy
    """
    __sessions: list[Session]
    def __init__(self, url: str, echo=False) -> None:
        self._engine = create_engine(url, echo=echo)
    @property
    def engine(self):
        return self._engine
    class ClientSession:
        def __init__(self, engine):
            self.__engine = engine
        def __enter__(self):
            self.__session = Session(self.__engine)
            return self
        def __exit__(self, exc_type, exc_value, exc_tb):
            self.commit()
            self.__session.close()
        def commit(self):
            self.__session.commit()
        def expunge_all(self):
            self.__session.expunge_all()
        T = TypeVar("T")
        def get(self, type: Type[T], column=None, values=None) -> Sequence[T]:
            if column is not None:
                if values:
                    if isinstance(values, Sequence):
                        stmt = select(type).where(column.in_(values))
                    else:
                        stmt = select(type).where(column == values)
                else:
                    stmt = select(type).where(column)
            else:
                stmt = select(type)
            return self.__session.scalars(stmt).all()
        def uncategorized(self) -> Sequence[Transaction]:
            """Selects all valid uncategorized transactions
            At this moment that includes:
            - Transactions w/o a category
            - AND non-split transactions
            Returns:
                Sequence[Transaction]: transactions left uncategorized
            """
            stmt = (
                select(Transaction)
                .where(~Transaction.category.has())
                .where(Transaction.split == false())
            )
            return self.__session.scalars(stmt).all()
        def add(self, rows: list):
            self.__session.add_all(rows)
        def remove_by_name(self, type, rows: list):
            stmt = delete(type).where(type.name.in_([row.name for row in rows]))
            self.__session.execute(stmt)
        def updategroup(self, categories: list[Category], group: CategoryGroup):
            stmt = (
                update(Category)
                .where(Category.name.in_([cat.name for cat in categories]))
                .values(group=group)
            )
            self.__session.execute(stmt)
        def updateschedules(self, schedules: list[CategorySchedule]):
            stmt = insert(CategorySchedule).values([asdict(s) for s in schedules])
            stmt = stmt.on_conflict_do_update(
                index_elements=[CategorySchedule.name],
                set_=dict(
                    recurring=stmt.excluded.recurring,
                    period=stmt.excluded.period,
                    period_multiplier=stmt.excluded.period_multiplier,
                ),
            )
            self.__session.execute(stmt)
        def remove_by_id(self, type, ids: list[int]):
            stmt = delete(type).where(type.id.in_(ids))
            self.__session.execute(stmt)
        def update(self, type, values: list[dict]):
            print(type, values)
            self.__session.execute(update(type), values)
        def remove_links(self, original: int, links: list[int]):
            stmt = delete(Link).where(
                Link.original == original, Link.link.in_(link for link in links)
            )
            self.__session.execute(stmt)
    def session(self) -> ClientSession:
        return self.ClientSession(self.engine)
-from __future__ import annotations
-from decimal import Decimal
-import csv
-import datetime
-import logging
-import logging.config
-import pathlib
-import sqlite3
-from pfbudget.common.types import Transaction
-import pfbudget.db.schema as Q
-if not pathlib.Path("logs").is_dir():
-    pathlib.Path("logs").mkdir()
-logging.config.fileConfig("logging.conf")
-logger = logging.getLogger("pfbudget.transactions")
-sqlite3.register_adapter(Decimal, lambda d: float(d))
-__DB_NAME = "data.db"
-class DatabaseClient:
-    """SQLite DB connection manager"""
-    __EXPORT_DIR = "export"
-    def __init__(self, db: str):
-        self.db = db
-    def __execute(self, query: str, params: tuple = None) -> list | None:
-        ret = None
-        try:
-            con = sqlite3.connect(self.db)
-            with con:
-                if params:
-                    ret = con.execute(query, params).fetchall()
-                    logger.debug(f"[{self.db}] < {query}{params}")
-                else:
-                    ret = con.execute(query).fetchall()
-                    logger.debug(f"[{self.db}] < {query}")
-            if ret:
-                logger.debug(f"[{self.db}] > {ret}")
-        except sqlite3.Error:
-            logger.exception(f"Error while executing [{self.db}] < {query}")
-        finally:
-            con.close()
-        return ret
-    def __executemany(self, query: str, list_of_params: list[tuple]) -> list | None:
-        ret = None
-        try:
-            con = sqlite3.connect(self.db)
-            with con:
-                ret = con.executemany(query, list_of_params).fetchall()
-                logger.debug(f"[{self.db}] < {query}{list_of_params}")
-        except sqlite3.Error:
-            logger.exception(
-                f"Error while executing [{self.db}] < {query} {list_of_params}"
-            )
-        finally:
-            con.close()
-        return ret
-    def __create_tables(self, tables: tuple[tuple]):
-        for table_name, query in tables:
-            logger.info(f"Creating table {table_name} if it doesn't exist already")
-            self.__execute(query)
-    def init(self):
-        logging.info(f"Initializing {self.db} database")
-        self.__create_tables(
-            (
-                ("transactions", Q.CREATE_TRANSACTIONS_TABLE),
-                ("backups", Q.CREATE_BACKUPS_TABLE),
-                ("banks", Q.CREATE_BANKS_TABLE),
-            )
-        )
-    """Transaction table methods"""
-    def select_all(self) -> list[Transaction] | None:
-        logger.info(f"Reading all transactions from {self.db}")
-        transactions = self.__execute("SELECT * FROM transactions")
-        if transactions:
-            return [Transaction(t) for t in transactions]
-        return None
-    def insert_transaction(self, transaction: Transaction):
-        logger.info(f"Adding {transaction} into {self.db}")
-        self.__execute(Q.ADD_TRANSACTION, (transaction.to_list(),))
-    def insert_transactions(self, transactions: Q.DbTransactions):
-        logger.info(f"Adding {len(transactions)} into {self.db}")
-        self.__executemany(Q.ADD_TRANSACTION, [t.tuple() for t in transactions])
-    def update_category(self, transaction: Transaction):
-        logger.info(f"Update {transaction} category")
-        self.__execute(Q.UPDATE_CATEGORY, transaction.update_category())
-    def update_categories(self, transactions: list[Transaction]):
-        logger.info(f"Update {len(transactions)} transactions' categories")
-        self.__executemany(
-            Q.UPDATE_CATEGORY,
-            [transaction.update_category() for transaction in transactions],
-        )
-    def get_duplicated_transactions(self) -> list[Transaction] | None:
-        logger.info("Get duplicated transactions")
-        transactions = self.__execute(Q.DUPLICATED_TRANSACTIONS)
-        if transactions:
-            return [Transaction(t) for t in transactions]
-        return None
-    def get_sorted_transactions(self) -> list[Transaction] | None:
-        logger.info("Get transactions sorted by date")
-        transactions = self.__execute(Q.SORTED_TRANSACTIONS)
-        if transactions:
-            return [Transaction(t) for t in transactions]
-        return None
-    def get_daterange(self, start: datetime, end: datetime) -> list[Transaction] | None:
-        logger.info(f"Get transactions from {start} to {end}")
-        transactions = self.__execute(Q.SELECT_TRANSACTIONS_BETWEEN_DATES, (start, end))
-        if transactions:
-            return [Transaction(t) for t in transactions]
-        return None
-    def get_category(self, value: str) -> list[Transaction] | None:
-        logger.info(f"Get transactions where category = {value}")
-        transactions = self.__execute(Q.SELECT_TRANSACTIONS_BY_CATEGORY, (value,))
-        if transactions:
-            return [Transaction(t) for t in transactions]
-        return None
-    def get_daterange_category(
-        self, start: datetime, end: datetime, category: str
-    ) -> list[Transaction] | None:
-        logger.info(
-            f"Get transactions from {start} to {end} where category = {category}"
-        )
-        transactions = self.__execute(
-            Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY, (start, end, category)
-        )
-        if transactions:
-            return [Transaction(t) for t in transactions]
-        return None
-    def get_by_period(self, period: str) -> list[Transaction] | None:
-        logger.info(f"Get transactions by {period}")
-        transactions = self.__execute(Q.SELECT_TRANSACTION_BY_PERIOD, period)
-        if transactions:
-            return [Transaction(t) for t in transactions]
-        return None
-    def get_uncategorized_transactions(self) -> list[Transaction] | None:
-        logger.debug("Get uncategorized transactions")
-        return self.get_category(None)
-    def get_daterange_uncategorized_transactions(self, start: datetime, end: datetime):
-        logger.debug("Get uncategorized transactions from {start} to {end}")
-        return self.get_daterange_category(start, end, None)
-    def get_daterage_without(
-        self, start: datetime, end: datetime, *categories: str
-    ) -> list[Transaction] | None:
-        logger.info(f"Get transactions between {start} and {end} not in {categories}")
-        query = Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
-            "(" + ", ".join("?" for _ in categories) + ")"
-        )
-        transactions = self.__execute(query, (start, end, *categories))
-        if transactions:
-            return [Transaction(t) for t in transactions]
-        return None
-    def export(self):
-        filename = pathlib.Path(
-            "@".join([self.db, datetime.datetime.now().isoformat()])
-        ).with_suffix(".csv")
-        transactions = self.select_all()
-        logger.info(f"Exporting {self.db} into {filename}")
if not (dir := pathlib.Path(self.__EXPORT_DIR)).is_dir():
dir.mkdir()
with open(dir / filename, "w", newline="") as f:
csv.writer(f, delimiter="\t").writerows(transactions)
"""Banks table methods"""
def register_bank(self, bank: Q.DbBank):
logger.info(f"Registering {bank}")
self.__execute(Q.ADD_BANK, bank.tuple())
def unregister_bank(self, bank: str):
logger.info(f"Unregistering {bank}")
self.__execute(Q.DELETE_BANK, (bank,))
def get_bank(self, key: str, value: str) -> Q.DbBank | None:
logger.info(f"Get bank with {key} = {value}")
bank = self.__execute(Q.SELECT_BANK.format(key), (value, ))
if bank:
return Q.DbBank(*bank[0])
def get_banks(self) -> Q.DbBanks:
logger.info("Get all banks")
banks = self.__execute(Q.SELECT_BANKS)
if banks:
return [Q.DbBank(*bank) for bank in banks]
return []
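A minimal driver for the new session API, as a sketch only: the enclosing client class (called DbClient here) and its connection-string constructor are assumptions, the method names come from the diff above, and ClientSession is assumed to define the __enter__ that matches the __exit__ shown.

# Hypothetical usage of the SQLAlchemy-backed session client
from pfbudget.db.model import Category

client = DbClient("postgresql://user:pass@localhost/pfbudget")  # made-up DSN

with client.session() as session:  # __exit__ commits and closes on success
    categories = session.get(Category)  # no column: select every row
    food = session.get(Category, Category.name, ["groceries", "restaurants"])  # IN clause
    pending = session.uncategorized()  # no category AND not split
    session.add([Category("gifts")])  # flushed and committed on exit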

440
pfbudget/db/model.py Normal file
View File

@ -0,0 +1,440 @@
from __future__ import annotations
import datetime as dt
import decimal
import enum
import re
from typing import Annotated, Any, Optional
from sqlalchemy import (
BigInteger,
Enum,
ForeignKey,
MetaData,
Numeric,
String,
Text,
)
from sqlalchemy.orm import (
DeclarativeBase,
Mapped,
mapped_column,
MappedAsDataclass,
relationship,
)
class Base(MappedAsDataclass, DeclarativeBase):
metadata = MetaData(
schema="transactions",
naming_convention={
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_`%(constraint_name)s`",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s",
},
)
class AccountType(enum.Enum):
checking = enum.auto()
savings = enum.auto()
investment = enum.auto()
mealcard = enum.auto()
VISA = enum.auto()
MASTERCARD = enum.auto()
accounttype = Annotated[
AccountType,
mapped_column(Enum(AccountType, inherit_schema=True)),
]
class Export:
@property
def format(self) -> dict[str, Any]:
raise NotImplementedError
class Bank(Base, Export):
__table_args__ = {"schema": "bank"}
__tablename__ = "banks"
name: Mapped[str] = mapped_column(unique=True)
BIC: Mapped[str] = mapped_column(String(8), primary_key=True)
type: Mapped[accounttype] = mapped_column(primary_key=True)
nordigen: Mapped[Optional[Nordigen]] = relationship(lazy="joined", init=False)
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
BIC=self.BIC,
type=self.type,
nordigen=self.nordigen.format if self.nordigen else None,
)
bankfk = Annotated[str, mapped_column(Text, ForeignKey(Bank.name))]
idpk = Annotated[int, mapped_column(BigInteger, primary_key=True, autoincrement=True)]
money = Annotated[decimal.Decimal, mapped_column(Numeric(16, 2))]
class Transaction(Base, Export):
__tablename__ = "transactions"
id: Mapped[idpk] = mapped_column(init=False)
date: Mapped[dt.date]
description: Mapped[Optional[str]]
amount: Mapped[money]
split: Mapped[bool] = mapped_column(init=False, default=False)
type: Mapped[str] = mapped_column(init=False)
category: Mapped[Optional[TransactionCategory]] = relationship(init=False)
note: Mapped[Optional[Note]] = relationship(
cascade="all, delete-orphan", init=False, passive_deletes=True
)
tags: Mapped[set[TransactionTag]] = relationship(init=False)
__mapper_args__ = {"polymorphic_on": "type", "polymorphic_identity": "transaction"}
@property
def format(self) -> dict[str, Any]:
return dict(
id=self.id,
date=self.date,
description=self.description,
amount=self.amount,
split=self.split,
type=self.type,
category=self.category.format if self.category else None,
# TODO note
tags=[tag.format for tag in self.tags] if self.tags else None,
)
def __lt__(self, other: Transaction):
return self.date < other.date
idfk = Annotated[
int, mapped_column(BigInteger, ForeignKey(Transaction.id, ondelete="CASCADE"))
]
class BankTransaction(Transaction):
bank: Mapped[bankfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "bank", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(bank=self.bank)
class MoneyTransaction(Transaction):
__mapper_args__ = {"polymorphic_identity": "money"}
class SplitTransaction(Transaction):
original: Mapped[idfk] = mapped_column(nullable=True)
__mapper_args__ = {"polymorphic_identity": "split", "polymorphic_load": "inline"}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(original=self.original)
class CategoryGroup(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "groups"
name: Mapped[str] = mapped_column(primary_key=True)
@property
def format(self) -> dict[str, Any]:
return dict(name=self.name)
class Category(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "available"
name: Mapped[str] = mapped_column(primary_key=True)
group: Mapped[Optional[str]] = mapped_column(
ForeignKey(CategoryGroup.name), default=None
)
rules: Mapped[set[CategoryRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
)
schedule: Mapped[Optional[CategorySchedule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default=None
)
def __repr__(self) -> str:
return (
f"Category(name={self.name}, group={self.group}, #rules={len(self.rules)},"
f" schedule={self.schedule})"
)
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
group=self.group if self.group else None,
rules=[rule.format for rule in self.rules],
schedule=self.schedule.format if self.schedule else None,
)
catfk = Annotated[
str,
mapped_column(ForeignKey(Category.name, ondelete="CASCADE")),
]
class TransactionCategory(Base, Export):
__tablename__ = "categorized"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
name: Mapped[catfk]
selector: Mapped[CategorySelector] = relationship(
cascade="all, delete-orphan", lazy="joined"
)
@property
def format(self):
return dict(name=self.name, selector=self.selector.format)
class Note(Base):
__tablename__ = "notes"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
note: Mapped[str]
class Nordigen(Base, Export):
__table_args__ = {"schema": "bank"}
__tablename__ = "nordigen"
name: Mapped[bankfk] = mapped_column(primary_key=True)
bank_id: Mapped[Optional[str]]
requisition_id: Mapped[Optional[str]]
invert: Mapped[Optional[bool]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
bank_id=self.bank_id,
requisition_id=self.requisition_id,
invert=self.invert,
)
class Tag(Base):
__table_args__ = {"schema": "tag"}
__tablename__ = "available"
name: Mapped[str] = mapped_column(primary_key=True)
rules: Mapped[set[TagRule]] = relationship(
cascade="all, delete-orphan", passive_deletes=True, default_factory=set
)
class TransactionTag(Base, Export):
__tablename__ = "tagged"
id: Mapped[idfk] = mapped_column(primary_key=True, init=False)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name), primary_key=True)
@property
def format(self):
return dict(tag=self.tag)
def __hash__(self):
return hash(self.id)
class Selector_T(enum.Enum):
unknown = enum.auto()
nullifier = enum.auto()
vacations = enum.auto()
rules = enum.auto()
algorithm = enum.auto()
manual = enum.auto()
categoryselector = Annotated[
Selector_T,
mapped_column(Enum(Selector_T, inherit_schema=True), default=Selector_T.unknown),
]
class CategorySelector(Base, Export):
__tablename__ = "selector"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(TransactionCategory.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
selector: Mapped[categoryselector]
@property
def format(self):
return dict(selector=self.selector)
class Period(enum.Enum):
daily = "daily"
weekly = "weekly"
monthly = "monthly"
yearly = "yearly"
scheduleperiod = Annotated[Period, mapped_column(Enum(Period, inherit_schema=True))]
class CategorySchedule(Base, Export):
__table_args__ = {"schema": "category"}
__tablename__ = "schedules"
name: Mapped[catfk] = mapped_column(primary_key=True)
period: Mapped[Optional[scheduleperiod]]
period_multiplier: Mapped[Optional[int]]
amount: Mapped[Optional[int]]
@property
def format(self) -> dict[str, Any]:
return dict(
name=self.name,
period=self.period,
period_multiplier=self.period_multiplier,
amount=self.amount,
)
class Link(Base):
__tablename__ = "links"
original: Mapped[idfk] = mapped_column(primary_key=True)
link: Mapped[idfk] = mapped_column(primary_key=True)
class Rule(Base, Export):
__tablename__ = "rules"
id: Mapped[idpk] = mapped_column(init=False)
start: Mapped[Optional[dt.date]]
end: Mapped[Optional[dt.date]]
description: Mapped[Optional[str]]
regex: Mapped[Optional[str]]
bank: Mapped[Optional[str]]
min: Mapped[Optional[money]]
max: Mapped[Optional[money]]
type: Mapped[str] = mapped_column(init=False)
__mapper_args__ = {
"polymorphic_identity": "rule",
"polymorphic_on": "type",
}
def matches(self, t: BankTransaction) -> bool:
valid = None
if self.regex:
valid = re.compile(self.regex, re.IGNORECASE)
ops = (
Rule.exists(self.start, lambda r: r < t.date),
Rule.exists(self.end, lambda r: r > t.date),
Rule.exists(self.description, lambda r: r == t.description),
Rule.exists(
valid,
lambda r: r.search(t.description) if t.description else False,
),
Rule.exists(self.bank, lambda r: r == t.bank),
Rule.exists(self.min, lambda r: r < t.amount),
Rule.exists(self.max, lambda r: r > t.amount),
)
if all(ops):
return True
return False
@property
def format(self) -> dict[str, Any]:
return dict(
start=self.start,
end=self.end,
description=self.description,
regex=self.regex,
bank=self.bank,
min=self.min,
max=self.max,
type=self.type,
)
@staticmethod
def exists(r, op) -> bool:
return op(r) if r is not None else True
class CategoryRule(Rule):
__table_args__ = {"schema": "category"}
__tablename__ = "rules"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(Rule.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
name: Mapped[catfk]
__mapper_args__ = {
"polymorphic_identity": "category_rule",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(name=self.name)
def __hash__(self):
return hash(self.id)
class TagRule(Rule):
__table_args__ = {"schema": "tag"}
__tablename__ = "rules"
id: Mapped[int] = mapped_column(
BigInteger,
ForeignKey(Rule.id, ondelete="CASCADE"),
primary_key=True,
init=False,
)
tag: Mapped[str] = mapped_column(ForeignKey(Tag.name, ondelete="CASCADE"))
__mapper_args__ = {
"polymorphic_identity": "tag_rule",
}
@property
def format(self) -> dict[str, Any]:
return super().format | dict(tag=self.tag)
def __hash__(self):
return hash(self.id)
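The rule machinery treats every unset field as a wildcard (see Rule.exists); a small in-memory illustration, with no session involved and all values invented:

import datetime as dt
from decimal import Decimal

from pfbudget.db.model import BankTransaction, CategoryRule

# MappedAsDataclass generates __init__ from the fields not marked init=False
t = BankTransaction(
    date=dt.date(2023, 1, 15),
    description="LIDL COMPRA 1234",
    amount=Decimal("-23.50"),
    bank="mybank",  # invented bank name
)

# None fields pass straight through Rule.exists, so only the date window and
# the case-insensitive regex constrain this rule
rule = CategoryRule(
    start=dt.date(2023, 1, 1),
    end=dt.date(2023, 12, 31),
    description=None,
    regex=r"lidl",
    bank=None,
    min=None,
    max=None,
    name="groceries",
)
assert rule.matches(t)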

212
pfbudget/db/sqlite.py Normal file
View File

@ -0,0 +1,212 @@
from __future__ import annotations
from decimal import Decimal
import csv
import datetime
import logging
import logging.config
import pathlib
import sqlite3
from pfbudget.common.types import Transaction
import pfbudget.db.schema as Q
if not pathlib.Path("logs").is_dir():
pathlib.Path("logs").mkdir()
logging.config.fileConfig("logging.conf")
logger = logging.getLogger("pfbudget.transactions")
sqlite3.register_adapter(Decimal, lambda d: float(d))
__DB_NAME = "data.db"
class DatabaseClient:
"""SQLite DB connection manager"""
__EXPORT_DIR = "export"
def __init__(self, db: str):
self.db = db
def __execute(self, query: str, params: tuple = None) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
if params:
ret = con.execute(query, params).fetchall()
logger.debug(f"[{self.db}] < {query}{params}")
else:
ret = con.execute(query).fetchall()
logger.debug(f"[{self.db}] < {query}")
if ret:
logger.debug(f"[{self.db}] > {ret}")
except sqlite3.Error:
logger.exception(f"Error while executing [{self.db}] < {query}")
finally:
con.close()
return ret
def __executemany(self, query: str, list_of_params: list[tuple]) -> list | None:
ret = None
try:
con = sqlite3.connect(self.db)
with con:
ret = con.executemany(query, list_of_params).fetchall()
logger.debug(f"[{self.db}] < {query}{list_of_params}")
except sqlite3.Error:
logger.exception(
f"Error while executing [{self.db}] < {query} {list_of_params}"
)
finally:
con.close()
return ret
def __create_tables(self, tables: tuple[tuple]):
for table_name, query in tables:
logger.info(f"Creating table {table_name} if it doesn't exist already")
self.__execute(query)
def init(self):
logging.info(f"Initializing {self.db} database")
self.__create_tables(
(
("transactions", Q.CREATE_TRANSACTIONS_TABLE),
("backups", Q.CREATE_BACKUPS_TABLE),
("banks", Q.CREATE_BANKS_TABLE),
)
)
"""Transaction table methods"""
def select_all(self) -> list[Transaction] | None:
logger.info(f"Reading all transactions from {self.db}")
transactions = self.__execute("SELECT * FROM transactions")
if transactions:
return [Transaction(t) for t in transactions]
return None
def insert_transaction(self, transaction: Transaction):
logger.info(f"Adding {transaction} into {self.db}")
self.__execute(Q.ADD_TRANSACTION, (transaction.to_list(),))
def insert_transactions(self, transactions: Q.DbTransactions):
logger.info(f"Adding {len(transactions)} into {self.db}")
self.__executemany(Q.ADD_TRANSACTION, [t.tuple() for t in transactions])
def update_category(self, transaction: Transaction):
logger.info(f"Update {transaction} category")
self.__execute(Q.UPDATE_CATEGORY, transaction.update_category())
def update_categories(self, transactions: list[Transaction]):
logger.info(f"Update {len(transactions)} transactions' categories")
self.__executemany(
Q.UPDATE_CATEGORY,
[transaction.update_category() for transaction in transactions],
)
def get_duplicated_transactions(self) -> list[Transaction] | None:
logger.info("Get duplicated transactions")
transactions = self.__execute(Q.DUPLICATED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_sorted_transactions(self) -> list[Transaction] | None:
logger.info("Get transactions sorted by date")
transactions = self.__execute(Q.SORTED_TRANSACTIONS)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange(self, start: datetime, end: datetime) -> list[Transaction] | None:
logger.info(f"Get transactions from {start} to {end}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BETWEEN_DATES, (start, end))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_category(self, value: str) -> list[Transaction] | None:
logger.info(f"Get transactions where category = {value}")
transactions = self.__execute(Q.SELECT_TRANSACTIONS_BY_CATEGORY, (value,))
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_daterange_category(
self, start: datetime, end: datetime, category: str
) -> list[Transaction] | None:
logger.info(
f"Get transactions from {start} to {end} where category = {category}"
)
transactions = self.__execute(
Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITH_CATEGORY, (start, end, category)
)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_by_period(self, period: str) -> list[Transaction] | None:
logger.info(f"Get transactions by {period}")
transactions = self.__execute(Q.SELECT_TRANSACTION_BY_PERIOD, period)
if transactions:
return [Transaction(t) for t in transactions]
return None
def get_uncategorized_transactions(self) -> list[Transaction] | None:
logger.debug("Get uncategorized transactions")
return self.get_category(None)
def get_daterange_uncategorized_transactions(self, start: datetime, end: datetime):
logger.debug("Get uncategorized transactions from {start} to {end}")
return self.get_daterange_category(start, end, None)
def get_daterage_without(
self, start: datetime, end: datetime, *categories: str
) -> list[Transaction] | None:
logger.info(f"Get transactions between {start} and {end} not in {categories}")
query = Q.SELECT_TRANSACTIONS_BETWEEN_DATES_WITHOUT_CATEGORIES.format(
"(" + ", ".join("?" for _ in categories) + ")"
)
transactions = self.__execute(query, (start, end, *categories))
if transactions:
return [Transaction(t) for t in transactions]
return None
def export(self):
filename = pathlib.Path(
"@".join([self.db, datetime.datetime.now().isoformat()])
).with_suffix(".csv")
transactions = self.select_all()
logger.info(f"Exporting {self.db} into {filename}")
if not (dir := pathlib.Path(self.__EXPORT_DIR)).is_dir():
dir.mkdir()
with open(dir / filename, "w", newline="") as f:
csv.writer(f, delimiter="\t").writerows(transactions)
"""Banks table methods"""
def register_bank(self, bank: Q.DbBank):
logger.info(f"Registering {bank}")
self.__execute(Q.ADD_BANK, bank.tuple())
def unregister_bank(self, bank: str):
logger.info(f"Unregistering {bank}")
self.__execute(Q.DELETE_BANK, (bank,))
def get_bank(self, key: str, value: str) -> Q.DbBank | None:
logger.info(f"Get bank with {key} = {value}")
bank = self.__execute(Q.SELECT_BANK.format(key), (value, ))
if bank:
return Q.DbBank(*bank[0])
def get_banks(self) -> Q.DbBanks:
logger.info("Get all banks")
banks = self.__execute(Q.SELECT_BANKS)
if banks:
return [Q.DbBank(*bank) for bank in banks]
return []
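By contrast, the retained SQLite client keeps its query-string interface; a short usage sketch (the database file name is arbitrary):

from pfbudget.db.sqlite import DatabaseClient

client = DatabaseClient("data.db")
client.init()  # creates the transactions/backups/banks tables if missing

# each call opens and closes its own sqlite3 connection (see __execute)
pending = client.get_uncategorized_transactions() or []
print(f"{len(pending)} transactions left to categorize")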

View File

@ -1,21 +1,9 @@
-from __future__ import annotations
 from abc import ABC, abstractmethod
-from typing import TYPE_CHECKING
-from pfbudget.common.types import Transactions
+from pfbudget.db.model import Transaction
-if TYPE_CHECKING:
-    from pfbudget.core.manager import Manager
 class Input(ABC):
-    def __init__(self, manager: Manager):
-        self._manager = manager
     @abstractmethod
-    def parse(self) -> Transactions:
-        return NotImplemented
+    def parse(self) -> list[Transaction]:
+        return NotImplementedError
-    @property
-    def manager(self):
-        return self._manager
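The slimmed-down ABC drops the manager plumbing, so a source only implements parse(); a minimal hypothetical subclass (the pfbudget.input.input module path is inferred from the relative imports elsewhere in this diff):

import datetime as dt
from decimal import Decimal

from pfbudget.db.model import MoneyTransaction, Transaction
from pfbudget.input.input import Input  # path assumed

class StaticInput(Input):
    """Toy source returning a fixed in-memory list; real inputs parse files or APIs."""

    def parse(self) -> list[Transaction]:
        return [
            MoneyTransaction(
                date=dt.date(2023, 2, 1),
                description="example",
                amount=Decimal("10.00"),
            )
        ]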

View File

@ -1,30 +0,0 @@
import json
from .input import Input
from pfbudget.common.types import Transactions
from pfbudget.utils import convert, parse_decimal
class JsonParser(Input):
def __init__(self, manager, options):
super().__init__(manager)
self.options = options
def parse(self) -> Transactions:
try:
with open(self.options["json"][0], "r") as f:
return [
convert(
[
t["bookingDate"],
t["remittanceInformationUnstructured"],
self.options["bank"][0],
parse_decimal(t["transactionAmount"]["amount"])
if not self.options["invert"]
else -parse_decimal(t["transactionAmount"]["amount"]),
],
)
for t in json.load(f)["transactions"]["booked"]
]
except KeyError:
print("No json file defined")

View File

@ -1,59 +1,51 @@
-from datetime import date
-from time import sleep
-from requests import HTTPError, ReadTimeout
-from dotenv import load_dotenv
-from nordigen import NordigenClient
-from uuid import uuid4
+import datetime as dt
+import dotenv
 import json
+import nordigen
 import os
-import webbrowser
+import requests
+import time
+import uuid
+import pfbudget.db.model as t
+from pfbudget.utils.converters import convert
 from .input import Input
-from pfbudget.common.types import NoBankSelected, Transactions
-from pfbudget.utils import convert
-load_dotenv()
+dotenv.load_dotenv()
 class NordigenInput(Input):
+    redirect_url = "https://murta.dev"
-    def __init__(self, manager, options: dict = {}, start=date.min, end=date.max):
-        super().__init__(manager)
+    def __init__(self):
+        super().__init__()
+        if not (key := os.environ.get("SECRET_KEY")) or not (
+            id := os.environ.get("SECRET_ID")
+        ):
+            raise
-        self._client = NordigenClient(
-            secret_key=os.environ.get("SECRET_KEY"),
-            secret_id=os.environ.get("SECRET_ID"),
+        self._client = nordigen.NordigenClient(
+            secret_key=key,
+            secret_id=id,
         )
-        self.client.token = self.__token()
+        self._client.token = self.__token()
+        self._start = dt.date.min
+        self._end = dt.date.max
-        # print(options)
-        if "all" in options and options["all"]:
-            self.__banks = self.manager.get_banks()
-        elif "id" in options and options["id"]:
-            self.__banks = [
-                self.manager.get_bank_by("nordigen_id", b) for b in options["id"]
-            ]
-        elif "name" in options and options["name"]:
-            self.__banks = [
-                self.manager.get_bank_by("name", b) for b in options["name"]
-            ]
-        else:
-            self.__banks = None
-        self.__from = start
-        self.__to = end
-    def parse(self) -> Transactions:
+    def parse(self) -> list[t.BankTransaction]:
         transactions = []
-        if not self.__banks:
-            raise NoBankSelected
+        assert len(self._banks) > 0
-        for bank in self.__banks:
+        for bank in self._banks:
             print(f"Downloading from {bank}...")
             requisition = self.client.requisition.get_requisition_by_id(
-                bank.requisition_id
+                bank.nordigen.requisition_id
             )
-            print(requisition)
             for acc in requisition["accounts"]:
                 account = self._client.account_api(acc)
@ -63,14 +55,14 @@ class NordigenInput(Input):
             try:
                 downloaded = account.get_transactions()
                 break
-            except ReadTimeout:
+            except requests.ReadTimeout:
                 retries += 1
                 print(f"Request #{retries} timed-out, retrying in 1s")
-                sleep(1)
+                time.sleep(1)
-            except HTTPError as e:
+            except requests.HTTPError as e:
                 retries += 1
                 print(f"Request #{retries} failed with {e}, retrying in 1s")
-                sleep(1)
+                time.sleep(1)
         if not downloaded:
             print(f"Couldn't download transactions for {account}")
@ -84,44 +76,59 @@ class NordigenInput(Input):
             ]
             transactions.extend(
-                [t for t in converted if self.__from <= t.date <= self.__to]
+                [t for t in converted if self._start <= t.date <= self._end]
             )
-        return transactions
+        return sorted(transactions)
     def token(self):
         token = self._client.generate_token()
         print(f"New access token: {token}")
         return token
-    def requisition(self, institution: str, country: str = "PT"):
-        link, _ = self.__requisition_id(institution, country)
-        webbrowser.open(link)
+    def requisition(self, id: str, country: str = "PT"):
+        requisition = self._client.initialize_session(
+            redirect_uri=self.redirect_url,
+            institution_id=id,
+            reference_id=str(uuid.uuid4()),
+        )
+        return requisition.link, requisition.requisition_id
-    def list(self, country: str):
-        print(self._client.institution.get_institutions(country))
+    def country_banks(self, country: str):
+        return self._client.institution.get_institutions(country)
     @property
     def client(self):
         return self._client
+    @property
+    def banks(self):
+        return self._banks
+    @banks.setter
+    def banks(self, value):
+        self._banks = value
+    @property
+    def start(self):
+        return self._start
+    @start.setter
+    def start(self, value):
+        self._start = value
+    @property
+    def end(self):
+        return self._end
+    @end.setter
+    def end(self, value):
+        self._end = value
     def __token(self):
         if token := os.environ.get("TOKEN"):
             return token
         else:
             token = self._client.generate_token()
             print(f"New access token: {token}")
-            return token
+            return token["access"]
-    def __requisition_id(self, i: str, c: str):
-        id = self._client.institution.get_institution_id_by_name(
-            country=c, institution=i
-        )
-        init = self._client.initialize_session(
-            redirect_uri="https://murta.dev",
-            institution_id=id,
-            reference_id=str(uuid4()),
-        )
-        print(f"{i}({c}) link: {init.link} and requisition ID: {init.requisition_id}")
-        return (init.link, init.requisition_id)
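With the manager/options constructor gone, callers configure the downloader through the new properties; a hedged driver, assuming SECRET_ID and SECRET_KEY sit in .env and that banks holds Bank rows with linked Nordigen requisitions:

import datetime as dt

from pfbudget.input.nordigen import NordigenInput  # module path assumed

downloader = NordigenInput()  # raises if SECRET_ID/SECRET_KEY are missing
downloader.banks = banks  # e.g. the result of the new DB client's get(Bank)
downloader.start = dt.date(2023, 1, 1)
downloader.end = dt.date(2023, 3, 31)
transactions = downloader.parse()  # sorted BankTransactions inside the window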

View File

@ -1,10 +1,12 @@
 from collections import namedtuple
 from decimal import Decimal
 from importlib import import_module
+from pathlib import Path
 import datetime as dt
 import yaml
-from pfbudget.common.types import NoBankSelected, Transaction, Transactions
+from pfbudget.common.types import NoBankSelected
+from pfbudget.db.model import Transaction
 from pfbudget.utils import utils
 Index = namedtuple(
@ -43,7 +45,7 @@ Options = namedtuple(
 )
-def parse_data(filename: str, args: dict) -> Transactions:
+def parse_data(filename: Path, args: dict) -> list[Transaction]:
     cfg: dict = yaml.safe_load(open("parsers.yaml"))
     assert (
         "Banks" in cfg
@ -84,7 +86,7 @@ def parse_data(filename: Path, args: dict) -> list[Transaction]:
 class Parser:
-    def __init__(self, filename: str, bank: str, options: dict):
+    def __init__(self, filename: Path, bank: str, options: dict):
         self.filename = filename
         self.bank = bank
@ -157,7 +159,7 @@ class Parser:
         category = line[options.category]
         transaction = Transaction(date, text, bank, value, category)
     else:
-        transaction = Transaction(date, text, bank, value, options.category)
+        transaction = Transaction(date, text, bank, value)
     if options.additional_parser:
         func(transaction)
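parse_data now takes a Path and returns model Transactions; a hedged call, assuming a parsers.yaml whose "Banks" section describes the statement layout (the module path and the args keys are illustrative only):

from pathlib import Path

from pfbudget.input.parsers import parse_data  # module path assumed

transactions = parse_data(Path("statements/mybank.csv"), {"bank": ["mybank"]})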

View File

@ -9,7 +9,7 @@ import pfbudget.core.categories
 if TYPE_CHECKING:
-    from pfbudget.db.client import DatabaseClient
+    from pfbudget.db.sqlite import DatabaseClient
 groups = pfbudget.core.categories.cfg["Groups"]

View File

@ -6,7 +6,7 @@ import datetime as dt
 import pfbudget.core.categories
 if TYPE_CHECKING:
-    from pfbudget.db.client import DatabaseClient
+    from pfbudget.db.sqlite import DatabaseClient
 def net(db: DatabaseClient, start: dt.date = dt.date.min, end: dt.date = dt.date.max):

View File

@ -1,2 +0,0 @@
from .converters import convert
from .utils import *

View File

@ -1,71 +1,30 @@
-from datetime import timedelta
-from functools import singledispatch
+import datetime as dt
+import functools
+from typing import Any
-from pfbudget.common.types import Bank, Transaction, TransactionError
-from pfbudget.db.schema import DbBank, DbTransaction
+from pfbudget.common.types import TransactionError
+import pfbudget.db.model as t
 from .utils import parse_decimal
-@singledispatch
-def convert(t):
-    print("No converter as been found")
+@functools.singledispatch
+def convert(t) -> Any:
+    print("No converter has been found")
     pass
 @convert.register
-def _(t: Transaction) -> DbTransaction:
-    return DbTransaction(
-        t.date,
-        t.description,
-        t.bank,
-        t.value,
-        t.category,
-        t.original,
-        t.additional_comment,
-    )
-@convert.register
-def _(db: DbTransaction) -> Transaction:
-    try:
-        return Transaction(db)
-    except TransactionError:
-        print(f"{db} is in the wrong format")
-@convert.register
-def _(db: DbBank, key: str = "") -> Bank:
-    bank = Bank(db.name, db.bic, db.requisition_id, db.invert, db.offset, key=key)
-    if not bank.invert:
-        bank.invert = False
-    if not bank.offset:
-        bank.offset = 0
-    return bank
-@convert.register
-def _(bank: Bank) -> DbBank:
-    bank = DbBank(
-        bank.name, bank.bic, "", "", bank.requisition_id, bank.invert, bank.offset
-    )
-    if not bank.invert:
-        bank.invert = False
-    if not bank.offset:
-        bank.offset = 0
-    return bank
-@convert.register
-def _(json: dict, bank: Bank) -> Transaction:
-    i = -1 if bank.invert else 1
+def _(json: dict, bank: t.Bank) -> t.BankTransaction | None:
+    i = -1 if bank.nordigen and bank.nordigen.invert else 1
     try:
-        transaction = Transaction(
-            json["bookingDate"],
-            json["remittanceInformationUnstructured"],
-            bank.name,
-            i * parse_decimal(json["transactionAmount"]["amount"]),
-        )
-        transaction.date += timedelta(days=bank.offset)
+        transaction = t.BankTransaction(
+            date=dt.date.fromisoformat(json["bookingDate"]),
+            description=json["remittanceInformationUnstructured"],
+            bank=bank.name,
+            amount=i * parse_decimal(json["transactionAmount"]["amount"]),
+        )
+        # transaction.date += timedelta(days=bank.offset)
         return transaction
     except TransactionError:
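functools.singledispatch picks the overload from the type of the first argument, so a dict payload lands in the Nordigen branch above; a sketch with invented data (on a transient Bank the unloaded nordigen relationship reads as None, leaving the sign multiplier at 1):

import pfbudget.db.model as t
from pfbudget.utils.converters import convert

bank = t.Bank(name="mybank", BIC="ABCDPTPL", type=t.AccountType.checking)
raw = {
    "bookingDate": "2023-02-14",
    "remittanceInformationUnstructured": "COFFEE SHOP",
    "transactionAmount": {"amount": "-1.20"},
}
tx = convert(raw, bank)  # dispatches on dict -> t.BankTransaction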

View File

@ -59,21 +59,21 @@ def find_credit_institution(fn, banks, creditcards):
     return bank, cc
-def parse_args_period(args):
+def parse_args_period(args: dict):
     start, end = date.min, date.max
-    if args.start:
-        start = datetime.strptime(args.start[0], "%Y/%m/%d").date()
+    if args["start"]:
+        start = datetime.strptime(args["start"][0], "%Y/%m/%d").date()
-    if args.end:
-        end = datetime.strptime(args.end[0], "%Y/%m/%d").date()
+    if args["end"]:
+        end = datetime.strptime(args["end"][0], "%Y/%m/%d").date()
-    if args.interval:
-        start = datetime.strptime(args.interval[0], "%Y/%m/%d").date()
-        end = datetime.strptime(args.interval[1], "%Y/%m/%d").date()
+    if args["interval"]:
+        start = datetime.strptime(args["interval"][0], "%Y/%m/%d").date()
+        end = datetime.strptime(args["interval"][1], "%Y/%m/%d").date()
-    if args.year:
-        start = datetime.strptime(args.year[0], "%Y").date()
-        end = datetime.strptime(str(int(args.year[0]) + 1), "%Y").date() - timedelta(
+    if args["year"]:
+        start = datetime.strptime(args["year"][0], "%Y").date()
+        end = datetime.strptime(str(int(args["year"][0]) + 1), "%Y").date() - timedelta(
             days=1
         )
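Since the helper now indexes a dict, argparse callers would pass vars(args) or an equivalent mapping; a small example, assuming the function goes on to return the (start, end) pair:

from pfbudget.utils.utils import parse_args_period  # module path assumed

args = {"start": None, "end": None, "interval": None, "year": ["2022"]}
start, end = parse_args_period(args)  # 2022-01-01 .. 2022-12-31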

View File

@ -1,5 +1,7 @@
+codetiming==1.4.0
 matplotlib==3.6.1
 nordigen==1.3.0
 python-dateutil==2.8.2
 python-dotenv==0.21.0
 PyYAML==6.0
+SQLAlchemy==2.0.0rc2