ProcMon - A Red Teamer's Arsenal for Process Monitoring and Evasion

As Red Teamers, staying ahead of the game involves mastering the art of process monitoring and evasion. The ability to navigate through the intricacies of defender tools and EDR (Endpoint Detection and Response) processes is crucial for executing successful operations. In this blog post, we introduce ProcMon, a Python utility tailored for Red Teamers that not only monitors processes but also provides insights into defender tools and EDR activities.

Features

  1. Get a list of all running processes with a short overview about the used Process ID, Process Name, CPU, RAM, the amount of open file handles, network connections, and threads
  2. Follow a process based on its process ID (PID) and monitor its resource consumption and network connections (for example, when a defender uses RTR to investigate the machine)
  3. Monitoring for defender tools: the script sends notifications when tools like Microsoft Defender, Carbon Black, CrowdStrike, or SentinelOne are detected.
  4. Get insights about a process ID and watch detailed info about open file handles
  5. Search for processes based on the name
  6. Get an overview of the top X (freely definable) processes

Prerequisites

Install the following dependencies using pip:

psutil
pyfiglet
tabulate
desktop_notifier
pyshark

The Tool

I may later extend this tool, port it to another language, and build a small binary out of it to make it fit for LOLBAS.

The Code:

import psutil
from pyfiglet import Figlet
import argparse
import time
from tabulate import tabulate
from desktop_notifier import DesktopNotifier, Button
import asyncio
import threading
import subprocess
import pkg_resources
import pyshark
import socket

def install_required_packages():
    """Install every third-party package the script needs that is missing.

    Each required distribution is looked up with ``pkg_resources``; any that
    cannot be found is reported on stdout and installed with pip.
    """
    import sys  # local import: only needed to locate the running interpreter

    REQUIRED_PACKAGES = [
        'psutil',
        'pyfiglet',
        'tabulate',
        'desktop_notifier',
        'pyshark',  # imported by the script, so it has to be installable too
    ]

    # Run pip through the interpreter executing this script so the package is
    # installed into the same environment. Inside a frozen (PyInstaller)
    # binary sys.executable is the binary itself, so fall back to plain pip.
    if getattr(sys, 'frozen', False):
        pip_command = ['pip']
    else:
        pip_command = [sys.executable, '-m', 'pip']

    for package in REQUIRED_PACKAGES:
        try:
            pkg_resources.get_distribution(package)
        except pkg_resources.DistributionNotFound:
            print('{} is NOT installed'.format(package))
            subprocess.call(pip_command + ['install', package])

def get_running_processes():
    """Take a snapshot of all running processes.

    Returns:
        A list with one dict per process, holding the keys ``pid``, ``name``,
        ``connections``, ``cpu_percent``, ``memory_info``, ``num_threads``
        and ``open_files`` as reported by ``psutil.process_iter``.
    """
    fields = ['pid', 'name', 'connections', 'cpu_percent', 'memory_info', 'num_threads', 'open_files']

    snapshot = []
    for proc in psutil.process_iter(fields):
        info = proc.info
        snapshot.append({
            # cpu_percent is the only field read with a default (0.0).
            field: info.get(field, 0.0) if field == 'cpu_percent' else info[field]
            for field in fields
        })
    return snapshot

def display_processes(processes, top_n=None, verbose=False):
    """Print a table of process snapshots.

    Args:
        processes: List of dicts. The keys ``pid``, ``name``, ``memory_info``,
            ``num_threads``, ``connections``, ``open_files`` and
            ``cpu_percent`` are read with ``.get``, so any may be absent.
        top_n: When truthy, only the ``top_n`` entries with the highest
            ``cpu_percent + memory_info.rss`` score are shown.
        verbose: When true, print the connection and open-file objects
            themselves instead of their counts.
    """
    def _rss(entry):
        # Resident set size in bytes, or 0 when no memory info is available.
        memory = entry.get('memory_info')
        return memory.rss if memory else 0

    if top_n:
        # sorted() returns a new list, so the caller's list keeps its order
        # (the previous in-place sort silently reordered the argument).
        processes = sorted(
            processes,
            key=lambda entry: (entry.get('cpu_percent') or 0) + _rss(entry),
            reverse=True,
        )[:top_n]

    table = []
    for entry in processes:
        memory = entry.get('memory_info')
        connections = entry.get('connections')
        open_files = entry.get('open_files')

        row = [
            entry.get('pid'),
            entry.get('name'),
            memory.rss / (1024 ** 2) if memory else "N/A",
            entry.get('num_threads'),
        ]
        if verbose:
            row.append(connections if connections else "N/A")
            row.append(open_files if open_files else "N/A")
        else:
            row.append(len(connections) if connections else 0)
            row.append(len(open_files) if open_files else 0)
        table.append(row)

    print(tabulate(table, headers=["Process ID", "Name", "Memory Usage (MB)", "Number of Threads", "Open Network Connections", "Open Files"], tablefmt="pretty"))

def monitor_single_process(pid, verbose=False):
    """Print a one-row status table for a process every two seconds.

    Runs until interrupted with Ctrl+C or until the process disappears.

    Args:
        pid: ID of the process to follow.
        verbose: When true, print the connection objects instead of their
            count.
    """
    headers = ["Process ID", "Name", "CPU Percent", "Memory Usage (MB)", "Number of Threads", "Open Network Connections"]
    try:
        # Keep ONE Process object for the whole session: cpu_percent()
        # reports usage since the previous call on the same object, so a
        # fresh object per iteration would always show 0.0.
        process = psutil.Process(pid)
        while True:
            connections = process.connections()
            memory = process.memory_info()
            row = [
                pid,
                process.name(),
                process.cpu_percent(),
                memory.rss / (1024 ** 2) if memory else "N/A",
                process.num_threads(),
                (connections if connections else "N/A") if verbose else len(connections),
            ]
            print(tabulate([row], headers=headers, tablefmt="pretty"))
            time.sleep(2)  # Sleep for 2 seconds before refreshing
            print("\n" + "="*30 + "\n")

    except psutil.NoSuchProcess as e:
        # The process ended while being watched; report it like the other
        # per-PID helpers do instead of crashing with a traceback.
        print(f"Error: {e}")
    except KeyboardInterrupt:
        print("\nMonitoring stopped.")

def search_and_monitor_process(process_name):
    """Repeatedly list all processes whose name contains ``process_name``.

    The match is a case-insensitive substring test. The search is repeated
    every two seconds until interrupted with Ctrl+C.

    Args:
        process_name: Substring to look for in process names.
    """
    needle = process_name.lower()
    try:
        while True:
            # psutil reports None for a name it could not read, so fall back
            # to an empty string instead of calling .lower() on None.
            found_processes = [
                process.info
                for process in psutil.process_iter(['pid', 'name'])
                if needle in (process.info['name'] or '').lower()
            ]
            if found_processes:
                display_processes(found_processes)
            else:
                print(f"No process found with the name containing '{process_name}'.")

            time.sleep(2)  # Sleep for 2 seconds before refreshing
            print("\n" + "="*30 + "\n")

    except KeyboardInterrupt:
        print("\nMonitoring stopped.")

def display_process_details(pid):
    """Print a detailed report for one process.

    The report covers PID, name, CPU percent, memory usage, thread count,
    network connections and the paths of all open files. If the process does
    not exist or may not be inspected, an error line is printed instead.

    Args:
        pid: ID of the process to describe.
    """
    try:
        process = psutil.Process(pid)
        print(f"Process ID: {process.pid}")
        print(f"Name: {process.name()}")
        # A short blocking interval is required here: on a freshly created
        # Process object a non-blocking cpu_percent() always returns 0.0.
        print(f"CPU Percent: {process.cpu_percent(interval=0.1)}")
        memory = process.memory_info()
        print(f"Memory Usage: {memory.rss / (1024 ** 2) if memory else 'N/A'} MB")
        print(f"Number of Threads: {process.num_threads()}")
        # Query once so the emptiness test and the printed value agree.
        connections = process.connections()
        print(f"Connections: {connections if connections else 'N/A'}")

        open_files = process.open_files()
        if open_files:
            print("Open Files:")
            for file in open_files:
                print(f"  - {file.path}")
        else:
            print("No open files.")
    except (psutil.NoSuchProcess, psutil.AccessDenied) as e:
        print(f"Error: {e}")

def log_process_connections(pid):
    """Print the network connections of a process every two seconds.

    Each refresh prints one table row per connection with a timestamp, the
    socket kind, local and remote address, status and protocol. Stops on
    Ctrl+C or when the process no longer exists.

    Args:
        pid: ID of the process whose connections are listed.
    """
    column_names = ["Timestamp", "Type", "Local Address", "Remote Address", "Status", "Protocol"]

    def protocol_label(kind):
        # Map the socket kind onto the label shown in the Protocol column.
        if kind == socket.SOCK_STREAM:
            return 'TCP'
        if kind == socket.SOCK_DGRAM:
            return 'UDP'
        return 'Other'

    try:
        while True:
            live_connections = psutil.Process(pid).connections()
            rows = [
                [
                    time.ctime(),
                    socket.SocketKind(entry.type).name,
                    entry.laddr,
                    entry.raddr or 'N/A',
                    entry.status,
                    protocol_label(entry.type),
                ]
                for entry in live_connections
            ]

            print(tabulate(rows, headers=column_names))
            print("\n\n")
            time.sleep(2)
    except KeyboardInterrupt:
        print("\nMonitoring stopped.")
    except psutil.NoSuchProcess as e:
        print(f"Error: {e}")

###########################################
# CAUTION: THIS FUNCTION CAPTURES TRAFFIC #
#          USE WITH CAUTION               #
#   -> you may leave stealth mode         #    
#   -> requires Wireshark to be installed #  
###########################################
def capture_traffic(pid, interface='eth0', duration=10, output_file='output.pcap'):
    """Capture traffic on an interface and print packets of one process.

    The connections of the process are read once before the capture starts.
    A packet is printed when its source matches a connection's local
    endpoint or its destination matches a connection's remote endpoint.

    Args:
        pid: ID of the process whose connections are used as the filter.
        interface: Name of the network interface to sniff on.
        duration: Timeout in seconds passed to the capture.
        output_file: Path of the pcap file handed to pyshark.
    """
    process = psutil.Process(pid)
    connections = process.connections()

    capture = pyshark.LiveCapture(interface=interface, output_file=output_file)

    capture.sniff(timeout=duration)

    # pyshark looks layers up by name; the socket kind of a connection is an
    # enum and must be translated rather than used as an index.
    transport_names = {socket.SOCK_STREAM: 'tcp', socket.SOCK_DGRAM: 'udp'}

    for packet in capture:
        # Packets without an IP layer (e.g. ARP) cannot belong to any of the
        # inet connections, so skip them instead of raising AttributeError.
        ip_layer = getattr(packet, 'ip', None)
        if ip_layer is None:
            continue

        for conn in connections:
            layer_name = transport_names.get(conn.type)
            if layer_name is None or layer_name not in packet:
                continue
            transport = packet[layer_name]

            # Packet fields are strings while psutil ports are ints, so the
            # ports are converted before comparing. Listening sockets have an
            # empty remote address, which is guarded against here.
            from_local = bool(conn.laddr) and (
                ip_layer.src == conn.laddr.ip
                and int(transport.srcport) == conn.laddr.port
            )
            to_remote = bool(conn.raddr) and (
                ip_layer.dst == conn.raddr.ip
                and int(transport.dstport) == conn.raddr.port
            )
            if from_local or to_remote:
                print(packet)

def monitor_process_details(pid):
    """Keep re-printing the detailed report for a process until Ctrl+C.

    Args:
        pid: ID of the process handed to ``display_process_details``.
    """
    divider = "\n" + "="*30 + "\n"
    try:
        while True:
            display_process_details(pid)
            time.sleep(0.5)  # Pause for half a second before refreshing
            print(divider)
    except KeyboardInterrupt:
        print("\nMonitoring stopped.")

def monitor_process_start(process_name):
    """Send a desktop notification once a matching process is running.

    A background thread polls the process list every two seconds until some
    process name contains ``process_name`` (case-insensitive), then sends a
    single notification and ends.

    Args:
        process_name: Substring to look for in process names.
    """
    needle = process_name.lower()

    def is_running():
        # psutil reports None for a name it could not read; treat that as an
        # empty name so the watcher thread does not die on .lower().
        return any(
            needle in (process.info['name'] or '').lower()
            for process in psutil.process_iter(['pid', 'name'])
        )

    def check_process():
        notifier = DesktopNotifier()
        while not is_running():
            time.sleep(2)
        # This runs in a worker thread, so it needs its own event loop to
        # drive the notifier's coroutine.
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        loop.run_until_complete(notifier.send(
            title='Process Monitor',
            message=f'👀 Big brother is watching you! 👀\n\n   >> Process [{process_name}] has started! <<',
            sound=True,
            buttons=[
                Button(
                title="Mark as read",
                on_pressed=lambda: print("Marked as read")),
            ]))
        loop.close()

    threading.Thread(target=check_process).start()

# Maps a security product name to the process-name fragments that indicate it
# is running. Entries are matched as case-insensitive substrings of the names
# of running processes.
defender_tool_processes = {
    "Microsoft Defender": ["MsMpEng.exe", "MsSense.exe", "SenseIR.exe", "SenseNdr.exe", "SenseCncProxy.exe", "SenseSampleUploader.exe", "SenseSC.exe", "SenseCE.exe", "SenseCM.exe"],
    "Microsoft Smart Scren": ["smartscreen.exe"],
    "Carbon Black (Cloud)": ["cb.exe", "RepMgr", "RepUtils", "RepUx", "RepWAV", "RepWSC"],
    "BitdDefender": ["bdagent.exe", "bdredline.exe", "bdreinit.exe", "bdsubwiz.exe", "bdwtxag"],
    "Crowd Strike EDR": ["csagent", "falconctl", "falconhost", "falcon-sensor", "CSFalconService.exe", "CSFalconContainer.exe", "CSFalconUI.exe", "CSFalconUpdate.exe", "CSFalconSensorService.exe", "CSFalconSensor"],
    "Elastic EDR": ["winlogbeat.exe", "elastic-agent", "elastic-endpoint", "filebeat"],
    "Trellix EDR": ["xagt.exe"],
    "Qualys EDR": ["qagent", "QualysSensor.exe", "QualysAgent"],
    "SentinelOne": ["s1-orchestrator", "s1-network", "s1-scanner", "s1-agent", "s1-firewall", # Linux
                    "sentineld", "sentineld-shell", "SentinelAgent", "SentinelAgentWorker.exe", "SentinelServiceHost.exe", "sentinel_helper", "sentinel_shell", # Mac
                    # "SentinelUI.exe" previously contained a stray space and
                    # could never match; the duplicate "SentinelServiceHost.exe"
                    # entry was dropped.
                    "SentinelStaticEngine.exe", "LogProcessorService.exe", "SentinelStaticEngineScanner.exe", "SentinelHelperService.exe", "SentinelUI.exe", "SentinelAgent.exe", "SentinelMemoryScanner.exe", "SentinelBrowserNativeHost.exe", # Windows
                    "com.sentinelone", "sentineld_helper", "sentineld_guard", "com.sentinelone.network-monitoring",  # Generic stuff
                    ],
    "Cylance": ["CylanceSvc.exe", "CylanceUI.exe", "CylanceSvc64.exe", "CylanceUI64.exe", "CylanceSvc3.exe", "CylanceUI3.exe"],
    "Cybereason": ["AmSvc", "CrAmTray", "CrsSvc", "ExecutionPreventionSvc", "CybereasonAv", "CybereasonSensorService.exe", "CybereasonSensor.exe", "CybereasonSensor64.exe", "CybereasonSensor3.exe", ],
    "Tanium": ["TaniumClient", "TaniumCX", "TaniumDetectEngine"],
    "Palo Alto Traps/Cortex XDR": ["traps", "cyserver", "CyveraService", "CyvrFsFlt"],
    "FortiEDR": ["FortiEDR.exe", "FortiEDRService.exe", "FortiEDRTray", "FortiEDRTrayService"],
    "Cisco Secure Endpoint": ["sfc.exe", "sfc"],
    "ESET Inspect": ["EIConnector", "ekrn"],
    "FireEye Endpoint Security": ["FireEyeAgent", "FireEyeHXAgent"],
    # A missing comma after "TmCCSF.exe" used to fuse it with the next entry
    # into one string that matched neither process.
    "TrendMicro Apex One": ["CETASvc.exe", "WSCommunicator.exe", "EndpointBasecamp.exe", "TmListen.exe",
                            "Ntrtscan.exe", "TmWSCSvc.exe", "PccNTMon.exe", "TMBMSRV.exe", "CNTAoSMgr.exe", "TmCCSF.exe",
                            "Deep Security Manager.exe", "coreServiceShell.exe", "ds_monitor.exe", "Notifier.exe", "dsa.exe", "ds_nuagent.exe"],
    "ClamAV": ["freshclam", "clamscan"],
    "Splunk Agent": ["splunkd"],
    "Sysmon": ["sysmon64.exe", "sysmon.exe"],
    "YARA": ["yara", "yara.exe"],
    "Wazuh Agent": ["wazuh"],
    "Rapid 7": ["R7Agent", "R7Agent64", "R7AgentService", "R7AgentService64", "R7AgentTray", "R7AgentTray64"],
    "Rapid 7 Insight Agent": ["ir_agent.exe", "insight-agent", "insight-agentd", "insight-agentd64", "insight-agent64"],
    "Rapid 7 Collector": ["collector.exe", "insight-collector", "insight-collectord", "insight-collectord64", "insight-collector64"],
}

def monitor_dict_process_start(process_dict):
    """Notify about every known tool process that is currently running.

    A background thread checks each process-name fragment in
    ``process_dict`` against the running processes (case-insensitive
    substring match). For every hit it sends a desktop notification and
    prints the same message.

    Args:
        process_dict: Mapping of a tool/vendor name to a list of
            process-name fragments.
    """
    def check_process():
        notifier = DesktopNotifier()
        # Enumerate the process table once instead of once per fragment.
        # psutil reports None for unreadable names, hence the fallback.
        running_names = [
            (process.info['name'] or '').lower()
            for process in psutil.process_iter(['pid', 'name'])
        ]
        for scanner, process_names in process_dict.items():
            for process_name in process_names:
                needle = process_name.lower()
                if not any(needle in name for name in running_names):
                    continue
                msg = f'👀 Big brother is watching you! 👀\n\n   >> Process [{process_name}] from [{scanner}] is running! <<'
                # This runs in a worker thread, so each notification is
                # driven by its own event loop.
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
                loop.run_until_complete(notifier.send(
                    title='Process Monitor',
                    message=msg,
                    sound=True,
                    buttons=[
                        Button(
                        title="Mark as read",
                        on_pressed=lambda: print("Marked as read")),
                    ]))
                loop.close()
                print(f"{msg}\n")

    threading.Thread(target=check_process).start()

def monitor_s1a():
    """Watch for a process whose name contains ``sentineld_shell``."""
    watched_name = "sentineld_shell"
    monitor_process_start(watched_name)

if __name__ == "__main__":
    # Command-line entry point: parse the options, print the banner, take a
    # snapshot of the running processes and dispatch to the first mode that
    # was requested (the modes are mutually exclusive, checked in order).
    parser = argparse.ArgumentParser(description="Display information about running processes and their network connections, CPU and Memory usage.")
    parser.add_argument("--list", action="store_true", help="Display information about all running processes.")
    parser.add_argument("--process", type=int,help="Monitor a specific process by providing its process ID.")
    parser.add_argument("--top", type=int, help="Display the top N processes with the highest CPU and memory usage.")
    parser.add_argument("--search", type=str, help="Search for processes with a name containing the specified substring and continuously monitor them.")
    parser.add_argument("-v", "--verbose", default=False, action="store_true", help="Display all files and information instead of the count.")
    parser.add_argument("-d", "--details", type=int, help="Display all details for a specific process by providing its process ID.")
    # The help text used to promise a log file under /tmp; the connections
    # are only printed to the console.
    parser.add_argument("-l", "--connections", type=int, help="Continuously prints all network connections for a specific process by providing its process ID. ")
    parser.add_argument("-c", "--capture-traffic", type=int, help="Captures all network connections for a specific process by providing its process ID as pcap file. ")
    parser.add_argument("-i", "--interface", type=str, default='eth0', help="Network interface to capture traffic on.")
    parser.add_argument("-f", "--follow", action="store_true", help="Continuously update the details of the process specified by --details.")
    parser.add_argument("--monitor", type=str, help="Monitor the start of a process with a name containing the specified substring.")
    parser.add_argument("--s1a", action="store_true", help="Monitor the start of a process with a name sentineld_shell.")
    parser.add_argument("--defendertools", action="store_true", help="Monitor if common defender tools are running.")
    parser.add_argument("--install", action="store_true", help="Installs all required packages.")
    args = parser.parse_args()

    custom_fig = Figlet(font='graffiti')
    print(custom_fig.renderText('ProcMon'))
    print("Welcome to ProcMon! Please wait while we gather information about running processes...\n\n")

    # Take the snapshot only after the "please wait" message has been shown,
    # so the user sees it while the gathering is actually happening.
    running_processes = get_running_processes()

    # PID options are compared against None because 0 is a valid integer
    # argument that a plain truthiness test would treat as "not given".
    if args.list:
        display_processes(running_processes, args.top, args.verbose)
    elif args.defendertools:
        monitor_dict_process_start(defender_tool_processes)
    elif args.s1a:
        monitor_s1a()
    elif args.install:
        install_required_packages()
    elif args.top:
        display_processes(processes=running_processes, top_n=args.top, verbose=args.verbose)
    elif args.process is not None:
        process = next((p for p in running_processes if p['pid'] == args.process), None)
        if process:
            monitor_single_process(args.process, args.verbose)
        else:
            print(f"No process found with the ID '{args.process}'.")
    elif args.monitor:
        monitor_process_start(args.monitor)
    elif args.search:
        # A process name can be None when psutil could not read it.
        needle = args.search.lower()
        found_processes = [p for p in running_processes if needle in (p['name'] or '').lower()]
        if found_processes:
            display_processes(found_processes)
        else:
            print(f"No process found with the name containing '{args.search}'.")
    elif args.details is not None:
        process = next((p for p in running_processes if p['pid'] == args.details), None)
        if process:
            if args.follow:
                monitor_process_details(args.details)
            else:
                display_process_details(args.details)
        else:
            print(f"No process found with the ID '{args.details}'.")
    elif args.connections is not None:
        log_process_connections(args.connections)
    elif args.capture_traffic is not None:
        capture_traffic(args.capture_traffic, args.interface)
    else:
        print("Please provide either --list, --process, --search, or --details followed by the appropriate argument.")

From Code to Binary

To make the tool easier to run, I recommend building a dedicated binary (a GitHub repository with automated releases will follow soon). As a prerequisite, make sure you have PyInstaller available: pip3 install pyinstaller.

With the .spec below you can now compile a binary by calling pyinstaller procmon.spec

procmon.spec

You need to adjust some parts of the .spec file related to the location of the dependencies. On Mac, code signing is required for the notifications to show up as balloon tips. On Linux and Windows, the notifications should work fine. In a future iteration, sending this information back to a calling host will make the notification itself optional.

# -*- mode: python ; coding: utf-8 -*-
#
# IMPORTANT: run the stuff below and adjust it to the results to make the compilation work
#
# Use the commands below to adjust the output in the datas List below
#
# python -c "import desktop_notifier; print(desktop_notifier.__path__)"
# python -c "import pyfiglet; print(pyfiglet.__path__)"
block_cipher = None

a = Analysis(['procmon.py'],
             # pathex is a list of DIRECTORIES searched for imports, so point
             # it at the folder that contains procmon.py, not at the file.
             pathex=['/add/your/path/here'],
             binaries=[],
             # Use python -c commands from above.
             # Each entry is (source path, destination folder in the bundle);
             # the source must be the sub-folder that matches the destination.
             datas=[('/Library/Python/3.9/site-packages/desktop_notifier/resources', 'desktop_notifier/resources'),
                    ('/opt/homebrew/lib/python3.11/site-packages/pyfiglet/fonts', 'pyfiglet/fonts')],
             # pyshark and socket are imported by procmon.py as well.
             hiddenimports=['psutil', 'pyfiglet', 'argparse', 'time', 'tabulate', 'desktop_notifier', 'asyncio', 'threading', 'subprocess', 'pkg_resources', 'pyshark', 'socket'],
             hookspath=[],
             runtime_hooks=[],
             excludes=[],
             win_no_prefer_redirects=False,
             win_private_assemblies=False,
             cipher=block_cipher,
             noarchive=False)
pyz = PYZ(a.pure, a.zipped_data,
             cipher=block_cipher)
# Single-file executable: scripts, binaries and data files are all passed to
# EXE directly.
exe = EXE(pyz,
          a.scripts,
          a.binaries,
          a.zipfiles,
          a.datas,
          [],
          name='procmon',
          debug=False,
          bootloader_ignore_signals=False,
          strip=False,
          upx=True,
          upx_exclude=[],
          runtime_tmpdir=None,
          console=True)
Written on January 30, 2024


◀ Back to attack related posts