import asyncio
from datetime import timedelta, datetime
from io import BytesIO
from typing import TYPE_CHECKING, Optional, Union, AsyncIterator, Self, Callable
from . import utils
from .colour import Colour
from .embeds import Embed
from .emoji import EmojiParser
from .enums import MessageReferenceType, MessageType, InteractionType
from .errors import HTTPException
from .file import File
from .flags import AttachmentFlags
from .mentions import AllowedMentions
from .object import PartialBase, Snowflake
from .response import MessageResponse
from .role import Role, PartialRole
from .sticker import PartialSticker
from .user import User
from .view import View
if TYPE_CHECKING:
from .channel import BaseChannel, PartialChannel, PublicThread
from .guild import Guild, PartialGuild
from .http import DiscordAPI
from .member import Member
MISSING = utils.MISSING
__all__ = (
"Attachment",
"JumpURL",
"Message",
"MessageInteraction",
"MessageReference",
"PartialMessage",
"Poll",
"WebhookMessage",
)
[docs]
class MessageInteraction(PartialBase):
def __init__(
self,
*,
state: "DiscordAPI",
data: dict
):
super().__init__(id=int(data["id"]))
self._state = state
self.type: InteractionType = InteractionType(data["type"])
self.name: str | None = data.get("name", None)
self.user: User = User(
state=state,
data=data["user"]
)
class MessageReaction:
def __init__(self, *, state: "DiscordAPI", message: "Message", data: dict):
self._state = state
self._message = message
self.count: int = int(data["count"])
self.burst_count: int = int(data["burst_count"])
self.me: bool = data.get("me", False)
self.emoji: EmojiParser = EmojiParser.from_dict(data["emoji"])
self.me_burst: bool = data.get("me_burst", False)
self.burst_me: bool = data.get("burst_me", False)
self.burst_count: int = data.get("burst_count", 0)
self.burst_colors: list[Colour] = [
Colour.from_hex(g)
for g in data.get("burst_colors", [])
]
async def add(self) -> None:
"""
Make the bot react with this emoji
"""
_parsed = self.emoji.to_reaction()
await self._state.query(
"PUT",
f"/channels/{self._message.channel.id}/messages/{self._message.id}/reactions/{_parsed}/@me",
res_method="text"
)
async def remove(self, *, user_id: int | None = None) -> None:
"""
Remove the reaction from the message
Parameters
----------
user_id: `int | None`
User ID to remove the reaction from
If none provided, it will remove the reaction from the bot
"""
_parsed = self.emoji.to_reaction()
_url = (
f"/channels/{self._message.channel.id}/messages/{self._message.id}/reactions/{_parsed}"
f"/{user_id}" if user_id is not None else "/@me"
)
await self._state.query(
"DELETE",
_url,
res_method="text"
)
[docs]
class JumpURL:
def __init__(
self,
*,
state: "DiscordAPI",
url: Optional[str] = None,
guild_id: Optional[int] = None,
channel_id: Optional[int] = None,
message_id: Optional[int] = None
):
self._state = state
self.guild_id: Optional[int] = int(guild_id) if guild_id else None
self.channel_id: Optional[int] = channel_id or None
self.message_id: Optional[int] = message_id or None
if url:
if any([guild_id, channel_id, message_id]):
raise ValueError("Cannot provide both a URL and a guild_id, channel_id or message_id")
_parse_url: Optional[list[tuple[str, str, Optional[str]]]] = utils.re_jump_url.findall(url)
if not _parse_url:
raise ValueError("Invalid jump URL provided")
gid, cid, mid = _parse_url[0]
self.channel_id = int(cid)
if gid != "@me":
self.guild_id = int(gid)
if mid:
self.message_id = int(mid)
if not self.channel_id:
raise ValueError("Cannot create a JumpURL without a channel_id")
def __repr__(self) -> str:
return (
f"<JumpURL guild_id={self.guild_id} channel_id={self.channel_id} "
f"message_id={self.message_id}>"
)
def __str__(self) -> str:
return self.url
@property
def guild(self) -> Optional["Guild | PartialGuild"]:
""" `Optional[PartialGuild]`: The guild the message was sent in """
if not self.guild_id:
return None
cache = self._state.cache.get_guild(self.guild_id)
if cache:
return cache
from .guild import PartialGuild
return PartialGuild(
state=self._state,
id=self.guild_id
)
[docs]
async def fetch_guild(self) -> "Guild":
""" `Optional[Guild]`: Returns the guild the message was sent in """
if not self.guild_id:
raise ValueError("Cannot fetch a guild without a guild_id available")
return await self.guild.fetch()
@property
def channel(self) -> Optional["BaseChannel | PartialChannel"]:
"""
`BaseChannel | PartialChannel`: Returns the channel the message was sent in.
If guild and channel cache is enabled, it can also return full channel object.
"""
if not self.channel_id:
return None
if self.guild_id:
cache = self._state.cache.get_channel_thread(
guild_id=self.guild_id,
channel_id=self.channel_id
)
if cache:
return cache
from .channel import PartialChannel
return PartialChannel(
state=self._state,
id=self.channel_id,
guild_id=self.guild_id
)
[docs]
async def fetch_channel(self) -> "BaseChannel":
""" `BaseChannel`: Returns the channel the message was sent in """
return await self.channel.fetch()
@property
def message(self) -> Optional["PartialMessage"]:
""" `Optional[PartialMessage]`: Returns the message if a message_id is available """
if not self.channel_id or not self.message_id:
return None
return PartialMessage(
state=self._state,
channel_id=self.channel_id,
guild_id=self.guild_id,
id=self.message_id
)
[docs]
async def fetch_message(self) -> "Message":
""" `Message`: Returns the message if a message_id is available """
if not self.message_id:
raise ValueError("Cannot fetch a message without a message_id available")
return await self.message.fetch()
@property
def url(self) -> str:
""" `Optional[str]`: Returns the jump URL """
if self.channel_id and self.message_id:
return f"https://discord.com/channels/{self.guild_id or '@me'}/{self.channel_id}/{self.message_id}"
return f"https://discord.com/channels/{self.guild_id or '@me'}/{self.channel_id}"
class PollAnswer:
def __init__(
self,
*,
id: int,
text: Optional[str] = None,
emoji: Optional[Union[EmojiParser, str]] = None
):
self.id: int = id
self.text: Optional[str] = text
self.emoji: Optional[Union[EmojiParser, str]] = None
if isinstance(emoji, str):
self.emoji = EmojiParser(emoji)
if self.text is None and self.emoji is None:
raise ValueError("Either text or emoji must be provided")
# Data only available when fetching message data
self.count: int = 0
self.me_voted: bool = False
def __repr__(self) -> str:
return f"<PollAnswer id={self.id} count={self.count}>"
def __int__(self) -> int:
return self.id
def __str__(self) -> str:
return self.text or str(self.emoji)
def to_dict(self) -> dict:
data = {
"answer_id": self.id,
"poll_media": {}
}
if self.text:
data["poll_media"]["text"] = self.text
if isinstance(self.emoji, EmojiParser):
data["poll_media"]["emoji"] = self.emoji.to_dict()
return data
@classmethod
def from_dict(cls, data: dict) -> Self:
emoji = data["poll_media"].get("emoji", None)
if emoji:
emoji = EmojiParser.from_dict(emoji)
return cls(
id=data["answer_id"],
text=data["poll_media"].get("text", None),
emoji=emoji
)
[docs]
class Poll:
def __init__(
self,
*,
text: str,
allow_multiselect: bool = False,
duration: Optional[Union[timedelta, int]] = None
):
self.text: Optional[str] = text
self.allow_multiselect: bool = allow_multiselect
self.answers: list[PollAnswer] = []
self.duration: Optional[int] = None
if duration is not None:
if isinstance(duration, timedelta):
duration = int(duration.total_seconds())
self.duration = duration
if self.duration > timedelta(days=7).total_seconds():
raise ValueError("Duration cannot be more than 7 days")
# Convert to hours int
self.duration = int(self.duration / 3600)
self.layout_type: int = 1 # This is the only layout type available
# Data only available when fetching message data
self.expiry: Optional[datetime] = None
self.is_finalized: bool = False
def __repr__(self) -> str:
return f"<Poll text='{self.text}' answers={self.answers}>"
def __str__(self) -> str:
return self.text or ""
def __len__(self) -> int:
return len(self.answers)
[docs]
def add_answer(
self,
*,
text: Optional[str] = None,
emoji: Optional[Union[EmojiParser, str]] = None
) -> PollAnswer:
"""
Add an answer to the poll
Parameters
----------
text: `Optional[str]`
The text of the answer
emoji: `Optional[Union[EmojiParser, str]]`
The emoji of the answer
"""
if not text and not emoji:
raise ValueError("Either text or emoji must be provided")
answer = PollAnswer(
id=len(self.answers) + 1,
text=text,
emoji=emoji
)
self.answers.append(answer)
return answer
[docs]
def remove_answer(
self,
answer_id: Union[PollAnswer, int]
) -> None:
"""
Remove an answer from the poll
Parameters
----------
answer: `Union[PollAnswer, int]`
The ID to the answer to remove
Raises
------
`ValueError`
- If the answer ID does not exist
- If the answer is not a PollAnswer or integer
"""
try:
self.answers.pop(int(answer_id) - 1)
except IndexError:
raise ValueError("Answer ID does not exist")
except ValueError:
raise ValueError("Answer must be an PollAnswer or integer")
# Make sure IDs are in order
for i, a in enumerate(self.answers, start=1):
a.id = i
[docs]
def to_dict(self) -> dict:
return {
"question": {"text": self.text},
"answers": [a.to_dict() for a in self.answers],
"duration": self.duration,
"allow_multiselect": self.allow_multiselect,
"layout_type": self.layout_type
}
[docs]
@classmethod
def from_dict(cls, data: dict) -> Self:
poll = cls(
text=data["question"]["text"],
allow_multiselect=data["allow_multiselect"],
)
poll.answers = [PollAnswer.from_dict(a) for a in data["answers"]]
if data.get("expiry", None):
poll.expiry = utils.parse_time(data["expiry"])
_results = data.get("results", {})
poll.is_finalized = _results.get("is_finalized", False)
for g in _results.get("answer_counts", []):
find_answer = next(
(a for a in poll.answers if a.id == g["id"]),
None
)
if not find_answer:
continue
find_answer.count = g["count"]
find_answer.me_voted = g["me_voted"]
return poll
[docs]
class MessageReference:
def __init__(self, *, state: "DiscordAPI", data: dict):
self._state = state
self.type: MessageReferenceType = MessageReferenceType(data["type"])
self.guild_id: int | None = utils.get_int(data, "guild_id")
self.channel_id: int | None = utils.get_int(data, "channel_id")
self.message_id: int | None = utils.get_int(data, "message_id")
def __repr__(self) -> str:
return (
f"<MessageReference guild_id={self.guild_id} channel_id={self.channel_id} "
f"message_id={self.message_id}>"
)
@property
def jump_url(self) -> JumpURL:
""" `JumpURL`: The jump URL of the message """
return JumpURL(
state=self._state,
url=f"https://discord.com/channels/{self.guild_id or '@me'}/{self.channel_id}/{self.message_id}"
)
@property
def guild(self) -> "Guild | PartialGuild | None":
""" `Optional[PartialGuild]`: The guild the message was sent in """
if not self.guild_id:
return None
cache = self._state.cache.get_guild(self.guild_id)
if cache:
return cache
from .guild import PartialGuild
return PartialGuild(
state=self._state,
id=self.guild_id
)
@property
def channel(self) -> "PartialChannel | None":
""" `Optional[PartialChannel]`: Returns the channel the message was sent in """
if not self.channel_id:
return None
if self.guild_id:
cache = self._state.cache.get_channel_thread(
guild_id=self.guild_id,
channel_id=self.channel_id
)
if cache:
return cache
from .channel import PartialChannel
return PartialChannel(
state=self._state,
id=self.channel_id,
guild_id=self.guild_id
)
@property
def message(self) -> "PartialMessage | None":
""" `Optional[PartialMessage]`: Returns the message if a message_id and channel_id is available """
if not self.channel_id or not self.message_id:
return None
return PartialMessage(
state=self._state,
channel_id=self.channel_id,
guild_id=self.guild_id,
id=self.message_id
)
[docs]
def to_dict(self) -> dict:
""" `dict`: Returns the message reference as a dictionary """
payload = {}
if self.guild_id:
payload["guild_id"] = self.guild_id
if self.channel_id:
payload["channel_id"] = self.channel_id
if self.message_id:
payload["message_id"] = self.message_id
if self.type:
payload["type"] = int(self.type)
return payload
[docs]
class Attachment:
def __init__(self, *, state: "DiscordAPI", data: dict):
self._state = state
self.id: int = int(data["id"])
self.filename: str = data["filename"]
self.size: int = int(data["size"])
self.url: str = data["url"]
self.proxy_url: str = data["proxy_url"]
self.ephemeral: bool = data.get("ephemeral", False)
self.flags: AttachmentFlags = AttachmentFlags(data.get("flags", 0))
self.content_type: Optional[str] = data.get("content_type", None)
self.title: Optional[str] = data.get("title", None)
self.description: Optional[str] = data.get("description", None)
self.height: Optional[int] = data.get("height", None)
self.width: Optional[int] = data.get("width", None)
self.ephemeral: bool = data.get("ephemeral", False)
self.duration_secs: Optional[int] = data.get("duration_secs", None)
self.waveform: Optional[str] = data.get("waveform", None)
def __str__(self) -> str:
return self.filename or ""
def __int__(self) -> int:
return self.id
def __repr__(self) -> str:
return (
f"<Attachment id={self.id} filename='{self.filename}' "
f"url='{self.url}'>"
)
[docs]
def is_spoiler(self) -> bool:
""" `bool`: Whether the attachment is a spoiler or not """
return self.filename.startswith("SPOILER_")
[docs]
def is_voice_message(self) -> bool:
""":class:`bool`: Whether this attachment is a voice message."""
return self.duration_secs is not None and "voice-message" in self.url
[docs]
async def fetch(self, *, use_cached: bool = False) -> bytes:
"""
Fetches the file from the attachment URL and returns it as bytes
Parameters
----------
use_cached: `bool`
Whether to use the cached URL or not, defaults to `False`
Returns
-------
`bytes`
The attachment as bytes
Raises
------
`HTTPException`
If the request returned anything other than 2XX
"""
r = await self._state.http.request(
"GET",
self.proxy_url if use_cached else self.url,
res_method="read"
)
if r.status not in range(200, 300):
raise HTTPException(r)
return r.response
[docs]
async def save(
self,
path: str,
*,
use_cached: bool = False
) -> int:
"""
Fetches the file from the attachment URL and saves it locally to the path
Parameters
----------
path: `str`
Path to save the file to, which includes the filename and extension.
Example: `./path/to/file.png`
use_cached: `bool`
Whether to use the cached URL or not, defaults to `False`
Returns
-------
`int`
The amount of bytes written to the file
"""
data = await self.fetch(use_cached=use_cached)
with open(path, "wb") as f:
return f.write(data)
[docs]
async def to_file(
self,
*,
filename: Optional[str] = MISSING,
spoiler: bool = False
) -> File:
"""
Convert the attachment to a sendable File object for Message.send()
Parameters
----------
filename: `Optional[str]`
Filename for the file, if empty, the attachment's filename will be used
spoiler: `bool`
Weather the file should be marked as a spoiler or not, defaults to `False`
Returns
-------
`File`
The attachment as a File object
"""
if filename is MISSING:
filename = self.filename
data = await self.fetch()
return File(
data=BytesIO(data),
filename=str(filename),
spoiler=spoiler,
description=self.description
)
[docs]
def to_dict(self) -> dict:
""" `dict`: The attachment as a dictionary """
data = {
"id": self.id,
"filename": self.filename,
"size": self.size,
"url": self.url,
"proxy_url": self.proxy_url,
"spoiler": self.is_spoiler(),
"flags": self.flags
}
if self.title is not None:
data["title"] = self.title
if self.description is not None:
data["description"] = self.description
if self.height:
data["height"] = self.height
if self.width:
data["width"] = self.width
if self.content_type:
data["content_type"] = self.content_type
if self.duration_secs:
data["duration_secs"] = self.duration_secs
if self.waveform:
data["waveform"] = self.waveform
return data
[docs]
class PartialMessage(PartialBase):
def __init__(
self,
*,
state: "DiscordAPI",
id: int,
channel_id: int,
guild_id: int | None = None
):
super().__init__(id=int(id))
self._state = state
self.channel_id: int = int(channel_id)
self.guild_id: int | None = int(guild_id) if guild_id else None
def __repr__(self) -> str:
return f"<PartialMessage id={self.id}>"
@property
def channel(self) -> "BaseChannel | PartialChannel":
""" `PartialChannel`: Returns the channel the message was sent in """
if self.guild_id:
cache = self._state.cache.get_channel_thread(
guild_id=self.guild_id,
channel_id=self.channel_id
)
if cache:
return cache
from .channel import PartialChannel
return PartialChannel(
state=self._state,
id=self.channel_id,
guild_id=self.guild_id
)
@property
def guild(self) -> "Guild | PartialGuild | None":
""" `PartialGuild` | `None`: Returns the guild the message was sent in """
if not self.guild_id:
return None
cache = self._state.cache.get_guild(self.guild_id)
if cache:
return cache
from .guild import PartialGuild
return PartialGuild(state=self._state, id=self.guild_id)
@property
def jump_url(self) -> JumpURL:
""" `JumpURL`: Returns the jump URL of the message, GuildID will always be @me """
return JumpURL(
state=self._state,
url=f"https://discord.com/channels/@me/{self.channel_id}/{self.id}"
)
[docs]
async def fetch(self) -> "Message":
""" `Message`: Returns the message object """
r = await self._state.query(
"GET",
f"/channels/{self.channel.id}/messages/{self.id}"
)
return Message(
state=self._state,
data=r.response,
guild=self.channel.guild
)
[docs]
async def delete(
self,
*,
delay: float | None = None,
reason: str | None = None
) -> None:
"""
Delete the message
Parameters
----------
delay: `float` | `None`:
How many seconds it should wait in background to delete
reason: `str` | `None`:
Reason for deleting the message
(Only applies when deleting messages not made by yourself)
"""
async def _delete():
await self._state.query(
"DELETE",
f"/channels/{self.channel.id}/messages/{self.id}",
reason=reason,
res_method="text"
)
async def _delete_after(d: float):
await asyncio.sleep(d)
try:
await _delete()
except HTTPException:
pass
if delay is not None:
asyncio.create_task(_delete_after(delay))
else:
await _delete()
[docs]
async def expire_poll(self) -> "Message":
"""
Immediately end the poll, then returns new Message object.
This can only be done if you created it
Returns
-------
`Message`
The message object of the poll
"""
r = await self._state.query(
"POST",
f"/channels/{self.channel_id}/polls/{self.id}/expire"
)
return Message(
state=self._state,
data=r.response
)
[docs]
async def fetch_poll_voters(
self,
answer: Union[PollAnswer, int],
after: Optional[Union[Snowflake, int]] = None,
limit: Optional[int] = 100,
) -> AsyncIterator["User"]:
"""
Fetch the users who voted for this answer
Parameters
----------
answer: `Union[PollAnswer, int]`
The answer to fetch the voters from
after: `Optional[Union[Snowflake, int]]`
The user ID to start fetching from
limit: `Optional[int]`
The amount of users to fetch, defaults to 100.
`None` will fetch all users.
Yields
-------
`User`
User object of people who voted
"""
answer_id = answer
if isinstance(answer, PollAnswer):
answer_id = answer.id
async def _get_history(limit: int, **kwargs):
params = {"limit": min(limit, 100)}
for key, value in kwargs.items():
if value is None:
continue
params[key] = int(value)
return await self._state.query(
"GET",
f"/channels/{self.channel_id}/polls/"
f"{self.id}/answers/{answer_id}",
params=params
)
async def _after_http(http_limit: int, after_id: Optional[int], limit: Optional[int]):
r = await _get_history(http_limit, after=after_id)
if r.response:
if limit is not None:
limit -= len(r.response["users"])
after_id = r.response["users"][-1]["id"]
return r.response, after_id, limit
if after:
strategy, state = _after_http, utils.normalize_entity_id(after)
else:
strategy, state = _after_http, None
while True:
http_limit: int = 100 if limit is None else min(limit, 100)
if http_limit <= 0:
break
strategy: Callable
users, state, limit = await strategy(http_limit, state, limit)
i = 0
for i, u in enumerate(users["users"], start=1):
yield User(state=self._state, data=u)
if i < 100:
break
[docs]
async def edit(
self,
*,
content: Optional[str] = MISSING,
embed: Optional[Embed] = MISSING,
embeds: Optional[list[Embed]] = MISSING,
view: Optional[View] = MISSING,
attachment: Optional[File] = MISSING,
attachments: Optional[list[File]] = MISSING,
allowed_mentions: Optional[AllowedMentions] = MISSING
) -> "Message":
"""
Edit the message
Parameters
----------
content: `Optional[str]`
Content of the message
embed: `Optional[Embed]`
Embed of the message
embeds: `Optional[list[Embed]]`
Embeds of the message
view: `Optional[View]`
Components of the message
attachment: `Optional[File]`
New attachment of the message
attachments: `Optional[list[File]]`
New attachments of the message
allowed_mentions: `Optional[AllowedMentions]`
Allowed mentions of the message
Returns
-------
`Message`
The edited message
"""
payload = MessageResponse(
content=content,
embed=embed,
embeds=embeds,
view=view,
attachment=attachment,
attachments=attachments,
allowed_mentions=allowed_mentions
)
r = await self._state.query(
"PATCH",
f"/channels/{self.channel.id}/messages/{self.id}",
headers={"Content-Type": payload.content_type},
data=payload.to_multipart(is_request=True),
)
return Message(
state=self._state,
data=r.response,
guild=self.channel.guild
)
[docs]
async def publish(self) -> "Message":
"""
Crosspost the message to another channel.
"""
r = await self._state.query(
"POST",
f"/channels/{self.channel.id}/messages/{self.id}/crosspost",
res_method="json"
)
return Message(
state=self._state,
data=r.response,
guild=self.channel.guild
)
[docs]
async def reply(
self,
content: Optional[str] = MISSING,
*,
embed: Optional[Embed] = MISSING,
embeds: Optional[list[Embed]] = MISSING,
file: Optional[File] = MISSING,
files: Optional[list[File]] = MISSING,
view: Optional[View] = MISSING,
tts: Optional[bool] = False,
allowed_mentions: Optional[AllowedMentions] = MISSING,
) -> "Message":
"""
Sends a reply to a message in a channel.
Parameters
----------
content: `Optional[str]`
Cotnent of the message
embed: `Optional[Embed]`
Includes an embed object
embeds: `Optional[list[Embed]]`
List of embed objects
file: `Optional[File]`
A file object
files: `Union[list[File], File]`
A list of file objects
view: `View`
Send components to the message
tts: `bool`
If the message should be sent as a TTS message
type: `Optional[ResponseType]`
The type of response to the message
allowed_mentions: `Optional[AllowedMentions]`
The allowed mentions for the message
Returns
-------
`Message`
The message object
"""
payload = MessageResponse(
content,
embed=embed,
embeds=embeds,
file=file,
files=files,
view=view,
tts=tts,
allowed_mentions=allowed_mentions,
message_reference=MessageReference(
state=self._state,
data={
"type": int(MessageReferenceType.default),
"channel_id": self.channel_id,
"message_id": self.id,
}
)
)
r = await self._state.query(
"POST",
f"/channels/{self.channel_id}/messages",
data=payload.to_multipart(is_request=True),
headers={"Content-Type": payload.content_type}
)
return Message(
state=self._state,
data=r.response
)
[docs]
async def pin(self, *, reason: Optional[str] = None) -> None:
"""
Pin the message
Parameters
----------
reason: `Optional[str]`
Reason for pinning the message
"""
await self._state.query(
"PUT",
f"/channels/{self.channel.id}/pins/{self.id}",
res_method="text",
reason=reason
)
[docs]
async def unpin(self, *, reason: Optional[str] = None) -> None:
"""
Unpin the message
Parameters
----------
reason: `Optional[str]`
Reason for unpinning the message
"""
await self._state.query(
"DELETE",
f"/channels/{self.channel.id}/pins/{self.id}",
res_method="text",
reason=reason
)
[docs]
async def add_reaction(self, emoji: str) -> None:
"""
Add a reaction to the message
Parameters
----------
emoji: `str`
Emoji to add to the message
"""
_parsed = EmojiParser(emoji).to_reaction()
await self._state.query(
"PUT",
f"/channels/{self.channel.id}/messages/{self.id}/reactions/{_parsed}/@me",
res_method="text"
)
[docs]
async def remove_reaction(
self,
emoji: str,
*,
user_id: Optional[int] = None
) -> None:
"""
Remove a reaction from the message
Parameters
----------
emoji: `str`
Emoji to remove from the message
user_id: `Optional[int]`
User ID to remove the reaction from
"""
_parsed = EmojiParser(emoji).to_reaction()
_url = (
f"/channels/{self.channel.id}/messages/{self.id}/reactions/{_parsed}"
f"/{user_id}" if user_id is not None else "/@me"
)
await self._state.query(
"DELETE",
_url,
res_method="text"
)
[docs]
async def remove_all_reactions(self) -> None:
""" Remove all reactions from the message """
await self._state.query(
"DELETE",
f"/channels/{self.channel.id}/messages/{self.id}/reactions",
res_method="text"
)
[docs]
async def create_public_thread(
self,
name: str,
*,
auto_archive_duration: Optional[int] = 60,
rate_limit_per_user: Optional[Union[timedelta, int]] = None,
reason: Optional[str] = None
) -> "PublicThread":
"""
Create a public thread from the message
Parameters
----------
name: `str`
Name of the thread
auto_archive_duration: `Optional[int]`
Duration in minutes to automatically archive the thread after recent activity,
rate_limit_per_user: `Optional[Union[timedelta, int]]`
A per-user rate limit for this thread (0-21600 seconds, default 0)
reason: `Optional[str]`
Reason for creating the thread
Returns
-------
`PublicThread`
The created thread
Raises
------
`ValueError`
- If `auto_archive_duration` is not 60, 1440, 4320 or 10080
- If `rate_limit_per_user` is not between 0 and 21600 seconds
"""
payload = {
"name": name,
"auto_archive_duration": auto_archive_duration,
}
if auto_archive_duration not in (60, 1440, 4320, 10080):
raise ValueError("auto_archive_duration must be 60, 1440, 4320 or 10080")
if rate_limit_per_user is not None:
if isinstance(rate_limit_per_user, timedelta):
rate_limit_per_user = int(rate_limit_per_user.total_seconds())
if rate_limit_per_user not in range(0, 21601):
raise ValueError("rate_limit_per_user must be between 0 and 21600 seconds")
payload["rate_limit_per_user"] = rate_limit_per_user
r = await self._state.query(
"POST",
f"/channels/{self.channel.id}/messages/{self.id}/threads",
json=payload,
reason=reason
)
from .channel import PublicThread
return PublicThread(
state=self._state,
data=r.response
)
class MessageSnapshot:
def __init__(
self,
*,
state: "DiscordAPI",
data: dict
):
self._state = state
self.type: MessageType = MessageType(data.get("type", 0))
self.content: str = data.get("content", "")
self.timestamp: datetime | None = None
self.edited_timestamp: datetime | None = None
self.embeds: list[Embed] = [
Embed.from_dict(embed)
for embed in data.get("embeds", [])
]
self.attachments: list[Attachment] = [
Attachment(state=state, data=a)
for a in data.get("attachments", [])
]
self._from_data(data)
def _from_data(self, data: dict) -> None:
if data.get("edited_timestamp", None):
self.edited_timestamp = utils.parse_time(data["edited_timestamp"])
if data.get("timestamp", None):
self.timestamp = utils.parse_time(data["timestamp"])
[docs]
class Message(PartialMessage):
def __init__(
self,
*,
state: "DiscordAPI",
data: dict,
guild: "PartialGuild | None" = None
):
super().__init__(
state=state,
id=int(data["id"]),
channel_id=int(data["channel_id"]),
guild_id=guild.id if guild else None
)
self.type: MessageType = MessageType(data["type"])
self.content: str = data.get("content", "")
self.author: Union[User, "Member"] = User(state=state, data=data["author"])
self.pinned: bool = data.get("pinned", False)
self.mention_everyone: bool = data.get("mention_everyone", False)
self.tts: bool = data.get("tts", False)
self.poll: Optional[Poll] = None
self.embeds: list[Embed] = [
Embed.from_dict(embed)
for embed in data.get("embeds", [])
]
self.attachments: list[Attachment] = [
Attachment(state=state, data=a)
for a in data.get("attachments", [])
]
self.stickers: list[PartialSticker] = [
PartialSticker(state=state, id=int(s["id"]), name=s["name"])
for s in data.get("sticker_items", [])
]
self.reactions: list[MessageReaction] = [
MessageReaction(state=state, message=self, data=g)
for g in data.get("reactions", [])
]
self.mentions: list["Member | User"] = []
self.view: View | None = None
self.edited_timestamp: datetime | None = None
self.reference: MessageReference | None = None
self.resolved_reply: Message | None = None
self.resolved_forward: list[MessageSnapshot] = []
self.interaction: MessageInteraction | None = None
self._from_data(data)
def __repr__(self) -> str:
return f"<Message id={self.id} author={self.author}>"
def __str__(self) -> str:
return self.content or ""
def _from_data(self, data: dict):
if data.get("components", None):
self.view = View.from_dict(data)
if data.get("message_reference", None):
self.reference = MessageReference(
state=self._state,
data=data["message_reference"]
)
if data.get("referenced_message", None):
self.resolved_reply = Message(
state=self._state,
data=data["referenced_message"],
guild=self.guild
)
if data.get("interaction_metadata", None):
self.interaction = MessageInteraction(
state=self._state,
data=data["interaction_metadata"]
)
for m in data.get("message_snapshots", []):
self.resolved_forward.append(
MessageSnapshot(
state=self._state,
data=m
)
)
if data.get("poll", None):
self.poll = Poll.from_dict(data["poll"])
if data.get("edited_timestamp", None):
self.edited_timestamp = utils.parse_time(data["edited_timestamp"])
if data.get("member", None):
from .member import Member
# Append author data to member data
data["member"]["user"] = data["author"]
self.author = Member(
state=self._state,
guild=self.guild, # type: ignore
data=data["member"]
)
if data.get("mentions", None):
from .member import Member
for m in data["mentions"]:
if m.get("member", None) and self.guild_id:
# This is only done through the gateway
_fake_member = m["member"]
_fake_member["user"] = m
Member(
state=self._state,
guild=self.guild, # type: ignore
data=_fake_member
)
else:
User(state=self._state, data=m)
[docs]
def is_system(self) -> bool:
""" `bool`: Returns whether the message is a system message """
return self.type not in (
MessageType.default,
MessageType.reply,
MessageType.chat_input_command,
MessageType.context_menu_command,
MessageType.thread_starter_message
)
@property
def emojis(self) -> list[EmojiParser]:
""" `list[EmojiParser]`: Returns the emojis in the message """
return [
EmojiParser(f"<{e[0]}:{e[1]}:{e[2]}>")
for e in utils.re_emoji.findall(self.content)
]
@property
def jump_url(self) -> JumpURL:
""" `JumpURL`: Returns the jump URL of the message """
return JumpURL(
state=self._state,
url=f"https://discord.com/channels/{self.guild_id or '@me'}/{self.channel_id}/{self.id}"
)
@property
def role_mentions(self) -> list[Role | PartialRole]:
"""
`list[PartialRole]`: Returns the role mentions in the message.
Can return full role object if guild and role cache is enabled
"""
if not self.guild_id:
return []
return [
self.guild.get_role(int(role_id)) or
PartialRole(
state=self._state,
id=int(role_id),
guild_id=self.guild_id
)
for role_id in utils.re_role.findall(self.content)
]
@property
def channel_mentions(self) -> list["BaseChannel | PartialChannel"]:
"""
`list[PartialChannel]`: Returns the channel mentions in the message
Can return full role object if guild and channel cache is enabled
"""
from .channel import PartialChannel
return [
self.guild.get_channel(int(channel_id)) or
PartialChannel(state=self._state, id=int(channel_id))
for channel_id in utils.re_channel.findall(self.content)
]
@property
def jump_urls(self) -> list[JumpURL]:
""" `list[JumpURL]`: Returns the jump URLs in the message """
return [
JumpURL(
state=self._state,
guild_id=int(gid) if gid != "@me" else None,
channel_id=int(cid),
message_id=int(mid) if mid else None
)
for gid, cid, mid in utils.re_jump_url.findall(self.content)
]
[docs]
class WebhookMessage(Message):
def __init__(self, *, state: "DiscordAPI", data: dict, application_id: int, token: str):
super().__init__(state=state, data=data)
self.application_id = int(application_id)
self.token = token
[docs]
async def edit(
self,
*,
content: Optional[str] = MISSING,
embed: Optional[Embed] = MISSING,
embeds: Optional[list[Embed]] = MISSING,
attachment: Optional[File] = MISSING,
attachments: Optional[list[File]] = MISSING,
view: Optional[View] = MISSING,
allowed_mentions: Optional[AllowedMentions] = MISSING
) -> "WebhookMessage":
"""
Edit the webhook message
Parameters
----------
content: `Optional[str]`
Content of the message
embed: `Optional[Embed]`
Embed of the message
embeds: `Optional[list[Embed]]`
Embeds of the message
attachment: `Optional[File]`
Attachment of the message
attachments: `Optional[list[File]]`
Attachments of the message
view: `Optional[View]`
Components of the message
allowed_mentions: `Optional[AllowedMentions]`
Allowed mentions of the message
Returns
-------
`WebhookMessage`
The edited message
"""
payload = MessageResponse(
content=content,
embed=embed,
embeds=embeds,
view=view,
attachment=attachment,
attachments=attachments,
allowed_mentions=allowed_mentions
)
r = await self._state.query(
"PATCH",
f"/webhooks/{self.application_id}/{self.token}/messages/{self.id}",
webhook=True,
headers={"Content-Type": payload.content_type},
data=payload.to_multipart(is_request=True),
)
return WebhookMessage(
state=self._state,
data=r.response,
application_id=self.application_id,
token=self.token
)
[docs]
async def delete(
self,
*,
delay: float | None = None
) -> None:
"""
Delete the webhook message
Parameters
----------
reason: `Optional[str]`
Reason for deleting the message
"""
async def _delete():
await self._state.query(
"DELETE",
f"/webhooks/{self.application_id}/{self.token}/messages/{self.id}",
webhook=True,
res_method="text"
)
async def _delete_after(d: float):
await asyncio.sleep(d)
try:
await _delete()
except HTTPException:
pass
if delay is not None:
asyncio.create_task(_delete_after(delay))
else:
await _delete()