#!/usr/bin/env python3 # -*- coding: utf-8 -*- """ Name: Wireguard Mesh Configurator Dev: K4YT3X Date Created: October 10, 2018 Last Modified: Feburary 20, 2019 Licensed under the GNU General Public License Version 3 (GNU GPL v3), available at: https://www.gnu.org/licenses/gpl-3.0.txt (C) 2018-2019 K4YT3X """ from avalon_framework import Avalon import os import pickle import re import readline import subprocess import sys import traceback VERSION = '1.1.7' COMMANDS = [ 'Interactive', 'ShowPeers', 'LoadProfile', 'SaveProfile', 'NewProfile', 'AddPeer', 'DeletePeer', 'GenerateConfigs', 'Exit', 'Quit', ] class Utilities: """ Useful utilities This class contains a number of utility tools. """ def execute(command, input_value=''): process = subprocess.Popen(command, stdin=subprocess.PIPE, stdout=subprocess.PIPE) output = process.communicate(input=input_value)[0] return output.decode().replace('\n', '') class ShellCompleter(object): """ A Cisco-IOS-like shell completer This is a Cisco-IOS-like shell completer, that is not case-sensitive. If the command typed is not ambiguous, then execute the only command that matches. User does not have to enter the entire command. """ def __init__(self, options): self.options = sorted(options) def complete(self, text, state): if state == 0: if text: self.matches = [s for s in self.options if s and s.lower().startswith(text.lower())] else: self.matches = self.options[:] try: return self.matches[state] except IndexError: return None class Peer: """ Peer class Each object of this class represents a peer in the wireguard mesh network. """ def __init__(self, address, public_address, listen_port, private_key, keep_alive, preshared_key=False): self.address = address self.public_address = public_address self.listen_port = listen_port self.private_key = private_key self.keep_alive = keep_alive self.preshared_key = preshared_key class WireGuard: """ WireGuard utility controller This class handles the interactions with the wg binary, including: - genkey - pubkey - genpsk """ def __init__(self): pass def genkey(self): """ Generate WG private key Generate a new wireguard private key via wg command. """ return Utilities.execute(['wg', 'genkey']) def pubkey(self, public_key): """ Convert WG private key into public key Uses wg pubkey command to convert the wg private key into a public key. """ return Utilities.execute(['wg', 'pubkey'], input_value=public_key.encode('utf-8')) def genpsk(self): """ Generate a random base64 psk """ return Utilities.execute(['wg', 'genpsk']) class ProfileManager(object): """ Profile manager Each instance of this class represents a profile, which is a complete topology of a mesh / c/s network. """ def __init__(self): """ Initialize peers list """ self.peers = [] def load_profile(self, profile_path): """ Load profile from a file Open the pickle file, deserialize the content and load it back into the profile manager. """ Avalon.debug_info('Loading profile from: {}'.format(profile_path)) with open(profile_path, 'rb') as profile: pm.peers = pickle.load(profile) profile.close() def save_profile(self, profile_path): """ Save current profile to a file Serializes the current profile with pickle and dumps it into a file. """ # If profile already exists (file or link), ask the user if # we should overwrite it. if os.path.isfile(profile_path) or os.path.islink(profile_path): if not Avalon.ask('File already exists. Overwrite?', True): Avalon.warning('Aborted saving profile') return 1 # Abort if profile_path points to a directory if os.path.isdir(profile_path): Avalon.warning('Destination path is a directory') Avalon.warning('Aborted saving profile') return 1 # Finally, write the profile into the destination file Avalon.debug_info('Writing profile to: {}'.format(profile_path)) with open(profile_path, 'wb') as profile: pickle.dump(pm.peers, profile) profile.close() def new_profile(self): """ Create new profile and flush the peers list """ # Warn the user before flushing configurations Avalon.warning('This will flush the currently loaded profile!') if len(self.peers) != 0: if not Avalon.ask('Continue?', False): return # Reset self.peers and start enrolling new peer data self.peers = [] def print_welcome(): """ Print program name and legal information """ print('WireGuard Mesh Configurator {}'.format(VERSION)) print('(C) 2018 K4YT3X') print('Licensed under GNU GPL v3') def print_peer_config(peer): """ Print the configuration of a specific peer Input takes one Peer object. """ Avalon.info('Peer {} information summary:'.format(peer.address)) if peer.address: print('Address: {}'.format(peer.address)) if peer.public_address: print('Public Address: {}'.format(peer.public_address)) if peer.listen_port: print('Listen Port: {}'.format(peer.listen_port)) print('Private Key: {}'.format(peer.private_key)) if peer.keep_alive: print('Keep Alive: {}'.format(peer.keep_alive)) # print('Preshared Key: {}'.format(peer.preshared_key)) def add_peer(): """ Enroll a new peer Gets all the information needed to generate a new Peer class object. """ # Get peer tunnel address while True: address = Avalon.gets('Address (leave empty if client only): ') if re.match('^(?:\d{1,3}\.){3}\d{1,3}/{1}(?:\d\d?)?$', address) is None: Avalon.error('Invalid address entered') Avalon.error('Please use CIDR notation (e.g. 10.0.0.0/8)') continue break # Get peer public IP address while True: public_address = Avalon.gets('Public address (leave empty if client only): ') # Check if public_address is valid IP or FQDN valid_address = False if re.match('^(?:\d{1,3}\.){3}\d{1,3}(?:/\d\d?)?$', public_address) is not None: valid_address = True if re.match('(?=^.{4,253}$)(^((?!-)[a-zA-Z0-9-]{1,63}(? 0: Avalon.warning('Ambiguous command \"{}\"'.format(commands[1])) print('Use \"Help\" command to list available commands') result = 1 else: Avalon.error('Invalid command') print('Use \"Help\" command to list available commands') result = 1 return result except IndexError: Avalon.error('Invalid arguments') print('Use \"Help\" command to list available commands') result = 0 def main(): """ WireGuard Mesh Configurator main function This function controls the main flow of this program. """ try: if sys.argv[1].lower() == 'help': print_help() exit(0) except IndexError: pass # Begin command interpreting try: if sys.argv[1].lower() == 'interactive' or sys.argv[1].lower() == 'int': print_welcome() # Set command completer completer = ShellCompleter(COMMANDS) readline.set_completer(completer.complete) readline.parse_and_bind('tab: complete') # Launch interactive trojan shell prompt = '{}[WGC]> {}'.format(Avalon.FM.BD, Avalon.FM.RST) while True: command_interpreter([''] + input(prompt).split(' ')) else: # Return to shell with command return value exit(command_interpreter(sys.argv[0:])) except IndexError: Avalon.warning('No commands specified') print_help() exit(0) except (KeyboardInterrupt, EOFError): Avalon.warning('Exiting') exit(0) except Exception: Avalon.error('Exception caught') traceback.print_exc() exit(1) if __name__ == '__main__': # Create global object for WireGuard handler wg = WireGuard() # Create global object for profile manager pm = ProfileManager() # Launch main function main()