import asyncio
import time
from datetime import datetime, timedelta
from typing import (
Union, TYPE_CHECKING, Optional, AsyncIterator,
Callable, Self, Generator, overload
)
from . import utils
from .embeds import Embed
from .emoji import EmojiParser
from .errors import NotFound
from .enums import (
ChannelType, ResponseType, VideoQualityType,
SortOrderType, ForumLayoutType, PrivacyLevelType
)
from .file import File
from .flags import PermissionOverwrite, ChannelFlags, Permissions
from .mentions import AllowedMentions
from .multipart import MultipartData
from .object import PartialBase, Snowflake
from .response import MessageResponse
from .view import View
from .webhook import Webhook
if TYPE_CHECKING:
from .guild import Guild, PartialGuild, PartialScheduledEvent
from .http import DiscordAPI
from .invite import Invite
from .member import Member
from .member import ThreadMember
from .message import PartialMessage, Message, Poll
from .types import channels
from .user import PartialUser, User
MISSING = utils.MISSING
__all__ = (
"BaseChannel",
"CategoryChannel",
"DMChannel",
"DirectoryChannel",
"ForumChannel",
"ForumTag",
"ForumThread",
"GroupDMChannel",
"NewsChannel",
"NewsThread",
"PartialChannel",
"PartialThread",
"PrivateThread",
"PublicThread",
"StageChannel",
"StoreChannel",
"TextChannel",
"Thread",
"VoiceChannel",
"VoiceRegion",
)
def _typing_done_callback(f: asyncio.Future):
try:
f.exception()
except (asyncio.CancelledError, Exception):
pass
class Typing:
    """
    Helper object that triggers the typing indicator in a channel.

    Awaiting an instance sends one typing request. Using it with
    `async with` sends one request immediately and then starts a task
    that re-sends it every 5 seconds until the block is left.
    """

    def __init__(self, *, state: "DiscordAPI", channel: "PartialChannel"):
        self.channel = channel
        self._state = state
        self.loop = state.bot.loop

    def __await__(self) -> Generator[None, None, None]:
        single_trigger = self._send_typing()
        return single_trigger.__await__()

    async def __aenter__(self) -> None:
        await self._send_typing()
        background = self.loop.create_task(self.do_typing_loop())
        self.task = background
        # Retrieve the task outcome on completion (see the callback)
        background.add_done_callback(_typing_done_callback)

    async def __aexit__(self, exc_type, exc, traceback) -> None:
        self.task.cancel()

    async def _send_typing(self) -> None:
        """ Send a single typing request for the channel """
        endpoint = f"/channels/{self.channel.id}/typing"
        await self._state.query("POST", endpoint, res_method="text")

    async def do_typing_loop(self) -> None:
        """ Re-send the typing request every 5 seconds until cancelled """
        while True:
            await asyncio.sleep(5)
            await self._send_typing()
[docs]
class PartialChannel(PartialBase):
def __init__(
    self,
    *,
    state: "DiscordAPI",
    id: int,
    guild_id: Optional[int] = None
):
    """
    Build a channel reference from bare IDs.

    Parameters
    ----------
    state: `DiscordAPI`
        The state used for API calls and cache lookups
    id: `int`
        The channel ID
    guild_id: `Optional[int]`
        The guild ID, stored as `None` when falsy
    """
    super().__init__(id=int(id))
    self._state = state

    normalized_guild: Optional[int] = None
    if guild_id:
        normalized_guild = int(guild_id)
    self.guild_id: Optional[int] = normalized_guild

    # A partial channel knows neither its parent nor its real type
    self.parent_id: Optional[int] = None
    self._raw_type: ChannelType = ChannelType.unknown
def __repr__(self) -> str:
    """ Debug representation carrying only the channel ID """
    channel_id = self.id
    return f"<PartialChannel id={channel_id}>"
@property
def mention(self) -> str:
    """ `str`: The `<#id>` markup that mentions this channel """
    channel_id = self.id
    return f"<#{channel_id}>"
@property
def guild(self) -> "Guild | PartialGuild | None":
    """
    `Optional[Guild | PartialGuild]`:
    The guild this channel belongs to.

    Returns `None` without a guild ID, the cached guild when the state
    cache has one, and a fresh `PartialGuild` otherwise.
    """
    gid = self.guild_id
    if not gid:
        return None

    cached = self._state.cache.get_guild(gid)
    if not cached:
        # Imported lazily, as everywhere else in this module
        from .guild import PartialGuild
        return PartialGuild(state=self._state, id=gid)

    return cached
@property
def channel(self) -> "BaseChannel | CategoryChannel | PartialChannel | None":
    """
    `BaseChannel | CategoryChannel | PartialChannel | None`:
    The cached object stored under this channel's own ID, if any.

    The cache is only consulted when a guild ID is known; otherwise,
    or on a cache miss, a new `PartialChannel` with the same IDs is returned.
    """
    cached = None
    if self.guild_id:
        cached = self._state.cache.get_channel_thread(
            guild_id=self.guild_id,
            channel_id=self.id
        )

    return cached or PartialChannel(
        state=self._state,
        id=self.id,
        guild_id=self.guild_id
    )
@property
def parent(self) -> "BaseChannel | CategoryChannel | PartialChannel | None":
    """
    `BaseChannel | CategoryChannel | PartialChannel | None`:
    The parent channel of a thread, or the parent category of a channel.

    Returns `None` when no parent ID is set. A full object is only
    returned when the state cache holds one for the guild; otherwise
    a `PartialChannel` pointing at the parent ID is built.
    """
    target_id = self.parent_id
    if not target_id:
        return None

    cached = None
    if self.guild_id:
        cached = self._state.cache.get_channel_thread(
            guild_id=self.guild_id,
            channel_id=target_id
        )

    return cached or PartialChannel(
        state=self._state,
        id=target_id,
        guild_id=self.guild_id
    )
[docs]
def permissions_for(self, member: "Member") -> Permissions:
    """
    Returns the permissions for a member in the channel.

    A partial channel carries no permission data, so the result is
    always `Permissions.none()` regardless of the member.

    Parameters
    ----------
    member: `Member`
        The member to get the permissions for (unused here)

    Returns
    -------
    `Permissions`
        An empty permission set
    """
    no_permissions = Permissions.none()
    return no_permissions
@property
def type(self) -> ChannelType:
    """ `ChannelType`: The stored raw channel type """
    raw_type = self._raw_type
    return raw_type
[docs]
def get_partial_message(self, message_id: int) -> "PartialMessage":
    """
    Build a partial message object for this channel without any API call

    Parameters
    ----------
    message_id: `int`
        The ID of the message to reference

    Returns
    -------
    `PartialMessage`
        A partial message bound to this channel (and guild, if known)
    """
    from .message import PartialMessage

    reference = {
        "state": self._state,
        "channel_id": self.id,
        "guild_id": self.guild_id,
        "id": message_id,
    }
    return PartialMessage(**reference)
[docs]
async def fetch_message(self, message_id: int) -> "Message":
    """
    Fetch a single message from the channel

    Parameters
    ----------
    message_id: `int`
        The ID of the message to fetch

    Returns
    -------
    `Message`
        The message built from the API response
    """
    from .message import Message

    endpoint = f"/channels/{self.id}/messages/{message_id}"
    r = await self._state.query("GET", endpoint)

    return Message(
        state=self._state,
        data=r.response,
        guild=self.guild
    )
[docs]
async def fetch_pins(self) -> list["Message"]:
    """
    Fetch every pinned message of this channel

    Returns
    -------
    `list[Message]`
        One message object per entry in the API response
    """
    from .message import Message

    r = await self._state.query("GET", f"/channels/{self.id}/pins")

    pinned: list["Message"] = []
    for entry in r.response:
        pinned.append(
            Message(
                state=self._state,
                data=entry,
                guild=self.guild
            )
        )
    return pinned
[docs]
async def follow_announcement_channel(
    self,
    source_channel_id: Union[Snowflake, int]
) -> None:
    """
    Follow an announcement channel so its messages are relayed to this channel

    Parameters
    ----------
    source_channel_id: `Union[Snowflake, int]`
        The announcement channel to follow
    """
    # Normalise to an int like the other ID-taking methods do, so a
    # Snowflake object never ends up in the URL in its str() form.
    source_id = int(source_channel_id)

    await self._state.query(
        "POST",
        f"/channels/{source_id}/followers",
        json={"webhook_channel_id": self.id},
        res_method="text"
    )
[docs]
async def fetch_archived_public_threads(self) -> list["PublicThread"]:
    """
    Fetch all archived public threads

    Returns
    -------
    `list[PublicThread]`
        The list of public threads
    """
    r = await self._state.query(
        "GET",
        f"/channels/{self.id}/threads/archived/public"
    )

    # Discord wraps the result in an object with "threads", "members"
    # and "has_more"; iterating it directly would walk the key names.
    # A bare list is still accepted for backwards compatibility.
    threads = r.response
    if isinstance(threads, dict):
        threads = threads.get("threads", [])

    from .channel import PublicThread
    return [
        PublicThread(
            state=self._state,
            data=data
        )
        for data in threads
    ]
[docs]
async def fetch_archived_private_threads(
    self,
    *,
    client: bool = False
) -> list["PrivateThread"]:
    """
    Fetch all archived private threads

    Parameters
    ----------
    client: `bool`
        If it should fetch only where the client is a member of the thread

    Returns
    -------
    `list[PrivateThread]`
        The list of private threads
    """
    path = f"/channels/{self.id}/threads/archived/private"
    if client:
        path = f"/channels/{self.id}/users/@me/threads/archived/private"

    r = await self._state.query("GET", path)

    # Discord wraps the result in an object with "threads", "members"
    # and "has_more"; iterating it directly would walk the key names.
    # A bare list is still accepted for backwards compatibility.
    threads = r.response
    if isinstance(threads, dict):
        threads = threads.get("threads", [])

    from .channel import PrivateThread
    return [
        PrivateThread(
            state=self._state,
            data=data
        )
        for data in threads
    ]
[docs]
async def create_invite(
    self,
    *,
    max_age: Union[timedelta, int] = 86400,  # 24 hours
    max_uses: Optional[int] = 0,
    temporary: bool = False,
    unique: bool = False,
) -> "Invite":
    """
    Create an invite for the channel

    Parameters
    ----------
    max_age: `Union[timedelta, int]`
        Lifetime of the invite; a `timedelta` is converted to whole seconds
    max_uses: `Optional[int]`
        Value sent as the `max_uses` field of the request
    temporary: `bool`
        If the invite should be temporary
    unique: `bool`
        If the invite should be unique

    Returns
    -------
    `Invite`
        The invite built from the API response
    """
    from .invite import Invite

    age_seconds = (
        int(max_age.total_seconds())
        if isinstance(max_age, timedelta)
        else max_age
    )
    body = {
        "max_age": age_seconds,
        "max_uses": max_uses,
        "temporary": temporary,
        "unique": unique
    }

    r = await self._state.query(
        "POST",
        f"/channels/{self.id}/invites",
        json=body
    )
    return Invite(state=self._state, data=r.response)
[docs]
async def send(
    self,
    content: Optional[str] = MISSING,
    *,
    embed: Optional[Embed] = MISSING,
    embeds: Optional[list[Embed]] = MISSING,
    file: Optional[File] = MISSING,
    files: Optional[list[File]] = MISSING,
    view: Optional[View] = MISSING,
    tts: Optional[bool] = False,
    type: Union[ResponseType, int] = 4,
    poll: Optional["Poll"] = MISSING,
    allowed_mentions: Optional[AllowedMentions] = MISSING,
    delete_after: Optional[float] = None
) -> "Message":
    """
    Send a message to the channel

    Parameters
    ----------
    content: `Optional[str]`
        Content of the message
    embed: `Optional[Embed]`
        A single embed to include
    embeds: `Optional[list[Embed]]`
        Several embeds to include
    file: `Optional[File]`
        A single file to attach
    files: `Optional[list[File]]`
        Several files to attach
    view: `Optional[View]`
        Components to send with the message
    tts: `Optional[bool]`
        If the message should be sent as a TTS message
    type: `Union[ResponseType, int]`
        The response type handed to `MessageResponse`
    poll: `Optional[Poll]`
        The poll to be sent
    allowed_mentions: `Optional[AllowedMentions]`
        The allowed mentions for the message
    delete_after: `Optional[float]`
        Delay passed to `Message.delete` for the sent message;
        nothing is deleted when `None`

    Returns
    -------
    `Message`
        The message built from the API response
    """
    from .message import Message

    builder = MessageResponse(
        content,
        embed=embed,
        embeds=embeds,
        file=file,
        files=files,
        view=view,
        tts=tts,
        type=type,
        poll=poll,
        allowed_mentions=allowed_mentions,
    )

    # Build the body first, then read the content type from the builder
    multipart_body = builder.to_multipart(is_request=True)
    request_headers = {"Content-Type": builder.content_type}

    r = await self._state.query(
        "POST",
        f"/channels/{self.id}/messages",
        data=multipart_body,
        headers=request_headers
    )

    sent = Message(state=self._state, data=r.response)
    if delete_after is None:
        return sent

    await sent.delete(delay=float(delete_after))
    return sent
def _class_to_return(
    self,
    data: dict,
    *,
    state: Optional["DiscordAPI"] = None,
    guild_id: int | None = None
) -> "BaseChannel":
    """
    Instantiate the channel class matching `data["type"]`.

    Parameters
    ----------
    data: `dict`
        Raw channel payload; `guild_id` is written into it when given
    state: `Optional[DiscordAPI]`
        State for the new object, defaults to this channel's state
    guild_id: `int | None`
        Guild ID to force into the payload

    Returns
    -------
    `BaseChannel`
        The specific channel subclass, or `BaseChannel` for unmapped types
    """
    kind = data["type"]

    # Checked in order with ==, first match wins
    known_types = (
        (ChannelType.guild_text, TextChannel),
        (ChannelType.guild_news, TextChannel),
        (ChannelType.guild_voice, VoiceChannel),
        (ChannelType.guild_category, CategoryChannel),
        (ChannelType.guild_news_thread, NewsThread),
        (ChannelType.guild_public_thread, PublicThread),
        (ChannelType.guild_private_thread, PrivateThread),
        (ChannelType.guild_stage_voice, StageChannel),
        (ChannelType.guild_forum, ForumChannel),
    )

    _class: type["BaseChannel"] = BaseChannel
    for channel_type, channel_class in known_types:
        if kind == channel_type:
            _class = channel_class
            break

    if guild_id is not None:
        data["guild_id"] = int(guild_id)

    chosen_state = state or self._state
    return _class(state=chosen_state, data=data)
[docs]
@classmethod
def from_dict(cls, *, state: "DiscordAPI", data: dict) -> Self:
    """
    Create a channel object from a raw API dictionary

    A throwaway instance is created from the IDs in `data` and then
    used to pick the concrete channel class via `_class_to_return`.

    Parameters
    ----------
    state: `DiscordAPI`
        The state to use
    data: `dict`
        Data provided by Discord API

    Returns
    -------
    `BaseChannel`
        The channel object
    """
    channel_id = int(data["id"])
    guild_id = utils.get_int(data, "guild_id")
    seed = cls(state=state, id=channel_id, guild_id=guild_id)
    return seed._class_to_return(data=data, state=state)  # type: ignore
[docs]
async def fetch(self) -> "BaseChannel":
    """ `BaseChannel`: Requests this channel from the API and returns the full object """
    endpoint = f"/channels/{self.id}"
    r = await self._state.query("GET", endpoint)
    return self._class_to_return(data=r.response)
[docs]
async def edit(
    self,
    *,
    name: Optional[str] = MISSING,
    type: Optional[Union[ChannelType, int]] = MISSING,
    position: Optional[int] = MISSING,
    topic: Optional[str] = MISSING,
    nsfw: Optional[bool] = MISSING,
    rate_limit_per_user: Optional[int] = MISSING,
    bitrate: Optional[int] = MISSING,
    user_limit: Optional[int] = MISSING,
    overwrites: Optional[list[PermissionOverwrite]] = MISSING,
    parent_id: Optional[Union[Snowflake, int]] = MISSING,
    rtc_region: Optional[str] = MISSING,
    video_quality_mode: Optional[Union[VideoQualityType, int]] = MISSING,
    default_auto_archive_duration: Optional[int] = MISSING,
    flags: Optional[ChannelFlags] = MISSING,
    available_tags: Optional[list["ForumTag"]] = MISSING,
    default_reaction_emoji: Optional[str] = MISSING,
    default_thread_rate_limit_per_user: Optional[int] = MISSING,
    default_sort_order: Optional[Union[SortOrderType, int]] = MISSING,
    default_forum_layout: Optional[Union[ForumLayoutType, int]] = MISSING,
    archived: Optional[bool] = MISSING,
    auto_archive_duration: Optional[int] = MISSING,
    locked: Optional[bool] = MISSING,
    invitable: Optional[bool] = MISSING,
    applied_tags: Optional[list[Union["ForumTag", int]]] = MISSING,
    reason: Optional[str] = None,
) -> Self:
    """
    Edit the channel

    This one method serves every channel type, so only pass the
    parameters that apply to the channel being edited. Arguments left
    at `MISSING` are not sent at all. For the numeric fields a falsy
    value (including `None`) is replaced by the fallback noted below.

    Parameters
    ----------
    name: `Optional[str]`
        New name of the channel (All)
    type: `Optional[Union[ChannelType, int]]`
        New type of the channel, fallback 0 (Text, Announcement)
    position: `Optional[int]`
        New position, fallback 0 (All)
    topic: `Optional[str]`
        New topic (Text, Announcement, Forum, Media)
    nsfw: `Optional[bool]`
        If the channel should be NSFW (Text, Voice, Announcement, Stage, Forum, Media)
    rate_limit_per_user: `Optional[int]`
        Slowmode duration, fallback 0 (Text, Voice, Stage, Forum, Media)
    bitrate: `Optional[int]`
        New bitrate, fallback 64000 (Voice, Stage)
    user_limit: `Optional[int]`
        New user limit, fallback 0 (Voice, Stage)
    overwrites: `Optional[list[PermissionOverwrite]]`
        New permission overwrites, `None` sends an empty list (All)
    parent_id: `Optional[Union[Snowflake, int]]`
        New parent ID, `None` is sent as null (Text, Voice, Announcement, Stage, Forum, Media)
    rtc_region: `Optional[str]`
        New RTC region (Voice, Stage)
    video_quality_mode: `Optional[Union[VideoQualityType, int]]`
        New video quality mode, fallback 1 (Voice, Stage)
    default_auto_archive_duration: `Optional[int]`
        New default auto archive duration, fallback 4320 (Text, Announcement, Forum, Media)
    flags: `Optional[ChannelFlags]`
        New channel flags, fallback 0 (Forum, Media)
    available_tags: `Optional[list[ForumTag]]`
        New available tags, `None` sends an empty list (Forum, Media)
    default_reaction_emoji: `Optional[str]`
        New default reaction emoji, `None` is sent as null (Forum, Media)
    default_thread_rate_limit_per_user: `Optional[int]`
        New default thread slowmode, fallback 0 (Text, Forum, Media)
    default_sort_order: `Optional[Union[SortOrderType, int]]`
        New default sort order, fallback 0 (Forum, Media)
    default_forum_layout: `Optional[Union[ForumLayoutType, int]]`
        New default forum layout, fallback 0 (Forum)
    archived: `Optional[bool]`
        If the thread should be archived (Thread, Forum)
    auto_archive_duration: `Optional[int]`
        New auto archive duration, fallback 4320 (Thread, Forum)
    locked: `Optional[bool]`
        If the thread should be locked (Thread, Forum)
    invitable: `Optional[bool]`
        If the thread should be invitable by everyone (Thread)
    applied_tags: `Optional[list[Union[ForumTag, int]]]`
        New applied tags, `None` sends an empty list (Forum, Media)
    reason: `Optional[str]`
        The reason for editing the channel (All)

    Returns
    -------
    `BaseChannel`
        The channel object built from the API response
    """
    body: dict = {}

    def _put_int(key: str, value, fallback: int) -> None:
        # Untouched fields stay out of the request; falsy -> fallback
        if value is not MISSING:
            body[key] = int(value or fallback)

    def _put_bool(key: str, value) -> None:
        if value is not MISSING:
            body[key] = bool(value)

    if name is not MISSING:
        body["name"] = str(name)

    _put_int("type", type, 0)
    _put_int("position", position, 0)

    if topic is not MISSING:
        body["topic"] = topic

    _put_bool("nsfw", nsfw)
    _put_int("rate_limit_per_user", rate_limit_per_user, 0)
    _put_int("bitrate", bitrate, 64000)
    _put_int("user_limit", user_limit, 0)

    if overwrites is not MISSING:
        body["permission_overwrites"] = [] if overwrites is None else [
            ow.to_dict() for ow in overwrites
            if isinstance(ow, PermissionOverwrite)
        ]

    if parent_id is not MISSING:
        body["parent_id"] = (
            None if parent_id is None
            else str(int(parent_id))
        )

    if rtc_region is not MISSING:
        body["rtc_region"] = rtc_region

    _put_int("video_quality_mode", video_quality_mode, 1)
    _put_int("default_auto_archive_duration", default_auto_archive_duration, 4320)
    _put_int("flags", flags, 0)

    if available_tags is not MISSING:
        body["available_tags"] = [] if available_tags is None else [
            tag.to_dict() for tag in available_tags
            if isinstance(tag, ForumTag)
        ]

    if default_reaction_emoji is not MISSING:
        if default_reaction_emoji is None:
            body["default_reaction_emoji"] = None
        else:
            parsed_emoji = EmojiParser(default_reaction_emoji)
            body["default_reaction_emoji"] = parsed_emoji.to_forum_dict()

    _put_int(
        "default_thread_rate_limit_per_user",
        default_thread_rate_limit_per_user,
        0
    )
    _put_int("default_sort_order", default_sort_order, 0)
    _put_int("default_forum_layout", default_forum_layout, 0)
    _put_bool("archived", archived)
    _put_int("auto_archive_duration", auto_archive_duration, 4320)
    _put_bool("locked", locked)
    _put_bool("invitable", invitable)

    if applied_tags is not MISSING:
        body["applied_tags"] = [] if applied_tags is None else [
            str(int(tag)) for tag in applied_tags
        ]

    r = await self._state.query(
        "PATCH",
        f"/channels/{self.id}",
        json=body,
        reason=reason
    )

    return self._class_to_return(data=r.response)  # type: ignore
[docs]
def typing(self) -> Typing:
    """
    Makes the bot trigger the typing indicator.

    There are two ways you can use this:

    - Usual await call
    - Using `async with` to type as long as you need

    .. code-block:: python

        # Method 1
        await channel.typing()  # Stops after 10 seconds or message sent

        # Method 2
        async with channel.typing():
            await asyncio.sleep(4)

    Returns
    -------
    `Typing`
        An object that is both awaitable and an async context manager
    """
    return Typing(state=self._state, channel=self)
[docs]
async def set_permission(
    self,
    overwrite: PermissionOverwrite,
    *,
    reason: Optional[str] = None
) -> None:
    """
    Create or replace a permission overwrite on the channel

    Parameters
    ----------
    overwrite: `PermissionOverwrite`
        The overwrite to apply; its target ID selects the role/user
    reason: `Optional[str]`
        The reason for editing the overwrite
    """
    target_id = int(overwrite.target.id)
    endpoint = f"/channels/{self.id}/permissions/{target_id}"

    await self._state.query(
        "PUT",
        endpoint,
        json=overwrite.to_dict(),
        res_method="text",
        reason=reason
    )
[docs]
async def delete_permission(
    self,
    id: Union[Snowflake, int],
    *,
    reason: Optional[str] = None
) -> None:
    """
    Remove a permission overwrite from the channel

    Parameters
    ----------
    id: `Union[Snowflake, int]`
        The ID of the overwrite to remove
    reason: `Optional[str]`
        The reason for deleting the overwrite
    """
    overwrite_id = int(id)
    endpoint = f"/channels/{self.id}/permissions/{overwrite_id}"

    await self._state.query(
        "DELETE",
        endpoint,
        res_method="text",
        reason=reason
    )
[docs]
async def delete(
    self,
    *,
    reason: Optional[str] = None
) -> None:
    """
    Delete the channel

    Parameters
    ----------
    reason: `Optional[str]`
        The reason for deleting the channel
    """
    endpoint = f"/channels/{self.id}"
    await self._state.query(
        "DELETE",
        endpoint,
        reason=reason,
        res_method="text"
    )
[docs]
async def create_webhook(
    self,
    name: str,
    *,
    avatar: Optional[Union[File, bytes]] = None,
    reason: Optional[str] = None
) -> Webhook:
    """
    Create a webhook for the channel

    Parameters
    ----------
    name: `str`
        The name of the webhook
    avatar: `Optional[Union[File, bytes]]`
        The avatar of the webhook, base64-encoded before sending
    reason: `Optional[str]`
        The reason for creating the webhook that appears in audit logs

    Returns
    -------
    `Webhook`
        The webhook built from the API response
    """
    body = {"name": name}
    if avatar is not None:
        encoded_avatar = utils.bytes_to_base64(avatar)
        body["avatar"] = encoded_avatar

    r = await self._state.query(
        "POST",
        f"/channels/{self.id}/webhooks",
        json=body,
        reason=reason,
    )

    created = Webhook(state=self._state, data=r.response)
    return created
[docs]
async def create_thread(
self,
name: str,
*,
type: Union[ChannelType, int] = ChannelType.guild_private_thread,
auto_archive_duration: Optional[int] = 4320,
invitable: bool = True,
rate_limit_per_user: Optional[Union[timedelta, int]] = None,
reason: Optional[str] = None
) -> Union["PublicThread", "PrivateThread", "NewsThread"]:
"""
Creates a thread in the channel
Parameters
----------
name: `str`
The name of the thread
type: `Optional[Union[ChannelType, int]]`
The type of thread to create
auto_archive_duration: `Optional[int]`
The duration in minutes to automatically archive the thread after recent activity
invitable: `bool`
If the thread is invitable
rate_limit_per_user: `Optional[Union[timedelta, int]]`
How long the slowdown should be
reason: `Optional[str]`
The reason for creating the thread
Returns
-------
`Union[PublicThread, PrivateThread, NewsThread]`
The thread object
Raises
------
`ValueError`
- If the auto_archive_duration is not 60, 1440, 4320 or 10080
- If the rate_limit_per_user is not between 0 and 21600 seconds
"""
payload = {
"name": name,
"type": int(type),
"invitable": invitable,
}
if auto_archive_duration not in (60, 1440, 4320, 10080):
raise ValueError("auto_archive_duration must be 60, 1440, 4320 or 10080")
if rate_limit_per_user is not None:
if isinstance(rate_limit_per_user, timedelta):
rate_limit_per_user = int(rate_limit_per_user.total_seconds())
if rate_limit_per_user not in range(0, 21601):
raise ValueError("rate_limit_per_user must be between 0 and 21600 seconds")
payload["rate_limit_per_user"] = rate_limit_per_user
r = await self._state.query(
"POST",
f"/channels/{self.id}/threads",
json=payload,
reason=reason
)
match r.response["type"]:
case ChannelType.guild_public_thread:
_class = PublicThread
case ChannelType.guild_private_thread:
_class = PrivateThread
case ChannelType.guild_news_thread:
_class = NewsThread
case _:
raise ValueError("Invalid thread type")
return _class(
state=self._state,
data=r.response
)
[docs]
async def fetch_history(
    self,
    *,
    before: Optional[Union[datetime, "Message", Snowflake, int]] = None,
    after: Optional[Union[datetime, "Message", Snowflake, int]] = None,
    around: Optional[Union[datetime, "Message", Snowflake, int]] = None,
    limit: Optional[int] = 100,
) -> AsyncIterator["Message"]:
    """
    Fetch the channel's message history

    Only one anchor is used per call, picked in the order
    `around`, `after`, `before`. Without any anchor the newest
    messages are fetched and paging continues backwards.

    Parameters
    ----------
    before: `Optional[Union[datetime, Message, Snowflake, int]]`
        Get messages before this message
    after: `Optional[Union[datetime, Message, Snowflake, int]]`
        Get messages after this message
    around: `Optional[Union[datetime, Message, Snowflake, int]]`
        Get messages around this message
    limit: `Optional[int]`
        The maximum amount of messages to fetch.
        `None` will fetch all messages.

    Yields
    ------
    `Message`
        The message object

    Raises
    ------
    `ValueError`
        If `around` is used with `limit` set to `None` or above 100
    """
    async def _get_history(limit: int, **kwargs):
        # Single page request; anchors set to None are left out
        params = {"limit": limit}
        for key, value in kwargs.items():
            if value is None:
                continue
            params[key] = utils.normalize_entity_id(value)

        return await self._state.query(
            "GET",
            f"/channels/{self.id}/messages",
            params=params
        )

    async def _around_http(
        http_limit: int,
        around_id: Optional[int],
        limit: Optional[int]
    ):
        r = await _get_history(limit=http_limit, around=around_id)
        # "around" is a one-shot query. Reporting a remaining budget of 0
        # stops the outer loop; returning the untouched limit with a None
        # cursor made a full 100-message page re-query the latest
        # messages endlessly.
        return r.response, None, 0

    async def _after_http(
        http_limit: int,
        after_id: Optional[int],
        limit: Optional[int]
    ):
        r = await _get_history(limit=http_limit, after=after_id)
        if r.response:
            if limit is not None:
                limit -= len(r.response)
            # First entry of the page becomes the next "after" cursor
            after_id = int(r.response[0]["id"])
        return r.response, after_id, limit

    async def _before_http(
        http_limit: int,
        before_id: Optional[int],
        limit: Optional[int]
    ):
        r = await _get_history(limit=http_limit, before=before_id)
        if r.response:
            if limit is not None:
                limit -= len(r.response)
            # Last entry of the page becomes the next "before" cursor
            before_id = int(r.response[-1]["id"])
        return r.response, before_id, limit

    strategy: Callable
    if around:
        if limit is None:
            raise ValueError("limit must be specified when using around")
        if limit > 100:
            raise ValueError("limit must be less than or equal to 100 when using around")
        strategy, state = _around_http, utils.normalize_entity_id(around)
    elif after:
        strategy, state = _after_http, utils.normalize_entity_id(after)
    elif before:
        strategy, state = _before_http, utils.normalize_entity_id(before)
    else:
        strategy, state = _before_http, None

    # Must be imported here to avoid circular import
    # From the top of the file
    from .message import Message

    while True:
        http_limit: int = 100 if limit is None else min(limit, 100)
        if http_limit <= 0:
            break

        messages, state, limit = await strategy(http_limit, state, limit)

        i = 0
        for i, msg in enumerate(messages, start=1):
            yield Message(
                state=self._state,
                data=msg,
                guild=self.guild
            )

        # A page shorter than 100 means there is nothing left to page through
        if i < 100:
            break
@overload
async def bulk_delete_messages(
    self,
    *,
    check: Callable[["Message"], bool] | None = None,
    before: "datetime | Message | Snowflake | int | None" = None,
    after: "datetime | Message | Snowflake | int | None" = None,
    around: "datetime | Message | Snowflake | int | None" = None,
    message_ids: list["Message | Snowflake | int"],
    limit: int | None = 100,
    reason: str | None = None
) -> None:
    """ Typing overload: with explicit `message_ids` the call returns `None` """
    ...

@overload
async def bulk_delete_messages(
    self,
    *,
    check: Callable[["Message"], bool] | None = None,
    before: "datetime | Message | Snowflake | int | None" = None,
    after: "datetime | Message | Snowflake | int | None" = None,
    around: "datetime | Message | Snowflake | int | None" = None,
    message_ids: None = None,
    limit: int | None = 100,
    reason: str | None = None
) -> list["Message"]:
    """ Typing overload: without `message_ids` the collected messages are returned """
    ...
[docs]
async def bulk_delete_messages(
    self,
    *,
    check: Callable[["Message"], bool] | None = None,
    before: "datetime | Message | Snowflake | int | None" = None,
    after: "datetime | Message | Snowflake | int | None" = None,
    around: "datetime | Message | Snowflake | int | None" = None,
    message_ids: list["Message | Snowflake | int"] | None = None,
    limit: int | None = 100,
    reason: str | None = None
) -> list["Message"] | None:
    """
    Deletes messages in bulk

    Parameters
    ----------
    check: `Callable[[Message], bool] | None`
        A function to check if the message should be deleted
    before: `datetime | Message | Snowflake | int | None`
        The message before which to delete
    after: `datetime | Message | Snowflake | int | None`
        The message after which to delete
    around: `datetime | Message | Snowflake | int | None`
        The message around which to delete
    message_ids: `list[Message | Snowflake | int] | None`
        The message IDs to delete
    limit: `int | None`
        The maximum amount of messages to delete
    reason: `str | None`
        The reason for deleting the messages

    Returns
    -------
    `list[Message] | None`
        Returns a list of messages deleted
        If you provide message_ids upfront, it will skip history search and delete,
        returning `None`
    """
    _msg_collector: list["Message"] = []

    async def _single_delete(messages: list):
        # Delete one by one; a message that is already gone is not an error
        for g in messages:
            try:
                if hasattr(g, "delete"):
                    await g.delete()
                else:
                    # Bare ID (int/Snowflake) has no delete(): call the API directly
                    await self._state.query(
                        "DELETE",
                        f"/channels/{self.id}/messages/{int(g)}",
                        res_method="text",
                        reason=reason
                    )
            except NotFound as e:
                # 10008 = Unknown Message; anything else is re-raised
                if e.code != 10008:
                    raise

    async def _bulk_delete(messages: list):
        # The bulk endpoint takes 2-100 IDs per request, so work in chunks
        for start in range(0, len(messages), 100):
            chunk = messages[start:start + 100]
            if len(chunk) > 1:
                await self._state.query(
                    "POST",
                    f"/channels/{self.id}/messages/bulk-delete",
                    res_method="text",
                    json={"messages": [str(int(g)) for g in chunk]},
                    reason=reason
                )
            else:
                await _single_delete(chunk)

    if message_ids is not None:
        # Remove duplicates just in case
        message_ids = list(set(message_ids))
        await _bulk_delete(message_ids)
        return None

    count = 0
    # Snowflake-style threshold for "14 days ago": messages with a smaller
    # ID are deleted one at a time instead of through the bulk endpoint
    minimum_time = int((time.time() - 14 * 24 * 60 * 60) * 1000 - 1420070400000) << 22
    strategy = _bulk_delete

    async for message in self.fetch_history(
        before=before,
        after=after,
        around=around,
        limit=limit
    ):
        if count == 100:
            # Flush a full batch before collecting more
            await strategy(_msg_collector[-100:])
            count = 0
            await asyncio.sleep(0.5)

        if check is not None and not check(message):
            continue

        if message.id < minimum_time:
            # Flush what is pending, then fall back to single deletes
            if count:
                await strategy(_msg_collector[-count:])
            count = 0
            strategy = _single_delete

        count += 1
        _msg_collector.append(message)

    if count != 0:
        await strategy(_msg_collector[-count:])

    return _msg_collector
[docs]
async def join_thread(self) -> None:
    """ Add the bot itself as a member of this thread """
    endpoint = f"/channels/{self.id}/thread-members/@me"
    await self._state.query("PUT", endpoint, res_method="text")
[docs]
async def leave_thread(self) -> None:
    """ Remove the bot itself from this thread """
    endpoint = f"/channels/{self.id}/thread-members/@me"
    await self._state.query("DELETE", endpoint, res_method="text")
[docs]
async def add_thread_member(
    self,
    user_id: int
) -> None:
    """
    Add a user to this thread

    Parameters
    ----------
    user_id: `int`
        The ID of the user to add
    """
    endpoint = f"/channels/{self.id}/thread-members/{user_id}"
    await self._state.query("PUT", endpoint, res_method="text")
[docs]
async def remove_thread_member(
    self,
    user_id: int
) -> None:
    """
    Remove a user from this thread

    Parameters
    ----------
    user_id: `int`
        The ID of the user to remove
    """
    endpoint = f"/channels/{self.id}/thread-members/{user_id}"
    await self._state.query("DELETE", endpoint, res_method="text")
[docs]
async def fetch_thread_member(
    self,
    user_id: int
) -> "ThreadMember":
    """
    Fetch a single member of this thread

    Parameters
    ----------
    user_id: `int`
        The ID of the user to fetch

    Returns
    -------
    `ThreadMember`
        The thread member built from the API response

    Raises
    ------
    `ValueError`
        If no guild is available for this channel
    """
    if not self.guild:
        raise ValueError("Cannot fetch thread member without guild_id")

    from .member import ThreadMember

    endpoint = f"/channels/{self.id}/thread-members/{user_id}"
    r = await self._state.query(
        "GET",
        endpoint,
        params={"with_member": "true"}
    )

    return ThreadMember(
        state=self._state,
        guild=self.guild,
        data=r.response,
    )
[docs]
async def fetch_thread_members(self) -> list["ThreadMember"]:
    """
    Fetch every member of this thread

    Returns
    -------
    `list[ThreadMember]`
        One thread member per entry in the API response

    Raises
    ------
    `ValueError`
        If no guild is available for this channel
    """
    if not self.guild:
        raise ValueError("Cannot fetch thread member without guild_id")

    from .member import ThreadMember

    r = await self._state.query(
        "GET",
        f"/channels/{self.id}/thread-members",
        params={"with_member": "true"},
    )

    members: list["ThreadMember"] = []
    for entry in r.response:
        members.append(
            ThreadMember(
                state=self._state,
                guild=self.guild,
                data=entry
            )
        )
    return members
[docs]
class BaseChannel(PartialChannel):
def __init__(
    self,
    *,
    state: "DiscordAPI",
    data: dict,
    guild_id: int | None = None
):
    """
    Build a channel from a raw API payload.

    Parameters
    ----------
    state: `DiscordAPI`
        The state used for API calls and cache lookups
    data: `dict`
        Raw channel payload; `id` and `type` are required keys
    guild_id: `int | None`
        Fallback guild ID, passed as the default for the payload lookup
    """
    channel_id = int(data["id"])
    resolved_guild = utils.get_int(data, "guild_id", default=guild_id)
    super().__init__(
        state=state,
        id=channel_id,
        guild_id=resolved_guild
    )

    # Optional payload fields fall back to None / False / 0
    self.name: Optional[str] = data.get("name", None)
    self.nsfw: bool = data.get("nsfw", False)
    self.topic: Optional[str] = data.get("topic", None)
    self.position: Optional[int] = utils.get_int(data, "position")
    self.last_message_id: Optional[int] = utils.get_int(data, "last_message_id")
    self.parent_id: Optional[int] = utils.get_int(data, "parent_id")
    self.rate_limit_per_user: int = data.get("rate_limit_per_user", 0)

    self._raw_type: ChannelType = ChannelType(data["type"])

    parsed_overwrites: list[PermissionOverwrite] = []
    for raw_overwrite in data.get("permission_overwrites", []):
        parsed_overwrites.append(PermissionOverwrite.from_dict(raw_overwrite))
    self.permission_overwrites: list[PermissionOverwrite] = parsed_overwrites
def __repr__(self) -> str:
    """ Debug representation with the channel ID and name """
    channel_id, channel_name = self.id, self.name
    return f"<Channel id={channel_id} name='{channel_name}'>"
def __str__(self) -> str:
    """ The channel name, or an empty string when it has none """
    if self.name:
        return self.name
    return ""
[docs]
def permissions_for(self, member: "Member") -> Permissions:
    """
    Returns the permissions for a member in the channel.
    Note that this only works if you are using Gateway with guild, role and channel cache.

    The result is built in this order: guild-level role permissions,
    the default-role overwrite, the combined overwrites of the member's
    roles, and finally the member-specific overwrite.

    Parameters
    ----------
    member: `Member`
        The member to get the permissions for.

    Returns
    -------
    `Permissions`
        The permissions for the member in the channel.
        `Permissions.none()` when no guild is available.
    """
    guild = self.guild
    if guild is None:
        # Nothing to compute from; same answer a partial channel gives
        return Permissions.none()

    if getattr(guild, "owner_id", None) == member.id:
        return Permissions.all()

    default_role = getattr(guild, "default_role", None)
    default_role_id = getattr(default_role, "id", None)

    base: Permissions = getattr(
        default_role,
        "permissions",
        Permissions.none()
    )

    for r in member.roles:
        role = guild.get_role(r.id)
        if role is None:
            continue
        base |= getattr(role, "permissions", Permissions.none())

    if Permissions.administrator in base:
        return Permissions.all()

    _everyone = next((
        g for g in self.permission_overwrites
        if g.target.id == default_role_id
    ), None)

    if _everyone:
        base = base.handle_overwrite(int(_everyone.allow), int(_everyone.deny))
        _overwrites = [
            g for g in self.permission_overwrites
            if g.target.id != _everyone.target.id
        ]
    else:
        _overwrites = self.permission_overwrites

    # Compare IDs with IDs; testing an int against the role objects in
    # member.roles depends on how those objects implement equality
    member_role_ids = {int(r.id) for r in member.roles}

    allows, denies = 0, 0
    for ow in _overwrites:
        if ow.is_role() and int(ow.target.id) in member_role_ids:
            allows |= int(ow.allow)
            denies |= int(ow.deny)

    base = base.handle_overwrite(allows, denies)

    for ow in _overwrites:
        if ow.is_member() and ow.target.id == member.id:
            # Previously only accumulated and never applied to `base`,
            # so member-specific overwrites had no effect
            base = base.handle_overwrite(int(ow.allow), int(ow.deny))
            break

    if member.is_timed_out():
        # Timed-out members keep at most these two permissions,
        # and only the ones they already had
        _timeout_perm = (
            Permissions.view_channel |
            Permissions.read_message_history
        )

        if Permissions.view_channel not in base:
            _timeout_perm &= ~Permissions.view_channel
        if Permissions.read_message_history not in base:
            _timeout_perm &= ~Permissions.read_message_history

        base = _timeout_perm

    return base
@property
def mention(self) -> str:
    """ `str`: The channel's mention, formatted as `<#id>` """
    return f"<#{self.id}>"
@property
def type(self) -> ChannelType:
    """ `ChannelType`: The channel's type, this base implementation always reports `ChannelType.guild_text` """
    return ChannelType.guild_text
[docs]
@classmethod
def from_dict(
cls,
*,
state: "DiscordAPI",
data: dict,
guild_id: int | None = None
) -> "BaseChannel":
"""
Create a channel object from a dictionary
Requires the state to be set
Parameters
----------
state: `DiscordAPI`
The state to use
data: `dict`
Data provided by Discord API
Returns
-------
`BaseChannel`
The channel object
"""
_class = cls(state=state, data=data)._class_to_return(
data=data,
state=state,
guild_id=guild_id
)
return _class
[docs]
class TextChannel(BaseChannel):
    """ Channel class whose reported type is either guild text or guild news """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<TextChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: `guild_text` when the raw type equals 0, otherwise `guild_news` """
        is_plain_text = self._raw_type == 0
        return ChannelType.guild_text if is_plain_text else ChannelType.guild_news
[docs]
class DMChannel(BaseChannel):
    """
    Represents a direct message channel

    Attributes
    ----------
    name: `Optional[str]`
        Name of the first recipient, if the payload listed any recipients
    user: `Optional[User]`
        The first recipient, if the payload listed any recipients
    last_message: `Optional[PartialMessage]`
        Partial object of the last message, if its ID was provided
    last_pin_timestamp: `Optional[datetime]`
        Parsed timestamp of the last pin, if it was provided
    """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

        self.name: Optional[str] = None
        self.user: Optional["User"] = None
        self.last_message: Optional["PartialMessage"] = None

        # Always define the attribute, so that reading it does not raise
        # AttributeError when the payload has no "last_pin_timestamp"
        self.last_pin_timestamp: Optional[datetime] = None

        self._from_data(data)

    def __repr__(self) -> str:
        return f"<DMChannel id={self.id} name='{self.user}'>"

    def _from_data(self, data: dict):
        """ Fill the optional attributes from the keys present in the payload """
        if data.get("recipients", None):
            # Imported here rather than at module level (the module only imports it under TYPE_CHECKING)
            from .user import User
            self.user = User(state=self._state, data=data["recipients"][0])
            self.name = self.user.name

        if data.get("last_message_id", None):
            from .message import PartialMessage
            self.last_message = PartialMessage(
                state=self._state,
                channel_id=self.id,
                id=int(data["last_message_id"])
            )

        if data.get("last_pin_timestamp", None):
            self.last_pin_timestamp = utils.parse_time(data["last_pin_timestamp"])

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Returns the channel's type """
        return ChannelType.dm

    @property
    def mention(self) -> str:
        """ `str`: The channel's mention """
        return f"<@{self.id}>"

    async def edit(self, *args, **kwargs) -> None:
        """
        Only here to prevent errors

        Raises
        ------
        `TypeError`
            If you try to edit a DM channel
        """
        raise TypeError("Cannot edit a DM channel")
[docs]
class StoreChannel(BaseChannel):
    """ Channel class that reports `ChannelType.guild_store` as its type """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<StoreChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Always `ChannelType.guild_store` """
        return ChannelType.guild_store
[docs]
class GroupDMChannel(BaseChannel):
    """ Channel class that reports `ChannelType.group_dm` as its type """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<GroupDMChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Always `ChannelType.group_dm` """
        return ChannelType.group_dm
[docs]
class DirectoryChannel(BaseChannel):
    """ Channel class that reports `ChannelType.guild_directory` as its type """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<DirectoryChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Always `ChannelType.guild_directory` """
        return ChannelType.guild_directory
[docs]
class CategoryChannel(BaseChannel):
    """ Represents a category, with helpers to list and create the channels placed in it """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<CategoryChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Always `ChannelType.guild_category` """
        return ChannelType.guild_category

    @property
    def channels(self) -> list["BaseChannel | PartialChannel"]:
        """
        `list[BaseChannel | PartialChannel]`: The cached channels whose parent is this category.

        The guild is looked up in the cache, an empty list is returned when it is not found.
        Voice and stage channels are sorted after every other channel,
        and channels within each group are ordered by their position.
        """
        guild = self._state.cache.get_guild(self.guild_id)
        if not guild:
            return []

        voice_types = (
            ChannelType.guild_voice,
            ChannelType.guild_stage_voice
        )

        def _sort_key(channel: "BaseChannel | PartialChannel") -> tuple:
            # Voice-like channels get 1 so they end up after the others,
            # channels without a position are treated as position 0
            return (
                int(channel.type in voice_types),
                getattr(channel, "position", 0)
            )

        children: list["BaseChannel | PartialChannel"] = [
            candidate for candidate in guild.channels
            if candidate.parent_id == self.id
        ]
        children.sort(key=_sort_key)
        return children

    async def create_text_channel(
        self,
        name: str,
        **kwargs
    ) -> TextChannel:
        """
        Create a text channel inside this category

        The call is delegated to the guild's `create_text_channel` with
        `parent_id` set to this category's ID. Every other keyword
        argument is forwarded unchanged, which also means that passing
        `parent_id` yourself raises `TypeError` (duplicate keyword).

        Parameters
        ----------
        name: `str`
            The name of the channel
        topic: `Optional[str]`
            The topic of the channel
        rate_limit_per_user: `Optional[int]`
            The rate limit per user of the channel
        overwrites: `Optional[list[PermissionOverwrite]]`
            The permission overwrites of the channel
        nsfw: `Optional[bool]`
            Whether the channel is NSFW or not
        reason: `Optional[str]`
            The reason for creating the text channel

        Returns
        -------
        `TextChannel`
            The channel object
        """
        guild = self.guild
        return await guild.create_text_channel(
            name=name,
            parent_id=self.id,
            **kwargs
        )

    async def create_voice_channel(
        self,
        name: str,
        **kwargs
    ) -> "VoiceChannel":
        """
        Create a voice channel inside this category

        The call is delegated to the guild's `create_voice_channel` with
        `parent_id` set to this category's ID. Every other keyword
        argument is forwarded unchanged, which also means that passing
        `parent_id` yourself raises `TypeError` (duplicate keyword).

        Parameters
        ----------
        name: `str`
            The name of the channel
        bitrate: `Optional[int]`
            The bitrate of the channel
        user_limit: `Optional[int]`
            The user limit of the channel
        rate_limit_per_user: `Optional[int]`
            The rate limit per user of the channel
        overwrites: `Optional[list[PermissionOverwrite]]`
            The permission overwrites of the channel
        position: `Optional[int]`
            The position of the channel
        nsfw: `Optional[bool]`
            Whether the channel is NSFW or not
        reason: `Optional[str]`
            The reason for creating the voice channel

        Returns
        -------
        `VoiceChannel`
            The channel object
        """
        guild = self.guild
        return await guild.create_voice_channel(
            name=name,
            parent_id=self.id,
            **kwargs
        )

    async def create_stage_channel(
        self,
        name: str,
        **kwargs
    ) -> "StageChannel":
        """
        Create a stage channel inside this category

        The call is delegated to the guild's `create_stage_channel` with
        `parent_id` set to this category's ID. Every other keyword
        argument is forwarded unchanged, which also means that passing
        `parent_id` yourself raises `TypeError` (duplicate keyword).

        Parameters
        ----------
        name: `str`
            The name of the channel
        bitrate: `Optional[int]`
            The bitrate of the channel
        user_limit: `Optional[int]`
            The user limit of the channel
        overwrites: `Optional[list[PermissionOverwrite]]`
            The permission overwrites of the channel
        position: `Optional[int]`
            The position of the channel
        video_quality_mode: `Optional[Union[VideoQualityType, int]]`
            The video quality mode of the channel
        reason: `Optional[str]`
            The reason for creating the stage channel

        Returns
        -------
        `StageChannel`
            The created channel
        """
        guild = self.guild
        return await guild.create_stage_channel(
            name=name,
            parent_id=self.id,
            **kwargs
        )
[docs]
class NewsChannel(BaseChannel):
    """ Channel class that reports `ChannelType.guild_news` as its type """

    def __init__(self, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<NewsChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Always `ChannelType.guild_news` """
        return ChannelType.guild_news
# Thread channels
[docs]
class PartialThread(PartialChannel):
    """
    Partial thread object, built from IDs and a channel type only

    Attributes
    ----------
    parent_id: `int`
        The parent ID given on creation, converted to `int`
    """

    def __init__(
        self,
        *,
        state: "DiscordAPI",
        id: int,
        guild_id: int,
        parent_id: int,
        type: ChannelType | int
    ):
        # IDs are converted to int before being handed to the parent class
        thread_id, owning_guild_id = int(id), int(guild_id)
        super().__init__(state=state, id=thread_id, guild_id=owning_guild_id)

        self.parent_id: int = int(parent_id)
        self._raw_type: ChannelType = ChannelType(int(type))

    def __repr__(self) -> str:
        return f"<PartialThread id={self.id} type={self.type}>"
[docs]
class PublicThread(BaseChannel):
    """
    Represents a thread, including its counters and thread metadata

    Counters default to 0 and the optional IDs to `None` when
    `utils.get_int` yields nothing for them. `locked`, `archived` and
    `auto_archive_duration` are read from the `thread_metadata` object
    of the payload, with `False`, `False` and `60` as defaults.
    """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

        self.name: str = data["name"]

        # Counters
        self.message_count: int = utils.get_int(data, "message_count") or 0
        self.member_count: int = utils.get_int(data, "member_count") or 0
        self.rate_limit_per_user: int = utils.get_int(data, "rate_limit_per_user") or 0
        self.total_message_sent: int = utils.get_int(data, "total_message_sent") or 0

        # Thread metadata, kept as a raw dict next to the unpacked values
        metadata: dict = data.get("thread_metadata", {})
        self._metadata: dict = metadata
        self.locked: bool = metadata.get("locked", False)
        self.archived: bool = metadata.get("archived", False)
        self.auto_archive_duration: int = metadata.get("auto_archive_duration", 60)

        # Identifiers
        self.channel_id: int = int(data["id"])
        self.newly_created: bool = data.get("newly_created", False)
        self.guild_id: Optional[int] = utils.get_int(data, "guild_id")
        self.owner_id: Optional[int] = utils.get_int(data, "owner_id")
        self.last_message_id: Optional[int] = utils.get_int(data, "last_message_id")

    def __repr__(self) -> str:
        return f"<PublicThread id={self.id} name='{self.name}'>"

    @property
    def guild(self) -> "Guild | PartialGuild | None":
        """ `Guild | PartialGuild | None`: The cached guild if there is one, otherwise a partial guild object (`None` without a guild ID) """
        if not self.guild_id:
            return None

        cached = self._state.cache.get_guild(self.guild_id)
        if not cached:
            from .guild import PartialGuild
            return PartialGuild(state=self._state, id=self.guild_id)

        return cached

    @property
    def owner(self) -> Optional["PartialUser"]:
        """ `Optional[PartialUser]`: A partial user object for the thread owner, if the owner ID is available """
        if self.owner_id:
            from .user import PartialUser
            return PartialUser(state=self._state, id=self.owner_id)

        return None

    @property
    def last_message(self) -> Optional["PartialMessage"]:
        """ `Optional[PartialMessage]`: A partial message object for the last message, if its ID is available """
        if self.last_message_id:
            from .message import PartialMessage
            return PartialMessage(
                state=self._state,
                channel_id=self.channel_id,
                guild_id=self.guild_id,
                id=self.last_message_id
            )

        return None
[docs]
class ForumTag:
def __init__(self, *, data: dict):
self.id: Optional[int] = utils.get_int(data, "id")
self.name: str = data["name"]
self.moderated: bool = data.get("moderated", False)
self.emoji_id: Optional[int] = utils.get_int(data, "emoji_id")
self.emoji_name: Optional[str] = data.get("emoji_name", None)
def __repr__(self) -> str:
return f"<ForumTag id={self.id} name='{self.name}'>"
def __str__(self) -> str:
return self.name
def __int__(self) -> int:
return int(self.id or -1)
[docs]
@classmethod
def create(
cls,
name: Optional[str] = None,
*,
emoji_id: Optional[int] = None,
emoji_name: Optional[str] = None,
moderated: bool = False
) -> "ForumTag":
"""
Create a forum tag, used for editing available_tags
Parameters
----------
name: `Optional[str]`
The name of the tag
emoji_id: `Optional[int]`
The emoji ID of the tag
emoji_name: `Optional[str]`
The emoji name of the tag
moderated: `bool`
If the tag is moderated
Returns
-------
`ForumTag`
The tag object
"""
if emoji_id and emoji_name:
raise ValueError(
"Cannot have both emoji_id and "
"emoji_name defined for a tag."
)
return cls(data={
"name": name or "New Tag",
"emoji_id": emoji_id,
"emoji_name": emoji_name,
"moderated": moderated
})
[docs]
def to_dict(self) -> dict:
payload = {
"name": self.name,
"moderated": self.moderated,
}
if self.id:
payload["id"] = str(self.id)
if self.emoji_id:
payload["emoji_id"] = str(self.emoji_id)
if self.emoji_name:
payload["emoji_name"] = self.emoji_name
return payload
[docs]
@classmethod
def from_data(cls, *, data: dict) -> Self:
self = cls.__new__(cls)
self.name = data["name"]
self.id = int(data["id"])
self.moderated = data.get("moderated", False)
self.emoji_id = utils.get_int(data, "emoji_id")
self.emoji_name = data.get("emoji_name", None)
return self
[docs]
class ForumChannel(PublicThread):
    """
    Represents a forum channel

    Attributes
    ----------
    tags: `list[ForumTag]`
        The tags found in the payload
    default_reaction_emoji: `Optional[EmojiParser]`
        The default reaction emoji, if the payload provided one
    """

    def __init__(self, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)
        self.default_reaction_emoji: Optional[EmojiParser] = None

        # Discord's channel object lists forum tags under "available_tags".
        # The previously used "tags" key is still accepted as a fallback.
        raw_tags = data.get("available_tags") or data.get("tags") or []
        self.tags: list[ForumTag] = [
            ForumTag(data=g)
            for g in raw_tags
        ]

        self._from_data(data)

    def __repr__(self) -> str:
        return f"<ForumChannel id={self.id} name='{self.name}'>"

    def _from_data(self, data: dict):
        """ Parse the default reaction emoji, when the payload has one """
        reaction = data.get("default_reaction_emoji", None)
        if not reaction:
            return

        # Discord's default reaction object uses "emoji_id"/"emoji_name".
        # The previously used "id"/"name" keys are still accepted as a fallback.
        _target = (
            reaction.get("emoji_id", None) or
            reaction.get("emoji_name", None) or
            reaction.get("id", None) or
            reaction.get("name", None)
        )

        if _target:
            self.default_reaction_emoji = EmojiParser(_target)
[docs]
class ForumThread(PublicThread):
    """
    Thread object that also carries a message

    Attributes
    ----------
    message: `Message`
        The message built from the "message" key of the payload
    """

    def __init__(self, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)
        self._from_data(data)

    def __repr__(self) -> str:
        return f"<ForumThread id={self.id} name='{self.name}'>"

    def __str__(self) -> str:
        return self.name

    def _from_data(self, data: dict):
        """ Build the message object, the "message" key is required """
        from .message import Message

        thread_message = Message(
            state=self._state,
            data=data["message"],
            guild=self.guild
        )
        self.message: Message = thread_message
[docs]
class NewsThread(PublicThread):
    """ Thread class that only changes the representation of `PublicThread` """

    def __init__(self, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    def __repr__(self) -> str:
        return f"<NewsThread id={self.id} name='{self.name}'>"
[docs]
class PrivateThread(PublicThread):
    """ Thread class that reports `ChannelType.guild_private_thread` as its type """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Always `ChannelType.guild_private_thread` """
        return ChannelType.guild_private_thread
[docs]
class Thread(PublicThread):
    """ Thread class whose reported type is either public or private thread """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: `guild_public_thread` when the raw type equals 11, otherwise `guild_private_thread` """
        is_public = self._raw_type == 11
        return (
            ChannelType.guild_public_thread
            if is_public else
            ChannelType.guild_private_thread
        )
# Voice channels
[docs]
class VoiceRegion:
    """
    Represents a voice region

    Every attribute is copied as-is from the key of the same name,
    all five keys are required.

    Attributes
    ----------
    id: `str`
        Value of the "id" key
    name: `str`
        Value of the "name" key
    custom: `bool`
        Value of the "custom" key
    deprecated: `bool`
        Value of the "deprecated" key
    optimal: `bool`
        Value of the "optimal" key
    """

    id: str
    name: str
    custom: bool
    deprecated: bool
    optimal: bool

    def __init__(self, *, data: dict):
        # Same lookup order as the attribute list above,
        # a missing key raises KeyError
        for field in ("id", "name", "custom", "deprecated", "optimal"):
            setattr(self, field, data[field])

    def __str__(self) -> str:
        return self.name

    def __repr__(self) -> str:
        return f"<VoiceRegion id='{self.id}' name='{self.name}'>"
[docs]
class VoiceChannel(BaseChannel):
    """
    Represents a voice channel

    Attributes
    ----------
    bitrate: `int`
        The "bitrate" value of the payload, converted to `int` (required)
    user_limit: `int`
        The "user_limit" value of the payload, converted to `int` (required)
    rtc_region: `Optional[str]`
        The "rtc_region" value of the payload, `None` when not provided
    """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)

        raw_bitrate = data["bitrate"]
        self.bitrate: int = int(raw_bitrate)

        raw_user_limit = data["user_limit"]
        self.user_limit: int = int(raw_user_limit)

        self.rtc_region: Optional[str] = data.get("rtc_region", None)

    def __repr__(self) -> str:
        return f"<VoiceChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """ `ChannelType`: Always `ChannelType.guild_voice` """
        return ChannelType.guild_voice
class StageInstance(PartialBase):
    """Represents a stage instance for a stage channel.

    This holds information about a live stage.

    Attributes
    ----------
    id: `int`
        The ID of the stage instance
    channel_id: `int`
        The ID of the stage channel
    guild_id: `int`
        The associated guild ID of the stage channel
    topic: `str`
        The topic of the stage instance
    privacy_level: `PrivacyLevelType`
        The privacy level of the stage instance
    guild_scheduled_event_id: `Optional[int]`
        The guild scheduled event ID associated with this stage instance
    """

    def __init__(
        self,
        *,
        state: "DiscordAPI",
        data: "channels.StageInstance",
        guild: "PartialGuild | None" = None,
    ) -> None:
        super().__init__(id=int(data["id"]))
        self._state: "DiscordAPI" = state
        self._guild: "PartialGuild | None" = guild

        self._from_data(data)

    def _from_data(self, data: "channels.StageInstance") -> None:
        """ Copy the stage instance fields out of the raw payload """
        self.channel_id: int = int(data["channel_id"])
        self.guild_id: int = int(data["guild_id"])
        self.topic: str = data["topic"]
        self.privacy_level: PrivacyLevelType = PrivacyLevelType(data["privacy_level"])
        self.guild_scheduled_event_id: Optional[int] = utils.get_int(data, "guild_scheduled_event_id")  # type: ignore # todo types

    @property
    def guild(self) -> "Guild | PartialGuild | None":
        """ `Guild | PartialGuild | None`: The cached guild if there is one, otherwise a partial guild object """
        if not self.guild_id:
            return None

        cache = self._state.cache.get_guild(self.guild_id)
        if cache:
            return cache

        from .guild import PartialGuild
        return PartialGuild(state=self._state, id=self.guild_id)

    @property
    def channel(self) -> "PartialChannel | StageChannel":
        """ `PartialChannel | StageChannel`: The channel returned by the guild, otherwise a partial channel object """
        guild = self.guild

        # The guild property can return None, avoid calling get_channel on it
        found = guild.get_channel(self.channel_id) if guild else None

        return found or (
            PartialChannel(state=self._state, id=self.channel_id)
        )

    @property
    def scheduled_event(self) -> "PartialScheduledEvent | None":
        """ `PartialScheduledEvent | None`: A partial scheduled event object, if an event ID is available """
        if not self.guild_scheduled_event_id:
            return None

        from .guild import PartialScheduledEvent
        return PartialScheduledEvent(
            state=self._state,
            id=self.guild_scheduled_event_id,
            guild_id=self.guild_id
        )

    def __repr__(self) -> str:
        return f"<StageInstance id={self.id!r} topic={self.topic!r}>"

    async def edit(
        self,
        *,
        topic: str = MISSING,
        privacy_level: PrivacyLevelType = MISSING,
        reason: Optional[str] = None
    ) -> Self:
        """Edit this stage instance

        Parameters
        ----------
        topic: `str`
            The new topic of this stage instance.
        privacy_level: `PrivacyLevelType`
            The new privacy level of this stage instance.
        reason: `Optional[str]`
            The reason for editing the stage instance.

        Returns
        -------
        `StageInstance`
            The edited stage instance
        """
        # Only the values that were explicitly passed are sent
        payload = {}

        if topic is not MISSING:
            payload["topic"] = str(topic)
        if privacy_level is not MISSING:
            payload["privacy_level"] = int(privacy_level)

        # Discord addresses stage instances by the ID of their stage
        # channel, not by the ID of the stage instance itself
        r = await self._state.query(
            "PATCH",
            f"/stage-instances/{self.channel_id}",
            json=payload,
            reason=reason
        )

        return self.__class__(
            state=self._state,
            data=r.response,  # type: ignore # todo types
            guild=self._guild,
        )

    async def delete(self, *, reason: Optional[str] = None) -> None:
        """Delete this stage instance

        Parameters
        ----------
        reason: `Optional[str]`
            The reason for deleting the stage instance
        """
        # Discord addresses stage instances by the ID of their stage
        # channel, not by the ID of the stage instance itself
        await self._state.query(
            "DELETE",
            f"/stage-instances/{self.channel_id}",
            res_method="text",
            reason=reason
        )
[docs]
class StageChannel(VoiceChannel):
    """ Represents a stage channel, with helpers to fetch and create its stage instance """

    def __init__(self, *, state: "DiscordAPI", data: dict):
        super().__init__(state=state, data=data)
        # Only filled by create_stage_instance
        self._stage_instance: Optional[StageInstance] = None

    def __repr__(self) -> str:
        return f"<StageChannel id={self.id} name='{self.name}'>"

    @property
    def type(self) -> ChannelType:
        """`ChannelType`: Always `ChannelType.guild_stage_voice` """
        return ChannelType.guild_stage_voice

    @property
    def stage_instance(self) -> Optional[StageInstance]:
        """`Optional[StageInstance]`: The stage instance stored on this object, `None` when nothing has been stored yet."""
        return self._stage_instance

    async def fetch_stage_instance(self) -> StageInstance:
        """Fetch the stage instance associated with this stage channel

        The fetched instance is returned only, it is not stored on this object.

        Returns
        -------
        `StageInstance`
            The stage instance of the channel
        """
        response = await self._state.query(
            "GET",
            f"/stage-instances/{self.id}"
        )

        return StageInstance(
            state=self._state,
            data=response.response,  # type: ignore # todo types
            guild=self.guild
        )

    async def create_stage_instance(
        self,
        *,
        topic: str,
        privacy_level: PrivacyLevelType = MISSING,
        send_start_notification: bool = MISSING,
        guild_scheduled_event: Snowflake | int = MISSING,
        reason: Optional[str] = None
    ) -> StageInstance:
        """
        Create a stage instance for this channel

        The created instance is also stored on this object and
        made available through `stage_instance`.

        Parameters
        ----------
        topic: `str`
            The topic of the stage instance
        privacy_level: `PrivacyLevelType`
            The privacy level of the stage instance.
            Only sent when provided.
        send_start_notification: `bool`
            Whether to notify @everyone that the stage instance has started.
            Only sent when provided.
        guild_scheduled_event: `Optional[Snowflake | int]`
            The guild scheduled event to associate with this stage instance.
            Only sent when provided.
        reason: `Optional[str]`
            The reason for creating the stage instance

        Returns
        -------
        `StageInstance`
            The created stage instance
        """
        payload = {
            "channel_id": self.id,
            "topic": topic,
        }

        # Optional values are left out of the payload unless they were passed
        if privacy_level is not MISSING:
            payload["privacy_level"] = int(privacy_level)
        if send_start_notification is not MISSING:
            payload["send_start_notification"] = send_start_notification
        if guild_scheduled_event is not MISSING:
            payload["guild_scheduled_event_id"] = utils.normalize_entity_id(guild_scheduled_event)

        response = await self._state.query(
            "POST",
            "/stage-instances",
            json=payload,
            reason=reason
        )

        created = StageInstance(
            state=self._state,
            data=response.response,  # type: ignore # todo types
            guild=self.guild
        )
        self._stage_instance = created
        return created