XBOW-025-016 - SSRF via HTTP Attachment Addressed (#232)

This commit is contained in:
Chris Caron 2025-02-15 19:50:23 -05:00 committed by GitHub
parent c1151c969d
commit 1a35a72974
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
6 changed files with 638 additions and 11 deletions

View File

@ -412,6 +412,9 @@ The use of environment variables allow you to provide over-rides to default sett
| `APPRISE_CONFIG_LOCK` | Locks down your API hosting so that you can no longer delete/update/access stateful information. Your configuration is still referenced when stateful calls are made to `/notify`. The idea of this switch is to allow someone to set their (Apprise) configuration up and then as an added security tactic, they may choose to lock their configuration down (in a read-only state). Those who use the Apprise CLI tool may still do it, however the `--config` (`-c`) switch will not successfully reference this access point anymore. You can however use the `apprise://` plugin without any problem ([see here for more details](https://github.com/caronc/apprise/wiki/Notify_apprise_api)). This defaults to `no` and can however be set to `yes` by simply defining the global variable as such. | `APPRISE_CONFIG_LOCK` | Locks down your API hosting so that you can no longer delete/update/access stateful information. Your configuration is still referenced when stateful calls are made to `/notify`. The idea of this switch is to allow someone to set their (Apprise) configuration up and then as an added security tactic, they may choose to lock their configuration down (in a read-only state). Those who use the Apprise CLI tool may still do it, however the `--config` (`-c`) switch will not successfully reference this access point anymore. You can however use the `apprise://` plugin without any problem ([see here for more details](https://github.com/caronc/apprise/wiki/Notify_apprise_api)). This defaults to `no` and can however be set to `yes` by simply defining the global variable as such.
| `APPRISE_DENY_SERVICES` | A comma separated set of entries identifying what plugins to deny access to. You only need to identify one schema entry associated with a plugin to in turn disable all of it. Hence, if you wanted to disable the `glib` plugin, you do not need to additionally include `qt` as well since it's included as part of the (`dbus`) package; consequently specifying `qt` would in turn disable the `glib` module as well (another way to accomplish the same task). To exclude/disable more the one upstream service, simply specify additional entries separated by a `,` (comma) or ` ` (space). The `APPRISE_DENY_SERVICES` entries are ignored if the `APPRISE_ALLOW_SERVICES` is identified. By default, this is initialized to `windows, dbus, gnome, macosx, syslog` (blocking local actions from being issued inside of the docker container) | `APPRISE_DENY_SERVICES` | A comma separated set of entries identifying what plugins to deny access to. You only need to identify one schema entry associated with a plugin to in turn disable all of it. Hence, if you wanted to disable the `glib` plugin, you do not need to additionally include `qt` as well since it's included as part of the (`dbus`) package; consequently specifying `qt` would in turn disable the `glib` module as well (another way to accomplish the same task). To exclude/disable more the one upstream service, simply specify additional entries separated by a `,` (comma) or ` ` (space). The `APPRISE_DENY_SERVICES` entries are ignored if the `APPRISE_ALLOW_SERVICES` is identified. By default, this is initialized to `windows, dbus, gnome, macosx, syslog` (blocking local actions from being issued inside of the docker container)
| `APPRISE_ALLOW_SERVICES` | A comma separated set of entries identifying what plugins to allow access to. You may only use alpha-numeric characters as is the restriction of Apprise Schemas (schema://) anyway. To exclusively include more the one upstream service, simply specify additional entries separated by a `,` (comma) or ` ` (space). The `APPRISE_DENY_SERVICES` entries are ignored if the `APPRISE_ALLOW_SERVICES` is identified. | `APPRISE_ALLOW_SERVICES` | A comma separated set of entries identifying what plugins to allow access to. You may only use alpha-numeric characters as is the restriction of Apprise Schemas (schema://) anyway. To exclusively include more the one upstream service, simply specify additional entries separated by a `,` (comma) or ` ` (space). The `APPRISE_DENY_SERVICES` entries are ignored if the `APPRISE_ALLOW_SERVICES` is identified.
| `APPRISE_ATTACH_ALLOW_URLS` | A comma separated set of entries identifying the HTTP Attach URLs the Apprise API shall always accept. Use wildcards such as `*` and `?` to help construct the URL/Hosts you identify. Use a space and/or a comma to identify more then one entry. By default this is set to `*` (Accept all provided URLs).
| `APPRISE_ATTACH_DENY_URLS` | A comma separated set of entries identifying the HTTP Attach URLs the Apprise API shall always reject. Use wildcards such as `*` and `?` to help construct the URL/Hosts you identify. The `APPRISE_ATTACH_DENY_URLS` is always processed before the `APPRISE_ATTACH_ALLOW_URLS` list. Use a space and/or a comma to identify more then one entry. By default this is set to `127.0.* localhost*`.
By default this
| `SECRET_KEY` | A Django variable acting as a *salt* for most things that require security. This API uses it for the hash sequences when writing the configuration files to disk (`hash` mode only). | `SECRET_KEY` | A Django variable acting as a *salt* for most things that require security. This API uses it for the hash sequences when writing the configuration files to disk (`hash` mode only).
| `ALLOWED_HOSTS` | A list of strings representing the host/domain names that this API can serve. This is a security measure to prevent HTTP Host header attacks, which are possible even under many seemingly-safe web server configurations. By default this is set to `*` allowing any host. Use space to delimit more than one host. | `ALLOWED_HOSTS` | A list of strings representing the host/domain names that this API can serve. This is a security measure to prevent HTTP Host header attacks, which are possible even under many seemingly-safe web server configurations. By default this is set to `*` allowing any host. Use space to delimit more than one host.
| `APPRISE_PLUGIN_PATHS` | Apprise supports the ability to define your own `schema://` definitions and load them. To read more about how you can create your own customizations, check out [this link here](https://github.com/caronc/apprise/wiki/decorator_notify). You may define one or more paths (separated by comma `,`) here. By default the `apprise_api/var/plugin` directory is scanned (which does not include anything). Feel free to set this to an empty string to disable any custom plugin loading. | `APPRISE_PLUGIN_PATHS` | Apprise supports the ability to define your own `schema://` definitions and load them. To read more about how you can create your own customizations, check out [this link here](https://github.com/caronc/apprise/wiki/decorator_notify). You may define one or more paths (separated by comma `,`) here. By default the `apprise_api/var/plugin` directory is scanned (which does not include anything). Feel free to set this to an empty string to disable any custom plugin loading.

View File

@ -218,11 +218,11 @@ class AttachmentTests(SimpleTestCase):
# Support multi entries # Support multi entries
attachment_payload = [ attachment_payload = [
{ {
'url': 'http://localhost/my.attachment.3', 'url': 'http://myserver/my.attachment.3',
}, { }, {
'url': 'http://localhost/my.attachment.2', 'url': 'http://myserver/my.attachment.2',
}, { }, {
'url': 'http://localhost/my.attachment.1', 'url': 'http://myserver/my.attachment.1',
} }
] ]
result = parse_attachments(attachment_payload, {}) result = parse_attachments(attachment_payload, {})
@ -352,18 +352,36 @@ class AttachmentTests(SimpleTestCase):
attachment_payload = [ attachment_payload = [
# Request several images # Request several images
"https://localhost/myotherfile.png", "https://myserver/myotherfile.png",
"https://localhost/myfile.png" "https://myserver/myfile.png"
] ]
result = parse_attachments(attachment_payload, {}) result = parse_attachments(attachment_payload, {})
assert isinstance(result, list) assert isinstance(result, list)
assert len(result) == 2 assert len(result) == 2
# A attachment set where our URLs are blocked
attachment_payload = [
# Request several images
"https://localhost.localdomain/myotherfile.png",
"http://localhost/myfile.png",
"http://127.0.0.1/myfile.png",
"https://127.0.0.3/myfile.png",
]
with self.assertRaises(ValueError):
# We have hosts that will be blocked
parse_attachments(attachment_payload, {})
# Test each
for ap in attachment_payload:
with self.assertRaises(ValueError):
# We have hosts that will be blocked
parse_attachments([ap], {})
attachment_payload = [{ attachment_payload = [{
# Request several images # Request several images
'url': "https://localhost/myotherfile.png", 'url': "https://myserver/myotherfile.png",
}, { }, {
'url': "https://localhost/myfile.png" 'url': "https://myserver/myfile.png"
}] }]
result = parse_attachments(attachment_payload, {}) result = parse_attachments(attachment_payload, {})
assert isinstance(result, list) assert isinstance(result, list)
@ -388,17 +406,17 @@ class AttachmentTests(SimpleTestCase):
# While we have a network in place, we're intentionally requesting # While we have a network in place, we're intentionally requesting
# URLs that do not exist (hopefully they don't anyway) as we want # URLs that do not exist (hopefully they don't anyway) as we want
# this test to fail. # this test to fail.
"https://localhost/garbage/abcd1.png", "https://myserver/garbage/abcd1.png",
"https://localhost/garbage/abcd2.png", "https://myserver/garbage/abcd2.png",
] ]
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
parse_attachments(attachment_payload, {}) parse_attachments(attachment_payload, {})
# Support url encoding # Support url encoding
attachment_payload = [{ attachment_payload = [{
'url': "https://localhost/garbage/abcd1.png", 'url': "https://myserver/garbage/abcd1.png",
}, { }, {
'url': "https://localhost/garbage/abcd2.png", 'url': "https://myserver/garbage/abcd2.png",
}] }]
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
parse_attachments(attachment_payload, {}) parse_attachments(attachment_payload, {})

View File

@ -0,0 +1,320 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2025 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
from django.test import SimpleTestCase
from ..urlfilter import AppriseURLFilter
class AttachmentTests(SimpleTestCase):
def test_apprise_url_filter(self):
"""
Test the apprise url filter
"""
# empty allow and deny lists
af = AppriseURLFilter('', '')
# Test garbage entries
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# These ar blocked too since we have no allow list
self.assertFalse(af.is_allowed('http://localhost'))
self.assertFalse(af.is_allowed('http://localhost'))
#
# We have a wildcard for accept all in our allow list
#
af = AppriseURLFilter('*', '')
# We still block junk
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# We however allow localhost now (caught with *)
self.assertTrue(af.is_allowed('http://localhost'))
self.assertTrue(af.is_allowed('http://localhost/resources'))
self.assertTrue(af.is_allowed('http://localhost/images'))
#
# Allow list accepts all, except we want to explicitely block https://localhost/resources
#
af = AppriseURLFilter('*', 'https://localhost/resources')
# We still block junk
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# Takeaway is https:// was blocked to resources, but not http:// request
# because it was explicitly identify as so:
self.assertTrue(af.is_allowed('http://localhost'))
self.assertTrue(af.is_allowed('http://localhost/resources'))
self.assertTrue(af.is_allowed('http://localhost/resources/sub/path/'))
self.assertFalse(af.is_allowed('https://localhost/resources'))
self.assertFalse(af.is_allowed('https://localhost/resources/sub/path/'))
self.assertTrue(af.is_allowed('http://localhost/images'))
#
# Allow list accepts all, except we want to explicitely block both
# https://localhost/resources and http://localhost/resources
#
af = AppriseURLFilter('*', 'localhost/resources')
# We still block junk
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# Takeaway is https:// was blocked to resources, but not http:// request
# because it was explicitly identify as so:
self.assertTrue(af.is_allowed('http://localhost'))
self.assertFalse(af.is_allowed('http://localhost/resources'))
self.assertFalse(af.is_allowed('http://localhost/resources/sub/path'))
self.assertFalse(af.is_allowed('https://localhost/resources'))
self.assertFalse(af.is_allowed('https://localhost/resources/sub/path/'))
self.assertTrue(af.is_allowed('http://localhost/images'))
#
# A more restrictive allow/block list
# https://localhost/resources and http://localhost/resources
#
af = AppriseURLFilter('https://localhost, http://myserver.*', 'localhost/resources')
# We still block junk
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# Explicitly only allows https
self.assertFalse(af.is_allowed('http://localhost'))
self.assertTrue(af.is_allowed('https://localhost'))
self.assertFalse(af.is_allowed('https://localhost:8000'))
self.assertTrue(af.is_allowed('https://localhost/images'))
self.assertFalse(af.is_allowed('https://localhost/resources'))
self.assertFalse(af.is_allowed('https://localhost/resources/sub/path/'))
self.assertFalse(af.is_allowed('http://localhost/resources'))
self.assertFalse(af.is_allowed('http://localhost/resources/sub/path'))
self.assertFalse(af.is_allowed('http://not-in-list'))
# Explicitly definition of allowed hostname prohibits the below from working:
self.assertFalse(af.is_allowed('localhost'))
#
# Testing of hostnames only and ports
#
af = AppriseURLFilter('localhost, myserver:3000', 'localhost/resources')
# We still block junk
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# all forms of localhost is allowed (provided there is no port)
self.assertTrue(af.is_allowed('http://localhost'))
self.assertTrue(af.is_allowed('https://localhost'))
self.assertFalse(af.is_allowed('https://localhost:8000'))
self.assertFalse(af.is_allowed('https://localhost:80'))
self.assertFalse(af.is_allowed('https://localhost:443'))
self.assertTrue(af.is_allowed('https://localhost/images'))
self.assertFalse(af.is_allowed('https://localhost/resources'))
self.assertFalse(af.is_allowed('https://localhost/resources/sub/path'))
self.assertFalse(af.is_allowed('http://localhost/resources'))
self.assertTrue(af.is_allowed('http://localhost/resourcesssssssss'))
self.assertFalse(af.is_allowed('http://localhost/resources/sub/path/'))
self.assertFalse(af.is_allowed('http://not-in-list'))
# myserver is only allowed if port is provided
self.assertFalse(af.is_allowed('http://myserver'))
self.assertFalse(af.is_allowed('https://myserver'))
self.assertTrue(af.is_allowed('http://myserver:3000'))
self.assertTrue(af.is_allowed('https://myserver:3000'))
# Open range of hosts allows these to be accepted:
self.assertTrue(af.is_allowed('localhost'))
self.assertTrue(af.is_allowed('myserver:3000'))
self.assertTrue(af.is_allowed('https://myserver:3000'))
self.assertTrue(af.is_allowed('http://myserver:3000'))
#
# Testing of hostnames only and ports but via URLs (explicit http://)
# Also tests path ending with `/` (slash)
#
af = AppriseURLFilter('http://localhost, http://myserver:3000', 'http://localhost/resources/')
# We still block junk
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# http://localhost acceptance only
self.assertTrue(af.is_allowed('http://localhost'))
self.assertFalse(af.is_allowed('https://localhost'))
self.assertFalse(af.is_allowed('http://localhost:8000'))
self.assertFalse(af.is_allowed('http://localhost:80'))
self.assertTrue(af.is_allowed('http://localhost/images'))
self.assertFalse(af.is_allowed('https://localhost/images'))
self.assertFalse(af.is_allowed('https://localhost/resources'))
self.assertFalse(af.is_allowed('https://localhost/resources/sub/path'))
self.assertFalse(af.is_allowed('http://localhost/resources'))
self.assertFalse(af.is_allowed('http://not-in-list'))
# myserver is only allowed if port is provided and http://
self.assertFalse(af.is_allowed('http://myserver'))
self.assertFalse(af.is_allowed('https://myserver'))
self.assertTrue(af.is_allowed('http://myserver:3000'))
self.assertFalse(af.is_allowed('https://myserver:3000'))
self.assertTrue(af.is_allowed('http://myserver:3000/path/'))
# Open range of hosts is no longer allowed due to explicit http:// reference
self.assertFalse(af.is_allowed('localhost'))
self.assertFalse(af.is_allowed('myserver:3000'))
self.assertFalse(af.is_allowed('https://myserver:3000'))
#
# Testing of hostnames only and ports but via URLs (explicit https://)
# Also tests path ending with `/` (slash)
#
af = AppriseURLFilter('https://localhost, https://myserver:3000', 'https://localhost/resources/')
# We still block junk
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# http://localhost acceptance only
self.assertTrue(af.is_allowed('https://localhost'))
self.assertFalse(af.is_allowed('http://localhost'))
self.assertFalse(af.is_allowed('localhost'))
self.assertFalse(af.is_allowed('https://localhost:8000'))
self.assertFalse(af.is_allowed('https://localhost:80'))
self.assertTrue(af.is_allowed('https://localhost/images'))
self.assertFalse(af.is_allowed('http://localhost/images'))
self.assertFalse(af.is_allowed('http://localhost/resources'))
self.assertFalse(af.is_allowed('http://localhost/resources/sub/path'))
self.assertFalse(af.is_allowed('https://localhost/resources'))
self.assertFalse(af.is_allowed('https://not-in-list'))
# myserver is only allowed if port is provided and http://
self.assertFalse(af.is_allowed('https://myserver'))
self.assertFalse(af.is_allowed('http://myserver'))
self.assertFalse(af.is_allowed('myserver'))
self.assertTrue(af.is_allowed('https://myserver:3000'))
self.assertFalse(af.is_allowed('http://myserver:3000'))
self.assertTrue(af.is_allowed('https://myserver:3000/path/'))
# Open range of hosts is no longer allowed due to explicit http:// reference
self.assertFalse(af.is_allowed('localhost'))
self.assertFalse(af.is_allowed('myserver:3000'))
self.assertFalse(af.is_allowed('http://myserver:3000'))
#
# Testing Regular Expressions
#
af = AppriseURLFilter('https://localhost/incoming/*/*', 'https://localhost/*/*/var')
# We still block junk
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# Very specific paths are supported now in https://localhost only:
self.assertFalse(af.is_allowed('https://localhost'))
self.assertFalse(af.is_allowed('http://localhost'))
self.assertFalse(af.is_allowed('https://localhost/incoming'))
self.assertFalse(af.is_allowed('https://localhost/incoming/dir1'))
self.assertFalse(af.is_allowed('https://localhost/incoming/dir1/'))
self.assertTrue(af.is_allowed('https://localhost/incoming/dir1/dir2'))
self.assertTrue(af.is_allowed('https://localhost/incoming/dir1/dir2/'))
self.assertFalse(af.is_allowed('http://localhost/incoming/dir1/dir2'))
self.assertFalse(af.is_allowed('http://localhost/incoming/dir1/dir2/'))
# our incoming directory we restricted
self.assertFalse(af.is_allowed('https://localhost/incoming/dir1/var'))
self.assertFalse(af.is_allowed('https://localhost/incoming/dir1/var/'))
self.assertFalse(af.is_allowed('https://localhost/incoming/dir1/var/sub/dir'))
self.assertFalse(af.is_allowed('https://localhost/incoming/dir1/var/sub'))
# Test the ? out
af = AppriseURLFilter('localhost?', '')
# Test garbage entries
self.assertFalse(af.is_allowed('$'))
self.assertFalse(af.is_allowed(b'13'))
self.assertFalse(af.is_allowed(u'Mālō e Lelei'))
self.assertFalse(af.is_allowed(''))
self.assertFalse(af.is_allowed(None))
self.assertFalse(af.is_allowed(True))
self.assertFalse(af.is_allowed(42))
# These are blocked too since we have no allow list
self.assertFalse(af.is_allowed('http://localhost'))
self.assertTrue(af.is_allowed('http://localhost1'))
self.assertTrue(af.is_allowed('https://localhost1'))
self.assertFalse(af.is_allowed('http://localhost%'))
self.assertFalse(af.is_allowed('http://localhost10'))
# conflicting elements cancel one another
af = AppriseURLFilter('localhost', 'localhost')
# These are blocked too since we have no allow list
self.assertFalse(af.is_allowed('localhost'))

View File

@ -0,0 +1,234 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2023 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
from apprise.utils.parse import parse_url
class AppriseURLFilter:
"""
A URL filtering class that uses pre-parsed and pre-compiled allow/deny lists.
Deny rules are always processed before allow rules. If a URL matches any deny rule,
it is immediately rejected. If no deny rule matches, then the URL is allowed only
if it matches an allow rule; otherwise, it is rejected.
Each entry in the allow/deny lists can be provided as:
- A full URL (with http:// or https://)
- A URL without a scheme (e.g. "localhost/resources")
- A plain hostname or IP
Wildcards:
- '*' will match any sequence of characters.
- '?' will match a single alphanumeric/dash/underscore character.
A trailing '*' is implied if not already present so that rules operate as a prefix match.
"""
def __init__(self, allow_list: str, deny_list: str):
# Pre-compile our rules.
# Each rule is stored as a tuple (compiled_regex, is_url_based)
# where `is_url_based` indicates if the token included "http://" or "https://"
self.allow_rules = self._parse_list(allow_list)
self.deny_rules = self._parse_list(deny_list)
def _parse_list(self, list_str: str):
"""
Split the list (tokens separated by whitespace or commas) and compile each token.
Tokens are classified as follows:
- URLbased tokens: if they start with http:// or https:// (explicit)
or if they contain a / (implicit; no scheme given).
- Hostbased tokens: those that do not contain a /.
Returns a list of tuples (compiled_regex, is_url_based).
"""
tokens = re.split(r'[\s,]+', list_str.strip().lower())
rules = []
for token in tokens:
if not token:
continue
if token.startswith("http://") or token.startswith("https://"):
# Explicit URL token.
compiled = self._compile_url_token(token)
is_url_based = True
elif "/" in token:
# Implicit URL token: prepend a scheme pattern.
compiled = self._compile_implicit_token(token)
is_url_based = True
else:
# Host-based token.
compiled = self._compile_host_token(token)
is_url_based = False
rules.append((compiled, is_url_based))
return rules
def _compile_url_token(self, token: str):
"""
Compiles a URLbased token (explicit token that starts with a scheme) into a regex.
An implied trailing wildcard is added to the path:
- If no path is given (or just /) then (/.*)? is appended.
- If a nonempty path is given that does not end with / or *, then ($|/.*) is appended.
- If the path ends with /, then the trailing slash is removed and (/.*)? is appended,
so that /resources and /resources/ are treated equivalently.
Also, if no port is specified in the host part, the regex ensures that no port is present.
"""
# Determine the scheme.
scheme_regex = ""
if token.startswith("http://"):
scheme_regex = r'http'
# drop http://
token = token[7:]
elif token.startswith("https://"):
scheme_regex = r'https'
# drop https://
token = token[8:]
else: # https?
# Used for implicit tokens; our _compile_implicit_token ensures this.
scheme_regex = r'https?'
# strip https?://
token = token[9:]
# Split token into host (and optional port) and path.
if "/" in token:
netloc, path = token.split("/", 1)
path = "/" + path
else:
netloc = token
path = ""
# Process netloc and port.
if ":" in netloc:
host, port = netloc.split(":", 1)
port_specified = True
else:
host = netloc
port_specified = False
regex = "^" + scheme_regex + "://"
regex += self._wildcard_to_regex(host, is_host=True)
if port_specified:
regex += ":" + re.escape(port)
else:
# Ensure no port is present.
regex += r"(?!:)"
# Process the path.
if path in ("", "/"):
regex += r"(/.*)?"
else:
if path.endswith("*"):
# Remove the trailing "*" and append .*
regex += self._wildcard_to_regex(path[:-1]) + "([^/]+/?)"
elif path.endswith("/"):
# Remove the trailing "/" and allow an optional slash with extra path.
norm = self._wildcard_to_regex(path.rstrip("/"))
regex += norm + r"(/.*)?"
else:
# For a nonempty path that does not end with "/" or "*",
# match either an exact match or a prefix (with a following slash).
norm = self._wildcard_to_regex(path)
regex += norm + r"($|/.*)"
regex += "$"
return re.compile(regex, re.IGNORECASE)
def _compile_implicit_token(self, token: str):
"""
For an implicit token (one that does not start with a scheme but contains a /),
prepend https?:// so that it matches both http and https, then compile it.
"""
new_token = "https?://" + token
return self._compile_url_token(new_token)
def _compile_host_token(self, token: str):
"""
Compiles a hostbased token (one with no /) into a regex.
Note: When matching hostbased tokens, we require that the URLs scheme is exactly http.
"""
regex = "^" + self._wildcard_to_regex(token) + "$"
return re.compile(regex, re.IGNORECASE)
def _wildcard_to_regex(self, pattern: str, is_host: bool = True) -> str:
"""
Converts a pattern containing wildcards into a regex.
- '*' becomes '.*' if host or [^/]+/? if path
- '?' becomes '[A-Za-z0-9_-]'
- Other characters are escaped.
Special handling: if the pattern starts with "https?://", that prefix is preserved
(so it can match either http:// or https://).
"""
regex = ""
for char in pattern:
if char == '*':
regex += r"[^/]+/?" if not is_host else r'.*'
elif char == '?':
regex += r'[^/]' if not is_host else r"[A-Za-z0-9_-]"
else:
regex += re.escape(char)
return regex
def is_allowed(self, url: str) -> bool:
"""
Checks a given URL against the deny list first, then the allow list.
For URL-based rules (explicit or implicit), the full URL is tested.
For host-based rules, the URLs netloc (which includes the port) is tested.
"""
parsed = parse_url(url, strict_port=True, simple=True)
if not parsed:
return False
# includes port if present
netloc = '%s:%d' % (parsed['host'], parsed.get('port')) if parsed.get('port') else parsed['host']
# Check deny rules first.
for pattern, is_url_based in self.deny_rules:
if is_url_based:
if pattern.match(url):
return False
elif pattern.match(netloc):
return False
# Then check allow rules.
for pattern, is_url_based in self.allow_rules:
if is_url_based:
if pattern.match(url):
return True
elif pattern.match(netloc):
return True
return False

View File

@ -36,6 +36,7 @@ import requests
from json import dumps from json import dumps
from django.conf import settings from django.conf import settings
from datetime import datetime from datetime import datetime
from . urlfilter import AppriseURLFilter
# import the logging library # import the logging library
import logging import logging
@ -83,6 +84,9 @@ A_MGR = apprise.manager_attachment.AttachmentManager()
# Access our Notification Manager Singleton # Access our Notification Manager Singleton
N_MGR = apprise.manager_plugins.NotificationManager() N_MGR = apprise.manager_plugins.NotificationManager()
# Prepare our Attachment URL Filter
ATTACH_URL_FILTER = AppriseURLFilter(settings.APPRISE_ATTACH_ALLOW_URLS, settings.APPRISE_ATTACH_DENY_URLS)
class Attachment(A_MGR['file']): class Attachment(A_MGR['file']):
""" """
@ -321,6 +325,12 @@ def parse_attachments(attachment_payload, files_request):
"Failed to load attachment " "Failed to load attachment "
"%d (not web request): %s" % (no, entry)) "%d (not web request): %s" % (no, entry))
if not ATTACH_URL_FILTER.is_allowed(entry):
# We are not allowed to use this entry
raise ValueError(
"Denied attachment "
"%d (blocked web request): %s" % (no, entry))
attachment = HTTPAttachment( attachment = HTTPAttachment(
filename, **A_MGR['http'].parse_url(entry)) filename, **A_MGR['http'].parse_url(entry))
if not attachment: if not attachment:
@ -343,6 +353,13 @@ def parse_attachments(attachment_payload, files_request):
elif isinstance(entry, dict) and \ elif isinstance(entry, dict) and \
AttachmentPayload.URL in entry: AttachmentPayload.URL in entry:
if not ATTACH_URL_FILTER.is_allowed(entry[AttachmentPayload.URL]):
# We are not allowed to use this entry
raise ValueError(
"Denied attachment "
"%d (blocked web request): %s" % (
no, entry[AttachmentPayload.URL]))
attachment = HTTPAttachment( attachment = HTTPAttachment(
filename, **A_MGR['http'] filename, **A_MGR['http']
.parse_url(entry[AttachmentPayload.URL])) .parse_url(entry[AttachmentPayload.URL]))

View File

@ -172,6 +172,41 @@ APPRISE_ATTACH_DIR = os.environ.get(
# The maximum file attachment size allowed by the API (defined in MB) # The maximum file attachment size allowed by the API (defined in MB)
APPRISE_ATTACH_SIZE = int(os.environ.get('APPRISE_ATTACH_SIZE', 200)) * 1048576 APPRISE_ATTACH_SIZE = int(os.environ.get('APPRISE_ATTACH_SIZE', 200)) * 1048576
# A provided list that identify all of the URLs/Hosts/IPs that Apprise can
# retrieve remote attachments from.
#
# Processing Order:
# - The DENY list is always processed before the ALLOW list.
# * If a match is found, processing stops, and the URL attachment is ignored
# - The ALLOW list is ONLY processed if there was no match found on the DENY list
# - If there is no match found on the ALLOW List, then the Attachment URL is ignored
# * Not matching anything on the ALLOW list is effectively treated as a DENY
#
# Lists are both processed from top down (stopping on first match)
# Use the following rules when constructing your ALLOW/DENY entries:
# - Entries are separated with either a comma (,) and/or a space
# - Entries can start with http:// or https:// (enforcing URL security HTTPS as part of check)
# - IPs or hostnames provided is the preferred approach if it doesn't matter if the entry is
# https:// or http://
# - * wildcards are allowed. * means nothing or anything else that follows.
# - ? placeholder wildcards are allowed (identifying the explicit placeholder
# of an alpha/numeric/dash/underscore character)
#
# Notes of interest:
# - If the list is empty, then attachments can not be retrieved via URL at all.
# - If the URL to be attached is not found or matched against an entry in this list then
# the URL based attachment is ignored and is not retrieved at all.
# - Set the list to * (a single astrix) to match all URLs and accepting all provided
# matches
APPRISE_ATTACH_DENY_URLS = \
os.environ.get('APPRISE_ATTACH_REJECT_URL', '127.0.* localhost*').lower()
# The Allow list which is processed after the Deny list above
APPRISE_ATTACH_ALLOW_URLS = \
os.environ.get('APPRISE_ATTACH_ALLOW_URL', '*').lower()
# The maximum size in bytes that a request body may be before raising an error # The maximum size in bytes that a request body may be before raising an error
# (defined in MB) # (defined in MB)
DATA_UPLOAD_MAX_MEMORY_SIZE = abs(int(os.environ.get( DATA_UPLOAD_MAX_MEMORY_SIZE = abs(int(os.environ.get(