Python backend for a Slack's kudos plugin.
0
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: added helper to notify calls

+35 -2
+1
kefi/config/__init__.py
··· 16 16 PLAZA_PRICE: int = 10 17 17 PLAZA_DEFAULT_HOUR: int = 10 18 18 PLAZA_DEFAULT_MINUTE: int = 0 19 + PLAZA_SIZE: int = 3 19 20 20 21 class Config: 21 22 env_file = ".env"
+34 -2
kefi/models/plazas/helpers.py
··· 1 1 import datetime 2 + import random 2 3 import uuid 3 4 4 5 import pytz ··· 9 10 from kefi.models.core.helpers import available_balance 10 11 from kefi.models.database import Attendance, Plaza, Transaction, User 11 12 from kefi.models.plazas.exceptions import AlreadyAttending, NotAttending 13 + from kefi.slack import Slack 12 14 13 15 14 - def generate_random_meet() -> str: 15 - return f"http://g.co/meet/kefi-plaza-{str(uuid.uuid4())}" 16 + def generate_random_meet() -> tuple[str, str]: 17 + """Generates a call id and a link to this meet id.""" 18 + meet_id = str(uuid.uuid4()) 19 + return meet_id, f"http://g.co/meet/kefi-plaza-{meet_id}" 16 20 17 21 18 22 def next_plaza_appointment() -> datetime.datetime: ··· 89 93 transaction = transaction_results.first() 90 94 session.delete(transaction) 91 95 session.delete(attendance) 96 + 97 + 98 + def plaza_groups(session: Session, plaza: Plaza | None = None) -> list[list]: 99 + """Generates the groups for the current plaza or the given plaza.""" 100 + current_plaza: Plaza = plaza or get_or_create_current_plaza(session=session) 101 + query = select(Attendance).filter(Attendance.plaza == current_plaza) 102 + results = session.exec(query).all() 103 + users = [attendance.user for attendance in results] 104 + random.shuffle(users) 105 + return [ 106 + users[index : index + settings.PLAZA_SIZE] 107 + for index in range(0, len(users), settings.PLAZA_SIZE) 108 + ] 109 + 110 + 111 + def notify_plaza(session: Session, plaza: Plaza | None = None): 112 + slack = Slack() 113 + current_plaza: Plaza = plaza or get_or_create_current_plaza(session=session) 114 + groups = plaza_groups(session=session, plaza=current_plaza) 115 + for group in groups: 116 + slack_users = [user.slack_user_id for user in group] 117 + meet_id, meet_url = generate_random_meet() 118 + call = slack.create_group_call( 119 + url=meet_url, 120 + users=slack_users, 121 + external_unique_id=meet_id, 122 + ) 123 + slack.notify_call(users=slack_users, call_id=call["id"])