Merge branch 'caronc:master' into master

This commit is contained in:
André 2025-03-10 16:07:22 +01:00 committed by GitHub
commit e0e5edb19d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 1249 additions and 2 deletions

View File

@ -3,6 +3,8 @@ Alerts
Apprise API
Automated Packet Reporting System
AWS
Bark
BlueSky
BulkSMS
BulkVS
Burst SMS

View File

@ -61,6 +61,7 @@ The table below identifies the services this tool supports and some example serv
| [Apprise API](https://github.com/caronc/apprise/wiki/Notify_apprise_api) | apprise:// or apprises:// | (TCP) 80 or 443 | apprise://hostname/Token
| [AWS SES](https://github.com/caronc/apprise/wiki/Notify_ses) | ses:// | (TCP) 443 | ses://user@domain/AccessKeyID/AccessSecretKey/RegionName<br/>ses://user@domain/AccessKeyID/AccessSecretKey/RegionName/email1/email2/emailN
| [Bark](https://github.com/caronc/apprise/wiki/Notify_bark) | bark:// | (TCP) 80 or 443 | bark://hostname<br />bark://hostname/device_key<br />bark://hostname/device_key1/device_key2/device_keyN<br/>barks://hostname<br />barks://hostname/device_key<br />barks://hostname/device_key1/device_key2/device_keyN
| [BlueSky](https://github.com/caronc/apprise/wiki/Notify_bluesky) | bluesky:// | (TCP) 443 | bluesky://Handle:AppPw<br />bluesky://Handle:AppPw/TargetHandle<br />bluesky://Handle:AppPw/TargetHandle1/TargetHandle2/TargetHandleN
| [Chanify](https://github.com/caronc/apprise/wiki/Notify_chanify) | chantify:// | (TCP) 443 | chantify://token
| [Discord](https://github.com/caronc/apprise/wiki/Notify_discord) | discord:// | (TCP) 443 | discord://webhook_id/webhook_token<br />discord://avatar@webhook_id/webhook_token
| [Emby](https://github.com/caronc/apprise/wiki/Notify_emby) | emby:// or embys:// | (TCP) 8096 | emby://user@hostname/<br />emby://user:password@hostname

View File

@ -759,7 +759,14 @@ class Apprise:
"""
Returns all of the loaded URLs defined in this apprise object.
"""
return [x.url(privacy=privacy) for x in self.servers]
urls = []
for s in self.servers:
if isinstance(s, (ConfigBase, AppriseConfig)):
for _s in s.servers():
urls.append(_s.url(privacy=privacy))
else:
urls.append(s.url(privacy=privacy))
return urls
def pop(self, index):
"""

579
apprise/plugins/bluesky.py Normal file
View File

@ -0,0 +1,579 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# 1. Create a BlueSky account
# 2. Access Settings -> Privacy and Security
# 3. Generate an App Password. Optionally grant yourself access to Direct
# Messages if you want to be able to send them
# 4. Assemble your Apprise URL like:
# bluesky://handle@you-token-here
#
import re
import requests
import json
from datetime import (datetime, timezone, timedelta)
from ..attachment.base import AttachBase
from .base import NotifyBase
from ..url import PrivacyMode
from ..common import NotifyType
from ..locale import gettext_lazy as _
# For parsing handles
HANDLE_HOST_PARSE_RE = re.compile(r'(?P<handle>[^.]+)\.+(?P<host>.+)$')
IS_USER = re.compile(r'^\s*@?(?P<user>[A-Z0-9_]+)(\.+(?P<host>.+))?$', re.I)
class NotifyBlueSky(NotifyBase):
"""
A wrapper for BlueSky Notifications
"""
# The default descriptive name associated with the Notification
service_name = 'BlueSky'
# The services URL
service_url = 'https://bluesky.us/'
# Protocol
secure_protocol = ('bsky', 'bluesky')
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_bluesky'
# Support attachments
attachment_support = True
# XRPC Suffix URLs; Structured as:
# https://host/{suffix}
# Taken right from google.auth.helpers:
clock_skew = timedelta(seconds=10)
# 1 hour in seconds (the lifetime of our token)
access_token_lifetime_sec = timedelta(seconds=3600)
# Detect your Decentralized Identitifer (DID), then you can get your Auth
# Token.
xrpc_suffix_did = "/xrpc/com.atproto.identity.resolveHandle"
xrpc_suffix_session = "/xrpc/com.atproto.server.createSession"
xrpc_suffix_record = "/xrpc/com.atproto.repo.createRecord"
xrpc_suffix_blob = "/xrpc/com.atproto.repo.uploadBlob"
# BlueSky is kind enough to return how many more requests we're allowed to
# continue to make within it's header response as:
# RateLimit-Reset: The epoc time (in seconds) we can expect our
# rate-limit to be reset.
# RateLimit-Remaining: an integer identifying how many requests we're
# still allow to make.
request_rate_per_sec = 0
# For Tracking Purposes
ratelimit_reset = datetime.now(timezone.utc).replace(tzinfo=None)
# Remaining messages
ratelimit_remaining = 1
# The default BlueSky host to use if one isn't specified
bluesky_default_host = 'bsky.social'
# Our message body size
body_maxlen = 280
# BlueSky does not support a title
title_maxlen = 0
# Define object templates
templates = (
'{schema}://{user}@{password}',
)
# Define our template tokens
template_tokens = dict(NotifyBase.template_tokens, **{
'user': {
'name': _('Username'),
'type': 'string',
'required': True,
},
'password': {
'name': _('Password'),
'type': 'string',
'private': True,
'required': True,
},
})
def __init__(self, **kwargs):
"""
Initialize BlueSky Object
"""
super().__init__(**kwargs)
# Our access token
self.__access_token = self.store.get('access_token')
self.__refresh_token = None
self.__access_token_expiry = datetime.now(timezone.utc)
if not self.user:
msg = 'A BlueSky UserID/Handle must be specified.'
self.logger.warning(msg)
raise TypeError(msg)
# Set our default host
self.host = self.bluesky_default_host
# Identify our Handle (if define)
results = HANDLE_HOST_PARSE_RE.match(self.user)
if results:
self.user = results.group('handle').strip()
self.host = results.group('host').strip()
return
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform BlueSky Notification
"""
if not self.__access_token and not self.login():
# We failed to authenticate - we're done
return False
# Track our returning blob IDs as they're stored on the BlueSky server
blobs = []
if attach and self.attachment_support:
url = f'https://{self.host}{self.xrpc_suffix_blob}'
# We need to upload our payload first so that we can source it
# in remaining messages
for no, attachment in enumerate(attach, start=1):
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attachment.url(privacy=True)))
return False
if not re.match(r'^image/.*', attachment.mimetype, re.I):
# Only support images at this time
self.logger.warning(
'Ignoring unsupported BlueSky attachment {}.'.format(
attachment.url(privacy=True)))
continue
self.logger.debug(
'Preparing BlueSky attachment {}'.format(
attachment.url(privacy=True)))
# Upload our image and get our blob associated with it
postokay, response = self._fetch(
url,
payload=attachment,
)
if not postokay:
# We can't post our attachment
return False
# Prepare our filename
filename = attachment.name \
if attachment.name else f'file{no:03}.dat'
if not (isinstance(response, dict)
and response.get('blob')):
self.logger.debug(
'Could not attach the file to BlueSky: %s (mime=%s)',
filename, attachment.mimetype)
continue
blobs.append((response.get('blob'), filename))
# Prepare our URL
url = f'https://{self.host}{self.xrpc_suffix_record}'
# prepare our batch of payloads to create
payloads = []
payload = {
"collection": "app.bsky.feed.post",
"repo": self.get_identifier(),
"record": {
"text": body,
# 'YYYY-mm-ddTHH:MM:SSZ'
"createdAt": datetime.now(
tz=timezone.utc).strftime('%FT%XZ'),
"$type": "app.bsky.feed.post"
}
}
if blobs:
for no, blob in enumerate(blobs, start=1):
_payload = payload.copy()
if no > 1:
#
# multiple instances
#
# 1. update createdAt time
# 2. Change text to identify image no
_payload['record']['createdAt'] = \
datetime.now(tz=timezone.utc).strftime('%FT%XZ')
_payload['record']['text'] = \
'{:02d}/{:02d}'.format(no, len(blobs))
_payload['record']['embed'] = {
"images": [
{
"image": blob[0],
"alt": blob[1],
}
],
"$type": "app.bsky.embed.images"
}
payloads.append(_payload)
else:
payloads.append(payload)
for payload in payloads:
# Send Login Information
postokay, response = self._fetch(
url,
payload=json.dumps(payload),
)
if not postokay:
# We failed
# Bad responses look like:
# {
# 'error': 'InvalidRequest',
# 'message': 'reason'
# }
return False
return True
def get_identifier(self, user=None, login=False):
"""
Performs a Decentralized User Lookup and returns the identifier
"""
if user is None:
user = self.user
user = f'{user}.{self.host}' if '.' not in user else f'{user}'
key = f'did.{user}'
did = self.store.get(key)
if did:
return did
url = f'https://{self.host}{self.xrpc_suffix_did}'
params = {'handle': user}
# Send Login Information
postokay, response = self._fetch(
url,
params=params,
method='GET',
# We set this boolean so internal recursion doesn't take place.
login=login,
)
if not postokay or not response or 'did' not in response:
# We failed
return False
# Acquire our Decentralized Identitifer
did = response.get('did')
self.store.set(key, did)
return did
def login(self):
"""
A simple wrapper to authenticate with the BlueSky Server
"""
# Acquire our Decentralized Identitifer
did = self.get_identifier(self.user, login=True)
if not did:
return False
url = f'https://{self.host}{self.xrpc_suffix_session}'
payload = {
"identifier": did,
"password": self.password,
}
# Send Login Information
postokay, response = self._fetch(
url,
payload=json.dumps(payload),
# We set this boolean so internal recursion doesn't take place.
login=True,
)
# Our response object looks like this (content has been altered for
# presentation purposes):
# {
# 'did': 'did:plc:ruk414jakghak402j1jqekj2',
# 'didDoc': {
# '@context': [
# 'https://www.w3.org/ns/did/v1',
# 'https://w3id.org/security/multikey/v1',
# 'https://w3id.org/security/suites/secp256k1-2019/v1'
# ],
# 'id': 'did:plc:ruk414jakghak402j1jqekj2',
# 'alsoKnownAs': ['at://apprise.bsky.social'],
# 'verificationMethod': [
# {
# 'id': 'did:plc:ruk414jakghak402j1jqekj2#atproto',
# 'type': 'Multikey',
# 'controller': 'did:plc:ruk414jakghak402j1jqekj2',
# 'publicKeyMultibase' 'redacted'
# }
# ],
# 'service': [
# {
# 'id': '#atproto_pds',
# 'type': 'AtprotoPersonalDataServer',
# 'serviceEndpoint':
# 'https://woodtuft.us-west.host.bsky.network'
# }
# ]
# },
# 'handle': 'apprise.bsky.social',
# 'email': 'whoami@gmail.com',
# 'emailConfirmed': True,
# 'emailAuthFactor': False,
# 'accessJwt': 'redacted',
# 'refreshJwt': 'redacted',
# 'active': True,
# }
if not postokay or not response:
# We failed
return False
# Acquire our Token
self.__access_token = response.get('accessJwt')
# Handle other optional arguments we can use
self.__access_token_expiry = self.access_token_lifetime_sec + \
datetime.now(timezone.utc) - self.clock_skew
# The Refresh Token
self.__refresh_token = response.get('refreshJwt', self.__refresh_token)
self.store.set(
'access_token', self.__access_token, self.__access_token_expiry)
self.store.set(
'refresh_token', self.__refresh_token, self.__access_token_expiry)
self.logger.info('Authenticated to BlueSky as {}.{}'.format(
self.user, self.host))
return True
def _fetch(self, url, payload=None, params=None, method='POST',
content_type=None, login=False):
"""
Wrapper to BlueSky API requests object
"""
# use what was specified, otherwise build headers dynamically
headers = {
'User-Agent': self.app_id,
'Content-Type':
payload.mimetype if isinstance(payload, AttachBase) else (
'application/x-www-form-urlencoded; charset=utf-8'
if method == 'GET' else 'application/json')
}
if self.__access_token:
# Set our token
headers['Authorization'] = 'Bearer {}'.format(self.__access_token)
# Some Debug Logging
self.logger.debug('BlueSky {} URL: {} (cert_verify={})'.format(
method, url, self.verify_certificate))
self.logger.debug(
'BlueSky Payload: %s', str(payload)
if not isinstance(payload, AttachBase)
else 'attach: ' + payload.name)
# By default set wait to None
wait = None
if self.ratelimit_remaining == 0:
# Determine how long we should wait for or if we should wait at
# all. This isn't fool-proof because we can't be sure the client
# time (calling this script) is completely synced up with the
# Twitter server. One would hope we're on NTP and our clocks are
# the same allowing this to role smoothly:
now = datetime.now(timezone.utc).replace(tzinfo=None)
if now < self.ratelimit_reset:
# We need to throttle for the difference in seconds
# We add 0.3 seconds to the end just to allow a grace
# period.
wait = (self.ratelimit_reset - now).total_seconds() + 0.3
# Always call throttle before any remote server i/o is made;
self.throttle(wait=wait)
# Initialize a default value for our content value
content = {}
# acquire our request mode
fn = requests.post if method == 'POST' else requests.get
try:
r = fn(
url,
data=payload if not isinstance(payload, AttachBase)
else payload.open(),
params=params,
headers=headers,
verify=self.verify_certificate,
timeout=self.request_timeout,
)
# Get our JSON content if it's possible
try:
content = json.loads(r.content)
except (TypeError, ValueError, AttributeError):
# TypeError = r.content is not a String
# ValueError = r.content is Unparsable
# AttributeError = r.content is None
content = {}
# Rate limit handling... our header objects at this point are:
# 'RateLimit-Limit': '10', # Total # of requests per hour
# 'RateLimit-Remaining': '9', # Requests remaining
# 'RateLimit-Reset': '1741631362', # Epoch Time
# 'RateLimit-Policy': '10;w=86400' # NoEntries;w=<window>
try:
# Capture rate limiting if possible
self.ratelimit_remaining = \
int(r.headers.get('ratelimit-remaining'))
self.ratelimit_reset = datetime.fromtimestamp(
int(r.headers.get('ratelimit-reset')), timezone.utc
).replace(tzinfo=None)
except (TypeError, ValueError):
# This is returned if we could not retrieve this information
# gracefully accept this state and move on
pass
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
NotifyBlueSky.http_response_code_lookup(r.status_code)
self.logger.warning(
'Failed to send BlueSky {} to {}: '
'{}error={}.'.format(
method,
url,
', ' if status_str else '',
r.status_code))
self.logger.debug(
'Response Details:\r\n{}'.format(r.content))
# Mark our failure
return (False, content)
except requests.RequestException as e:
self.logger.warning(
'Exception received when sending BlueSky {} to {}: '.
format(method, url))
self.logger.debug('Socket Exception: %s' % str(e))
# Mark our failure
return (False, content)
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while handling {}.'.format(
payload.name if isinstance(payload, AttachBase)
else payload))
self.logger.debug('I/O Exception: %s' % str(e))
return (False, content)
return (True, content)
@property
def url_identifier(self):
"""
Returns all of the identifiers that make this URL unique from
another simliar one. Targets or end points should never be identified
here.
"""
return (
self.secure_protocol[0],
self.user, self.password,
)
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Apply our other parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
user = self.user
if self.host != self.bluesky_default_host:
user += f'.{self.host}'
# our URL
return '{schema}://{user}@{password}?{params}'.format(
schema=self.secure_protocol[0],
user=NotifyBlueSky.quote(user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
params=NotifyBlueSky.urlencode(params),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to re-instantiate this object.
"""
results = NotifyBase.parse_url(url, verify_host=False)
if not results:
# We're done early as we couldn't load the results
return results
if not results.get('password') and results['host']:
results['password'] = NotifyBlueSky.unquote(results['host'])
# Do not use host field
results['host'] = None
return results

View File

@ -39,7 +39,7 @@ Apprise is a Python package for simplifying access to all of the different
notification services that are out there. Apprise opens the door and makes
it easy to access:
Africas Talking, Apprise API, APRS, AWS SES, AWS SNS, Bark, Burst SMS,
Africas Talking, Apprise API, APRS, AWS SES, AWS SNS, Bark, BlueSky, Burst SMS,
BulkSMS, BulkVS, Chanify, ClickSend, DAPNET, DingTalk, Discord, E-Mail, Emby,
FCM, Feishu, Flock, Free Mobile, Google Chat, Gotify, Growl, Guilded, Home
Assistant, httpSMS, IFTTT, Join, Kavenegar, KODI, Kumulos, LaMetric, Line,

View File

@ -895,6 +895,28 @@ include strict://{}""".format(str(cfg04), str(cfg04), str(cfg04)))
assert len(ac.servers()) == 3
def test_apprise_config_file_loading(tmpdir):
"""
API: AppriseConfig() URL Testing
"""
config_path = tmpdir / "apprise.yml"
# Create a temporary config file
config_path.write("urls:\n - json://localhost")
# Flow from README.md
ap = Apprise()
ap.add('xml://localhost')
config = AppriseConfig()
config.add(str(config_path))
ap.add(config)
# Using urls()
assert len(ap.urls()) == 2
def test_apprise_config_matrix_load():
"""
API: AppriseConfig() matrix initialization

View File

@ -297,6 +297,9 @@ def test_apprise_trans_windows_users_nux(mock_getlocale):
# 4105 = en_CA
windll.kernel32.GetUserDefaultUILanguage.return_value = 4105
# Store default value to not break other tests
default_language = locale.AppriseLocale._default_language
with environ('LANGUAGE', 'LC_ALL', 'LC_CTYPE', 'LANG'):
# Our default language
locale.AppriseLocale._default_language = 'zz'
@ -325,6 +328,9 @@ def test_apprise_trans_windows_users_nux(mock_getlocale):
delattr(ctypes, 'windll')
# Restore default value
locale.AppriseLocale._default_language = default_language
@pytest.mark.skipif(sys.platform == "win32", reason="Unique Nux test cases")
@mock.patch('locale.getlocale')

630
test/test_plugin_bluesky.py Normal file
View File

@ -0,0 +1,630 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import json
import logging
import os
from datetime import datetime
from datetime import timezone
from unittest.mock import Mock, patch
import pytest
import requests
from apprise import Apprise
from apprise import NotifyType
from apprise import AppriseAttachment
from apprise.plugins.bluesky import NotifyBlueSky
from helpers import AppriseURLTester
# Disable logging for a cleaner testing output
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
TWITTER_SCREEN_NAME = 'apprise'
# Our Testing URLs
apprise_url_tests = (
##################################
# NotifyBlueSky
##################################
('bluesky://', {
# Missing user and app_pass
'instance': TypeError,
}),
('bluesky://:@/', {
'instance': TypeError,
}),
('bluesky://app-pw', {
# Missing User
'instance': TypeError,
}),
('bluesky://user@app-pw', {
'instance': NotifyBlueSky,
# Expected notify() response False (because we won't be able
# to detect our user)
'notify_response': False,
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'bsky://user@****',
}),
('bluesky://user@app-pw1?cache=no', {
'instance': NotifyBlueSky,
# At minimum we need an access token and did; below has no did
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
},
'notify_response': False,
}),
('bluesky://user@app-pw2?cache=no', {
'instance': NotifyBlueSky,
# valid payload
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
},
}),
('bluesky://user@app-pw3', {
# no cache; so we store our results
'instance': NotifyBlueSky,
# valid payload
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
# For handling attachments
'blob': 'content',
},
}),
('bluesky://user.example.ca@app-pw3', {
# no cache; so we store our results
'instance': NotifyBlueSky,
# valid payload
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
# For handling attachments
'blob': 'content',
},
}),
# A duplicate of the entry above, this will cause cache to be referenced
('bluesky://user@app-pw3', {
# no cache; so we store our results
'instance': NotifyBlueSky,
# valid payload
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
# For handling attachments
'blob': 'content',
},
}),
('bluesky://user@app-pw', {
'instance': NotifyBlueSky,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
},
}),
('bluesky://user@app-pw', {
'instance': NotifyBlueSky,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
},
}),
)
def good_response(data=None):
"""
Prepare a good response.
"""
response = Mock()
response.content = json.dumps({
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
} if data is None else data)
response.status_code = requests.codes.ok
# Epoch time:
epoch = datetime.fromtimestamp(0, timezone.utc)
# Generate a very large rate-limit header window
response.headers = {
'ratelimit-reset': (
datetime.now(timezone.utc) - epoch).total_seconds() + 86400,
'ratelimit-remaining': '1000',
}
return response
def bad_response(data=None):
"""
Prepare a bad response.
"""
response = Mock()
response.content = json.dumps({
"error": 'InvalidRequest',
"message": "Something failed",
} if data is None else data)
response.headers = {}
response.status_code = requests.codes.internal_server_error
return response
@pytest.fixture
def bluesky_url():
url = 'bluesky://user@app-key'
return url
@pytest.fixture
def good_message_response():
"""
Prepare a good response.
"""
response = good_response()
return response
@pytest.fixture
def bad_message_response():
"""
Prepare a bad message response.
"""
response = bad_response()
return response
@pytest.fixture
def good_media_response():
"""
Prepare a good media response.
"""
response = Mock()
response.content = json.dumps({
'blob': {
'$type': 'blob',
'mimeType': 'image/jpeg',
'ref': {
'$link': 'baf124idksduabcjkaa3iey4bfyq'},
'size': 73667,
}
})
response.headers = {}
response.status_code = requests.codes.ok
return response
def test_plugin_bluesky_urls():
"""
NotifyBlueSky() Apprise URLs
"""
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all()
def test_plugin_bluesky_general(mocker):
"""
NotifyBlueSky() General Tests
"""
mock_get = mocker.patch("requests.get")
mock_post = mocker.patch("requests.post")
# Epoch time:
epoch = datetime.fromtimestamp(0, timezone.utc)
request = good_response()
request.headers = {
'ratelimit-reset': (
datetime.now(timezone.utc) - epoch).total_seconds(),
'ratelimit-remaining': '1',
}
# Prepare Mock
mock_get.return_value = request
mock_post.return_value = request
# Variation Initializations
obj = NotifyBlueSky(user='handle', password='app-password')
assert isinstance(obj, NotifyBlueSky) is True
assert isinstance(obj.url(), str) is True
# apprise room was found
assert obj.send(body="test") is True
# Change our status code and try again
request.status_code = 403
assert obj.send(body="test") is False
assert obj.ratelimit_remaining == 1
# Return the status
request.status_code = requests.codes.ok
# Force a reset
request.headers['ratelimit-remaining'] = 0
# behind the scenes, it should cause us to update our rate limit
assert obj.send(body="test") is True
assert obj.ratelimit_remaining == 0
# This should cause us to block
request.headers['ratelimit-remaining'] = 10
assert obj.send(body="test") is True
assert obj.ratelimit_remaining == 10
# Handle cases where we simply couldn't get this field
del request.headers['ratelimit-remaining']
assert obj.send(body="test") is True
# It remains set to the last value
assert obj.ratelimit_remaining == 10
# Reset our variable back to 1
request.headers['ratelimit-remaining'] = 1
# Handle cases where our epoch time is wrong
del request.headers['ratelimit-reset']
assert obj.send(body="test") is True
# Return our object, but place it in the future forcing us to block
request.headers['ratelimit-reset'] = \
(datetime.now(timezone.utc) - epoch).total_seconds() + 1
request.headers['ratelimit-remaining'] = 0
obj.ratelimit_remaining = 0
assert obj.send(body="test") is True
# Return our object, but place it in the future forcing us to block
request.headers['ratelimit-reset'] = \
(datetime.now(timezone.utc) - epoch).total_seconds() - 1
request.headers['ratelimit-remaining'] = 0
obj.ratelimit_remaining = 0
assert obj.send(body="test") is True
# Return our limits to always work
request.headers['ratelimit-reset'] = \
(datetime.now(timezone.utc) - epoch).total_seconds()
request.headers['ratelimit-remaining'] = 1
obj.ratelimit_remaining = 1
assert obj.send(body="test") is True
# Flush our cache forcing it is re-creating
NotifyBlueSky._user_cache = {}
assert obj.send(body="test") is True
# Cause content response to be None
request.content = None
assert obj.send(body="test") is True
# Invalid JSON
request.content = '{'
assert obj.send(body="test") is True
# Return it to a parseable string
request.content = '{}'
results = NotifyBlueSky.parse_url('bluesky://handle@app-pass-word')
assert isinstance(results, dict) is True
# cause a json parsing issue now
response_obj = None
assert obj.send(body="test") is True
response_obj = '{'
assert obj.send(body="test") is True
# Flush out our cache
NotifyBlueSky._user_cache = {}
response_obj = {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234'
}
request.content = json.dumps(response_obj)
obj = NotifyBlueSky(user='handle', password='app-pass-word')
assert obj.send(body="test") is True
# Alter the key forcing us to look up a new value of ourselves again
NotifyBlueSky._user_cache = {}
NotifyBlueSky._whoami_cache = None
obj.ckey = 'different.then.it.was'
assert obj.send(body="test") is True
NotifyBlueSky._whoami_cache = None
obj.ckey = 'different.again'
assert obj.send(body="test") is True
def test_plugin_bluesky_edge_cases():
"""
NotifyBlueSky() Edge Cases
"""
with pytest.raises(TypeError):
NotifyBlueSky()
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_basic(
mock_get, mock_post, bluesky_url, good_message_response,
good_media_response):
"""
NotifyBlueSky() Attachment Checks - Basic
"""
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_message_response, good_media_response, good_message_response]
# Create application objects.
obj = Apprise.instantiate(bluesky_url)
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Send our notification
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 3
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[2][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_bad_message_response(
mock_get, mock_post, bluesky_url, good_media_response,
good_message_response, bad_message_response):
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_message_response, bad_message_response, good_message_response]
# Create application objects.
obj = Apprise.instantiate(bluesky_url)
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Our notification will fail now since our message will error out.
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_upload_fails(
mock_get, mock_post, bluesky_url, good_media_response,
good_message_response):
# Test case where upload fails.
mock_get.return_value = good_message_response
mock_post.side_effect = [good_message_response, OSError]
# Create application objects.
obj = Apprise.instantiate(bluesky_url)
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Send our notification; it will fail because of the message response.
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_invalid_attachment(
mock_get, mock_post, bluesky_url, good_message_response,
good_media_response):
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_message_response, good_media_response]
# Create application objects.
obj = Apprise.instantiate(bluesky_url)
attach = AppriseAttachment(
os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg'))
# An invalid attachment will cause a failure.
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
# No post request as attachment is not good.
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_multiple_batch(
mock_get, mock_post, bluesky_url, good_message_response,
good_media_response):
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_message_response, good_media_response, good_media_response,
good_media_response, good_media_response, good_message_response,
good_message_response, good_message_response, good_message_response]
# instantiate our object
obj = Apprise.instantiate(bluesky_url)
# 4 images are produced
attach = [
os.path.join(TEST_VAR_DIR, 'apprise-test.gif'),
os.path.join(TEST_VAR_DIR, 'apprise-test.gif'),
os.path.join(TEST_VAR_DIR, 'apprise-test.jpeg'),
os.path.join(TEST_VAR_DIR, 'apprise-test.png'),
# This one is not supported, so it's ignored gracefully
os.path.join(TEST_VAR_DIR, 'apprise-test.mp4'),
]
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 9
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[2][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[3][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[4][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[5][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[6][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[7][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[8][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
# If we call the functions again, the only difference is
# we no longer need to resolve the handle or create a session
# as the previous one is fine.
mock_get.reset_mock()
mock_post.reset_mock()
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_media_response, good_media_response, good_media_response,
good_media_response, good_message_response, good_message_response,
good_message_response, good_message_response]
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# Verify API calls.
assert mock_get.call_count == 0
assert mock_post.call_count == 8
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[2][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[3][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[4][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[5][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[6][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[7][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_auth_failure(
mock_get, mock_post, bluesky_url, good_message_response,
bad_message_response):
mock_get.return_value = good_message_response
mock_post.return_value = bad_message_response
# instantiate our object
obj = Apprise.instantiate(bluesky_url)
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is False
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'