added attachment framework

This commit is contained in:
Chris Caron 2019-11-10 01:10:03 -05:00
parent 5ee9065b5b
commit 25d56d3fad
16 changed files with 1982 additions and 11 deletions

View File

@ -10,7 +10,7 @@ To inform or tell (someone). To make one aware of something.
* One notification library to rule them all. * One notification library to rule them all.
* A common and intuitive notification syntax. * A common and intuitive notification syntax.
* Supports the handling of images (to the notification services that will accept them). * Supports the handling of images and attachments (to the notification services that will accept them).
System owners who wish to provide a notification service no longer need to research each and every new one as they appear. They just need to include this one library and then they can immediately gain access to almost all of the notifications services available to us today. System owners who wish to provide a notification service no longer need to research each and every new one as they appear. They just need to include this one library and then they can immediately gain access to almost all of the notifications services available to us today.

View File

@ -38,6 +38,7 @@ from .logger import logger
from .AppriseAsset import AppriseAsset from .AppriseAsset import AppriseAsset
from .AppriseConfig import AppriseConfig from .AppriseConfig import AppriseConfig
from .AppriseAttachment import AppriseAttachment
from .AppriseLocale import AppriseLocale from .AppriseLocale import AppriseLocale
from .config.ConfigBase import ConfigBase from .config.ConfigBase import ConfigBase
from .plugins.NotifyBase import NotifyBase from .plugins.NotifyBase import NotifyBase
@ -277,7 +278,7 @@ class Apprise(object):
return return
def notify(self, body, title='', notify_type=NotifyType.INFO, def notify(self, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=MATCH_ALL_TAG): body_format=None, tag=MATCH_ALL_TAG, attach=None):
""" """
Send a notification to all of the plugins previously loaded. Send a notification to all of the plugins previously loaded.
@ -293,6 +294,10 @@ class Apprise(object):
sent, False if even just one of them fails, and None if no sent, False if even just one of them fails, and None if no
notifications were sent at all as a result of tag filtering and/or notifications were sent at all as a result of tag filtering and/or
simply having empty configuration files that were read. simply having empty configuration files that were read.
Attach can contain a list of attachment URLs. attach can also be
represented by a an AttachBase() (or list of) object(s). This
identifies the products you wish to notify
""" """
if len(self) == 0: if len(self) == 0:
@ -309,6 +314,10 @@ class Apprise(object):
# Tracks conversions # Tracks conversions
conversion_map = dict() conversion_map = dict()
# Prepare attachments
if attach is not None and not isinstance(attach, AppriseAttachment):
attach = AppriseAttachment(attach, asset=self.asset)
# Iterate over our loaded plugins # Iterate over our loaded plugins
for server in self.find(tag): for server in self.find(tag):
if status is None: if status is None:
@ -371,7 +380,8 @@ class Apprise(object):
if not server.notify( if not server.notify(
body=conversion_map[server.notify_format], body=conversion_map[server.notify_format],
title=title, title=title,
notify_type=notify_type): notify_type=notify_type,
attach=attach):
# Toggle our return status flag # Toggle our return status flag
status = False status = False

View File

@ -0,0 +1,229 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import six
from . import attachment
from . import AttachBase
from . import URLBase
from .AppriseAsset import AppriseAsset
from .logger import logger
from .utils import GET_SCHEMA_RE
class AppriseAttachment(object):
"""
Our Apprise Attachment File Manager
"""
def __init__(self, paths=None, asset=None, **kwargs):
"""
Loads all of the paths/urls specified (if any).
The path can either be a single string identifying one explicit
location, otherwise you can pass in a series of locations to scan
via a list.
"""
# Initialize our attachment listings
self.attachments = list()
# Prepare our Asset Object
self.asset = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
# Now parse any paths specified
if paths is not None:
# Store our path(s)
self.add(paths)
def add(self, attachments, asset=None, db=None):
"""
Adds one or more attachments into our list.
"""
# Initialize our return status
return_status = True
if isinstance(asset, AppriseAsset):
# prepare default asset
asset = self.asset
if isinstance(attachments, AttachBase):
# Go ahead and just add our attachments into our list
self.attachments.append(attachments)
return True
elif isinstance(attachments, six.string_types):
# Save our path
attachments = (attachments, )
elif not isinstance(attachments, (tuple, set, list)):
logger.error(
'An invalid attachment url (type={}) was '
'specified.'.format(type(attachments)))
return False
# Iterate over our attachments
for _attachment in attachments:
if isinstance(_attachment, AttachBase):
# Go ahead and just add our attachment into our list
self.attachments.append(_attachment)
continue
elif not isinstance(_attachment, six.string_types):
logger.warning(
"An invalid attachment (type={}) was specified.".format(
type(_attachment)))
return_status = False
continue
logger.debug("Loading attachment: {}".format(_attachment))
# Instantiate ourselves an object, this function throws or
# returns None if it fails
instance = AppriseAttachment.instantiate(_attachment, asset=asset)
if not isinstance(instance, AttachBase):
return_status = False
continue
# Add our initialized plugin to our server listings
self.attachments.append(instance)
# Return our status
return return_status
@staticmethod
def instantiate(url, asset=None, suppress_exceptions=True):
"""
Returns the instance of a instantiated attachment plugin based on
the provided Attachment URL. If the url fails to be parsed, then None
is returned.
"""
# Attempt to acquire the schema at the very least to allow our
# attachment based urls.
schema = GET_SCHEMA_RE.match(url)
if schema is None:
# Plan B is to assume we're dealing with a file
schema = attachment.AttachFile.protocol
url = '{}://{}'.format(schema, URLBase.quote(url))
else:
# Ensure our schema is always in lower case
schema = schema.group('schema').lower()
# Some basic validation
if schema not in attachment.SCHEMA_MAP:
logger.warning('Unsupported schema {}.'.format(schema))
return None
# Parse our url details of the server object as dictionary containing
# all of the information parsed from our URL
results = attachment.SCHEMA_MAP[schema].parse_url(url)
if not results:
# Failed to parse the server URL
logger.warning('Unparseable URL {}.'.format(url))
return None
# Prepare our Asset Object
results['asset'] = \
asset if isinstance(asset, AppriseAsset) else AppriseAsset()
if suppress_exceptions:
try:
# Attempt to create an instance of our plugin using the parsed
# URL information
attach_plugin = \
attachment.SCHEMA_MAP[results['schema']](**results)
except Exception:
# the arguments are invalid or can not be used.
logger.warning('Could not load URL: %s' % url)
return None
else:
# Attempt to create an instance of our plugin using the parsed
# URL information but don't wrap it in a try catch
attach_plugin = attachment.SCHEMA_MAP[results['schema']](**results)
return attach_plugin
def clear(self):
"""
Empties our attachment list
"""
self.attachments[:] = []
def size(self):
"""
Returns the total size of accumulated attachments
"""
return sum([len(a) for a in self.attachments if len(a) > 0])
def pop(self, index=-1):
"""
Removes an indexed Apprise Attachment from the stack and returns it.
by default the last element is poped from the list
"""
# Remove our entry
return self.attachments.pop(index)
def __getitem__(self, index):
"""
Returns the indexed entry of a loaded apprise attachments
"""
return self.attachments[index]
def __bool__(self):
"""
Allows the Apprise object to be wrapped in an Python 3.x based 'if
statement'. True is returned if at least one service has been loaded.
"""
return True if self.attachments else False
def __nonzero__(self):
"""
Allows the Apprise object to be wrapped in an Python 2.x based 'if
statement'. True is returned if at least one service has been loaded.
"""
return True if self.attachments else False
def __iter__(self):
"""
Returns an iterator to our attachment list
"""
return iter(self.attachments)
def __len__(self):
"""
Returns the number of attachment entries loaded
"""
return len(self.attachments)

View File

@ -46,10 +46,12 @@ from .URLBase import URLBase
from .URLBase import PrivacyMode from .URLBase import PrivacyMode
from .plugins.NotifyBase import NotifyBase from .plugins.NotifyBase import NotifyBase
from .config.ConfigBase import ConfigBase from .config.ConfigBase import ConfigBase
from .attachment.AttachBase import AttachBase
from .Apprise import Apprise from .Apprise import Apprise
from .AppriseAsset import AppriseAsset from .AppriseAsset import AppriseAsset
from .AppriseConfig import AppriseConfig from .AppriseConfig import AppriseConfig
from .AppriseAttachment import AppriseAttachment
# Set default logging handler to avoid "No handler found" warnings. # Set default logging handler to avoid "No handler found" warnings.
import logging import logging
@ -58,8 +60,8 @@ logging.getLogger(__name__).addHandler(NullHandler())
__all__ = [ __all__ = [
# Core # Core
'Apprise', 'AppriseAsset', 'AppriseConfig', 'URLBase', 'NotifyBase', 'Apprise', 'AppriseAsset', 'AppriseConfig', 'AppriseAttachment', 'URLBase',
'ConfigBase', 'NotifyBase', 'ConfigBase', 'AttachBase',
# Reference # Reference
'NotifyType', 'NotifyImageSize', 'NotifyFormat', 'OverflowMode', 'NotifyType', 'NotifyImageSize', 'NotifyFormat', 'OverflowMode',

View File

@ -0,0 +1,250 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import os
import mimetypes
from ..URLBase import URLBase
class AttachBase(URLBase):
"""
This is the base class for all supported attachment types
"""
# For attachment type detection; this amount of data is read into memory
# 128KB (131072B)
max_detect_buffer_size = 131072
# Unknown mimetype
unknown_mimetype = 'application/octet-stream'
# Our filename when we can't otherwise determine one
unknown_filename = 'apprise-attachment'
# Our filename extension when we can't otherwise determine one
unknown_filename_extension = '.obj'
# The strict argument is a flag specifying whether the list of known MIME
# types is limited to only the official types registered with IANA. When
# strict is True, only the IANA types are supported; when strict is False
# (the default), some additional non-standard but commonly used MIME types
# are also recognized.
strict = False
# The maximum file-size we will accept for an attachment size. If this is
# set to zero (0), then no check is performed
# 1 MB = 1048576 bytes
# 5 MB = 5242880 bytes
max_file_size = 5242880
def __init__(self, name=None, mimetype=None, **kwargs):
"""
Initialize some general logging and common server arguments that will
keep things consistent when working with the configurations that
inherit this class.
Optionally provide a filename to over-ride name associated with the
actual file retrieved (from where-ever).
The mime-type is automatically detected, but you can over-ride this by
explicitly stating what it should be.
"""
super(AttachBase, self).__init__(**kwargs)
if not mimetypes.inited:
# Ensure mimetypes has been initialized
mimetypes.init()
# Attach Filename (does not have to be the same as path)
self._name = name
# The mime type of the attached content. This is detected if not
# otherwise specified.
self._mimetype = mimetype
# The detected_mimetype, this is only used as a fallback if the
# mimetype wasn't forced by the user
self.detected_mimetype = None
# The detected filename by calling child class. A detected filename
# is always used if no force naming was specified.
self.detected_name = None
# Absolute path to attachment
self.download_path = None
# Validate mimetype if specified
if self._mimetype:
if next((t for t in mimetypes.types_map.values()
if self._mimetype == t), None) is None:
err = 'An invalid mime-type ({}) was specified.'.format(
mimetype)
self.logger.warning(err)
raise TypeError(err)
return
@property
def path(self):
"""
Returns the absolute path to the filename
"""
if self.download_path:
# return our fixed content
return self.download_path
if not self.download():
return None
return self.download_path
@property
def name(self):
"""
Returns the filename
"""
if self._name:
# return our fixed content
return self._name
if not self.detected_name and not self.download():
# we could not obtain our name
return None
if not self.detected_name:
# If we get here, our download was successful but we don't have a
# filename based on our content.
extension = mimetypes.guess_extension(self.mimetype)
self.detected_name = '{}{}'.format(
self.unknown_filename,
extension if extension else self.unknown_filename_extension)
return self.detected_name
@property
def mimetype(self):
"""
Returns mime type (if one is present).
Content is cached once determied to prevent overhead of future
calls.
"""
if self._mimetype:
# return our pre-calculated cached content
return self._mimetype
if not self.detected_mimetype and not self.download():
# we could not obtain our name
return None
if not self.detected_mimetype:
# guess_type() returns: (type, encoding) and sets type to None
# if it can't otherwise determine it.
try:
# Directly reference _name and detected_name to prevent
# recursion loop (as self.name calls this function)
self.detected_mimetype, _ = mimetypes.guess_type(
self._name if self._name
else self.detected_name, strict=self.strict)
except TypeError:
# Thrown if None was specified in filename section
pass
# Return our mime type
return self.detected_mimetype \
if self.detected_mimetype else self.unknown_mimetype
def download(self):
"""
This function must be over-ridden by inheriting classes.
Inherited classes should populate:
- detected_name : Should identify a human readable filename
- download_path: Must contain a absolute path to content
- detected_mimetype: Should identify mimetype of content
"""
raise NotImplementedError(
"download() is implimented by the child class.")
@staticmethod
def parse_url(url, verify_host=True, mimetype_db=None):
"""Parses the URL and returns it broken apart into a dictionary.
This is very specific and customized for Apprise.
Args:
url (str): The URL you want to fully parse.
verify_host (:obj:`bool`, optional): a flag kept with the parsed
URL which some child classes will later use to verify SSL
keys (if SSL transactions take place). Unless under very
specific circumstances, it is strongly recomended that
you leave this default value set to True.
Returns:
A dictionary is returned containing the URL fully parsed if
successful, otherwise None is returned.
"""
results = URLBase.parse_url(url, verify_host=verify_host)
if not results:
# We're done; we failed to parse our url
return results
# Allow overriding the default config mime type
if 'mime' in results['qsd']:
results['mimetype'] = results['qsd'].get('mime', '') \
.strip().lower()
# Allow overriding the default file name
if 'name' in results['qsd']:
results['name'] = results['qsd'].get('name', '') \
.strip().lower()
return results
def __len__(self):
"""
Returns the filesize of the attachment.
"""
return os.path.getsize(self.path) if self.path else 0
def __bool__(self):
"""
Allows the Apprise object to be wrapped in an Python 3.x based 'if
statement'. True is returned if our content was downloaded correctly.
"""
return True if self.path else False
def __nonzero__(self):
"""
Allows the Apprise object to be wrapped in an Python 2.x based 'if
statement'. True is returned if our content was downloaded correctly.
"""
return True if self.path else False

View File

@ -0,0 +1,123 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import os
from .AttachBase import AttachBase
from ..AppriseLocale import gettext_lazy as _
class AttachFile(AttachBase):
"""
A wrapper for File based attachment sources
"""
# The default descriptive name associated with the service
service_name = _('Local File')
# The default protocol
protocol = 'file'
def __init__(self, path, **kwargs):
"""
Initialize Local File Attachment Object
"""
super(AttachFile, self).__init__(**kwargs)
# Store path but mark it dirty since we have not performed any
# verification at this point.
self.dirty_path = os.path.expanduser(path)
return
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {}
if self._mimetype:
# A mime-type was enforced
args['mime'] = self._mimetype
if self._name:
# A name was enforced
args['name'] = self._name
return 'file://{path}{args}'.format(
path=self.quote(self.dirty_path),
args='?{}'.format(self.urlencode(args)) if args else '',
)
def download(self, **kwargs):
"""
Perform retrieval of our data.
For file base attachments, our data already exists, so we only need to
validate it.
"""
if not os.path.isfile(self.dirty_path):
return False
if self.max_file_size > 0 and \
os.path.getsize(self.dirty_path) > self.max_file_size:
# The content to attach is to large
self.logger.error(
'Content exceeds allowable maximum file length '
'({}KB): {}'.format(
int(self.max_file_size / 1024), self.url(privacy=True)))
# Return False (signifying a failure)
return False
# We're good to go if we get here. Set our minimum requirements of
# a call do download() before returning a success
self.download_path = self.dirty_path
self.detected_name = os.path.basename(self.download_path)
return True
@staticmethod
def parse_url(url):
"""
Parses the URL so that we can handle all different file paths
and return it as our path object
"""
results = AttachBase.parse_url(url, verify_host=False)
if not results:
# We're done early; it's not a good URL
return results
match = re.match(r'file://(?P<path>[^?]+)(\?.*)?', url, re.I)
if not match:
return None
results['path'] = AttachFile.unquote(match.group('path'))
return results

View File

@ -0,0 +1,314 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import os
import six
import requests
from tempfile import NamedTemporaryFile
from .AttachBase import AttachBase
from ..URLBase import PrivacyMode
from ..AppriseLocale import gettext_lazy as _
class AttachHTTP(AttachBase):
"""
A wrapper for HTTP based attachment sources
"""
# The default descriptive name associated with the service
service_name = _('Web Based')
# The default protocol
protocol = 'http'
# The default secure protocol
secure_protocol = 'https'
# The maximum number of seconds to wait for a connection to be established
# before out-right just giving up
connection_timeout_sec = 5.0
# The number of bytes in memory to read from the remote source at a time
chunk_size = 8192
def __init__(self, headers=None, **kwargs):
"""
Initialize HTTP Object
headers can be a dictionary of key/value pairs that you want to
additionally include as part of the server headers to post with
"""
super(AttachHTTP, self).__init__(**kwargs)
self.schema = 'https' if self.secure else 'http'
self.fullpath = kwargs.get('fullpath')
if not isinstance(self.fullpath, six.string_types):
self.fullpath = '/'
self.headers = {}
if headers:
# Store our extra headers
self.headers.update(headers)
# Where our content is written to upon a call to download.
self._temp_file = None
return
def download(self, **kwargs):
"""
Perform retrieval of the configuration based on the specified request
"""
if self._temp_file is not None:
# There is nothing to do; we're already pointing at our downloaded
# content
return True
# prepare header
headers = {
'User-Agent': self.app_id,
}
# Apply any/all header over-rides defined
headers.update(self.headers)
auth = None
if self.user:
auth = (self.user, self.password)
url = '%s://%s' % (self.schema, self.host)
if isinstance(self.port, int):
url += ':%d' % self.port
url += self.fullpath
self.logger.debug('HTTP POST URL: %s (cert_verify=%r)' % (
url, self.verify_certificate,
))
# Where our request object will temporarily live.
r = None
# Always call throttle before any remote server i/o is made
self.throttle()
try:
# Make our request
with requests.get(
url,
headers=headers,
auth=auth,
verify=self.verify_certificate,
timeout=self.connection_timeout_sec,
stream=True) as r:
# Handle Errors
r.raise_for_status()
# Store our response if within reason
if self.max_file_size > 0 \
and r.headers.get(
'Content-Length', 0) > self.max_file_size:
# The content retrieved is to large
self.logger.error(
'HTTP response exceeds allowable maximum file length '
'({}KB): {}'.format(
int(self.max_file_size / 1024),
self.url(privacy=True)))
# Return False (signifying a failure)
return False
# Detect config format based on mime if the format isn't
# already enforced
self.detected_mimetype = r.headers.get('Content-Type')
d = r.headers.get('Content-Disposition', '')
result = re.search(
"filename=['\"]?(?P<name>[^'\"]+)['\"]?", d, re.I)
if result:
self.detected_name = result.group('name').strip()
# Create a temporary file to work with
self._temp_file = NamedTemporaryFile()
# Get our chunk size
chunk_size = self.chunk_size
# Track all bytes written to disk
bytes_written = 0
# If we get here, we can now safely write our content to disk
for chunk in r.iter_content(chunk_size=chunk_size):
# filter out keep-alive chunks
if chunk:
self._temp_file.write(chunk)
bytes_written = self._temp_file.tell()
# Prevent a case where Content-Length isn't provided
# we don't want to fetch beyond our limits
if self.max_file_size > 0:
if bytes_written > self.max_file_size:
# The content retrieved is to large
self.logger.error(
'HTTP response exceeds allowable maximum '
'file length ({}KB): {}'.format(
int(self.max_file_size / 1024),
self.url(privacy=True)))
# Reset our temporary object
self._temp_file = None
# Ensure our detected name and mimetype are
# reset
self.detected_name = None
self.detected_mimetype = None
# Return False (signifying a failure)
return False
elif bytes_written + chunk_size \
> self.max_file_size:
# Adjust out next read to accomodate up to our
# limit +1. This will prevent us from readig
# to much into our memory buffer
self.max_file_size - bytes_written + 1
# Ensure our content is flushed to disk for post-processing
self._temp_file.flush()
# Set our minimum requirements for a successful download() call
self.download_path = self._temp_file.name
if not self.detected_name:
self.detected_name = os.path.basename(self.fullpath)
except requests.RequestException as e:
self.logger.error(
'A Connection error occured retrieving HTTP '
'configuration from %s.' % self.host)
self.logger.debug('Socket Exception: %s' % str(e))
# Reset our temporary object
self._temp_file = None
# Ensure our detected name and mimetype are reset
self.detected_name = None
self.detected_mimetype = None
# Return False (signifying a failure)
return False
except (IOError, OSError):
# IOError is present for backwards compatibility with Python
# versions older then 3.3. >= 3.3 throw OSError now.
# Could not open and/or write the temporary file
self.logger.error(
'Could not write attachment to disk: {}'.format(
self.url(privacy=True)))
# Reset our temporary object
self._temp_file = None
# Ensure our detected name and mimetype are reset
self.detected_name = None
self.detected_mimetype = None
# Return False (signifying a failure)
return False
# Return our success
return True
def url(self, privacy=False, *args, **kwargs):
"""
Returns the URL built dynamically based on specified arguments.
"""
# Define any arguments set
args = {
'verify': 'yes' if self.verify_certificate else 'no',
}
if self._mimetype:
# A format was enforced
args['mime'] = self._mimetype
if self._name:
# A name was enforced
args['name'] = self._name
# Append our headers into our args
args.update({'+{}'.format(k): v for k, v in self.headers.items()})
# Determine Authentication
auth = ''
if self.user and self.password:
auth = '{user}:{password}@'.format(
user=self.quote(self.user, safe=''),
password=self.pprint(
self.password, privacy, mode=PrivacyMode.Secret, safe=''),
)
elif self.user:
auth = '{user}@'.format(
user=self.quote(self.user, safe=''),
)
default_port = 443 if self.secure else 80
return '{schema}://{auth}{hostname}{port}{fullpath}/?{args}'.format(
schema=self.secure_protocol if self.secure else self.protocol,
auth=auth,
hostname=self.quote(self.host, safe=''),
port='' if self.port is None or self.port == default_port
else ':{}'.format(self.port),
fullpath=self.quote(self.fullpath, safe='/'),
args=self.urlencode(args),
)
@staticmethod
def parse_url(url):
"""
Parses the URL and returns enough arguments that can allow
us to substantiate this object.
"""
results = AttachBase.parse_url(url)
if not results:
# We're done early as we couldn't load the results
return results
# Add our headers that the user can potentially over-ride if they wish
# to to our returned result set
results['headers'] = results['qsd-']
results['headers'].update(results['qsd+'])
return results

View File

@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import six
import re
from os import listdir
from os.path import dirname
from os.path import abspath
# Maintains a mapping of all of the attachment services
SCHEMA_MAP = {}
__all__ = []
# Load our Lookup Matrix
def __load_matrix(path=abspath(dirname(__file__)), name='apprise.attachment'):
"""
Dynamically load our schema map; this allows us to gracefully
skip over modules we simply don't have the dependencies for.
"""
# Used for the detection of additional Attachment Services objects
# The .py extension is optional as we support loading directories too
module_re = re.compile(r'^(?P<name>Attach[a-z0-9]+)(\.py)?$', re.I)
for f in listdir(path):
match = module_re.match(f)
if not match:
# keep going
continue
# Store our notification/plugin name:
plugin_name = match.group('name')
try:
module = __import__(
'{}.{}'.format(name, plugin_name),
globals(), locals(),
fromlist=[plugin_name])
except ImportError:
# No problem, we can't use this object
continue
if not hasattr(module, plugin_name):
# Not a library we can load as it doesn't follow the simple rule
# that the class must bear the same name as the notification
# file itself.
continue
# Get our plugin
plugin = getattr(module, plugin_name)
if not hasattr(plugin, 'app_id'):
# Filter out non-notification modules
continue
elif plugin_name in __all__:
# we're already handling this object
continue
# Add our module name to our __all__
__all__.append(plugin_name)
# Ensure we provide the class as the reference to this directory and
# not the module:
globals()[plugin_name] = plugin
# Load protocol(s) if defined
proto = getattr(plugin, 'protocol', None)
if isinstance(proto, six.string_types):
if proto not in SCHEMA_MAP:
SCHEMA_MAP[proto] = plugin
elif isinstance(proto, (set, list, tuple)):
# Support iterables list types
for p in proto:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
# Load secure protocol(s) if defined
protos = getattr(plugin, 'secure_protocol', None)
if isinstance(protos, six.string_types):
if protos not in SCHEMA_MAP:
SCHEMA_MAP[protos] = plugin
if isinstance(protos, (set, list, tuple)):
# Support iterables list types
for p in protos:
if p not in SCHEMA_MAP:
SCHEMA_MAP[p] = plugin
return SCHEMA_MAP
# Dynamically build our schema base
__load_matrix()

View File

@ -99,6 +99,9 @@ def print_version_msg():
@click.option('--config', '-c', default=None, type=str, multiple=True, @click.option('--config', '-c', default=None, type=str, multiple=True,
metavar='CONFIG_URL', metavar='CONFIG_URL',
help='Specify one or more configuration locations.') help='Specify one or more configuration locations.')
@click.option('--attach', '-a', default=None, type=str, multiple=True,
metavar='ATTACHMENT_URL',
help='Specify one or more configuration locations.')
@click.option('--notification-type', '-n', default=NotifyType.INFO, type=str, @click.option('--notification-type', '-n', default=NotifyType.INFO, type=str,
metavar='TYPE', metavar='TYPE',
help='Specify the message type (default=info). Possible values' help='Specify the message type (default=info). Possible values'
@ -120,8 +123,8 @@ def print_version_msg():
help='Display the apprise version and exit.') help='Display the apprise version and exit.')
@click.argument('urls', nargs=-1, @click.argument('urls', nargs=-1,
metavar='SERVER_URL [SERVER_URL2 [SERVER_URL3]]',) metavar='SERVER_URL [SERVER_URL2 [SERVER_URL3]]',)
def main(body, title, config, urls, notification_type, theme, tag, dry_run, def main(body, title, config, attach, urls, notification_type, theme, tag,
verbose, version): dry_run, verbose, version):
""" """
Send a notification to all of the specified servers identified by their Send a notification to all of the specified servers identified by their
URLs the content provided within the title, body and notification-type. URLs the content provided within the title, body and notification-type.
@ -200,7 +203,8 @@ def main(body, title, config, urls, notification_type, theme, tag, dry_run,
# now print it out # now print it out
result = a.notify( result = a.notify(
body=body, title=title, notify_type=notification_type, tag=tags) body=body, title=title, notify_type=notification_type, tag=tags,
attach=attach)
else: else:
# Number of rows to assume in the terminal. In future, maybe this can # Number of rows to assume in the terminal. In future, maybe this can
# be detected and made dynamic. The actual row count is 80, but 5 # be detected and made dynamic. The actual row count is 80, but 5

View File

@ -241,7 +241,7 @@ class NotifyBase(URLBase):
) )
def notify(self, body, title=None, notify_type=NotifyType.INFO, def notify(self, body, title=None, notify_type=NotifyType.INFO,
overflow=None, **kwargs): overflow=None, attach=None, **kwargs):
""" """
Performs notification Performs notification
@ -255,7 +255,7 @@ class NotifyBase(URLBase):
overflow=overflow): overflow=overflow):
# Send notification # Send notification
if not self.send(body=chunk['body'], title=chunk['title'], if not self.send(body=chunk['body'], title=chunk['title'],
notify_type=notify_type): notify_type=notify_type, attach=attach):
# Toggle our return status flag # Toggle our return status flag
return False return False

View File

@ -0,0 +1,344 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import sys
import pytest
from os.path import getsize
from os.path import join
from os.path import dirname
from apprise.AppriseAttachment import AppriseAttachment
from apprise.AppriseAsset import AppriseAsset
from apprise.attachment.AttachBase import AttachBase
from apprise.attachment import SCHEMA_MAP as ATTACH_SCHEMA_MAP
from apprise.attachment import __load_matrix
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
TEST_VAR_DIR = join(dirname(__file__), 'var')
def test_apprise_attachment():
"""
API: AppriseAttachment basic testing
"""
# Create ourselves an attachment object
aa = AppriseAttachment()
# There are no attachents loaded
assert len(aa) == 0
# Object can be directly checked as a boolean; response is False
# when there are no entries loaded
assert not aa
# An attachment object using a custom Apprise Asset object
aa = AppriseAttachment(asset=AppriseAsset())
# still no attachments added
assert len(aa) == 0
# Add a file by it's path
path = join(TEST_VAR_DIR, 'apprise-test.gif')
assert aa.add(path)
# There is now 1 attachment
assert len(aa) == 1
# we can test the object as a boolean and get a value of True now
assert aa
# Add another entry already in it's AttachBase format
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachBase)
assert aa.add(response, asset=AppriseAsset())
# There is now 2 attachments
assert len(aa) == 2
# Reset our object
aa = AppriseAttachment()
# We can add by lists as well in a variety of formats
attachments = (
path,
'file://{}?name=newfilename.gif'.format(path),
AppriseAttachment.instantiate(
'file://{}?name=anotherfilename.gif'.format(path)),
)
# Add them
assert aa.add(attachments)
# There is now 3 attachments
assert len(aa) == 3
# We can pop the last element off of the list as well
attachment = aa.pop()
assert isinstance(attachment, AttachBase)
# we can test of the attachment is valid using a boolean check:
assert attachment
assert len(aa) == 2
assert attachment.path == path
assert attachment.name == 'anotherfilename.gif'
assert attachment.mimetype == 'image/gif'
# elements can also be directly indexed
assert isinstance(aa[0], AttachBase)
assert isinstance(aa[1], AttachBase)
with pytest.raises(IndexError):
aa[2]
# We can iterate over attachments too:
for count, a in enumerate(aa):
assert isinstance(a, AttachBase)
# we'll never iterate more then the number of entries in our object
assert count < len(aa)
# Get the file-size of our image
expected_size = getsize(path) * len(aa)
# verify that's what we get as a result
assert aa.size() == expected_size
# Attachments can also be loaded during the instantiation of the
# AppriseAttachment object
aa = AppriseAttachment(attachments)
# There is now 3 attachments
assert len(aa) == 3
# Reset our object
aa.clear()
assert len(aa) == 0
assert not aa
# Garbage in produces garbage out
assert aa.add(None) is False
assert aa.add(object()) is False
assert aa.add(42) is False
# length remains unchanged
assert len(aa) == 0
# We can add by lists as well in a variety of formats
attachments = (
None,
object(),
42,
'garbage://',
)
# Add our attachments
assert aa.add(attachments) is False
# length remains unchanged
assert len(aa) == 0
# test cases when file simply doesn't exist
aa = AppriseAttachment('file://non-existant-file.png')
# Our length is still 1
assert len(aa) == 1
# Our object will still return a True
assert aa
# However our indexed entry will not
assert not aa[0]
# length will return 0
assert len(aa[0]) == 0
# Total length will also return 0
assert aa.size() == 0
def test_apprise_attachment_instantiate():
"""
API: AppriseAttachment.instantiate()
"""
assert AppriseAttachment.instantiate(
'file://?', suppress_exceptions=True) is None
assert AppriseAttachment.instantiate(
'invalid://?', suppress_exceptions=True) is None
class BadAttachType(AttachBase):
def __init__(self, **kwargs):
super(BadAttachType, self).__init__(**kwargs)
# We fail whenever we're initialized
raise TypeError()
# Store our bad attachment type in our schema map
ATTACH_SCHEMA_MAP['bad'] = BadAttachType
with pytest.raises(TypeError):
AppriseAttachment.instantiate(
'bad://path', suppress_exceptions=False)
# Same call but exceptions suppressed
assert AppriseAttachment.instantiate(
'bad://path', suppress_exceptions=True) is None
def test_apprise_attachment_matrix_load():
"""
API: AppriseAttachment() matrix initialization
"""
import apprise
class AttachmentDummy(AttachBase):
"""
A dummy wrapper for testing the different options in the load_matrix
function
"""
# The default descriptive name associated with the Notification
service_name = 'dummy'
# protocol as tuple
protocol = ('uh', 'oh')
# secure protocol as tuple
secure_protocol = ('no', 'yes')
class AttachmentDummy2(AttachBase):
"""
A dummy wrapper for testing the different options in the load_matrix
function
"""
# The default descriptive name associated with the Notification
service_name = 'dummy2'
# secure protocol as tuple
secure_protocol = ('true', 'false')
class AttachmentDummy3(AttachBase):
"""
A dummy wrapper for testing the different options in the load_matrix
function
"""
# The default descriptive name associated with the Notification
service_name = 'dummy3'
# secure protocol as string
secure_protocol = 'true'
class AttachmentDummy4(AttachBase):
"""
A dummy wrapper for testing the different options in the load_matrix
function
"""
# The default descriptive name associated with the Notification
service_name = 'dummy4'
# protocol as string
protocol = 'true'
# Generate ourselves a fake entry
apprise.attachment.AttachmentDummy = AttachmentDummy
apprise.attachment.AttachmentDummy2 = AttachmentDummy2
apprise.attachment.AttachmentDummy3 = AttachmentDummy3
apprise.attachment.AttachmentDummy4 = AttachmentDummy4
__load_matrix()
# Call it again so we detect our entries already loaded
__load_matrix()
def test_attachment_matrix_dynamic_importing(tmpdir):
"""
API: Apprise() Attachment Matrix Importing
"""
# Make our new path valid
suite = tmpdir.mkdir("apprise_attach_test_suite")
suite.join("__init__.py").write('')
module_name = 'badattach'
# Update our path to point to our new test suite
sys.path.insert(0, str(suite))
# Create a base area to work within
base = suite.mkdir(module_name)
base.join("__init__.py").write('')
# Test no app_id
base.join('AttachBadFile1.py').write(
"""
class AttachBadFile1(object):
pass""")
# No class of the same name
base.join('AttachBadFile2.py').write(
"""
class BadClassName(object):
pass""")
# Exception thrown
base.join('AttachBadFile3.py').write("""raise ImportError()""")
# Utilizes a schema:// already occupied (as string)
base.join('AttachGoober.py').write(
"""
from apprise import AttachBase
class AttachGoober(AttachBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol
protocol = 'http'
# The default secure protocol
secure_protocol = 'https'""")
# Utilizes a schema:// already occupied (as tuple)
base.join('AttachBugger.py').write("""
from apprise import AttachBase
class AttachBugger(AttachBase):
# This class tests the fact we have a new class name, but we're
# trying to over-ride items previously used
# The default simple (insecure) protocol
protocol = ('http', 'bugger-test' )
# The default secure protocol
secure_protocol = ('https', 'bugger-tests')""")
__load_matrix(path=str(base), name=module_name)

92
test/test_attach_base.py Normal file
View File

@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import mock
import pytest
from apprise.attachment.AttachBase import AttachBase
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
def test_mimetype_initialization():
"""
API: AttachBase() mimetype initialization
"""
with mock.patch('mimetypes.init') as mock_init:
with mock.patch('mimetypes.inited', True):
AttachBase()
assert mock_init.call_count == 0
with mock.patch('mimetypes.init') as mock_init:
with mock.patch('mimetypes.inited', False):
AttachBase()
assert mock_init.call_count == 1
def test_attach_base():
"""
API: AttachBase()
"""
# an invalid mime-type
with pytest.raises(TypeError):
AttachBase(**{'mimetype': 'invalid'})
# a valid mime-type does not cause an exception to throw
AttachBase(**{'mimetype': 'image/png'})
# Create an object with no mimetype over-ride
obj = AttachBase()
# We can not process name/path/mimetype at a Base level
with pytest.raises(NotImplementedError):
obj.download()
with pytest.raises(NotImplementedError):
obj.name
with pytest.raises(NotImplementedError):
obj.path
with pytest.raises(NotImplementedError):
obj.mimetype
# Unsupported URLs are not parsed
assert AttachBase.parse_url(url='invalid://') is None
# Valid URL & Valid Format
results = AttachBase.parse_url(url='file://relative/path')
assert isinstance(results, dict)
# No mime is defined
assert results.get('mimetype') is None
# Valid URL & Valid Format with mime type set
results = AttachBase.parse_url(url='file://relative/path?mime=image/jpeg')
assert isinstance(results, dict)
# mime defined
assert results.get('mimetype') == 'image/jpeg'

151
test/test_attach_file.py Normal file
View File

@ -0,0 +1,151 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import mock
from os.path import dirname
from os.path import join
from apprise.attachment.AttachBase import AttachBase
from apprise.attachment.AttachFile import AttachFile
from apprise import AppriseAttachment
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
TEST_VAR_DIR = join(dirname(__file__), 'var')
def test_attach_file_parse_url():
"""
API: AttachFile().parse_url()
"""
# bad entry
assert AttachFile.parse_url('garbage://') is None
# no file path specified
assert AttachFile.parse_url('file://') is None
def test_attach_file():
"""
API: AttachFile()
"""
# Simple gif test
path = join(TEST_VAR_DIR, 'apprise-test.gif')
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
assert response.path == path
assert response.name == 'apprise-test.gif'
assert response.mimetype == 'image/gif'
# Download is successful and has already been called by now; below pulls
# results from cache
assert response.download()
assert response.url().startswith('file://{}'.format(path))
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', response.url()) is None
assert re.search(r'[?&]name=', response.url()) is None
# File handling (even if image is set to maxium allowable)
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch('os.path.getsize', return_value=AttachBase.max_file_size):
# It will still work
assert response.path == path
# File handling when size is to large
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch(
'os.path.getsize', return_value=AttachBase.max_file_size + 1):
# We can't work in this case
assert response.path is None
# File handling when image is not available
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch('os.path.isfile', return_value=False):
# This triggers a full check and will fail the isfile() check
assert response.path is None
# The call to AttachBase.path automatically triggers a call to download()
# but this same is done with a call to AttachBase.name as well. Above
# test cases reference 'path' right after instantiation; here we reference
# 'name'
response = AppriseAttachment.instantiate(path)
assert response.name == 'apprise-test.gif'
assert response.path == path
assert response.mimetype == 'image/gif'
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', response.url()) is None
assert re.search(r'[?&]name=', response.url()) is None
# continuation to cheking 'name' instead of 'path' first where our call
# to download() fails
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch('os.path.isfile', return_value=False):
# This triggers a full check and will fail the isfile() check
assert response.name is None
# The call to AttachBase.path automatically triggers a call to download()
# but this same is done with a call to AttachBase.mimetype as well. Above
# test cases reference 'path' right after instantiation; here we reference
# 'mimetype'
response = AppriseAttachment.instantiate(path)
assert response.mimetype == 'image/gif'
assert response.name == 'apprise-test.gif'
assert response.path == path
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', response.url()) is None
assert re.search(r'[?&]name=', response.url()) is None
# continuation to cheking 'name' instead of 'path' first where our call
# to download() fails
response = AppriseAttachment.instantiate(path)
assert isinstance(response, AttachFile)
with mock.patch('os.path.isfile', return_value=False):
# download() fails so we don't have a mimetpe
assert response.mimetype is None
assert response.name is None
assert response.path is None
# This triggers a full check and will fail the isfile() check
# Force a mime-type and new name
response = AppriseAttachment.instantiate(
'file://{}?mime={}&name={}'.format(path, 'image/jpeg', 'test.jpeg'))
assert isinstance(response, AttachFile)
assert response.path == path
assert response.name == 'test.jpeg'
assert response.mimetype == 'image/jpeg'
# We will match on mime type now (%2F = /)
assert re.search(r'[?&]mime=image%2Fjpeg', response.url(), re.I)
assert re.search(r'[?&]name=test\.jpeg', response.url(), re.I)

330
test/test_attach_http.py Normal file
View File

@ -0,0 +1,330 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2019 Chris Caron <lead2gold@gmail.com>
# All rights reserved.
#
# This code is licensed under the MIT License.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files(the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and / or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions :
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT.IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
import re
import six
import mock
import requests
import mimetypes
from os.path import join
from os.path import dirname
from os.path import getsize
from apprise.attachment.AttachHTTP import AttachHTTP
from apprise import AppriseAttachment
from apprise.plugins.NotifyBase import NotifyBase
from apprise.plugins import SCHEMA_MAP
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
TEST_VAR_DIR = join(dirname(__file__), 'var')
# Some exception handling we'll use
REQUEST_EXCEPTIONS = (
requests.ConnectionError(
0, 'requests.ConnectionError() not handled'),
requests.RequestException(
0, 'requests.RequestException() not handled'),
requests.HTTPError(
0, 'requests.HTTPError() not handled'),
requests.ReadTimeout(
0, 'requests.ReadTimeout() not handled'),
requests.TooManyRedirects(
0, 'requests.TooManyRedirects() not handled'),
# Throw OSError exceptions too
OSError("SystemError")
)
def test_attach_http_parse_url():
"""
API: AttachHTTP().parse_url()
"""
# bad entry
assert AttachHTTP.parse_url('garbage://') is None
# no url specified
assert AttachHTTP.parse_url('http://') is None
@mock.patch('requests.get')
def test_attach_http(mock_get):
"""
API: AttachHTTP() object
"""
# Define our good:// url
class GoodNotification(NotifyBase):
def __init__(self, *args, **kwargs):
super(GoodNotification, self).__init__(*args, **kwargs)
def notify(self, *args, **kwargs):
# Pretend everything is okay
return True
def url(self):
# Support url() function
return ''
# Store our good notification in our schema map
SCHEMA_MAP['good'] = GoodNotification
# Temporary path
path = join(TEST_VAR_DIR, 'apprise-test.gif')
class DummyResponse(object):
"""
A dummy response used to manage our object
"""
status_code = requests.codes.ok
headers = {
'Content-Length': getsize(path),
'Content-Type': 'image/gif',
}
# Pointer to file
ptr = None
# used to return random keep-alive chunks
_keepalive_chunk_ref = 0
def close(self):
return
def iter_content(self, chunk_size=1024):
"""Lazy function (generator) to read a file piece by piece.
Default chunk size: 1k."""
while True:
self._keepalive_chunk_ref += 1
if 16 % self._keepalive_chunk_ref == 0:
# Yield a keep-alive block
yield ''
data = self.ptr.read(chunk_size)
if not data:
break
yield data
def raise_for_status(self):
return
def __enter__(self):
self.ptr = open(path, 'rb')
return self
def __exit__(self, *args, **kwargs):
self.ptr.close()
# Prepare Mock
dummy_response = DummyResponse()
mock_get.return_value = dummy_response
results = AttachHTTP.parse_url(
'http://user:pass@localhost/apprise.gif?+key=value')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
# No Content-Disposition; so we use filename from path
assert attachment.name == 'apprise.gif'
assert attachment.mimetype == 'image/gif'
results = AttachHTTP.parse_url(
'http://localhost:3000/noname.gif?name=usethis.jpg&mime=image/jpeg')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# both mime and name over-ridden
assert re.search(r'[?&]mime=image%2Fjpeg', attachment.url())
assert re.search(r'[?&]name=usethis.jpg', attachment.url())
# No Content-Disposition; so we use filename from path
assert attachment.name == 'usethis.jpg'
assert attachment.mimetype == 'image/jpeg'
# Edge case; download called a second time when content already retrieved
assert attachment.download()
assert attachment
assert len(attachment) == getsize(path)
# No path specified
# No Content-Disposition specified
# No filename (because no path)
results = AttachHTTP.parse_url('http://localhost')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype == 'image/gif'
# Because we could determine our mime type, we could build an extension
# for our unknown filename
assert attachment.name == '{}{}'.format(
AttachHTTP.unknown_filename,
mimetypes.guess_extension(attachment.mimetype)
)
assert attachment
assert len(attachment) == getsize(path)
# Set Content-Length to a value that exceeds our maximum allowable
dummy_response.headers['Content-Length'] = AttachHTTP.max_file_size + 1
results = AttachHTTP.parse_url('http://localhost/toobig.jpg')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
# we can not download this attachment
assert not attachment
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype is None
assert attachment.name is None
assert len(attachment) == 0
# Handle cases where we have no Content-Length and we need to rely
# on what is read as it is streamed
del dummy_response.headers['Content-Length']
# No path specified
# No Content-Disposition specified
# No Content-Length specified
# No filename (because no path)
results = AttachHTTP.parse_url('http://localhost/no-length.gif')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype == 'image/gif'
# Because we could determine our mime type, we could build an extension
# for our unknown filename
assert attachment.name == 'no-length.gif'
assert attachment
assert len(attachment) == getsize(path)
# Set our limit to be the length of our image; everything should work
# without a problem
AttachHTTP.max_file_size = getsize(path)
# Set ourselves a Content-Disposition (providing a filename)
dummy_response.headers['Content-Disposition'] = \
'attachment; filename="myimage.gif"'
# Remove our content type so we're forced to guess it from our filename
# specified in our Content-Disposition
del dummy_response.headers['Content-Type']
# No path specified
# No Content-Length specified
# Filename in Content-Disposition (over-rides one found in path
results = AttachHTTP.parse_url('http://user@localhost/ignore-filename.gif')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype == 'image/gif'
# Because we could determine our mime type, we could build an extension
# for our unknown filename
assert attachment.name == 'myimage.gif'
assert attachment
assert len(attachment) == getsize(path)
# Similar to test above except we make our max message size just 1 byte
# smaller then our gif file. This will cause us to fail to read the
# attachment
AttachHTTP.max_file_size = getsize(path) - 1
results = AttachHTTP.parse_url('http://localhost/toobig.jpg')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
# we can not download this attachment
assert not attachment
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype is None
assert attachment.name is None
assert len(attachment) == 0
# Disable our file size limitations
AttachHTTP.max_file_size = 0
results = AttachHTTP.parse_url('http://user@localhost')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
assert attachment.mimetype == 'image/gif'
# Because we could determine our mime type, we could build an extension
# for our unknown filename
assert attachment.name == 'myimage.gif'
assert attachment
assert len(attachment) == getsize(path)
# Give ourselves nothing to work with
dummy_response.headers = {}
results = AttachHTTP.parse_url('http://user@localhost')
assert isinstance(results, dict)
attachment = AttachHTTP(**results)
# we can not download this attachment
assert attachment
assert isinstance(attachment.url(), six.string_types) is True
# No mime-type and/or filename over-ride was specified, so therefore it
# won't show up in the generated URL
assert re.search(r'[?&]mime=', attachment.url()) is None
assert re.search(r'[?&]name=', attachment.url()) is None
# Handle edge-case where detected_name is None for whatever reason
attachment.detected_name = None
assert attachment.mimetype == 'application/octet-stream'
assert attachment.name == '{}{}'.format(
AttachHTTP.unknown_filename,
mimetypes.guess_extension(attachment.mimetype)
)
assert len(attachment) == getsize(path)
# Exception handling
mock_get.return_value = None
for _exception in REQUEST_EXCEPTIONS:
aa = AppriseAttachment.instantiate('http://localhost/exception.gif')
assert isinstance(aa, AttachHTTP)
mock_get.side_effect = _exception
assert not aa

View File

@ -25,6 +25,8 @@
from __future__ import print_function from __future__ import print_function
import re import re
import mock import mock
from os.path import dirname
from os.path import join
from apprise import cli from apprise import cli
from apprise import NotifyBase from apprise import NotifyBase
from click.testing import CliRunner from click.testing import CliRunner
@ -227,10 +229,11 @@ def test_apprise_cli(tmpdir):
]) ])
assert result.exit_code == 2 assert result.exit_code == 2
# Here is a case where we get what was expected # Here is a case where we get what was expected; we also attach a file
result = runner.invoke(cli.main, [ result = runner.invoke(cli.main, [
'-b', 'has taga', '-b', 'has taga',
'--config', str(t), '--config', str(t),
'--attach', join(dirname(__file__), 'var', 'apprise-test.gif'),
'--tag', 'myTag', '--tag', 'myTag',
]) ])
assert result.exit_code == 0 assert result.exit_code == 0

BIN
test/var/apprise-test.gif Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 75 KiB