diff --git a/AnmapResult.py b/AnmapResult.py new file mode 100644 index 0000000..29dde17 --- /dev/null +++ b/AnmapResult.py @@ -0,0 +1,93 @@ +from utils import log, Col + + +class AnmapResult(dict): + def add_host(self, host): + try: + if host.services_tcp: + self[host.hostname].append(host.services_tcp) + if host.services_udp: + self[host.hostname].append(host.services_udp, True) + if host.osmatch: + self[host.hostname].osmatch.extend(host.osmatch) + except KeyError: + self[host.hostname] = host + return host.open_ports + + def print(self): + for host in self.values(): + host.print() + + +class AnmapHost: + def __init__(self, host, verbose=False): + self.ip = host["addresses"]["ipv4"] + self.hostname = host.hostname() + self.osmatch = list() + try: + self.osmatch.extend(host["osmatch"]) + except KeyError: + self.information = "" + self.state = host.state() + self.services_tcp = dict() + self.services_udp = dict() + self.open_ports = 0 + log(self.get_info(), verbose) + for t in host.all_tcp(): + if "open" not in host['tcp'][t]['state']: + continue + self.open_ports += 1 + log("Port {}/tcp: {}".format(t, host['tcp'][t]), verbose) + self.services_tcp[t] = host["tcp"][t] + for u in host.all_udp(): + if "open" not in host['udp'][u]['state']: + continue + self.open_ports += 1 + log("Port {}/udp: {}".format(u, host['udp'][u]), verbose) + self.services_udp[u] = host["udp"][u] + + def print(self): + print(self.get_info()) + for guess in self.osmatch: + print("\tOS-Guess: {} ({}%)".format(guess["name"], guess["accuracy"])) + for osclass in guess["osclass"]: + for key, value in osclass.items(): + print("\t\t{}: {}".format(key, value)) + for port, info in self.services_tcp.items(): + print("\tPort {}/tcp:".format(port)) + for key, value in info.items(): + if key == "script": + print("\t\tScript:") + for k, v in value.items(): + print("\t\t\t{}: {}".format(k, v)) + continue + print("\t\t{}: {}".format(key, value)) + for port, info in self.services_udp.items(): + print("\tPort {}/udp:".format(port)) + for key, value in info.items(): + if key == "script": + print("\t\tScript:") + for k, v in value.items(): + print("\t\t\t{}: {}".format(k, v.replace("\n", "\n\t\t\t\t"))) + continue + print("\t\t{}: {}".format(key, value)) + + def get_info(self): + return "{}/{} is {} with {} open ports".format(self.hostname, self.ip, self.state, self.open_ports) + + def append(self, services, udp=False): + for port, service in services.items(): + if udp: + if port in self.services_udp: + for key, value in service.items(): + self.services_udp[port][key] = value + continue + self.services_udp[port] = service + self.open_ports += 1 + else: + if port in self.services_tcp: + for key, value in service.items(): + self.services_tcp[port][key] = value + continue + self.services_tcp[port] = service + self.open_ports += 1 diff --git a/AnmapThread.py b/AnmapThread.py index 9dac0ff..e177eba 100644 --- a/AnmapThread.py +++ b/AnmapThread.py @@ -1,12 +1,12 @@ import nmap import masscan from threading import Thread -from datetime import datetime -from pprint import pprint +from AnmapResult import AnmapHost +from utils import date, log class AnmapThread(Thread): - def __init__(self, hostname, ports, verbose, out): + def __init__(self, hostname, ports, verbose, out, result, proto): Thread.__init__(self) self.host = hostname self.ports = ports @@ -14,95 +14,45 @@ class AnmapThread(Thread): self.verbose = verbose self.daemon = True self.out = out - - -class ThoroughAnmapThread(AnmapThread): - def run(self): - log("Starting thorough scan on " + self.host, self.verbose) - self.scanner.scan(self.host, "1," + ",".join(self.ports), - arguments='-sSVC -A -Pn{}'.format(output(self.out, self.host, 2))) - log(self.scanner.command_line(), self.verbose) - if self.out: - with open(output(True, self.host, 5), "w") as outfile: - outfile.write(self.scanner.get_nmap_last_output()) - host = self.scanner[self.host] - log("{}/{} is {}".format(host.hostname(), host["addresses"]["ipv4"], host["osmatch"][0]["name"]), self.verbose) - for p in host.all_tcp(): - if p == 1: - continue - log("Port {}/tcp: {}".format(p, host['tcp'][p]), self.verbose) - log("Finished thorough scan on " + self.host, self.verbose) - - -class UDPAnmapThread(AnmapThread): - def run(self): - log("Starting UDP scan on " + self.host, self.verbose) - self.scanner.scan(self.host, arguments='-sVCU -A -Pn --top-ports {}{}'. - format(self.ports, output(self.out, self.host, 3))) - log(self.scanner.command_line(), self.verbose) - if self.out: - with open(output(True, self.host, 6), "w") as outfile: - outfile.write(self.scanner.get_nmap_last_output()) - host = self.scanner[self.host] - log("{}/{} is {}".format(host.hostname(), host["addresses"]["ipv4"], host["osmatch"][0]["name"]), self.verbose) - for p in host.all_udp(): - log("Port {}/udp: {}".format(p, host['udp'][p]), self.verbose) - log("Finished UDP scan on " + self.host, self.verbose) - - -class BaseAnmapThread(AnmapThread): - def __init__(self, hostname, ports, verbose, out): - AnmapThread.__init__(self, hostname, ports, verbose, out) - self.host_dict = dict() + self.result = result + self.proto = proto def run(self): - log("Starting quick scan", self.verbose) - self.scanner.scan(self.host, arguments='-sS -Pn -p{}{}'.format(self.ports, output(self.out, self.host, 1))) - log(self.scanner.command_line(), self.verbose) - if self.out: - with open(output(True, self.host, 4), "w") as outfile: - outfile.write(self.scanner.get_nmap_last_output()) - log("Finished quick scan", self.verbose) - np = 0 - for hostname in self.scanner.all_hosts(): - host = self.scanner[hostname] - port_list = list() - for p in host.all_tcp(): - if self.scanner[hostname]['tcp'][p]['state'] == 'open': - port_list.append(str(p)) - if port_list is not list(): - self.host_dict[hostname] = port_list - np += len(port_list) - log("Found {} open ports on {} host(s) with {}".format(np, len(self.host_dict), "nmap"), self.verbose) - - def rjoin(self): - Thread.join(self) - return self.host_dict - - -class MasscanAnmapThread(BaseAnmapThread): + try: + log("Starting {} scan on {}".format(self.proto, self.host), self.verbose) + if self.proto == "tcp": + xml = 5 + self.scanner.scan(self.host, arguments='-p 1,{} -sSVC -A -Pn{}'.format(",".join(map(str, self.ports)), output(self.out, self.host, 2))) + elif self.proto == "udp": + xml = 6 + self.scanner.scan(self.host, arguments='-sVCU -A -Pn --top-ports {}{}'.format(self.ports, output(self.out, self.host, 3))) + elif self.proto == "quick": + xml = 4 + self.scanner.scan(self.host, arguments='-sS -Pn -p{}{}'.format(self.ports, output(self.out, self.host, 1))) + elif self.proto == "masscan": + xml = False + self.scanner.scan(self.host, ports=self.ports, arguments=output(self.out, self.host, 7), sudo=True) + else: + return + log(self.scanner.command_line(), self.verbose) + if self.out and xml: + with open(output(True, self.host, xml), "w") as outfile: + outfile.write(self.scanner.get_nmap_last_output()) + np = nh = 0 + for hn in self.scanner.all_hosts(): + np += self.result.add_host(AnmapHost(self.scanner[hn], self.verbose)) + nh += 1 + log("Found {} open ports on {} host(s)".format(np, nh), self.verbose) + log("Finished {} scan on {}".format(self.proto, self.host), self.verbose) + except KeyboardInterrupt: + return + + +class MasscanThread(AnmapThread): def __init__(self, hostname, ports, verbose, out): AnmapThread.__init__(self, hostname, ports, verbose, out) - self.host_dict = dict() self.scanner = masscan.PortScanner() - def run(self): - log("Starting masscan scan", self.verbose) - self.scanner.scan(self.host, ports=self.ports, arguments=output(self.out, self.host, 7), sudo=True) - log(self.scanner.command_line(), self.verbose) - log("Finished quick scan", self.verbose) - np = 0 - for hostname in self.scanner.all_hosts(): - host = self.scanner[hostname] - port_list = list() - for p in host.all_tcp(): - if self.scanner[hostname]['tcp'][p]['state'] == 'open': - port_list.append(str(p)) - if port_list is not list(): - self.host_dict[hostname] = port_list - np += len(port_list) - log("Found {} open ports on {} host(s) with {}".format(np, len(self.host_dict), "masscan"), self.verbose) - def output(o, host, st): host = host.replace("/", "x") @@ -123,13 +73,3 @@ def output(o, host, st): return "nmap_{}_VCUA_{}.xml".format(host, date()) if st == 7: return " -oG masscan_{}_S_{}.gnmap -oX masscan_{}_S_{}.xml".format(host, date(), host, date()) - - -def log(message, verbose): - if verbose: print("{}: {}".format(date(True), message)) - - -def date(long=False): - if long: - return datetime.now().strftime("%Y-%m-%d_%H%M%S") - return datetime.now().strftime("%Y-%m-%d_%H%M") diff --git a/anmap.py b/anmap.py index 46cdecb..e5f3c8b 100755 --- a/anmap.py +++ b/anmap.py @@ -1,6 +1,7 @@ #!/usr/bin/python3 from argparse import ArgumentParser -from AnmapThread import UDPAnmapThread, ThoroughAnmapThread, BaseAnmapThread, MasscanAnmapThread +from AnmapThread import AnmapThread, MasscanThread +from AnmapResult import AnmapResult if __name__ == "__main__": @@ -14,41 +15,44 @@ if __name__ == "__main__": # Not functional yet # ap.add_argument("-m", "--masscan", action="store_true", help="This enables masscan for first scan") ap.add_argument("-d", "--debug", action="store_true", - help="Sets flags -v and -u 100 and scans only the first 1000 tcp ports") + help="Sets flags -v and -u 10 and scans only the first 100 tcp ports") ap.add_argument("-o", "--output", action="store_true", help="Enables saving of output files") ap.add_argument("HOST", type=str, help="The hosts to scan (Same notations as in nmap possible)") args = ap.parse_args() if args.debug: - args.verbose = True - args.udp = 100 + # args.verbose = True + args.udp = 10 + result = AnmapResult() try: - c = host_dict = "" + c = "" try: ms = args.masscan except AttributeError: ms = False + ports = "1-100" if args.debug else "-" if ms: - tm = MasscanAnmapThread(args.HOST, "1-1000" if args.debug else "-", args.verbose, args.output) + tm = MasscanThread(args.HOST, ports, args.verbose, args.output) tm.start() - host_dict = tm.rjoin() + tm.join() c = input("Do you want to continue without a full nmap scan? (y/N)") if c != "y": - t0 = BaseAnmapThread(args.HOST, "1-1000" if args.debug else "-", args.verbose, args.output) + t0 = AnmapThread(args.HOST, ports, args.verbose, args.output, result, "quick") t0.start() - host_dict = t0.rjoin() + t0.join() else: - t0 = BaseAnmapThread(args.HOST, "1-1000" if args.debug else "-", args.verbose, args.output) + t0 = AnmapThread(args.HOST, ports, args.verbose, args.output, result, "quick") t0.start() - host_dict = t0.rjoin() + t0.join() # Starting thorough and udp scan for each host in separate threads thread_list = list() - for host, open_port_list in host_dict.items(): - thread_list.append(ThoroughAnmapThread(host, open_port_list, args.verbose, args.output)) - thread_list.append(UDPAnmapThread(host, args.udp, args.verbose, args.output)) + for name, host in result.items(): + thread_list.append(AnmapThread(name, host.services_tcp.keys(), args.verbose, args.output, result, "tcp")) + thread_list.append(AnmapThread(name, args.udp, args.verbose, args.output, result, "udp")) for t in thread_list: t.start() # Waiting for the threads to finish for t in thread_list: t.join() + result.print() except KeyboardInterrupt: print("User Interrupt") diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..efbde44 --- /dev/null +++ b/utils.py @@ -0,0 +1,17 @@ +from datetime import datetime + + +class Col: + SRED = "\033[91m" + ERED = "\033[0m" + + +def log(message, verbose): + if verbose: + print("{}: {}".format(date(True), message)) + + +def date(long=False): + if long: + return datetime.now().strftime("%Y-%m-%d_%H%M%S") + return datetime.now().strftime("%Y-%m-%d_%H%M") \ No newline at end of file