Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 46 additions & 17 deletions app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@
import yaml
from flask import Flask, request

app = Flask(__name__)
d

API_KEY = "SUPER_SECRET_API_KEY_12345"

def get_user_by_name(username):
conn = sqlite3.connect("test.db")
cursor = conn.cursor()
# Intentionally vulnerable query
query = f"SELECT * FROM users WHERE username = '{username}'"
cursor.execute(query)
result = cursor.fetchall()
conn.close()
return result
import sqlite3
conn = None
try:
conn = sqlite3.connect("test.db")
cursor = conn.cursor()
# PRECOGS_FIX: use parameterized query to avoid SQL injection
query = "SELECT * FROM users WHERE username = ?"
cursor.execute(query, (username,))
result = cursor.fetchall()
return result
except Exception as e:
# handle/logging could be added here
return []
finally:
if conn:
conn.close()


@app.route("/user")
Expand All @@ -27,21 +30,47 @@ def user():
return {"data": str(data)}


@app.route("/ping")
from flask import request

@app.route("/ping")
def ping():
ip = request.args.get("ip", "127.0.0.1")
# Intentionally dangerous: using user input in shell command
os.system(f"ping -c 1 {ip}")
import ipaddress, subprocess
try:
# PRECOGS_FIX: validate the IP address strictly using ipaddress
ip_obj = ipaddress.ip_address(ip)
except Exception:
return {"error": "invalid ip"}, 400

# PRECOGS_FIX: call ping without invoking a shell, pass arguments as a list
try:
subprocess.run(["ping", "-c", "1", str(ip_obj)], check=False)
except FileNotFoundError:
return {"error": "ping command not available"}, 500

return {"status": "ok"}

@app.route("/load")
from flask import request

def load():
raw = request.args.get("data", None)
if not raw:
return {"error": "no data"}, 400

# Intentionally insecure: untrusted pickle.loads
obj = pickle.loads(bytes.fromhex(raw))
import json, binascii
try:
# PRECOGS_FIX: do NOT use pickle.loads on untrusted data; expect JSON encoded in hex instead
data_bytes = bytes.fromhex(raw)
except (ValueError, TypeError):
return {"error": "invalid hex data"}, 400

try:
obj = json.loads(data_bytes.decode("utf-8"))
except Exception:
return {"error": "failed to parse JSON payload; sending pickles is not allowed"}, 400

return {"loaded": str(obj)}

@app.route("/yaml")
Expand Down
25 changes: 16 additions & 9 deletions sample-vuln/app.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,28 @@

@app.route("/ping")
from flask import request

@app.route("/ping")
def ping():
from flask import request
import ipaddress, subprocess, shutil

ip = request.args.get("ip", "127.0.0.1")
import ipaddress, subprocess
try:
# PRECOGS_FIX: validate the IP address strictly using ipaddress
ip_obj = ipaddress.ip_address(ip)
except Exception:
return {"error": "invalid ip"}, 400

# PRECOGS_FIX: call ping without invoking a shell, pass arguments as a list
# PRECOGS_FIX: reject non-global (private/reserved/loopback) IPs to prevent SSRF
if not getattr(ip_obj, "is_global", False):
return {"error": "ip not allowed"}, 403

ping_path = shutil.which("ping")
if not ping_path:
return {"error": "ping command not available"}, 500

# PRECOGS_FIX: call ping without invoking a shell, pass absolute path and use timeout
try:
subprocess.run(["ping", "-c", "1", str(ip_obj)], check=False)
subprocess.run([ping_path, "-c", "1", str(ip_obj)], check=False, timeout=5)
except FileNotFoundError:
return {"error": "ping command not available"}, 500
except subprocess.TimeoutExpired:
return {"error": "ping timed out"}, 504

return {"status": "ok"}
return {"status": "ok"}