Python backend for a Slack's kudos plugin.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: reset wallet notification

+40 -8
+1
kefi/config/__init__.py
··· 12 12 DATABASE_TEST_SUFFIX: str = "_test" 13 13 SLACK_TEAM_ID: str 14 14 RECHARGE_KEFIS_AMOUNT: int = 550 15 + REDIS_HOST: str = "redis" 15 16 16 17 class Config: 17 18 env_file = ".env"
+11 -1
kefi/models/helpers.py
··· 7 7 from kefi.constants import Command 8 8 from kefi.dependencies import SlashCommandParams 9 9 from kefi.models.database import Action, Transaction, User 10 - from kefi.routers.responses import ActionResponse 10 + from kefi.routers.responses import ActionResponse, ResetResponse 11 11 from kefi.slack import Slack 12 12 13 13 ··· 118 118 amount = settings.RECHARGE_KEFIS_AMOUNT - balance 119 119 transaction = Transaction(amount=amount, user=user) # type: ignore 120 120 session.add(transaction) 121 + 122 + 123 + def notify_reset_wallet(session: Session) -> None: 124 + """Sends notifications after the wallet was reset.""" 125 + amount = settings.RECHARGE_KEFIS_AMOUNT 126 + users = session.exec(select(User)).all() 127 + for user in users: 128 + slack = Slack() 129 + response = ResetResponse(amount=amount) 130 + slack.post_message_user(user.slack_user_id, blocks=response.render()["blocks"]) 121 131 122 132 123 133 def create_default_actions(session: Session) -> None:
+21 -1
kefi/routers/responses.py
··· 138 138 section = Section( 139 139 text=MarkDown( 140 140 text=f"*¡Buenas noticias!* \n Acabamos de ingresar {amount} kefis en tu cartera" 141 - ) 141 + ), 142 + accessory=Image( 143 + image_url="https://storage.staging.dekaside.com/kefi/static/images/kefis_400.png", 144 + alt_text="Kefis", 145 + ), 146 + ) 147 + self.response = Response(text=text, blocks=[section]) 148 + 149 + 150 + class ResetResponse(SlackResponse): 151 + def __init__(self, amount: int) -> None: 152 + super().__init__() 153 + text = f"¡Buenas noticias! Acabamos de ajustar tu cartera para que tengas {amount} kefis disponibles" 154 + section = Section( 155 + text=MarkDown( 156 + text=f"*¡Buenas noticias!*\n Acabamos de ajustar tu cartera para que tengas {amount} kefis disponibles" 157 + ), 158 + accessory=Image( 159 + image_url="https://storage.staging.dekaside.com/kefi/static/images/kefis_400.png", 160 + alt_text="Kefis", 161 + ), 142 162 ) 143 163 self.response = Response(text=text, blocks=[section]) 144 164
+3 -3
kefi/tasks/main.py
··· 2 2 from arq.connections import RedisSettings 3 3 from sqlmodel import Session 4 4 5 - from kefi.config import get_settings 5 + from kefi.config import settings 6 6 from kefi.models.database import engine 7 7 from kefi.tasks.wallets import recharge_wallets_task 8 8 9 9 10 10 async def startup(ctx): 11 11 ctx["session"] = Session(engine) 12 - ctx["settings"] = get_settings() 12 + ctx["settings"] = settings 13 13 14 14 15 15 async def shutdown(ctx): ··· 17 17 18 18 19 19 class WorkerSettings: 20 - redis_settings = RedisSettings(host="redis") 20 + redis_settings = RedisSettings(host=settings.REDIS_HOST) 21 21 cron_jobs = [cron(recharge_wallets_task, day=1)] # type: ignore 22 22 on_startup = startup 23 23 on_shutdown = shutdown
+4 -3
kefi/tasks/wallets.py
··· 1 - from kefi.models.helpers import recharge_wallets 1 + from kefi.models.helpers import notify_reset_wallet, reset_wallets 2 2 3 3 4 4 async def recharge_wallets_task(ctx): 5 - settings = ctx["settings"] 6 - recharge_wallets(settings.RECHARGE_KEFIS_AMOUNT, ctx["session"]) 5 + """Task to reset the wallets of the users, and then, sends a notification.""" 6 + reset_wallets(ctx["session"]) 7 7 ctx["session"].commit() 8 + notify_reset_wallet(ctx["session"])