1.1.0 interactive shell, save / load profile

This commit is contained in:
k4yt3x 2018-10-14 17:59:41 -04:00
parent a194c00ea7
commit 9b1ba9a4b7

View File

@ -4,23 +4,48 @@
Name: Wireguard Mesh Configurator
Dev: K4YT3X
Date Created: October 10, 2018
Last Modified: October 13, 2018
Last Modified: October 14, 2018
Licensed under the GNU General Public License Version 3 (GNU GPL v3),
available at: https://www.gnu.org/licenses/gpl-3.0.txt
(C) 2018 K4YT3X
"""
import argparse
import avalon_framework as avalon
import json
import os
import re
import readline
import subprocess
import sys
import traceback
VERSION = '1.0.1'
PEERS = []
VERSION = '1.1.0'
CONFIG_OUTPUT = '/tmp/wireguard'
COMMANDS = [
'LoadProfile',
'SaveProfile',
'NewProfile',
'GenerateConfigs',
'Exit',
'Quit',
]
class ShellCompleter(object):
def __init__(self, options):
self.options = sorted(options)
def complete(self, text, state):
if state == 0:
if text:
self.matches = [s for s in self.options if s and s.lower().startswith(text.lower())]
else:
self.matches = self.options[:]
try:
return self.matches[state]
except IndexError:
return None
class Peer:
@ -73,15 +98,54 @@ class WireGuard:
return output.decode().replace('\n', '')
def process_arguments():
"""This function parses all arguments
class ProfileManager(object):
""" Profile manaager
"""
parser = argparse.ArgumentParser()
control_group = parser.add_argument_group('Controls')
control_group.add_argument('-v', "--verbose", help='Display debug output (TODO)', action="store_true", default=False)
etc_group = parser.add_argument_group('Extra')
etc_group.add_argument('-V', '--version', help='Display version and legal information and exit', action="store_true", default=False)
return parser.parse_args()
def __init__(self):
""" Initialize peers list
"""
self.peers = []
def load_profile(self, profile_path):
""" Load profile from a json file
"""
avalon.dbgInfo('Loading profile from: {}'.format(profile_path))
with open(profile_path, 'r') as wgc_config:
profile = json.load(wgc_config)
wgc_config.close()
for peer in profile['peers']:
address = profile['peers'][peer]['address']
private_key = profile['peers'][peer]['private_key']
keep_alive = profile['peers'][peer]['keep_alive']
listen_port = profile['peers'][peer]['listen_port']
pm.peers.append(Peer(address, private_key, keep_alive, listen_port))
def save_profile(self, profile_path):
""" Save current profile to a json file
"""
avalon.dbgInfo('Writing profile to: {}'.format(profile_path))
# Convert peer objects into dictionary format
profile = {}
profile['peers'] = {}
for peer in pm.peers:
profile['peers'][peer.address] = {}
profile['peers'][peer.address]['address'] = peer.address
profile['peers'][peer.address]['private_key'] = peer.private_key
profile['peers'][peer.address]['keep_alive'] = peer.keep_alive
profile['peers'][peer.address]['listen_port'] = peer.listen_port
with open(profile_path, 'w') as wgc_config:
json.dump(profile, wgc_config, indent=2)
wgc_config.close()
def new_profile(self):
""" Create new profile and flush the peers list
"""
self.peers = []
get_peers_settings()
def print_welcome():
@ -116,7 +180,7 @@ def enroll_peer():
peer = Peer(address, private_key, keep_alive, listen_port, preshared_key)
"""
peer = Peer(address, private_key, keep_alive, listen_port)
PEERS.append(peer)
pm.peers.append(peer)
avalon.info('Peer information summary:')
print('Address: {}'.format(peer.address))
@ -126,24 +190,28 @@ def enroll_peer():
# print('Preshared Key: {}'.format(peer.preshared_key))
def gen_configs():
def generate_configs(output_path):
""" Generate configuration file for every peer
This function reads the PEERS list, generates a
configuration file for every peer, and export into
the CONFIG_OUTPUT directory.
"""
if len(PEERS) == 0:
if len(pm.peers) == 0:
avalon.warning('No peers configured, exiting')
exit(0)
if len(PEERS) == 1:
if len(pm.peers) == 1:
avalon.warning('Only one peer configured')
avalon.info('Generating configuration files')
for peer in PEERS:
if not os.path.isdir(output_path):
if not os.path.isfile(output_path) and not os.path.islink(output_path):
os.mkdir(output_path)
for peer in pm.peers:
avalon.dbgInfo('Generating configuration file for {}'.format(peer.address))
with open('{}/{}.conf'.format(CONFIG_OUTPUT, peer.address.split('/')[0]), 'w') as config:
with open('{}/{}.conf'.format(output_path, peer.address.split('/')[0]), 'w') as config:
# Write Interface config
config.write('[Interface]\n')
@ -154,7 +222,7 @@ def gen_configs():
config.write('ListenPort = {}\n'.format(peer.listen_port))
# Write peers' information
for p in PEERS:
for p in pm.peers:
if p.address == peer.address:
# Skip if peer is self
continue
@ -176,36 +244,106 @@ def get_peers_settings():
enroll_peer()
if __name__ == '__main__':
def print_help():
help_lines = [
'\n{}Commands are not case-sensitive{}'.format(avalon.FM.BD, avalon.FM.RST),
'LoadProfile [profile path]',
'SaveProfile [profile path]',
'NewProfile',
'GenerateConfigs [output directory]',
'Exit',
'Quit',
'',
]
for line in help_lines:
print(line)
def command_interpreter(commands):
""" AnyRadius shell command interpreter
"""
try:
# Print welcome message
print_welcome()
# Try to guess what the user is saying
possibilities = [s for s in COMMANDS if s.lower().startswith(commands[1])]
if len(possibilities) == 1:
commands[1] = possibilities[0]
# Parse command line arguments
args = process_arguments()
# Display version and legal information (--version)
if args.version: # prints program legal / dev / version info
print("Current Version: " + VERSION)
print("Author: K4YT3X")
print("License: GNU GPL v3")
print("Github Page: https://github.com/K4YT3X/wireguard-mesh-configurator")
print("Contact: k4yt3x@k4yt3x.com\n")
if commands[1].replace(' ', '') == '':
result = 0
elif commands[1].lower() == 'help':
print_help()
result = 0
elif commands[1].lower() == 'loadprofile':
result = pm.load_profile(commands[2])
elif commands[1].lower() == 'saveprofile':
result = pm.save_profile(commands[2])
elif commands[1].lower() == 'newprofile':
result = pm.new_profile()
elif commands[1].lower() == 'generateconfigs':
result = generate_configs(commands[2])
elif commands[1].lower() == 'exit' or commands[1].lower() == 'quit':
avalon.warning('Exiting')
exit(0)
elif len(possibilities) > 0:
avalon.warning('Ambiguous command \"{}\"'.format(commands[1]))
print('Use \"Help\" command to list available commands')
result = 1
else:
avalon.error('Invalid command')
print('Use \"Help\" command to list available commands')
result = 1
return result
except IndexError:
avalon.error('Invalid arguments')
print('Use \"Help\" command to list available commands')
result = 0
# Create object for wireguard binary handler
wg = WireGuard()
# Start addding peers
get_peers_settings()
def main():
""" WireGuard Mesh Configurator main function
This funciton controls the main flow of this program.
"""
# Generate configuration files
gen_configs()
except KeyboardInterrupt:
print()
avalon.warning('Keyboard interruption caught, exiting')
exit(1)
try:
if sys.argv[1].lower() == 'help':
print_help()
exit(0)
except IndexError:
pass
# Begin command interpreting
try:
if sys.argv[1].lower() == 'interactive' or sys.argv[1].lower() == 'int':
print_welcome()
# Set command completer
completer = ShellCompleter(COMMANDS)
readline.set_completer(completer.complete)
readline.parse_and_bind('tab: complete')
# Launch interactive trojan shell
prompt = '{}[WGC]> {}'.format(avalon.FM.BD, avalon.FM.RST)
while True:
command_interpreter([''] + input(prompt).split(' '))
else:
# Return to shell with command return value
exit(command_interpreter(sys.argv[0:]))
except IndexError:
avalon.warning('No commands specified')
exit(0)
except (KeyboardInterrupt, EOFError):
avalon.warning('Exiting')
exit(0)
except Exception:
avalon.error('Exception caught')
traceback.print_exc()
exit(1)
if __name__ == '__main__':
# Create global object for wireguard handler
wg = WireGuard()
# Create global object for profile manager
pm = ProfileManager()
# Launch main function
main()