🐍 Python dla pentesterów

Python dla pentesterów — od skryptu do narzędzia

🎯 Dlaczego Python w pentestingu?

Python to szwajcarski scyzoryk pentestera. Od szybkich skryptów po kompletne narzędzia — ten język pozwala błyskawicznie automatyzować powtarzalne zadania, uspójniać wyniki i przyspieszać raportowanie. Największą przewagą jest połączenie prostoty z ogromnym ekosystemem bibliotek dla sieci, HTTP, parsowania, a nawet generowania raportów.

🔑 Trzy filary produktywności:
  • Szybkość: mały skrypt w 15 minut często oszczędza godziny ręcznej pracy
  • Powtarzalność: te same kroki, te same wyniki — zero „zapomniałem o…”
  • Raportowalność: dane z wielu narzędzi w jednym, czytelnym formacie

🔧 Przygotowanie środowiska

Instalacja i wirtualne środowiska

# Ubuntu/Debian sudo apt update sudo apt install -y python3 python3-pip python3-venv # Utworzenie i aktywacja środowiska python3 -m venv pentest-env source pentest-env/bin/activate # Aktualizacja pip i narzędzia pip install --upgrade pip wheel setuptools

Podstawowe zależności

# HTTP, skanowanie, parsowanie, raporty pip install requests urllib3==2.* python-nmap scapy beautifulsoup4 lxml pip install jinja2 rich colorama tqdm argparse dnspython python-docx reportlab
# requirements.txt (przykład) requests>=2.31 urllib3>=2.0 python-nmap>=0.7.1 scapy>=2.5.0 beautifulsoup4>=4.12 lxml>=4.9 jinja2>=3.1 rich>=13.0 colorama>=0.4.6 tqdm>=4.66 argparse ; python_version >= "3.8" dnspython>=2.4 python-docx>=1.1 reportlab>=4.0
🚀 Tip: Dla większych projektów rozważ użycie Poetry lub pipx. Poetry porządkuje zależności i ułatwia budowanie narzędzi wielokrotnego użytku.

🧠 Idiomy i wzorce przydatne w pentestingu

Argparse + logging = czytelny CLI

import argparse, logging, sys def setup_logger(level=logging.INFO): logging.basicConfig( level=level, format="%(asctime)s | %(levelname)s | %(message)s", datefmt="%H:%M:%S" ) return logging.getLogger("pentest") def get_args(): p = argparse.ArgumentParser(description="Przykładowe narzędzie CLI") p.add_argument("target", help="Host/Domena/CIDR") p.add_argument("-v", "--verbose", action="store_true", help="Więcej logów") return p.parse_args() if __name__ == "__main__": args = get_args() log = setup_logger(logging.DEBUG if args.verbose else logging.INFO) log.info(f"Cel: {args.target}")

Requests Session z retry i proxy (np. Burp)

import requests from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry def http_session(proxy=None): s = requests.Session() retry = Retry(total=3, backoff_factor=0.8, status_forcelist=[429, 500, 502, 503, 504]) s.mount("http://", HTTPAdapter(max_retries=retry)) s.mount("https://", HTTPAdapter(max_retries=retry)) if proxy: s.proxies.update({"http": proxy, "https": proxy}) s.verify = False # Uwaga: tylko w labie! W produkcji trzymaj verify=True s.headers.update({"User-Agent": "PentestBot/1.0"}) return s # Przykład: # sess = http_session(proxy="http://127.0.0.1:8080") # ruch przez Burpa # r = sess.get("https://example.com", timeout=10)

Równoległość: ThreadPool vs asyncio

# ThreadPool - dobry do operacji blokujących (sockety, requests) from concurrent.futures import ThreadPoolExecutor def work(item): # ... np. sprawdzenie pojedynczego URL return item with ThreadPoolExecutor(max_workers=50) as ex: results = list(ex.map(work, range(100))) # asyncio - świetne do tysięcy krótkich połączeń TCP import asyncio async def do_io(i): await asyncio.sleep(0.01) return i async def main(): results = await asyncio.gather(*(do_io(i) for i in range(1000))) print(len(results)) # asyncio.run(main())

DNS, ścieżki i operacje na plikach

import dns.resolver from pathlib import Path import json def resolve_a(domain): try: return [rdata.address for rdata in dns.resolver.resolve(domain, "A")] except Exception: return [] def save_json(path, data): p = Path(path).expanduser() p.parent.mkdir(parents=True, exist_ok=True) p.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")

🧪 Mini‑labsy: od teorii do praktyki

Poniższe krótkie ćwiczenia to szybkie „klocki”. Połączysz je w kompletne narzędzie w swoim stylu.

4.1 Fingerprinting HTTP i praca przez proxy

import ssl, socket from urllib.parse import urlparse from requests.exceptions import RequestException from datetime import datetime from requests.adapters import HTTPAdapter from urllib3.util.retry import Retry import requests def session(proxy=None): s = requests.Session() retry = Retry(total=2, backoff_factor=0.5, status_forcelist=[429,500,502,503,504]) s.mount("http://", HTTPAdapter(max_retries=retry)) s.mount("https://", HTTPAdapter(max_retries=retry)) if proxy: s.proxies.update({"http": proxy, "https": proxy}) s.verify = False return s def tls_info(host, port=443, timeout=3): ctx = ssl.create_default_context() with socket.create_connection((host, port), timeout=timeout) as sock: with ctx.wrap_socket(sock, server_hostname=host) as ssock: cert = ssock.getpeercert() proto = ssock.version() return {"protocol": proto, "subject": dict(x[0] for x in cert["subject"]).get("commonName"), "issuer": dict(x[0] for x in cert["issuer"]).get("organizationName"), "notAfter": cert.get("notAfter")} # fallback return {} def http_fingerprint(url, proxy=None): u = urlparse(url) s = session(proxy) out = {"url": url, "ts": datetime.now().isoformat()} try: r = s.get(url, timeout=8) out["status"] = r.status_code out["server"] = r.headers.get("Server") out["x_powered_by"] = r.headers.get("X-Powered-By") if u.scheme == "https": out["tls"] = tls_info(u.hostname, u.port or 443) except RequestException as e: out["error"] = str(e) return out # print(http_fingerprint("https://example.com", proxy="http://127.0.0.1:8080"))

4.2 Asynchroniczny skaner portów

import asyncio async def check_port(host, port, timeout=1.0): try: reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout) writer.close() await writer.wait_closed() return port except Exception: return None async def scan_host(host, ports, concurrency=500): sem = asyncio.Semaphore(concurrency) async def task(p): async with sem: return await check_port(host, p) results = await asyncio.gather(*(task(p) for p in ports)) return [p for p in results if p] if __name__ == "__main__": target = "192.168.1.1" common = [21,22,25,53,80,110,139,143,443,445,993,995,3306,3389,5432,5900,8080] open_ports = asyncio.run(scan_host(target, common)) print(f"Otwarte na {target}: {open_ports}")

4.3 Agregator wyników Nmap (XML → JSON)

import xml.etree.ElementTree as ET import json, sys from pathlib import Path from collections import Counter def parse_nmap_xml(file_path): root = ET.parse(file_path).getroot() hosts = [] for host in root.findall("host"): state = host.find("status").get("state") addrs = [a.get("addr") for a in host.findall("address")] entry = {"state": state, "addresses": addrs, "ports": []} ports = host.find("ports") if ports is not None: for p in ports.findall("port"): svc = p.find("service") entry["ports"].append({ "port": int(p.get("portid")), "proto": p.get("protocol"), "state": p.find("state").get("state"), "service": (svc.get("name") if svc is not None else None), "product": (svc.get("product") if svc is not None else None), "version": (svc.get("version") if svc is not None else None) }) hosts.append(entry) return hosts def summarize(hosts): open_services = Counter() for h in hosts: for p in h["ports"]: if p["state"] == "open" and p["service"]: open_services[p["service"]] += 1 return { "total_hosts": len(hosts), "hosts_up": sum(1 for h in hosts if h["state"] == "up"), "services": open_services.most_common(10) } if __name__ == "__main__": if len(sys.argv) < 3: print("Użycie: python nmap_agg.py input.xml output.json") sys.exit(1) hosts = parse_nmap_xml(sys.argv[1]) out = {"hosts": hosts, "summary": summarize(hosts)} Path(sys.argv[2]).write_text(json.dumps(out, indent=2, ensure_ascii=False), encoding="utf-8") print("Zapisano:", sys.argv[2])

4.4 Raport HTML z Jinja2

from jinja2 import Template import json HTML = """ <!DOCTYPE html> <html lang="pl"> <head> <meta charset="utf-8"> <title>Raport skanowania</title> <style>body{font-family:Arial;margin:40px}table{border-collapse:collapse;width:100%} th,td{border:1px solid #ddd;padding:8px}th{background:#f4f4f4;text-align:left}</style> </head><body> <h1>Raport skanowania</h1> <p>Hosty aktywne: {{ summary.hosts_up }}/{{ summary.total_hosts }}</p> <h2>Top usługi</h2> <table><tr><th>Usługa</th><th>Liczba</th></tr> {% for name, count in summary.services %} <tr><td>{{ name }}</td><td>{{ count }}</td></tr> {% endfor %} </table> </body></html> """ def render_html(json_in, html_out): data = json.load(open(json_in, encoding="utf-8")) t = Template(HTML) open(html_out, "w", encoding="utf-8").write(t.render(**data)) print("Wygenerowano:", html_out) # render_html("scan.json", "raport.html")
✅ Pro tip: Traktuj każdy mini‑lab jak klocki LEGO. Sklej je w jedno CLI, dodaj konfigurację (JSON/YAML), a wyniki zapisuj w jednym formacie. To najszybsza droga do własnego toolkit’u.

🔗 Integracja w workflow pentestu

  • Recon: automatyczne pobranie nagłówków, screenshotów, DNS, bazy subdomen
  • Enumeracja: szybki async scan „top ports”, potem głębsze Nmap na hosty z otwartymi portami
  • Korelacja: parsuj XML/JSON z różnych narzędzi do jednego modelu danych
  • Raport: HTML/PDF wygenerowany szablonem — spójny i gotowy do wysyłki
  • Artefakty: zapisuj wszystko (raw, parsed, summary). Powtarzalność = oszczędność czasu

🛡️ Bezpieczeństwo kodu i etyka

⚠️ Ważne: Korzystaj z poniższych technik wyłącznie z pisemną autoryzacją właściciela systemu. Szanuj zakres (RoE), ograniczenia i prywatność danych.
  • Nie wyłączaj weryfikacji TLS poza środowiskiem labowym
  • Używaj limitów concurrency i timeoutów — nie przeciążaj środowisk
  • Logi traktuj jak dane poufne (PII/sekrety) — szyfruj dysk, czyść artefakty
  • Zawsze dokumentuj kroki i utrzymuj spójność z metodologią (np. PTES, OWASP)

📝 Cheatsheet: fragmenty do użycia od ręki

Requests przez Burp + timeout + retry

sess = http_session(proxy="http://127.0.0.1:8080") resp = sess.get("https://target.local/login", timeout=10) print(resp.status_code, resp.headers.get("Server"))

Szybkie czytanie plików i zapisy JSON

from pathlib import Path import json data = {"hosts": ["10.0.0.1"], "meta": {"scope": "demo"}} Path("out/results.json").parent.mkdir(parents=True, exist_ok=True) Path("out/results.json").write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")

ThreadPool: równoległe sprawdzanie URL

from concurrent.futures import ThreadPoolExecutor, as_completed import requests urls = ["https://example.org", "https://example.com"] def check(u): try: return u, requests.get(u, timeout=5).status_code except Exception: return u, None with ThreadPoolExecutor(max_workers=20) as ex: futures = [ex.submit(check, u) for u in urls] for f in as_completed(futures): print(f.result())

📚 Dalsze zasoby

Do nauki

  • OWASP Testing Guide: scenariusze i checklisty dla aplikacji web
  • Real Python, Talk Python: świetne materiały i podcasty
  • Automate the Boring Stuff: automatyzacja codziennych zadań

Biblioteki warte uwagi

  • Scapy: pakiety sieciowe i sniffing
  • Paramiko: SSH/SFTP automatyzacja
  • Shodan: API wyszukiwarki urządzeń
  • Typer/Click: eleganckie CLI z autocompletion
🚀 Następny krok: Złóż mini‑labsy w jedno narzędzie z komendą „modules”. Każdy moduł (recon, http, nmap, raport) uruchamiasz jednym przełącznikiem. Dodaj plik konfiguracyjny JSON i generowanie raportu po zakończeniu.
📧 Masz pomysł na własny toolkit? Chętnie pomogę go zaprojektować i wdrożyć. Skontaktuj się ze mną — zrobimy narzędzie, które realnie przyspieszy Twoją pracę.