mirror of
https://github.com/sshuttle/sshuttle.git
synced 2025-04-22 10:18:59 +02:00
Merge pull request #541 from skuhl/use-all-ips
When subnets and excludes are specified with hostnames, use all IPs.
This commit is contained in:
commit
c3016f2d90
@ -586,31 +586,76 @@ def main(listenip_v6, listenip_v4,
|
|||||||
|
|
||||||
fw = FirewallClient(method_name, sudo_pythonpath)
|
fw = FirewallClient(method_name, sudo_pythonpath)
|
||||||
|
|
||||||
# Get family specific subnet lists
|
# If --dns is used, store the IP addresses that the client
|
||||||
|
# normally uses for DNS lookups in nslist. The firewall needs to
|
||||||
|
# redirect packets outgoing to this server to the remote host
|
||||||
|
# instead.
|
||||||
if dns:
|
if dns:
|
||||||
nslist += resolvconf_nameservers()
|
nslist += resolvconf_nameservers()
|
||||||
if to_nameserver is not None:
|
if to_nameserver is not None:
|
||||||
to_nameserver = "%s@%s" % tuple(to_nameserver[1:])
|
to_nameserver = "%s@%s" % tuple(to_nameserver[1:])
|
||||||
else:
|
else:
|
||||||
# option doesn't make sense if we aren't proxying dns
|
# option doesn't make sense if we aren't proxying dns
|
||||||
|
if to_nameserver and len(to_nameserver) > 0:
|
||||||
|
print("WARNING: --to-ns option is ignored because --dns was not "
|
||||||
|
"used.")
|
||||||
to_nameserver = None
|
to_nameserver = None
|
||||||
|
|
||||||
subnets = subnets_include + subnets_exclude # we don't care here
|
# Get family specific subnet lists. Also, the user may not specify
|
||||||
subnets_v6 = [i for i in subnets if i[0] == socket.AF_INET6]
|
# any subnets if they use --auto-nets. In this case, our subnets
|
||||||
nslist_v6 = [i for i in nslist if i[0] == socket.AF_INET6]
|
# list will be empty and the forwarded subnets will be determined
|
||||||
subnets_v4 = [i for i in subnets if i[0] == socket.AF_INET]
|
# later by the server.
|
||||||
|
subnets_v4 = [i for i in subnets_include if i[0] == socket.AF_INET]
|
||||||
|
subnets_v6 = [i for i in subnets_include if i[0] == socket.AF_INET6]
|
||||||
nslist_v4 = [i for i in nslist if i[0] == socket.AF_INET]
|
nslist_v4 = [i for i in nslist if i[0] == socket.AF_INET]
|
||||||
|
nslist_v6 = [i for i in nslist if i[0] == socket.AF_INET6]
|
||||||
|
|
||||||
# Check features available
|
# Get available features from the firewall method
|
||||||
avail = fw.method.get_supported_features()
|
avail = fw.method.get_supported_features()
|
||||||
|
|
||||||
|
# A feature is "required" if the user supplies us parameters which
|
||||||
|
# implies that the feature is needed.
|
||||||
required = Features()
|
required = Features()
|
||||||
|
|
||||||
|
# Select the default addresses to bind to / listen to.
|
||||||
|
|
||||||
|
# Assume IPv4 is always available and should always be enabled. If
|
||||||
|
# a method doesn't provide IPv4 support or if we wish to run
|
||||||
|
# ipv6-only, changes to this code are required.
|
||||||
|
assert avail.ipv4
|
||||||
|
required.ipv4 = True
|
||||||
|
|
||||||
|
# listenip_v4 contains user specified value or it is set to "auto".
|
||||||
|
if listenip_v4 == "auto":
|
||||||
|
listenip_v4 = ('127.0.0.1', 0)
|
||||||
|
|
||||||
|
# listenip_v6 is...
|
||||||
|
# None when IPv6 is disabled.
|
||||||
|
# "auto" when listen address is unspecified.
|
||||||
|
# The user specified address if provided by user
|
||||||
|
if listenip_v6 is None:
|
||||||
|
debug1("IPv6 disabled by --disable-ipv6\n")
|
||||||
if listenip_v6 == "auto":
|
if listenip_v6 == "auto":
|
||||||
if avail.ipv6:
|
if avail.ipv6:
|
||||||
|
debug1("IPv6 enabled: Using default IPv6 listen address ::1\n")
|
||||||
listenip_v6 = ('::1', 0)
|
listenip_v6 = ('::1', 0)
|
||||||
else:
|
else:
|
||||||
|
debug1("IPv6 disabled since it isn't supported by method "
|
||||||
|
"%s.\n" % fw.method.name)
|
||||||
listenip_v6 = None
|
listenip_v6 = None
|
||||||
|
|
||||||
|
# Make final decision about enabling IPv6:
|
||||||
|
required.ipv6 = False
|
||||||
|
if listenip_v6:
|
||||||
|
required.ipv6 = True
|
||||||
|
|
||||||
|
# If we get here, it is possible that listenip_v6 was user
|
||||||
|
# specified but not supported by the current method.
|
||||||
|
if required.ipv6 and not avail.ipv6:
|
||||||
|
raise Fatal("An IPv6 listen address was supplied, but IPv6 is "
|
||||||
|
"disabled at your request or is unsupported by the %s "
|
||||||
|
"method." % fw.method.name)
|
||||||
|
|
||||||
if user is not None:
|
if user is not None:
|
||||||
if getpwnam is None:
|
if getpwnam is None:
|
||||||
raise Fatal("Routing by user not available on this system.")
|
raise Fatal("Routing by user not available on this system.")
|
||||||
@ -618,38 +663,66 @@ def main(listenip_v6, listenip_v4,
|
|||||||
user = getpwnam(user).pw_uid
|
user = getpwnam(user).pw_uid
|
||||||
except KeyError:
|
except KeyError:
|
||||||
raise Fatal("User %s does not exist." % user)
|
raise Fatal("User %s does not exist." % user)
|
||||||
|
|
||||||
if fw.method.name != 'nat':
|
|
||||||
required.ipv6 = len(subnets_v6) > 0 or listenip_v6 is not None
|
|
||||||
required.ipv4 = len(subnets_v4) > 0 or listenip_v4 is not None
|
|
||||||
else:
|
|
||||||
required.ipv6 = None
|
|
||||||
required.ipv4 = None
|
|
||||||
|
|
||||||
required.udp = avail.udp
|
|
||||||
required.dns = len(nslist) > 0
|
|
||||||
required.user = False if user is None else True
|
required.user = False if user is None else True
|
||||||
|
|
||||||
# if IPv6 not supported, ignore IPv6 DNS servers
|
if not required.ipv6 and len(subnets_v6) > 0:
|
||||||
if not required.ipv6:
|
print("WARNING: IPv6 subnets were ignored because IPv6 is disabled "
|
||||||
|
"in sshuttle.")
|
||||||
|
subnets_v6 = []
|
||||||
|
subnets_include = subnets_v4
|
||||||
|
|
||||||
|
required.udp = avail.udp # automatically enable UDP if it is available
|
||||||
|
required.dns = len(nslist) > 0
|
||||||
|
|
||||||
|
# Remove DNS servers using IPv6.
|
||||||
|
if required.dns:
|
||||||
|
if not required.ipv6 and len(nslist_v6) > 0:
|
||||||
|
print("WARNING: Your system is configured to use an IPv6 DNS "
|
||||||
|
"server but sshuttle is not using IPv6. Therefore DNS "
|
||||||
|
"traffic your system sends to the IPv6 DNS server won't "
|
||||||
|
"be redirected via sshuttle to the remote machine.")
|
||||||
nslist_v6 = []
|
nslist_v6 = []
|
||||||
nslist = nslist_v4
|
nslist = nslist_v4
|
||||||
|
|
||||||
|
if len(nslist) == 0:
|
||||||
|
raise Fatal("Can't redirect DNS traffic since IPv6 is not "
|
||||||
|
"enabled in sshuttle and all of the system DNS "
|
||||||
|
"servers are IPv6.")
|
||||||
|
|
||||||
|
# If we aren't using IPv6, we can safely ignore excluded IPv6 subnets.
|
||||||
|
if not required.ipv6:
|
||||||
|
orig_len = len(subnets_exclude)
|
||||||
|
subnets_exclude = [i for i in subnets_exclude
|
||||||
|
if i[0] == socket.AF_INET]
|
||||||
|
if len(subnets_exclude) < orig_len:
|
||||||
|
print("WARNING: Ignoring one or more excluded IPv6 subnets "
|
||||||
|
"because IPv6 is not enabled.")
|
||||||
|
|
||||||
|
# This will print error messages if we required a feature that
|
||||||
|
# isn't available by the current method.
|
||||||
fw.method.assert_features(required)
|
fw.method.assert_features(required)
|
||||||
|
|
||||||
if required.ipv6 and listenip_v6 is None:
|
|
||||||
raise Fatal("IPv6 required but not listening.")
|
|
||||||
|
|
||||||
# display features enabled
|
# display features enabled
|
||||||
debug1("IPv6 enabled: %r\n" % required.ipv6)
|
def feature_status(label, enabled, available):
|
||||||
debug1("UDP enabled: %r\n" % required.udp)
|
msg = label + ": "
|
||||||
debug1("DNS enabled: %r\n" % required.dns)
|
if enabled:
|
||||||
debug1("User enabled: %r\n" % required.user)
|
msg += "on"
|
||||||
|
else:
|
||||||
|
msg += "off "
|
||||||
|
if available:
|
||||||
|
msg += "(available)"
|
||||||
|
else:
|
||||||
|
msg += "(not available with %s method)" % fw.method.name
|
||||||
|
debug1(msg + "\n")
|
||||||
|
|
||||||
# bind to required ports
|
debug1("Method: %s\n" % fw.method.name)
|
||||||
if listenip_v4 == "auto":
|
feature_status("IPv4", required.ipv4, avail.ipv4)
|
||||||
listenip_v4 = ('127.0.0.1', 0)
|
feature_status("IPv6", required.ipv6, avail.ipv6)
|
||||||
|
feature_status("UDP ", required.udp, avail.udp)
|
||||||
|
feature_status("DNS ", required.dns, avail.dns)
|
||||||
|
feature_status("User", required.user, avail.user)
|
||||||
|
|
||||||
|
# Exclude traffic destined to our listen addresses.
|
||||||
if required.ipv4 and \
|
if required.ipv4 and \
|
||||||
not any(listenip_v4[0] == sex[1] for sex in subnets_v4):
|
not any(listenip_v4[0] == sex[1] for sex in subnets_v4):
|
||||||
subnets_exclude.append((socket.AF_INET, listenip_v4[0], 32, 0, 0))
|
subnets_exclude.append((socket.AF_INET, listenip_v4[0], 32, 0, 0))
|
||||||
@ -658,6 +731,25 @@ def main(listenip_v6, listenip_v4,
|
|||||||
not any(listenip_v6[0] == sex[1] for sex in subnets_v6):
|
not any(listenip_v6[0] == sex[1] for sex in subnets_v6):
|
||||||
subnets_exclude.append((socket.AF_INET6, listenip_v6[0], 128, 0, 0))
|
subnets_exclude.append((socket.AF_INET6, listenip_v6[0], 128, 0, 0))
|
||||||
|
|
||||||
|
# We don't print the IP+port of where we are listening here
|
||||||
|
# because we do that below when we have identified the ports to
|
||||||
|
# listen on.
|
||||||
|
debug1("Subnets to forward through remote host (type, IP, cidr mask "
|
||||||
|
"width, startPort, endPort):\n")
|
||||||
|
for i in subnets_include:
|
||||||
|
print(" "+str(i))
|
||||||
|
if auto_nets:
|
||||||
|
debug1("NOTE: Additional subnets to forward may be added below by "
|
||||||
|
"--auto-nets.\n")
|
||||||
|
debug1("Subnets to exclude from forwarding:\n")
|
||||||
|
for i in subnets_exclude:
|
||||||
|
print(" "+str(i))
|
||||||
|
if required.dns:
|
||||||
|
debug1("DNS requests normally directed at these servers will be "
|
||||||
|
"redirected to remote:\n")
|
||||||
|
for i in nslist:
|
||||||
|
print(" "+str(i))
|
||||||
|
|
||||||
if listenip_v6 and listenip_v6[1] and listenip_v4 and listenip_v4[1]:
|
if listenip_v6 and listenip_v6[1] and listenip_v4 and listenip_v4[1]:
|
||||||
# if both ports given, no need to search for a spare port
|
# if both ports given, no need to search for a spare port
|
||||||
ports = [0, ]
|
ports = [0, ]
|
||||||
|
@ -47,8 +47,16 @@ def main():
|
|||||||
elif opt.hostwatch:
|
elif opt.hostwatch:
|
||||||
return hostwatch.hw_main(opt.subnets, opt.auto_hosts)
|
return hostwatch.hw_main(opt.subnets, opt.auto_hosts)
|
||||||
else:
|
else:
|
||||||
includes = opt.subnets + opt.subnets_file
|
# parse_subnetports() is used to create a list of includes
|
||||||
excludes = opt.exclude
|
# and excludes. It is called once for each parameter and
|
||||||
|
# returns a list of one or more items for each subnet (it
|
||||||
|
# can return more than one item when a hostname in the
|
||||||
|
# parameter resolves to multiple IP addresses. Here, we
|
||||||
|
# flatten these lists.
|
||||||
|
includes = [item for sublist in opt.subnets+opt.subnets_file
|
||||||
|
for item in sublist]
|
||||||
|
excludes = [item for sublist in opt.exclude for item in sublist]
|
||||||
|
|
||||||
if not includes and not opt.auto_nets:
|
if not includes and not opt.auto_nets:
|
||||||
parser.error('at least one subnet, subnet file, '
|
parser.error('at least one subnet, subnet file, '
|
||||||
'or -N expected')
|
'or -N expected')
|
||||||
|
@ -38,6 +38,7 @@ class BaseMethod(object):
|
|||||||
@staticmethod
|
@staticmethod
|
||||||
def get_supported_features():
|
def get_supported_features():
|
||||||
result = Features()
|
result = Features()
|
||||||
|
result.ipv4 = True
|
||||||
result.ipv6 = False
|
result.ipv6 = False
|
||||||
result.udp = False
|
result.udp = False
|
||||||
result.dns = True
|
result.dns = True
|
||||||
@ -68,7 +69,7 @@ class BaseMethod(object):
|
|||||||
|
|
||||||
def assert_features(self, features):
|
def assert_features(self, features):
|
||||||
avail = self.get_supported_features()
|
avail = self.get_supported_features()
|
||||||
for key in ["udp", "dns", "ipv6", "user"]:
|
for key in ["udp", "dns", "ipv6", "ipv4", "user"]:
|
||||||
if getattr(features, key) and not getattr(avail, key):
|
if getattr(features, key) and not getattr(avail, key):
|
||||||
raise Fatal(
|
raise Fatal(
|
||||||
"Feature %s not supported with method %s.\n" %
|
"Feature %s not supported with method %s.\n" %
|
||||||
|
@ -28,7 +28,14 @@ def parse_subnetport_file(s):
|
|||||||
# 1.2.3.4/5:678, 1.2.3.4:567, 1.2.3.4/16 or just 1.2.3.4
|
# 1.2.3.4/5:678, 1.2.3.4:567, 1.2.3.4/16 or just 1.2.3.4
|
||||||
# [1:2::3/64]:456, [1:2::3]:456, 1:2::3/64 or just 1:2::3
|
# [1:2::3/64]:456, [1:2::3]:456, 1:2::3/64 or just 1:2::3
|
||||||
# example.com:123 or just example.com
|
# example.com:123 or just example.com
|
||||||
|
#
|
||||||
|
# In addition, the port number can be specified as a range:
|
||||||
|
# 1.2.3.4:8000-8080.
|
||||||
|
#
|
||||||
|
# Can return multiple matches if the domain name used in the request
|
||||||
|
# has multiple IP addresses.
|
||||||
def parse_subnetport(s):
|
def parse_subnetport(s):
|
||||||
|
|
||||||
if s.count(':') > 1:
|
if s.count(':') > 1:
|
||||||
rx = r'(?:\[?([\w\:]+)(?:/(\d+))?]?)(?::(\d+)(?:-(\d+))?)?$'
|
rx = r'(?:\[?([\w\:]+)(?:/(\d+))?]?)(?::(\d+)(?:-(\d+))?)?$'
|
||||||
else:
|
else:
|
||||||
@ -38,19 +45,56 @@ def parse_subnetport(s):
|
|||||||
if not m:
|
if not m:
|
||||||
raise Fatal('%r is not a valid address/mask:port format' % s)
|
raise Fatal('%r is not a valid address/mask:port format' % s)
|
||||||
|
|
||||||
addr, width, fport, lport = m.groups()
|
# Ports range from fport to lport. If only one port is specified,
|
||||||
|
# fport is defined and lport is None.
|
||||||
|
#
|
||||||
|
# cidr is the mask defined with the slash notation
|
||||||
|
host, cidr, fport, lport = m.groups()
|
||||||
try:
|
try:
|
||||||
addrinfo = socket.getaddrinfo(addr, 0, 0, socket.SOCK_STREAM)
|
addrinfo = socket.getaddrinfo(host, 0, 0, socket.SOCK_STREAM)
|
||||||
except socket.gaierror:
|
except socket.gaierror:
|
||||||
raise Fatal('Unable to resolve address: %s' % addr)
|
raise Fatal('Unable to resolve address: %s' % host)
|
||||||
|
|
||||||
family, _, _, _, addr = min(addrinfo)
|
# If the address is a domain with multiple IPs and a mask is also
|
||||||
max_width = 32 if family == socket.AF_INET else 128
|
# provided, proceed cautiously:
|
||||||
width = int(width or max_width)
|
if cidr is not None:
|
||||||
if not 0 <= width <= max_width:
|
addr_v6 = [a for a in addrinfo if a[0] == socket.AF_INET6]
|
||||||
raise Fatal('width %d is not between 0 and %d' % (width, max_width))
|
addr_v4 = [a for a in addrinfo if a[0] == socket.AF_INET]
|
||||||
|
|
||||||
return (family, addr[0], width, int(fport or 0), int(lport or fport or 0))
|
# Refuse to proceed if IPv4 and IPv6 addresses are present:
|
||||||
|
if len(addr_v6) > 0 and len(addr_v4) > 0:
|
||||||
|
raise Fatal("%s has IPv4 and IPv6 addresses, so the mask "
|
||||||
|
"of /%s is not supported. Specify the IP "
|
||||||
|
"addresses directly if you wish to specify "
|
||||||
|
"a mask." % (host, cidr))
|
||||||
|
|
||||||
|
# Warn if a domain has multiple IPs of the same type (IPv4 vs
|
||||||
|
# IPv6) and the mask is applied to all of the IPs.
|
||||||
|
if len(addr_v4) > 1 or len(addr_v6) > 1:
|
||||||
|
print("WARNING: %s has multiple IP addresses. The "
|
||||||
|
"mask of /%s is applied to all of the addresses."
|
||||||
|
% (host, cidr))
|
||||||
|
|
||||||
|
rv = []
|
||||||
|
for a in addrinfo:
|
||||||
|
family, _, _, _, addr = a
|
||||||
|
|
||||||
|
# Largest possible slash value we can use with this IP:
|
||||||
|
max_cidr = 32 if family == socket.AF_INET else 128
|
||||||
|
|
||||||
|
if cidr is None: # if no mask, use largest mask
|
||||||
|
cidr_to_use = max_cidr
|
||||||
|
else: # verify user-provided mask is appropriate
|
||||||
|
cidr_to_use = int(cidr)
|
||||||
|
if not 0 <= cidr_to_use <= max_cidr:
|
||||||
|
raise Fatal('Slash in CIDR notation (/%d) is '
|
||||||
|
'not between 0 and %d'
|
||||||
|
% (cidr_to_use, max_cidr))
|
||||||
|
|
||||||
|
rv.append((family, addr[0], cidr_to_use,
|
||||||
|
int(fport or 0), int(lport or fport or 0)))
|
||||||
|
|
||||||
|
return rv
|
||||||
|
|
||||||
|
|
||||||
# 1.2.3.4:567 or just 1.2.3.4 or just 567
|
# 1.2.3.4:567 or just 1.2.3.4 or just 567
|
||||||
@ -69,16 +113,21 @@ def parse_ipport(s):
|
|||||||
if not m:
|
if not m:
|
||||||
raise Fatal('%r is not a valid IP:port format' % s)
|
raise Fatal('%r is not a valid IP:port format' % s)
|
||||||
|
|
||||||
ip, port = m.groups()
|
host, port = m.groups()
|
||||||
ip = ip or '0.0.0.0'
|
host = host or '0.0.0.0'
|
||||||
port = int(port or 0)
|
port = int(port or 0)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
addrinfo = socket.getaddrinfo(ip, port, 0, socket.SOCK_STREAM)
|
addrinfo = socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM)
|
||||||
except socket.gaierror:
|
except socket.gaierror:
|
||||||
raise Fatal('%r is not a valid IP:port format' % s)
|
raise Fatal('Unable to resolve address: %s' % host)
|
||||||
|
|
||||||
|
if len(addrinfo) > 1:
|
||||||
|
print("WARNING: Host %s has more than one IP, only using one of them."
|
||||||
|
% host)
|
||||||
|
|
||||||
family, _, _, _, addr = min(addrinfo)
|
family, _, _, _, addr = min(addrinfo)
|
||||||
|
# Note: addr contains (ip, port)
|
||||||
return (family,) + addr[:2]
|
return (family,) + addr[:2]
|
||||||
|
|
||||||
|
|
||||||
|
@ -13,7 +13,6 @@ _ip4_reprs = {
|
|||||||
'3098282570': '184.172.10.74',
|
'3098282570': '184.172.10.74',
|
||||||
'0xb8.0xac.0x0a.0x4a': '184.172.10.74',
|
'0xb8.0xac.0x0a.0x4a': '184.172.10.74',
|
||||||
'0270.0254.0012.0112': '184.172.10.74',
|
'0270.0254.0012.0112': '184.172.10.74',
|
||||||
'localhost': '127.0.0.1'
|
|
||||||
}
|
}
|
||||||
|
|
||||||
_ip4_swidths = (1, 8, 22, 27, 32)
|
_ip4_swidths = (1, 8, 22, 27, 32)
|
||||||
@ -31,7 +30,7 @@ _ip6_swidths = (48, 64, 96, 115, 128)
|
|||||||
def test_parse_subnetport_ip4():
|
def test_parse_subnetport_ip4():
|
||||||
for ip_repr, ip in _ip4_reprs.items():
|
for ip_repr, ip in _ip4_reprs.items():
|
||||||
assert sshuttle.options.parse_subnetport(ip_repr) \
|
assert sshuttle.options.parse_subnetport(ip_repr) \
|
||||||
== (socket.AF_INET, ip, 32, 0, 0)
|
== [(socket.AF_INET, ip, 32, 0, 0)]
|
||||||
with pytest.raises(Fatal) as excinfo:
|
with pytest.raises(Fatal) as excinfo:
|
||||||
sshuttle.options.parse_subnetport('10.256.0.0')
|
sshuttle.options.parse_subnetport('10.256.0.0')
|
||||||
assert str(excinfo.value) == 'Unable to resolve address: 10.256.0.0'
|
assert str(excinfo.value) == 'Unable to resolve address: 10.256.0.0'
|
||||||
@ -42,34 +41,35 @@ def test_parse_subnetport_ip4_with_mask():
|
|||||||
for swidth in _ip4_swidths:
|
for swidth in _ip4_swidths:
|
||||||
assert sshuttle.options.parse_subnetport(
|
assert sshuttle.options.parse_subnetport(
|
||||||
'/'.join((ip_repr, str(swidth)))
|
'/'.join((ip_repr, str(swidth)))
|
||||||
) == (socket.AF_INET, ip, swidth, 0, 0)
|
) == [(socket.AF_INET, ip, swidth, 0, 0)]
|
||||||
assert sshuttle.options.parse_subnetport('0/0') \
|
assert sshuttle.options.parse_subnetport('0/0') \
|
||||||
== (socket.AF_INET, '0.0.0.0', 0, 0, 0)
|
== [(socket.AF_INET, '0.0.0.0', 0, 0, 0)]
|
||||||
with pytest.raises(Fatal) as excinfo:
|
with pytest.raises(Fatal) as excinfo:
|
||||||
sshuttle.options.parse_subnetport('10.0.0.0/33')
|
sshuttle.options.parse_subnetport('10.0.0.0/33')
|
||||||
assert str(excinfo.value) == 'width 33 is not between 0 and 32'
|
assert str(excinfo.value) \
|
||||||
|
== 'Slash in CIDR notation (/33) is not between 0 and 32'
|
||||||
|
|
||||||
|
|
||||||
def test_parse_subnetport_ip4_with_port():
|
def test_parse_subnetport_ip4_with_port():
|
||||||
for ip_repr, ip in _ip4_reprs.items():
|
for ip_repr, ip in _ip4_reprs.items():
|
||||||
assert sshuttle.options.parse_subnetport(':'.join((ip_repr, '80'))) \
|
assert sshuttle.options.parse_subnetport(':'.join((ip_repr, '80'))) \
|
||||||
== (socket.AF_INET, ip, 32, 80, 80)
|
== [(socket.AF_INET, ip, 32, 80, 80)]
|
||||||
assert sshuttle.options.parse_subnetport(':'.join((ip_repr, '80-90')))\
|
assert sshuttle.options.parse_subnetport(':'.join((ip_repr, '80-90')))\
|
||||||
== (socket.AF_INET, ip, 32, 80, 90)
|
== [(socket.AF_INET, ip, 32, 80, 90)]
|
||||||
|
|
||||||
|
|
||||||
def test_parse_subnetport_ip4_with_mask_and_port():
|
def test_parse_subnetport_ip4_with_mask_and_port():
|
||||||
for ip_repr, ip in _ip4_reprs.items():
|
for ip_repr, ip in _ip4_reprs.items():
|
||||||
assert sshuttle.options.parse_subnetport(ip_repr + '/32:80') \
|
assert sshuttle.options.parse_subnetport(ip_repr + '/32:80') \
|
||||||
== (socket.AF_INET, ip, 32, 80, 80)
|
== [(socket.AF_INET, ip, 32, 80, 80)]
|
||||||
assert sshuttle.options.parse_subnetport(ip_repr + '/16:80-90') \
|
assert sshuttle.options.parse_subnetport(ip_repr + '/16:80-90') \
|
||||||
== (socket.AF_INET, ip, 16, 80, 90)
|
== [(socket.AF_INET, ip, 16, 80, 90)]
|
||||||
|
|
||||||
|
|
||||||
def test_parse_subnetport_ip6():
|
def test_parse_subnetport_ip6():
|
||||||
for ip_repr, ip in _ip6_reprs.items():
|
for ip_repr, ip in _ip6_reprs.items():
|
||||||
assert sshuttle.options.parse_subnetport(ip_repr) \
|
assert sshuttle.options.parse_subnetport(ip_repr) \
|
||||||
== (socket.AF_INET6, ip, 128, 0, 0)
|
== [(socket.AF_INET6, ip, 128, 0, 0)]
|
||||||
|
|
||||||
|
|
||||||
def test_parse_subnetport_ip6_with_mask():
|
def test_parse_subnetport_ip6_with_mask():
|
||||||
@ -77,25 +77,26 @@ def test_parse_subnetport_ip6_with_mask():
|
|||||||
for swidth in _ip4_swidths + _ip6_swidths:
|
for swidth in _ip4_swidths + _ip6_swidths:
|
||||||
assert sshuttle.options.parse_subnetport(
|
assert sshuttle.options.parse_subnetport(
|
||||||
'/'.join((ip_repr, str(swidth)))
|
'/'.join((ip_repr, str(swidth)))
|
||||||
) == (socket.AF_INET6, ip, swidth, 0, 0)
|
) == [(socket.AF_INET6, ip, swidth, 0, 0)]
|
||||||
assert sshuttle.options.parse_subnetport('::/0') \
|
assert sshuttle.options.parse_subnetport('::/0') \
|
||||||
== (socket.AF_INET6, '::', 0, 0, 0)
|
== [(socket.AF_INET6, '::', 0, 0, 0)]
|
||||||
with pytest.raises(Fatal) as excinfo:
|
with pytest.raises(Fatal) as excinfo:
|
||||||
sshuttle.options.parse_subnetport('fc00::/129')
|
sshuttle.options.parse_subnetport('fc00::/129')
|
||||||
assert str(excinfo.value) == 'width 129 is not between 0 and 128'
|
assert str(excinfo.value) \
|
||||||
|
== 'Slash in CIDR notation (/129) is not between 0 and 128'
|
||||||
|
|
||||||
|
|
||||||
def test_parse_subnetport_ip6_with_port():
|
def test_parse_subnetport_ip6_with_port():
|
||||||
for ip_repr, ip in _ip6_reprs.items():
|
for ip_repr, ip in _ip6_reprs.items():
|
||||||
assert sshuttle.options.parse_subnetport('[' + ip_repr + ']:80') \
|
assert sshuttle.options.parse_subnetport('[' + ip_repr + ']:80') \
|
||||||
== (socket.AF_INET6, ip, 128, 80, 80)
|
== [(socket.AF_INET6, ip, 128, 80, 80)]
|
||||||
assert sshuttle.options.parse_subnetport('[' + ip_repr + ']:80-90') \
|
assert sshuttle.options.parse_subnetport('[' + ip_repr + ']:80-90') \
|
||||||
== (socket.AF_INET6, ip, 128, 80, 90)
|
== [(socket.AF_INET6, ip, 128, 80, 90)]
|
||||||
|
|
||||||
|
|
||||||
def test_parse_subnetport_ip6_with_mask_and_port():
|
def test_parse_subnetport_ip6_with_mask_and_port():
|
||||||
for ip_repr, ip in _ip6_reprs.items():
|
for ip_repr, ip in _ip6_reprs.items():
|
||||||
assert sshuttle.options.parse_subnetport('[' + ip_repr + '/128]:80') \
|
assert sshuttle.options.parse_subnetport('[' + ip_repr + '/128]:80') \
|
||||||
== (socket.AF_INET6, ip, 128, 80, 80)
|
== [(socket.AF_INET6, ip, 128, 80, 80)]
|
||||||
assert sshuttle.options.parse_subnetport('[' + ip_repr + '/16]:80-90')\
|
assert sshuttle.options.parse_subnetport('[' + ip_repr + '/16]:80-90')\
|
||||||
== (socket.AF_INET6, ip, 16, 80, 90)
|
== [(socket.AF_INET6, ip, 16, 80, 90)]
|
||||||
|
Loading…
Reference in New Issue
Block a user