Asynchronous Dynamic Module Loading Support (#1071)

This commit is contained in:
Chris Caron 2024-03-03 14:22:17 -05:00 committed by GitHub
parent 52aa7f4ddb
commit 26d8e45683
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 254 additions and 169 deletions

View File

@ -32,6 +32,7 @@ import sys
import time
import hashlib
import inspect
import threading
from .utils import import_module
from .utils import Singleton
from .utils import parse_list
@ -60,6 +61,9 @@ class PluginManager(metaclass=Singleton):
# The module path to scan
module_path = join(abspath(dirname(__file__)), _id)
# thread safe loading
_lock = threading.Lock()
def __init__(self, *args, **kwargs):
"""
Over-ride our class instantiation to provide a singleton
@ -103,40 +107,49 @@ class PluginManager(metaclass=Singleton):
# effort/overhead doing it again
self._paths_previously_scanned = set()
# Track loaded module paths to prevent from loading them again
self._loaded = set()
def unload_modules(self, disable_native=False):
"""
Reset our object and unload all modules
"""
if self._custom_module_map:
# Handle Custom Module Assignments
for meta in self._custom_module_map.values():
if meta['name'] not in self._module_map:
# Nothing to remove
continue
with self._lock:
if self._custom_module_map:
# Handle Custom Module Assignments
for meta in self._custom_module_map.values():
if meta['name'] not in self._module_map:
# Nothing to remove
continue
# For the purpose of tidying up un-used modules in memory
loaded = [m for m in sys.modules.keys()
if m.startswith(
self._module_map[meta['name']]['path'])]
# For the purpose of tidying up un-used modules in memory
loaded = [m for m in sys.modules.keys()
if m.startswith(
self._module_map[meta['name']]['path'])]
for module_path in loaded:
del sys.modules[module_path]
for module_path in loaded:
del sys.modules[module_path]
# Reset disabled plugins (if any)
for schema in self._disabled:
self._schema_map[schema].enabled = True
self._disabled.clear()
# Reset disabled plugins (if any)
for schema in self._disabled:
self._schema_map[schema].enabled = True
self._disabled.clear()
# Reset our variables
self._module_map = None if not disable_native else {}
self._schema_map = {}
self._custom_module_map = {}
# Reset our variables
self._schema_map = {}
self._custom_module_map = {}
if disable_native:
self._module_map = {}
# Reset our path cache
self._paths_previously_scanned = set()
else:
self._module_map = None
self._loaded = set()
def load_modules(self, path=None, name=None):
# Reset our path cache
self._paths_previously_scanned = set()
def load_modules(self, path=None, name=None, force=False):
"""
Load our modules into memory
"""
@ -145,102 +158,120 @@ class PluginManager(metaclass=Singleton):
module_name_prefix = self.module_name_prefix if name is None else name
module_path = self.module_path if path is None else path
if not self:
# Initialize our maps
self._module_map = {}
self._schema_map = {}
self._custom_module_map = {}
with self._lock:
if not force and module_path in self._loaded:
# We're done
return
# Used for the detection of additional Notify Services objects
# The .py extension is optional as we support loading directories too
module_re = re.compile(
r'^(?P<name>' + self.fname_prefix + r'[a-z0-9]+)(\.py)?$', re.I)
# Our base reference
module_count = len(self._module_map) if self._module_map else 0
schema_count = len(self._schema_map) if self._schema_map else 0
t_start = time.time()
for f in os.listdir(module_path):
tl_start = time.time()
match = module_re.match(f)
if not match:
# keep going
continue
if not self:
# Initialize our maps
self._module_map = {}
self._schema_map = {}
self._custom_module_map = {}
elif match.group('name') == f'{self.fname_prefix}Base':
# keep going
continue
# Used for the detection of additional Notify Services objects
# The .py extension is optional as we support loading directories
# too
module_re = re.compile(
r'^(?P<name>' + self.fname_prefix + r'[a-z0-9]+)(\.py)?$',
re.I)
# Store our notification/plugin name:
module_name = match.group('name')
module_pyname = '{}.{}'.format(module_name_prefix, module_name)
if module_name in self._module_map:
logger.warning(
"%s(s) (%s) already loaded; ignoring %s",
self.name, module_name, os.path.join(module_path, f))
continue
try:
module = __import__(
module_pyname,
globals(), locals(),
fromlist=[module_name])
except ImportError:
# No problem, we can try again another way...
module = import_module(
os.path.join(module_path, f), module_pyname)
if not module:
# logging found in import_module and not needed here
t_start = time.time()
for f in os.listdir(module_path):
tl_start = time.time()
match = module_re.match(f)
if not match:
# keep going
continue
if not hasattr(module, module_name):
# Not a library we can load as it doesn't follow the simple
# rule that the class must bear the same name as the
# notification file itself.
logger.trace(
"%s (%s) import failed; no filename/Class "
"match found in %s",
self.name, module_name, os.path.join(module_path, f))
continue
# Get our plugin
plugin = getattr(module, module_name)
if not hasattr(plugin, 'app_id'):
# Filter out non-notification modules
logger.trace(
"(%s) import failed; no app_id defined in %s",
self.name, module_name, os.path.join(module_path, f))
continue
# Add our plugin name to our module map
self._module_map[module_name] = {
'plugin': set([plugin]),
'module': module,
'path': '{}.{}'.format(module_name_prefix, module_name),
'native': True,
}
fn = getattr(plugin, 'schemas', None)
schemas = set([]) if not callable(fn) else fn(plugin)
# map our schema to our plugin
for schema in schemas:
if schema in self._schema_map:
logger.error(
"{} schema ({}) mismatch detected - {} to {}"
.format(self.name, schema, self._schema_map, plugin))
elif match.group('name') == f'{self.fname_prefix}Base':
# keep going
continue
# Assign plugin
self._schema_map[schema] = plugin
# Store our notification/plugin name:
module_name = match.group('name')
module_pyname = '{}.{}'.format(module_name_prefix, module_name)
logger.trace(
'{} {} loaded in {:.6f}s'.format(
self.name, module_name, (time.time() - tl_start)))
logger.debug(
'{} {}(s) and {} Schema(s) loaded in {:.4f}s'
.format(
self.name, len(self._module_map), len(self._schema_map),
(time.time() - t_start)))
if module_name in self._module_map:
logger.warning(
"%s(s) (%s) already loaded; ignoring %s",
self.name, module_name, os.path.join(module_path, f))
continue
try:
module = __import__(
module_pyname,
globals(), locals(),
fromlist=[module_name])
except ImportError:
# No problem, we can try again another way...
module = import_module(
os.path.join(module_path, f), module_pyname)
if not module:
# logging found in import_module and not needed here
continue
if not hasattr(module, module_name):
# Not a library we can load as it doesn't follow the simple
# rule that the class must bear the same name as the
# notification file itself.
logger.trace(
"%s (%s) import failed; no filename/Class "
"match found in %s",
self.name, module_name, os.path.join(module_path, f))
continue
# Get our plugin
plugin = getattr(module, module_name)
if not hasattr(plugin, 'app_id'):
# Filter out non-notification modules
logger.trace(
"(%s) import failed; no app_id defined in %s",
self.name, module_name, os.path.join(module_path, f))
continue
# Add our plugin name to our module map
self._module_map[module_name] = {
'plugin': set([plugin]),
'module': module,
'path': '{}.{}'.format(module_name_prefix, module_name),
'native': True,
}
fn = getattr(plugin, 'schemas', None)
schemas = set([]) if not callable(fn) else fn(plugin)
# map our schema to our plugin
for schema in schemas:
if schema in self._schema_map:
logger.error(
"{} schema ({}) mismatch detected - {} to {}"
.format(self.name, schema, self._schema_map,
plugin))
continue
# Assign plugin
self._schema_map[schema] = plugin
logger.trace(
'{} {} loaded in {:.6f}s'.format(
self.name, module_name, (time.time() - tl_start)))
# Track the directory loaded so we never load it again
self._loaded.add(module_path)
logger.debug(
'{} {}(s) and {} Schema(s) loaded in {:.4f}s'
.format(
self.name,
len(self._module_map) - module_count,
len(self._schema_map) - schema_count,
(time.time() - t_start)))
def module_detection(self, paths, cache=True):
"""
@ -334,67 +365,69 @@ class PluginManager(metaclass=Singleton):
# end of _import_module()
return
for _path in paths:
path = os.path.abspath(os.path.expanduser(_path))
if (cache and path in self._paths_previously_scanned) \
or not os.path.exists(path):
# We're done as we've already scanned this
continue
# Store our path as a way of hashing it has been handled
self._paths_previously_scanned.add(path)
if os.path.isdir(path) and not \
os.path.isfile(os.path.join(path, '__init__.py')):
logger.debug('Scanning for custom plugins in: %s', path)
for entry in os.listdir(path):
re_match = module_re.match(entry)
if not re_match:
# keep going
logger.trace('Plugin Scan: Ignoring %s', entry)
continue
new_path = os.path.join(path, entry)
if os.path.isdir(new_path):
# Update our path
new_path = os.path.join(path, entry, '__init__.py')
if not os.path.isfile(new_path):
logger.trace(
'Plugin Scan: Ignoring %s',
os.path.join(path, entry))
continue
if not cache or \
(cache and
new_path not in self._paths_previously_scanned):
# Load our module
_import_module(new_path)
# Add our subdir path
self._paths_previously_scanned.add(new_path)
else:
if os.path.isdir(path):
# This logic is safe to apply because we already validated
# the directories state above; update our path
path = os.path.join(path, '__init__.py')
if cache and path in self._paths_previously_scanned:
continue
self._paths_previously_scanned.add(path)
# directly load as is
re_match = module_re.match(os.path.basename(path))
# must be a match and must have a .py extension
if not re_match or not re_match.group(1):
# keep going
logger.trace('Plugin Scan: Ignoring %s', path)
with self._lock:
for _path in paths:
path = os.path.abspath(os.path.expanduser(_path))
if (cache and path in self._paths_previously_scanned) \
or not os.path.exists(path):
# We're done as we've already scanned this
continue
# Load our module
_import_module(path)
# Store our path as a way of hashing it has been handled
self._paths_previously_scanned.add(path)
return None
if os.path.isdir(path) and not \
os.path.isfile(os.path.join(path, '__init__.py')):
logger.debug('Scanning for custom plugins in: %s', path)
for entry in os.listdir(path):
re_match = module_re.match(entry)
if not re_match:
# keep going
logger.trace('Plugin Scan: Ignoring %s', entry)
continue
new_path = os.path.join(path, entry)
if os.path.isdir(new_path):
# Update our path
new_path = os.path.join(path, entry, '__init__.py')
if not os.path.isfile(new_path):
logger.trace(
'Plugin Scan: Ignoring %s',
os.path.join(path, entry))
continue
if not cache or \
(cache and new_path not in
self._paths_previously_scanned):
# Load our module
_import_module(new_path)
# Add our subdir path
self._paths_previously_scanned.add(new_path)
else:
if os.path.isdir(path):
# This logic is safe to apply because we already
# validated the directories state above; update our
# path
path = os.path.join(path, '__init__.py')
if cache and path in self._paths_previously_scanned:
continue
self._paths_previously_scanned.add(path)
# directly load as is
re_match = module_re.match(os.path.basename(path))
# must be a match and must have a .py extension
if not re_match or not re_match.group(1):
# keep going
logger.trace('Plugin Scan: Ignoring %s', path)
continue
# Load our module
_import_module(path)
return None
def add(self, plugin, schemas=None, url=None, send_func=None):
"""
@ -714,4 +747,4 @@ class PluginManager(metaclass=Singleton):
"""
Determines if object has loaded or not
"""
return True if self._module_map is not None else False
return True if self._loaded and self._module_map is not None else False

View File

@ -29,8 +29,10 @@
import re
import pytest
import types
import threading
from inspect import cleandoc
from apprise import Apprise
from apprise.NotificationManager import NotificationManager
from apprise.plugins.NotifyBase import NotifyBase
@ -248,6 +250,48 @@ def test_notification_manager_module_loading(tmpdir):
N_MGR.load_modules()
N_MGR.load_modules()
#
# Thread Testing
#
# This tests against a racing condition when the modules have not been
# loaded. When multiple instances of Apprise are all instantiated,
# the loading of the modules will occur for each instance if detected
# having not been previously done, this tests that we can dynamically
# support the loading of modules once whe multiple instances to apprise
# are instantiated.
thread_count = 10
def thread_test(result, no):
"""
Load our apprise object with valid URLs and store our result
"""
apobj = Apprise()
result[no] = apobj.add('json://localhost') and \
apobj.add('form://localhost') and \
apobj.add('xml://localhost')
# Unload our modules
N_MGR.unload_modules()
# Prepare threads to load
results = [None] * thread_count
threads = [
threading.Thread(target=thread_test, args=(results, no))
for no in range(thread_count)
]
# Verify we can safely load our modules in a thread safe environment
for t in threads:
t.start()
for t in threads:
t.join()
# Verify we loaded our urls in all threads successfully
for result in results:
assert result is True
def test_notification_manager_decorators(tmpdir):
"""
@ -376,6 +420,10 @@ def test_notification_manager_decorators(tmpdir):
"""))
assert 'mytest' not in N_MGR
N_MGR.load_modules(path=str(notify_base))
# It's still not loaded because the path has already been scanned
assert 'mytest' not in N_MGR
N_MGR.load_modules(path=str(notify_base), force=True)
assert 'mytest' in N_MGR
# Could not be loaded because the filename did not align with the class
@ -387,3 +435,7 @@ def test_notification_manager_decorators(tmpdir):
N_MGR.load_modules(path=str(notify_base))
# Our item is still loaded as expected
assert 'mytest' in N_MGR
# Simple test to make sure we can handle duplicate entries loaded
N_MGR.load_modules(path=str(notify_base), force=True)
N_MGR.load_modules(path=str(notify_base), force=True)