# Source code for discord_http.channel

import asyncio
import time

from datetime import datetime, timedelta
from typing import (
    Union, TYPE_CHECKING, Optional, AsyncIterator,
    Callable, Self, Generator, overload
)

from . import utils
from .embeds import Embed
from .emoji import EmojiParser
from .errors import NotFound
from .enums import (
    ChannelType, ResponseType, VideoQualityType,
    SortOrderType, ForumLayoutType, PrivacyLevelType
)
from .file import File
from .flags import PermissionOverwrite, ChannelFlags, Permissions
from .mentions import AllowedMentions
from .multipart import MultipartData
from .object import PartialBase, Snowflake
from .response import MessageResponse
from .view import View
from .webhook import Webhook

if TYPE_CHECKING:
    from .guild import Guild, PartialGuild, PartialScheduledEvent
    from .http import DiscordAPI
    from .invite import Invite
    from .member import Member
    from .member import ThreadMember
    from .message import PartialMessage, Message, Poll
    from .types import channels
    from .user import PartialUser, User

# Sentinel re-exported from utils; parameters defaulting to it are compared
# with `is not MISSING` so that an explicit `None` can be told apart from
# "argument not supplied".
MISSING = utils.MISSING

# Public names exported by this module.
__all__ = (
    "BaseChannel",
    "CategoryChannel",
    "DMChannel",
    "DirectoryChannel",
    "ForumChannel",
    "ForumTag",
    "ForumThread",
    "GroupDMChannel",
    "NewsChannel",
    "NewsThread",
    "PartialChannel",
    "PartialThread",
    "PrivateThread",
    "PublicThread",
    "StageChannel",
    "StoreChannel",
    "TextChannel",
    "Thread",
    "VoiceChannel",
    "VoiceRegion",
)


def _typing_done_callback(f: asyncio.Future):
    try:
        f.exception()
    except (asyncio.CancelledError, Exception):
        pass


class Typing:
    """
    Helper returned by `PartialChannel.typing()`.

    It can be awaited to trigger the typing indicator once, or used with
    `async with` to keep re-triggering it every 5 seconds until the block exits.

    Parameters
    ----------
    state: `DiscordAPI`
        The state used to send the HTTP requests
    channel: `PartialChannel`
        The channel to trigger the typing indicator in
    """

    def __init__(self, *, state: "DiscordAPI", channel: "PartialChannel"):
        self._state = state

        self.loop = state.bot.loop
        self.channel = channel

        # Only populated while inside an `async with` block
        self.task: Optional[asyncio.Task] = None

    def __await__(self) -> Generator[None, None, None]:
        # `await channel.typing()` sends exactly one typing request
        return self._send_typing().__await__()

    async def __aenter__(self) -> None:
        # Trigger the indicator right away, then keep it alive in the background
        await self._send_typing()
        self.task = self.loop.create_task(self.do_typing_loop())
        # Retrieve the task's outcome so nothing is left unhandled when it
        # is cancelled or fails
        self.task.add_done_callback(_typing_done_callback)

    async def __aexit__(self, exc_type, exc, traceback) -> None:
        # Nothing to cancel if the background loop was never started
        if self.task is not None:
            self.task.cancel()

    async def _send_typing(self) -> None:
        """ Send a single typing request for the channel """
        await self._state.query(
            "POST",
            f"/channels/{self.channel.id}/typing",
            res_method="text"
        )

    async def do_typing_loop(self) -> None:
        """ Re-send the typing request every 5 seconds until cancelled """
        while True:
            await asyncio.sleep(5)
            await self._send_typing()


[docs] class PartialChannel(PartialBase): def __init__( self, *, state: "DiscordAPI", id: int, guild_id: Optional[int] = None ): super().__init__(id=int(id)) self._state = state self.guild_id: Optional[int] = int(guild_id) if guild_id else None self.parent_id: Optional[int] = None self._raw_type: ChannelType = ChannelType.unknown def __repr__(self) -> str: return f"<PartialChannel id={self.id}>" @property def mention(self) -> str: """ `str`: The channel's mention """ return f"<#{self.id}>" @property def guild(self) -> "Guild | PartialGuild | None": """ `Optional[Guild | PartialGuild]`: The guild the channel belongs to (if available). If you are using gateway cache, it can return full object too """ if not self.guild_id: return None cache = self._state.cache.get_guild(self.guild_id) if cache: return cache from .guild import PartialGuild return PartialGuild(state=self._state, id=self.guild_id) @property def channel(self) -> "BaseChannel | CategoryChannel | PartialChannel | None": """ `BaseChannel | CategoryChannel | PartialChannel | None`: Returns the channel the thread is in. Only returns a full object if cache is enabled for guild and channel. """ if self.guild_id: cache = self._state.cache.get_channel_thread( guild_id=self.guild_id, channel_id=self.id ) if cache: return cache return PartialChannel( state=self._state, id=self.id, guild_id=self.guild_id ) @property def parent(self) -> "BaseChannel | CategoryChannel | PartialChannel | None": """ `BaseChannel | CategoryChannel | PartialChannel | None`: Returns the parent channel of the thread or the parent category of the channel. Only returns a full object if cache is enabled for guild and channel. """ if not self.parent_id: return None if self.guild_id: cache = self._state.cache.get_channel_thread( guild_id=self.guild_id, channel_id=self.parent_id ) if cache: return cache return PartialChannel( state=self._state, id=self.parent_id, guild_id=self.guild_id )
[docs] def permissions_for(self, member: "Member") -> Permissions: """ Returns the permissions for a member in the channel. However since this is Partial, it will always return Permissions.none() Parameters ---------- member: `Member` The member to get the permissions for. Returns ------- `Permissions` The permissions for the member in the channel. """ return Permissions.none()
@property def type(self) -> ChannelType: """ `ChannelType`: Returns the channel's type """ return self._raw_type
[docs] def get_partial_message(self, message_id: int) -> "PartialMessage": """ Get a partial message object from the channel Parameters ---------- message_id: `int` The message ID to get the partial message from Returns ------- `PartialMessage` The partial message object """ from .message import PartialMessage return PartialMessage( state=self._state, channel_id=self.id, guild_id=self.guild_id, id=message_id, )
[docs] async def fetch_message(self, message_id: int) -> "Message": """ Fetch a message from the channel Parameters ---------- message_id: `int` The message ID to fetch Returns ------- `Message` The message object """ r = await self._state.query( "GET", f"/channels/{self.id}/messages/{message_id}" ) from .message import Message return Message( state=self._state, data=r.response, guild=self.guild )
[docs] async def fetch_pins(self) -> list["Message"]: """ Fetch all pinned messages for the channel in question Returns ------- `list[Message]` The list of pinned messages """ r = await self._state.query( "GET", f"/channels/{self.id}/pins" ) from .message import Message return [ Message( state=self._state, data=data, guild=self.guild ) for data in r.response ]
[docs] async def follow_announcement_channel( self, source_channel_id: Union[Snowflake, int] ) -> None: """ Follow an announcement channel to send messages to the webhook Parameters ---------- source_channel_id: `int` The channel ID to follow """ await self._state.query( "POST", f"/channels/{source_channel_id}/followers", json={"webhook_channel_id": self.id}, res_method="text" )
[docs] async def fetch_archived_public_threads(self) -> list["PublicThread"]: """ Fetch all archived public threads Returns ------- `list[PublicThread]` The list of public threads """ r = await self._state.query( "GET", f"/channels/{self.id}/threads/archived/public" ) from .channel import PublicThread return [ PublicThread( state=self._state, data=data ) for data in r.response ]
[docs] async def fetch_archived_private_threads( self, *, client: bool = False ) -> list["PrivateThread"]: """ Fetch all archived private threads Parameters ---------- client: `bool` If it should fetch only where the client is a member of the thread Returns ------- `list[PrivateThread]` The list of private threads """ path = f"/channels/{self.id}/threads/archived/private" if client: path = f"/channels/{self.id}/users/@me/threads/archived/private" r = await self._state.query("GET", path) from .channel import PrivateThread return [ PrivateThread( state=self._state, data=data ) for data in r.response ]
[docs] async def create_invite( self, *, max_age: Union[timedelta, int] = 86400, # 24 hours max_uses: Optional[int] = 0, temporary: bool = False, unique: bool = False, ) -> "Invite": """ Create an invite for the channel Parameters ---------- max_age: `Union[timedelta, int]` How long the invite should last temporary: `bool` If the invite should be temporary unique: `bool` If the invite should be unique Returns ------- `Invite` The invite object """ if isinstance(max_age, timedelta): max_age = int(max_age.total_seconds()) r = await self._state.query( "POST", f"/channels/{self.id}/invites", json={ "max_age": max_age, "max_uses": max_uses, "temporary": temporary, "unique": unique } ) from .invite import Invite return Invite( state=self._state, data=r.response )
[docs] async def send( self, content: Optional[str] = MISSING, *, embed: Optional[Embed] = MISSING, embeds: Optional[list[Embed]] = MISSING, file: Optional[File] = MISSING, files: Optional[list[File]] = MISSING, view: Optional[View] = MISSING, tts: Optional[bool] = False, type: Union[ResponseType, int] = 4, poll: Optional["Poll"] = MISSING, allowed_mentions: Optional[AllowedMentions] = MISSING, delete_after: Optional[float] = None ) -> "Message": """ Send a message to the channel Parameters ---------- content: `Optional[str]` Cotnent of the message embed: `Optional[Embed]` Includes an embed object embeds: `Optional[list[Embed]]` List of embed objects file: `Optional[File]` A file object files: `Union[list[File], File]` A list of file objects view: `View` Send components to the message tts: `bool` If the message should be sent as a TTS message type: `Optional[ResponseType]` The type of response to the message allowed_mentions: `Optional[AllowedMentions]` The allowed mentions for the message poll: `Optional[Poll]` The poll to be sent delete_after: `Optional[float]` How long to wait before deleting the message Returns ------- `Message` The message object """ payload = MessageResponse( content, embed=embed, embeds=embeds, file=file, files=files, view=view, tts=tts, type=type, poll=poll, allowed_mentions=allowed_mentions, ) r = await self._state.query( "POST", f"/channels/{self.id}/messages", data=payload.to_multipart(is_request=True), headers={"Content-Type": payload.content_type} ) from .message import Message _msg = Message( state=self._state, data=r.response ) if delete_after is not None: await _msg.delete(delay=float(delete_after)) return _msg
def _class_to_return( self, data: dict, *, state: Optional["DiscordAPI"] = None, guild_id: int | None = None ) -> "BaseChannel": match data["type"]: case x if x in (ChannelType.guild_text, ChannelType.guild_news): _class = TextChannel case ChannelType.guild_voice: _class = VoiceChannel case ChannelType.guild_category: _class = CategoryChannel case ChannelType.guild_news_thread: _class = NewsThread case ChannelType.guild_public_thread: _class = PublicThread case ChannelType.guild_private_thread: _class = PrivateThread case ChannelType.guild_stage_voice: _class = StageChannel case ChannelType.guild_forum: _class = ForumChannel case _: _class = BaseChannel _class: type["BaseChannel"] if guild_id is not None: data["guild_id"] = int(guild_id) return _class( state=state or self._state, data=data )
[docs] @classmethod def from_dict(cls, *, state: "DiscordAPI", data: dict) -> Self: """ Create a channel object from a dictionary Requires the state to be set Parameters ---------- state: `DiscordAPI` The state to use data: `dict` Data provided by Discord API Returns ------- `BaseChannel` The channel object """ temp_class = cls( state=state, id=int(data["id"]), guild_id=utils.get_int(data, "guild_id") ) return temp_class._class_to_return(data=data, state=state) # type: ignore
[docs] async def fetch(self) -> "BaseChannel": """ `BaseChannel`: Fetches the channel and returns the channel object """ r = await self._state.query( "GET", f"/channels/{self.id}" ) return self._class_to_return( data=r.response )
[docs] async def edit( self, *, name: Optional[str] = MISSING, type: Optional[Union[ChannelType, int]] = MISSING, position: Optional[int] = MISSING, topic: Optional[str] = MISSING, nsfw: Optional[bool] = MISSING, rate_limit_per_user: Optional[int] = MISSING, bitrate: Optional[int] = MISSING, user_limit: Optional[int] = MISSING, overwrites: Optional[list[PermissionOverwrite]] = MISSING, parent_id: Optional[Union[Snowflake, int]] = MISSING, rtc_region: Optional[str] = MISSING, video_quality_mode: Optional[Union[VideoQualityType, int]] = MISSING, default_auto_archive_duration: Optional[int] = MISSING, flags: Optional[ChannelFlags] = MISSING, available_tags: Optional[list["ForumTag"]] = MISSING, default_reaction_emoji: Optional[str] = MISSING, default_thread_rate_limit_per_user: Optional[int] = MISSING, default_sort_order: Optional[Union[SortOrderType, int]] = MISSING, default_forum_layout: Optional[Union[ForumLayoutType, int]] = MISSING, archived: Optional[bool] = MISSING, auto_archive_duration: Optional[int] = MISSING, locked: Optional[bool] = MISSING, invitable: Optional[bool] = MISSING, applied_tags: Optional[list[Union["ForumTag", int]]] = MISSING, reason: Optional[str] = None, ) -> Self: """ Edit the channel Note that this method globaly edits any channel type. So be sure to use the correct parameters for the channel. 
Parameters ---------- name: `Optional[str]` New name of the channel (All) type: `Optional[Union[ChannelType, int]]` The new type of the channel (Text, Announcement) position: `Optional[int]` The new position of the channel (All) topic: `Optional[str]` The new topic of the channel (Text, Announcement, Forum, Media) nsfw: `Optional[bool]` If the channel should be NSFW (Text, Voice, Announcement, Stage, Forum, Media) rate_limit_per_user: `Optional[int]` How long the slowdown should be (Text, Voice, Stage, Forum, Media) bitrate: `Optional[int]` The new bitrate of the channel (Voice, Stage) user_limit: `Optional[int]` The new user limit of the channel (Voice, Stage) overwrites: `Optional[list[PermissionOverwrite]]` The new permission overwrites of the channel (All) parent_id: `Optional[Union[Snowflake, int]]` The new parent ID of the channel (Text, Voice, Announcement, Stage, Forum, Media) rtc_region: `Optional[str]` The new RTC region of the channel (Voice, Stage) video_quality_mode: `Optional[Union[VideoQualityType, int]]` The new video quality mode of the channel (Voice, Stage) default_auto_archive_duration: `Optional[int]` The new default auto archive duration of the channel (Text, Announcement, Forum, Media) flags: `Optional[ChannelFlags]` The new flags of the channel (Forum, Media) available_tags: `Optional[list[ForumTag]]` The new available tags of the channel (Forum, Media) default_reaction_emoji: `Optional[str]` The new default reaction emoji of the channel (Forum, Media) default_thread_rate_limit_per_user: `Optional[int]` The new default thread rate limit per user of the channel (Text, Forum, Media) default_sort_order: `Optional[Union[SortOrderType, int]]` The new default sort order of the channel (Forum, Media) default_forum_layout: `Optional[Union[ForumLayoutType, int]]` The new default forum layout of the channel (Forum) archived: `Optional[bool]` If the thread should be archived (Thread, Forum) auto_archive_duration: `Optional[int]` The new auto archive 
duration of the thread (Thread, Forum) locked: `Optional[bool]` If the thread should be locked (Thread, Forum) invitable: `Optional[bool]` If the thread should be invitable by everyone (Thread) applied_tags: `Optional[list[Union[ForumTag, int]]` The new applied tags of the forum thread (Forum, Media) reason: `Optional[str]` The reason for editing the channel (All) Returns ------- `BaseChannel` The channel object """ payload = {} if name is not MISSING: payload["name"] = str(name) if type is not MISSING: payload["type"] = int(type or 0) if position is not MISSING: payload["position"] = int(position or 0) if topic is not MISSING: payload["topic"] = topic if nsfw is not MISSING: payload["nsfw"] = bool(nsfw) if rate_limit_per_user is not MISSING: payload["rate_limit_per_user"] = int( rate_limit_per_user or 0 ) if bitrate is not MISSING: payload["bitrate"] = int(bitrate or 64000) if user_limit is not MISSING: payload["user_limit"] = int(user_limit or 0) if overwrites is not MISSING: if overwrites is None: payload["permission_overwrites"] = [] else: payload["permission_overwrites"] = [ g.to_dict() for g in overwrites if isinstance(g, PermissionOverwrite) ] if parent_id is not MISSING: if parent_id is None: payload["parent_id"] = None else: payload["parent_id"] = str(int(parent_id)) if rtc_region is not MISSING: payload["rtc_region"] = rtc_region if video_quality_mode is not MISSING: payload["video_quality_mode"] = int( video_quality_mode or 1 ) if default_auto_archive_duration is not MISSING: payload["default_auto_archive_duration"] = int( default_auto_archive_duration or 4320 ) if flags is not MISSING: payload["flags"] = int(flags or 0) if available_tags is not MISSING: if available_tags is None: payload["available_tags"] = [] else: payload["available_tags"] = [ g.to_dict() for g in available_tags if isinstance(g, ForumTag) ] if default_reaction_emoji is not MISSING: if default_reaction_emoji is None: payload["default_reaction_emoji"] = None else: _emoji = 
EmojiParser(default_reaction_emoji) payload["default_reaction_emoji"] = _emoji.to_forum_dict() if default_thread_rate_limit_per_user is not MISSING: payload["default_thread_rate_limit_per_user"] = int( default_thread_rate_limit_per_user or 0 ) if default_sort_order is not MISSING: payload["default_sort_order"] = int( default_sort_order or 0 ) if default_forum_layout is not MISSING: payload["default_forum_layout"] = int( default_forum_layout or 0 ) if archived is not MISSING: payload["archived"] = bool(archived) if auto_archive_duration is not MISSING: payload["auto_archive_duration"] = int( auto_archive_duration or 4320 ) if locked is not MISSING: payload["locked"] = bool(locked) if invitable is not MISSING: payload["invitable"] = bool(invitable) if applied_tags is not MISSING: if applied_tags is None: payload["applied_tags"] = [] else: payload["applied_tags"] = [ str(int(g)) for g in applied_tags ] r = await self._state.query( "PATCH", f"/channels/{self.id}", json=payload, reason=reason ) return self._class_to_return(data=r.response) # type: ignore
[docs] def typing(self) -> Typing: """ Makes the bot trigger the typing indicator. There are two ways you can use this: - Usual await call - Using `async with` to type as long as you need .. code-block:: python # Method 1 await channel.typing() # Stops after 10 seconds or message sent # Method 2 async with channel.typing(): asyncio.sleep(4) """ return Typing(state=self._state, channel=self)
[docs] async def set_permission( self, overwrite: PermissionOverwrite, *, reason: Optional[str] = None ) -> None: """ Set a permission overwrite for the channel Parameters ---------- overwrite: `PermissionOverwrite` The new overwrite permissions for the spesific role/user reason: `Optional[str]` The reason for editing the overwrite """ await self._state.query( "PUT", f"/channels/{self.id}/permissions/{int(overwrite.target.id)}", json=overwrite.to_dict(), res_method="text", reason=reason )
[docs] async def delete_permission( self, id: Union[Snowflake, int], *, reason: Optional[str] = None ) -> None: """ Delete a permission overwrite for the channel Parameters ---------- id: `Union[Snowflake, int]` The ID of the overwrite reason: `Optional[str]` The reason for deleting the overwrite """ await self._state.query( "DELETE", f"/channels/{self.id}/permissions/{int(id)}", res_method="text", reason=reason )
[docs] async def delete( self, *, reason: Optional[str] = None ) -> None: """ Delete the channel Parameters ---------- reason: `Optional[str]` The reason for deleting the channel """ await self._state.query( "DELETE", f"/channels/{self.id}", reason=reason, res_method="text" )
[docs] async def create_webhook( self, name: str, *, avatar: Optional[Union[File, bytes]] = None, reason: Optional[str] = None ) -> Webhook: """ Create a webhook for the channel Parameters ---------- name: `str` The name of the webhook avatar: `Optional[File]` The avatar of the webhook reason: `Optional[str]` The reason for creating the webhook that appears in audit logs Returns ------- `Webhook` The webhook object """ payload = {"name": name} if avatar is not None: payload["avatar"] = utils.bytes_to_base64(avatar) r = await self._state.query( "POST", f"/channels/{self.id}/webhooks", json=payload, reason=reason, ) return Webhook(state=self._state, data=r.response)
[docs] async def create_forum_or_media( self, name: str, *, content: Optional[str] = None, embed: Optional[Embed] = None, embeds: Optional[list[Embed]] = None, file: Optional[File] = None, files: Optional[list[File]] = None, allowed_mentions: Optional[AllowedMentions] = None, view: Optional[View] = None, auto_archive_duration: Optional[int] = 4320, rate_limit_per_user: Optional[int] = None, applied_tags: Optional[list[Union["ForumTag", int]]] = None ) -> "ForumThread": """ Create a forum or media thread in the channel Parameters ---------- name: `str` The name of the thread content: `Optional[str]` The content of the message embed: `Optional[Embed]` Embed to be sent embeds: `Optional[list[Embed]]` List of embeds to be sent file: `Optional[File]` File to be sent files: `Optional[list[File]]` List of files to be sent allowed_mentions: `Optional[AllowedMentions]` The allowed mentions for the message view: `Optional[View]` The view to be sent auto_archive_duration: `Optional[int]` The duration in minutes to automatically archive the thread after recent activity rate_limit_per_user: `Optional[int]` How long the slowdown should be applied_tags: `Optional[list[Union[&quot;ForumTag&quot;, int]]]` The tags to be applied to the thread Returns ------- `ForumThread` _description_ """ payload = { "name": name, "message": {} } if auto_archive_duration in (60, 1440, 4320, 10080): payload["auto_archive_duration"] = auto_archive_duration if rate_limit_per_user is not None: payload["rate_limit_per_user"] = int(rate_limit_per_user) if applied_tags is not None: payload["applied_tags"] = [ str(int(g)) for g in applied_tags ] temp_msg = MessageResponse( embeds=embeds or ([embed] if embed else None), files=files or ([file] if file else None), ) if content is not None: payload["message"]["content"] = str(content) if allowed_mentions is not None: payload["message"]["allowed_mentions"] = allowed_mentions.to_dict() if view is not None: payload["message"]["components"] = view.to_dict() if 
temp_msg.embeds is not None: payload["message"]["embeds"] = [ e.to_dict() for e in temp_msg.embeds ] if temp_msg.files is not None: multidata = MultipartData() for i, file in enumerate(temp_msg.files): multidata.attach( f"files[{i}]", file, # type: ignore filename=file.filename ) multidata.attach("payload_json", payload) r = await self._state.query( "POST", f"/channels/{self.id}/threads", headers={"Content-Type": multidata.content_type}, data=multidata.finish(), ) else: r = await self._state.query( "POST", f"/channels/{self.id}/threads", json=payload ) return ForumThread( state=self._state, data=r.response )
[docs] async def create_thread( self, name: str, *, type: Union[ChannelType, int] = ChannelType.guild_private_thread, auto_archive_duration: Optional[int] = 4320, invitable: bool = True, rate_limit_per_user: Optional[Union[timedelta, int]] = None, reason: Optional[str] = None ) -> Union["PublicThread", "PrivateThread", "NewsThread"]: """ Creates a thread in the channel Parameters ---------- name: `str` The name of the thread type: `Optional[Union[ChannelType, int]]` The type of thread to create auto_archive_duration: `Optional[int]` The duration in minutes to automatically archive the thread after recent activity invitable: `bool` If the thread is invitable rate_limit_per_user: `Optional[Union[timedelta, int]]` How long the slowdown should be reason: `Optional[str]` The reason for creating the thread Returns ------- `Union[PublicThread, PrivateThread, NewsThread]` The thread object Raises ------ `ValueError` - If the auto_archive_duration is not 60, 1440, 4320 or 10080 - If the rate_limit_per_user is not between 0 and 21600 seconds """ payload = { "name": name, "type": int(type), "invitable": invitable, } if auto_archive_duration not in (60, 1440, 4320, 10080): raise ValueError("auto_archive_duration must be 60, 1440, 4320 or 10080") if rate_limit_per_user is not None: if isinstance(rate_limit_per_user, timedelta): rate_limit_per_user = int(rate_limit_per_user.total_seconds()) if rate_limit_per_user not in range(0, 21601): raise ValueError("rate_limit_per_user must be between 0 and 21600 seconds") payload["rate_limit_per_user"] = rate_limit_per_user r = await self._state.query( "POST", f"/channels/{self.id}/threads", json=payload, reason=reason ) match r.response["type"]: case ChannelType.guild_public_thread: _class = PublicThread case ChannelType.guild_private_thread: _class = PrivateThread case ChannelType.guild_news_thread: _class = NewsThread case _: raise ValueError("Invalid thread type") return _class( state=self._state, data=r.response )
[docs] async def fetch_history( self, *, before: Optional[Union[datetime, "Message", Snowflake, int]] = None, after: Optional[Union[datetime, "Message", Snowflake, int]] = None, around: Optional[Union[datetime, "Message", Snowflake, int]] = None, limit: Optional[int] = 100, ) -> AsyncIterator["Message"]: """ Fetch the channel's message history Parameters ---------- before: `Optional[Union[datetime, Message, Snowflake, int]]` Get messages before this message after: `Optional[Union[datetime, Message, Snowflake, int]]` Get messages after this message around: `Optional[Union[datetime, Message, Snowflake, int]]` Get messages around this message limit: `Optional[int]` The maximum amount of messages to fetch. `None` will fetch all users. Yields ------ `Message` The message object """ async def _get_history(limit: int, **kwargs): params = {"limit": limit} for key, value in kwargs.items(): if value is None: continue params[key] = utils.normalize_entity_id(value) return await self._state.query( "GET", f"/channels/{self.id}/messages", params=params ) async def _around_http( http_limit: int, around_id: Optional[int], limit: Optional[int] ): r = await _get_history(limit=http_limit, around=around_id) return r.response, None, limit async def _after_http( http_limit: int, after_id: Optional[int], limit: Optional[int] ): r = await _get_history(limit=http_limit, after=after_id) if r.response: if limit is not None: limit -= len(r.response) after_id = int(r.response[0]["id"]) return r.response, after_id, limit async def _before_http( http_limit: int, before_id: Optional[int], limit: Optional[int] ): r = await _get_history(limit=http_limit, before=before_id) if r.response: if limit is not None: limit -= len(r.response) before_id = int(r.response[-1]["id"]) return r.response, before_id, limit if around: if limit is None: raise ValueError("limit must be specified when using around") if limit > 100: raise ValueError("limit must be less than or equal to 100 when using around") strategy, 
state = _around_http, utils.normalize_entity_id(around) elif after: strategy, state = _after_http, utils.normalize_entity_id(after) elif before: strategy, state = _before_http, utils.normalize_entity_id(before) else: strategy, state = _before_http, None # Must be imported here to avoid circular import # From the top of the file from .message import Message while True: http_limit: int = 100 if limit is None else min(limit, 100) if http_limit <= 0: break strategy: Callable messages, state, limit = await strategy(http_limit, state, limit) i = 0 for i, msg in enumerate(messages, start=1): yield Message( state=self._state, data=msg, guild=self.guild ) if i < 100: break
@overload async def bulk_delete_messages( self, *, check: Callable[["Message"], bool] | None = None, before: "datetime | Message | Snowflake | int | None" = None, after: "datetime | Message | Snowflake | int | None" = None, around: "datetime | Message | Snowflake | int | None" = None, message_ids: list["Message | Snowflake | int"], limit: int | None = 100, reason: str | None = None ) -> None: ... @overload async def bulk_delete_messages( self, *, check: Callable[["Message"], bool] | None = None, before: "datetime | Message | Snowflake | int | None" = None, after: "datetime | Message | Snowflake | int | None" = None, around: "datetime | Message | Snowflake | int | None" = None, message_ids: None = None, limit: int | None = 100, reason: str | None = None ) -> list["Message"]: ...
[docs] async def bulk_delete_messages( self, *, check: Callable[["Message"], bool] | None = None, before: "datetime | Message | Snowflake | int | None" = None, after: "datetime | Message | Snowflake | int | None" = None, around: "datetime | Message | Snowflake | int | None" = None, message_ids: list["Message | Snowflake | int"] | None = None, limit: int | None = 100, reason: str | None = None ) -> list["Message"] | None: """ Deletes messages in bulk Parameters ---------- check: `Callable[[Message], bool] | None` A function to check if the message should be deleted before: `datetime | Message | Snowflake | int | None` The message before which to delete after: `datetime | Message | Snowflake | int | None` The message after which to delete around: `datetime | Message | Snowflake | int | None` The message around which to delete message_ids: `list[Message | Snowflake | int] | None` The message IDs to delete limit: `int | None` The maximum amount of messages to delete reason: `str | None` The reason for deleting the messages Returns ------- `list[Message] | None` Returns a list of messages deleted If you provide message_ids upfront, it will skip history search and delete """ _msg_collector: list["Message"] = [] async def _bulk_delete(messages: list["Message"]): if len(messages) > 1: await self._state.query( "POST", f"/channels/{self.id}/messages/bulk-delete", res_method="text", json={"messages": [str(int(g)) for g in messages]}, reason=reason ) else: await _single_delete(messages) async def _single_delete(messages: list["Message"]): for g in messages: try: await g.delete() except NotFound as e: if e.code == 10008: pass raise e if message_ids is not None: # Remove duplicates just in case message_ids = list(set(message_ids)) await _bulk_delete(message_ids) # type: ignore return None count = 0 minimum_time = int((time.time() - 14 * 24 * 60 * 60) * 1000 - 1420070400000) << 22 strategy = _bulk_delete async for message in self.fetch_history( before=before, after=after, 
around=around, limit=limit ): if count == 100: to_delete = _msg_collector[-100:] await strategy(to_delete) count = 0 await asyncio.sleep(0.5) if check is not None and not check(message): continue if message.id < minimum_time: if count == 1: await _msg_collector[-1].delete() elif count >= 2: await strategy(_msg_collector[-count:]) count = 0 strategy = _single_delete count += 1 _msg_collector.append(message) if count != 0: await strategy(_msg_collector[-count:]) return _msg_collector
[docs] async def join_thread(self) -> None: """ Make the bot join a thread """ await self._state.query( "PUT", f"/channels/{self.id}/thread-members/@me", res_method="text" )
[docs] async def leave_thread(self) -> None: """ Make the bot leave a thread """ await self._state.query( "DELETE", f"/channels/{self.id}/thread-members/@me", res_method="text" )
[docs] async def add_thread_member( self, user_id: int ) -> None: """ Add a thread member Parameters ---------- user_id: `int` The user ID to add """ await self._state.query( "PUT", f"/channels/{self.id}/thread-members/{user_id}", res_method="text" )
[docs] async def remove_thread_member( self, user_id: int ) -> None: """ Remove a thread member Parameters ---------- user_id: `int` The user ID to remove """ await self._state.query( "DELETE", f"/channels/{self.id}/thread-members/{user_id}", res_method="text" )
[docs] async def fetch_thread_member( self, user_id: int ) -> "ThreadMember": """ Fetch a thread member Parameters ---------- user_id: `int` The user ID to fetch Returns ------- `ThreadMember` The thread member object """ if not self.guild: raise ValueError("Cannot fetch thread member without guild_id") r = await self._state.query( "GET", f"/channels/{self.id}/thread-members/{user_id}", params={"with_member": "true"} ) from .member import ThreadMember return ThreadMember( state=self._state, guild=self.guild, data=r.response, )
[docs] async def fetch_thread_members(self) -> list["ThreadMember"]: """ Fetch all thread members Returns ------- `list[ThreadMember]` The list of thread members """ if not self.guild: raise ValueError("Cannot fetch thread member without guild_id") r = await self._state.query( "GET", f"/channels/{self.id}/thread-members", params={"with_member": "true"}, ) from .member import ThreadMember return [ ThreadMember( state=self._state, guild=self.guild, data=data ) for data in r.response ]
class BaseChannel(PartialChannel):
    """
    Channel object built from a full channel payload.

    Parameters
    ----------
    state: `DiscordAPI`
        The state used to perform HTTP requests and cache lookups
    data: `dict`
        Channel payload; `id` and `type` are required keys
    guild_id: `int | None`
        Fallback guild ID used when the payload has no `guild_id`
    """

    def __init__(
        self,
        *,
        state: "DiscordAPI",
        data: dict,
        guild_id: int | None = None
    ):
        super().__init__(
            state=state,
            id=int(data["id"]),
            guild_id=utils.get_int(data, "guild_id", default=guild_id)
        )

        self.name: Optional[str] = data.get("name", None)
        self.nsfw: bool = data.get("nsfw", False)
        self.topic: Optional[str] = data.get("topic", None)
        self.position: Optional[int] = utils.get_int(data, "position")
        self.last_message_id: Optional[int] = utils.get_int(data, "last_message_id")
        self.parent_id: Optional[int] = utils.get_int(data, "parent_id")
        self.rate_limit_per_user: int = data.get("rate_limit_per_user", 0)

        self._raw_type: ChannelType = ChannelType(data["type"])

        self.permission_overwrites: list[PermissionOverwrite] = [
            PermissionOverwrite.from_dict(g)
            for g in data.get("permission_overwrites", [])
        ]

    def __repr__(self) -> str:
        return f"<Channel id={self.id} name='{self.name}'>"

    def __str__(self) -> str:
        return self.name or ""

    def permissions_for(self, member: "Member") -> Permissions:
        """
        Returns the permissions for a member in the channel.
        Note that this only works if you are using Gateway with guild, role and channel cache.

        Resolution order: guild owner -> base role permissions -> administrator
        -> default role overwrite -> role overwrites -> member overwrite -> timeout.

        Parameters
        ----------
        member: `Member`
            The member to get the permissions for.

        Returns
        -------
        `Permissions`
            The permissions for the member in the channel.
            `Permissions.none()` when no guild is available.
        """
        guild = self.guild
        if guild is None:
            # Nothing to resolve against, same answer as PartialChannel gives
            return Permissions.none()

        if getattr(guild, "owner_id", None) == member.id:
            return Permissions.all()

        default_role = guild.default_role

        base: Permissions = getattr(
            default_role,
            "permissions",
            Permissions.none()
        )

        for r in member.roles:
            role = guild.get_role(r.id)
            if role is None:
                continue
            base |= getattr(role, "permissions", Permissions.none())

        if Permissions.administrator in base:
            return Permissions.all()

        default_role_id = getattr(default_role, "id", None)
        _everyone = next((
            g for g in self.permission_overwrites
            if g.target.id == default_role_id
        ), None)

        if _everyone is not None:
            base = base.handle_overwrite(int(_everyone.allow), int(_everyone.deny))
            _overwrites = [
                g for g in self.permission_overwrites
                if g.target.id != _everyone.target.id
            ]
        else:
            _overwrites = self.permission_overwrites

        # Compare overwrite targets against role IDs, not role objects
        member_role_ids = {r.id for r in member.roles}

        allows, denies = 0, 0
        for ow in _overwrites:
            if ow.is_role() and ow.target.id in member_role_ids:
                allows |= int(ow.allow)
                denies |= int(ow.deny)

        base = base.handle_overwrite(allows, denies)

        for ow in _overwrites:
            if ow.is_member() and ow.target.id == member.id:
                # The member overwrite is applied on its own, after the role
                # overwrites (previously it was collected but never applied)
                base = base.handle_overwrite(int(ow.allow), int(ow.deny))
                break

        if member.is_timed_out():
            # Timed out members keep at most these two permissions
            _timeout_perm = (
                Permissions.view_channel |
                Permissions.read_message_history
            )

            if Permissions.view_channel not in base:
                _timeout_perm &= ~Permissions.view_channel
            if Permissions.read_message_history not in base:
                _timeout_perm &= ~Permissions.read_message_history

            base = _timeout_perm

        return base

    @property
    def mention(self) -> str:
        """ `str`: The channel's mention """
        return f"<#{self.id}>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        # NOTE(review): always reports guild_text regardless of _raw_type;
        # confirm whether subclasses rely on this before changing it.
        return ChannelType.guild_text

    @classmethod
    def from_dict(
        cls,
        *,
        state: "DiscordAPI",
        data: dict,
        guild_id: int | None = None
    ) -> "BaseChannel":
        """
        Create a channel object from a dictionary
        Requires the state to be set

        Parameters
        ----------
        state: `DiscordAPI`
            The state to use
        data: `dict`
            Data provided by Discord API
        guild_id: `int | None`
            Guild ID to inject into the payload before building the object

        Returns
        -------
        `BaseChannel`
            The channel object
        """
        _class = cls(state=state, data=data)._class_to_return(
            data=data,
            state=state,
            guild_id=guild_id
        )

        return _class
class TextChannel(BaseChannel):
    """ Guild text channel, also used for payloads whose raw type is not 0 """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<TextChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        # Raw type 0 is reported as text, anything else as news
        return (
            ChannelType.guild_text
            if self._raw_type == 0
            else ChannelType.guild_news
        )
class DMChannel(BaseChannel):
    """
    Direct message channel.

    Attributes
    ----------
    name: `Optional[str]`
        Name of the first recipient, if the payload has recipients
    user: `Optional[User]`
        The first recipient, if the payload has recipients
    last_message: `Optional[PartialMessage]`
        Partial message built from `last_message_id`, if provided
    last_pin_timestamp: `Optional[datetime]`
        Parsed `last_pin_timestamp`, if provided
    """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

        self.name: Optional[str] = None
        self.user: Optional["User"] = None
        self.last_message: Optional["PartialMessage"] = None
        # Always defined, so reading it never raises AttributeError
        # when the payload has no "last_pin_timestamp"
        self.last_pin_timestamp: Optional[datetime] = None

        self._from_data(data)

    def __repr__(self) -> str:
        return f"<DMChannel id={self.id} name='{self.user}'>"

    def _from_data(self, data: dict):
        """ Fill the optional attributes that are present in the payload """
        if data.get("recipients", None):
            from .user import User
            # Only the first recipient is used
            self.user = User(state=self._state, data=data["recipients"][0])
            self.name = self.user.name

        if data.get("last_message_id", None):
            from .message import PartialMessage
            self.last_message = PartialMessage(
                state=self._state,
                channel_id=self.id,
                id=int(data["last_message_id"])
            )

        if data.get("last_pin_timestamp", None):
            self.last_pin_timestamp = utils.parse_time(data["last_pin_timestamp"])

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.dm

    @property
    def mention(self) -> str:
        """ `str`: The channel's mention """
        return f"<@{self.id}>"

    async def edit(self, *args, **kwargs) -> None:
        """
        Only here to prevent errors

        Raises
        ------
        `TypeError`
            If you try to edit a DM channel
        """
        raise TypeError("Cannot edit a DM channel")
class StoreChannel(BaseChannel):
    """ Channel reported as `ChannelType.guild_store` """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.guild_store

    def __repr__(self) -> str:
        return f"<StoreChannel id={self.id} name='{self.name}'>"
class GroupDMChannel(BaseChannel):
    """ Channel reported as `ChannelType.group_dm` """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.group_dm

    def __repr__(self) -> str:
        return f"<GroupDMChannel id={self.id} name='{self.name}'>"
class DirectoryChannel(BaseChannel):
    """ Channel reported as `ChannelType.guild_directory` """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.guild_directory

    def __repr__(self) -> str:
        return f"<DirectoryChannel id={self.id} name='{self.name}'>"
class CategoryChannel(BaseChannel):
    """ Category channel, with helpers to list and create child channels """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<CategoryChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.guild_category

    @property
    def channels(self) -> list["BaseChannel | PartialChannel"]:
        """
        `list[BaseChannel | PartialChannel]`: Returns a list of channels in this category.
        This will only return channels that are in the same guild as the category.
        """
        cached_guild = self._state.cache.get_guild(self.guild_id)
        if not cached_guild:
            return []

        children: list["BaseChannel | PartialChannel"] = [
            ch for ch in cached_guild.channels
            if ch.parent_id == self.id
        ]

        voice_types = [
            ChannelType.guild_voice,
            ChannelType.guild_stage_voice
        ]

        def _order(ch) -> tuple:
            # Voice and stage channels go after the others, then by position
            voice_rank = 1 if ch.type in voice_types else 0
            return (voice_rank, getattr(ch, "position", 0))

        return sorted(children, key=_order)

    async def create_text_channel(
        self,
        name: str,
        **kwargs
    ) -> TextChannel:
        """
        Create a text channel in the category

        Parameters
        ----------
        name: `str`
            The name of the channel
        topic: `Optional[str]`
            The topic of the channel
        rate_limit_per_user: `Optional[int]`
            The rate limit per user of the channel
        overwrites: `Optional[list[PermissionOverwrite]]`
            The permission overwrites of the category
        parent_id: `Optional[Snowflake]`
            The Category ID where the channel will be placed
        nsfw: `Optional[bool]`
            Whether the channel is NSFW or not
        reason: `Optional[str]`
            The reason for creating the text channel

        Returns
        -------
        `TextChannel`
            The channel object
        """
        # This category is always passed as the parent
        created = await self.guild.create_text_channel(
            name=name,
            parent_id=self.id,
            **kwargs
        )
        return created

    async def create_voice_channel(
        self,
        name: str,
        **kwargs
    ) -> "VoiceChannel":
        """
        Create a voice channel in the category

        Parameters
        ----------
        name: `str`
            The name of the channel
        bitrate: `Optional[int]`
            The bitrate of the channel
        user_limit: `Optional[int]`
            The user limit of the channel
        rate_limit_per_user: `Optional`
            The rate limit per user of the channel
        overwrites: `Optional[list[PermissionOverwrite]]`
            The permission overwrites of the category
        position: `Optional[int]`
            The position of the channel
        parent_id: `Optional[Snowflake]`
            The Category ID where the channel will be placed
        nsfw: `Optional[bool]`
            Whether the channel is NSFW or not
        reason: `Optional[str]`
            The reason for creating the voice channel

        Returns
        -------
        `VoiceChannel`
            The channel object
        """
        # This category is always passed as the parent
        created = await self.guild.create_voice_channel(
            name=name,
            parent_id=self.id,
            **kwargs
        )
        return created

    async def create_stage_channel(
        self,
        name: str,
        **kwargs
    ) -> "StageChannel":
        """
        Create a stage channel in the category

        Parameters
        ----------
        name: `str`
            The name of the channel
        bitrate: `Optional[int]`
            The bitrate of the channel
        user_limit: `Optional[int]`
            The user limit of the channel
        overwrites: `Optional[list[PermissionOverwrite]]`
            The permission overwrites of the category
        position: `Optional[int]`
            The position of the channel
        video_quality_mode: `Optional[Union[VideoQualityType, int]]`
            The video quality mode of the channel
        parent_id: `Optional[Union[Snowflake, int]]`
            The Category ID where the channel will be placed
        reason: `Optional[str]`
            The reason for creating the stage channel

        Returns
        -------
        `StageChannel`
            The created channel
        """
        # This category is always passed as the parent
        created = await self.guild.create_stage_channel(
            name=name,
            parent_id=self.id,
            **kwargs
        )
        return created
class NewsChannel(BaseChannel):
    """ Channel reported as `ChannelType.guild_news` """

    def __init__(self, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.guild_news

    def __repr__(self) -> str:
        return f"<NewsChannel id={self.id} name='{self.name}'>"
# Thread channels
class PartialThread(PartialChannel):
    """ Thread known only by its IDs, parent channel and raw type """

    def __init__(
        self,
        *,
        state: "DiscordAPI",
        id: int,
        guild_id: int,
        parent_id: int,
        type: ChannelType | int
    ):
        # Every identifier is coerced to int before being stored
        channel_id = int(id)
        owning_guild_id = int(guild_id)
        super().__init__(
            state=state,
            id=channel_id,
            guild_id=owning_guild_id
        )

        self.parent_id: int = int(parent_id)
        self._raw_type: ChannelType = ChannelType(int(type))

    def __repr__(self) -> str:
        return f"<PartialThread id={self.id} type={self.type}>"
class PublicThread(BaseChannel):
    """ Thread channel built from a thread payload, base of the other thread classes """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

        self.name: str = data["name"]

        # Counters fall back to 0 when missing or falsy
        self.message_count: int = utils.get_int(data, "message_count") or 0
        self.member_count: int = utils.get_int(data, "member_count") or 0
        self.rate_limit_per_user: int = utils.get_int(data, "rate_limit_per_user") or 0
        self.total_message_sent: int = utils.get_int(data, "total_message_sent") or 0

        # Thread state is read from the "thread_metadata" sub-dict
        metadata: dict = data.get("thread_metadata", {})
        self._metadata: dict = metadata

        self.locked: bool = metadata.get("locked", False)
        self.archived: bool = metadata.get("archived", False)
        self.auto_archive_duration: int = metadata.get("auto_archive_duration", 60)

        self.channel_id: int = int(data["id"])
        self.newly_created: bool = data.get("newly_created", False)
        self.guild_id: Optional[int] = utils.get_int(data, "guild_id")
        self.owner_id: Optional[int] = utils.get_int(data, "owner_id")
        self.last_message_id: Optional[int] = utils.get_int(data, "last_message_id")

    def __repr__(self) -> str:
        return f"<PublicThread id={self.id} name='{self.name}'>"

    @property
    def guild(self) -> "Guild | PartialGuild | None":
        """ `PartialGuild`: Returns the cached guild if available, otherwise a partial guild object """
        if not self.guild_id:
            return None

        cached = self._state.cache.get_guild(self.guild_id)
        if not cached:
            from .guild import PartialGuild
            return PartialGuild(state=self._state, id=self.guild_id)

        return cached

    @property
    def owner(self) -> Optional["PartialUser"]:
        """ `PartialUser`: Returns a partial user object """
        if self.owner_id:
            from .user import PartialUser
            return PartialUser(state=self._state, id=self.owner_id)

        return None

    @property
    def last_message(self) -> Optional["PartialMessage"]:
        """ `Optional[PartialMessage]`: Returns a partial message object if the last message ID is available """
        if self.last_message_id:
            from .message import PartialMessage
            return PartialMessage(
                state=self._state,
                channel_id=self.channel_id,
                guild_id=self.guild_id,
                id=self.last_message_id
            )

        return None
[docs] class ForumTag: def __init__(self, *, data: dict): self.id: Optional[int] = utils.get_int(data, "id") self.name: str = data["name"] self.moderated: bool = data.get("moderated", False) self.emoji_id: Optional[int] = utils.get_int(data, "emoji_id") self.emoji_name: Optional[str] = data.get("emoji_name", None) def __repr__(self) -> str: return f"<ForumTag id={self.id} name='{self.name}'>" def __str__(self) -> str: return self.name def __int__(self) -> int: return int(self.id or -1)
[docs] @classmethod def create( cls, name: Optional[str] = None, *, emoji_id: Optional[int] = None, emoji_name: Optional[str] = None, moderated: bool = False ) -> "ForumTag": """ Create a forum tag, used for editing available_tags Parameters ---------- name: `Optional[str]` The name of the tag emoji_id: `Optional[int]` The emoji ID of the tag emoji_name: `Optional[str]` The emoji name of the tag moderated: `bool` If the tag is moderated Returns ------- `ForumTag` The tag object """ if emoji_id and emoji_name: raise ValueError( "Cannot have both emoji_id and " "emoji_name defined for a tag." ) return cls(data={ "name": name or "New Tag", "emoji_id": emoji_id, "emoji_name": emoji_name, "moderated": moderated })
[docs] def to_dict(self) -> dict: payload = { "name": self.name, "moderated": self.moderated, } if self.id: payload["id"] = str(self.id) if self.emoji_id: payload["emoji_id"] = str(self.emoji_id) if self.emoji_name: payload["emoji_name"] = self.emoji_name return payload
[docs] @classmethod def from_data(cls, *, data: dict) -> Self: self = cls.__new__(cls) self.name = data["name"] self.id = int(data["id"]) self.moderated = data.get("moderated", False) self.emoji_id = utils.get_int(data, "emoji_id") self.emoji_name = data.get("emoji_name", None) return self
class ForumChannel(PublicThread):
    """ Forum channel, with its tags and optional default reaction emoji """

    def __init__(self, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)
        self.default_reaction_emoji: Optional[EmojiParser] = None

        raw_tags = data.get("tags", [])
        self.tags: list[ForumTag] = [
            ForumTag(data=entry)
            for entry in raw_tags
        ]

        self._from_data(data)

    def __repr__(self) -> str:
        return f"<ForumChannel id={self.id} name='{self.name}'>"

    def _from_data(self, data: dict):
        """ Parse the default reaction emoji, if the payload has one """
        if not data.get("default_reaction_emoji", None):
            return

        reaction = data["default_reaction_emoji"]
        # The emoji ID takes priority over the emoji name
        _target = reaction.get("id", None) or reaction.get("name", None)

        if _target:
            self.default_reaction_emoji = EmojiParser(_target)
class ForumThread(PublicThread):
    """ Thread whose payload also carries a `message` entry """

    def __init__(self, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)
        self._from_data(data)

    def __str__(self) -> str:
        return self.name

    def __repr__(self) -> str:
        return f"<ForumThread id={self.id} name='{self.name}'>"

    def _from_data(self, data: dict):
        """ Build the message object from the required `message` key """
        from .message import Message

        thread_message = Message(
            state=self._state,
            data=data["message"],
            guild=self.guild
        )
        self.message: Message = thread_message
class NewsThread(PublicThread):
    """ Thread class that only overrides the representation of `PublicThread` """

    def __init__(self, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<NewsThread id={self.id} name='{self.name}'>"
class PrivateThread(PublicThread):
    """ Thread reported as `ChannelType.guild_private_thread` """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.guild_private_thread
class Thread(PublicThread):
    """ Thread whose reported type depends on the raw type of the payload """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        # Raw type 11 is reported as public, anything else as private
        return (
            ChannelType.guild_public_thread
            if self._raw_type == 11
            else ChannelType.guild_private_thread
        )
# Voice channels
class VoiceRegion:
    """
    Voice region built from a payload.

    Attributes
    ----------
    id: `str`
        The ID of the region
    name: `str`
        The name of the region
    custom: `bool`
        The `custom` value of the payload
    deprecated: `bool`
        The `deprecated` value of the payload
    optimal: `bool`
        The `optimal` value of the payload
    """

    def __init__(self, *, data: dict):
        # All five keys are required, a missing one raises KeyError
        self.id: str = data["id"]
        self.name: str = data["name"]
        self.custom: bool = data["custom"]
        self.deprecated: bool = data["deprecated"]
        self.optimal: bool = data["optimal"]

    def __repr__(self) -> str:
        return f"<VoiceRegion id='{self.id}' name='{self.name}'>"

    def __str__(self) -> str:
        return self.name
class VoiceChannel(BaseChannel):
    """ Voice channel, with its bitrate, user limit and optional RTC region """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

        # "bitrate" and "user_limit" are required keys of the payload
        self.bitrate: int = int(data["bitrate"])
        self.user_limit: int = int(data["user_limit"])
        self.rtc_region: Optional[str] = data.get("rtc_region", None)

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.guild_voice

    def __repr__(self) -> str:
        return f"<VoiceChannel id={self.id} name='{self.name}'>"
class StageInstance(PartialBase):
    """Represents a stage instance for a stage channel.

    This holds information about a live stage.

    Attributes
    ----------
    id: `int`
        The ID of the stage instance
    channel_id: `int`
        The ID of the stage channel
    guild_id: `int`
        The associated guild ID of the stage channel
    topic: `str`
        The topic of the stage instance
    privacy_level: `PrivacyLevelType`
        The privacy level of the stage instance
    guild_scheduled_event_id: `Optional[int]`
        The guild scheduled event ID associated with this stage instance
    """

    def __init__(
        self,
        *,
        state: "DiscordAPI",
        data: "channels.StageInstance",
        guild: "PartialGuild | None" = None,
    ) -> None:
        super().__init__(id=int(data["id"]))
        self._state: "DiscordAPI" = state
        self._guild: "PartialGuild | None" = guild
        self._from_data(data)

    def _from_data(self, data: "channels.StageInstance") -> None:
        """Copy the fields of the payload onto the instance"""
        self.channel_id: int = int(data["channel_id"])
        self.guild_id: int = int(data["guild_id"])
        self.topic: str = data["topic"]
        self.privacy_level: PrivacyLevelType = PrivacyLevelType(data["privacy_level"])
        self.guild_scheduled_event_id: Optional[int] = utils.get_int(data, "guild_scheduled_event_id")  # type: ignore # todo types

    @property
    def guild(self) -> "Guild | PartialGuild | None":
        """`Guild | PartialGuild | None`: The cached guild if available, otherwise a partial guild object"""
        if not self.guild_id:
            return None

        cache = self._state.cache.get_guild(self.guild_id)
        if cache:
            return cache

        from .guild import PartialGuild
        return PartialGuild(state=self._state, id=self.guild_id)

    @property
    def channel(self) -> "PartialChannel | StageChannel":
        """`PartialChannel | StageChannel`: The channel returned by the guild, otherwise a partial channel object"""
        channel = self.guild.get_channel(self.channel_id) or (
            # Keep the known guild ID on the fallback, so the partial
            # channel does not lose its guild context
            PartialChannel(
                state=self._state,
                id=self.channel_id,
                guild_id=self.guild_id
            )
        )
        return channel

    @property
    def scheduled_event(self) -> "PartialScheduledEvent | None":
        """`PartialScheduledEvent | None`: Partial scheduled event, if an event ID is set"""
        if not self.guild_scheduled_event_id:
            return None

        from .guild import PartialScheduledEvent
        return PartialScheduledEvent(
            state=self._state,
            id=self.guild_scheduled_event_id,
            guild_id=self.guild_id
        )

    def __repr__(self) -> str:
        return f"<StageInstance id={self.id!r} topic={self.topic!r}>"

    async def edit(
        self,
        *,
        topic: str = MISSING,
        privacy_level: PrivacyLevelType = MISSING,
        reason: Optional[str] = None
    ) -> Self:
        """Edit this stage instance

        Parameters
        ----------
        topic: `str`
            The new topic of this stage instance.
        privacy_level: `PrivacyLevelType`
            The new privacy level of this stage instance.
        reason: `Optional[str]`
            The reason for editing the stage instance.

        Returns
        -------
        `StageInstance`
            The edited stage instance
        """
        # Only the fields that were explicitly passed are sent
        payload = {}

        if topic is not MISSING:
            payload["topic"] = str(topic)
        if privacy_level is not MISSING:
            payload["privacy_level"] = int(privacy_level)

        r = await self._state.query(
            "PATCH",
            f"/stage-instances/{self.id}",
            json=payload,
            reason=reason
        )

        # A new object is built from the response, self is left untouched
        return self.__class__(
            state=self._state,
            data=r.response,  # type: ignore # todo types
            guild=self._guild,
        )

    async def delete(self, *, reason: Optional[str] = None) -> None:
        """Delete this stage instance

        Parameters
        ----------
        reason: `Optional[str]`
            The reason for deleting the stage instance
        """
        await self._state.query(
            "DELETE",
            f"/stage-instances/{self.id}",
            res_method="text",
            reason=reason
        )
class StageChannel(VoiceChannel):
    """ Stage channel, keeps the stage instance created through it """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)
        self._stage_instance: Optional[StageInstance] = None

    def __repr__(self) -> str:
        return f"<StageChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.guild_stage_voice

    @property
    def stage_instance(self) -> Optional[StageInstance]:
        """ `Optional[StageInstance]`: Returns the stage instance for this channel, if available and cached. """
        return self._stage_instance

    async def fetch_stage_instance(self) -> StageInstance:
        """
        Fetch the stage instance associated with this stage channel

        Returns
        -------
        `StageInstance`
            The stage instance of the channel
        """
        response = await self._state.query(
            "GET",
            f"/stage-instances/{self.id}"
        )

        # The fetched instance is returned without being stored
        fetched = StageInstance(
            state=self._state,
            data=response.response,  # type: ignore # todo types
            guild=self.guild
        )
        return fetched

    async def create_stage_instance(
        self,
        *,
        topic: str,
        privacy_level: PrivacyLevelType = MISSING,
        send_start_notification: bool = MISSING,
        guild_scheduled_event: Snowflake | int = MISSING,
        reason: Optional[str] = None
    ) -> StageInstance:
        """
        Create a stage instance

        Parameters
        ----------
        topic: `str`
            The topic of the stage instance
        privacy_level: `PrivacyLevelType`
            The privacy level of the stage instance.
            Defaults to `PrivacyLevelType.guild_only`
        send_start_notification: `bool`
            Whether to notify @everyone that the stage instance has started.
        guild_scheduled_event: `Optional[Snowflake | int]`
            The guild scheduled event to associate with this stage instance.
        reason: `Optional[str]`
            The reason for creating the stage instance

        Returns
        -------
        `StageInstance`
            The created stage instance
        """
        body = {
            "channel_id": self.id,
            "topic": topic,
        }

        # Optional fields are only sent when explicitly passed
        if privacy_level is not MISSING:
            body["privacy_level"] = int(privacy_level)

        if send_start_notification is not MISSING:
            body["send_start_notification"] = send_start_notification

        if guild_scheduled_event is not MISSING:
            body["guild_scheduled_event_id"] = utils.normalize_entity_id(guild_scheduled_event)

        response = await self._state.query(
            "POST",
            "/stage-instances",
            json=body,
            reason=reason
        )

        # The created instance is stored on the channel before returning
        created = StageInstance(
            state=self._state,
            data=response.response,  # type: ignore # todo types
            guild=self.guild
        )
        self._stage_instance = created
        return created