Improved variable parsing in YAML files (#1088)

This commit is contained in:
Chris Caron 2024-03-29 16:16:40 -04:00 committed by GitHub
parent f55f691a1f
commit 2c5341a2a5
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 154 additions and 51 deletions

View File

@ -669,6 +669,79 @@ class URLBase:
'verify': 'yes' if self.verify_certificate else 'no', 'verify': 'yes' if self.verify_certificate else 'no',
} }
@staticmethod
def post_process_parse_url_results(results):
"""
After parsing the URL, this function applies a bit of extra logic to
support extra entries like `pass` becoming `password`, etc
This function assumes that parse_url() was called previously setting
up the basics to be checked
"""
# if our URL ends with an 's', then assume our secure flag is set.
results['secure'] = (results['schema'][-1] == 's')
# QSD Checking (over-rides all)
qsd_exists = True if isinstance(results.get('qsd'), dict) else False
if qsd_exists and 'verify' in results['qsd']:
# Pulled from URL String
results['verify'] = parse_bool(
results['qsd'].get('verify', True))
elif 'verify' in results:
# Pulled from YAML Configuratoin
results['verify'] = parse_bool(results.get('verify', True))
else:
# Support SSL Certificate 'verify' keyword. Default to being
# enabled
results['verify'] = True
# Password overrides
if 'pass' in results:
results['password'] = results['pass']
del results['pass']
if qsd_exists:
if 'password' in results['qsd']:
results['password'] = results['qsd']['password']
if 'pass' in results['qsd']:
results['password'] = results['qsd']['pass']
# User overrides
if 'user' in results['qsd']:
results['user'] = results['qsd']['user']
# parse_url() always creates a 'password' and 'user' entry in the
# results returned. Entries are set to None if they weren't
# specified
if results['password'] is None and 'user' in results['qsd']:
# Handle cases where the user= provided in 2 locations, we want
# the original to fall back as a being a password (if one
# wasn't otherwise defined) e.g.
# mailtos://PASSWORD@hostname?user=admin@mail-domain.com
# - in the above, the PASSWORD gets lost in the parse url()
# since a user= over-ride is specified.
presults = parse_url(results['url'])
if presults:
# Store our Password
results['password'] = presults['user']
# Store our socket read timeout if specified
if 'rto' in results['qsd']:
results['rto'] = results['qsd']['rto']
# Store our socket connect timeout if specified
if 'cto' in results['qsd']:
results['cto'] = results['qsd']['cto']
if 'port' in results['qsd']:
results['port'] = results['qsd']['port']
return results
@staticmethod @staticmethod
def parse_url(url, verify_host=True, plus_to_space=False, def parse_url(url, verify_host=True, plus_to_space=False,
strict_port=False): strict_port=False):
@ -698,53 +771,7 @@ class URLBase:
# We're done; we failed to parse our url # We're done; we failed to parse our url
return results return results
# if our URL ends with an 's', then assume our secure flag is set. return URLBase.post_process_parse_url_results(results)
results['secure'] = (results['schema'][-1] == 's')
# Support SSL Certificate 'verify' keyword. Default to being enabled
results['verify'] = True
if 'verify' in results['qsd']:
results['verify'] = parse_bool(
results['qsd'].get('verify', True))
# Password overrides
if 'password' in results['qsd']:
results['password'] = results['qsd']['password']
if 'pass' in results['qsd']:
results['password'] = results['qsd']['pass']
# User overrides
if 'user' in results['qsd']:
results['user'] = results['qsd']['user']
# parse_url() always creates a 'password' and 'user' entry in the
# results returned. Entries are set to None if they weren't specified
if results['password'] is None and 'user' in results['qsd']:
# Handle cases where the user= provided in 2 locations, we want
# the original to fall back as a being a password (if one wasn't
# otherwise defined)
# e.g.
# mailtos://PASSWORD@hostname?user=admin@mail-domain.com
# - the PASSWORD gets lost in the parse url() since a user=
# over-ride is specified.
presults = parse_url(results['url'])
if presults:
# Store our Password
results['password'] = presults['user']
# Store our socket read timeout if specified
if 'rto' in results['qsd']:
results['rto'] = results['qsd']['rto']
# Store our socket connect timeout if specified
if 'cto' in results['qsd']:
results['cto'] = results['qsd']['cto']
if 'port' in results['qsd']:
results['port'] = results['qsd']['port']
return results
@staticmethod @staticmethod
def http_response_code_lookup(code, response_mask=None): def http_response_code_lookup(code, response_mask=None):

View File

@ -1184,6 +1184,9 @@ class ConfigBase(URLBase):
# Prepare our Asset Object # Prepare our Asset Object
_results['asset'] = asset _results['asset'] = asset
# Handle post processing of result set
_results = URLBase.post_process_parse_url_results(_results)
# Store our preloaded entries # Store our preloaded entries
preloaded.append({ preloaded.append({
'results': _results, 'results': _results,

View File

@ -147,6 +147,10 @@ class CustomNotifyPlugin(NotifyBase):
self._default_args = {} self._default_args = {}
# Some variables do not need to be set
if 'secure' in kwargs:
del kwargs['secure']
# Apply our updates based on what was parsed # Apply our updates based on what was parsed
dict_full_update(self._default_args, self._base_args) dict_full_update(self._default_args, self._base_args)
dict_full_update(self._default_args, kwargs) dict_full_update(self._default_args, kwargs)

View File

@ -29,6 +29,7 @@
from os.path import dirname from os.path import dirname
from os.path import join from os.path import join
from apprise.decorators import notify from apprise.decorators import notify
from apprise.decorators.CustomNotifyPlugin import CustomNotifyPlugin
from apprise import Apprise from apprise import Apprise
from apprise import AppriseConfig from apprise import AppriseConfig
from apprise import AppriseAsset from apprise import AppriseAsset
@ -351,7 +352,7 @@ def test_notify_multi_instance_decoration(tmpdir):
t = tmpdir.mkdir("multi-test").join("apprise.yml") t = tmpdir.mkdir("multi-test").join("apprise.yml")
t.write("""urls: t.write("""urls:
- multi://user1:pass@hostname - multi://user1:pass@hostname
- multi://user2:pass2@hostname - multi://user2:pass2@hostname?verify=no
""") """)
# Create ourselves a config object # Create ourselves a config object
@ -404,11 +405,12 @@ def test_notify_multi_instance_decoration(tmpdir):
assert 'tag' in meta assert 'tag' in meta
assert isinstance(meta['tag'], set) assert isinstance(meta['tag'], set)
assert len(meta) == 7 assert len(meta) == 8
# We carry all of our default arguments from the @notify's initialization # We carry all of our default arguments from the @notify's initialization
assert meta['schema'] == 'multi' assert meta['schema'] == 'multi'
assert meta['host'] == 'hostname' assert meta['host'] == 'hostname'
assert meta['user'] == 'user1' assert meta['user'] == 'user1'
assert meta['verify'] is True
assert meta['password'] == 'pass' assert meta['password'] == 'pass'
# Verify our URL is correct # Verify our URL is correct
@ -441,15 +443,24 @@ def test_notify_multi_instance_decoration(tmpdir):
assert 'tag' in meta assert 'tag' in meta
assert isinstance(meta['tag'], set) assert isinstance(meta['tag'], set)
assert len(meta) == 7 assert len(meta) == 9
# We carry all of our default arguments from the @notify's initialization # We carry all of our default arguments from the @notify's initialization
assert meta['schema'] == 'multi' assert meta['schema'] == 'multi'
assert meta['host'] == 'hostname' assert meta['host'] == 'hostname'
assert meta['user'] == 'user2' assert meta['user'] == 'user2'
assert meta['password'] == 'pass2' assert meta['password'] == 'pass2'
assert meta['verify'] is False
assert meta['qsd']['verify'] == 'no'
# Verify our URL is correct # Verify our URL is correct
assert meta['url'] == 'multi://user2:pass2@hostname' assert meta['url'] == 'multi://user2:pass2@hostname?verify=no'
# Tidy # Tidy
N_MGR.remove('multi') N_MGR.remove('multi')
def test_custom_notify_plugin_decoration():
"""decorators: CustomNotifyPlugin testing
"""
CustomNotifyPlugin()

View File

@ -30,6 +30,7 @@ import logging
import os import os
import re import re
from unittest import mock from unittest import mock
from inspect import cleandoc
import smtplib import smtplib
from email.header import decode_header from email.header import decode_header
@ -37,6 +38,8 @@ from email.header import decode_header
from apprise import NotifyType, NotifyBase from apprise import NotifyType, NotifyBase
from apprise import Apprise from apprise import Apprise
from apprise import AttachBase from apprise import AttachBase
from apprise.AppriseAsset import AppriseAsset
from apprise.config.ConfigBase import ConfigBase
from apprise import AppriseAttachment from apprise import AppriseAttachment
from apprise.plugins.NotifyEmail import NotifyEmail from apprise.plugins.NotifyEmail import NotifyEmail
from apprise.plugins import NotifyEmail as NotifyEmailModule from apprise.plugins import NotifyEmail as NotifyEmailModule
@ -1757,3 +1760,58 @@ def test_plugin_email_formatting_990(mock_smtp, mock_smtp_ssl):
assert len(obj.targets) == 1 assert len(obj.targets) == 1
assert (False, 'me@mydomain.com') in obj.targets assert (False, 'me@mydomain.com') in obj.targets
def test_plugin_email_variables_1087():
"""
NotifyEmail() GitHub Issue 1087
https://github.com/caronc/apprise/issues/1087
Email variables reported not working correctly
"""
# Valid Configuration
result, _ = ConfigBase.config_parse(cleandoc("""
#
# Test Email Parsing
#
urls:
- mailtos://alt.lan/:
- user: testuser@alt.lan
pass: xxxxXXXxxx
smtp: smtp.alt.lan
to: alteriks@alt.lan
"""), asset=AppriseAsset())
assert isinstance(result, list)
assert len(result) == 1
email = result[0]
assert email.from_addr == ['Apprise', 'testuser@alt.lan']
assert email.user == 'testuser@alt.lan'
assert email.smtp_host == 'smtp.alt.lan'
assert email.targets == [(False, 'alteriks@alt.lan')]
assert email.password == 'xxxxXXXxxx'
# Valid Configuration
result, _ = ConfigBase.config_parse(cleandoc("""
#
# Test Email Parsing where qsd over-rides all
#
urls:
- mailtos://alt.lan/?pass=abcd&user=joe@alt.lan:
- user: testuser@alt.lan
pass: xxxxXXXxxx
smtp: smtp.alt.lan
to: alteriks@alt.lan
"""), asset=AppriseAsset())
assert isinstance(result, list)
assert len(result) == 1
email = result[0]
assert email.from_addr == ['Apprise', 'joe@alt.lan']
assert email.user == 'joe@alt.lan'
assert email.smtp_host == 'smtp.alt.lan'
assert email.targets == [(False, 'alteriks@alt.lan')]
assert email.password == 'abcd'