| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174 |
- import gi
- gi.require_version('Gtk', '3.0')
- from gi.repository import GLib, Gtk, GObject
- import json
- from nginxparser import loads
- import re
- import spur
- import threading
- import time
- fp = open('creds.json', 'r')
- creds = json.load(fp)
- def ssh_command(command, sudo=False):
- shell = spur.SshShell(
- hostname=creds['host'],
- username=creds['user'],
- password=creds['passphrase'],
- private_key_file=creds['ssh_key_path'])
- with shell:
- command_bits = command.split(" ")
- if sudo:
- command_bits.insert(0, "sudo")
- process = shell.spawn(command_bits)
- if sudo:
- process.stdin_write(creds['password'])
- result = process.wait_for_result()
- return result.output.decode()
- #
- # def get_check_domain_command(domain):
- # return "sudo openssl x509 -text -in /etc/letsencrypt/live/" + domain + "/fullchain.pem"
- #
- # def join_commands(commands):
- # return " && ".join(commands)
- class EntryWindow(Gtk.Window):
- def __init__(self):
- Gtk.Window.__init__(self, title="Entry Demo")
- self.set_size_request(200, 100)
- self.timeout_id = None
- vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
- self.add(vbox)
- self.progressbar = Gtk.ProgressBar(show_text=True)
- # self.progressbar.set_fraction(0.0)
- vbox.pack_start(self.progressbar, True, True, 0)
- hbox = Gtk.Box(spacing=6)
- vbox.add(hbox)
- self.entry_passphrase = Gtk.Entry()
- # https://developer.gnome.org/gtk3/stable/GtkEntry.html#gtk-entry-set-invisible-char
- self.entry_passphrase.set_visibility(False)
- # self.entry_passphrase.set_text("Enter SSH key passphrase")
- hbox.pack_start(self.entry_passphrase, True, True, 0)
- self.entry_password = Gtk.Entry()
- self.entry_password.set_visibility(False)
- # self.entry_password.set_text("Enter sudo user password")
- hbox.pack_start(self.entry_password, True, True, 0)
- self.button = Gtk.Button(label="Click Here")
- self.button.connect("clicked", self.on_button_clicked)
- hbox.pack_start(self.button, True, True, 0)
- def update_progess(self, domain):
- self.progressbar.pulse()
- self.progressbar.set_text('Done: ' + domain)
- return False
- def get_https_domains(self):
- for d in self.domains:
- GLib.idle_add(self.get_https_subdomains_for_domain, d)
- time.sleep(0.4)
- def get_nginx_vhosts(self):
- for v in self.vhosts:
- GLib.idle_add(self.get_nginx_vhost, v)
- time.sleep(0.4)
- def start_thread(self, func):
- thread = threading.Thread(target=func)
- thread.daemon = True
- thread.start()
- def on_button_clicked(self, widget):
- decoded = ssh_command("ls /etc/nginx/sites-enabled")
- vhosts = decoded.split("\n")
- vhosts.pop()
- self.vhosts = vhosts
- self.num_vhosts = len(vhosts)
- self.vhosts_done = 0
- decoded = ssh_command("ls /etc/letsencrypt/live", True)
- domains = decoded.split("\n")
- domains.pop()
- self.domains = domains
- self.num_domains = len(domains)
- self.domains_done = 0
- self.start_thread(self.get_nginx_vhosts)
- # self.start_thread(self.get_https_domains)
- # subdomains = [self.get_https_subdomains_for_domain(d) for d in domains]
- # subdomains_dict = dict(zip(domains, subdomains))
- # print(subdomains_dict)
- def get_nginx_vhost(self, vhost):
- print(vhost)
- vhost_file = ssh_command("cat /etc/nginx/sites-enabled/" + vhost)
- parsed = loads(vhost_file)
- port_subdmomains = {}
- for server in parsed:
- server_inner = server[1]
- port = 0
- subdomains = []
- for directive in server_inner:
- if not port and "listen" in directive:
- p = re.compile('(\d+)')
- print('listen')
- ports = p.findall(directive[1])
- port = int(ports[0])
- if "server_name" in directive:
- print('server_name')
- print(directive)
- subd_trimmed = directive[1].strip()
- subdomains = subd_trimmed.split(' ')
- port_subdmomains[port] = subdomains
- print(port_subdmomains)
- self.vhosts_done += 1
- percent_done = self.vhosts_done * 1.0 / self.num_vhosts
- self.progressbar.set_fraction(percent_done)
- return False
- def get_https_subdomains_for_domain(self, domain):
- print(domain)
- p = re.compile('DNS:([0-9a-z-.]+)')
- cert_data = ssh_command("sudo openssl x509 -text -in /etc/letsencrypt/live/" + domain + "/fullchain.pem", True)
- self.domains_done += 1
- percent_done = self.domains_done * 1.0 / self.num_domains
- # print(percent_done)
- self.progressbar.set_fraction(percent_done)
- # self.progressbar.set_text('Done: ' + domain)
- # print(cert_data)
- # return p.findall (cert_data)
- return False
- def app_main():
- win = EntryWindow()
- win.connect("delete-event", Gtk.main_quit)
- win.show_all()
- if __name__ == '__main__':
- import signal
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- # Calling GObject.threads_init() is not needed for PyGObject 3.10.2+
- GObject.threads_init()
- app_main()
- Gtk.main()
|