mirror of
https://github.com/sshuttle/sshuttle.git
synced 2024-12-01 12:23:48 +01:00
ca41026c89
Before this change, in pf, exclusions used a pass out quick which gave them higher precedence than any other rule independent of subnet width. As reported in #265 this causes exclusion from one instance of sshuttle to also take effect on other instances because quick aborts the evaluation of rules across all anchors. This commit changes the precedence of rules so quick can now be dropped. The new order is defined by the following rule, from subnet_weight: "We need to go from smaller, more specific, port ranges, to larger, less-specific, port ranges. At each level, we order by subnet width, from most-specific subnets (largest swidth) to least-specific. On ties, excludes come first."
488 lines
16 KiB
Python
488 lines
16 KiB
Python
import pytest
|
|
from mock import Mock, patch, call, ANY
|
|
import socket
|
|
from socket import AF_INET, AF_INET6
|
|
|
|
from sshuttle.methods import get_method
|
|
from sshuttle.helpers import Fatal
|
|
from sshuttle.methods.pf import FreeBsd, Darwin, OpenBsd
|
|
|
|
|
|
def test_get_supported_features():
|
|
method = get_method('pf')
|
|
features = method.get_supported_features()
|
|
assert features.ipv6
|
|
assert not features.udp
|
|
assert features.dns
|
|
|
|
|
|
@patch('sshuttle.helpers.verbose', new=3)
|
|
def test_get_tcp_dstip():
|
|
sock = Mock()
|
|
sock.getpeername.return_value = ("127.0.0.1", 1024)
|
|
sock.getsockname.return_value = ("127.0.0.2", 1025)
|
|
sock.family = AF_INET
|
|
|
|
firewall = Mock()
|
|
firewall.pfile.readline.return_value = \
|
|
b"QUERY_PF_NAT_SUCCESS 127.0.0.3,1026\n"
|
|
|
|
method = get_method('pf')
|
|
method.set_firewall(firewall)
|
|
assert method.get_tcp_dstip(sock) == ('127.0.0.3', 1026)
|
|
|
|
assert sock.mock_calls == [
|
|
call.getpeername(),
|
|
call.getsockname(),
|
|
]
|
|
assert firewall.mock_calls == [
|
|
call.pfile.write(b'QUERY_PF_NAT 2,6,127.0.0.1,1024,127.0.0.2,1025\n'),
|
|
call.pfile.flush(),
|
|
call.pfile.readline()
|
|
]
|
|
|
|
|
|
def test_recv_udp():
|
|
sock = Mock()
|
|
sock.recvfrom.return_value = "11111", "127.0.0.1"
|
|
method = get_method('pf')
|
|
result = method.recv_udp(sock, 1024)
|
|
assert sock.mock_calls == [call.recvfrom(1024)]
|
|
assert result == ("127.0.0.1", None, "11111")
|
|
|
|
|
|
def test_send_udp():
|
|
sock = Mock()
|
|
method = get_method('pf')
|
|
method.send_udp(sock, None, "127.0.0.1", "22222")
|
|
assert sock.mock_calls == [call.sendto("22222", "127.0.0.1")]
|
|
|
|
|
|
def test_setup_tcp_listener():
|
|
listener = Mock()
|
|
method = get_method('pf')
|
|
method.setup_tcp_listener(listener)
|
|
assert listener.mock_calls == []
|
|
|
|
|
|
def test_setup_udp_listener():
|
|
listener = Mock()
|
|
method = get_method('pf')
|
|
method.setup_udp_listener(listener)
|
|
assert listener.mock_calls == []
|
|
|
|
|
|
def test_assert_features():
|
|
method = get_method('pf')
|
|
features = method.get_supported_features()
|
|
method.assert_features(features)
|
|
|
|
features.udp = True
|
|
with pytest.raises(Fatal):
|
|
method.assert_features(features)
|
|
|
|
features.ipv6 = True
|
|
with pytest.raises(Fatal):
|
|
method.assert_features(features)
|
|
|
|
|
|
@patch('sshuttle.methods.pf.pf', Darwin())
|
|
@patch('sshuttle.methods.pf.sys.stdout')
|
|
@patch('sshuttle.methods.pf.ioctl')
|
|
@patch('sshuttle.methods.pf.pf_get_dev')
|
|
def test_firewall_command_darwin(mock_pf_get_dev, mock_ioctl, mock_stdout):
|
|
method = get_method('pf')
|
|
assert not method.firewall_command("somthing")
|
|
|
|
command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % (
|
|
AF_INET, socket.IPPROTO_TCP,
|
|
"127.0.0.1", 1025, "127.0.0.2", 1024)
|
|
assert method.firewall_command(command)
|
|
|
|
assert mock_pf_get_dev.mock_calls == [call()]
|
|
assert mock_ioctl.mock_calls == [
|
|
call(mock_pf_get_dev(), 0xc0544417, ANY),
|
|
]
|
|
assert mock_stdout.mock_calls == [
|
|
call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'),
|
|
call.flush(),
|
|
]
|
|
|
|
|
|
@patch('sshuttle.methods.pf.pf', FreeBsd())
|
|
@patch('sshuttle.methods.pf.sys.stdout')
|
|
@patch('sshuttle.methods.pf.ioctl')
|
|
@patch('sshuttle.methods.pf.pf_get_dev')
|
|
def test_firewall_command_freebsd(mock_pf_get_dev, mock_ioctl, mock_stdout):
|
|
method = get_method('pf')
|
|
assert not method.firewall_command("somthing")
|
|
|
|
command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % (
|
|
AF_INET, socket.IPPROTO_TCP,
|
|
"127.0.0.1", 1025, "127.0.0.2", 1024)
|
|
assert method.firewall_command(command)
|
|
|
|
assert mock_pf_get_dev.mock_calls == [call()]
|
|
assert mock_ioctl.mock_calls == [
|
|
call(mock_pf_get_dev(), 0xc04c4417, ANY),
|
|
]
|
|
assert mock_stdout.mock_calls == [
|
|
call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'),
|
|
call.flush(),
|
|
]
|
|
|
|
|
|
@patch('sshuttle.methods.pf.pf', OpenBsd())
|
|
@patch('sshuttle.methods.pf.sys.stdout')
|
|
@patch('sshuttle.methods.pf.ioctl')
|
|
@patch('sshuttle.methods.pf.pf_get_dev')
|
|
def test_firewall_command_openbsd(mock_pf_get_dev, mock_ioctl, mock_stdout):
|
|
method = get_method('pf')
|
|
assert not method.firewall_command("somthing")
|
|
|
|
command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % (
|
|
AF_INET, socket.IPPROTO_TCP,
|
|
"127.0.0.1", 1025, "127.0.0.2", 1024)
|
|
assert method.firewall_command(command)
|
|
|
|
assert mock_pf_get_dev.mock_calls == [call()]
|
|
assert mock_ioctl.mock_calls == [
|
|
call(mock_pf_get_dev(), 0xc0504417, ANY),
|
|
]
|
|
assert mock_stdout.mock_calls == [
|
|
call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'),
|
|
call.flush(),
|
|
]
|
|
|
|
|
|
def pfctl(args, stdin=None):
|
|
if args == '-s Interfaces -i lo -v':
|
|
return (b'lo0 (skip)',)
|
|
if args == '-s all':
|
|
return (b'INFO:\nStatus: Disabled\nanother mary had a little lamb\n',
|
|
b'little lamb\n')
|
|
if args == '-E':
|
|
return (b'\n', b'Token : abcdefg\n')
|
|
return None
|
|
|
|
|
|
@patch('sshuttle.helpers.verbose', new=3)
|
|
@patch('sshuttle.methods.pf.pf', Darwin())
|
|
@patch('sshuttle.methods.pf.pfctl')
|
|
@patch('sshuttle.methods.pf.ioctl')
|
|
@patch('sshuttle.methods.pf.pf_get_dev')
|
|
def test_setup_firewall_darwin(mock_pf_get_dev, mock_ioctl, mock_pfctl):
|
|
mock_pfctl.side_effect = pfctl
|
|
|
|
method = get_method('pf')
|
|
assert method.name == 'pf'
|
|
|
|
# IPV6
|
|
|
|
method.setup_firewall(
|
|
1024, 1026,
|
|
[(AF_INET6, u'2404:6800:4004:80c::33')],
|
|
AF_INET6,
|
|
[(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
|
|
(AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
|
|
False,
|
|
None)
|
|
assert mock_ioctl.mock_calls == [
|
|
call(mock_pf_get_dev(), 0xC4704433, ANY),
|
|
call(mock_pf_get_dev(), 0xCC20441A, ANY),
|
|
call(mock_pf_get_dev(), 0xCC20441A, ANY),
|
|
call(mock_pf_get_dev(), 0xC4704433, ANY),
|
|
call(mock_pf_get_dev(), 0xCC20441A, ANY),
|
|
call(mock_pf_get_dev(), 0xCC20441A, ANY),
|
|
]
|
|
assert mock_pfctl.mock_calls == [
|
|
call('-s Interfaces -i lo -v'),
|
|
call('-f /dev/stdin', b'pass on lo\n'),
|
|
call('-s all'),
|
|
call('-a sshuttle6-1024 -f /dev/stdin',
|
|
b'table <dns_servers> {2404:6800:4004:80c::33}\n'
|
|
b'rdr pass on lo0 inet6 proto tcp from ! ::1 to '
|
|
b'2404:6800:4004:80c::/64 port 8000:9000 -> ::1 port 1024\n'
|
|
b'rdr pass on lo0 inet6 proto udp '
|
|
b'to <dns_servers> port 53 -> ::1 port 1026\n'
|
|
b'pass out route-to lo0 inet6 proto tcp to '
|
|
b'2404:6800:4004:80c::/64 port 8000:9000 keep state\n'
|
|
b'pass out inet6 proto tcp to '
|
|
b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
|
|
b'pass out route-to lo0 inet6 proto udp '
|
|
b'to <dns_servers> port 53 keep state\n'),
|
|
call('-E'),
|
|
]
|
|
mock_pf_get_dev.reset_mock()
|
|
mock_ioctl.reset_mock()
|
|
mock_pfctl.reset_mock()
|
|
|
|
with pytest.raises(Exception) as excinfo:
|
|
method.setup_firewall(
|
|
1025, 1027,
|
|
[(AF_INET, u'1.2.3.33')],
|
|
AF_INET,
|
|
[(AF_INET, 24, False, u'1.2.3.0', 0, 0),
|
|
(AF_INET, 32, True, u'1.2.3.66', 80, 80)],
|
|
True,
|
|
None)
|
|
assert str(excinfo.value) == 'UDP not supported by pf method_name'
|
|
assert mock_pf_get_dev.mock_calls == []
|
|
assert mock_ioctl.mock_calls == []
|
|
assert mock_pfctl.mock_calls == []
|
|
|
|
method.setup_firewall(
|
|
1025, 1027,
|
|
[(AF_INET, u'1.2.3.33')],
|
|
AF_INET,
|
|
[(AF_INET, 24, False, u'1.2.3.0', 0, 0),
|
|
(AF_INET, 32, True, u'1.2.3.66', 80, 80)],
|
|
False,
|
|
None)
|
|
assert mock_ioctl.mock_calls == [
|
|
call(mock_pf_get_dev(), 0xC4704433, ANY),
|
|
call(mock_pf_get_dev(), 0xCC20441A, ANY),
|
|
call(mock_pf_get_dev(), 0xCC20441A, ANY),
|
|
call(mock_pf_get_dev(), 0xC4704433, ANY),
|
|
call(mock_pf_get_dev(), 0xCC20441A, ANY),
|
|
call(mock_pf_get_dev(), 0xCC20441A, ANY),
|
|
]
|
|
assert mock_pfctl.mock_calls == [
|
|
call('-s Interfaces -i lo -v'),
|
|
call('-f /dev/stdin', b'pass on lo\n'),
|
|
call('-s all'),
|
|
call('-a sshuttle-1025 -f /dev/stdin',
|
|
b'table <dns_servers> {1.2.3.33}\n'
|
|
b'rdr pass on lo0 inet proto tcp from ! 127.0.0.1 to 1.2.3.0/24 '
|
|
b'-> 127.0.0.1 port 1025\n'
|
|
b'rdr pass on lo0 inet proto udp '
|
|
b'to <dns_servers> port 53 -> 127.0.0.1 port 1027\n'
|
|
b'pass out route-to lo0 inet proto tcp to 1.2.3.0/24 keep state\n'
|
|
b'pass out inet proto tcp to 1.2.3.66/32 port 80:80\n'
|
|
b'pass out route-to lo0 inet proto udp '
|
|
b'to <dns_servers> port 53 keep state\n'),
|
|
call('-E'),
|
|
]
|
|
mock_pf_get_dev.reset_mock()
|
|
mock_ioctl.reset_mock()
|
|
mock_pfctl.reset_mock()
|
|
|
|
method.restore_firewall(1025, AF_INET, False, None)
|
|
assert mock_ioctl.mock_calls == []
|
|
assert mock_pfctl.mock_calls == [
|
|
call('-a sshuttle-1025 -F all'),
|
|
call("-X abcdefg"),
|
|
]
|
|
mock_pf_get_dev.reset_mock()
|
|
mock_pfctl.reset_mock()
|
|
mock_ioctl.reset_mock()
|
|
|
|
|
|
@patch('sshuttle.helpers.verbose', new=3)
|
|
@patch('sshuttle.methods.pf.pf', FreeBsd())
|
|
@patch('subprocess.call')
|
|
@patch('sshuttle.methods.pf.pfctl')
|
|
@patch('sshuttle.methods.pf.ioctl')
|
|
@patch('sshuttle.methods.pf.pf_get_dev')
|
|
def test_setup_firewall_freebsd(mock_pf_get_dev, mock_ioctl, mock_pfctl,
|
|
mock_subprocess_call):
|
|
mock_pfctl.side_effect = pfctl
|
|
|
|
method = get_method('pf')
|
|
assert method.name == 'pf'
|
|
|
|
method.setup_firewall(
|
|
1024, 1026,
|
|
[(AF_INET6, u'2404:6800:4004:80c::33')],
|
|
AF_INET6,
|
|
[(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
|
|
(AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
|
|
False,
|
|
None)
|
|
|
|
assert mock_pfctl.mock_calls == [
|
|
call('-s all'),
|
|
call('-a sshuttle6-1024 -f /dev/stdin',
|
|
b'table <dns_servers> {2404:6800:4004:80c::33}\n'
|
|
b'rdr pass on lo0 inet6 proto tcp from ! ::1 to '
|
|
b'2404:6800:4004:80c::/64 port 8000:9000 -> ::1 port 1024\n'
|
|
b'rdr pass on lo0 inet6 proto udp '
|
|
b'to <dns_servers> port 53 -> ::1 port 1026\n'
|
|
b'pass out route-to lo0 inet6 proto tcp to '
|
|
b'2404:6800:4004:80c::/64 port 8000:9000 keep state\n'
|
|
b'pass out inet6 proto tcp to '
|
|
b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
|
|
b'pass out route-to lo0 inet6 proto udp '
|
|
b'to <dns_servers> port 53 keep state\n'),
|
|
call('-e'),
|
|
]
|
|
assert call(['kldload', 'pf']) in mock_subprocess_call.mock_calls
|
|
mock_pf_get_dev.reset_mock()
|
|
mock_ioctl.reset_mock()
|
|
mock_pfctl.reset_mock()
|
|
|
|
with pytest.raises(Exception) as excinfo:
|
|
method.setup_firewall(
|
|
1025, 1027,
|
|
[(AF_INET, u'1.2.3.33')],
|
|
AF_INET,
|
|
[(AF_INET, 24, False, u'1.2.3.0', 0, 0),
|
|
(AF_INET, 32, True, u'1.2.3.66', 80, 80)],
|
|
True,
|
|
None)
|
|
assert str(excinfo.value) == 'UDP not supported by pf method_name'
|
|
assert mock_pf_get_dev.mock_calls == []
|
|
assert mock_ioctl.mock_calls == []
|
|
assert mock_pfctl.mock_calls == []
|
|
|
|
method.setup_firewall(
|
|
1025, 1027,
|
|
[(AF_INET, u'1.2.3.33')],
|
|
AF_INET,
|
|
[(AF_INET, 24, False, u'1.2.3.0', 0, 0),
|
|
(AF_INET, 32, True, u'1.2.3.66', 80, 80)],
|
|
False,
|
|
None)
|
|
assert mock_ioctl.mock_calls == [
|
|
call(mock_pf_get_dev(), 0xC4704433, ANY),
|
|
call(mock_pf_get_dev(), 0xCBE0441A, ANY),
|
|
call(mock_pf_get_dev(), 0xCBE0441A, ANY),
|
|
call(mock_pf_get_dev(), 0xC4704433, ANY),
|
|
call(mock_pf_get_dev(), 0xCBE0441A, ANY),
|
|
call(mock_pf_get_dev(), 0xCBE0441A, ANY),
|
|
]
|
|
assert mock_pfctl.mock_calls == [
|
|
call('-s all'),
|
|
call('-a sshuttle-1025 -f /dev/stdin',
|
|
b'table <dns_servers> {1.2.3.33}\n'
|
|
b'rdr pass on lo0 inet proto tcp from ! 127.0.0.1 '
|
|
b'to 1.2.3.0/24 -> 127.0.0.1 port 1025\n'
|
|
b'rdr pass on lo0 inet proto udp '
|
|
b'to <dns_servers> port 53 -> 127.0.0.1 port 1027\n'
|
|
b'pass out route-to lo0 inet proto tcp to 1.2.3.0/24 keep state\n'
|
|
b'pass out inet proto tcp to 1.2.3.66/32 port 80:80\n'
|
|
b'pass out route-to lo0 inet proto udp '
|
|
b'to <dns_servers> port 53 keep state\n'),
|
|
call('-e'),
|
|
]
|
|
mock_pf_get_dev.reset_mock()
|
|
mock_ioctl.reset_mock()
|
|
mock_pfctl.reset_mock()
|
|
|
|
method.restore_firewall(1025, AF_INET, False, None)
|
|
method.restore_firewall(1024, AF_INET6, False, None)
|
|
assert mock_ioctl.mock_calls == []
|
|
assert mock_pfctl.mock_calls == [
|
|
call('-a sshuttle-1025 -F all'),
|
|
call('-a sshuttle6-1024 -F all'),
|
|
call("-d"),
|
|
]
|
|
mock_pf_get_dev.reset_mock()
|
|
mock_pfctl.reset_mock()
|
|
mock_ioctl.reset_mock()
|
|
|
|
|
|
@patch('sshuttle.helpers.verbose', new=3)
|
|
@patch('sshuttle.methods.pf.pf', OpenBsd())
|
|
@patch('sshuttle.methods.pf.pfctl')
|
|
@patch('sshuttle.methods.pf.ioctl')
|
|
@patch('sshuttle.methods.pf.pf_get_dev')
|
|
def test_setup_firewall_openbsd(mock_pf_get_dev, mock_ioctl, mock_pfctl):
|
|
mock_pfctl.side_effect = pfctl
|
|
|
|
method = get_method('pf')
|
|
assert method.name == 'pf'
|
|
|
|
method.setup_firewall(
|
|
1024, 1026,
|
|
[(AF_INET6, u'2404:6800:4004:80c::33')],
|
|
AF_INET6,
|
|
[(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
|
|
(AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
|
|
False,
|
|
None)
|
|
|
|
assert mock_ioctl.mock_calls == [
|
|
call(mock_pf_get_dev(), 0xcd58441a, ANY),
|
|
call(mock_pf_get_dev(), 0xcd58441a, ANY),
|
|
]
|
|
assert mock_pfctl.mock_calls == [
|
|
call('-s Interfaces -i lo -v'),
|
|
call('-f /dev/stdin', b'match on lo\n'),
|
|
call('-s all'),
|
|
call('-a sshuttle6-1024 -f /dev/stdin',
|
|
b'table <dns_servers> {2404:6800:4004:80c::33}\n'
|
|
b'pass in on lo0 inet6 proto tcp to 2404:6800:4004:80c::/64 '
|
|
b'port 8000:9000 divert-to ::1 port 1024\n'
|
|
b'pass in on lo0 inet6 proto udp '
|
|
b'to <dns_servers> port 53 rdr-to ::1 port 1026\n'
|
|
b'pass out inet6 proto tcp to 2404:6800:4004:80c::/64 '
|
|
b'port 8000:9000 route-to lo0 keep state\n'
|
|
b'pass out inet6 proto tcp to '
|
|
b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
|
|
b'pass out inet6 proto udp to '
|
|
b'<dns_servers> port 53 route-to lo0 keep state\n'),
|
|
call('-e'),
|
|
]
|
|
mock_pf_get_dev.reset_mock()
|
|
mock_ioctl.reset_mock()
|
|
mock_pfctl.reset_mock()
|
|
|
|
with pytest.raises(Exception) as excinfo:
|
|
method.setup_firewall(
|
|
1025, 1027,
|
|
[(AF_INET, u'1.2.3.33')],
|
|
AF_INET,
|
|
[(AF_INET, 24, False, u'1.2.3.0', 0, 0),
|
|
(AF_INET, 32, True, u'1.2.3.66', 80, 80)],
|
|
True,
|
|
None)
|
|
assert str(excinfo.value) == 'UDP not supported by pf method_name'
|
|
assert mock_pf_get_dev.mock_calls == []
|
|
assert mock_ioctl.mock_calls == []
|
|
assert mock_pfctl.mock_calls == []
|
|
|
|
method.setup_firewall(
|
|
1025, 1027,
|
|
[(AF_INET, u'1.2.3.33')],
|
|
AF_INET,
|
|
[(AF_INET, 24, False, u'1.2.3.0', 0, 0),
|
|
(AF_INET, 32, True, u'1.2.3.66', 80, 80)],
|
|
False,
|
|
None)
|
|
assert mock_ioctl.mock_calls == [
|
|
call(mock_pf_get_dev(), 0xcd58441a, ANY),
|
|
call(mock_pf_get_dev(), 0xcd58441a, ANY),
|
|
]
|
|
assert mock_pfctl.mock_calls == [
|
|
call('-s Interfaces -i lo -v'),
|
|
call('-f /dev/stdin', b'match on lo\n'),
|
|
call('-s all'),
|
|
call('-a sshuttle-1025 -f /dev/stdin',
|
|
b'table <dns_servers> {1.2.3.33}\n'
|
|
b'pass in on lo0 inet proto tcp to 1.2.3.0/24 divert-to '
|
|
b'127.0.0.1 port 1025\n'
|
|
b'pass in on lo0 inet proto udp to '
|
|
b'<dns_servers> port 53 rdr-to 127.0.0.1 port 1027\n'
|
|
b'pass out inet proto tcp to 1.2.3.0/24 route-to lo0 keep state\n'
|
|
b'pass out inet proto tcp to 1.2.3.66/32 port 80:80\n'
|
|
b'pass out inet proto udp to '
|
|
b'<dns_servers> port 53 route-to lo0 keep state\n'),
|
|
call('-e'),
|
|
]
|
|
mock_pf_get_dev.reset_mock()
|
|
mock_ioctl.reset_mock()
|
|
mock_pfctl.reset_mock()
|
|
|
|
method.restore_firewall(1025, AF_INET, False, None)
|
|
method.restore_firewall(1024, AF_INET6, False, None)
|
|
assert mock_ioctl.mock_calls == []
|
|
assert mock_pfctl.mock_calls == [
|
|
call('-a sshuttle-1025 -F all'),
|
|
call('-a sshuttle6-1024 -F all'),
|
|
call('-d'),
|
|
]
|
|
mock_pf_get_dev.reset_mock()
|
|
mock_pfctl.reset_mock()
|
|
mock_ioctl.reset_mock()
|