mirror of
https://github.com/sshuttle/sshuttle.git
synced 2024-11-22 16:03:57 +01:00
c026a92cad
Adding IPv6 support to the nat method is straightforward after the previous work to add IPv6 support for nft.
231 lines
8.1 KiB
Python
231 lines
8.1 KiB
Python
import socket
|
|
from socket import AF_INET, AF_INET6
|
|
import struct
|
|
|
|
import pytest
|
|
from unittest.mock import Mock, patch, call
|
|
from sshuttle.helpers import Fatal
|
|
from sshuttle.methods import get_method
|
|
|
|
|
|
def test_get_supported_features():
    """The nat method reports IPv6 and DNS as supported, and UDP as not."""
    supported = get_method('nat').get_supported_features()
    assert supported.ipv6
    assert not supported.udp
    assert supported.dns
|
|
|
|
|
|
def test_get_tcp_dstip():
    """Check get_tcp_dstip on mocked IPv4 and IPv6 sockets for the nat method.

    ``getsockopt`` is stubbed to return a packed address structure.  For each
    address family the test asserts both the decoded ``(ip, port)`` pair and
    the exact ``getsockopt`` arguments that were used.
    """
    # IPv4 case: family field, port 1024, then the four bytes of 127.0.0.1.
    sock = Mock()
    sock.family = AF_INET
    sock.getsockopt.return_value = struct.pack(
        '!HHBBBB', socket.ntohs(AF_INET), 1024, 127, 0, 0, 1)
    method = get_method('nat')
    assert method.get_tcp_dstip(sock) == ('127.0.0.1', 1024)
    assert sock.mock_calls == [call.getsockopt(0, 80, 16)]

    # IPv6 case: family field, port 1024, four pad bytes, then the sixteen
    # bytes of ::1.
    sock = Mock()
    sock.family = AF_INET6
    sock.getsockopt.return_value = struct.pack(
        '!HH4xBBBBBBBBBBBBBBBB', socket.ntohs(AF_INET6),
        1024, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1)
    # This module tests the nat method, so the IPv6 lookup must go through
    # 'nat' as well (it previously fetched the 'nft' method here).
    method = get_method('nat')
    assert method.get_tcp_dstip(sock) == ('::1', 1024)
    assert sock.mock_calls == [call.getsockopt(41, 80, 64)]
|
|
|
|
|
|
def test_recv_udp():
    """recv_udp performs one recvfrom and reorders its result.

    The stubbed ``recvfrom`` yields ``("11111", "127.0.0.1")``; the method is
    expected to hand back ``("127.0.0.1", None, "11111")``.
    """
    fake_sock = Mock()
    fake_sock.recvfrom.return_value = ("11111", "127.0.0.1")

    received = get_method('nat').recv_udp(fake_sock, 1024)

    assert fake_sock.mock_calls == [call.recvfrom(1024)]
    assert received == ("127.0.0.1", None, "11111")
|
|
|
|
|
|
def test_send_udp():
    """send_udp forwards the payload and destination to a single sendto."""
    fake_sock = Mock()

    get_method('nat').send_udp(fake_sock, None, "127.0.0.1", "22222")

    assert fake_sock.mock_calls == [call.sendto("22222", "127.0.0.1")]
|
|
|
|
|
|
def test_setup_tcp_listener():
    """setup_tcp_listener must not touch the listener for the nat method."""
    tcp_listener = Mock()

    get_method('nat').setup_tcp_listener(tcp_listener)

    # No attribute access or call may have been recorded on the mock.
    assert tcp_listener.mock_calls == []
|
|
|
|
|
|
def test_setup_udp_listener():
    """setup_udp_listener must not touch the listener for the nat method."""
    udp_listener = Mock()

    get_method('nat').setup_udp_listener(udp_listener)

    # No attribute access or call may have been recorded on the mock.
    assert udp_listener.mock_calls == []
|
|
|
|
|
|
def test_assert_features():
    """assert_features accepts the method's own feature set, rejects UDP."""
    nat = get_method('nat')
    supported = nat.get_supported_features()

    # The feature set the method itself reports must always be accepted.
    nat.assert_features(supported)

    # Requesting UDP is expected to be refused with Fatal.
    supported.udp = True
    with pytest.raises(Fatal):
        nat.assert_features(supported)

    # NOTE(review): udp is still True at this point, so this stanza cannot
    # distinguish an IPv6 rejection from the UDP one above; it looks like a
    # leftover from before nat gained IPv6 support -- confirm and prune.
    supported.ipv6 = True
    with pytest.raises(Fatal):
        nat.assert_features(supported)
|
|
|
|
|
|
def test_firewall_command():
    """firewall_command returns a falsy value for an arbitrary command."""
    handled = get_method('nat').firewall_command("somthing")
    assert not handled
|
|
|
|
|
|
@patch('sshuttle.methods.nat.ipt')
@patch('sshuttle.methods.nat.ipt_ttl')
@patch('sshuttle.methods.nat.ipt_chain_exists')
def test_setup_firewall(mock_ipt_chain_exists, mock_ipt_ttl, mock_ipt):
    """Verify the exact iptables helper calls made by the nat method.

    The three helpers in ``sshuttle.methods.nat`` are replaced by mocks
    (decorators apply bottom-up, hence the argument order).  The test then
    walks through, in order:

    1. ``setup_firewall`` for IPv6,
    2. ``setup_firewall`` for IPv4 with the sixth argument True, which must
       raise and leave all helpers untouched,
    3. ``setup_firewall`` for IPv4 with the sixth argument False,
    4. ``restore_firewall`` for IPv4 and then for IPv6.

    After each step the recorded calls on every mock are compared against
    the full expected list, so both content and ordering are checked.
    """
    # Report the chain as already existing for every query.
    mock_ipt_chain_exists.return_value = True
    method = get_method('nat')
    assert method.name == 'nat'

    # Obtaining the method must not have touched any helper yet.
    assert mock_ipt_chain_exists.mock_calls == []
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == []

    # --- Step 1: IPv6 setup -------------------------------------------------
    # NOTE(review): positional arguments are presumably (port, dnsport,
    # nslist, family, subnets, udp, user, ttl) -- confirm against
    # sshuttle.methods.nat.  The expectations below show 1024 in the chain
    # name and '--to-ports', 1026 as the DNS redirect port and 63 as the
    # hop-limit match value.
    method.setup_firewall(
        1024, 1026,
        [(AF_INET6, u'2404:6800:4004:80c::33')],
        AF_INET6,
        [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 0, 0),
            (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 80, 80)],
        False,
        None,
        63)

    assert mock_ipt_chain_exists.mock_calls == [
        call(AF_INET6, 'nat', 'sshuttle-1024')
    ]
    # For IPv6 the TTL rule is expressed with the 'hl' match.
    assert mock_ipt_ttl.mock_calls == [
        call(AF_INET6, 'nat', '-A', 'sshuttle-1024', '-j', 'RETURN',
             '-m', 'hl', '--hl-eq', '63')
    ]
    # Expected order: remove the old chain, create and hook the new one,
    # then the DNS redirect, the local-address RETURN, the subnet whose
    # third tuple element is True (RETURN) and finally the subnet whose
    # third tuple element is False (REDIRECT).
    assert mock_ipt.mock_calls == [
        call(AF_INET6, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1024'),
        call(AF_INET6, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1024'),
        call(AF_INET6, 'nat', '-F', 'sshuttle-1024'),
        call(AF_INET6, 'nat', '-X', 'sshuttle-1024'),
        call(AF_INET6, 'nat', '-N', 'sshuttle-1024'),
        call(AF_INET6, 'nat', '-F', 'sshuttle-1024'),
        call(AF_INET6, 'nat', '-I', 'OUTPUT', '1', '-j', 'sshuttle-1024'),
        call(AF_INET6, 'nat', '-I', 'PREROUTING', '1', '-j', 'sshuttle-1024'),
        call(AF_INET6, 'nat', '-A', 'sshuttle-1024', '-j', 'REDIRECT',
             '--dest', u'2404:6800:4004:80c::33', '-p', 'udp',
             '--dport', '53', '--to-ports', '1026'),
        call(AF_INET6, 'nat', '-A', 'sshuttle-1024', '-j', 'RETURN',
             '-m', 'addrtype', '--dst-type', 'LOCAL'),
        call(AF_INET6, 'nat', '-A', 'sshuttle-1024', '-j', 'RETURN',
             '--dest', u'2404:6800:4004:80c::101f/128', '-p', 'tcp',
             '--dport', '80:80'),
        call(AF_INET6, 'nat', '-A', 'sshuttle-1024', '-j', 'REDIRECT',
             '--dest', u'2404:6800:4004:80c::/64', '-p', 'tcp',
             '--to-ports', '1024')
    ]
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()

    # Sanity check that the reset really cleared the recorded calls.
    assert mock_ipt_chain_exists.mock_calls == []
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == []

    # --- Step 2: IPv4 setup with the sixth argument True must fail ----------
    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027,
            [(AF_INET, u'1.2.3.33')],
            AF_INET,
            [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
                (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
            True,
            None,
            63)
    assert str(excinfo.value) == 'UDP not supported by nat method_name'
    # The failure must happen before any firewall helper is invoked.
    assert mock_ipt_chain_exists.mock_calls == []
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == []

    # --- Step 3: IPv4 setup with the sixth argument False -------------------
    method.setup_firewall(
        1025, 1027,
        [(AF_INET, u'1.2.3.33')],
        AF_INET,
        [(AF_INET, 24, False, u'1.2.3.0', 8000, 9000),
            (AF_INET, 32, True, u'1.2.3.66', 8080, 8080)],
        False,
        None,
        63)
    assert mock_ipt_chain_exists.mock_calls == [
        call(AF_INET, 'nat', 'sshuttle-1025')
    ]
    # For IPv4 the TTL rule is expressed with the 'ttl' match.
    assert mock_ipt_ttl.mock_calls == [
        call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'RETURN',
             '-m', 'ttl', '--ttl', '63')
    ]
    # Same rule sequence as for IPv6; here the subnet port ranges are
    # non-zero, so '--dport' appears on both subnet rules.
    assert mock_ipt.mock_calls == [
        call(AF_INET, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-D', 'PREROUTING', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-X', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-N', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-I', 'OUTPUT', '1', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-I', 'PREROUTING', '1', '-j', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT',
             '--dest', u'1.2.3.33', '-p', 'udp',
             '--dport', '53', '--to-ports', '1027'),
        call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'RETURN',
             '-m', 'addrtype', '--dst-type', 'LOCAL'),
        call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'RETURN',
             '--dest', u'1.2.3.66/32', '-p', 'tcp', '--dport', '8080:8080'),
        call(AF_INET, 'nat', '-A', 'sshuttle-1025', '-j', 'REDIRECT',
             '--dest', u'1.2.3.0/24', '-p', 'tcp', '--dport', '8000:9000',
             '--to-ports', '1025')
    ]
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()

    # --- Step 4a: restore for IPv4 ------------------------------------------
    # Expected to unhook, flush and delete the chain without any TTL rule.
    method.restore_firewall(1025, AF_INET, False, None)
    assert mock_ipt_chain_exists.mock_calls == [
        call(AF_INET, 'nat', 'sshuttle-1025')
    ]
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == [
        call(AF_INET, 'nat', '-D', 'OUTPUT', '-j',
             'sshuttle-1025'),
        call(AF_INET, 'nat', '-D', 'PREROUTING', '-j',
             'sshuttle-1025'),
        call(AF_INET, 'nat', '-F', 'sshuttle-1025'),
        call(AF_INET, 'nat', '-X', 'sshuttle-1025')
    ]
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()

    # --- Step 4b: restore for IPv6 ------------------------------------------
    # Same teardown sequence, issued for the IPv6 family.
    method.restore_firewall(1025, AF_INET6, False, None)
    assert mock_ipt_chain_exists.mock_calls == [
        call(AF_INET6, 'nat', 'sshuttle-1025')
    ]
    assert mock_ipt_ttl.mock_calls == []
    assert mock_ipt.mock_calls == [
        call(AF_INET6, 'nat', '-D', 'OUTPUT', '-j', 'sshuttle-1025'),
        call(AF_INET6, 'nat', '-D', 'PREROUTING', '-j',
             'sshuttle-1025'),
        call(AF_INET6, 'nat', '-F', 'sshuttle-1025'),
        call(AF_INET6, 'nat', '-X', 'sshuttle-1025')
    ]
    mock_ipt_chain_exists.reset_mock()
    mock_ipt_ttl.reset_mock()
    mock_ipt.reset_mock()
|