Refactored graph module

Bring graph up-to-date with sqlite3 database and yaml files.
Fix sorted transaction query. Removes date as passed parameters.
Updates graph command in runnable.py.
This commit is contained in:
Luís Murta 2021-07-04 00:53:07 +01:00
parent 206a828b4a
commit 42ab10fd90
Signed by: satprog
GPG Key ID: DDF2EFC6179009DC
3 changed files with 101 additions and 243 deletions

View File

@ -63,13 +63,14 @@ HAVING COUNT(*) > 1
SORTED_TRANSACTIONS = """ SORTED_TRANSACTIONS = """
SELECT * SELECT *
FROM transactions FROM transactions
ORDER BY (?) ORDER BY date ASC
""" """
SELECT_TRANSACTIONS_BETWEEN_DATES = """ SELECT_TRANSACTIONS_BETWEEN_DATES = """
SELECT * SELECT *
FROM transactions FROM transactions
WHERE date BETWEEN (?) AND (?) WHERE date BETWEEN (?) AND (?)
ORDER BY date ASC
""" """
SELECT_TRANSACTIONS_BY_CATEGORY = """ SELECT_TRANSACTIONS_BY_CATEGORY = """
@ -191,9 +192,9 @@ class DBManager:
return [Transaction(t) for t in transactions] return [Transaction(t) for t in transactions]
return None return None
def get_sorted_transactions(self, key: str) -> list[Transaction] | None: def get_sorted_transactions(self) -> list[Transaction] | None:
logger.info(f"Get transactions sorted by {key}") logger.info("Get transactions sorted by date")
transactions = self.__execute(SORTED_TRANSACTIONS, key) transactions = self.__execute(SORTED_TRANSACTIONS)
if transactions: if transactions:
return [Transaction(t) for t in transactions] return [Transaction(t) for t in transactions]
return None return None

View File

@ -1,242 +1,98 @@
from __future__ import annotations
from calendar import monthrange
from dateutil.rrule import rrule, MONTHLY
from typing import TYPE_CHECKING
import datetime as dt
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from .categories import ( import pfbudget.categories as categories
get_income_categories,
get_fixed_expenses, if TYPE_CHECKING:
get_required_expenses, from pfbudget.database import DBManager
get_health_expenses,
get_discretionary_expenses,
)
from .transactions import load_transactions, daterange, by_month_and_category
def monthly(state, start, end): def monthly(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date.max):
transactions = load_transactions(state.data_dir) transactions = db.get_daterange(start, end)
if not start: start, end = transactions[0].date, transactions[-1].date
start = transactions[0].date monthly_transactions = tuple(
if not end: (
end = transactions[-1].date month,
{
income, fixed, required, health, discretionary = [], [], [], [], [] group: sum(
monthly_transactions_by_categories = by_month_and_category(transactions, start, end) transaction.value
for transaction in transactions
for _, transactions_by_category in monthly_transactions_by_categories.items(): if transaction.category in categories
income.append( and month
sum( <= transaction.date
float(t.value) <= month
for category, transactions in transactions_by_category.items() + dt.timedelta(days=monthrange(month.year, month.month)[1] - 1)
if transactions and category in get_income_categories() )
for t in transactions for group, categories in categories.groups.items()
) },
) )
fixed.append( for month in [
sum( month.date()
-float(t.value) for month in rrule(
for category, transactions in transactions_by_category.items() MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1)
if transactions and category in get_fixed_expenses()
for t in transactions
) )
) ]
required.append(
sum(
-float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_required_expenses()
for t in transactions
)
)
health.append(
sum(
-float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_health_expenses()
for t in transactions
)
)
discretionary.append(
sum(
-float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_discretionary_expenses()
for t in transactions
)
)
plt.figure(figsize=(30, 10))
plt.plot(daterange(start, end, "month"), income, label="Income")
plt.stackplot(
daterange(start, end, "month"),
fixed,
required,
health,
discretionary,
labels=["Fixed", "Required", "Health", "Discretionary"],
) )
plt.legend(loc="upper left")
plt.tight_layout()
plt.savefig("graph.png")
def discrete(state, start, end):
transactions = load_transactions(state.data_dir)
if not start:
start = transactions[0].date
if not end:
end = transactions[-1].date
income, fixed, required, health, discretionary = [], [], [], [], []
monthly_transactions_by_categories = by_month_and_category(transactions, start, end)
for _, transactions_by_category in monthly_transactions_by_categories.items():
income.append(
sum(
float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_income_categories()
for t in transactions
)
)
fixed.append(
sum(
-float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_fixed_expenses()
for t in transactions
)
)
required.append(
sum(
-float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_required_expenses()
for t in transactions
)
)
health.append(
sum(
-float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_health_expenses()
for t in transactions
)
)
d = []
for category, transactions in transactions_by_category.items():
if category in get_discretionary_expenses():
try:
d.append(sum(-float(t.value) for t in transactions))
except TypeError:
d.append(0)
discretionary.append(d)
# transposing discretionary
discretionary = list(map(list, zip(*discretionary)))
plt.figure(figsize=(30, 10)) plt.figure(figsize=(30, 10))
plt.plot(daterange(start, end, "month"), income, label="Income") plt.plot(
plt.stackplot( list(rrule(MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1))),
daterange(start, end, "month"), [groups["income"] for _, groups in monthly_transactions],
fixed,
required,
health,
*discretionary,
labels=["Fixed", "Required", "Health", *get_discretionary_expenses()],
) )
plt.legend(loc="upper left")
plt.grid()
plt.tight_layout()
plt.savefig("graph.png")
def average(state, start, end):
transactions = load_transactions(state.data_dir)
income, fixed, required, health, discretionary = [], [], [], [], []
monthly_transactions_by_categories = by_month_and_category(transactions, start, end)
for _, transactions_by_category in monthly_transactions_by_categories.items():
income.append(
sum(
float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_income_categories()
for t in transactions
)
)
fixed.append(
sum(
-float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_fixed_expenses()
for t in transactions
)
)
required.append(
sum(
-float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_required_expenses()
for t in transactions
)
)
health.append(
sum(
-float(t.value)
for category, transactions in transactions_by_category.items()
if transactions and category in get_health_expenses()
for t in transactions
)
)
d = []
for category, transactions in transactions_by_category.items():
if category in get_discretionary_expenses():
try:
d.append(sum(-float(t.value) for t in transactions))
except TypeError:
d.append(0)
discretionary.append(d)
# transposing discretionary
discretionary = list(map(list, zip(*discretionary)))
print(discretionary)
n = len(daterange(start, end, "month"))
avg_income = sum(income) / n
l_avg_income = [avg_income] * n
avg_fixed = [sum(fixed) / n] * n
avg_required = [sum(required) / n] * n
avg_health = [sum(health) / n] * n
avg_discretionary = [[sum(d) / n] * n for d in discretionary]
print(avg_discretionary)
plt.figure(figsize=(30, 10))
plt.plot(daterange(start, end, "month"), l_avg_income, label=f"Income {avg_income}")
plt.stackplot( plt.stackplot(
daterange(start, end, "month"), list(rrule(MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1))),
avg_fixed, [
avg_required, [-groups[group] for _, groups in monthly_transactions]
avg_health, for group in categories.groups.keys()
*avg_discretionary, if group != "income"
labels=[
f"Fixed {avg_fixed[0]/avg_income * 100}%",
f"Required {avg_required[0]/avg_income * 100}%",
f"Health {avg_health[0]/avg_income * 100}%",
*[
f"{e} {avg_discretionary[i][0]/avg_income * 100}%"
for i, e in enumerate(get_discretionary_expenses())
],
], ],
labels=[group for group in categories.groups.keys() if group != "income"],
) )
plt.legend(bbox_to_anchor=(1, 1), loc="upper left") plt.legend(loc="upper left")
plt.tight_layout() plt.tight_layout()
plt.savefig( plt.savefig("graph.png")
"graph.png",
dpi=600,
def discrete(db: DBManager, start: dt.date = dt.date.min, end: dt.date = dt.date.max):
transactions = db.get_daterange(start, end)
start, end = transactions[0].date, transactions[-1].date
monthly_transactions = tuple(
(
month,
{
category: sum(
transaction.value
for transaction in transactions
if transaction.category == category
and month
<= transaction.date
<= month
+ dt.timedelta(days=monthrange(month.year, month.month)[1] - 1)
)
for category in categories.categories.keys()
},
)
for month in [
month.date()
for month in rrule(
MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1)
)
]
) )
plt.figure(figsize=(30, 10))
plt.stackplot(
list(rrule(MONTHLY, dtstart=start.replace(day=1), until=end.replace(day=1))),
[
[-categories[category] for _, categories in monthly_transactions]
for category in categories.categories.keys()
],
labels=[category for category in categories.categories.keys()],
)
plt.legend(loc="upper left")
plt.tight_layout()
plt.savefig("graph.png")

View File

@ -67,10 +67,10 @@ def argparser() -> argparse.ArgumentParser:
func=lambda args: categorize_data(DBManager(args.database)) func=lambda args: categorize_data(DBManager(args.database))
) )
p_graph = subparsers.add_parser("graph", help="graph help") """
p_report = subparsers.add_parser("report", help="report help") Graph
p_status = subparsers.add_parser("status", help="status help") """
p_graph = subparsers.add_parser("graph", parents=[help])
p_graph.add_argument( p_graph.add_argument(
"option", "option",
type=str, type=str,
@ -86,9 +86,12 @@ def argparser() -> argparse.ArgumentParser:
p_graph_interval.add_argument("--start", type=str, nargs=1, help="graph start date") p_graph_interval.add_argument("--start", type=str, nargs=1, help="graph start date")
p_graph_interval.add_argument("--end", type=str, nargs=1, help="graph end date") p_graph_interval.add_argument("--end", type=str, nargs=1, help="graph end date")
p_graph_interval.add_argument("--year", type=str, nargs=1, help="graph year") p_graph_interval.add_argument("--year", type=str, nargs=1, help="graph year")
p_graph.set_defaults(func=graph)
p_report = subparsers.add_parser("report", help="report help")
p_status = subparsers.add_parser("status", help="status help")
p_status.set_defaults(func=status) p_status.set_defaults(func=status)
p_graph.set_defaults(func=graph)
p_report.set_defaults(func=f_report) p_report.set_defaults(func=f_report)
return parser return parser
@ -123,16 +126,14 @@ def status(state, args):
print(state) print(state)
def graph(state, args): def graph(args):
"""Graph """Plots the transactions over a period of time.
Plots the transactions over a period of time.
Args: Args:
state (PFState): Internal state of the program state (PFState): Internal state of the program
args (dict): argparse variables args (dict): argparse variables
""" """
start, end = None, None start, end = dt.date.min, dt.date.max
if args.start or args.interval: if args.start or args.interval:
start = dt.datetime.strptime(args.start[0], "%Y/%m/%d").date() start = dt.datetime.strptime(args.start[0], "%Y/%m/%d").date()
@ -150,9 +151,9 @@ def graph(state, args):
).date() - dt.timedelta(days=1) ).date() - dt.timedelta(days=1)
if args.option == "monthly": if args.option == "monthly":
monthly(state, start, end) monthly(DBManager(args.database), start, end)
elif args.option == "discrete": elif args.option == "discrete":
discrete(state, start, end) discrete(DBManager(args.database), start, end)
def f_report(state, args): def f_report(state, args):