tgram:// support for topic values as target arguments (#1028)

This commit is contained in:
Chris Caron 2023-12-27 20:10:24 -05:00 committed by GitHub
parent 9dcf769397
commit d6e0d2ee07
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 76 additions and 52 deletions

View File

@ -74,8 +74,10 @@ TELEGRAM_IMAGE_XY = NotifyImageSize.XY_256
# Chat ID is required
# If the Chat ID is positive, then it's addressed to a single person
# If the Chat ID is negative, then it's targeting a group
# We can support :topic (an integer) if specified as well
IS_CHAT_ID_RE = re.compile(
r'^(@*(?P<idno>-?[0-9]{1,32})|(?P<name>[a-z_-][a-z0-9_-]+))$',
r'^((?P<idno>-?[0-9]{1,32})|(@|%40)?(?P<name>[a-z_-][a-z0-9_-]+))'
r'((:|%3A)(?P<topic>[0-9]+))?$',
re.IGNORECASE,
)
@ -360,9 +362,6 @@ class NotifyTelegram(NotifyBase):
self.logger.warning(err)
raise TypeError(err)
# Parse our list
self.targets = parse_list(targets)
# Define whether or not we should make audible alarms
self.silent = self.template_args['silent']['default'] \
if silent is None else bool(silent)
@ -403,15 +402,41 @@ class NotifyTelegram(NotifyBase):
# URL later to directly include the user that we should message.
self.detect_owner = detect_owner
if self.user:
# Treat this as a channel too
self.targets.append(self.user)
# Parse our list
self.targets = []
for target in parse_list(targets):
results = IS_CHAT_ID_RE.match(target)
if not results:
self.logger.warning(
'Dropped invalid Telegram chat/group ({}) specified.'
.format(target),
)
# Ensure we don't fall back to owner detection
self.detect_owner = False
continue
try:
topic = int(
results.group('topic')
if results.group('topic') else self.topic)
except TypeError:
# No worries
topic = None
if results.group('name') is not None:
# Name
self.targets.append(('@%s' % results.group('name'), topic))
else: # ID
self.targets.append((int(results.group('idno')), topic))
# Track whether or not we want to send an image with our notification
# or not.
self.include_image = include_image
def send_media(self, chat_id, notify_type, attach=None):
def send_media(self, target, notify_type, attach=None):
"""
Sends a sticker based on the specified notify type
@ -470,9 +495,12 @@ class NotifyTelegram(NotifyBase):
# content can arrive together.
self.throttle()
# Extract our target
chat_id, topic = target
payload = {'chat_id': chat_id}
if self.topic:
payload['message_thread_id'] = self.topic
if topic:
payload['message_thread_id'] = topic
try:
with open(path, 'rb') as f:
@ -658,7 +686,7 @@ class NotifyTelegram(NotifyBase):
_id = self.detect_bot_owner()
if _id:
# Permanently store our id in our target list for next time
self.targets.append(str(_id))
self.targets.append((str(_id), None))
self.logger.info(
'Update your Telegram Apprise URL to read: '
'{}'.format(self.url(privacy=True)))
@ -681,26 +709,23 @@ class NotifyTelegram(NotifyBase):
'sendMessage'
)
payload = {
_payload = {
# Notification Audible Control
'disable_notification': self.silent,
# Display Web Page Preview (if possible)
'disable_web_page_preview': not self.preview,
}
if self.topic:
payload['message_thread_id'] = self.topic
# Prepare Message Body
if self.notify_format == NotifyFormat.MARKDOWN:
payload['parse_mode'] = 'MARKDOWN'
_payload['parse_mode'] = 'MARKDOWN'
payload['text'] = body
_payload['text'] = body
else: # HTML
# Use Telegram's HTML mode
payload['parse_mode'] = 'HTML'
_payload['parse_mode'] = 'HTML'
for r, v, m in self.__telegram_escape_html_entries:
if 'html' in m:
@ -712,7 +737,7 @@ class NotifyTelegram(NotifyBase):
body = r.sub(v, body)
# Prepare our payload based on HTML or TEXT
payload['text'] = body
_payload['text'] = body
# Handle payloads without a body specified (but an attachment present)
attach_content = \
@ -721,41 +746,31 @@ class NotifyTelegram(NotifyBase):
# Create a copy of the chat_ids list
targets = list(self.targets)
while len(targets):
chat_id = targets.pop(0)
chat_id = IS_CHAT_ID_RE.match(chat_id)
if not chat_id:
self.logger.warning(
"The specified chat_id '%s' is invalid; skipping." % (
chat_id,
)
)
target = targets.pop(0)
chat_id, topic = target
# Flag our error
has_error = True
continue
# Printable chat_id details
pchat_id = f'{chat_id}' if not topic else f'{chat_id}:{topic}'
if chat_id.group('name') is not None:
# Name
payload['chat_id'] = '@%s' % chat_id.group('name')
else:
# ID
payload['chat_id'] = int(chat_id.group('idno'))
payload = _payload.copy()
payload['chat_id'] = chat_id
if topic:
payload['message_thread_id'] = topic
if self.include_image is True:
# Define our path
if not self.send_media(payload['chat_id'], notify_type):
if not self.send_media(target, notify_type):
# We failed to send the image associated with our
notify_type
self.logger.warning(
'Failed to send Telegram type image to {}.',
payload['chat_id'])
pchat_id)
if attach and self.attachment_support and \
attach_content == TelegramContentPlacement.AFTER:
# Send our attachments now (if specified and if it exists)
if not self._send_attachments(
chat_id=payload['chat_id'], notify_type=notify_type,
target, notify_type=notify_type,
attach=attach):
has_error = True
@ -803,7 +818,7 @@ class NotifyTelegram(NotifyBase):
self.logger.warning(
'Failed to send Telegram notification to {}: '
'{}, error={}.'.format(
payload['chat_id'],
pchat_id,
error_msg if error_msg else status_str,
r.status_code))
@ -817,7 +832,7 @@ class NotifyTelegram(NotifyBase):
except requests.RequestException as e:
self.logger.warning(
'A connection error occurred sending Telegram:%s ' % (
payload['chat_id']) + 'notification.'
pchat_id) + 'notification.'
)
self.logger.debug('Socket Exception: %s' % str(e))
@ -833,7 +848,7 @@ class NotifyTelegram(NotifyBase):
# it was identified to send the content before the attachments
# which is now done.
if not self._send_attachments(
chat_id=payload['chat_id'],
target=target,
notify_type=notify_type,
attach=attach):
@ -842,14 +857,14 @@ class NotifyTelegram(NotifyBase):
return not has_error
def _send_attachments(self, chat_id, notify_type, attach):
def _send_attachments(self, target, notify_type, attach):
"""
Sends our attachments
"""
has_error = False
# Send our attachments now (if specified and if it exists)
for attachment in attach:
if not self.send_media(chat_id, notify_type, attach=attachment):
if not self.send_media(target, notify_type, attach=attachment):
# We failed; don't continue
has_error = True
@ -880,13 +895,21 @@ class NotifyTelegram(NotifyBase):
# Extend our parameters
params.update(self.url_parameters(privacy=privacy, *args, **kwargs))
targets = []
for (chat_id, _topic) in self.targets:
topic = _topic if _topic else self.topic
targets.append(''.join(
[NotifyTelegram.quote(f'{chat_id}', safe='@')
if isinstance(chat_id, str) else f'{chat_id}',
'' if not topic else f':{topic}']))
# No need to check the user token because the user automatically gets
# appended into the list of chat ids
return '{schema}://{bot_token}/{targets}/?{params}'.format(
schema=self.secure_protocol,
bot_token=self.pprint(self.bot_token, privacy, safe=''),
targets='/'.join(
[NotifyTelegram.quote('@{}'.format(x)) for x in self.targets]),
targets='/'.join(targets),
params=NotifyTelegram.urlencode(params))
def __len__(self):
@ -987,6 +1010,7 @@ class NotifyTelegram(NotifyBase):
# Include images with our message
results['detect_owner'] = \
parse_bool(results['qsd'].get('detect', True))
parse_bool(
results['qsd'].get('detect', not results['targets']))
return results

View File

@ -243,7 +243,7 @@ def test_plugin_telegram_general(mock_post):
invalid_bot_token = 'abcd:123'
# Chat ID
chat_ids = 'l2g, lead2gold'
chat_ids = 'l2g:1234, lead2gold'
# Prepare Mock
mock_post.return_value = requests.Request()
@ -397,7 +397,7 @@ def test_plugin_telegram_general(mock_post):
obj = NotifyTelegram(bot_token=bot_token, targets='12345')
assert len(obj.targets) == 1
assert obj.targets[0] == '12345'
assert obj.targets[0] == (12345, None)
# Test the escaping of characters since Telegram escapes stuff for us to
# which we need to consider
@ -440,7 +440,7 @@ def test_plugin_telegram_general(mock_post):
assert obj.notify(title='hello', body='world') is True
assert len(obj.targets) == 1
assert obj.targets[0] == '532389719'
assert obj.targets[0] == ('532389719', None)
# Do the test again, but without the expected (parsed response)
mock_post.return_value.content = dumps({
@ -550,7 +550,7 @@ def test_plugin_telegram_general(mock_post):
'tgram://123456789:ABCdefghijkl123456789opqyz/-123456789525')
assert isinstance(obj, NotifyTelegram)
assert len(obj.targets) == 1
assert '-123456789525' in obj.targets
assert (-123456789525, None) in obj.targets
@mock.patch('requests.post')