| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264 |
- import gi
- gi.require_version('Gtk', '3.0')
- from gi.repository import GLib, Gtk, GObject
- import json
- from nginxparser import loads
- import os
- import re
- import spur
- import threading
- import time
- DIR_PATH = os.path.dirname(os.path.realpath(__file__))
- VHOSTS_PATH = DIR_PATH + '/nginx-vhosts'
- if not os.path.isdir(VHOSTS_PATH):
- os.mkdir(VHOSTS_PATH)
- fp = open('creds.json', 'r')
- creds = json.load(fp)
- class Error(Exception):
- """Base class for exceptions in this module."""
- pass
- class InvalidDomainError(Error):
- """Exception raised for invalid domains.
- Attributes:
- domain -- domain name in which the error occurred
- """
- def __init__(self, domain):
- self.message = 'Invalid domain name: ' + domain
- def is_valid_domain(domain):
- p = re.compile('^[0-9a-z-.]+$')
- return p.match(domain)
- def is_domain_root(domain):
- return len(domain.split('.')) == 2
- def get_domain_root(subdomain):
- bits = subdomain.split('.')
- return '.'.join(bits[-2:])
- def ssh_command(command, sudo=False):
- shell = spur.SshShell(
- hostname=creds['host'],
- username=creds['user'],
- password=creds['passphrase'],
- private_key_file=creds['ssh_key_path'])
- with shell:
- command_bits = command.split(" ")
- if sudo:
- command_bits.insert(0, "sudo")
- process = shell.spawn(command_bits)
- if sudo:
- process.stdin_write(creds['password'])
- result = process.wait_for_result()
- return result.output.decode()
- #
- # def get_check_domain_command(domain):
- # return "sudo openssl x509 -text -in /etc/letsencrypt/live/" + domain + "/fullchain.pem"
- #
- # def join_commands(commands):
- # return " && ".join(commands)
- class EntryWindow(Gtk.Window):
- def __init__(self):
- Gtk.Window.__init__(self, title="Entry Demo")
- self.set_size_request(200, 100)
- self.timeout_id = None
- vbox = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=6)
- self.add(vbox)
- self.progressbar = Gtk.ProgressBar(show_text=True)
- # self.progressbar.set_fraction(0.0)
- vbox.pack_start(self.progressbar, True, True, 0)
- hbox = Gtk.Box(spacing=6)
- vbox.add(hbox)
- self.entry_passphrase = Gtk.Entry()
- # https://developer.gnome.org/gtk3/stable/GtkEntry.html#gtk-entry-set-invisible-char
- self.entry_passphrase.set_visibility(False)
- # self.entry_passphrase.set_text("Enter SSH key passphrase")
- hbox.pack_start(self.entry_passphrase, True, True, 0)
- self.entry_password = Gtk.Entry()
- self.entry_password.set_visibility(False)
- # self.entry_password.set_text("Enter sudo user password")
- hbox.pack_start(self.entry_password, True, True, 0)
- self.button = Gtk.Button(label="Click Here")
- self.button.connect("clicked", self.on_button_clicked)
- hbox.pack_start(self.button, True, True, 0)
- #Setting up the self.grid in which the elements are to be positionned
- self.grid = Gtk.Grid()
- self.grid.set_column_homogeneous(True)
- self.grid.set_row_homogeneous(True)
- vbox.add(self.grid)
- #Creating the TreeStore model
- self.vhost_treestore = Gtk.TreeStore(int, str, str)
- self.parents = {}
- vhost_list = []
- # for vhost in vhost_list:
- # self.vhost_treestore.append(list(vhost))
- self.language_filter = self.vhost_treestore.filter_new()
- self.language_filter.set_visible_func(self.filter_func)
- #creating the treeview, making it use the filter as a model, and adding the columns
- self.treeview = Gtk.TreeView.new_with_model(self.language_filter)
- for i, column_title in enumerate(["Port", "Domain", "File path"]):
- renderer = Gtk.CellRendererText()
- column = Gtk.TreeViewColumn(column_title, renderer, text=i)
- self.treeview.append_column(column)
- self.scrollable_treelist = Gtk.ScrolledWindow()
- self.scrollable_treelist.set_vexpand(True)
- self.grid.attach(self.scrollable_treelist, 0, 0, 8, 10)
- self.scrollable_treelist.add(self.treeview)
- def filter_func(self, model, iter, data):
- return True
- def update_progess(self, domain):
- self.progressbar.pulse()
- self.progressbar.set_text('Done: ' + domain)
- return False
- def get_https_domains(self):
- for d in self.domains:
- GLib.idle_add(self.get_https_subdomains_for_domain, d)
- time.sleep(0.4)
- def get_nginx_vhosts(self):
- for v in self.vhosts:
- GLib.idle_add(self.get_nginx_vhost, v)
- time.sleep(0.1)
- def start_thread(self, func):
- thread = threading.Thread(target=func)
- thread.daemon = True
- thread.start()
- def on_button_clicked(self, widget):
- decoded = ssh_command("ls /etc/nginx/sites-enabled")
- vhosts = decoded.split("\n")
- vhosts.pop()
- self.vhosts = vhosts
- self.num_vhosts = len(vhosts)
- self.vhosts_done = 0
- decoded = ssh_command("ls /etc/letsencrypt/live", True)
- domains = decoded.split("\n")
- domains.pop()
- self.domains = domains
- self.num_domains = len(domains)
- self.domains_done = 0
- self.start_thread(self.get_nginx_vhosts)
- # self.start_thread(self.get_https_domains)
- # subdomains = [self.get_https_subdomains_for_domain(d) for d in domains]
- # subdomains_dict = dict(zip(domains, subdomains))
- # print(subdomains_dict)
- def get_nginx_vhost(self, vhost):
- print(vhost)
- vhost_file = VHOSTS_PATH + '/' + vhost
- if not os.path.isfile(vhost_file):
- print("not found locally: " + vhost_file)
- vhost_content = ssh_command("cat /etc/nginx/sites-enabled/" + vhost)
- vhost_fp = open(vhost_file, "w")
- vhost_fp.write(vhost_content)
- vhost_fp.close()
- else:
- print("found locally: " + vhost_file)
- vhost_fp = open(vhost_file, "r")
- vhost_content = vhost_fp.read()
- vhost_fp.close()
- parsed = loads(vhost_content)
- port_subdmomains = {}
- for server in parsed:
- server_inner = server[1]
- port = 0
- subdomains = []
- for directive in server_inner:
- if not port and "listen" in directive:
- p = re.compile('(\d+)')
- # print('listen')
- ports = p.findall(directive[1])
- port = int(ports[0])
- if "server_name" in directive:
- # print('server_name')
- # print(directive)
- subd_trimmed = directive[1].strip()
- subdomains = subd_trimmed.split(' ')
- port_subdmomains[port] = subdomains
- for subd in subdomains:
- # subdomain = '*' + subd if is_domain_root(subd) else subd
- # print(subdomain)
- # self.vhost_treestore.append([port, vhost, subd])
- root = get_domain_root(subd)
- print('processing', root, '=>', subd)
- if not is_valid_domain(subd):
- raise InvalidDomainError(subd)
- if not root in self.parents.keys():
- self.parents[root] = self.vhost_treestore.append(None, (port, root, vhost))
- if root != subd:
- parent = self.parents[root]
- self.vhost_treestore.append(parent, (port, subd, vhost))
- # print(parents)
- # print(port_subdmomains)
- self.vhosts_done += 1
- percent_done = self.vhosts_done * 1.0 / self.num_vhosts
- self.progressbar.set_fraction(percent_done)
- return False
- def get_https_subdomains_for_domain(self, domain):
- # print(domain)
- p = re.compile('DNS:([0-9a-z-.]+)')
- cert_data = ssh_command("sudo openssl x509 -text -in /etc/letsencrypt/live/" + domain + "/fullchain.pem", True)
- self.domains_done += 1
- percent_done = self.domains_done * 1.0 / self.num_domains
- # print(percent_done)
- self.progressbar.set_fraction(percent_done)
- # self.progressbar.set_text('Done: ' + domain)
- # print(cert_data)
- # return p.findall (cert_data)
- return False
- def app_main():
- win = EntryWindow()
- win.connect("delete-event", Gtk.main_quit)
- win.show_all()
- if __name__ == '__main__':
- import signal
- signal.signal(signal.SIGINT, signal.SIG_DFL)
- # Calling GObject.threads_init() is not needed for PyGObject 3.10.2+
- GObject.threads_init()
- app_main()
- Gtk.main()
|