sshuttle/tests/client/test_firewall.py
Victor Kareh 167a57e739 firewall: Allow overriding the TTL
In instances where a cluster pod in a local VM needs to access a server
that is sshuttle'd from the host, since the packets arriving at the host
already made a hop, their TTL is 63 and so get ignored by sshuttle.
Allowing an override of the firewall TTL rule allows the packets to go
through.
2021-03-05 08:53:53 +11:00

142 lines
4.3 KiB
Python

import io
from socket import AF_INET, AF_INET6
from unittest.mock import Mock, patch, call
import sshuttle.firewall
def setup_daemon():
stdin = io.StringIO(u"""ROUTES
{inet},24,0,1.2.3.0,8000,9000
{inet},32,1,1.2.3.66,8080,8080
{inet6},64,0,2404:6800:4004:80c::,0,0
{inet6},128,1,2404:6800:4004:80c::101f,80,80
NSLIST
{inet},1.2.3.33
{inet6},2404:6800:4004:80c::33
PORTS 1024,1025,1026,1027
GO 1 -
HOST 1.2.3.3,existing
""".format(inet=AF_INET, inet6=AF_INET6))
stdout = Mock()
return stdin, stdout
def test_rewrite_etc_hosts(tmpdir):
orig_hosts = tmpdir.join("hosts.orig")
orig_hosts.write("1.2.3.3 existing\n")
new_hosts = tmpdir.join("hosts")
orig_hosts.copy(new_hosts)
hostmap = {
'myhost': '1.2.3.4',
'myotherhost': '1.2.3.5',
}
with patch('sshuttle.firewall.HOSTSFILE', new=str(new_hosts)):
sshuttle.firewall.rewrite_etc_hosts(hostmap, 10)
with new_hosts.open() as f:
line = f.readline()
s = line.split()
assert s == ['1.2.3.3', 'existing']
line = f.readline()
s = line.split()
assert s == ['1.2.3.4', 'myhost',
'#', 'sshuttle-firewall-10', 'AUTOCREATED']
line = f.readline()
s = line.split()
assert s == ['1.2.3.5', 'myotherhost',
'#', 'sshuttle-firewall-10', 'AUTOCREATED']
line = f.readline()
assert line == ""
with patch('sshuttle.firewall.HOSTSFILE', new=str(new_hosts)):
sshuttle.firewall.restore_etc_hosts(hostmap, 10)
assert orig_hosts.computehash() == new_hosts.computehash()
def test_subnet_weight():
subnets = [
(AF_INET, 16, 0, '192.168.0.0', 0, 0),
(AF_INET, 24, 0, '192.168.69.0', 0, 0),
(AF_INET, 32, 0, '192.168.69.70', 0, 0),
(AF_INET, 32, 1, '192.168.69.70', 0, 0),
(AF_INET, 32, 1, '192.168.69.70', 80, 80),
(AF_INET, 0, 1, '0.0.0.0', 0, 0),
(AF_INET, 0, 1, '0.0.0.0', 8000, 9000),
(AF_INET, 0, 1, '0.0.0.0', 8000, 8500),
(AF_INET, 0, 1, '0.0.0.0', 8000, 8000),
(AF_INET, 0, 1, '0.0.0.0', 400, 450)
]
subnets_sorted = [
(AF_INET, 32, 1, '192.168.69.70', 80, 80),
(AF_INET, 0, 1, '0.0.0.0', 8000, 8000),
(AF_INET, 0, 1, '0.0.0.0', 400, 450),
(AF_INET, 0, 1, '0.0.0.0', 8000, 8500),
(AF_INET, 0, 1, '0.0.0.0', 8000, 9000),
(AF_INET, 32, 1, '192.168.69.70', 0, 0),
(AF_INET, 32, 0, '192.168.69.70', 0, 0),
(AF_INET, 24, 0, '192.168.69.0', 0, 0),
(AF_INET, 16, 0, '192.168.0.0', 0, 0),
(AF_INET, 0, 1, '0.0.0.0', 0, 0)
]
assert subnets_sorted == sorted(subnets,
key=sshuttle.firewall.subnet_weight,
reverse=True)
@patch('sshuttle.firewall.rewrite_etc_hosts')
@patch('sshuttle.firewall.setup_daemon')
@patch('sshuttle.firewall.get_method')
def test_main(mock_get_method, mock_setup_daemon, mock_rewrite_etc_hosts):
stdin, stdout = setup_daemon()
mock_setup_daemon.return_value = stdin, stdout
mock_get_method("not_auto").name = "test"
mock_get_method.reset_mock()
sshuttle.firewall.main("not_auto", False, 63)
assert mock_rewrite_etc_hosts.mock_calls == [
call({'1.2.3.3': 'existing'}, 1024),
call({}, 1024),
]
assert stdout.mock_calls == [
call.write('READY test\n'),
call.flush(),
call.write('STARTED\n'),
call.flush()
]
assert mock_setup_daemon.mock_calls == [call()]
assert mock_get_method.mock_calls == [
call('not_auto'),
call().is_supported(),
call().is_supported().__bool__(),
call().setup_firewall(
1024, 1026,
[(AF_INET6, u'2404:6800:4004:80c::33')],
AF_INET6,
[(AF_INET6, 64, False, u'2404:6800:4004:80c::', 0, 0),
(AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 80, 80)],
True,
None,
63),
call().setup_firewall(
1025, 1027,
[(AF_INET, u'1.2.3.33')],
AF_INET,
[(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
(AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
True,
None,
63),
call().restore_firewall(1024, AF_INET6, True, None),
call().restore_firewall(1025, AF_INET, True, None),
]