mirror of
https://github.com/RYDE-WORK/CORStest.git
synced 2026-01-19 13:13:33 +08:00
Python3 migration
This commit is contained in:
parent
d8ddce2425
commit
e0a258f578
153
corstest.py
153
corstest.py
@ -1,8 +1,14 @@
|
|||||||
#!/usr/bin/env python2
|
#!/usr/bin/env python3
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
# python standard library
|
# python standard library
|
||||||
import re, sys, ssl, signal, urllib2, urlparse, argparse, multiprocessing
|
import re, sys, ssl, signal
|
||||||
|
import argparse, multiprocessing
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
import urllib.request
|
||||||
|
import logging
|
||||||
|
|
||||||
|
timeoutValue = 1000
|
||||||
|
|
||||||
# -------------------------------------------------------------------------------------------------
|
# -------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
@ -17,44 +23,69 @@ def usage():
|
|||||||
return parser.parse_args()
|
return parser.parse_args()
|
||||||
|
|
||||||
# -------------------------------------------------------------------------------------------------
|
# -------------------------------------------------------------------------------------------------
|
||||||
|
args = usage()
|
||||||
|
|
||||||
def main():
|
def main():
|
||||||
global args; args = usage()
|
|
||||||
try:
|
try:
|
||||||
urls = [line.rstrip() for line in open(args.infile)]
|
urls = [line.rstrip() for line in open(args.infile)]
|
||||||
procs = min(abs(int(args.p or 32)), len(urls)) or 1
|
procs = min(abs(int(args.p or 32)), len(urls)) or 1
|
||||||
except (IOError, ValueError) as e: print e; return
|
except (IOError, ValueError) as e:
|
||||||
|
print("An error has occured: {}".format(e))
|
||||||
|
return
|
||||||
|
|
||||||
# check domains/urls in parallel but clean exit on ctrl-c
|
# check domains/urls in parallel but clean exit on ctrl-c
|
||||||
sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
|
sigint_handler = signal.signal(signal.SIGINT, signal.SIG_IGN)
|
||||||
pool = multiprocessing.Pool(processes=procs)
|
pool = multiprocessing.Pool(processes=procs)
|
||||||
signal.signal(signal.SIGINT, sigint_handler)
|
signal.signal(signal.SIGINT, sigint_handler)
|
||||||
try: pool.map_async(check, urls).get(2**32)
|
try:
|
||||||
except KeyboardInterrupt: pass
|
pool.map_async(check, urls).get( timeout=timeoutValue )
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
pass
|
||||||
|
|
||||||
# -------------------------------------------------------------------------------------------------
|
# -------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
# check for vulns/misconfigurations
|
# check for vulns/misconfigurations
|
||||||
def check(url):
|
def check(url):
|
||||||
if re.findall("^https://", url): args.s = True # set protocol
|
logging.debug("[DEBUG - check] url: {}".format(url))
|
||||||
url = re.sub("^https?://", "", url) # url w/o proto
|
|
||||||
host = urlparse.urlparse("//"+url).hostname or "" # set hostname
|
if re.findall("^https://", url):
|
||||||
acao = cors(url, url, False, True) # perform request
|
args.s = True # set protocol
|
||||||
|
|
||||||
|
url = re.sub("^https?://", "", url) # url w/o proto
|
||||||
|
host = urlparse("//"+url).hostname or "" # set hostname
|
||||||
|
acao = cors(url, url, False, True) # perform request
|
||||||
|
logging.debug("[DEBUG - check] acao: {}".format(acao))
|
||||||
|
|
||||||
if acao:
|
if acao:
|
||||||
if args.q and (acao == "no_acac" or "*" == acao): return
|
if args.q and (acao == "no_acac" or "*" == acao):
|
||||||
if acao == "*": info(url, "* (without credentials)")
|
return
|
||||||
elif acao in ["//", "://"]: alert(url, "Any origin allowed") # firefox/chrome/safari/opera only
|
if acao == "*":
|
||||||
elif re.findall("\s|,|\|", acao): invalid(url, "Multiple values in Access-Control-Allow-Origin")
|
info(url, "* (without credentials)")
|
||||||
elif re.findall("\*.", acao): invalid(url, 'Wrong use of wildcard, only single "*" is valid')
|
elif acao in ["//", "://"]:
|
||||||
elif re.findall("fiddle.jshell.net|s.codepen.io", acao): alert(url, "Developer backdoor")
|
alert(url, "Any origin allowed") # firefox/chrome/safari/opera only
|
||||||
elif "evil.org" in cors(url, "evil.org"): alert(url, "Origin reflection")
|
elif re.findall(r"\s|,|\|", acao):
|
||||||
elif cors(url, url, True).startswith("http://"): warning(url, "Non-ssl site allowed")
|
invalid(url, "Multiple values in Access-Control-Allow-Origin")
|
||||||
elif "sub."+host in cors(url, "sub."+url): warning(url, "Arbitrary subdomains allowed")
|
elif re.findall(r"\*.", acao):
|
||||||
|
invalid(url, 'Wrong use of wildcard, only single "*" is valid')
|
||||||
|
elif re.findall(r"fiddle.jshell.net|s.codepen.io", acao):
|
||||||
|
alert(url, "Developer backdoor")
|
||||||
|
elif "evil.org" in cors(url, "evil.org"):
|
||||||
|
alert(url, "Origin reflection")
|
||||||
|
elif cors(url, url, True).startswith("http://"):
|
||||||
|
warning(url, "Non-ssl site allowed")
|
||||||
|
elif "sub."+host in cors(url, "sub."+url):
|
||||||
|
warning(url, "Arbitrary subdomains allowed")
|
||||||
elif "not"+host in cors(url, "not"+url):
|
elif "not"+host in cors(url, "not"+url):
|
||||||
alert(url, "Pre-domain wildcard") if sld(host) else warning(url, "Pre-subdomain wildcard")
|
alert(url, "Pre-domain wildcard") if sld(host) else warning(url, "Pre-subdomain wildcard")
|
||||||
elif host+".tk" in cors(url, host+".tk"): alert(url, "Post-domain wildcard")
|
elif host+".tk" in cors(url, host+".tk"):
|
||||||
elif "null" == cors(url, "null").lower(): alert(url, "Null misconfiguration")
|
alert(url, "Post-domain wildcard")
|
||||||
else: info(url, acao)
|
elif "null" == cors(url, "null").lower():
|
||||||
elif acao != None and not args.q: notvuln(url, "Access-Control-Allow-Origin header not present")
|
alert(url, "Null misconfiguration")
|
||||||
|
else:
|
||||||
|
info(url, acao)
|
||||||
|
elif acao != None and not args.q:
|
||||||
|
notvuln(url, "Access-Control-Allow-Origin header not present")
|
||||||
|
|
||||||
# TBD: maybe use CORS preflight options request instead to check if cors protocol is understood
|
# TBD: maybe use CORS preflight options request instead to check if cors protocol is understood
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
@ -63,48 +94,78 @@ def check(url):
|
|||||||
# perform request and fetch response header
|
# perform request and fetch response header
|
||||||
def cors(url, origin, ssltest=False, firstrun=False):
|
def cors(url, origin, ssltest=False, firstrun=False):
|
||||||
url = ("http://" if not (ssltest or args.s) else "https://") + url
|
url = ("http://" if not (ssltest or args.s) else "https://") + url
|
||||||
if origin != "null": origin = ("http://" if (ssltest or not args.s) else "https://") + origin
|
logging.debug("[DEBUG - cors] url: {}".format(url))
|
||||||
|
|
||||||
|
if origin != "null":
|
||||||
|
origin = ("http://" if (ssltest or not args.s) else "https://") + origin
|
||||||
|
|
||||||
|
logging.debug("[DEBUG - cors] origin: {}".format(origin))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
request = urllib2.Request(url)
|
request = urllib.request.Request(url)
|
||||||
|
|
||||||
|
logging.debug("[DEBUG - cors] request: {}".format(request))
|
||||||
|
|
||||||
request.add_header('Origin', origin)
|
request.add_header('Origin', origin)
|
||||||
request.add_header('Cookie', args.c or "")
|
request.add_header('Cookie', args.c or "")
|
||||||
request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64)')
|
request.add_header('User-Agent', 'Mozilla/5.0 (Windows NT 6.1; WOW64)')
|
||||||
if not "_create_unverified_context" in dir(ssl): response = urllib2.urlopen(request, timeout=10)
|
if not "_create_unverified_context" in dir(ssl):
|
||||||
else: response = urllib2.urlopen(request, timeout=10, context=ssl._create_unverified_context())
|
response = urllib.request.urlopen(request, timeout=10)
|
||||||
acao = response.info().getheader('Access-Control-Allow-Origin')
|
else:
|
||||||
acac = str(response.info().getheader('Access-Control-Allow-Credentials')).lower() == "true"
|
response = urllib.request.urlopen(request, timeout=10, context=ssl._create_unverified_context())
|
||||||
vary = "Origin" in str(response.info().getheader('Vary'))
|
|
||||||
if args.v: print "%s\n%-10s%s\n%-10s%s\n%-10s%s\n%-10s%s" % ("-" * 72, "Resource:", \
|
logging.debug("[DEBUG - cors] response: {}".format(response))
|
||||||
response.geturl(), "Origin:", origin, "ACAO:", acao or "-", "ACAC:", acac or "-")
|
|
||||||
|
response_info = response.info()
|
||||||
|
logging.debug("[DEBUG - cors] response_info: {}".format(response_info))
|
||||||
|
|
||||||
|
acao = response.getheader('Access-Control-Allow-Origin')
|
||||||
|
acac = str(response.getheader('Access-Control-Allow-Credentials')).lower() == "true"
|
||||||
|
vary = "Origin" in str(response.getheader('Vary'))
|
||||||
|
logging.debug("[DEBUG - cors] acao: {}, acac: {}, vary: {}".format(acao, acac, vary))
|
||||||
|
|
||||||
|
if args.v:
|
||||||
|
print("%s\n%-10s%s\n%-10s%s\n%-10s%s\n%-10s%s" % ("-" * 72, "Resource:", response.geturl(), "Origin:", origin, "ACAO:", acao or "-", "ACAC:", acac or "-"))
|
||||||
if firstrun:
|
if firstrun:
|
||||||
if args.q and not acac: acao = "no_acac"
|
if args.q and not acac:
|
||||||
if acac and acao != '*' and not args.q: alert(url, "Access-Control-Allow-Credentials present")
|
acao = "no_acac"
|
||||||
if vary and not args.q: warning(url, "Access-Control-Allow-Origin dynamically generated")
|
if acac and acao != '*' and not args.q:
|
||||||
if ssltest and response.info().getheader('Strict-Transport-Security'): acao = ""
|
alert(url, "Access-Control-Allow-Credentials present")
|
||||||
|
if vary and not args.q:
|
||||||
|
warning(url, "Access-Control-Allow-Origin dynamically generated")
|
||||||
|
if ssltest and response.getheader('Strict-Transport-Security'):
|
||||||
|
acao = ""
|
||||||
return (acao or "") if acac else ""
|
return (acao or "") if acac else ""
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if not args.q: error(url, e.message or str(e).splitlines()[-1])
|
logging.debug("[DEBUG - cors] e: {}".format(e))
|
||||||
if not firstrun: return ""
|
if not args.q:
|
||||||
|
error(url, e or str(e).splitlines()[-1])
|
||||||
|
if not firstrun:
|
||||||
|
return ""
|
||||||
|
|
||||||
# -------------------------------------------------------------------------------------------------
|
# -------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
# check if given hostname is a second-level domain
|
# check if given hostname is a second-level domain
|
||||||
def sld(host):
|
def sld(host):
|
||||||
try:
|
try:
|
||||||
with open('tldlist.dat') as f: tlds = [line.strip() for line in f if line[0] not in "/\n"][::-1]
|
with open('tldlist.dat') as f:
|
||||||
except IOError as e: return True
|
tlds = [line.strip() for line in f if line[0] not in "/\n"][::-1]
|
||||||
|
except IOError as e:
|
||||||
|
logging.debug("[DEBUG - sld]: e: {}".format(e))
|
||||||
|
return True
|
||||||
|
|
||||||
for tld in tlds:
|
for tld in tlds:
|
||||||
if host.endswith('.' + tld): host = host[:-len(tld)]
|
if host.endswith('.' + tld): host = host[:-len(tld)]
|
||||||
if host.count('.') == 1: return True
|
if host.count('.') == 1: return True
|
||||||
|
|
||||||
# -------------------------------------------------------------------------------------------------
|
# -------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
def error(url, msg): print "\x1b[2m" + url, "- Error:", msg + "\x1b[0m"
|
def error(url, msg): print("\x1b[2m{} - Error: {}\x1b[0m".format( url, msg))
|
||||||
def alert(url, msg): print "\x1b[97;41m" + url, "- Alert:", msg + "\x1b[0m"
|
def alert(url, msg): print("\x1b[97;41m{} - Alert: {}\x1b[0m".format(url, msg))
|
||||||
def invalid(url, msg): print "\x1b[30;43m" + url, "- Invalid:", msg + "\x1b[0m"
|
def invalid(url, msg): print("\x1b[30;43m{} - Invalid: {}\x1b[0m".format(url, msg))
|
||||||
def warning(url, msg): print "\x1b[30;48;5;202m" + url, "- Warning:", msg + "\x1b[0m"
|
def warning(url, msg): print("\x1b[30;48;5;202m{} - Warning: {}\x1b[0m".format(url, msg))
|
||||||
def notvuln(url, msg): print "\x1b[97;100m" + url, "- Not vulnerable:", msg + "\x1b[0m"
|
def notvuln(url, msg): print("\x1b[97;100m{} - Not vulnerable: {}\x1b[0m".format(url, msg))
|
||||||
def info(url, msg): print "\x1b[30;42m" + url, "- Access-Control-Allow-Origin:", msg + "\x1b[0m"
|
def info(url, msg): print("\x1b[30;42m{} - Access-Control-Allow-Origin: {}\x1b[0m".format(url, msg))
|
||||||
|
|
||||||
# -------------------------------------------------------------------------------------------------
|
# -------------------------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user