mirror of
https://github.com/sshuttle/sshuttle.git
synced 2025-01-17 11:29:24 +01:00
bc065e368d
Previously, it was possible to run sshuttle locally without using ssh and connecting to a remote server. In this configuration, traffic was redirected to the sshuttle server running on the localhost. However, the firewall needed to distinguish between traffic leaving the sshuttle server and traffic that originated from the machine that still needed to be routed through the sshuttle server. The TTL of the packets leaving the sshuttle server were manipulated to indicate to the firewall what should happen. The TTL was adjusted for all packets leaving the sshuttle server (even if it wasn't necessary because the server and client were running on different machines). Changing the TTL caused trouble and some machines, and the --ttl option was added as a workaround to change how the TTL was set for traffic leaving sshuttle. All of this added complexity to the code for a feature (running the server on localhost) that is likely only used for testing and rarely used by others. This commit updates the associated documentation, but doesn't fully fix the ipfw method since I am unable to test that. This change will also make sshuttle fail to work if -r is used to specify a localhost. Pull request #610 partially addresses that issue. For example, see: #240, #490, #660, #606.
213 lines
7.4 KiB
Python
213 lines
7.4 KiB
Python
import socket
|
|
from socket import AF_INET, AF_INET6
|
|
import struct
|
|
|
|
import pytest
|
|
from unittest.mock import Mock, patch, call
|
|
from sshuttle.helpers import Fatal
|
|
from sshuttle.methods import get_method
|
|
|
|
|
|
def test_get_supported_features():
|
|
method = get_method('nat')
|
|
features = method.get_supported_features()
|
|
assert features.ipv6
|
|
assert not features.udp
|
|
assert features.dns
|
|
|
|
|
|
def test_get_tcp_dstip():
|
|
sock = Mock()
|
|
sock.family = AF_INET
|
|
sock.getsockopt.return_value = struct.pack(
|
|
'!HHBBBB', socket.ntohs(AF_INET), 1024, 127, 0, 0, 1)
|
|
method = get_method('nat')
|
|
assert method.get_tcp_dstip(sock) == ('127.0.0.1', 1024)
|
|
assert sock.mock_calls == [call.getsockopt(0, 80, 16)]
|
|
|
|
sock = Mock()
|
|
sock.family = AF_INET6
|
|
sock.getsockopt.return_value = struct.pack(
|
|
'!HH4xBBBBBBBBBBBBBBBB', socket.ntohs(AF_INET6),
|
|
1024, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1)
|
|
method = get_method('nft')
|
|
assert method.get_tcp_dstip(sock) == ('::1', 1024)
|
|
assert sock.mock_calls == [call.getsockopt(41, 80, 64)]
|
|
|
|
|
|
def test_recv_udp():
|
|
sock = Mock()
|
|
sock.recvfrom.return_value = "11111", "127.0.0.1"
|
|
method = get_method('nat')
|
|
result = method.recv_udp(sock, 1024)
|
|
assert sock.mock_calls == [call.recvfrom(1024)]
|
|
assert result == ("127.0.0.1", None, "11111")
|
|
|
|
|
|
def test_send_udp():
|
|
sock = Mock()
|
|
method = get_method('nat')
|
|
method.send_udp(sock, None, "127.0.0.1", "22222")
|
|
assert sock.mock_calls == [call.sendto("22222", "127.0.0.1")]
|
|
|
|
|
|
def test_setup_tcp_listener():
|
|
listener = Mock()
|
|
method = get_method('nat')
|
|
method.setup_tcp_listener(listener)
|
|
assert listener.mock_calls == []
|
|
|
|
|
|
def test_setup_udp_listener():
|
|
listener = Mock()
|
|
method = get_method('nat')
|
|
method.setup_udp_listener(listener)
|
|
assert listener.mock_calls == []
|
|
|
|
|
|
def test_assert_features():
|
|
method = get_method('nat')
|
|
features = method.get_supported_features()
|
|
method.assert_features(features)
|
|
|
|
features.udp = True
|
|
with pytest.raises(Fatal):
|
|
method.assert_features(features)
|
|
|
|
features.ipv6 = True
|
|
with pytest.raises(Fatal):
|
|
method.assert_features(features)
|
|
|
|
|
|
def test_firewall_command():
|
|
method = get_method('nat')
|
|
assert not method.firewall_command("somthing")
|
|
|
|
|
|
@patch('sshuttle.methods.nat.ipt')
|
|
@patch('sshuttle.methods.nat.ipt_chain_exists')
|
|
def test_setup_firewall(mock_ipt_chain_exists, mock_ipt):
|
|
mock_ipt_chain_exists.return_value = True
|
|
method = get_method('nat')
|
|
assert method.name == 'nat'
|
|
|
|
assert mock_ipt_chain_exists.mock_calls == []
|
|
assert mock_ipt.mock_calls == []
|
|
method.setup_firewall(
|
|
1024, 1026,
|
|
[(AF_INET6, u'2404:6800:4004:80c::33')],
|
|
AF_INET6,
|
|
[(AF_INET6, 64, False, u'2404:6800:4004:80c::', 0, 0),
|
|
(AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 80, 80)],
|
|
False,
|
|
None,
|
|
'0x01')
|
|
|
|
assert mock_ipt_chain_exists.mock_calls == [
|
|
call(AF_INET6, 'nat', 'sshuttle-1024')
|
|
]
|
|
assert mock_ipt.mock_calls == [
|
|
call(AF_INET6, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1024'),
|
|
call(AF_INET6, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1024'),
|
|
call(AF_INET6, 'nat', '-F', 'sshuttle-1024'),
|
|
call(AF_INET6, 'nat', '-X', 'sshuttle-1024'),
|
|
call(AF_INET6, 'nat', '-N', 'sshuttle-1024'),
|
|
call(AF_INET6, 'nat', '-F', 'sshuttle-1024'),
|
|
call(AF_INET6, 'nat', '-I', 'OUTPUT', '1', '-j', 'sshuttle-1024'),
|
|
call(AF_INET6, 'nat', '-I', 'PREROUTING', '1', '-j', 'sshuttle-1024'),
|
|
call(AF_INET6, 'nat', '-A', 'sshuttle-1024', '-j', 'REDIRECT',
|
|
'--dest', u'2404:6800:4004:80c::33', '-p', 'udp',
|
|
'--dport', '53', '--to-ports', '1026'),
|
|
call(AF_INET6, 'nat', '-A', 'sshuttle-1024', '-j', 'RETURN',
|
|
'-m', 'addrtype', '--dst-type', 'LOCAL'),
|
|
call(AF_INET6, 'nat', '-A', 'sshuttle-1024', '-j', 'RETURN',
|
|
'--dest', u'2404:6800:4004:80c::101f/128', '-p', 'tcp',
|
|
'--dport', '80:80'),
|
|
call(AF_INET6, 'nat', '-A', 'sshuttle-1024', '-j', 'REDIRECT',
|
|
'--dest', u'2404:6800:4004:80c::/64', '-p', 'tcp',
|
|
'--to-ports', '1024')
|
|
]
|
|
mock_ipt_chain_exists.reset_mock()
|
|
mock_ipt.reset_mock()
|
|
|
|
assert mock_ipt_chain_exists.mock_calls == []
|
|
assert mock_ipt.mock_calls == []
|
|
|
|
with pytest.raises(Exception) as excinfo:
|
|
method.setup_firewall(
|
|
1025, 1027,
|
|
[(AF_INET, u'1.2.3.33')],
|
|
AF_INET,
|
|
[(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
|
|
(AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
|
|
True,
|
|
None,
|
|
'0x01')
|
|
assert str(excinfo.value) == 'UDP not supported by nat method_name'
|
|
assert mock_ipt_chain_exists.mock_calls == []
|
|
assert mock_ipt.mock_calls == []
|
|
|
|
method.setup_firewall(
|
|
1025, 1027,
|
|
[(AF_INET, u'1.2.3.33')],
|
|
AF_INET,
|
|
[(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
|
|
(AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
|
|
False,
|
|
None,
|
|
'0x01')
|
|
assert mock_ipt_chain_exists.mock_calls == [
|
|
call(AF_INET, 'nat', 'sshuttle-1025')
|
|
]
|
|
assert mock_ipt.mock_calls == [
|
|
call(AF_INET, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-X', 'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-N', 'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-I', 'OUTPUT', '1', '-j', 'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-I', 'PREROUTING', '1', '-j', 'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT',
|
|
'--dest', u'1.2.3.33', '-p', 'udp',
|
|
'--dport', '53', '--to-ports', '1027'),
|
|
call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'RETURN',
|
|
'-m', 'addrtype', '--dst-type', 'LOCAL'),
|
|
call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'RETURN',
|
|
'--dest', u'1.2.3.66/32', '-p', 'tcp', '--dport', '8080:8080'),
|
|
call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT',
|
|
'--dest', u'1.2.3.0/24', '-p', 'tcp', '--dport', '8000:9000',
|
|
'--to-ports', '1025')
|
|
]
|
|
mock_ipt_chain_exists.reset_mock()
|
|
mock_ipt.reset_mock()
|
|
|
|
method.restore_firewall(1025, AF_INET, False, None)
|
|
assert mock_ipt_chain_exists.mock_calls == [
|
|
call(AF_INET, 'nat', 'sshuttle-1025')
|
|
]
|
|
assert mock_ipt.mock_calls == [
|
|
call(AF_INET, 'nat', '-D', 'OUTPUT', '-j',
|
|
'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-D', 'PREROUTING', '-j',
|
|
'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
|
|
call(AF_INET, 'nat', '-X', 'sshuttle-1025')
|
|
]
|
|
mock_ipt_chain_exists.reset_mock()
|
|
mock_ipt.reset_mock()
|
|
|
|
method.restore_firewall(1025, AF_INET6, False, None)
|
|
assert mock_ipt_chain_exists.mock_calls == [
|
|
call(AF_INET6, 'nat', 'sshuttle-1025')
|
|
]
|
|
assert mock_ipt.mock_calls == [
|
|
call(AF_INET6, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'),
|
|
call(AF_INET6, 'nat', '-D', 'PREROUTING', '-j',
|
|
'sshuttle-1025'),
|
|
call(AF_INET6, 'nat', '-F', 'sshuttle-1025'),
|
|
call(AF_INET6, 'nat', '-X', 'sshuttle-1025')
|
|
]
|
|
mock_ipt_chain_exists.reset_mock()
|
|
mock_ipt.reset_mock()
|