"""GTK front-end that inspects nginx vhosts and Let's Encrypt certs over SSH.

Connection settings are read once, at import time, from ``creds.json`` in the
current working directory.  The keys used by this module are ``host``,
``user``, ``passphrase``, ``ssh_key_path`` and ``password``.
"""

import json
import re
import shlex
import signal
import threading
import time
import traceback

import gi
gi.require_version('Gtk', '3.0')
from gi.repository import GLib, Gtk, GObject

from nginxparser import loads
import spur

# Read the credentials inside a context manager so the file handle is closed
# immediately instead of staying open for the lifetime of the process.
with open('creds.json', 'r') as fp:
    creds = json.load(fp)

# Matches the port at the end of the first token of an nginx ``listen`` value:
# "80", "*:80", "127.0.0.1:8080", "[::]:443".  Deliberately does not match a
# bare address ("127.0.0.1") or a unix socket ("unix:/run/nginx.sock").
_LISTEN_PORT_RE = re.compile(r'(?:^|:)(\d+)$')


def _parse_listen_port(listen_value):
    """Return the TCP port named by an nginx ``listen`` value, or ``None``.

    Only the first whitespace-separated token is inspected, so trailing
    parameters such as ``ssl`` or ``default_server`` are ignored.
    """
    tokens = listen_value.split()
    if not tokens:
        return None
    match = _LISTEN_PORT_RE.search(tokens[0])
    return int(match.group(1)) if match else None


def ssh_command(command, sudo=False):
    """Run ``command`` on the host described by ``creds`` and return its output.

    Args:
        command: Command line as one string.  It is split into arguments with
            shell-like quoting rules, so callers should pass untrusted pieces
            through ``shlex.quote``.
        sudo: When true, the command is run through ``sudo -S`` and
            ``creds['password']`` is written to its standard input.

    Returns:
        The command's standard output, decoded to ``str``.
    """
    shell = spur.SshShell(
        hostname=creds['host'],
        username=creds['user'],
        password=creds['passphrase'],
        private_key_file=creds['ssh_key_path'])
    with shell:
        command_bits = shlex.split(command)
        if sudo:
            # -S makes sudo read the password from stdin; without it the
            # password written below is never seen by sudo.
            command_bits[:0] = ["sudo", "-S"]
        process = shell.spawn(command_bits)
        if sudo:
            # sudo reads a full line, so the password needs a newline.
            process.stdin_write(creds['password'] + "\n")
        result = process.wait_for_result()
    return result.output.decode()


class EntryWindow(Gtk.Window):
    """Window with a progress bar, two hidden-text entries and a start button.

    Clicking the button lists the enabled nginx vhosts and the Let's Encrypt
    live directories on the remote host, then processes the vhosts on a
    background thread while the progress bar tracks completion.
    """

    def __init__(self):
        Gtk.Window.__init__(self, title="Entry Demo")
        self.set_size_request(200, 100)

        self.timeout_id = None

        vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
        self.add(vbox)

        self.progressbar = Gtk.ProgressBar(show_text=True)
        vbox.pack_start(self.progressbar, True, True, 0)

        hbox = Gtk.Box(spacing=6)
        vbox.add(hbox)

        # Both entries hide their text (password-style input).
        # https://developer.gnome.org/gtk3/stable/GtkEntry.html#gtk-entry-set-invisible-char
        self.entry_passphrase = Gtk.Entry()
        self.entry_passphrase.set_visibility(False)
        hbox.pack_start(self.entry_passphrase, True, True, 0)

        self.entry_password = Gtk.Entry()
        self.entry_password.set_visibility(False)
        hbox.pack_start(self.entry_password, True, True, 0)

        self.button = Gtk.Button(label="Click Here")
        self.button.connect("clicked", self.on_button_clicked)
        hbox.pack_start(self.button, True, True, 0)

    def update_progess(self, domain):
        """Pulse the progress bar and label it with ``domain``.

        Returns ``False`` so that, when used as a ``GLib.idle_add`` callback,
        it runs only once.
        """
        self.progressbar.pulse()
        self.progressbar.set_text('Done: ' + domain)
        return False

    def _set_progress(self, fraction):
        """Idle callback: show ``fraction`` on the progress bar, run once."""
        self.progressbar.set_fraction(fraction)
        return False

    def _run_item(self, func, item):
        """Call ``func(item)``, reporting (not propagating) any failure.

        Used by the worker loops so that one failing vhost or domain does not
        abort processing of the remaining ones.
        """
        try:
            func(item)
        except Exception:
            traceback.print_exc()

    def get_https_domains(self):
        """Worker-thread body: process every entry of ``self.domains``."""
        for d in self.domains:
            self._run_item(self.get_https_subdomains_for_domain, d)

    def get_nginx_vhosts(self):
        """Worker-thread body: process every entry of ``self.vhosts``."""
        for v in self.vhosts:
            self._run_item(self.get_nginx_vhost, v)

    def start_thread(self, func):
        """Run ``func`` on a daemon thread so it never blocks program exit."""
        thread = threading.Thread(target=func)
        thread.daemon = True
        thread.start()

    def on_button_clicked(self, widget):
        """List remote vhosts and certificate directories, then start work.

        Sets ``vhosts``/``num_vhosts``/``vhosts_done`` and the matching
        ``domains`` attributes, then processes the vhosts in the background.
        """
        # NOTE(review): these two listings still run on the GTK main loop and
        # block the UI for the duration of the SSH round trips.
        # splitlines() copes with output that lacks a trailing newline, where
        # split("\n") followed by pop() would drop the last real entry.
        self.vhosts = ssh_command("ls /etc/nginx/sites-enabled").splitlines()
        self.num_vhosts = len(self.vhosts)
        self.vhosts_done = 0

        self.domains = ssh_command("ls /etc/letsencrypt/live", True).splitlines()
        self.num_domains = len(self.domains)
        self.domains_done = 0

        self.start_thread(self.get_nginx_vhosts)

    def get_nginx_vhost(self, vhost):
        """Fetch and parse one vhost file, print its port-to-names mapping.

        Performs blocking SSH I/O, so it is meant to be called from the worker
        thread; the progress bar update is handed to the GTK main loop via
        ``GLib.idle_add``.  Always returns ``False``.
        """
        print(vhost)
        vhost_file = ssh_command(
            "cat /etc/nginx/sites-enabled/" + shlex.quote(vhost))
        parsed = loads(vhost_file)

        port_subdomains = {}
        for server in parsed:
            port = 0
            subdomains = []
            for directive in server[1]:
                # Skip nested blocks and anything that is not "key value".
                if len(directive) < 2 or not isinstance(directive[1], str):
                    continue
                key, value = directive[0], directive[1]
                if not port and key == "listen":
                    print('listen')
                    # Stays 0 when the value names no port (e.g. unix socket).
                    port = _parse_listen_port(value) or 0
                if key == "server_name":
                    print('server_name')
                    print(directive)
                    subdomains = value.split()
            port_subdomains[port] = subdomains
        print(port_subdomains)

        self.vhosts_done += 1
        percent_done = self.vhosts_done * 1.0 / self.num_vhosts
        # GTK widgets must only be touched from the main loop.
        GLib.idle_add(self._set_progress, percent_done)
        return False

    def get_https_subdomains_for_domain(self, domain):
        """Fetch the certificate text for ``domain`` and advance the progress.

        The certificate dump is retrieved but not parsed yet.  Performs
        blocking SSH I/O, so it is meant to be called from the worker thread.
        Always returns ``False``.
        """
        print(domain)
        # ssh_command adds sudo itself when sudo=True, so the command must not
        # start with "sudo" as well.
        ssh_command(
            "openssl x509 -text -in /etc/letsencrypt/live/"
            + shlex.quote(domain) + "/fullchain.pem", True)

        self.domains_done += 1
        percent_done = self.domains_done * 1.0 / self.num_domains
        GLib.idle_add(self._set_progress, percent_done)
        return False


def app_main():
    """Create the main window, quit GTK when it is closed, and show it."""
    win = EntryWindow()
    win.connect("delete-event", Gtk.main_quit)
    win.show_all()


if __name__ == '__main__':
    # Restore the default SIGINT handler so Ctrl+C terminates the GTK loop.
    signal.signal(signal.SIGINT, signal.SIG_DFL)

    # Calling GObject.threads_init() is not needed for PyGObject 3.10.2+
    GObject.threads_init()

    app_main()
    Gtk.main()