import pytest
from mock import Mock, patch, call, ANY
import socket
from socket import AF_INET, AF_INET6

from sshuttle.methods import get_method
from sshuttle.helpers import Fatal
from sshuttle.methods.pf import FreeBsd, Darwin, OpenBsd


def test_get_supported_features():
    method = get_method('pf')
    features = method.get_supported_features()
    assert features.ipv6
    assert not features.udp
    assert features.dns


@patch('sshuttle.helpers.verbose', new=3)
def test_get_tcp_dstip():
    sock = Mock()
    sock.getpeername.return_value = ("127.0.0.1", 1024)
    sock.getsockname.return_value = ("127.0.0.2", 1025)
    sock.family = AF_INET

    firewall = Mock()
    firewall.pfile.readline.return_value = \
        b"QUERY_PF_NAT_SUCCESS 127.0.0.3,1026\n"

    method = get_method('pf')
    method.set_firewall(firewall)
    assert method.get_tcp_dstip(sock) == ('127.0.0.3', 1026)

    assert sock.mock_calls == [
        call.getpeername(),
        call.getsockname(),
    ]
    assert firewall.mock_calls == [
        call.pfile.write(b'QUERY_PF_NAT 2,6,127.0.0.1,1024,127.0.0.2,1025\n'),
        call.pfile.flush(),
        call.pfile.readline()
    ]


def test_recv_udp():
    sock = Mock()
    sock.recvfrom.return_value = "11111", "127.0.0.1"
    method = get_method('pf')
    result = method.recv_udp(sock, 1024)
    assert sock.mock_calls == [call.recvfrom(1024)]
    assert result == ("127.0.0.1", None, "11111")


def test_send_udp():
    sock = Mock()
    method = get_method('pf')
    method.send_udp(sock, None, "127.0.0.1", "22222")
    assert sock.mock_calls == [call.sendto("22222", "127.0.0.1")]


def test_setup_tcp_listener():
    listener = Mock()
    method = get_method('pf')
    method.setup_tcp_listener(listener)
    assert listener.mock_calls == []


def test_setup_udp_listener():
    listener = Mock()
    method = get_method('pf')
    method.setup_udp_listener(listener)
    assert listener.mock_calls == []


def test_assert_features():
    method = get_method('pf')
    features = method.get_supported_features()
    method.assert_features(features)

    features.udp = True
    with pytest.raises(Fatal):
        method.assert_features(features)

    features.ipv6 = True
    with pytest.raises(Fatal):
        method.assert_features(features)


@patch('sshuttle.methods.pf.pf', Darwin())
@patch('sshuttle.methods.pf.sys.stdout')
@patch('sshuttle.methods.pf.ioctl')
@patch('sshuttle.methods.pf.pf_get_dev')
def test_firewall_command_darwin(mock_pf_get_dev, mock_ioctl, mock_stdout):
    method = get_method('pf')
    assert not method.firewall_command("somthing")

    command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % (
        AF_INET, socket.IPPROTO_TCP,
        "127.0.0.1", 1025, "127.0.0.2", 1024)
    assert method.firewall_command(command)

    assert mock_pf_get_dev.mock_calls == [call()]
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xc0544417, ANY),
    ]
    assert mock_stdout.mock_calls == [
        call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'),
        call.flush(),
    ]


@patch('sshuttle.methods.pf.pf', FreeBsd())
@patch('sshuttle.methods.pf.sys.stdout')
@patch('sshuttle.methods.pf.ioctl')
@patch('sshuttle.methods.pf.pf_get_dev')
def test_firewall_command_freebsd(mock_pf_get_dev, mock_ioctl, mock_stdout):
    method = get_method('pf')
    assert not method.firewall_command("somthing")

    command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % (
        AF_INET, socket.IPPROTO_TCP,
        "127.0.0.1", 1025, "127.0.0.2", 1024)
    assert method.firewall_command(command)

    assert mock_pf_get_dev.mock_calls == [call()]
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xc04c4417, ANY),
    ]
    assert mock_stdout.mock_calls == [
        call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'),
        call.flush(),
    ]


@patch('sshuttle.methods.pf.pf', OpenBsd())
@patch('sshuttle.methods.pf.sys.stdout')
@patch('sshuttle.methods.pf.ioctl')
@patch('sshuttle.methods.pf.pf_get_dev')
def test_firewall_command_openbsd(mock_pf_get_dev, mock_ioctl, mock_stdout):
    method = get_method('pf')
    assert not method.firewall_command("somthing")

    command = "QUERY_PF_NAT %d,%d,%s,%d,%s,%d\n" % (
        AF_INET, socket.IPPROTO_TCP,
        "127.0.0.1", 1025, "127.0.0.2", 1024)
    assert method.firewall_command(command)

    assert mock_pf_get_dev.mock_calls == [call()]
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xc0504417, ANY),
    ]
    assert mock_stdout.mock_calls == [
        call.write('QUERY_PF_NAT_SUCCESS 0.0.0.0,0\n'),
        call.flush(),
    ]


def pfctl(args, stdin=None):
    if args == '-s Interfaces -i lo -v':
        return (b'lo0 (skip)',)
    if args == '-s all':
        return (b'INFO:\nStatus: Disabled\nanother mary had a little lamb\n',
                b'little lamb\n')
    if args == '-E':
        return (b'\n', b'Token : abcdefg\n')
    return None


@patch('sshuttle.helpers.verbose', new=3)
@patch('sshuttle.methods.pf.pf', Darwin())
@patch('sshuttle.methods.pf.pfctl')
@patch('sshuttle.methods.pf.ioctl')
@patch('sshuttle.methods.pf.pf_get_dev')
def test_setup_firewall_darwin(mock_pf_get_dev, mock_ioctl, mock_pfctl):
    mock_pfctl.side_effect = pfctl

    method = get_method('pf')
    assert method.name == 'pf'

    # IPV6

    method.setup_firewall(
        1024, 1026,
        [(AF_INET6, u'2404:6800:4004:80c::33')],
        AF_INET6,
        [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
            (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
        False,
        None)
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xC4704433, ANY),
        call(mock_pf_get_dev(), 0xCC20441A, ANY),
        call(mock_pf_get_dev(), 0xCC20441A, ANY),
        call(mock_pf_get_dev(), 0xC4704433, ANY),
        call(mock_pf_get_dev(), 0xCC20441A, ANY),
        call(mock_pf_get_dev(), 0xCC20441A, ANY),
    ]
    assert mock_pfctl.mock_calls == [
        call('-s Interfaces -i lo -v'),
        call('-f /dev/stdin', b'pass on lo\n'),
        call('-s all'),
        call('-a sshuttle6-1024 -f /dev/stdin',
             b'table <dns_servers> {2404:6800:4004:80c::33}\n'
             b'rdr pass on lo0 inet6 proto tcp from ! ::1 to '
             b'2404:6800:4004:80c::/64 port 8000:9000 -> ::1 port 1024\n'
             b'rdr pass on lo0 inet6 proto udp '
             b'to <dns_servers> port 53 -> ::1 port 1026\n'
             b'pass out route-to lo0 inet6 proto tcp to '
             b'2404:6800:4004:80c::/64 port 8000:9000 keep state\n'
             b'pass out inet6 proto tcp to '
             b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
             b'pass out route-to lo0 inet6 proto udp '
             b'to <dns_servers> port 53 keep state\n'),
        call('-E'),
    ]
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027,
            [(AF_INET, u'1.2.3.33')],
            AF_INET,
            [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
                (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
            True,
            None)
    assert str(excinfo.value) == 'UDP not supported by pf method_name'
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    method.setup_firewall(
        1025, 1027,
        [(AF_INET, u'1.2.3.33')],
        AF_INET,
        [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
            (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
        False,
        None)
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xC4704433, ANY),
        call(mock_pf_get_dev(), 0xCC20441A, ANY),
        call(mock_pf_get_dev(), 0xCC20441A, ANY),
        call(mock_pf_get_dev(), 0xC4704433, ANY),
        call(mock_pf_get_dev(), 0xCC20441A, ANY),
        call(mock_pf_get_dev(), 0xCC20441A, ANY),
    ]
    assert mock_pfctl.mock_calls == [
        call('-s Interfaces -i lo -v'),
        call('-f /dev/stdin', b'pass on lo\n'),
        call('-s all'),
        call('-a sshuttle-1025 -f /dev/stdin',
             b'table <dns_servers> {1.2.3.33}\n'
             b'rdr pass on lo0 inet proto tcp from ! 127.0.0.1 to 1.2.3.0/24 '
             b'-> 127.0.0.1 port 1025\n'
             b'rdr pass on lo0 inet proto udp '
             b'to <dns_servers> port 53 -> 127.0.0.1 port 1027\n'
             b'pass out route-to lo0 inet proto tcp to 1.2.3.0/24 keep state\n'
             b'pass out inet proto tcp to 1.2.3.66/32 port 80:80\n'
             b'pass out route-to lo0 inet proto udp '
             b'to <dns_servers> port 53 keep state\n'),
        call('-E'),
    ]
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    method.restore_firewall(1025, AF_INET, False, None)
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == [
        call('-a sshuttle-1025 -F all'),
        call("-X abcdefg"),
    ]
    mock_pf_get_dev.reset_mock()
    mock_pfctl.reset_mock()
    mock_ioctl.reset_mock()


@patch('sshuttle.helpers.verbose', new=3)
@patch('sshuttle.methods.pf.pf', FreeBsd())
@patch('subprocess.call')
@patch('sshuttle.methods.pf.pfctl')
@patch('sshuttle.methods.pf.ioctl')
@patch('sshuttle.methods.pf.pf_get_dev')
def test_setup_firewall_freebsd(mock_pf_get_dev, mock_ioctl, mock_pfctl,
                                mock_subprocess_call):
    mock_pfctl.side_effect = pfctl

    method = get_method('pf')
    assert method.name == 'pf'

    method.setup_firewall(
        1024, 1026,
        [(AF_INET6, u'2404:6800:4004:80c::33')],
        AF_INET6,
        [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
            (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
        False,
        None)

    assert mock_pfctl.mock_calls == [
        call('-s all'),
        call('-a sshuttle6-1024 -f /dev/stdin',
             b'table <dns_servers> {2404:6800:4004:80c::33}\n'
             b'rdr pass on lo0 inet6 proto tcp from ! ::1 to '
             b'2404:6800:4004:80c::/64 port 8000:9000 -> ::1 port 1024\n'
             b'rdr pass on lo0 inet6 proto udp '
             b'to <dns_servers> port 53 -> ::1 port 1026\n'
             b'pass out route-to lo0 inet6 proto tcp to '
             b'2404:6800:4004:80c::/64 port 8000:9000 keep state\n'
             b'pass out inet6 proto tcp to '
             b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
             b'pass out route-to lo0 inet6 proto udp '
             b'to <dns_servers> port 53 keep state\n'),
        call('-e'),
    ]
    assert call(['kldload', 'pf']) in mock_subprocess_call.mock_calls
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027,
            [(AF_INET, u'1.2.3.33')],
            AF_INET,
            [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
                (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
            True,
            None)
    assert str(excinfo.value) == 'UDP not supported by pf method_name'
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    method.setup_firewall(
        1025, 1027,
        [(AF_INET, u'1.2.3.33')],
        AF_INET,
        [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
            (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
        False,
        None)
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xC4704433, ANY),
        call(mock_pf_get_dev(), 0xCBE0441A, ANY),
        call(mock_pf_get_dev(), 0xCBE0441A, ANY),
        call(mock_pf_get_dev(), 0xC4704433, ANY),
        call(mock_pf_get_dev(), 0xCBE0441A, ANY),
        call(mock_pf_get_dev(), 0xCBE0441A, ANY),
    ]
    assert mock_pfctl.mock_calls == [
        call('-s all'),
        call('-a sshuttle-1025 -f /dev/stdin',
             b'table <dns_servers> {1.2.3.33}\n'
             b'rdr pass on lo0 inet proto tcp from ! 127.0.0.1 '
             b'to 1.2.3.0/24 -> 127.0.0.1 port 1025\n'
             b'rdr pass on lo0 inet proto udp '
             b'to <dns_servers> port 53 -> 127.0.0.1 port 1027\n'
             b'pass out route-to lo0 inet proto tcp to 1.2.3.0/24 keep state\n'
             b'pass out inet proto tcp to 1.2.3.66/32 port 80:80\n'
             b'pass out route-to lo0 inet proto udp '
             b'to <dns_servers> port 53 keep state\n'),
        call('-e'),
    ]
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    method.restore_firewall(1025, AF_INET, False, None)
    method.restore_firewall(1024, AF_INET6, False, None)
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == [
        call('-a sshuttle-1025 -F all'),
        call('-a sshuttle6-1024 -F all'),
        call("-d"),
    ]
    mock_pf_get_dev.reset_mock()
    mock_pfctl.reset_mock()
    mock_ioctl.reset_mock()


@patch('sshuttle.helpers.verbose', new=3)
@patch('sshuttle.methods.pf.pf', OpenBsd())
@patch('sshuttle.methods.pf.pfctl')
@patch('sshuttle.methods.pf.ioctl')
@patch('sshuttle.methods.pf.pf_get_dev')
def test_setup_firewall_openbsd(mock_pf_get_dev, mock_ioctl, mock_pfctl):
    mock_pfctl.side_effect = pfctl

    method = get_method('pf')
    assert method.name == 'pf'

    method.setup_firewall(
        1024, 1026,
        [(AF_INET6, u'2404:6800:4004:80c::33')],
        AF_INET6,
        [(AF_INET6, 64, False, u'2404:6800:4004:80c::', 8000, 9000),
            (AF_INET6, 128, True, u'2404:6800:4004:80c::101f', 8080, 8080)],
        False,
        None)

    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xcd58441a, ANY),
        call(mock_pf_get_dev(), 0xcd58441a, ANY),
    ]
    assert mock_pfctl.mock_calls == [
        call('-s Interfaces -i lo -v'),
        call('-f /dev/stdin', b'match on lo\n'),
        call('-s all'),
        call('-a sshuttle6-1024 -f /dev/stdin',
             b'table <dns_servers> {2404:6800:4004:80c::33}\n'
             b'pass in on lo0 inet6 proto tcp to 2404:6800:4004:80c::/64 '
             b'port 8000:9000 divert-to ::1 port 1024\n'
             b'pass in on lo0 inet6 proto udp '
             b'to <dns_servers> port 53 rdr-to ::1 port 1026\n'
             b'pass out inet6 proto tcp to 2404:6800:4004:80c::/64 '
             b'port 8000:9000 route-to lo0 keep state\n'
             b'pass out inet6 proto tcp to '
             b'2404:6800:4004:80c::101f/128 port 8080:8080\n'
             b'pass out inet6 proto udp to '
             b'<dns_servers> port 53 route-to lo0 keep state\n'),
        call('-e'),
    ]
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    with pytest.raises(Exception) as excinfo:
        method.setup_firewall(
            1025, 1027,
            [(AF_INET, u'1.2.3.33')],
            AF_INET,
            [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
                (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
            True,
            None)
    assert str(excinfo.value) == 'UDP not supported by pf method_name'
    assert mock_pf_get_dev.mock_calls == []
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == []

    method.setup_firewall(
        1025, 1027,
        [(AF_INET, u'1.2.3.33')],
        AF_INET,
        [(AF_INET, 24, False, u'1.2.3.0', 0, 0),
            (AF_INET, 32, True, u'1.2.3.66', 80, 80)],
        False,
        None)
    assert mock_ioctl.mock_calls == [
        call(mock_pf_get_dev(), 0xcd58441a, ANY),
        call(mock_pf_get_dev(), 0xcd58441a, ANY),
    ]
    assert mock_pfctl.mock_calls == [
        call('-s Interfaces -i lo -v'),
        call('-f /dev/stdin', b'match on lo\n'),
        call('-s all'),
        call('-a sshuttle-1025 -f /dev/stdin',
             b'table <dns_servers> {1.2.3.33}\n'
             b'pass in on lo0 inet proto tcp to 1.2.3.0/24 divert-to '
             b'127.0.0.1 port 1025\n'
             b'pass in on lo0 inet proto udp to '
             b'<dns_servers> port 53 rdr-to 127.0.0.1 port 1027\n'
             b'pass out inet proto tcp to 1.2.3.0/24 route-to lo0 keep state\n'
             b'pass out inet proto tcp to 1.2.3.66/32 port 80:80\n'
             b'pass out inet proto udp to '
             b'<dns_servers> port 53 route-to lo0 keep state\n'),
        call('-e'),
    ]
    mock_pf_get_dev.reset_mock()
    mock_ioctl.reset_mock()
    mock_pfctl.reset_mock()

    method.restore_firewall(1025, AF_INET, False, None)
    method.restore_firewall(1024, AF_INET6, False, None)
    assert mock_ioctl.mock_calls == []
    assert mock_pfctl.mock_calls == [
        call('-a sshuttle-1025 -F all'),
        call('-a sshuttle6-1024 -F all'),
        call('-d'),
    ]
    mock_pf_get_dev.reset_mock()
    mock_pfctl.reset_mock()
    mock_ioctl.reset_mock()