Самооборона по-хакерски. Выявляем атаки в Active Directory

Содержание статьи

  • Аутентификации
  • Изменения объектов в Active Directory
  • Атаки на пользователей
  • Атаки на компьютеры
  • Атаки на GPO
  • Атаки на группы
  • ACL-атаки
  • Атаки на ADCS
  • Выводы

В этой статье мы раз­берем­ся, по каким приз­накам мож­но узнать, что хакер уже вов­сю ору­дует в тво­ем домене, авто­мати­зиру­ем про­цесс обна­руже­ния этих атак и пре­дус­мотрим средс­тва про­тиво­дей­ствия.

Ес­ли зло­умыш­ленник так и не был обна­ружен на уров­не сети и все же доб­рался до Active Directory с учет­кой домен­ного поль­зовате­ля, то пер­вый раунд мы про­игра­ли. Теперь у него в арсе­нале будет с десяток‑дру­гой край­не эффектив­ных атак, боль­шая часть из которых дос­таточ­но бес­шумна.

В этот момент начина­ется вто­рой, уже финаль­ный раунд нашего про­тивос­тояния. На кону ни мно­го ни мало вся внут­ренняя инфраструк­тура и, воз­можно, даже биз­нес ком­пании. Если домен выдер­жит самые прос­тые и про­бив­ные экс­пло­иты, которые хакер опро­бует в пер­вые минуты, и тому при­дет­ся при­менять более шум­ные ата­ки, то финаль­ную бит­ву еще мож­но выиг­рать.

В этой час­ти мы научим­ся слы­шать едва уло­вимый шорох зло­умыш­ленни­ка, что засел в Active Directory и, изу­чив твои мис­конфи­ги, начал уве­рен­ное дви­жение впе­ред — к кон­трол­лерам домена.

 

Аутентификации

Оче­вид­но, что зло­умыш­ленник, прес­леду­ющий цель зах­ватить сер­дце внут­ренней инфраструк­туры — Active Directory, будет иметь дело с домен­ными учет­ками. И очень веро­ятно, что в ходе сво­его прод­вижения хакер стол­кнет­ся с неус­пешны­ми аутен­тифика­циями, неп­равиль­ными или ста­рыми пароля­ми или попыта­ется баналь­но подоб­рать пароль.

Да, на кон­трол­лере домена мы можем прос­то цен­тра­лизо­ван­но монито­рить события 4776 (8004), 4768 или 4769, для чего пот­ребу­ются при­виле­гии адми­нис­тра­тора домена. Но мож­но видеть попыт­ки аутен­тифика­ции всех домен­ных поль­зовате­лей и с пра­вами обыч­ного поль­зовате­ля, ведь все пред­лага­емые мною defence-при­емы не тре­буют исклю­читель­ных прав и могут быть выпол­нены абсо­лют­но любым сот­рудни­ком.

От­сле­живая по LDAP изме­нение атри­бута lastLogon/lastLogonTimestamp, мож­но видеть динами­ку успешных аутен­тифика­ций, изме­нение атри­бутов badPasswordTime и badPwdCount — неус­пешных аутен­тифика­ций, а lockoutTime покажет нам динами­ку бло­киро­вок. Зап­рашивая в цик­ле объ­екты, у которых перечис­ленные атри­буты изме­нились за тот или иной интервал вре­мени, мы можем уви­деть эту динами­ку.

По­иск всех поль­зовате­лей в Active Directory, у которых за ука­зан­ное вре­мя были успешные или неус­пешные попыт­ки аутен­тифика­ции

Пред­ложен­ная ниже авто­мати­зация сде­лает всю необ­ходимую работу: скрипт в режиме реаль­ного вре­мени будет писать, какой поль­зователь про­шел успешную аутен­тифика­цию, какой нет и сколь­ко раз, а кто ока­зал­ся заб­локиро­ван:

defence/ad/auth.py from ldap3 import Server, Connection, SUBTREE, ALLfrom time import sleepfrom datetime import datetimefrom getpass import getpassfrom os import systemfrom sys import argvfrom colorama import Foredc = argv[1]userdom = argv[2] # "[email protected]"USERS = { }MAX_LOCKS = 50MAX_FAILS = 100server = Server(dc, get_info=ALL)Connection(server, auto_bind=True)root = server.info.naming_contexts[0]server_time = server.info.other.get('currentTime')[0]print("{root} {server_time}".format(root=root, server_time=server_time))conn = Connection(server, user=userdom, password=argv[3] if len(argv) > 3 else getpass("password: "))conn.bind()alerts = []def alert(user, action): if user in alerts: return print("[!] Auth event detected: %s %s" % (user,action)) #system("telegram '{message}' &".format(message="Auth event detected: "+user+" "+action)) #system("email [email protected] '{message}' &".format(message="Auth event detected: "+user+" "+action)) #system("sms PHONENUMBER '{message}' &".format(message="Auth event detected: "+user+" "+action)) system("zenity --warning --title='Auth event detected' --text='%s %s' &" % (user,action)) #system("echo 'Auth event detected' | festival --tts --language english") alerts.append(user)failures_time = {}success_time = {}fails = set()locks = set()timestamp = (int(datetime.strptime(server_time, "%Y%m%d%H%M%S.0Z").timestamp() if server_time else datetime.utcnow().timestamp()) + 11644473600) * 10000000while True: conn.search(root, '(&(objectCategory=person)(objectClass=user)(|(badPasswordTime>={timestamp})(lastLogon>={timestamp})))'.format(timestamp=timestamp), SUBTREE, attributes=["sAMAccountName", "badPasswordTime", "lastLogon", "badPwdCount", "lockoutTime"]) lasts = [timestamp] for result in conn.entries: dn = result.entry_dn if result['sAMAccountName']: user = result['sAMAccountName'].value if user.lower() in ('incident','sp_farm'): continue auth_failure_count = "" if result['badPwdCount']: auth_failure_count = int(result['badPwdCount'].value) if result['badPasswordTime']: if user in failures_time and failures_time[user] < result['badPasswordTime'].value.timestamp(): print('[{now}]{red} "{user}" auth failure ({auth_failure_count}){reset}'.format(now=datetime.now().strftime("%d.%m.%Y %H:%M:%S"), badPasswordTime=result["badPasswordTime"].value.strftime("%d.%m.%Y %H:%M:%S"), red=Fore.RED, user=user, auth_failure_count=auth_failure_count, reset=Fore.RESET)) if user.lower() in USERS['fail']: alert(user, 'failure') lasts.append((result['badPasswordTime'].value.timestamp() + 11644473600) * 10000000) if result['lockoutTime'].value and result['lockoutTime'].value.timestamp() == result['badPasswordTime'].value.timestamp(): print('[{now}]{red} "{user}" locked{reset}'.format(now=datetime.now().strftime("%d.%m.%Y %H:%M:%S"), red=Fore.LIGHTRED_EX, user=user, reset=Fore.RESET)) if user.lower() in USERS['lock']: alert(user, 'locked') locks.add(user) fails.add(user) failures_time[user] = result['badPasswordTime'].value.timestamp() if result['lastLogon']: if user in success_time and success_time[user] < result['lastLogon'].value.timestamp(): print('[{now}]{green} "{user}" auth success{reset}'.format(now=datetime.now().strftime("%d.%m.%Y %H:%M:%S"), lastLogon=result["lastLogon"].value.strftime("%d.%m.%Y %H:%M:%S"), green=Fore.GREEN, user=user, reset=Fore.RESET)) lasts.append((result['lastLogon'].value.timestamp() + 11644473600) * 10000000) if user.lower() in USERS['auth']: alert(user, 'auth') if user in locks: locks.remove(user) if user in fails: fails.remove(user) success_time[user] = result['lastLogon'].value.timestamp() if len(locks) > MAX_LOCKS: alert("mass locks users", str(len(locks))) if len(fails) > MAX_FAILS: alert("mass fails users", str(len(fails))) timestamp = int(max(lasts) + 1) sleep(1)

При нор­маль­ных обсто­ятель­ствах впол­не законо­мер­но, что некото­рые поль­зовате­ли пери­оди­чес­ки оши­бают­ся при вво­де паролей.

За неус­пешной аутен­тифика­цией не пос­ледова­ло успешной

В этом при­мере за пер­вой неус­пешной аутен­тифика­цией сра­зу идет успешная — поль­зователь прос­то ошиб­ся, вво­дя свой пароль, и затем ввел его пра­виль­но. Но вто­рая неус­пешная аутен­тифика­ция выг­лядит более подоз­ритель­но, ведь пра­виль­ный пароль так и не был вве­ден.

Ат­рибут badPwdCount может показы­вать количес­тво неус­пешных попыток, по которо­му мы можем зак­лючить, что для этой учет­ной записи под­бира­ют пароль.

Де­тект под­бора пароля к сло­вар­ным поль­зовате­лям

На скрин­шоте мы видим, что брут­форсу под­верга­ются явно сло­вар­ные учет­ные записи. Это, ско­рее все­го, отго­лос­ки ата­ки на внеш­ний сетевой периметр. Час­то там могут рас­полагать­ся сер­висы, исполь­зующие аутен­тифика­цию по домен­ным учет­ным записям.

Но вот в сле­дующем при­мере ситу­ация иная — поль­зовате­ли явно не сло­вар­ные.

Де­тект под­бора пароля к внут­ренним поль­зовате­лям — один поль­зователь, мно­го паролей

И если узнать их мож­но было толь­ко внут­ри сети, под­клю­чив­шись к Active Directory, то мож­но сме­ло зак­лючить: мы име­ем дело с внут­ренним наруши­телем. Пря­мой брут­форс домен­ных учет­ных записей весь­ма редок и будет ско­рее следс­тви­ем неак­курат­ных атак.

Од­нако зло­умыш­ленник может про­верить лишь самые сла­бые пароли на широком спис­ке поль­зовате­лей, не вызывая тем самым бло­киро­вок. Если в тво­ей ком­пании име­ется мно­го учет­ных записей, у зло­умыш­ленни­ка есть шан­сы на успех.

Ха­кер выпол­няет ата­ку password spraying — один пароль, мно­го поль­зовате­лей

Та­кая про­вер­ка будет для нас край­не замет­ной: со сто­роны она выг­лядит как спон­танный всплеск безус­пешной аутен­тифика­ции мно­жес­тва поль­зовате­лей.

Де­тект ата­ки password spraying — один пароль, мно­го поль­зовате­лей

По­доб­ную активность мы можем заметить и в том слу­чае, если зло­умыш­ленник нашел валид­ный пароль и про­водит ата­ку password spraying в надеж­де, что пароль подой­дет куда‑то еще.

В более‑менее круп­ных ком­пани­ях поток событий аутен­тифика­ции будет огромный, и отсле­живать его вруч­ную дос­таточ­но слож­но. Поэто­му мы можем нас­тро­ить сра­баты­вание авто­мати­чес­ких уве­дом­лений для опре­делен­ных поль­зовате­лей и при тех или иных событи­ях.

Нап­ример, у нас есть спе­циаль­но соз­данный поль­зователь, о сущес­тво­вании которо­го ник­то не зна­ет. Любая аутен­тифика­ция такого поль­зовате­ля может рас­смат­ривать­ся как ано­малия. Явно сло­вар­ные поль­зовате­ли guest, security, audit, testuser, test1, никем при этом не исполь­зуемые, так­же дол­жны рас­смат­ривать­ся как ано­малия, если для них про­исхо­дит событие неус­пешной аутен­тифика­ции. Наконец, сло­вар­ные поль­зовате­ли типа administrator и все вышепе­речис­ленные при нор­маль­ных обсто­ятель­ствах никог­да не дол­жны бло­киро­вать­ся — в про­тив­ном слу­чае это тоже мар­кер ата­ки. Если подыто­жить, то в auth.py мы все это опи­сыва­ем сле­дующим обра­зом:

defence/ad/auth.py...USERS = { # notifications 'auth': ['honeypot_user'], 'fail': ['guest', 'security', 'audit', 'testuser', 'test1'], 'lock': ['administrator', 'guest', 'security', 'audit', 'testuser', 'test1']}...

Раз скрипт собира­ет пол­ную динами­ку событий аутен­тифика­ции, то неп­лохо было бы уметь ее ана­лизи­ровать. Исполь­зуя стан­дар­тные матема­тичес­кие воз­можнос­ти Python, лег­ко пос­тро­ить гра­фик трен­дов успешных и неус­пешных аутен­тифика­ций и бло­киро­вок:

defence/ad/auth-anal.py from datetime import datetimeimport matplotlib.pyplot as pltHOURS = 24auth_success = {}auth_fail = {}auth_lock = {}while True: try: line = input() except: break try: date,time,user,*result = line.strip().split() except: continue date = date.split("[")[1] time = time.split("]")[0] hour = time.split(":")[0] result = " ".join(result) try: datetime.strptime(date, '%d.%m.%Y') except: continue if result.find("success") != -1: try: auth_success[date+"-"+hour] += 1 except: auth_success[date+"-"+hour] = 1 elif result.find("fail") != -1: try: auth_fail[date+"-"+hour] += 1 except: auth_fail[date+"-"+hour] = 1 elif result.find("lock") != -1: try: auth_lock[date+"-"+hour] += 1 except: auth_lock[date+"-"+hour] = 1plt.plot(sorted(auth_success, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()), list(map(lambda d: auth_success[d], sorted(auth_success, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()))), label="success")plt.plot(sorted(auth_fail, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()), list(map(lambda d: auth_fail[d], sorted(auth_fail, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()))), label="fail")plt.plot(sorted(auth_lock, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()), list(map(lambda d: auth_lock[d], sorted(auth_lock, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()))), label="lock")ax = plt.gca(); ax.set_xticks(ax.get_xticks()[::HOURS])plt.legend()plt.show()

Чрез­вычай­но прос­тая ана­лити­ка, но общая кар­тина налицо — сра­зу вид­но, в какие часы в ком­пании кипит работа, а в какие идут ноч­ные брут­форс‑ата­ки.

Ана­лиз накоп­ленных событий аутен­тифика­ции за неделю

Од­нако события аутен­тифика­ций — это лишь самое малое, что про­исхо­дит в Active Directory. Есть вещи, которые час­то не монито­рит даже нас­тоящий SOC.

 

Изменения объектов в Active Directory

Мно­гие ата­ки на инфраструк­туру Active Directory оставля­ют в ней осо­бые сле­ды в виде изме­нения или соз­дания соот­ветс­тву­ющих атри­бутов у объ­ектов. Прак­тичес­ки все подоб­ные модифи­кации будут кос­венно зат­рагивать атри­бут whenChanged, ука­зыва­ющий на изме­нения в объ­екте.

По­луче­ние всех объ­ектов Active Directory, в которых что‑то изме­нилось за ука­зан­ное вре­мя

При­меча­тель­но, что даже изме­нение в пра­вах объ­екта вызыва­ет обновле­ние атри­бута whenChanged, бла­года­ря чему ста­новит­ся воз­можным отсле­живать край­не неуло­вимые ACL-ата­ки. Сде­лав пред­варитель­ный снап­шот всех атри­бутов у всех объ­ектов, а так­же ана­лиз их ACL и срав­нивая раз­личия у изме­нив­шихся объ­ектов, мы смо­жем отсле­живать всю динами­ку в Active Directory:

defence/ad/changed.py from ldap3.protocol.microsoft import security_descriptor_controlfrom ldap3 import Server, Connection, SUBTREE, BASE, ALL, ALL_ATTRIBUTESimport picklefrom time import sleepfrom datetime import datetimefrom getpass import getpassfrom os import systemfrom sys import argvfrom re import matchfrom colorama import Forefrom winacl.dtyp.security_descriptor import SECURITY_DESCRIPTORfrom winacl.dtyp.sid import SIDfrom winacl.dtyp.ace import ADS_ACCESS_MASKdc = argv[1]ATTACKS = { # notifications "SPN attack": {"attr": "^serviceprincipalname$", "dn": ".*"}, "RBCD attack" : {"attr": "^msds-allowedtoactonbehalfofotheridentity$", "dn": ".*"}, "ShadowCredentials attack" : {"attr": "^msds-keycredentiallink$", "dn": ".*"}, "membership changed": {"attr": "^member$", "dn": ".*admin.*"}, "GPO attack": {"attr": "^gpcfilesyspath$", "dn": ".*"}, "user object abuse": {"attr": "^scriptpath$", "dn": ".*"}, "ACL attack": {"attr": ".*generic_all.*", "dn": ".*"}, "sAMAccountName spoofing": {"attr": "^samaccountname$", "dn": ".*"}, "dNSHostName spoofing": {"attr": "^dnshostname$", "dn": ".*"}, "ADCS attack templates ESC4": {"attr": "^(msPKI-Certificate-Name-Flag|msPKI-Enrollment-Flag|msPKI-RA-Signature)$", "dn": ".*CN=Certificate Templates,.*"}}server = Server(dc, get_info=ALL)Connection(server, auto_bind=True)server_time = server.info.other.get('currentTime')[0]if len(argv) < 4: print(server_time) print("n".join(server.info.naming_contexts)) exit()else: root = argv[3]userdom = argv[2] # "[email protected]"conn = Connection(server, user=userdom, password=getpass("password: "))conn.bind()alerts = []def alert(dn, attr, value, message): if (dn,attr) in alerts: return print("[!] Danger changes detected: %s: %s=%s (%s)" % (dn, attr, value, message)) #system("telegram '{message}'".format(message="Danger changes detected %s: %s=%s (%s)" % (dn, attr, value, message))) system("zenity --warning --title='Danger changes detected' --text='%s: %s=%s (%s)' &" % (dn, attr, value, message)) #system("echo 'Danger changes detected' | festival --tts --language english") alerts.append((dn,attr))cache_sid = {}def resolve_sid(sid): global cache_sid if not sid in cache_sid: cache_sid[sid] = None for dn in objects: if objects[dn].get("objectSid") == [sid]: name = objects[dn]["sAMAccountName"] cache_sid[sid] = name break return cache_sid.get(sid)def parse_acl(nTSecurityDescriptor): acl = SECURITY_DESCRIPTOR.from_bytes(nTSecurityDescriptor) acl_canonical = {"owner": [acl.Owner.to_sddl() if acl.Owner else ""], "dacl":[]} for ace in acl.Dacl.aces if acl.Dacl else []: ace_canonical = {} ace_canonical["who"] = SID.wellknown_sid_lookup(ace.Sid.to_sddl()) or resolve_sid(ace.Sid.to_sddl()) or ace.Sid.to_sddl() ace_canonical["type"] = str(ace).split("n")[0].strip() for line in str(ace).split("n")[1:]: if line.strip(): field = line.split(":")[0].lower() value = line.split(":")[1].strip() ace_canonical[field] = value acl_canonical["dacl"].append(ace_canonical) return acl_canonicaldef snapshot_create(): global objects #results = conn.extend.standard.paged_search(search_base=root, search_filter='(objectClass=*)', search_scope=SUBTREE, attributes=ALL_ATTRIBUTES, paged_size=1000) # only attributes results = conn.extend.standard.paged_search(search_base=root, search_filter='(objectClass=*)', search_scope=SUBTREE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05), paged_size=1000) # with ACL #conn.search(root, '(objectClass=*)', SUBTREE, attributes=ALL_ATTRIBUTES) # only attributes #conn.search(root, '(objectClass=*)', SUBTREE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05)) # with ACL #conn.search(root, '(|(objectClass=pKICertificateTemplate)(objectClass=certificationAuthority))', SUBTREE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05)) # with ACL #for result in conn.entries: for result in results: if result.get('type') == 'searchResRef': continue #dn = result.entry_dn #objects[dn] = result.entry_attributes_as_dict dn = result["dn"] objects[dn] = result["raw_attributes"] for dn in objects: # because of resolve_sid() if 'nTSecurityDescriptor' in objects[dn]: objects[dn]['nTSecurityDescriptor'] = parse_acl(objects[dn]['nTSecurityDescriptor'][0]) open("objects.dat", "wb").write(pickle.dumps([objects,cache_sid]))def snapshot_restore(): global objects, cache_sid try: objects, cache_sid = pickle.loads(open("objects.dat", "rb").read()) return True except: return Falsedef get_attrs(dn): #conn.search(dn, '(objectClass=*)', BASE, attributes=ALL_ATTRIBUTES) # only attributes #conn.search(dn, '(objectClass=*)', BASE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05)) # with ACL results = conn.extend.standard.paged_search(search_base=dn, search_filter='(objectClass=*)', search_scope=BASE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05), paged_size=1000) # with ACL result = next(results) #attrs = conn.entries[0].entry_attributes_as_dict attrs = result["raw_attributes"] if attrs.get('nTSecurityDescriptor'): attrs['nTSecurityDescriptor'] = parse_acl(attrs['nTSecurityDescriptor'][0]) return attrsdef print_diff(dn): if not dn in objects: return def diff(attrs_before, attrs_after): for attr in attrs_before: if not attr in attrs_after: print(f"{Fore.RED}delete %s: %s{Fore.RESET}" % (attr, str(attrs_before[attr]))) else: if type(attrs_before[attr]) == dict: diff(attrs_before[attr], attrs_after[attr]) else: for value in attrs_before[attr]: if not value in attrs_after[attr]: print(f"{Fore.RED}delete %s: %s{Fore.RESET}" % (attr, value)) for attr in attrs_after: if not attr in attrs_before: print(f"{Fore.GREEN}new %s: %s{Fore.RESET}" % (attr, str(attrs_after[attr]))) for attack in ATTACKS: if (match(ATTACKS[attack]["attr"].lower(), attr.lower()) or match(ATTACKS[attack]["attr"].lower(), str(attrs_after[attr]).lower())) and match(ATTACKS[attack]["dn"].lower(), dn.lower()): alert(dn, attr, attrs_after[attr].decode(), attack) else: if type(attrs_after[attr]) == dict: diff(attrs_before[attr], attrs_after[attr]) else: for value in attrs_after[attr]: if not value in attrs_before[attr]: print(f"{Fore.GREEN}added %s: %s{Fore.RESET}" % (attr, value)) for attack in ATTACKS: if (match(ATTACKS[attack]["attr"].lower(), attr.lower()) or match(ATTACKS[attack]["attr"].lower(), str(value).lower())) and match(ATTACKS[attack]["dn"].lower(), dn.lower()): alert(dn, attr, value.decode(), attack) attrs = get_attrs(dn) diff(objects[dn], attrs) objects[dn] = attrsobjects = {}snapshot_restore() or snapshot_create()print("[*] %d objects" % len(objects))now = datetime.strptime(server_time, '%Y%m%d%H%M%S.0Z').timestamp() or datetime.utcnow().timestamp()first_time = Truewhile True: conn.search(root, f'(whenChanged>={datetime.utcfromtimestamp(now).strftime("%Y%m%d%H%M%S.0Z")})', SUBTREE, attributes=["distinguishedName", "whenChanged", "whenCreated"]) lasts = [now] for result in conn.entries: dn = result.entry_dn changed = result['whenChanged'].value created = result['whenCreated'].value time = changed.strftime("%d.%m.%Y %H:%M:%S") if changed == created: if not first_time: print(f'[{time}] "{dn}" created') objects[dn] = get_attrs(dn) lasts.append(created.timestamp()) else: if not first_time: print(f'[{time}] "{dn}" changed') print_diff(dn) lasts.append(changed.timestamp()) now = max(lasts) + 1 sleep(1) first_time = False

Источник: xakep.ru

Ответить

Ваш адрес email не будет опубликован. Обязательные поля помечены *