Source code for discord_http.context

import inspect
import logging

from typing import TYPE_CHECKING, Callable, Union, Optional, Any, Self
from datetime import datetime, timedelta

from . import utils
from .channel import (
    TextChannel, DMChannel, VoiceChannel,
    GroupDMChannel, CategoryChannel, NewsThread,
    PublicThread, PrivateThread, StageChannel,
    DirectoryChannel, ForumChannel, StoreChannel,
    NewsChannel, BaseChannel
)
from .cooldowns import Cooldown
from .embeds import Embed
from .entitlements import Entitlements
from .enums import (
    ApplicationCommandType, CommandOptionType,
    ResponseType, ChannelType, InteractionType
)
from .file import File
from .flag import Permissions
from .guild import PartialGuild
from .member import Member
from .mentions import AllowedMentions
from .message import Message, Attachment, Poll
from .response import (
    MessageResponse, DeferResponse,
    AutocompleteResponse, ModalResponse
)
from .role import Role
from .user import User
from .view import View, Modal
from .webhook import Webhook

if TYPE_CHECKING:
    from .client import Client
    from .commands import Command

_log = logging.getLogger(__name__)

MISSING = utils.MISSING

channel_types = {
    int(ChannelType.guild_text): TextChannel,
    int(ChannelType.dm): DMChannel,
    int(ChannelType.guild_voice): VoiceChannel,
    int(ChannelType.group_dm): GroupDMChannel,
    int(ChannelType.guild_category): CategoryChannel,
    int(ChannelType.guild_news): NewsChannel,
    int(ChannelType.guild_store): StoreChannel,
    int(ChannelType.guild_news_thread): NewsThread,
    int(ChannelType.guild_public_thread): PublicThread,
    int(ChannelType.guild_private_thread): PrivateThread,
    int(ChannelType.guild_stage_voice): StageChannel,
    int(ChannelType.guild_directory): DirectoryChannel,
    int(ChannelType.guild_forum): ForumChannel,
}

__all__ = (
    "Context",
    "InteractionResponse",
)


class SelectValues:
    def __init__(self, ctx: "Context", data: dict):
        self._parsed_data = {
            "members": [], "users": [],
            "channels": [], "roles": [],
            "strings": [],
        }

        self._from_data(ctx, data)

    def _from_data(self, ctx: "Context", data: dict):
        self._parsed_data["strings"] = data.get("data", {}).get("values", [])

        _resolved = data.get("data", {}).get("resolved", {})
        data_to_resolve = ["members", "users", "channels", "roles"]

        for key in data_to_resolve:
            self._parse_resolved(ctx, key, _resolved)

    @classmethod
    def none(cls, ctx: "Context") -> Self:
        """ `SelectValues`: with no values """
        return cls(ctx, {})

    @property
    def members(self) -> list[Member]:
        """ `List[Member]`: of members selected """
        return self._parsed_data["members"]

    @property
    def users(self) -> list[User]:
        """ `List[User]`: of users selected """
        return self._parsed_data["users"]

    @property
    def channels(self) -> list[BaseChannel]:
        """ `List[BaseChannel]`: of channels selected """
        return self._parsed_data["channels"]

    @property
    def roles(self) -> list[Role]:
        """ `List[Role]`: of roles selected """
        return self._parsed_data["roles"]

    @property
    def strings(self) -> list[str]:
        """ `List[str]`: of strings selected """
        return self._parsed_data["strings"]

    def is_empty(self) -> bool:
        """ `bool`: Whether no values were selected """
        return not any(self._parsed_data.values())

    def _parse_resolved(self, ctx: "Context", key: str, data: dict):
        if not data.get(key, {}):
            return None

        for g in data[key]:
            if key == "members":
                data["members"][g]["user"] = data["users"][g]

            to_append: list = self._parsed_data[key]
            _data = data[key][g]

            match key:
                case "members":
                    if not ctx.guild:
                        raise ValueError("While parsing members, guild object was not available")
                    to_append.append(Member(state=ctx.bot.state, guild=ctx.guild, data=_data))

                case "users":
                    to_append.append(User(state=ctx.bot.state, data=_data))

                case "channels":
                    to_append.append(channel_types[g["type"]](state=ctx.bot.state, data=_data))

                case "roles":
                    if not ctx.guild:
                        raise ValueError("While parsing roles, guild object was not available")
                    to_append.append(Role(state=ctx.bot.state, guild=ctx.guild, data=_data))

                case _:
                    pass


[docs] class InteractionResponse: def __init__(self, parent: "Context"): self._parent = parent
[docs] def pong(self) -> dict: """ Only used to acknowledge a ping from Discord Developer portal Interaction URL """ return {"type": 1}
[docs] def defer( self, ephemeral: bool = False, thinking: bool = False, call_after: Optional[Callable] = None ) -> DeferResponse: """ Defer the response to the interaction Parameters ---------- ephemeral: `bool` If the response should be ephemeral (show only to the user) thinking: `bool` If the response should show the "thinking" status call_after: `Optional[Callable]` A coroutine to run after the response is sent Returns ------- `DeferResponse` The response to the interaction Raises ------ `TypeError` If `call_after` is not a coroutine """ if call_after: if not inspect.iscoroutinefunction(call_after): raise TypeError("call_after must be a coroutine") self._parent.bot.loop.create_task( self._parent._background_task_manager(call_after) ) return DeferResponse(ephemeral=ephemeral, thinking=thinking)
[docs] def send_modal( self, modal: Modal, *, call_after: Optional[Callable] = None ) -> ModalResponse: """ Send a modal to the interaction Parameters ---------- modal: `Modal` The modal to send call_after: `Optional[Callable]` A coroutine to run after the response is sent Returns ------- `ModalResponse` The response to the interaction Raises ------ `TypeError` - If `modal` is not a `Modal` instance - If `call_after` is not a coroutine """ if not isinstance(modal, Modal): raise TypeError("modal must be a Modal instance") if call_after: if not inspect.iscoroutinefunction(call_after): raise TypeError("call_after must be a coroutine") self._parent.bot.loop.create_task( self._parent._background_task_manager(call_after) ) return ModalResponse(modal=modal)
[docs] def send_message( self, content: Optional[str] = MISSING, *, embed: Optional[Embed] = MISSING, embeds: Optional[list[Embed]] = MISSING, file: Optional[File] = MISSING, files: Optional[list[File]] = MISSING, ephemeral: Optional[bool] = False, view: Optional[View] = MISSING, tts: Optional[bool] = False, type: Union[ResponseType, int] = 4, allowed_mentions: Optional[AllowedMentions] = MISSING, poll: Optional[Poll] = MISSING, call_after: Optional[Callable] = None ) -> MessageResponse: """ Send a message to the interaction Parameters ---------- content: `Optional[str]` Content of the message embed: `Optional[Embed]` The embed to send embeds: `Optional[list[Embed]]` Multiple embeds to send file: `Optional[File]` A file to send files: `Optional[Union[list[File], File]]` Multiple files to send ephemeral: `bool` If the message should be ephemeral (show only to the user) view: `Optional[View]` Components to include in the message tts: `bool` Whether the message should be sent using text-to-speech type: `Optional[ResponseType]` The type of response to send allowed_mentions: `Optional[AllowedMentions]` Allowed mentions for the message call_after: `Optional[Callable]` A coroutine to run after the response is sent Returns ------- `MessageResponse` The response to the interaction Raises ------ `ValueError` - If both `embed` and `embeds` are passed - If both `file` and `files` are passed `TypeError` If `call_after` is not a coroutine """ if call_after: if not inspect.iscoroutinefunction(call_after): raise TypeError("call_after must be a coroutine") self._parent.bot.loop.create_task( self._parent._background_task_manager(call_after) ) if embed is not MISSING and embeds is not MISSING: raise ValueError("Cannot pass both embed and embeds") if file is not MISSING and files is not MISSING: raise ValueError("Cannot pass both file and files") if isinstance(embed, Embed): embeds = [embed] if isinstance(file, File): files = [file] return MessageResponse( content=content, embeds=embeds, ephemeral=ephemeral, view=view, tts=tts, attachments=files, type=type, poll=poll, allowed_mentions=( allowed_mentions or self._parent.bot._default_allowed_mentions ) )
[docs] def edit_message( self, *, content: Optional[str] = MISSING, embed: Optional[Embed] = MISSING, embeds: Optional[list[Embed]] = MISSING, view: Optional[View] = MISSING, attachment: Optional[File] = MISSING, attachments: Optional[list[File]] = MISSING, allowed_mentions: Optional[AllowedMentions] = MISSING, call_after: Optional[Callable] = None ) -> MessageResponse: """ Edit the original message of the interaction Parameters ---------- content: `Optional[str]` Content of the message embed: `Optional[Embed]` Embed to edit the message with embeds: `Optional[list[Embed]]` Multiple embeds to edit the message with view: `Optional[View]` Components to include in the message attachment: `Optional[File]` New file to edit the message with attachments: `Optional[Union[list[File], File]]` Multiple new files to edit the message with allowed_mentions: `Optional[AllowedMentions]` Allowed mentions for the message call_after: `Optional[Callable]` A coroutine to run after the response is sent Returns ------- `MessageResponse` The response to the interaction Raises ------ `ValueError` - If both `embed` and `embeds` are passed - If both `attachment` and `attachments` are passed `TypeError` If `call_after` is not a coroutine """ if call_after: if not inspect.iscoroutinefunction(call_after): raise TypeError("call_after must be a coroutine") self._parent.bot.loop.create_task( self._parent._background_task_manager(call_after) ) if embed is not MISSING and embeds is not MISSING: raise ValueError("Cannot pass both embed and embeds") if attachment is not MISSING and attachments is not MISSING: raise ValueError("Cannot pass both attachment and attachments") if isinstance(embed, Embed): embeds = [embed] if isinstance(attachment, File): attachments = [attachment] return MessageResponse( content=content, embeds=embeds, attachments=attachments, view=view, type=int(ResponseType.update_message), allowed_mentions=( allowed_mentions or self._parent.bot._default_allowed_mentions ) )
[docs] def send_autocomplete( self, choices: dict[Any, str] ) -> AutocompleteResponse: """ Send an autocomplete response to the interaction Parameters ---------- choices: `dict[Union[str, int, float], str]` The choices to send Returns ------- `AutocompleteResponse` The response to the interaction Raises ------ `TypeError` - If `choices` is not a `dict` - If `choices` is not a `dict[Union[str, int, float], str]` """ if not isinstance(choices, dict): raise TypeError("choices must be a dict") for k, v in choices.items(): if ( not isinstance(k, str) and not isinstance(k, int) and not isinstance(k, float) ): raise TypeError( f"key {k} must be a string, got {type(k)}" ) if (isinstance(k, int) or isinstance(k, float)) and k >= 2**53: _log.warn( f"'{k}: {v}' (int) is too large, " "Discord might ignore it and make autocomplete fail" ) if not isinstance(v, str): raise TypeError( f"value {v} must be a string, got {type(v)}" ) return AutocompleteResponse(choices)
[docs] class Context: def __init__( self, bot: "Client", data: dict ): self.bot = bot self.id: int = int(data["id"]) self.type: InteractionType = InteractionType(data["type"]) self.command_type: ApplicationCommandType = ApplicationCommandType( data.get("data", {}).get("type", ApplicationCommandType.chat_input) ) # Arguments that gets parsed on runtime self.command: Optional["Command"] = None self.app_permissions: Permissions = Permissions(int(data.get("app_permissions", 0))) self.custom_id: Optional[str] = data.get("data", {}).get("custom_id", None) self.select_values: SelectValues = SelectValues.none(self) self.modal_values: dict[str, str] = {} self.options: list[dict] = data.get("data", {}).get("options", []) self.followup_token: str = data.get("token", None) self._original_response: Optional[Message] = None self._resolved: dict = data.get("data", {}).get("resolved", {}) self.entitlements: list[Entitlements] = [ Entitlements(state=self.bot.state, data=g) for g in data.get("entitlements", []) ] # Should not be used, but if you *really* want the raw data, here it is self._data: dict = data self._from_data(data) def _from_data(self, data: dict): self.channel_id: Optional[int] = None if data.get("channel_id", None): self.channel_id = int(data["channel_id"]) self.channel: Optional[BaseChannel] = None if data.get("channel", None): self.channel = channel_types[data["channel"]["type"]]( state=self.bot.state, data=data["channel"] ) self.guild: Optional[PartialGuild] = None if data.get("guild_id", None): self.guild = PartialGuild( state=self.bot.state, id=int(data["guild_id"]) ) self.message: Optional[Message] = None if data.get("message", None): self.message = Message( state=self.bot.state, data=data["message"], guild=self.guild ) elif self._resolved.get("messages", {}): _first_msg = next(iter(self._resolved["messages"].values()), None) if _first_msg: self.message = Message( state=self.bot.state, data=_first_msg, guild=self.guild ) self.author: Optional[Union[Member, User]] = None if self.message is not None: self.author = self.message.author self.user: Union[Member, User] = self._parse_user(data) match self.type: case InteractionType.message_component: self.select_values = SelectValues(self, data) case InteractionType.modal_submit: for comp in data["data"]["components"]: ans = comp["components"][0] self.modal_values[ans["custom_id"]] = ans["value"] async def _background_task_manager(self, call_after: Callable) -> None: try: await call_after() except Exception as e: if self.bot.has_any_dispatch("interaction_error"): self.bot.dispatch("interaction_error", self, e) else: _log.error( f"Error while running call_after:{call_after}", exc_info=e ) @property def created_at(self) -> datetime: """ `datetime` Returns the time the interaction was created """ return utils.snowflake_time(self.id) @property def cooldown(self) -> Optional[Cooldown]: """ `Optional[Cooldown]` Returns the context cooldown """ _cooldown = self.command.cooldown if _cooldown is None: return None return _cooldown.get_bucket( self, self.created_at.timestamp() ) @property def expires_at(self) -> datetime: """ `datetime` Returns the time the interaction expires """ return self.created_at + timedelta(minutes=15)
[docs] def is_expired(self) -> bool: """ `bool` Returns whether the interaction is expired """ return utils.utcnow() >= self.expires_at
@property def response(self) -> InteractionResponse: """ `InteractionResponse` Returns the response to the interaction """ return InteractionResponse(self) @property def followup(self) -> Webhook: """ `Webhook` Returns the followup webhook object """ payload = { "application_id": self.bot.application_id, "token": self.followup_token, "type": 3, } return Webhook.from_state( state=self.bot.state, data=payload )
[docs] async def original_response(self) -> Message: """ `Message` Returns the original response to the interaction """ if self._original_response is not None: return self._original_response r = await self.bot.state.query( "GET", f"/webhooks/{self.bot.application_id}/{self.followup_token}/messages/@original" ) msg = Message( state=self.bot.state, data=r.response, guild=self.guild ) self._original_response = msg return msg
[docs] async def edit_original_response( self, *, content: Optional[str] = MISSING, embed: Optional[Embed] = MISSING, embeds: Optional[list[Embed]] = MISSING, view: Optional[View] = MISSING, attachment: Optional[File] = MISSING, attachments: Optional[list[File]] = MISSING, allowed_mentions: Optional[AllowedMentions] = MISSING ) -> Message: """ `Message` Edit the original response to the interaction """ _msg_kwargs = MessageResponse( content=content, embeds=embeds, embed=embed, attachment=attachment, attachments=attachments, view=view, allowed_mentions=allowed_mentions ) r = await self.bot.state.query( "PATCH", f"/webhooks/{self.bot.application_id}/{self.followup_token}/messages/@original", json=_msg_kwargs.to_dict()["data"] ) msg = Message( state=self.bot.state, data=r.response, guild=self.guild ) self._original_response = msg return msg
[docs] async def delete_original_response(self) -> None: """ Delete the original response to the interaction """ await self.bot.state.query( "DELETE", f"/webhooks/{self.bot.application_id}/{self.followup_token}/messages/@original" )
def _create_args(self) -> tuple[list[Union[Member, User, Message, None]], dict]: match self.command_type: case ApplicationCommandType.chat_input: return [], self._create_args_chat_input() case ApplicationCommandType.user: if self._resolved.get("members", {}): _first: Optional[dict] = next( iter(self._resolved["members"].values()), None ) if not _first: raise ValueError("User command detected members, but was unable to parse it") if not self.guild: raise ValueError("While parsing members, guild was not available") _first["user"] = next( iter(self._resolved["users"].values()), None ) _target = Member( state=self.bot.state, guild=self.guild, data=_first ) elif self._resolved.get("users", {}): _first: Optional[dict] = next( iter(self._resolved["users"].values()), None ) if not _first: raise ValueError("User command detected users, but was unable to parse it") _target = User(state=self.bot.state, data=_first) else: raise ValueError("Neither members nor users were detected while parsing user command") return [_target], {} case ApplicationCommandType.message: return [self.message], {} case _: raise ValueError("Unknown command type") def _create_args_chat_input(self) -> dict: def _create_args_recursive(data, resolved) -> dict: if not data.get("options"): return {} kwargs = {} for option in data["options"]: match option["type"]: case x if x in ( CommandOptionType.sub_command, CommandOptionType.sub_command_group ): sub_kwargs = _create_args_recursive(option, resolved) kwargs.update(sub_kwargs) case CommandOptionType.user: if "members" in resolved: member_data = resolved["members"][option["value"]] member_data["user"] = resolved["users"][option["value"]] if not self.guild: raise ValueError("Guild somehow was not available while parsing Member") kwargs[option["name"]] = Member( state=self.bot.state, guild=self.guild, data=member_data ) else: kwargs[option["name"]] = User( state=self.bot.state, data=resolved["users"][option["value"]] ) case CommandOptionType.channel: type_id = resolved["channels"][option["value"]]["type"] kwargs[option["name"]] = channel_types[type_id]( state=self.bot.state, data=resolved["channels"][option["value"]] ) case CommandOptionType.attachment: kwargs[option["name"]] = Attachment( state=self.bot.state, data=resolved["attachments"][option["value"]] ) case CommandOptionType.role: if not self.guild: raise ValueError("Guild somehow was not available while parsing Role") kwargs[option["name"]] = Role( state=self.bot.state, guild=self.guild, data=resolved["roles"][option["value"]] ) case CommandOptionType.string: kwargs[option["name"]] = option["value"] case CommandOptionType.integer: kwargs[option["name"]] = int(option["value"]) case CommandOptionType.number: kwargs[option["name"]] = float(option["value"]) case CommandOptionType.boolean: kwargs[option["name"]] = bool(option["value"]) case _: kwargs[option["name"]] = option["value"] return kwargs return _create_args_recursive( {"options": self.options}, self._resolved ) def _parse_user(self, data: dict) -> Union[Member, User]: if data.get("member", None): return Member( state=self.bot.state, guild=self.guild, # type: ignore data=data["member"] ) elif data.get("user", None): return User( state=self.bot.state, data=data["user"] ) else: raise ValueError( "Neither member nor user was detected while parsing user" )