apprise/test/test_plugin_fcm.py

943 lines
30 KiB
Python

# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#
# Great Resources:
# - Dev/Legacy API:
# https://firebase.google.com/docs/cloud-messaging/http-server-ref
# - Legacy API (v1) -> OAuth
# - https://firebase.google.com/docs/cloud-messaging/migrate-v1
import os
import sys
from unittest import mock
import pytest
import requests
import json
from apprise import Apprise
from apprise.plugins.fcm import NotifyFCM
from helpers import AppriseURLTester
try:
from apprise.plugins.fcm.oauth import GoogleOAuth
from apprise.plugins.fcm.common import FCM_MODES
from apprise.plugins.fcm.priority import (
FCMPriorityManager, FCM_PRIORITIES)
from apprise.plugins.fcm.color import FCMColorManager
from cryptography.exceptions import UnsupportedAlgorithm
except ImportError:
# No problem; there is no cryptography support
pass
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# Test files for KeyFile Directory
PRIVATE_KEYFILE_DIR = os.path.join(os.path.dirname(__file__), 'var', 'fcm')
FCM_KEYFILE = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json')
# Our Testing URLs
apprise_url_tests = (
('fcm://', {
# We failed to identify any valid authentication
'instance': TypeError,
}),
('fcm://:@/', {
# We failed to identify any valid authentication
'instance': TypeError,
}),
('fcm://project@%20%20/', {
# invalid apikey
'instance': TypeError,
}),
('fcm://apikey/', {
# no project id specified so we operate in legacy mode
'instance': NotifyFCM,
# but there are no targets specified so we return False
'notify_response': False,
}),
('fcm://apikey/device', {
# Valid device
'instance': NotifyFCM,
'privacy_url': 'fcm://a...y/device',
}),
('fcm://apikey/#topic', {
# Valid topic
'instance': NotifyFCM,
'privacy_url': 'fcm://a...y/%23topic',
}),
('fcm://apikey/device?mode=invalid', {
# Valid device, invalid mode
'instance': TypeError,
}),
('fcm://apikey/#topic1/device/%20/', {
# Valid topic, valid device, and invalid entry
'instance': NotifyFCM,
}),
('fcm://apikey?to=#topic1,device', {
# Test to=
'instance': NotifyFCM,
}),
('fcm://?apikey=abc123&to=device', {
# Test apikey= to=
'instance': NotifyFCM,
}),
('fcm://?apikey=abc123&to=device&image=yes', {
# Test image boolean
'instance': NotifyFCM,
}),
('fcm://?apikey=abc123&to=device&color=no', {
# Disable colors
'instance': NotifyFCM,
}),
('fcm://?apikey=abc123&to=device&color=aabbcc', {
# custom colors
'instance': NotifyFCM,
}),
('fcm://?apikey=abc123&to=device'
'&image_url=http://example.com/interesting.jpg', {
# Test image_url
'instance': NotifyFCM}),
('fcm://?apikey=abc123&to=device'
'&image_url=http://example.com/interesting.jpg&image=no', {
# Test image_url but set to no
'instance': NotifyFCM}),
('fcm://?apikey=abc123&to=device&+key=value&+key2=value2', {
# Test apikey= to= and data arguments
'instance': NotifyFCM,
}),
('fcm://%20?to=device&keyfile=/invalid/path', {
# invalid Project ID
'instance': TypeError,
}),
('fcm://project_id?to=device&keyfile=/invalid/path', {
# Test to= and auto-detection of OAuth mode
'instance': NotifyFCM,
# we'll fail to send our notification as a result
'response': False,
}),
('fcm://?to=device&project=project_id&keyfile=/invalid/path', {
# Test project= & to= and auto detection of OAuth mode
'instance': NotifyFCM,
# we'll fail to send our notification as a result
'response': False,
}),
('fcm://project_id?to=device&mode=oauth2', {
# no keyfile was specified
'instance': TypeError,
}),
('fcm://project_id?to=device&mode=oauth2&keyfile=/invalid/path', {
# Same test as above except we explicitly set our oauth2 mode
# Test to= and auto-detection of OAuth mode
'instance': NotifyFCM,
# we'll fail to send our notification as a result
'response': False,
}),
('fcm://apikey/#topic1/device/?mode=legacy', {
'instance': NotifyFCM,
# throw a bizarre code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('fcm://apikey/#topic1/device/?mode=legacy', {
'instance': NotifyFCM,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracefully handle them
'test_requests_exceptions': True,
}),
('fcm://project/#topic1/device/?mode=oauth2&keyfile=file://{}'.format(
os.path.join(
os.path.dirname(__file__), 'var', 'fcm',
'service_account.json')), {
'instance': NotifyFCM,
# throw a bizarre code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('fcm://projectid/#topic1/device/?mode=oauth2&keyfile=file://{}'.format(
os.path.join(
os.path.dirname(__file__), 'var', 'fcm',
'service_account.json')), {
'instance': NotifyFCM,
# Throws a series of connection and transfer exceptions when
# this flag is set and tests that we gracefully handle them
'test_requests_exceptions': True,
}),
)
@pytest.fixture
def mock_post(mocker):
"""
Prepare a good OAuth mock response.
"""
mock_thing = mocker.patch("requests.post")
response = mock.Mock()
response.content = json.dumps({
"access_token": "ya29.c.abcd",
"expires_in": 3599,
"token_type": "Bearer",
})
response.status_code = requests.codes.ok
mock_thing.return_value = response
return mock_thing
@pytest.fixture
def mock_post_legacy(mocker):
"""
Prepare a good legacy mock response.
"""
mock_thing = mocker.patch("requests.post")
response = mock.Mock()
response.status_code = requests.codes.ok
mock_thing.return_value = response
return mock_thing
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_urls():
"""
NotifyFCM() Apprise URLs
"""
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all()
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_legacy_default(mock_post_legacy):
"""
NotifyFCM() Legacy/APIKey default checks.
"""
# A valid Legacy URL
obj = Apprise.instantiate(
'fcm://abc123/device/'
'?+key=value&+key2=value2'
'&image_url=https://example.com/interesting.png')
# Send our notification
assert obj.notify("test") is True
# Test our call count
assert mock_post_legacy.call_count == 1
assert mock_post_legacy.call_args_list[0][0][0] == \
'https://fcm.googleapis.com/fcm/send'
payload = mock_post_legacy.mock_calls[0][2]
data = json.loads(payload['data'])
assert 'data' in data
assert isinstance(data, dict)
assert 'key' in data['data']
assert data['data']['key'] == 'value'
assert 'key2' in data['data']
assert data['data']['key2'] == 'value2'
assert 'notification' in data
assert isinstance(data['notification'], dict)
assert 'notification' in data['notification']
assert isinstance(data['notification']['notification'], dict)
assert 'image' in data['notification']['notification']
assert data['notification']['notification']['image'] == \
'https://example.com/interesting.png'
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_legacy_priorities(mock_post_legacy):
"""
NotifyFCM() Legacy/APIKey priorities checks.
"""
obj = Apprise.instantiate(
'fcm://abc123/device/?priority=low')
assert mock_post_legacy.call_count == 0
# Send our notification
assert obj.notify(title="title", body="body") is True
# Test our call count
assert mock_post_legacy.call_count == 1
assert mock_post_legacy.call_args_list[0][0][0] == \
'https://fcm.googleapis.com/fcm/send'
payload = mock_post_legacy.mock_calls[0][2]
data = json.loads(payload['data'])
assert 'data' not in data
assert 'notification' in data
assert isinstance(data['notification'], dict)
assert 'notification' in data['notification']
assert isinstance(data['notification']['notification'], dict)
assert 'image' not in data['notification']['notification']
assert 'priority' in data
# legacy can only switch between high/low
assert data['priority'] == "normal"
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_legacy_no_colors(mock_post_legacy):
"""
NotifyFCM() Legacy/APIKey `color=no` checks.
"""
obj = Apprise.instantiate(
'fcm://abc123/device/?color=no')
assert mock_post_legacy.call_count == 0
# Send our notification
assert obj.notify(title="title", body="body") is True
# Test our call count
assert mock_post_legacy.call_count == 1
assert mock_post_legacy.call_args_list[0][0][0] == \
'https://fcm.googleapis.com/fcm/send'
payload = mock_post_legacy.mock_calls[0][2]
data = json.loads(payload['data'])
assert 'data' not in data
assert 'notification' in data
assert isinstance(data['notification'], dict)
assert 'notification' in data['notification']
assert isinstance(data['notification']['notification'], dict)
assert 'image' not in data['notification']['notification']
assert 'color' not in data['notification']['notification']
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_legacy_colors(mock_post_legacy):
"""
NotifyFCM() Legacy/APIKey colors checks.
"""
obj = Apprise.instantiate(
'fcm://abc123/device/?color=AA001b')
assert mock_post_legacy.call_count == 0
# Send our notification
assert obj.notify(title="title", body="body") is True
# Test our call count
assert mock_post_legacy.call_count == 1
assert mock_post_legacy.call_args_list[0][0][0] == \
'https://fcm.googleapis.com/fcm/send'
payload = mock_post_legacy.mock_calls[0][2]
data = json.loads(payload['data'])
assert 'data' not in data
assert 'notification' in data
assert isinstance(data['notification'], dict)
assert 'notification' in data['notification']
assert isinstance(data['notification']['notification'], dict)
assert 'image' not in data['notification']['notification']
assert 'color' in data['notification']['notification']
assert data['notification']['notification']['color'] == '#aa001b'
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_oauth_default(mock_post):
"""
NotifyFCM() general OAuth checks - success.
Test using a valid Project ID and key file.
"""
obj = Apprise.instantiate(
f'fcm://mock-project-id/device/#topic/?keyfile={FCM_KEYFILE}')
# send our notification
assert obj.notify("test") is True
# Test our call count
assert mock_post.call_count == 3
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
assert mock_post.call_args_list[1][0][0] == \
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send'
assert mock_post.call_args_list[2][0][0] == \
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send'
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_oauth_invalid_project_id(mock_post):
"""
NotifyFCM() OAuth checks, with invalid project id.
"""
# Test having a valid keyfile, but not a valid project id match.
obj = Apprise.instantiate(
f'fcm://invalid_project_id/device/?keyfile={FCM_KEYFILE}')
# we'll fail as a result
assert obj.notify("test") is False
# Test our call count
assert mock_post.call_count == 0
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_oauth_keyfile_error(mock_post):
"""
NotifyFCM() OAuth checks, while unable to read key file.
"""
# Now we test using a valid Project ID but we can't open our file
obj = Apprise.instantiate(
f'fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}')
with mock.patch('builtins.open', side_effect=OSError):
# we'll fail as a result
assert obj.notify("test") is False
# Test our call count
assert mock_post.call_count == 0
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_oauth_data_parameters(mock_post):
"""
NotifyFCM() OAuth checks, success.
Test using a valid Project ID and data parameters.
"""
obj = Apprise.instantiate(
f'fcm://mock-project-id/device/#topic/?keyfile={FCM_KEYFILE}'
'&+key=value&+key2=value2'
'&image_url=https://example.com/interesting.png')
assert mock_post.call_count == 0
# send our notification
assert obj.notify("test") is True
# Test our call count
assert mock_post.call_count == 3
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
assert mock_post.call_args_list[1][0][0] == \
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send'
payload = mock_post.mock_calls[1][2]
data = json.loads(payload['data'])
assert 'message' in data
assert isinstance(data['message'], dict)
assert 'data' in data['message']
assert isinstance(data['message']['data'], dict)
assert 'key' in data['message']['data']
assert data['message']['data']['key'] == 'value'
assert 'key2' in data['message']['data']
assert data['message']['data']['key2'] == 'value2'
assert 'notification' in data['message']
assert isinstance(data['message']['notification'], dict)
assert 'image' in data['message']['notification']
assert data['message']['notification']['image'] == \
'https://example.com/interesting.png'
assert mock_post.call_args_list[2][0][0] == \
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send'
payload = mock_post.mock_calls[2][2]
data = json.loads(payload['data'])
assert 'message' in data
assert isinstance(data['message'], dict)
assert 'data' in data['message']
assert isinstance(data['message']['data'], dict)
assert 'key' in data['message']['data']
assert data['message']['data']['key'] == 'value'
assert 'key2' in data['message']['data']
assert data['message']['data']['key2'] == 'value2'
assert 'notification' in data['message']
assert isinstance(data['message']['notification'], dict)
assert 'image' in data['message']['notification']
assert data['message']['notification']['image'] == \
'https://example.com/interesting.png'
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_oauth_priorities(mock_post):
"""
Verify priorities work as intended.
"""
obj = Apprise.instantiate(
f'fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}'
'&priority=high')
assert mock_post.call_count == 0
# Send our notification
assert obj.notify(title="title", body="body") is True
# Test our call count
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
assert mock_post.call_args_list[1][0][0] == \
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send'
payload = mock_post.mock_calls[1][2]
data = json.loads(payload['data'])
assert 'message' in data
assert isinstance(data['message'], dict)
assert 'data' not in data['message']
assert 'notification' in data['message']
assert isinstance(data['message']['notification'], dict)
assert 'image' not in data['message']['notification']
assert data['message']['apns']['headers']['apns-priority'] == "10"
assert data['message']['webpush']['headers']['Urgency'] == "high"
assert data['message']['android']['priority'] == "HIGH"
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_oauth_no_colors(mock_post):
"""
Verify `color=no` work as intended.
"""
obj = Apprise.instantiate(
f'fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}'
'&color=no')
assert mock_post.call_count == 0
# Send our notification
assert obj.notify(title="title", body="body") is True
# Test our call count
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
assert mock_post.call_args_list[1][0][0] == \
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send'
payload = mock_post.mock_calls[1][2]
data = json.loads(payload['data'])
assert 'message' in data
assert isinstance(data['message'], dict)
assert 'data' not in data['message']
assert 'notification' in data['message']
assert isinstance(data['message']['notification'], dict)
assert 'color' not in data['message']['notification']
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_oauth_colors(mock_post):
"""
Verify colors work as intended.
"""
obj = Apprise.instantiate(
f'fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}'
'&color=#12AAbb')
assert mock_post.call_count == 0
# Send our notification
assert obj.notify(title="title", body="body") is True
# Test our call count
assert mock_post.call_count == 2
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
assert mock_post.call_args_list[1][0][0] == \
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send'
payload = mock_post.mock_calls[1][2]
data = json.loads(payload['data'])
assert 'message' in data
assert isinstance(data['message'], dict)
assert 'data' not in data['message']
assert 'notification' in data['message']
assert isinstance(data['message']['notification'], dict)
assert 'color' in data['message']['android']['notification']
assert data['message']['android']['notification']['color'] == '#12aabb'
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_keyfile_parse_default(mock_post):
"""
NotifyFCM() KeyFile Tests
"""
oauth = GoogleOAuth()
# We can not get an Access Token without content loaded
assert oauth.access_token is None
# Load our content
assert oauth.load(FCM_KEYFILE) is True
assert oauth.access_token is not None
# Test our call count
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
mock_post.reset_mock()
# a second call uses cache since our token hasn't expired yet
assert oauth.access_token is not None
assert mock_post.call_count == 0
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_keyfile_parse_no_expiry(mock_post):
"""
Test case without `expires_in` entry.
"""
mock_post.return_value.content = json.dumps({
"access_token": "ya29.c.abcd",
"token_type": "Bearer",
})
oauth = GoogleOAuth()
assert oauth.load(FCM_KEYFILE) is True
assert oauth.access_token is not None
# Test our call count
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_keyfile_parse_user_agent(mock_post):
"""
Test case with `user-agent` override.
"""
oauth = GoogleOAuth(user_agent="test-agent-override")
assert oauth.load(FCM_KEYFILE) is True
assert oauth.access_token is not None
assert mock_post.call_count == 1
assert mock_post.call_args_list[0][0][0] == \
'https://accounts.google.com/o/oauth2/token'
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_keyfile_parse_keyfile_failures(mock_post: mock.Mock):
"""
Test some errors that can get thrown when trying to handle
the `service_account.json` file.
"""
# Now we test a case where we can't access the file we've been pointed to:
oauth = GoogleOAuth()
with mock.patch('builtins.open', side_effect=OSError):
# We will fail to retrieve our Access Token
assert oauth.load(FCM_KEYFILE) is False
assert oauth.access_token is None
oauth = GoogleOAuth()
with mock.patch('json.loads', side_effect=([], )):
# We will fail to retrieve our Access Token since we did not parse
# a dictionary
assert oauth.load(FCM_KEYFILE) is False
assert oauth.access_token is None
# Case where we can't load the PEM key:
oauth = GoogleOAuth()
with mock.patch(
'cryptography.hazmat.primitives.serialization'
'.load_pem_private_key',
side_effect=ValueError("")):
assert oauth.load(FCM_KEYFILE) is False
assert oauth.access_token is None
# Case where we can't load the PEM key:
oauth = GoogleOAuth()
with mock.patch(
'cryptography.hazmat.primitives.serialization'
'.load_pem_private_key',
side_effect=TypeError("")):
assert oauth.load(FCM_KEYFILE) is False
assert oauth.access_token is None
# Case where we can't load the PEM key:
oauth = GoogleOAuth()
with mock.patch(
'cryptography.hazmat.primitives.serialization'
'.load_pem_private_key',
side_effect=UnsupportedAlgorithm("")):
# Note: This test should be te
assert oauth.load(FCM_KEYFILE) is False
assert oauth.access_token is None
# Verify that not a single call to the web escaped the test harness.
assert mock_post.mock_calls == []
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_keyfile_parse_token_failures(mock_post):
"""
Test some web errors that can occur when speaking upstream
with Google to get our token generated.
"""
mock_post.return_value.status_code = requests.codes.internal_server_error
oauth = GoogleOAuth()
assert oauth.load(FCM_KEYFILE) is True
# We'll fail due to an bad web response
assert oauth.access_token is None
# Return our status code to how it was
mock_post.return_value.status_code = requests.codes.ok
# No access token
bad_response_1 = mock.Mock()
bad_response_1.content = json.dumps({
"expires_in": 3599,
"token_type": "Bearer",
})
# Invalid JSON
bad_response_2 = mock.Mock()
bad_response_2.content = '{'
mock_post.return_value = None
# Throw an exception on the first call to requests.post()
for side_effect in (
requests.RequestException(), bad_response_1, bad_response_2):
mock_post.side_effect = side_effect
# Test all of our bad side effects
oauth = GoogleOAuth()
assert oauth.load(FCM_KEYFILE) is True
# We'll fail due to an bad web response
assert oauth.access_token is None
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_bad_keyfile_parse():
"""
NotifyFCM() KeyFile Bad Service Account Type Tests
"""
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account-bad-type.json')
oauth = GoogleOAuth()
assert oauth.load(path) is False
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_keyfile_missing_entries_parse(tmpdir):
"""
NotifyFCM() KeyFile Missing Entries Test
"""
# Prepare a base keyfile reference to use
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json')
with open(path, mode="r", encoding='utf-8') as fp:
content = json.loads(fp.read())
path = tmpdir.join('fcm_keyfile.json')
# Test that we fail to load if the following keys are missing:
for entry in (
'client_email', 'private_key_id', 'private_key', 'type',
'project_id'):
# Ensure the key actually exists in our file
assert entry in content
# Create a copy of our content
content_copy = content.copy()
# Remove our entry we expect to validate against
del content_copy[entry]
assert entry not in content_copy
path.write(json.dumps(content_copy))
oauth = GoogleOAuth()
assert oauth.load(str(path)) is False
# Now write ourselves a bad JSON file
path.write('{')
oauth = GoogleOAuth()
assert oauth.load(str(path)) is False
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_priority_manager():
"""
NotifyFCM() FCMPriorityManager() Testing
"""
for mode in FCM_MODES:
for priority in FCM_PRIORITIES:
instance = FCMPriorityManager(mode, priority)
assert isinstance(instance.payload(), dict)
# Verify it's not empty
assert bool(instance)
assert instance.payload()
assert str(instance) == priority
# We do not have to set a priority
instance = FCMPriorityManager(mode)
assert isinstance(instance.payload(), dict)
# Dictionary is empty though
assert not bool(instance)
assert not instance.payload()
assert str(instance) == ''
with pytest.raises(TypeError):
instance = FCMPriorityManager(mode, 'invalid')
with pytest.raises(TypeError):
instance = FCMPriorityManager('invald', 'high')
# mode validation is done at the higher NotifyFCM() level so
# it is not tested here (not required)
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_fcm_color_manager():
"""
NotifyFCM() FCMColorManager() Testing
"""
# No colors
instance = FCMColorManager('no')
assert bool(instance) is False
assert instance.get() is None
# We'll return that we are not defined
assert str(instance) == 'no'
# Asset colors
instance = FCMColorManager('yes')
assert isinstance(instance.get(), str)
# Output: #rrggbb
assert len(instance.get()) == 7
# Starts with has symbol
assert instance.get()[0] == '#'
# We'll return that we are defined but using default configuration
assert str(instance) == 'yes'
# We will be `true` because we can acquire a color based on what was
# passed in
assert bool(instance)
# Custom color
instance = FCMColorManager('#A2B3A4')
assert isinstance(instance.get(), str)
assert instance.get() == '#a2b3a4'
assert bool(instance)
# str() response does not include hashtag
assert str(instance) == 'a2b3a4'
# Custom color (no hashtag)
instance = FCMColorManager('A2B3A4')
assert isinstance(instance.get(), str)
# Hashtag is always part of output
assert instance.get() == '#a2b3a4'
assert bool(instance)
# str() response does not include hashtag
assert str(instance) == 'a2b3a4'
# Custom color (no hashtag) but only using 3 letter rgb values
instance = FCMColorManager('AC4')
assert isinstance(instance.get(), str)
# Hashtag is always part of output
assert instance.get() == '#aacc44'
assert bool(instance)
# str() response does not include hashtag
assert str(instance) == 'aacc44'
@pytest.mark.skipif(
'cryptography' in sys.modules,
reason="Requires that cryptography NOT be installed")
def test_plugin_fcm_cryptography_import_error():
"""
NotifyFCM Cryptography loading failure
"""
# Prepare a base keyfile reference to use
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json')
# Attempt to instantiate our object
obj = Apprise.instantiate(
'fcm://mock-project-id/device/#topic/?keyfile={}'.format(str(path)))
# It's not possible because our cryptography depedancy is missing
assert obj is None
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
@mock.patch('requests.post')
def test_plugin_fcm_edge_cases(mock_post):
"""
NotifyFCM() Edge Cases
"""
# Prepare a good response
response = mock.Mock()
response.status_code = requests.codes.ok
mock_post.return_value = response
# this tests an edge case where verify if the data_kwargs is a dictionary
# or not. Below, we don't even define it, so it will be None (causing
# the check to go). We'll still correctly instantiate a plugin:
obj = NotifyFCM("project", "api:123", targets='device')
assert obj is not None