mirror of
https://github.com/sshuttle/sshuttle.git
synced 2024-11-22 16:03:57 +01:00
6d4261e3f9
Add an "is_supported()" function to the different methods so that each method can include whatever logic they wish to indicate if they are supported on a particular machine. Previously, methods/__init__.py contained all of the logic for selecting individual methods. Now, it iterates through a list of possible options and stops on the first method that it finds that is_supported(). Currently, the decision is made based on the presence of programs in the PATH. In the future, things such as the platform sshuttle is running on could be considered.
140 lines
4.3 KiB
Python
140 lines
4.3 KiB
Python
import io
|
|
from socket import AF_INET, AF_INET6
|
|
|
|
from mock import Mock, patch, call
|
|
import sshuttle.firewall
|
|
|
|
|
|
def setup_daemon():
|
|
stdin = io.StringIO(u"""ROUTES
|
|
{inet},24,0,1.2.3.0,8000,9000
|
|
{inet},32,1,1.2.3.66,8080,8080
|
|
{inet6},64,0,2404:6800:4004:80c::,0,0
|
|
{inet6},128,1,2404:6800:4004:80c::101f,80,80
|
|
NSLIST
|
|
{inet},1.2.3.33
|
|
{inet6},2404:6800:4004:80c::33
|
|
PORTS 1024,1025,1026,1027
|
|
GO 1 -
|
|
HOST 1.2.3.3,existing
|
|
""".format(inet=AF_INET, inet6=AF_INET6))
|
|
stdout = Mock()
|
|
return stdin, stdout
|
|
|
|
|
|
def test_rewrite_etc_hosts(tmpdir):
|
|
orig_hosts = tmpdir.join("hosts.orig")
|
|
orig_hosts.write("1.2.3.3 existing\n")
|
|
|
|
new_hosts = tmpdir.join("hosts")
|
|
orig_hosts.copy(new_hosts)
|
|
|
|
hostmap = {
|
|
'myhost': '1.2.3.4',
|
|
'myotherhost': '1.2.3.5',
|
|
}
|
|
with patch('sshuttle.firewall.HOSTSFILE', new=str(new_hosts)):
|
|
sshuttle.firewall.rewrite_etc_hosts(hostmap, 10)
|
|
|
|
with new_hosts.open() as f:
|
|
line = f.readline()
|
|
s = line.split()
|
|
assert s == ['1.2.3.3', 'existing']
|
|
|
|
line = f.readline()
|
|
s = line.split()
|
|
assert s == ['1.2.3.4', 'myhost',
|
|
'#', 'sshuttle-firewall-10', 'AUTOCREATED']
|
|
|
|
line = f.readline()
|
|
s = line.split()
|
|
assert s == ['1.2.3.5', 'myotherhost',
|
|
'#', 'sshuttle-firewall-10', 'AUTOCREATED']
|
|
|
|
line = f.readline()
|
|
assert line == ""
|
|
|
|
with patch('sshuttle.firewall.HOSTSFILE', new=str(new_hosts)):
|
|
sshuttle.firewall.restore_etc_hosts(hostmap, 10)
|
|
assert orig_hosts.computehash() == new_hosts.computehash()
|
|
|
|
|
|
def test_subnet_weight():
|
|
subnets = [
|
|
(AF_INET, 16, 0, '192.168.0.0', 0, 0),
|
|
(AF_INET, 24, 0, '192.168.69.0', 0, 0),
|
|
(AF_INET, 32, 0, '192.168.69.70', 0, 0),
|
|
(AF_INET, 32, 1, '192.168.69.70', 0, 0),
|
|
(AF_INET, 32, 1, '192.168.69.70', 80, 80),
|
|
(AF_INET, 0, 1, '0.0.0.0', 0, 0),
|
|
(AF_INET, 0, 1, '0.0.0.0', 8000, 9000),
|
|
(AF_INET, 0, 1, '0.0.0.0', 8000, 8500),
|
|
(AF_INET, 0, 1, '0.0.0.0', 8000, 8000),
|
|
(AF_INET, 0, 1, '0.0.0.0', 400, 450)
|
|
]
|
|
subnets_sorted = [
|
|
(AF_INET, 32, 1, '192.168.69.70', 80, 80),
|
|
(AF_INET, 0, 1, '0.0.0.0', 8000, 8000),
|
|
(AF_INET, 0, 1, '0.0.0.0', 400, 450),
|
|
(AF_INET, 0, 1, '0.0.0.0', 8000, 8500),
|
|
(AF_INET, 0, 1, '0.0.0.0', 8000, 9000),
|
|
(AF_INET, 32, 1, '192.168.69.70', 0, 0),
|
|
(AF_INET, 32, 0, '192.168.69.70', 0, 0),
|
|
(AF_INET, 24, 0, '192.168.69.0', 0, 0),
|
|
(AF_INET, 16, 0, '192.168.0.0', 0, 0),
|
|
(AF_INET, 0, 1, '0.0.0.0', 0, 0)
|
|
]
|
|
|
|
assert subnets_sorted == sorted(subnets,
|
|
key=sshuttle.firewall.subnet_weight,
|
|
reverse=True)
|
|
|
|
|
|
@patch('sshuttle.firewall.rewrite_etc_hosts')
|
|
@patch('sshuttle.firewall.setup_daemon')
|
|
@patch('sshuttle.firewall.get_method')
|
|
def test_main(mock_get_method, mock_setup_daemon, mock_rewrite_etc_hosts):
|
|
stdin, stdout = setup_daemon()
|
|
mock_setup_daemon.return_value = stdin, stdout
|
|
|
|
mock_get_method("not_auto").name = "test"
|
|
mock_get_method.reset_mock()
|
|
|
|
sshuttle.firewall.main("not_auto", False)
|
|
|
|
assert mock_rewrite_etc_hosts.mock_calls == [
|
|
call({'1.2.3.3': 'existing'}, 1024),
|
|
call({}, 1024),
|
|
]
|
|
|
|
assert stdout.mock_calls == [
|
|
call.write('READY test\n'),
|
|
call.flush(),
|
|
call.write('STARTED\n'),
|
|
call.flush()
|
|
]
|
|
assert mock_setup_daemon.mock_calls == [call()]
|
|
assert mock_get_method.mock_calls == [
|
|
call('not_auto'),
|
|
call().is_supported(),
|
|
call().is_supported().__bool__(),
|
|
call().setup_firewall(
|
|
1024, 1026,
|
|
[(AF_INET6, u'2404:6800:4004:80c::33')],
|
|
AF_INET6,
|
|
[(AF_INET6, 64, False, u'2404:6800:4004:80c::', 0, 0),
|
|
(AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 80, 80)],
|
|
True,
|
|
None),
|
|
call().setup_firewall(
|
|
1025, 1027,
|
|
[(AF_INET, u'1.2.3.33')],
|
|
AF_INET,
|
|
[(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
|
|
(AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
|
|
True,
|
|
None),
|
|
call().restore_firewall(1024, AF_INET6, True, None),
|
|
call().restore_firewall(1025, AF_INET, True, None),
|
|
]
|