Add support to run inside Linux namespace

**Motivation:**
In a specific use case, we use sshuttle to provide access to private
networks from multiple sites to a specific host. The sites may contain
networks that overlap each other, so each site is accessed inside a
different namespace that provides process-level network isolation and
prevents network overlap.

**Objective:**
This commit just adds a convenient way of spawning multiple sshuttle
instances inside different namespaces from a single process, by passing
the namespace's name though the variable --namespace. The result is the
same as calling `ip netns exec $NAMESPACE sshuttle ...`
This commit is contained in:
Raylan 2025-01-23 12:12:12 -03:00
parent b346e976eb
commit eb5f5d4474
4 changed files with 90 additions and 0 deletions

View File

@ -11,6 +11,7 @@ import sshuttle.ssyslog as ssyslog
from sshuttle.options import parser, parse_ipport
from sshuttle.helpers import family_ip_tuple, log, Fatal
from sshuttle.sudoers import sudoers
from sshuttle.namespace import enter_namespace
def main():
@ -37,6 +38,13 @@ def main():
helpers.verbose = opt.verbose
try:
namespace = getattr(opt, 'namespace', None)
if namespace:
prefix = helpers.logprefix
helpers.logprefix = 'ns: '
enter_namespace(namespace)
helpers.logprefix = prefix
if opt.firewall:
if opt.subnets or opt.subnets_file:
parser.error('exactly zero arguments expected')

36
sshuttle/namespace.py Normal file
View File

@ -0,0 +1,36 @@
import os
import ctypes
import ctypes.util
from sshuttle.helpers import Fatal, debug1, debug2
CLONE_NEWNET = 0x40000000
NETNS_RUN_DIR = "/var/run/netns"
def enter_namespace(namespace):
namespace_dir = f'{NETNS_RUN_DIR}/{namespace}'
if not os.path.exists(namespace_dir):
raise Fatal('The namespace %r does not exists.' % namespace_dir)
debug2('loading libc')
libc = ctypes.CDLL(ctypes.util.find_library("c"), use_errno=True)
default_errcheck = libc.setns.errcheck
def errcheck(ret, *args):
if ret == -1:
e = ctypes.get_errno()
raise Fatal(e, os.strerror(e))
if default_errcheck:
return default_errcheck(ret, *args)
libc.setns.errcheck = errcheck # type: ignore
debug1('Entering namespace %r' % namespace)
with open(namespace_dir) as fd:
libc.setns(fd.fileno(), CLONE_NEWNET)
debug1('Namespace %r successfully set' % namespace)

View File

@ -137,6 +137,15 @@ def parse_list(lst):
return re.split(r'[\s,]+', lst.strip()) if lst else []
def parse_namespace(namespace):
try:
assert re.fullmatch(
r'(@?[a-z_A-Z]\w+(?:\.@?[a-z_A-Z]\w+)*)', namespace)
return namespace
except AssertionError:
raise Fatal("%r is not a valid namespace name." % namespace)
class Concat(Action):
def __init__(self, option_strings, dest, nargs=None, **kwargs):
if nargs is not None:
@ -460,3 +469,10 @@ parser.add_argument(
hexadecimal (default '0x01')
"""
)
if sys.platform == 'linux':
parser.add_argument(
'--namespace',
type=parse_namespace,
help="Run it inside of a namespace."
)

View File

@ -176,3 +176,33 @@ def test_parse_subnetport_host_with_port(mock_getaddrinfo):
(socket.AF_INET6, '2404:6800:4004:821::2001', 128, 80, 90),
(socket.AF_INET, '142.251.42.129', 32, 80, 90),
])
def test_parse_namespace():
valid_namespaces = [
'my_namespace',
'my.namespace',
'my_namespace_with_underscore',
'MyNamespace',
'@my_namespace',
'my.long_namespace.with.multiple.dots',
'@my.long_namespace.with.multiple.dots',
'my.Namespace.With.Mixed.Case',
]
for namespace in valid_namespaces:
assert sshuttle.options.parse_namespace(namespace) == namespace
invalid_namespaces = [
'',
'123namespace',
'my-namespace',
'my_namespace!',
'.my_namespace',
'my_namespace.',
'my..namespace',
]
for namespace in invalid_namespaces:
with pytest.raises(Fatal, match="'.*' is not a valid namespace name."):
sshuttle.options.parse_namespace(namespace)