1
0
Fork 0

Result class added

master
Simon Moser vor 7 Jahren
Ursprung 0eb4435f73
Commit aed253e77b

@ -0,0 +1,93 @@
from utils import log, Col
class AnmapResult(dict):
def add_host(self, host):
try:
if host.services_tcp:
self[host.hostname].append(host.services_tcp)
if host.services_udp:
self[host.hostname].append(host.services_udp, True)
if host.osmatch:
self[host.hostname].osmatch.extend(host.osmatch)
except KeyError:
self[host.hostname] = host
return host.open_ports
def print(self):
for host in self.values():
host.print()
class AnmapHost:
def __init__(self, host, verbose=False):
self.ip = host["addresses"]["ipv4"]
self.hostname = host.hostname()
self.osmatch = list()
try:
self.osmatch.extend(host["osmatch"])
except KeyError:
self.information = ""
self.state = host.state()
self.services_tcp = dict()
self.services_udp = dict()
self.open_ports = 0
log(self.get_info(), verbose)
for t in host.all_tcp():
if "open" not in host['tcp'][t]['state']:
continue
self.open_ports += 1
log("Port {}/tcp: {}".format(t, host['tcp'][t]), verbose)
self.services_tcp[t] = host["tcp"][t]
for u in host.all_udp():
if "open" not in host['udp'][u]['state']:
continue
self.open_ports += 1
log("Port {}/udp: {}".format(u, host['udp'][u]), verbose)
self.services_udp[u] = host["udp"][u]
def print(self):
print(self.get_info())
for guess in self.osmatch:
print("\tOS-Guess: {} ({}%)".format(guess["name"], guess["accuracy"]))
for osclass in guess["osclass"]:
for key, value in osclass.items():
print("\t\t{}: {}".format(key, value))
for port, info in self.services_tcp.items():
print("\tPort {}/tcp:".format(port))
for key, value in info.items():
if key == "script":
print("\t\tScript:")
for k, v in value.items():
print("\t\t\t{}: {}".format(k, v))
continue
print("\t\t{}: {}".format(key, value))
for port, info in self.services_udp.items():
print("\tPort {}/udp:".format(port))
for key, value in info.items():
if key == "script":
print("\t\tScript:")
for k, v in value.items():
print("\t\t\t{}: {}".format(k, v.replace("\n", "\n\t\t\t\t")))
continue
print("\t\t{}: {}".format(key, value))
def get_info(self):
return "{}/{} is {} with {} open ports".format(self.hostname, self.ip, self.state, self.open_ports)
def append(self, services, udp=False):
for port, service in services.items():
if udp:
if port in self.services_udp:
for key, value in service.items():
self.services_udp[port][key] = value
continue
self.services_udp[port] = service
self.open_ports += 1
else:
if port in self.services_tcp:
for key, value in service.items():
self.services_tcp[port][key] = value
continue
self.services_tcp[port] = service
self.open_ports += 1

@ -1,12 +1,12 @@
import nmap import nmap
import masscan import masscan
from threading import Thread from threading import Thread
from datetime import datetime from AnmapResult import AnmapHost
from pprint import pprint from utils import date, log
class AnmapThread(Thread): class AnmapThread(Thread):
def __init__(self, hostname, ports, verbose, out): def __init__(self, hostname, ports, verbose, out, result, proto):
Thread.__init__(self) Thread.__init__(self)
self.host = hostname self.host = hostname
self.ports = ports self.ports = ports
@ -14,95 +14,45 @@ class AnmapThread(Thread):
self.verbose = verbose self.verbose = verbose
self.daemon = True self.daemon = True
self.out = out self.out = out
self.result = result
self.proto = proto
class ThoroughAnmapThread(AnmapThread):
def run(self):
log("Starting thorough scan on " + self.host, self.verbose)
self.scanner.scan(self.host, "1," + ",".join(self.ports),
arguments='-sSVC -A -Pn{}'.format(output(self.out, self.host, 2)))
log(self.scanner.command_line(), self.verbose)
if self.out:
with open(output(True, self.host, 5), "w") as outfile:
outfile.write(self.scanner.get_nmap_last_output())
host = self.scanner[self.host]
log("{}/{} is {}".format(host.hostname(), host["addresses"]["ipv4"], host["osmatch"][0]["name"]), self.verbose)
for p in host.all_tcp():
if p == 1:
continue
log("Port {}/tcp: {}".format(p, host['tcp'][p]), self.verbose)
log("Finished thorough scan on " + self.host, self.verbose)
class UDPAnmapThread(AnmapThread):
def run(self):
log("Starting UDP scan on " + self.host, self.verbose)
self.scanner.scan(self.host, arguments='-sVCU -A -Pn --top-ports {}{}'.
format(self.ports, output(self.out, self.host, 3)))
log(self.scanner.command_line(), self.verbose)
if self.out:
with open(output(True, self.host, 6), "w") as outfile:
outfile.write(self.scanner.get_nmap_last_output())
host = self.scanner[self.host]
log("{}/{} is {}".format(host.hostname(), host["addresses"]["ipv4"], host["osmatch"][0]["name"]), self.verbose)
for p in host.all_udp():
log("Port {}/udp: {}".format(p, host['udp'][p]), self.verbose)
log("Finished UDP scan on " + self.host, self.verbose)
class BaseAnmapThread(AnmapThread):
def __init__(self, hostname, ports, verbose, out):
AnmapThread.__init__(self, hostname, ports, verbose, out)
self.host_dict = dict()
def run(self): def run(self):
log("Starting quick scan", self.verbose) try:
self.scanner.scan(self.host, arguments='-sS -Pn -p{}{}'.format(self.ports, output(self.out, self.host, 1))) log("Starting {} scan on {}".format(self.proto, self.host), self.verbose)
log(self.scanner.command_line(), self.verbose) if self.proto == "tcp":
if self.out: xml = 5
with open(output(True, self.host, 4), "w") as outfile: self.scanner.scan(self.host, arguments='-p 1,{} -sSVC -A -Pn{}'.format(",".join(map(str, self.ports)), output(self.out, self.host, 2)))
outfile.write(self.scanner.get_nmap_last_output()) elif self.proto == "udp":
log("Finished quick scan", self.verbose) xml = 6
np = 0 self.scanner.scan(self.host, arguments='-sVCU -A -Pn --top-ports {}{}'.format(self.ports, output(self.out, self.host, 3)))
for hostname in self.scanner.all_hosts(): elif self.proto == "quick":
host = self.scanner[hostname] xml = 4
port_list = list() self.scanner.scan(self.host, arguments='-sS -Pn -p{}{}'.format(self.ports, output(self.out, self.host, 1)))
for p in host.all_tcp(): elif self.proto == "masscan":
if self.scanner[hostname]['tcp'][p]['state'] == 'open': xml = False
port_list.append(str(p)) self.scanner.scan(self.host, ports=self.ports, arguments=output(self.out, self.host, 7), sudo=True)
if port_list is not list(): else:
self.host_dict[hostname] = port_list return
np += len(port_list) log(self.scanner.command_line(), self.verbose)
log("Found {} open ports on {} host(s) with {}".format(np, len(self.host_dict), "nmap"), self.verbose) if self.out and xml:
with open(output(True, self.host, xml), "w") as outfile:
def rjoin(self): outfile.write(self.scanner.get_nmap_last_output())
Thread.join(self) np = nh = 0
return self.host_dict for hn in self.scanner.all_hosts():
np += self.result.add_host(AnmapHost(self.scanner[hn], self.verbose))
nh += 1
class MasscanAnmapThread(BaseAnmapThread): log("Found {} open ports on {} host(s)".format(np, nh), self.verbose)
log("Finished {} scan on {}".format(self.proto, self.host), self.verbose)
except KeyboardInterrupt:
return
class MasscanThread(AnmapThread):
def __init__(self, hostname, ports, verbose, out): def __init__(self, hostname, ports, verbose, out):
AnmapThread.__init__(self, hostname, ports, verbose, out) AnmapThread.__init__(self, hostname, ports, verbose, out)
self.host_dict = dict()
self.scanner = masscan.PortScanner() self.scanner = masscan.PortScanner()
def run(self):
log("Starting masscan scan", self.verbose)
self.scanner.scan(self.host, ports=self.ports, arguments=output(self.out, self.host, 7), sudo=True)
log(self.scanner.command_line(), self.verbose)
log("Finished quick scan", self.verbose)
np = 0
for hostname in self.scanner.all_hosts():
host = self.scanner[hostname]
port_list = list()
for p in host.all_tcp():
if self.scanner[hostname]['tcp'][p]['state'] == 'open':
port_list.append(str(p))
if port_list is not list():
self.host_dict[hostname] = port_list
np += len(port_list)
log("Found {} open ports on {} host(s) with {}".format(np, len(self.host_dict), "masscan"), self.verbose)
def output(o, host, st): def output(o, host, st):
host = host.replace("/", "x") host = host.replace("/", "x")
@ -123,13 +73,3 @@ def output(o, host, st):
return "nmap_{}_VCUA_{}.xml".format(host, date()) return "nmap_{}_VCUA_{}.xml".format(host, date())
if st == 7: if st == 7:
return " -oG masscan_{}_S_{}.gnmap -oX masscan_{}_S_{}.xml".format(host, date(), host, date()) return " -oG masscan_{}_S_{}.gnmap -oX masscan_{}_S_{}.xml".format(host, date(), host, date())
def log(message, verbose):
if verbose: print("{}: {}".format(date(True), message))
def date(long=False):
if long:
return datetime.now().strftime("%Y-%m-%d_%H%M%S")
return datetime.now().strftime("%Y-%m-%d_%H%M")

@ -1,6 +1,7 @@
#!/usr/bin/python3 #!/usr/bin/python3
from argparse import ArgumentParser from argparse import ArgumentParser
from AnmapThread import UDPAnmapThread, ThoroughAnmapThread, BaseAnmapThread, MasscanAnmapThread from AnmapThread import AnmapThread, MasscanThread
from AnmapResult import AnmapResult
if __name__ == "__main__": if __name__ == "__main__":
@ -14,41 +15,44 @@ if __name__ == "__main__":
# Not functional yet # Not functional yet
# ap.add_argument("-m", "--masscan", action="store_true", help="This enables masscan for first scan") # ap.add_argument("-m", "--masscan", action="store_true", help="This enables masscan for first scan")
ap.add_argument("-d", "--debug", action="store_true", ap.add_argument("-d", "--debug", action="store_true",
help="Sets flags -v and -u 100 and scans only the first 1000 tcp ports") help="Sets flags -v and -u 10 and scans only the first 100 tcp ports")
ap.add_argument("-o", "--output", action="store_true", help="Enables saving of output files") ap.add_argument("-o", "--output", action="store_true", help="Enables saving of output files")
ap.add_argument("HOST", type=str, help="The hosts to scan (Same notations as in nmap possible)") ap.add_argument("HOST", type=str, help="The hosts to scan (Same notations as in nmap possible)")
args = ap.parse_args() args = ap.parse_args()
if args.debug: if args.debug:
args.verbose = True # args.verbose = True
args.udp = 100 args.udp = 10
result = AnmapResult()
try: try:
c = host_dict = "" c = ""
try: try:
ms = args.masscan ms = args.masscan
except AttributeError: except AttributeError:
ms = False ms = False
ports = "1-100" if args.debug else "-"
if ms: if ms:
tm = MasscanAnmapThread(args.HOST, "1-1000" if args.debug else "-", args.verbose, args.output) tm = MasscanThread(args.HOST, ports, args.verbose, args.output)
tm.start() tm.start()
host_dict = tm.rjoin() tm.join()
c = input("Do you want to continue without a full nmap scan? (y/N)") c = input("Do you want to continue without a full nmap scan? (y/N)")
if c != "y": if c != "y":
t0 = BaseAnmapThread(args.HOST, "1-1000" if args.debug else "-", args.verbose, args.output) t0 = AnmapThread(args.HOST, ports, args.verbose, args.output, result, "quick")
t0.start() t0.start()
host_dict = t0.rjoin() t0.join()
else: else:
t0 = BaseAnmapThread(args.HOST, "1-1000" if args.debug else "-", args.verbose, args.output) t0 = AnmapThread(args.HOST, ports, args.verbose, args.output, result, "quick")
t0.start() t0.start()
host_dict = t0.rjoin() t0.join()
# Starting thorough and udp scan for each host in separate threads # Starting thorough and udp scan for each host in separate threads
thread_list = list() thread_list = list()
for host, open_port_list in host_dict.items(): for name, host in result.items():
thread_list.append(ThoroughAnmapThread(host, open_port_list, args.verbose, args.output)) thread_list.append(AnmapThread(name, host.services_tcp.keys(), args.verbose, args.output, result, "tcp"))
thread_list.append(UDPAnmapThread(host, args.udp, args.verbose, args.output)) thread_list.append(AnmapThread(name, args.udp, args.verbose, args.output, result, "udp"))
for t in thread_list: for t in thread_list:
t.start() t.start()
# Waiting for the threads to finish # Waiting for the threads to finish
for t in thread_list: for t in thread_list:
t.join() t.join()
result.print()
except KeyboardInterrupt: except KeyboardInterrupt:
print("User Interrupt") print("User Interrupt")

@ -0,0 +1,17 @@
from datetime import datetime
class Col:
SRED = "\033[91m"
ERED = "\033[0m"
def log(message, verbose):
if verbose:
print("{}: {}".format(date(True), message))
def date(long=False):
if long:
return datetime.now().strftime("%Y-%m-%d_%H%M%S")
return datetime.now().strftime("%Y-%m-%d_%H%M")
Laden…
Abbrechen
Speichern