1
0
Fork 0

Thread classes outsourced to AnmapThread.py

master
MrMcX vor 6 Jahren
Ursprung 26b4dbba0d
Commit 5213448211

@ -0,0 +1,100 @@
from nmap import PortScanner
from threading import Thread
from datetime import datetime
class AnmapThread(Thread):
def __init__(self, hostname, ports, verbose, out):
Thread.__init__(self)
self.host = hostname
self.ports = ports
self.nm = PortScanner()
self.verbose = verbose
self.daemon = True
self.out = out
class ThoroughAnmapThread(AnmapThread):
def run(self):
log("Starting thorough scan on " + self.host, self.verbose)
self.nm.scan(self.host, "1," + ",".join(self.ports),
arguments='-sSVC -A -Pn{}'.format(output(self.out, self.host, 2)))
if self.out:
with open(output(True, self.host, 5), "w") as out:
out.write(self.nm.get_nmap_last_output())
host = self.nm[self.host]
for p in host.all_tcp():
if p == 1:
continue
log("Port {}/tcp: {}".format(p, host['tcp'][p]), self.verbose)
log("Finished thorough scan on " + self.host, self.verbose)
class UDPAnmapThread(AnmapThread):
def run(self):
log("Starting UDP scan on " + self.host, self.verbose)
self.nm.scan(self.host, arguments='-sVCU -A -Pn --top-ports {}{}'.
format(self.ports, output(self.out, self.host, 3)))
if self.out:
with open(output(True, self.host, 6), "w") as out:
out.write(self.nm.get_nmap_last_output())
host = self.nm[self.host]
for p in host.all_udp():
log("Port {}/udp: {}".format(p, host['udp'][p]), self.verbose)
log("Finished UDP scan on " + self.host, self.verbose)
class BaseAnmapThread(AnmapThread):
def __init__(self, hostname, ports, verbose, out):
AnmapThread.__init__(self, hostname, ports, verbose, out)
self.host_list = dict()
def run(self):
log("Starting quick scan", self.verbose)
self.nm.scan(self.host, arguments='-sS -Pn -p{}{}'.format(self.ports, output(self.out, self.host, 1)))
if self.out:
with open(output(True, self.host, 4), "w") as out:
out.write(self.nm.get_nmap_last_output())
log("Finished quick scan", self.verbose)
for hostname in self.nm.all_hosts():
host = self.nm[hostname]
port_list = list()
for p in host.all_tcp():
if self.nm[hostname]['tcp'][p]['state'] == 'open':
port_list.append(str(p))
if port_list is not list():
self.host_list[hostname] = port_list
def rjoin(self):
Thread.join(self)
return self.host_list
def output(o, host, st):
host = host.replace("/", "x")
host = host.replace(" ", "")
if not o:
return ""
if st == 1:
return " -oG nmap_{}_S_{}.gnmap".format(host, date())
if st == 2:
return " -oG nmap_{}_SVCA_{}.gnmap".format(host, date())
if st == 3:
return " -oG nmap_{}_VCUA_{}.gnmap".format(host, date())
if st == 4:
return "nmap_{}_S_{}.xml".format(host, date())
if st == 5:
return "nmap_{}_SVCA_{}.xml".format(host, date())
if st == 6:
return "nmap_{}_VCUA_{}.xml".format(host, date())
def log(message, verbose):
if verbose:
print("{}: {}".format(date(True), message))
def date(long=False):
if long:
return datetime.now().strftime("%Y-%m-%d_%H%M%S")
return datetime.now().strftime("%Y-%m-%d_%H%M")

@ -1,6 +1,6 @@
MIT License MIT License
Copyright (c) <year> <copyright holders> Copyright (c) 2018 Simon Moser
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

@ -1,10 +1,11 @@
# Anmap - Automatic nmap Scanner # Anmap - Automatic nmap Scanner
Prerequisites: ### Prerequisites:
* Python 3.6 (https://www.python.org/downloads/) * Python 3.6 (https://www.python.org/downloads/)
* python-nmap (https://xael.org/pages/python-nmap-en.html) * python-nmap (https://xael.org/pages/python-nmap-en.html)
* nmap (https://nmap.org/) * nmap (https://nmap.org/)
### Output:
``` ```
usage: anmap.py [-h] [-u UDP] [-v] [-d] [-o] HOST usage: anmap.py [-h] [-u UDP] [-v] [-d] [-o] HOST
@ -22,4 +23,7 @@ optional arguments:
-d, --debug Sets flags -v and -u 100 and scans only the first 1000 -d, --debug Sets flags -v and -u 100 and scans only the first 1000
tcp ports tcp ports
-o, --output Enables saving of output files -o, --output Enables saving of output files
``` ```
## License
(c) 2018 Simon Moser under MIT License (see LICENSE file)

@ -1,141 +1,37 @@
from nmap import PortScanner
from threading import Thread
from argparse import ArgumentParser from argparse import ArgumentParser
from datetime import datetime from AnmapThread import UDPAnmapThread, ThoroughAnmapThread, BaseAnmapThread
class AnmapThread(Thread): if __name__ == "__main__":
def __init__(self, hostname, ports, logger, out): # Argument parsing
Thread.__init__(self) ap = ArgumentParser(description="This script automates nmap scans by quickly scanning all TCP ports first and "
self.host = hostname "executing a thorough scan on all ports found open afterwards. "
self.ports = ports "Additionally it scans a given number of most used UDP ports.",
self.nm = PortScanner() prog="anmap.py")
self.logger = logger ap.add_argument("-u", "--udp", default=1000, type=int, help="The number of UDP ports to scan (Default 1000)")
self.daemon = True ap.add_argument("-v", "--verbose", action="store_true", help="This enables verbose output")
self.out = out ap.add_argument("-d", "--debug", action="store_true",
help="Sets flags -v and -u 100 and scans only the first 1000 tcp ports")
ap.add_argument("-o", "--output", action="store_true", help="Enables saving of output files")
class ThoroughAnmapThread(AnmapThread): ap.add_argument("HOST", type=str, help="The hosts to scan (Same notations as in nmap possible)")
def run(self): args = ap.parse_args()
self.logger.log("Starting thorough scan on " + self.host)
self.nm.scan(self.host, "1," + ",".join(self.ports),
arguments='-sSVC -A -Pn{}'.format(output(self.out, self.host, 2)))
if self.out:
with open(output(True, self.host, 5), "w") as out:
out.write(self.nm.get_nmap_last_output())
host = self.nm[self.host]
for p in host.all_tcp():
if p == 1:
continue
self.logger.log("Port {}/tcp: {}".format(p, host['tcp'][p]))
self.logger.log("Finished thorough scan on " + self.host)
class UDPAnmapThread(AnmapThread):
def run(self):
self.logger.log("Starting UDP scan on " + self.host)
self.nm.scan(self.host, arguments='-sVCU -A -Pn --top-ports {}{}'.
format(self.ports, output(self.out, self.host, 3)))
if self.out:
with open(output(True, self.host, 6), "w") as out:
out.write(self.nm.get_nmap_last_output())
host = self.nm[self.host]
for p in host.all_udp():
self.logger.log("Port {}/udp: {}".format(p, host['udp'][p]))
self.logger.log("Finished UDP scan on " + self.host)
class Logger:
def __init__(self, verbose):
self.verbose = verbose
def log(self, message):
if self.verbose:
print("{}: {}".format(date(True), message))
def date(long=False):
if long:
return datetime.now().strftime("%Y-%m-%d_%H%M%S")
return datetime.now().strftime("%Y-%m-%d_%H%M")
def output(o, host, st):
host = host.replace("/", "x")
host = host.replace(" ", "")
if not o:
return ""
if st == 1:
return " -oG nmap_{}_S_{}.gnmap".format(host, date())
if st == 2:
return " -oG nmap_{}_SVCA_{}.gnmap".format(host, date())
if st == 3:
return " -oG nmap_{}_VCUA_{}.gnmap".format(host, date())
if st == 4:
return "nmap_{}_S_{}.xml".format(host, date())
if st == 5:
return "nmap_{}_SVCA_{}.xml".format(host, date())
if st == 6:
return "nmap_{}_VCUA_{}.xml".format(host, date())
def run(args):
if args.debug: if args.debug:
args.verbose = True args.verbose = True
args.udp = 100 args.udp = 100
l = Logger(args.verbose)
# Scanning all tcp ports
nm = PortScanner()
l.log("Starting quick scan")
if args.debug:
nm.scan(args.HOST, arguments='-sS -Pn -p1-1000{}'.format(output(args.output, args.HOST, 1)))
else:
nm.scan(args.HOST, arguments='-sS -Pn -p-{}'.format(output(args.output, args.HOST, 1)))
if args.output:
with open(output(True, args.HOST, 4), "w") as out:
out.write(nm.get_nmap_last_output())
l.log("Finished quick scan")
host_list = dict()
for hostname in nm.all_hosts():
host = nm[hostname]
port_list = list()
for p in host.all_tcp():
if nm[hostname]['tcp'][p]['state'] == 'open':
port_list.append(str(p))
if port_list is not list():
host_list[hostname] = port_list
# Starting thorough and udp scan in separate threads
thread_list = []
for host, open_port_list in host_list.items():
t1 = ThoroughAnmapThread(host, open_port_list, l, args.output)
t1.start()
thread_list.append(t1)
t2 = UDPAnmapThread(host, args.udp, l, args.output)
t2.start()
thread_list.append(t2)
# Waiting for the threads to finish
for t in thread_list:
t.join()
if __name__ == "__main__":
# Argument parsing
parser = ArgumentParser(description="This script automates nmap scans by quickly scanning all TCP ports first and "
"executing a thorough scan on all ports found open afterwards. "
"Additionally it scans a given number of most used UDP ports.",
prog="anmap.py")
parser.add_argument("-u", "--udp", default=1000, type=int, help="The number of UDP ports to scan (Default 1000)")
parser.add_argument("-v", "--verbose", action="store_true", help="This enables verbose output")
parser.add_argument("-d", "--debug", action="store_true",
help="Sets flags -v and -u 100 and scans only the first 1000 tcp ports")
parser.add_argument("-o", "--output", action="store_true", help="Enables saving of output files")
parser.add_argument("HOST", type=str, help="The hosts to scan (Same notations as in nmap possible)")
try: try:
run(parser.parse_args()) # Scanning all tcp ports
t0 = BaseAnmapThread(args.HOST, "1-1000" if args.debug else "-", args.verbose, args.output)
t0.start()
host_list = t0.rjoin()
# Starting thorough and udp scan for each host in separate threads
thread_list = list()
for host, open_port_list in host_list.items():
thread_list.append(ThoroughAnmapThread(host, open_port_list, args.verbose, args.output))
thread_list.append(UDPAnmapThread(host, args.udp, args.verbose, args.output))
for t in thread_list:
t.start()
# Waiting for the threads to finish
for t in thread_list:
t.join()
except KeyboardInterrupt: except KeyboardInterrupt:
print("User Interrupt") print("User Interrupt")
exit(0)

Laden…
Abbrechen
Speichern