From 52134482118dcbc9f17131d9bbe74b299af0a5db Mon Sep 17 00:00:00 2001 From: MrMcX Date: Mon, 28 May 2018 23:17:43 +0200 Subject: [PATCH] Thread classes outsourced to AnmapThread.py --- AnmapThread.py | 100 +++++++++++++++++++++++++++++++ LICENSE | 2 +- README.md | 8 ++- anmap.py | 160 +++++++++---------------------------------------- 4 files changed, 135 insertions(+), 135 deletions(-) create mode 100644 AnmapThread.py diff --git a/AnmapThread.py b/AnmapThread.py new file mode 100644 index 0000000..3ceec09 --- /dev/null +++ b/AnmapThread.py @@ -0,0 +1,100 @@ +from nmap import PortScanner +from threading import Thread +from datetime import datetime + + +class AnmapThread(Thread): + def __init__(self, hostname, ports, verbose, out): + Thread.__init__(self) + self.host = hostname + self.ports = ports + self.nm = PortScanner() + self.verbose = verbose + self.daemon = True + self.out = out + + +class ThoroughAnmapThread(AnmapThread): + def run(self): + log("Starting thorough scan on " + self.host, self.verbose) + self.nm.scan(self.host, "1," + ",".join(self.ports), + arguments='-sSVC -A -Pn{}'.format(output(self.out, self.host, 2))) + if self.out: + with open(output(True, self.host, 5), "w") as out: + out.write(self.nm.get_nmap_last_output()) + host = self.nm[self.host] + for p in host.all_tcp(): + if p == 1: + continue + log("Port {}/tcp: {}".format(p, host['tcp'][p]), self.verbose) + log("Finished thorough scan on " + self.host, self.verbose) + + +class UDPAnmapThread(AnmapThread): + def run(self): + log("Starting UDP scan on " + self.host, self.verbose) + self.nm.scan(self.host, arguments='-sVCU -A -Pn --top-ports {}{}'. + format(self.ports, output(self.out, self.host, 3))) + if self.out: + with open(output(True, self.host, 6), "w") as out: + out.write(self.nm.get_nmap_last_output()) + host = self.nm[self.host] + for p in host.all_udp(): + log("Port {}/udp: {}".format(p, host['udp'][p]), self.verbose) + log("Finished UDP scan on " + self.host, self.verbose) + + +class BaseAnmapThread(AnmapThread): + def __init__(self, hostname, ports, verbose, out): + AnmapThread.__init__(self, hostname, ports, verbose, out) + self.host_list = dict() + + def run(self): + log("Starting quick scan", self.verbose) + self.nm.scan(self.host, arguments='-sS -Pn -p{}{}'.format(self.ports, output(self.out, self.host, 1))) + if self.out: + with open(output(True, self.host, 4), "w") as out: + out.write(self.nm.get_nmap_last_output()) + log("Finished quick scan", self.verbose) + for hostname in self.nm.all_hosts(): + host = self.nm[hostname] + port_list = list() + for p in host.all_tcp(): + if self.nm[hostname]['tcp'][p]['state'] == 'open': + port_list.append(str(p)) + if port_list is not list(): + self.host_list[hostname] = port_list + + def rjoin(self): + Thread.join(self) + return self.host_list + + +def output(o, host, st): + host = host.replace("/", "x") + host = host.replace(" ", "") + if not o: + return "" + if st == 1: + return " -oG nmap_{}_S_{}.gnmap".format(host, date()) + if st == 2: + return " -oG nmap_{}_SVCA_{}.gnmap".format(host, date()) + if st == 3: + return " -oG nmap_{}_VCUA_{}.gnmap".format(host, date()) + if st == 4: + return "nmap_{}_S_{}.xml".format(host, date()) + if st == 5: + return "nmap_{}_SVCA_{}.xml".format(host, date()) + if st == 6: + return "nmap_{}_VCUA_{}.xml".format(host, date()) + + +def log(message, verbose): + if verbose: + print("{}: {}".format(date(True), message)) + + +def date(long=False): + if long: + return datetime.now().strftime("%Y-%m-%d_%H%M%S") + return datetime.now().strftime("%Y-%m-%d_%H%M") diff --git a/LICENSE b/LICENSE index 3f28a5f..a5d3059 100644 --- a/LICENSE +++ b/LICENSE @@ -1,6 +1,6 @@ MIT License -Copyright (c) +Copyright (c) 2018 Simon Moser Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: diff --git a/README.md b/README.md index 73b6e4b..5c9df93 100644 --- a/README.md +++ b/README.md @@ -1,10 +1,11 @@ # Anmap - Automatic nmap Scanner -Prerequisites: +### Prerequisites: * Python 3.6 (https://www.python.org/downloads/) * python-nmap (https://xael.org/pages/python-nmap-en.html) * nmap (https://nmap.org/) +### Output: ``` usage: anmap.py [-h] [-u UDP] [-v] [-d] [-o] HOST @@ -22,4 +23,7 @@ optional arguments: -d, --debug Sets flags -v and -u 100 and scans only the first 1000 tcp ports -o, --output Enables saving of output files -``` \ No newline at end of file +``` + +## License +(c) 2018 Simon Moser under MIT License (see LICENSE file) \ No newline at end of file diff --git a/anmap.py b/anmap.py index 33c35ff..f644b85 100644 --- a/anmap.py +++ b/anmap.py @@ -1,141 +1,37 @@ -from nmap import PortScanner -from threading import Thread from argparse import ArgumentParser -from datetime import datetime +from AnmapThread import UDPAnmapThread, ThoroughAnmapThread, BaseAnmapThread -class AnmapThread(Thread): - def __init__(self, hostname, ports, logger, out): - Thread.__init__(self) - self.host = hostname - self.ports = ports - self.nm = PortScanner() - self.logger = logger - self.daemon = True - self.out = out - - -class ThoroughAnmapThread(AnmapThread): - def run(self): - self.logger.log("Starting thorough scan on " + self.host) - self.nm.scan(self.host, "1," + ",".join(self.ports), - arguments='-sSVC -A -Pn{}'.format(output(self.out, self.host, 2))) - if self.out: - with open(output(True, self.host, 5), "w") as out: - out.write(self.nm.get_nmap_last_output()) - host = self.nm[self.host] - for p in host.all_tcp(): - if p == 1: - continue - self.logger.log("Port {}/tcp: {}".format(p, host['tcp'][p])) - self.logger.log("Finished thorough scan on " + self.host) - - -class UDPAnmapThread(AnmapThread): - def run(self): - self.logger.log("Starting UDP scan on " + self.host) - self.nm.scan(self.host, arguments='-sVCU -A -Pn --top-ports {}{}'. - format(self.ports, output(self.out, self.host, 3))) - if self.out: - with open(output(True, self.host, 6), "w") as out: - out.write(self.nm.get_nmap_last_output()) - host = self.nm[self.host] - for p in host.all_udp(): - self.logger.log("Port {}/udp: {}".format(p, host['udp'][p])) - self.logger.log("Finished UDP scan on " + self.host) - - -class Logger: - def __init__(self, verbose): - self.verbose = verbose - - def log(self, message): - if self.verbose: - print("{}: {}".format(date(True), message)) - - -def date(long=False): - if long: - return datetime.now().strftime("%Y-%m-%d_%H%M%S") - return datetime.now().strftime("%Y-%m-%d_%H%M") - - -def output(o, host, st): - host = host.replace("/", "x") - host = host.replace(" ", "") - if not o: - return "" - if st == 1: - return " -oG nmap_{}_S_{}.gnmap".format(host, date()) - if st == 2: - return " -oG nmap_{}_SVCA_{}.gnmap".format(host, date()) - if st == 3: - return " -oG nmap_{}_VCUA_{}.gnmap".format(host, date()) - if st == 4: - return "nmap_{}_S_{}.xml".format(host, date()) - if st == 5: - return "nmap_{}_SVCA_{}.xml".format(host, date()) - if st == 6: - return "nmap_{}_VCUA_{}.xml".format(host, date()) - - -def run(args): +if __name__ == "__main__": + # Argument parsing + ap = ArgumentParser(description="This script automates nmap scans by quickly scanning all TCP ports first and " + "executing a thorough scan on all ports found open afterwards. " + "Additionally it scans a given number of most used UDP ports.", + prog="anmap.py") + ap.add_argument("-u", "--udp", default=1000, type=int, help="The number of UDP ports to scan (Default 1000)") + ap.add_argument("-v", "--verbose", action="store_true", help="This enables verbose output") + ap.add_argument("-d", "--debug", action="store_true", + help="Sets flags -v and -u 100 and scans only the first 1000 tcp ports") + ap.add_argument("-o", "--output", action="store_true", help="Enables saving of output files") + ap.add_argument("HOST", type=str, help="The hosts to scan (Same notations as in nmap possible)") + args = ap.parse_args() if args.debug: args.verbose = True args.udp = 100 - l = Logger(args.verbose) - - # Scanning all tcp ports - nm = PortScanner() - l.log("Starting quick scan") - if args.debug: - nm.scan(args.HOST, arguments='-sS -Pn -p1-1000{}'.format(output(args.output, args.HOST, 1))) - else: - nm.scan(args.HOST, arguments='-sS -Pn -p-{}'.format(output(args.output, args.HOST, 1))) - if args.output: - with open(output(True, args.HOST, 4), "w") as out: - out.write(nm.get_nmap_last_output()) - l.log("Finished quick scan") - host_list = dict() - for hostname in nm.all_hosts(): - host = nm[hostname] - port_list = list() - for p in host.all_tcp(): - if nm[hostname]['tcp'][p]['state'] == 'open': - port_list.append(str(p)) - if port_list is not list(): - host_list[hostname] = port_list - - # Starting thorough and udp scan in separate threads - thread_list = [] - for host, open_port_list in host_list.items(): - t1 = ThoroughAnmapThread(host, open_port_list, l, args.output) - t1.start() - thread_list.append(t1) - t2 = UDPAnmapThread(host, args.udp, l, args.output) - t2.start() - thread_list.append(t2) - - # Waiting for the threads to finish - for t in thread_list: - t.join() - - -if __name__ == "__main__": - # Argument parsing - parser = ArgumentParser(description="This script automates nmap scans by quickly scanning all TCP ports first and " - "executing a thorough scan on all ports found open afterwards. " - "Additionally it scans a given number of most used UDP ports.", - prog="anmap.py") - parser.add_argument("-u", "--udp", default=1000, type=int, help="The number of UDP ports to scan (Default 1000)") - parser.add_argument("-v", "--verbose", action="store_true", help="This enables verbose output") - parser.add_argument("-d", "--debug", action="store_true", - help="Sets flags -v and -u 100 and scans only the first 1000 tcp ports") - parser.add_argument("-o", "--output", action="store_true", help="Enables saving of output files") - parser.add_argument("HOST", type=str, help="The hosts to scan (Same notations as in nmap possible)") - try: - run(parser.parse_args()) + # Scanning all tcp ports + t0 = BaseAnmapThread(args.HOST, "1-1000" if args.debug else "-", args.verbose, args.output) + t0.start() + host_list = t0.rjoin() + # Starting thorough and udp scan for each host in separate threads + thread_list = list() + for host, open_port_list in host_list.items(): + thread_list.append(ThoroughAnmapThread(host, open_port_list, args.verbose, args.output)) + thread_list.append(UDPAnmapThread(host, args.udp, args.verbose, args.output)) + for t in thread_list: + t.start() + # Waiting for the threads to finish + for t in thread_list: + t.join() except KeyboardInterrupt: print("User Interrupt") - exit(0)