🎯 Dlaczego Python w pentestingu?
Python to szwajcarski scyzoryk pentestera. Od szybkich skryptów po kompletne narzędzia — ten język pozwala błyskawicznie automatyzować powtarzalne zadania, uspójniać wyniki i przyspieszać raportowanie. Największą przewagą jest połączenie prostoty z ogromnym ekosystemem bibliotek dla sieci, HTTP, parsowania, a nawet generowania raportów.
🔑 Trzy filary produktywności:
- Szybkość: mały skrypt w 15 minut często oszczędza godziny ręcznej pracy
- Powtarzalność: te same kroki, te same wyniki — zero „zapomniałem o…”
- Raportowalność: dane z wielu narzędzi w jednym, czytelnym formacie
🔧 Przygotowanie środowiska
Instalacja i wirtualne środowiska
# Ubuntu/Debian
sudo apt update
sudo apt install -y python3 python3-pip python3-venv
# Utworzenie i aktywacja środowiska
python3 -m venv pentest-env
source pentest-env/bin/activate
# Aktualizacja pip i narzędzia
pip install --upgrade pip wheel setuptools
Podstawowe zależności
# HTTP, skanowanie, parsowanie, raporty
pip install requests urllib3==2.* python-nmap scapy beautifulsoup4 lxml
pip install jinja2 rich colorama tqdm argparse dnspython python-docx reportlab
# requirements.txt (przykład)
requests>=2.31
urllib3>=2.0
python-nmap>=0.7.1
scapy>=2.5.0
beautifulsoup4>=4.12
lxml>=4.9
jinja2>=3.1
rich>=13.0
colorama>=0.4.6
tqdm>=4.66
argparse ; python_version >= "3.8"
dnspython>=2.4
python-docx>=1.1
reportlab>=4.0
🚀 Tip: Dla większych projektów rozważ użycie Poetry lub pipx. Poetry porządkuje zależności i ułatwia budowanie narzędzi wielokrotnego użytku.
🧠 Idiomy i wzorce przydatne w pentestingu
Argparse + logging = czytelny CLI
import argparse, logging, sys
def setup_logger(level=logging.INFO):
logging.basicConfig(
level=level,
format="%(asctime)s | %(levelname)s | %(message)s",
datefmt="%H:%M:%S"
)
return logging.getLogger("pentest")
def get_args():
p = argparse.ArgumentParser(description="Przykładowe narzędzie CLI")
p.add_argument("target", help="Host/Domena/CIDR")
p.add_argument("-v", "--verbose", action="store_true", help="Więcej logów")
return p.parse_args()
if __name__ == "__main__":
args = get_args()
log = setup_logger(logging.DEBUG if args.verbose else logging.INFO)
log.info(f"Cel: {args.target}")
Requests Session z retry i proxy (np. Burp)
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def http_session(proxy=None):
s = requests.Session()
retry = Retry(total=3, backoff_factor=0.8, status_forcelist=[429, 500, 502, 503, 504])
s.mount("http://", HTTPAdapter(max_retries=retry))
s.mount("https://", HTTPAdapter(max_retries=retry))
if proxy:
s.proxies.update({"http": proxy, "https": proxy})
s.verify = False # Uwaga: tylko w labie! W produkcji trzymaj verify=True
s.headers.update({"User-Agent": "PentestBot/1.0"})
return s
# Przykład:
# sess = http_session(proxy="http://127.0.0.1:8080") # ruch przez Burpa
# r = sess.get("https://example.com", timeout=10)
Równoległość: ThreadPool vs asyncio
# ThreadPool - dobry do operacji blokujących (sockety, requests)
from concurrent.futures import ThreadPoolExecutor
def work(item):
# ... np. sprawdzenie pojedynczego URL
return item
with ThreadPoolExecutor(max_workers=50) as ex:
results = list(ex.map(work, range(100)))
# asyncio - świetne do tysięcy krótkich połączeń TCP
import asyncio
async def do_io(i):
await asyncio.sleep(0.01)
return i
async def main():
results = await asyncio.gather(*(do_io(i) for i in range(1000)))
print(len(results))
# asyncio.run(main())
DNS, ścieżki i operacje na plikach
import dns.resolver
from pathlib import Path
import json
def resolve_a(domain):
try:
return [rdata.address for rdata in dns.resolver.resolve(domain, "A")]
except Exception:
return []
def save_json(path, data):
p = Path(path).expanduser()
p.parent.mkdir(parents=True, exist_ok=True)
p.write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
🧪 Mini‑labsy: od teorii do praktyki
Poniższe krótkie ćwiczenia to szybkie „klocki”. Połączysz je w kompletne narzędzie w swoim stylu.
4.1 Fingerprinting HTTP i praca przez proxy
import ssl, socket
from urllib.parse import urlparse
from requests.exceptions import RequestException
from datetime import datetime
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
import requests
def session(proxy=None):
s = requests.Session()
retry = Retry(total=2, backoff_factor=0.5, status_forcelist=[429,500,502,503,504])
s.mount("http://", HTTPAdapter(max_retries=retry))
s.mount("https://", HTTPAdapter(max_retries=retry))
if proxy:
s.proxies.update({"http": proxy, "https": proxy})
s.verify = False
return s
def tls_info(host, port=443, timeout=3):
ctx = ssl.create_default_context()
with socket.create_connection((host, port), timeout=timeout) as sock:
with ctx.wrap_socket(sock, server_hostname=host) as ssock:
cert = ssock.getpeercert()
proto = ssock.version()
return {"protocol": proto, "subject": dict(x[0] for x in cert["subject"]).get("commonName"),
"issuer": dict(x[0] for x in cert["issuer"]).get("organizationName"),
"notAfter": cert.get("notAfter")}
# fallback
return {}
def http_fingerprint(url, proxy=None):
u = urlparse(url)
s = session(proxy)
out = {"url": url, "ts": datetime.now().isoformat()}
try:
r = s.get(url, timeout=8)
out["status"] = r.status_code
out["server"] = r.headers.get("Server")
out["x_powered_by"] = r.headers.get("X-Powered-By")
if u.scheme == "https":
out["tls"] = tls_info(u.hostname, u.port or 443)
except RequestException as e:
out["error"] = str(e)
return out
# print(http_fingerprint("https://example.com", proxy="http://127.0.0.1:8080"))
4.2 Asynchroniczny skaner portów
import asyncio
async def check_port(host, port, timeout=1.0):
try:
reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout=timeout)
writer.close()
await writer.wait_closed()
return port
except Exception:
return None
async def scan_host(host, ports, concurrency=500):
sem = asyncio.Semaphore(concurrency)
async def task(p):
async with sem:
return await check_port(host, p)
results = await asyncio.gather(*(task(p) for p in ports))
return [p for p in results if p]
if __name__ == "__main__":
target = "192.168.1.1"
common = [21,22,25,53,80,110,139,143,443,445,993,995,3306,3389,5432,5900,8080]
open_ports = asyncio.run(scan_host(target, common))
print(f"Otwarte na {target}: {open_ports}")
4.3 Agregator wyników Nmap (XML → JSON)
import xml.etree.ElementTree as ET
import json, sys
from pathlib import Path
from collections import Counter
def parse_nmap_xml(file_path):
root = ET.parse(file_path).getroot()
hosts = []
for host in root.findall("host"):
state = host.find("status").get("state")
addrs = [a.get("addr") for a in host.findall("address")]
entry = {"state": state, "addresses": addrs, "ports": []}
ports = host.find("ports")
if ports is not None:
for p in ports.findall("port"):
svc = p.find("service")
entry["ports"].append({
"port": int(p.get("portid")),
"proto": p.get("protocol"),
"state": p.find("state").get("state"),
"service": (svc.get("name") if svc is not None else None),
"product": (svc.get("product") if svc is not None else None),
"version": (svc.get("version") if svc is not None else None)
})
hosts.append(entry)
return hosts
def summarize(hosts):
open_services = Counter()
for h in hosts:
for p in h["ports"]:
if p["state"] == "open" and p["service"]:
open_services[p["service"]] += 1
return {
"total_hosts": len(hosts),
"hosts_up": sum(1 for h in hosts if h["state"] == "up"),
"services": open_services.most_common(10)
}
if __name__ == "__main__":
if len(sys.argv) < 3:
print("Użycie: python nmap_agg.py input.xml output.json")
sys.exit(1)
hosts = parse_nmap_xml(sys.argv[1])
out = {"hosts": hosts, "summary": summarize(hosts)}
Path(sys.argv[2]).write_text(json.dumps(out, indent=2, ensure_ascii=False), encoding="utf-8")
print("Zapisano:", sys.argv[2])
4.4 Raport HTML z Jinja2
from jinja2 import Template
import json
HTML = """
<!DOCTYPE html>
<html lang="pl">
<head>
<meta charset="utf-8">
<title>Raport skanowania</title>
<style>body{font-family:Arial;margin:40px}table{border-collapse:collapse;width:100%}
th,td{border:1px solid #ddd;padding:8px}th{background:#f4f4f4;text-align:left}</style>
</head><body>
<h1>Raport skanowania</h1>
<p>Hosty aktywne: {{ summary.hosts_up }}/{{ summary.total_hosts }}</p>
<h2>Top usługi</h2>
<table><tr><th>Usługa</th><th>Liczba</th></tr>
{% for name, count in summary.services %}
<tr><td>{{ name }}</td><td>{{ count }}</td></tr>
{% endfor %}
</table>
</body></html>
"""
def render_html(json_in, html_out):
data = json.load(open(json_in, encoding="utf-8"))
t = Template(HTML)
open(html_out, "w", encoding="utf-8").write(t.render(**data))
print("Wygenerowano:", html_out)
# render_html("scan.json", "raport.html")
✅ Pro tip: Traktuj każdy mini‑lab jak klocki LEGO. Sklej je w jedno CLI, dodaj konfigurację (JSON/YAML), a wyniki zapisuj w jednym formacie. To najszybsza droga do własnego toolkit’u.
🔗 Integracja w workflow pentestu
- Recon: automatyczne pobranie nagłówków, screenshotów, DNS, bazy subdomen
- Enumeracja: szybki async scan „top ports”, potem głębsze Nmap na hosty z otwartymi portami
- Korelacja: parsuj XML/JSON z różnych narzędzi do jednego modelu danych
- Raport: HTML/PDF wygenerowany szablonem — spójny i gotowy do wysyłki
- Artefakty: zapisuj wszystko (raw, parsed, summary). Powtarzalność = oszczędność czasu
🛡️ Bezpieczeństwo kodu i etyka
⚠️ Ważne: Korzystaj z poniższych technik wyłącznie z pisemną autoryzacją właściciela systemu. Szanuj zakres (RoE), ograniczenia i prywatność danych.
- Nie wyłączaj weryfikacji TLS poza środowiskiem labowym
- Używaj limitów concurrency i timeoutów — nie przeciążaj środowisk
- Logi traktuj jak dane poufne (PII/sekrety) — szyfruj dysk, czyść artefakty
- Zawsze dokumentuj kroki i utrzymuj spójność z metodologią (np. PTES, OWASP)
📝 Cheatsheet: fragmenty do użycia od ręki
Requests przez Burp + timeout + retry
sess = http_session(proxy="http://127.0.0.1:8080")
resp = sess.get("https://target.local/login", timeout=10)
print(resp.status_code, resp.headers.get("Server"))
Szybkie czytanie plików i zapisy JSON
from pathlib import Path
import json
data = {"hosts": ["10.0.0.1"], "meta": {"scope": "demo"}}
Path("out/results.json").parent.mkdir(parents=True, exist_ok=True)
Path("out/results.json").write_text(json.dumps(data, indent=2, ensure_ascii=False), encoding="utf-8")
ThreadPool: równoległe sprawdzanie URL
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
urls = ["https://example.org", "https://example.com"]
def check(u):
try:
return u, requests.get(u, timeout=5).status_code
except Exception:
return u, None
with ThreadPoolExecutor(max_workers=20) as ex:
futures = [ex.submit(check, u) for u in urls]
for f in as_completed(futures):
print(f.result())
📚 Dalsze zasoby
Do nauki
- OWASP Testing Guide: scenariusze i checklisty dla aplikacji web
- Real Python, Talk Python: świetne materiały i podcasty
- Automate the Boring Stuff: automatyzacja codziennych zadań
Biblioteki warte uwagi
- Scapy: pakiety sieciowe i sniffing
- Paramiko: SSH/SFTP automatyzacja
- Shodan: API wyszukiwarki urządzeń
- Typer/Click: eleganckie CLI z autocompletion
🚀 Następny krok: Złóż mini‑labsy w jedno narzędzie z komendą „modules”. Każdy moduł (recon, http, nmap, raport) uruchamiasz jednym przełącznikiem. Dodaj plik konfiguracyjny JSON i generowanie raportu po zakończeniu.
📧 Masz pomysł na własny toolkit? Chętnie pomogę go zaprojektować i wdrożyć. Skontaktuj się ze mną — zrobimy narzędzie, które realnie przyspieszy Twoją pracę.