import gi gi.require_version('Gtk', '3.0') from gi.repository import GLib, Gtk, GObject import json from nginxparser import loads import os import re import spur import threading import time DIR_PATH = os.path.dirname(os.path.realpath(__file__)) VHOSTS_PATH = DIR_PATH + '/nginx-vhosts' if not os.path.isdir(VHOSTS_PATH): os.mkdir(VHOSTS_PATH) fp = open('creds.json', 'r') creds = json.load(fp) class Error(Exception): """Base class for exceptions in this module.""" pass class InvalidDomainError(Error): """Exception raised for invalid domains. Attributes: domain -- domain name in which the error occurred """ def __init__(self, domain): self.message = 'Invalid domain name: ' + domain def is_valid_domain(domain): p = re.compile('^[0-9a-z-.]+$') return p.match(domain) def is_domain_root(domain): return len(domain.split('.')) == 2 def get_domain_root(subdomain): bits = subdomain.split('.') return '.'.join(bits[-2:]) def ssh_command(command, sudo=False): shell = spur.SshShell( hostname=creds['host'], username=creds['user'], password=creds['passphrase'], private_key_file=creds['ssh_key_path']) with shell: command_bits = command.split(" ") if sudo: command_bits.insert(0, "sudo") process = shell.spawn(command_bits) if sudo: process.stdin_write(creds['password']) result = process.wait_for_result() return result.output.decode() # # def get_check_domain_command(domain): # return "sudo openssl x509 -text -in /etc/letsencrypt/live/" + domain + "/fullchain.pem" # # def join_commands(commands): # return " && ".join(commands) class EntryWindow(Gtk.Window): def __init__(self): Gtk.Window.__init__(self, title="Entry Demo") self.set_size_request(200, 100) self.timeout_id = None vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6) self.add(vbox) self.progressbar = Gtk.ProgressBar(show_text=True) # self.progressbar.set_fraction(0.0) vbox.pack_start(self.progressbar, True, True, 0) hbox = Gtk.Box(spacing=6) vbox.add(hbox) self.entry_passphrase = Gtk.Entry() # https://developer.gnome.org/gtk3/stable/GtkEntry.html#gtk-entry-set-invisible-char self.entry_passphrase.set_visibility(False) # self.entry_passphrase.set_text("Enter SSH key passphrase") hbox.pack_start(self.entry_passphrase, True, True, 0) self.entry_password = Gtk.Entry() self.entry_password.set_visibility(False) # self.entry_password.set_text("Enter sudo user password") hbox.pack_start(self.entry_password, True, True, 0) self.button = Gtk.Button(label="Click Here") self.button.connect("clicked", self.on_button_clicked) hbox.pack_start(self.button, True, True, 0) #Setting up the self.grid in which the elements are to be positionned self.grid = Gtk.Grid() self.grid.set_column_homogeneous(True) self.grid.set_row_homogeneous(True) vbox.add(self.grid) #Creating the TreeStore model self.vhost_treestore = Gtk.TreeStore(int, str, str) vhost_list = [] # for vhost in vhost_list: # self.vhost_treestore.append(list(vhost)) self.language_filter = self.vhost_treestore.filter_new() self.language_filter.set_visible_func(self.filter_func) #creating the treeview, making it use the filter as a model, and adding the columns self.treeview = Gtk.TreeView.new_with_model(self.language_filter) for i, column_title in enumerate(["Port", "File path", "Root domain"]): renderer = Gtk.CellRendererText() column = Gtk.TreeViewColumn(column_title, renderer, text=i) self.treeview.append_column(column) self.scrollable_treelist = Gtk.ScrolledWindow() self.scrollable_treelist.set_vexpand(True) self.grid.attach(self.scrollable_treelist, 0, 0, 8, 10) self.scrollable_treelist.add(self.treeview) def filter_func(self, model, iter, data): return True def update_progess(self, domain): self.progressbar.pulse() self.progressbar.set_text('Done: ' + domain) return False def get_https_domains(self): for d in self.domains: GLib.idle_add(self.get_https_subdomains_for_domain, d) time.sleep(0.4) def get_nginx_vhosts(self): for v in self.vhosts: GLib.idle_add(self.get_nginx_vhost, v) time.sleep(0.1) def start_thread(self, func): thread = threading.Thread(target=func) thread.daemon = True thread.start() def on_button_clicked(self, widget): decoded = ssh_command("ls /etc/nginx/sites-enabled") vhosts = decoded.split("\n") vhosts.pop() self.vhosts = vhosts self.num_vhosts = len(vhosts) self.vhosts_done = 0 decoded = ssh_command("ls /etc/letsencrypt/live", True) domains = decoded.split("\n") domains.pop() self.domains = domains self.num_domains = len(domains) self.domains_done = 0 self.start_thread(self.get_nginx_vhosts) # self.start_thread(self.get_https_domains) # subdomains = [self.get_https_subdomains_for_domain(d) for d in domains] # subdomains_dict = dict(zip(domains, subdomains)) # print(subdomains_dict) def get_nginx_vhost(self, vhost): print(vhost) vhost_file = VHOSTS_PATH + '/' + vhost if not os.path.isfile(vhost_file): print("not found locally: " + vhost_file) vhost_content = ssh_command("cat /etc/nginx/sites-enabled/" + vhost) vhost_fp = open(vhost_file, "w") vhost_fp.write(vhost_content) vhost_fp.close() else: print("found locally: " + vhost_file) vhost_fp = open(vhost_file, "r") vhost_content = vhost_fp.read() vhost_fp.close() parsed = loads(vhost_content) port_subdmomains = {} for server in parsed: server_inner = server[1] port = 0 subdomains = [] for directive in server_inner: if not port and "listen" in directive: p = re.compile('(\d+)') # print('listen') ports = p.findall(directive[1]) port = int(ports[0]) if "server_name" in directive: # print('server_name') # print(directive) subd_trimmed = directive[1].strip() subdomains = subd_trimmed.split(' ') port_subdmomains[port] = subdomains parents = {} for subd in subdomains: # subdomain = '*' + subd if is_domain_root(subd) else subd # print(subdomain) # self.vhost_treestore.append([port, vhost, subd]) root = get_domain_root(subd) print('processing', root, '=>', subd) if not is_valid_domain(subd): raise InvalidDomainError(subd) if not root in parents.keys(): parents[root] = self.vhost_treestore.append(None, (port, vhost, subd)) if root != subd: parent = parents[root] self.vhost_treestore.append(parent, (port, '', subd)) # print(parents) # print(port_subdmomains) self.vhosts_done += 1 percent_done = self.vhosts_done * 1.0 / self.num_vhosts self.progressbar.set_fraction(percent_done) return False def get_https_subdomains_for_domain(self, domain): # print(domain) p = re.compile('DNS:([0-9a-z-.]+)') cert_data = ssh_command("sudo openssl x509 -text -in /etc/letsencrypt/live/" + domain + "/fullchain.pem", True) self.domains_done += 1 percent_done = self.domains_done * 1.0 / self.num_domains # print(percent_done) self.progressbar.set_fraction(percent_done) # self.progressbar.set_text('Done: ' + domain) # print(cert_data) # return p.findall (cert_data) return False def app_main(): win = EntryWindow() win.connect("delete-event", Gtk.main_quit) win.show_all() if __name__ == '__main__': import signal signal.signal(signal.SIGINT, signal.SIG_DFL) # Calling GObject.threads_init() is not needed for PyGObject 3.10.2+ GObject.threads_init() app_main() Gtk.main()