test coverage added; dm support removed (not public api)

This commit is contained in:
Chris Caron 2025-03-09 20:10:37 -04:00
parent 2c90d218cc
commit e932736108
2 changed files with 812 additions and 205 deletions

View File

@ -37,27 +37,18 @@ import re
import requests
import json
from datetime import (datetime, timezone, timedelta)
from apprise.exception import AppriseException
from ..attachment.base import AttachBase
from .base import NotifyBase
from ..url import PrivacyMode
from ..common import NotifyType
from ..utils.parse import parse_list
from ..locale import gettext_lazy as _
# For parsing handles
HANDLE_HOST_PARSE_RE = re.compile(r'(?P<handle>[^.]+)\.+(?P<host>.+)')
HANDLE_HOST_PARSE_RE = re.compile(r'(?P<handle>[^.]+)\.+(?P<host>.+)$')
IS_USER = re.compile(r'^\s*@?(?P<user>[A-Z0-9_]+)(\.+(?P<host>.+))?$', re.I)
class BlueSkyDMUnsupported(AppriseException):
"""
Thrown when an disk i/o error occurs
"""
def __init__(self, message, error_code=-1):
super().__init__(message, error_code=error_code)
class NotifyBlueSky(NotifyBase):
"""
A wrapper for BlueSky Notifications
@ -75,6 +66,9 @@ class NotifyBlueSky(NotifyBase):
# A URL that takes you to the setup/help of the specific protocol
setup_url = 'https://github.com/caronc/apprise/wiki/Notify_bluesky'
# Support attachments
attachment_support = True
# XRPC Suffix URLs; Structured as:
# https://host/{suffix}
@ -89,18 +83,27 @@ class NotifyBlueSky(NotifyBase):
xrpc_suffix_did = "/xrpc/com.atproto.identity.resolveHandle"
xrpc_suffix_session = "/xrpc/com.atproto.server.createSession"
xrpc_suffix_record = "/xrpc/com.atproto.repo.createRecord"
xrpc_suffix_blob = "/xrpc/com.atproto.repo.uploadBlob"
# Bluesky
xrpc_suffix_lsconvo = "/xrpc/chat.bsky.convo.listConversations"
xrpc_suffix_sendmsg = "/xrpc/chat.bsky.convo.sendMessage"
# BlueSky is kind enough to return how many more requests we're allowed to
# continue to make within it's header response as:
# RateLimit-Reset: The epoc time (in seconds) we can expect our
# rate-limit to be reset.
# RateLimit-Remaining: an integer identifying how many requests we're
# still allow to make.
request_rate_per_sec = 0
# For Tracking Purposes
ratelimit_reset = datetime.now(timezone.utc).replace(tzinfo=None)
# Remaining messages
ratelimit_remaining = 1
# The default BlueSky host to use if one isn't specified
bluesky_default_host = 'bsky.social'
# Do not set body_maxlen as it is set in a property value below
# since the length varies depending if we are doing a direct message
# or a public post
# body_maxlen = see below @propery defined
# Our message body size
body_maxlen = 280
# BlueSky does not support a title
title_maxlen = 0
@ -108,7 +111,6 @@ class NotifyBlueSky(NotifyBase):
# Define object templates
templates = (
'{schema}://{user}@{password}',
'{schema}://{user}@{password}/{targets}',
)
# Define our template tokens
@ -124,26 +126,9 @@ class NotifyBlueSky(NotifyBase):
'private': True,
'required': True,
},
'target_user': {
'name': _('Target User'),
'type': 'string',
'prefix': '@',
'map_to': 'targets',
},
'targets': {
'name': _('Targets'),
'type': 'list:string',
},
})
# Define our template arguments
template_args = dict(NotifyBase.template_args, **{
'to': {
'alias_of': 'targets',
},
})
def __init__(self, targets=None, **kwargs):
def __init__(self, **kwargs):
"""
Initialize BlueSky Object
"""
@ -162,73 +147,125 @@ class NotifyBlueSky(NotifyBase):
# Set our default host
self.host = self.bluesky_default_host
# Identify our targets
# Identify our Handle (if define)
results = HANDLE_HOST_PARSE_RE.match(self.user)
if results:
self.user = results.group('handle')
self.host = results.group('host')
has_error = False
self.targets = []
for target in parse_list(targets):
match = IS_USER.match(target)
if match and match.group('user'):
self.targets.append(
'{}.{}'.format(
match.group('user'), match.group('host').lower()
if match.group('host') else self.host))
continue
has_error = True
self.logger.warning(
'Dropped invalid BlueSky user ({}) specified.'.format(target),
)
if has_error and not self.targets:
# We have specified that we want to notify one or more individual
# and we failed to load any of them. Since it's also valid to
# notify no one at all (which means we notify ourselves), it's
# important we don't switch from the users original intentions
self.targets = None
self.user = results.group('handle').strip()
self.host = results.group('host').strip()
return
def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs):
def send(self, body, title='', notify_type=NotifyType.INFO, attach=None,
**kwargs):
"""
Perform BlueSky Notification
"""
if self.targets is None:
# Users were specified, but were invalid
self.logger.warning('No valid BlueSky targets to notify.')
return False
if not self.__access_token and not self.login():
# We failed to authenticate - we're done
return False
if not self.targets: # Public Message
url = f'https://{self.host}{self.xrpc_suffix_record}'
now = datetime.now(tz=timezone.utc)
# Track our returning blob IDs as they're stored on the BlueSky server
blobs = []
payload = {
"collection": "app.bsky.feed.post",
"repo": self.get_identifier(),
"record": {
"text": body,
# 'YYYY-mm-ddTHH:MM:SSZ'
"createdAt": now.strftime('%FT%XZ'),
"$type": "app.bsky.feed.post"
}
if attach and self.attachment_support:
url = f'https://{self.host}{self.xrpc_suffix_blob}'
# We need to upload our payload first so that we can source it
# in remaining messages
for no, attachment in enumerate(attach, start=1):
# Perform some simple error checking
if not attachment:
# We could not access the attachment
self.logger.error(
'Could not access attachment {}.'.format(
attachment.url(privacy=True)))
return False
if not re.match(r'^image/.*', attachment.mimetype, re.I):
# Only support images at this time
self.logger.warning(
'Ignoring unsupported BlueSky attachment {}.'.format(
attachment.url(privacy=True)))
continue
self.logger.debug(
'Preparing BlueSky attachment {}'.format(
attachment.url(privacy=True)))
# Upload our image and get our blob associated with it
postokay, response = self._fetch(
url,
payload=attachment,
)
if not postokay:
# We can't post our attachment
return False
# Prepare our filename
filename = attachment.name \
if attachment.name else f'file{no:03}.dat'
if not (isinstance(response, dict)
and response.get('blob')):
self.logger.debug(
'Could not attach the file to BlueSky: %s (mime=%s)',
filename, attachment.mimetype)
continue
blobs.append((response.get('blob'), filename))
# Prepare our URL
url = f'https://{self.host}{self.xrpc_suffix_record}'
# prepare our batch of payloads to create
payloads = []
payload = {
"collection": "app.bsky.feed.post",
"repo": self.get_identifier(),
"record": {
"text": body,
# 'YYYY-mm-ddTHH:MM:SSZ'
"createdAt": datetime.now(
tz=timezone.utc).strftime('%FT%XZ'),
"$type": "app.bsky.feed.post"
}
}
if blobs:
for no, blob in enumerate(blobs, start=1):
_payload = payload.copy()
if no > 1:
#
# multiple instances
#
# 1. update createdAt time
# 2. Change text to identify image no
_payload['record']['createdAt'] = \
datetime.now(tz=timezone.utc).strftime('%FT%XZ')
_payload['record']['text'] = \
'{:02d}/{:02d}'.format(no, len(blobs))
_payload['record']['embed'] = {
"images": [
{
"image": blob[0],
"alt": blob[1],
}
],
"$type": "app.bsky.embed.images"
}
payloads.append(_payload)
else:
payloads.append(payload)
for payload in payloads:
# Send Login Information
postokay, response = self._fetch(
url,
payload=json.dumps(payload),
# We set this boolean so internal recursion doesn't take place.
login=True,
)
if not postokay:
# We failed
@ -238,94 +275,8 @@ class NotifyBlueSky(NotifyBase):
# 'message': 'reason'
# }
return False
return True
# If we get here, we're creating Private Message
url = f'https://{self.host}{self.xrpc_suffix_sendmsg}'
for target in self.targets:
try:
cid = self.get_conversation(target)
if not cid:
pass
except BlueSkyDMUnsupported:
return False
now = datetime.now(tz=timezone.utc)
payload = {
"convoId": cid,
"message": {
"createdAt": now.strftime('%FT%XZ'),
"text": body,
}
}
# Send Login Information
postokay, response = self._fetch(
url,
payload=json.dumps(payload),
# We set this boolean so internal recursion doesn't take place.
login=True,
)
if not postokay:
# We failed
# Bad responses look like:
# {
# 'error': 'InvalidRequest',
# 'message': 'reason'
# }
return False
return True
def get_conversation(self, user):
"""
Provided a user, a conversation is searched; you can not
start a brand new conversation (as it is unsupported)
"""
# First get our identifier
did = self.get_identifier(user)
if not did:
# Not possible to get conversation
return False
url = f'https://{self.host}{self.xrpc_suffix_lsconvo}'
# Track our retrievals (if more than one in a pagination response)
cursor = None
while True:
params = {}
if cursor:
params["cursor"] = cursor
# Send Login Information
postokay, response = self._fetch(
url,
params=params,
method='GET',
)
if not postokay:
# We had web request issues
if response.get('error') == 'MethodNotImplemented':
raise BlueSkyDMUnsupported()
return False
# Store our cursor (if defined)
cursor = response.get("cursor")
participant_dids = \
{p["did"] for p in response["conversation"]["participants"]}
if len(participant_dids) == 1:
# We do not want to post in collective groups involving
# this person, only an exclusive private message
return response['conversation']["id"]
if not cursor:
# Prevent looping forever
break
def get_identifier(self, user=None, login=False):
"""
Performs a Decentralized User Lookup and returns the identifier
@ -447,7 +398,7 @@ class NotifyBlueSky(NotifyBase):
return True
def _fetch(self, url, payload=None, params=None, method='POST',
login=False):
content_type=None, login=False):
"""
Wrapper to BlueSky API requests object
"""
@ -456,8 +407,9 @@ class NotifyBlueSky(NotifyBase):
headers = {
'User-Agent': self.app_id,
'Content-Type':
'application/x-www-form-urlencoded; charset=utf-8'
if method == 'GET' else 'application/json'
payload.mimetype if isinstance(payload, AttachBase) else (
'application/x-www-form-urlencoded; charset=utf-8'
if method == 'GET' else 'application/json')
}
if self.__access_token:
@ -467,10 +419,30 @@ class NotifyBlueSky(NotifyBase):
# Some Debug Logging
self.logger.debug('BlueSky {} URL: {} (cert_verify={})'.format(
method, url, self.verify_certificate))
self.logger.debug('BlueSky Payload: %s' % str(payload))
self.logger.debug(
'BlueSky Payload: %s', str(payload)
if not isinstance(payload, AttachBase)
else 'attach: ' + payload.name)
# By default set wait to None
wait = None
if self.ratelimit_remaining == 0:
# Determine how long we should wait for or if we should wait at
# all. This isn't fool-proof because we can't be sure the client
# time (calling this script) is completely synced up with the
# Twitter server. One would hope we're on NTP and our clocks are
# the same allowing this to role smoothly:
now = datetime.now(timezone.utc).replace(tzinfo=None)
if now < self.ratelimit_reset:
# We need to throttle for the difference in seconds
# We add 0.3 seconds to the end just to allow a grace
# period.
wait = (self.ratelimit_reset - now).total_seconds() + 0.3
# Always call throttle before any remote server i/o is made;
self.throttle()
self.throttle(wait=wait)
# Initialize a default value for our content value
content = {}
@ -480,7 +452,8 @@ class NotifyBlueSky(NotifyBase):
try:
r = fn(
url,
data=payload,
data=payload if not isinstance(payload, AttachBase)
else payload.open(),
params=params,
headers=headers,
verify=self.verify_certificate,
@ -497,6 +470,24 @@ class NotifyBlueSky(NotifyBase):
# AttributeError = r.content is None
content = {}
# Rate limit handling... our header objects at this point are:
# 'RateLimit-Limit': '10', # Total # of requests per hour
# 'RateLimit-Remaining': '9', # Requests remaining
# 'RateLimit-Reset': '1741631362', # Epoch Time
# 'RateLimit-Policy': '10;w=86400' # NoEntries;w=<window>
try:
# Capture rate limiting if possible
self.ratelimit_remaining = \
int(r.headers.get('ratelimit-remaining'))
self.ratelimit_reset = datetime.fromtimestamp(
int(r.headers.get('ratelimit-reset')), timezone.utc
).replace(tzinfo=None)
except (TypeError, ValueError):
# This is returned if we could not retrieve this information
# gracefully accept this state and move on
pass
if r.status_code != requests.codes.ok:
# We had a problem
status_str = \
@ -525,6 +516,14 @@ class NotifyBlueSky(NotifyBase):
# Mark our failure
return (False, content)
except (OSError, IOError) as e:
self.logger.warning(
'An I/O error occurred while handling {}.'.format(
payload.name if isinstance(payload, AttachBase)
else payload))
self.logger.debug('I/O Exception: %s' % str(e))
return (False, content)
return (True, content)
@property
@ -547,34 +546,19 @@ class NotifyBlueSky(NotifyBase):
# Apply our other parameters
params = self.url_parameters(privacy=privacy, *args, **kwargs)
user = self.user
if self.host != self.bluesky_default_host:
user += f'.{self.host}'
# our URL
return '{schema}://{user}@{password}/{targets}?{params}'.format(
schema=self.protocol,
user=NotifyBlueSky.quote(self.user, safe=''),
return '{schema}://{user}@{password}?{params}'.format(
schema=self.secure_protocol[0],
user=NotifyBlueSky.quote(user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
targets='/'.join(
[NotifyBlueSky.quote('@{}'.format(target), safe='@')
for target in self.targets]) if self.targets else '',
params=NotifyBlueSky.urlencode(params),
)
@property
def body_maxlen(self):
"""
The maximum allowable characters allowed in the body per message
This is used during a Private DM Message Size (not Public Posts
which are limited to 280 characters)
"""
return 10000 if self.targets else 280
def __len__(self):
"""
Returns the number of targets associated with this notification
"""
targets = len(self.targets)
return targets if targets > 0 else 1
@staticmethod
def parse_url(url):
"""
@ -589,13 +573,6 @@ class NotifyBlueSky(NotifyBase):
if not results.get('password') and results['host']:
results['password'] = NotifyBlueSky.unquote(results['host'])
results['targets'] = []
else:
# Get targets (if any)
results['targets'] = [NotifyBlueSky.unquote(results['host'])]
results['targets'] += NotifyBlueSky.split_path(results['fullpath'])
# Do not use host field
results['host'] = None

630
test/test_plugin_bluesky.py Normal file
View File

@ -0,0 +1,630 @@
# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import json
import logging
import os
from datetime import datetime
from datetime import timezone
from unittest.mock import Mock, patch
import pytest
import requests
from apprise import Apprise
from apprise import NotifyType
from apprise import AppriseAttachment
from apprise.plugins.bluesky import NotifyBlueSky
from helpers import AppriseURLTester
# Disable logging for a cleaner testing output
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
TWITTER_SCREEN_NAME = 'apprise'
# Our Testing URLs
apprise_url_tests = (
##################################
# NotifyBlueSky
##################################
('bluesky://', {
# Missing user and app_pass
'instance': TypeError,
}),
('bluesky://:@/', {
'instance': TypeError,
}),
('bluesky://app-pw', {
# Missing User
'instance': TypeError,
}),
('bluesky://user@app-pw', {
'instance': NotifyBlueSky,
# Expected notify() response False (because we won't be able
# to detect our user)
'notify_response': False,
# Our expected url(privacy=True) startswith() response:
'privacy_url': 'bsky://user@****',
}),
('bluesky://user@app-pw1?cache=no', {
'instance': NotifyBlueSky,
# At minimum we need an access token and did; below has no did
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
},
'notify_response': False,
}),
('bluesky://user@app-pw2?cache=no', {
'instance': NotifyBlueSky,
# valid payload
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
},
}),
('bluesky://user@app-pw3', {
# no cache; so we store our results
'instance': NotifyBlueSky,
# valid payload
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
# For handling attachments
'blob': 'content',
},
}),
('bluesky://user.example.ca@app-pw3', {
# no cache; so we store our results
'instance': NotifyBlueSky,
# valid payload
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
# For handling attachments
'blob': 'content',
},
}),
# A duplicate of the entry above, this will cause cache to be referenced
('bluesky://user@app-pw3', {
# no cache; so we store our results
'instance': NotifyBlueSky,
# valid payload
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
# For handling attachments
'blob': 'content',
},
}),
('bluesky://user@app-pw', {
'instance': NotifyBlueSky,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
},
}),
('bluesky://user@app-pw', {
'instance': NotifyBlueSky,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
'requests_response_text': {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
},
}),
)
def good_response(data=None):
"""
Prepare a good response.
"""
response = Mock()
response.content = json.dumps({
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234',
} if data is None else data)
response.status_code = requests.codes.ok
# Epoch time:
epoch = datetime.fromtimestamp(0, timezone.utc)
# Generate a very large rate-limit header window
response.headers = {
'ratelimit-reset': (
datetime.now(timezone.utc) - epoch).total_seconds() + 86400,
'ratelimit-remaining': '1000',
}
return response
def bad_response(data=None):
"""
Prepare a bad response.
"""
response = Mock()
response.content = json.dumps({
"error": 'InvalidRequest',
"message": "Something failed",
} if data is None else data)
response.headers = {}
response.status_code = requests.codes.internal_server_error
return response
@pytest.fixture
def bluesky_url():
url = 'bluesky://user@app-key'
return url
@pytest.fixture
def good_message_response():
"""
Prepare a good response.
"""
response = good_response()
return response
@pytest.fixture
def bad_message_response():
"""
Prepare a bad message response.
"""
response = bad_response()
return response
@pytest.fixture
def good_media_response():
"""
Prepare a good media response.
"""
response = Mock()
response.content = json.dumps({
'blob': {
'$type': 'blob',
'mimeType': 'image/jpeg',
'ref': {
'$link': 'baf124idksduabcjkaa3iey4bfyq'},
'size': 73667,
}
})
response.headers = {}
response.status_code = requests.codes.ok
return response
def test_plugin_bluesky_urls():
"""
NotifyBlueSky() Apprise URLs
"""
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all()
def test_plugin_bluesky_general(mocker):
"""
NotifyBlueSky() General Tests
"""
mock_get = mocker.patch("requests.get")
mock_post = mocker.patch("requests.post")
# Epoch time:
epoch = datetime.fromtimestamp(0, timezone.utc)
request = good_response()
request.headers = {
'ratelimit-reset': (
datetime.now(timezone.utc) - epoch).total_seconds(),
'ratelimit-remaining': '1',
}
# Prepare Mock
mock_get.return_value = request
mock_post.return_value = request
# Variation Initializations
obj = NotifyBlueSky(user='handle', password='app-password')
assert isinstance(obj, NotifyBlueSky) is True
assert isinstance(obj.url(), str) is True
# apprise room was found
assert obj.send(body="test") is True
# Change our status code and try again
request.status_code = 403
assert obj.send(body="test") is False
assert obj.ratelimit_remaining == 1
# Return the status
request.status_code = requests.codes.ok
# Force a reset
request.headers['ratelimit-remaining'] = 0
# behind the scenes, it should cause us to update our rate limit
assert obj.send(body="test") is True
assert obj.ratelimit_remaining == 0
# This should cause us to block
request.headers['ratelimit-remaining'] = 10
assert obj.send(body="test") is True
assert obj.ratelimit_remaining == 10
# Handle cases where we simply couldn't get this field
del request.headers['ratelimit-remaining']
assert obj.send(body="test") is True
# It remains set to the last value
assert obj.ratelimit_remaining == 10
# Reset our variable back to 1
request.headers['ratelimit-remaining'] = 1
# Handle cases where our epoch time is wrong
del request.headers['ratelimit-reset']
assert obj.send(body="test") is True
# Return our object, but place it in the future forcing us to block
request.headers['ratelimit-reset'] = \
(datetime.now(timezone.utc) - epoch).total_seconds() + 1
request.headers['ratelimit-remaining'] = 0
obj.ratelimit_remaining = 0
assert obj.send(body="test") is True
# Return our object, but place it in the future forcing us to block
request.headers['ratelimit-reset'] = \
(datetime.now(timezone.utc) - epoch).total_seconds() - 1
request.headers['ratelimit-remaining'] = 0
obj.ratelimit_remaining = 0
assert obj.send(body="test") is True
# Return our limits to always work
request.headers['ratelimit-reset'] = \
(datetime.now(timezone.utc) - epoch).total_seconds()
request.headers['ratelimit-remaining'] = 1
obj.ratelimit_remaining = 1
assert obj.send(body="test") is True
# Flush our cache forcing it is re-creating
NotifyBlueSky._user_cache = {}
assert obj.send(body="test") is True
# Cause content response to be None
request.content = None
assert obj.send(body="test") is True
# Invalid JSON
request.content = '{'
assert obj.send(body="test") is True
# Return it to a parseable string
request.content = '{}'
results = NotifyBlueSky.parse_url('bluesky://handle@app-pass-word')
assert isinstance(results, dict) is True
# cause a json parsing issue now
response_obj = None
assert obj.send(body="test") is True
response_obj = '{'
assert obj.send(body="test") is True
# Flush out our cache
NotifyBlueSky._user_cache = {}
response_obj = {
'accessJwt': 'abcd',
'refreshJwt': 'abcd',
'did': 'did:1234'
}
request.content = json.dumps(response_obj)
obj = NotifyBlueSky(user='handle', password='app-pass-word')
assert obj.send(body="test") is True
# Alter the key forcing us to look up a new value of ourselves again
NotifyBlueSky._user_cache = {}
NotifyBlueSky._whoami_cache = None
obj.ckey = 'different.then.it.was'
assert obj.send(body="test") is True
NotifyBlueSky._whoami_cache = None
obj.ckey = 'different.again'
assert obj.send(body="test") is True
def test_plugin_bluesky_edge_cases():
"""
NotifyBlueSky() Edge Cases
"""
with pytest.raises(TypeError):
NotifyBlueSky()
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_basic(
mock_get, mock_post, bluesky_url, good_message_response,
good_media_response):
"""
NotifyBlueSky() Attachment Checks - Basic
"""
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_message_response, good_media_response, good_message_response]
# Create application objects.
obj = Apprise.instantiate(bluesky_url)
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Send our notification
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 3
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[2][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_bad_message_response(
mock_get, mock_post, bluesky_url, good_media_response,
good_message_response, bad_message_response):
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_message_response, bad_message_response, good_message_response]
# Create application objects.
obj = Apprise.instantiate(bluesky_url)
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Our notification will fail now since our message will error out.
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_upload_fails(
mock_get, mock_post, bluesky_url, good_media_response,
good_message_response):
# Test case where upload fails.
mock_get.return_value = good_message_response
mock_post.side_effect = [good_message_response, OSError]
# Create application objects.
obj = Apprise.instantiate(bluesky_url)
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Send our notification; it will fail because of the message response.
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_invalid_attachment(
mock_get, mock_post, bluesky_url, good_message_response,
good_media_response):
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_message_response, good_media_response]
# Create application objects.
obj = Apprise.instantiate(bluesky_url)
attach = AppriseAttachment(
os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg'))
# An invalid attachment will cause a failure.
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
# No post request as attachment is not good.
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_attachments_multiple_batch(
mock_get, mock_post, bluesky_url, good_message_response,
good_media_response):
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_message_response, good_media_response, good_media_response,
good_media_response, good_media_response, good_message_response,
good_message_response, good_message_response, good_message_response]
# instantiate our object
obj = Apprise.instantiate(bluesky_url)
# 4 images are produced
attach = [
os.path.join(TEST_VAR_DIR, 'apprise-test.gif'),
os.path.join(TEST_VAR_DIR, 'apprise-test.gif'),
os.path.join(TEST_VAR_DIR, 'apprise-test.jpeg'),
os.path.join(TEST_VAR_DIR, 'apprise-test.png'),
# This one is not supported, so it's ignored gracefully
os.path.join(TEST_VAR_DIR, 'apprise-test.mp4'),
]
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 9
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[2][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[3][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[4][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[5][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[6][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[7][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[8][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
# If we call the functions again, the only difference is
# we no longer need to resolve the handle or create a session
# as the previous one is fine.
mock_get.reset_mock()
mock_post.reset_mock()
mock_get.return_value = good_message_response
mock_post.side_effect = [
good_media_response, good_media_response, good_media_response,
good_media_response, good_message_response, good_message_response,
good_message_response, good_message_response]
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# Verify API calls.
assert mock_get.call_count == 0
assert mock_post.call_count == 8
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[1][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[2][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[3][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.uploadBlob'
assert mock_post.call_args_list[4][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[5][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[6][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
assert mock_post.call_args_list[7][0][0] == \
'https://bsky.social/xrpc/com.atproto.repo.createRecord'
@patch('requests.post')
@patch('requests.get')
def test_plugin_bluesky_auth_failure(
mock_get, mock_post, bluesky_url, good_message_response,
bad_message_response):
mock_get.return_value = good_message_response
mock_post.return_value = bad_message_response
# instantiate our object
obj = Apprise.instantiate(bluesky_url)
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO) is False
# Verify API calls.
assert mock_get.call_count == 1
assert mock_get.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.identity.resolveHandle'
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://bsky.social/xrpc/com.atproto.server.createSession'