Physical Address
304 North Cardinal St.
Dorchester Center, MA 02124
Physical Address
304 North Cardinal St.
Dorchester Center, MA 02124
В этой статье мы разберемся, по каким признакам можно узнать, что хакер уже вовсю орудует в твоем домене, автоматизируем процесс обнаружения этих атак и предусмотрим средства противодействия.
Если злоумышленник так и не был обнаружен на уровне сети и все же добрался до Active Directory с учеткой доменного пользователя, то первый раунд мы проиграли. Теперь у него в арсенале будет с десяток‑другой крайне эффективных атак, большая часть из которых достаточно бесшумна.
В этот момент начинается второй, уже финальный раунд нашего противостояния. На кону ни много ни мало вся внутренняя инфраструктура и, возможно, даже бизнес компании. Если домен выдержит самые простые и пробивные эксплоиты, которые хакер опробует в первые минуты, и тому придется применять более шумные атаки, то финальную битву еще можно выиграть.
В этой части мы научимся слышать едва уловимый шорох злоумышленника, что засел в Active Directory и, изучив твои мисконфиги, начал уверенное движение вперед — к контроллерам домена.
Очевидно, что злоумышленник, преследующий цель захватить сердце внутренней инфраструктуры — Active Directory, будет иметь дело с доменными учетками. И очень вероятно, что в ходе своего продвижения хакер столкнется с неуспешными аутентификациями, неправильными или старыми паролями или попытается банально подобрать пароль.
Да, на контроллере домена мы можем просто централизованно мониторить события 4776 (8004)
, 4768
или 4769
, для чего потребуются привилегии администратора домена. Но можно видеть попытки аутентификации всех доменных пользователей и с правами обычного пользователя, ведь все предлагаемые мною defence-приемы не требуют исключительных прав и могут быть выполнены абсолютно любым сотрудником.
Отслеживая по LDAP изменение атрибута lastLogon/lastLogonTimestamp
, можно видеть динамику успешных аутентификаций, изменение атрибутов badPasswordTime
и badPwdCount
— неуспешных аутентификаций, а lockoutTime
покажет нам динамику блокировок. Запрашивая в цикле объекты, у которых перечисленные атрибуты изменились за тот или иной интервал времени, мы можем увидеть эту динамику.
Поиск всех пользователей в Active Directory, у которых за указанное время были успешные или неуспешные попытки аутентификации
Предложенная ниже автоматизация сделает всю необходимую работу: скрипт в режиме реального времени будет писать, какой пользователь прошел успешную аутентификацию, какой нет и сколько раз, а кто оказался заблокирован:
defence/ad/auth.py from ldap3 import Server, Connection, SUBTREE, ALLfrom time import sleepfrom datetime import datetimefrom getpass import getpassfrom os import systemfrom sys import argvfrom colorama import Foredc = argv[1]userdom = argv[2] # "[email protected]"USERS = { }MAX_LOCKS = 50MAX_FAILS = 100server = Server(dc, get_info=ALL)Connection(server, auto_bind=True)root = server.info.naming_contexts[0]server_time = server.info.other.get('currentTime')[0]print("{root} {server_time}".format(root=root, server_time=server_time))conn = Connection(server, user=userdom, password=argv[3] if len(argv) > 3 else getpass("password: "))conn.bind()alerts = []def alert(user, action): if user in alerts: return print("[!] Auth event detected: %s %s" % (user,action)) #system("telegram '{message}' &".format(message="Auth event detected: "+user+" "+action)) #system("email [email protected] '{message}' &".format(message="Auth event detected: "+user+" "+action)) #system("sms PHONENUMBER '{message}' &".format(message="Auth event detected: "+user+" "+action)) system("zenity --warning --title='Auth event detected' --text='%s %s' &" % (user,action)) #system("echo 'Auth event detected' | festival --tts --language english") alerts.append(user)failures_time = {}success_time = {}fails = set()locks = set()timestamp = (int(datetime.strptime(server_time, "%Y%m%d%H%M%S.0Z").timestamp() if server_time else datetime.utcnow().timestamp()) + 11644473600) * 10000000while True: conn.search(root, '(&(objectCategory=person)(objectClass=user)(|(badPasswordTime>={timestamp})(lastLogon>={timestamp})))'.format(timestamp=timestamp), SUBTREE, attributes=["sAMAccountName", "badPasswordTime", "lastLogon", "badPwdCount", "lockoutTime"]) lasts = [timestamp] for result in conn.entries: dn = result.entry_dn if result['sAMAccountName']: user = result['sAMAccountName'].value if user.lower() in ('incident','sp_farm'): continue auth_failure_count = "" if result['badPwdCount']: auth_failure_count = int(result['badPwdCount'].value) if result['badPasswordTime']: if user in failures_time and failures_time[user] < result['badPasswordTime'].value.timestamp(): print('[{now}]{red} "{user}" auth failure ({auth_failure_count}){reset}'.format(now=datetime.now().strftime("%d.%m.%Y %H:%M:%S"), badPasswordTime=result["badPasswordTime"].value.strftime("%d.%m.%Y %H:%M:%S"), red=Fore.RED, user=user, auth_failure_count=auth_failure_count, reset=Fore.RESET)) if user.lower() in USERS['fail']: alert(user, 'failure') lasts.append((result['badPasswordTime'].value.timestamp() + 11644473600) * 10000000) if result['lockoutTime'].value and result['lockoutTime'].value.timestamp() == result['badPasswordTime'].value.timestamp(): print('[{now}]{red} "{user}" locked{reset}'.format(now=datetime.now().strftime("%d.%m.%Y %H:%M:%S"), red=Fore.LIGHTRED_EX, user=user, reset=Fore.RESET)) if user.lower() in USERS['lock']: alert(user, 'locked') locks.add(user) fails.add(user) failures_time[user] = result['badPasswordTime'].value.timestamp() if result['lastLogon']: if user in success_time and success_time[user] < result['lastLogon'].value.timestamp(): print('[{now}]{green} "{user}" auth success{reset}'.format(now=datetime.now().strftime("%d.%m.%Y %H:%M:%S"), lastLogon=result["lastLogon"].value.strftime("%d.%m.%Y %H:%M:%S"), green=Fore.GREEN, user=user, reset=Fore.RESET)) lasts.append((result['lastLogon'].value.timestamp() + 11644473600) * 10000000) if user.lower() in USERS['auth']: alert(user, 'auth') if user in locks: locks.remove(user) if user in fails: fails.remove(user) success_time[user] = result['lastLogon'].value.timestamp() if len(locks) > MAX_LOCKS: alert("mass locks users", str(len(locks))) if len(fails) > MAX_FAILS: alert("mass fails users", str(len(fails))) timestamp = int(max(lasts) + 1) sleep(1)
При нормальных обстоятельствах вполне закономерно, что некоторые пользователи периодически ошибаются при вводе паролей.
За неуспешной аутентификацией не последовало успешной
В этом примере за первой неуспешной аутентификацией сразу идет успешная — пользователь просто ошибся, вводя свой пароль, и затем ввел его правильно. Но вторая неуспешная аутентификация выглядит более подозрительно, ведь правильный пароль так и не был введен.
Атрибут badPwdCount
может показывать количество неуспешных попыток, по которому мы можем заключить, что для этой учетной записи подбирают пароль.
Детект подбора пароля к словарным пользователям
На скриншоте мы видим, что брутфорсу подвергаются явно словарные учетные записи. Это, скорее всего, отголоски атаки на внешний сетевой периметр. Часто там могут располагаться сервисы, использующие аутентификацию по доменным учетным записям.
Но вот в следующем примере ситуация иная — пользователи явно не словарные.
Детект подбора пароля к внутренним пользователям — один пользователь, много паролей
И если узнать их можно было только внутри сети, подключившись к Active Directory, то можно смело заключить: мы имеем дело с внутренним нарушителем. Прямой брутфорс доменных учетных записей весьма редок и будет скорее следствием неаккуратных атак.
Однако злоумышленник может проверить лишь самые слабые пароли на широком списке пользователей, не вызывая тем самым блокировок. Если в твоей компании имеется много учетных записей, у злоумышленника есть шансы на успех.
Хакер выполняет атаку password spraying — один пароль, много пользователей
Такая проверка будет для нас крайне заметной: со стороны она выглядит как спонтанный всплеск безуспешной аутентификации множества пользователей.
Детект атаки password spraying — один пароль, много пользователей
Подобную активность мы можем заметить и в том случае, если злоумышленник нашел валидный пароль и проводит атаку password spraying в надежде, что пароль подойдет куда‑то еще.
В более‑менее крупных компаниях поток событий аутентификации будет огромный, и отслеживать его вручную достаточно сложно. Поэтому мы можем настроить срабатывание автоматических уведомлений для определенных пользователей и при тех или иных событиях.
Например, у нас есть специально созданный пользователь, о существовании которого никто не знает. Любая аутентификация такого пользователя может рассматриваться как аномалия. Явно словарные пользователи guest
, security
, audit
, testuser
, test1
, никем при этом не используемые, также должны рассматриваться как аномалия, если для них происходит событие неуспешной аутентификации. Наконец, словарные пользователи типа administrator
и все вышеперечисленные при нормальных обстоятельствах никогда не должны блокироваться — в противном случае это тоже маркер атаки. Если подытожить, то в auth.py
мы все это описываем следующим образом:
defence/ad/auth.py...USERS = { # notifications 'auth': ['honeypot_user'], 'fail': ['guest', 'security', 'audit', 'testuser', 'test1'], 'lock': ['administrator', 'guest', 'security', 'audit', 'testuser', 'test1']}...
Раз скрипт собирает полную динамику событий аутентификации, то неплохо было бы уметь ее анализировать. Используя стандартные математические возможности Python, легко построить график трендов успешных и неуспешных аутентификаций и блокировок:
defence/ad/auth-anal.py from datetime import datetimeimport matplotlib.pyplot as pltHOURS = 24auth_success = {}auth_fail = {}auth_lock = {}while True: try: line = input() except: break try: date,time,user,*result = line.strip().split() except: continue date = date.split("[")[1] time = time.split("]")[0] hour = time.split(":")[0] result = " ".join(result) try: datetime.strptime(date, '%d.%m.%Y') except: continue if result.find("success") != -1: try: auth_success[date+"-"+hour] += 1 except: auth_success[date+"-"+hour] = 1 elif result.find("fail") != -1: try: auth_fail[date+"-"+hour] += 1 except: auth_fail[date+"-"+hour] = 1 elif result.find("lock") != -1: try: auth_lock[date+"-"+hour] += 1 except: auth_lock[date+"-"+hour] = 1plt.plot(sorted(auth_success, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()), list(map(lambda d: auth_success[d], sorted(auth_success, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()))), label="success")plt.plot(sorted(auth_fail, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()), list(map(lambda d: auth_fail[d], sorted(auth_fail, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()))), label="fail")plt.plot(sorted(auth_lock, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()), list(map(lambda d: auth_lock[d], sorted(auth_lock, key=lambda k:datetime.strptime(k, '%d.%m.%Y-%H').timestamp()))), label="lock")ax = plt.gca(); ax.set_xticks(ax.get_xticks()[::HOURS])plt.legend()plt.show()
Чрезвычайно простая аналитика, но общая картина налицо — сразу видно, в какие часы в компании кипит работа, а в какие идут ночные брутфорс‑атаки.
Анализ накопленных событий аутентификации за неделю
Однако события аутентификаций — это лишь самое малое, что происходит в Active Directory. Есть вещи, которые часто не мониторит даже настоящий SOC.
Многие атаки на инфраструктуру Active Directory оставляют в ней особые следы в виде изменения или создания соответствующих атрибутов у объектов. Практически все подобные модификации будут косвенно затрагивать атрибут whenChanged
, указывающий на изменения в объекте.
Получение всех объектов Active Directory, в которых что‑то изменилось за указанное время
Примечательно, что даже изменение в правах объекта вызывает обновление атрибута whenChanged
, благодаря чему становится возможным отслеживать крайне неуловимые ACL-атаки. Сделав предварительный снапшот всех атрибутов у всех объектов, а также анализ их ACL и сравнивая различия у изменившихся объектов, мы сможем отслеживать всю динамику в Active Directory:
defence/ad/changed.py from ldap3.protocol.microsoft import security_descriptor_controlfrom ldap3 import Server, Connection, SUBTREE, BASE, ALL, ALL_ATTRIBUTESimport picklefrom time import sleepfrom datetime import datetimefrom getpass import getpassfrom os import systemfrom sys import argvfrom re import matchfrom colorama import Forefrom winacl.dtyp.security_descriptor import SECURITY_DESCRIPTORfrom winacl.dtyp.sid import SIDfrom winacl.dtyp.ace import ADS_ACCESS_MASKdc = argv[1]ATTACKS = { # notifications "SPN attack": {"attr": "^serviceprincipalname$", "dn": ".*"}, "RBCD attack" : {"attr": "^msds-allowedtoactonbehalfofotheridentity$", "dn": ".*"}, "ShadowCredentials attack" : {"attr": "^msds-keycredentiallink$", "dn": ".*"}, "membership changed": {"attr": "^member$", "dn": ".*admin.*"}, "GPO attack": {"attr": "^gpcfilesyspath$", "dn": ".*"}, "user object abuse": {"attr": "^scriptpath$", "dn": ".*"}, "ACL attack": {"attr": ".*generic_all.*", "dn": ".*"}, "sAMAccountName spoofing": {"attr": "^samaccountname$", "dn": ".*"}, "dNSHostName spoofing": {"attr": "^dnshostname$", "dn": ".*"}, "ADCS attack templates ESC4": {"attr": "^(msPKI-Certificate-Name-Flag|msPKI-Enrollment-Flag|msPKI-RA-Signature)$", "dn": ".*CN=Certificate Templates,.*"}}server = Server(dc, get_info=ALL)Connection(server, auto_bind=True)server_time = server.info.other.get('currentTime')[0]if len(argv) < 4: print(server_time) print("n".join(server.info.naming_contexts)) exit()else: root = argv[3]userdom = argv[2] # "[email protected]"conn = Connection(server, user=userdom, password=getpass("password: "))conn.bind()alerts = []def alert(dn, attr, value, message): if (dn,attr) in alerts: return print("[!] Danger changes detected: %s: %s=%s (%s)" % (dn, attr, value, message)) #system("telegram '{message}'".format(message="Danger changes detected %s: %s=%s (%s)" % (dn, attr, value, message))) system("zenity --warning --title='Danger changes detected' --text='%s: %s=%s (%s)' &" % (dn, attr, value, message)) #system("echo 'Danger changes detected' | festival --tts --language english") alerts.append((dn,attr))cache_sid = {}def resolve_sid(sid): global cache_sid if not sid in cache_sid: cache_sid[sid] = None for dn in objects: if objects[dn].get("objectSid") == [sid]: name = objects[dn]["sAMAccountName"] cache_sid[sid] = name break return cache_sid.get(sid)def parse_acl(nTSecurityDescriptor): acl = SECURITY_DESCRIPTOR.from_bytes(nTSecurityDescriptor) acl_canonical = {"owner": [acl.Owner.to_sddl() if acl.Owner else ""], "dacl":[]} for ace in acl.Dacl.aces if acl.Dacl else []: ace_canonical = {} ace_canonical["who"] = SID.wellknown_sid_lookup(ace.Sid.to_sddl()) or resolve_sid(ace.Sid.to_sddl()) or ace.Sid.to_sddl() ace_canonical["type"] = str(ace).split("n")[0].strip() for line in str(ace).split("n")[1:]: if line.strip(): field = line.split(":")[0].lower() value = line.split(":")[1].strip() ace_canonical[field] = value acl_canonical["dacl"].append(ace_canonical) return acl_canonicaldef snapshot_create(): global objects #results = conn.extend.standard.paged_search(search_base=root, search_filter='(objectClass=*)', search_scope=SUBTREE, attributes=ALL_ATTRIBUTES, paged_size=1000) # only attributes results = conn.extend.standard.paged_search(search_base=root, search_filter='(objectClass=*)', search_scope=SUBTREE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05), paged_size=1000) # with ACL #conn.search(root, '(objectClass=*)', SUBTREE, attributes=ALL_ATTRIBUTES) # only attributes #conn.search(root, '(objectClass=*)', SUBTREE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05)) # with ACL #conn.search(root, '(|(objectClass=pKICertificateTemplate)(objectClass=certificationAuthority))', SUBTREE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05)) # with ACL #for result in conn.entries: for result in results: if result.get('type') == 'searchResRef': continue #dn = result.entry_dn #objects[dn] = result.entry_attributes_as_dict dn = result["dn"] objects[dn] = result["raw_attributes"] for dn in objects: # because of resolve_sid() if 'nTSecurityDescriptor' in objects[dn]: objects[dn]['nTSecurityDescriptor'] = parse_acl(objects[dn]['nTSecurityDescriptor'][0]) open("objects.dat", "wb").write(pickle.dumps([objects,cache_sid]))def snapshot_restore(): global objects, cache_sid try: objects, cache_sid = pickle.loads(open("objects.dat", "rb").read()) return True except: return Falsedef get_attrs(dn): #conn.search(dn, '(objectClass=*)', BASE, attributes=ALL_ATTRIBUTES) # only attributes #conn.search(dn, '(objectClass=*)', BASE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05)) # with ACL results = conn.extend.standard.paged_search(search_base=dn, search_filter='(objectClass=*)', search_scope=BASE, attributes=ALL_ATTRIBUTES, controls=security_descriptor_control(sdflags=0x05), paged_size=1000) # with ACL result = next(results) #attrs = conn.entries[0].entry_attributes_as_dict attrs = result["raw_attributes"] if attrs.get('nTSecurityDescriptor'): attrs['nTSecurityDescriptor'] = parse_acl(attrs['nTSecurityDescriptor'][0]) return attrsdef print_diff(dn): if not dn in objects: return def diff(attrs_before, attrs_after): for attr in attrs_before: if not attr in attrs_after: print(f"{Fore.RED}delete %s: %s{Fore.RESET}" % (attr, str(attrs_before[attr]))) else: if type(attrs_before[attr]) == dict: diff(attrs_before[attr], attrs_after[attr]) else: for value in attrs_before[attr]: if not value in attrs_after[attr]: print(f"{Fore.RED}delete %s: %s{Fore.RESET}" % (attr, value)) for attr in attrs_after: if not attr in attrs_before: print(f"{Fore.GREEN}new %s: %s{Fore.RESET}" % (attr, str(attrs_after[attr]))) for attack in ATTACKS: if (match(ATTACKS[attack]["attr"].lower(), attr.lower()) or match(ATTACKS[attack]["attr"].lower(), str(attrs_after[attr]).lower())) and match(ATTACKS[attack]["dn"].lower(), dn.lower()): alert(dn, attr, attrs_after[attr].decode(), attack) else: if type(attrs_after[attr]) == dict: diff(attrs_before[attr], attrs_after[attr]) else: for value in attrs_after[attr]: if not value in attrs_before[attr]: print(f"{Fore.GREEN}added %s: %s{Fore.RESET}" % (attr, value)) for attack in ATTACKS: if (match(ATTACKS[attack]["attr"].lower(), attr.lower()) or match(ATTACKS[attack]["attr"].lower(), str(value).lower())) and match(ATTACKS[attack]["dn"].lower(), dn.lower()): alert(dn, attr, value.decode(), attack) attrs = get_attrs(dn) diff(objects[dn], attrs) objects[dn] = attrsobjects = {}snapshot_restore() or snapshot_create()print("[*] %d objects" % len(objects))now = datetime.strptime(server_time, '%Y%m%d%H%M%S.0Z').timestamp() or datetime.utcnow().timestamp()first_time = Truewhile True: conn.search(root, f'(whenChanged>={datetime.utcfromtimestamp(now).strftime("%Y%m%d%H%M%S.0Z")})', SUBTREE, attributes=["distinguishedName", "whenChanged", "whenCreated"]) lasts = [now] for result in conn.entries: dn = result.entry_dn changed = result['whenChanged'].value created = result['whenCreated'].value time = changed.strftime("%d.%m.%Y %H:%M:%S") if changed == created: if not first_time: print(f'[{time}] "{dn}" created') objects[dn] = get_attrs(dn) lasts.append(created.timestamp()) else: if not first_time: print(f'[{time}] "{dn}" changed') print_diff(dn) lasts.append(changed.timestamp()) now = max(lasts) + 1 sleep(1) first_time = False
Источник: xakep.ru