diff --git a/misp_modules/modules/import_mod/generic_json_import.py b/misp_modules/modules/import_mod/generic_json_import.py new file mode 100644 index 00000000..59df4c32 --- /dev/null +++ b/misp_modules/modules/import_mod/generic_json_import.py @@ -0,0 +1,375 @@ +import base64 +import json +import re +from pathlib import Path +from urllib.parse import urlparse + +import requests +from pymisp import MISPEvent, MISPObject + +misperrors = {"error": "Error"} +userConfig = { + "timeout": { + "type": "Integer", + "message": "HTTP timeout in seconds", + "default": 30, + }, + "max_records": { + "type": "Integer", + "message": "Maximum number of JSON objects to import", + "default": 1000, + }, + "include_unmapped_attributes": { + "type": "Boolean", + "message": "Import recognised indicator values that could not be mapped to an object as standalone attributes", + "default": True, + }, +} + +mispattributes = { + "inputSource": ["paste"], + "output": ["MISP Format"], + "format": "misp_standard", +} + +moduleinfo = { + "version": "0.1", + "author": "MISP Project", + "description": ( + "Fetch a JSON file from an URL and generically map its records to the closest MISP object templates." + ), + "module-type": ["import"], + "name": "Generic JSON Import", + "logo": "", + "requirements": ["requests", "PyMISP"], + "features": ( + "The module accepts an HTTP(S) URL pointing to a JSON file, fetches it, walks the JSON structure, " + "and compares discovered keys with the MISP object templates bundled with PyMISP. Best matching " + "records are emitted as MISP objects in misp_standard format; recognised indicator values that do not " + "fit an object can optionally be imported as standalone attributes." + ), + "references": ["https://github.com/MISP/misp-objects/tree/main/objects"], + "input": "URL pointing to a JSON file", + "output": "MISP objects and attributes", +} + +moduleconfig = [] + +MAX_DOWNLOAD_SIZE = 20 * 1024 * 1024 +MIN_OBJECT_SCORE = 5 + +_ALIAS_BY_TYPE = { + "AS": {"as", "asn", "autonomoussystem", "autonomous_system"}, + "domain": {"domain", "domainname", "domain_name"}, + "email": {"email", "emailaddress", "email_address", "mail"}, + "email-dst": {"emaildst", "emailto", "dstemail", "destinationemail", "to"}, + "email-src": {"emailsrc", "emailfrom", "srcemail", "sourceemail", "from"}, + "filename": {"filename", "file_name", "name"}, + "hostname": {"hostname", "host", "fqdn"}, + "ip-dst": {"ip", "ipaddress", "ip_address", "ipdst", "dstip", "destinationip", "destination_ip"}, + "ip-src": {"ipsrc", "srcip", "sourceip", "source_ip"}, + "md5": {"md5", "md5hash", "hashmd5"}, + "port": {"port", "dstport", "srcport", "destinationport", "sourceport"}, + "sha1": {"sha1", "sha1hash", "hashsha1"}, + "sha224": {"sha224", "sha224hash"}, + "sha256": {"sha256", "sha256hash", "hashsha256"}, + "sha384": {"sha384", "sha384hash"}, + "sha512": {"sha512", "sha512hash", "hashsha512"}, + "url": {"url", "uri", "link", "href"}, +} + +_ALIAS_BY_RELATION = { + "dst-port": {"dstport", "destinationport", "destination_port"}, + "ip-dst": {"ip", "ipaddress", "ip_address", "dstip", "destinationip", "destination_ip"}, + "ip-src": {"srcip", "sourceip", "source_ip"}, + "resource_path": {"path", "resourcepath", "resource_path", "urlpath", "url_path"}, + "src-port": {"srcport", "sourceport", "source_port"}, +} + +_HASH_TYPES = { + 32: "md5", + 40: "sha1", + 56: "sha224", + 64: "sha256", + 96: "sha384", + 128: "sha512", +} + +_IP_RE = re.compile(r"^(?:(?:25[0-5]|2[0-4]\d|1?\d?\d)(?:\.|$)){4}$") +_HASH_RE = re.compile(r"^[A-Fa-f0-9]{32,128}$") +_DOMAIN_RE = re.compile(r"^(?=.{1,253}$)(?:[A-Za-z0-9](?:[A-Za-z0-9-]{0,61}[A-Za-z0-9])?\.)+[A-Za-z]{2,63}$") +_EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") + + +def handler(q=False): + if q is False: + return False + try: + request = json.loads(q) + config = getPassedConfig(request) + url = getUploadedData(request).strip() + json_data = fetch_json(url, config["timeout"]) + event = MISPEvent() + generateData(event, json_data, config) + return {"results": json.loads(event.to_json())} + except Exception as exception: + return {"error": str(exception)} + + +def getUploadedData(request): + if "data" in request: + return base64.b64decode(request["data"]).decode("utf8") + if "url" in request: + return request["url"] + raise ValueError("No URL provided") + + +def getPassedConfig(request): + config = {key: value.get("default") for key, value in userConfig.items()} + config.update(request.get("config") or {}) + config["timeout"] = max(1, int(config.get("timeout") or 30)) + config["max_records"] = max(1, int(config.get("max_records") or 1000)) + config["include_unmapped_attributes"] = _to_bool(config.get("include_unmapped_attributes", True)) + return config + + +def fetch_json(url, timeout): + parsed = urlparse(url) + if parsed.scheme not in {"http", "https"} or not parsed.netloc: + raise ValueError("The input must be a valid HTTP(S) URL") + + response = requests.get(url, timeout=timeout, stream=True) + response.raise_for_status() + + content = bytearray() + for chunk in response.iter_content(chunk_size=65536): + content.extend(chunk) + if len(content) > MAX_DOWNLOAD_SIZE: + raise ValueError("The remote JSON file is larger than the supported 20 MB limit") + return json.loads(content.decode(response.encoding or "utf-8")) + + +def generateData(event, data, config): + templates = load_object_templates() + for _, record in iter_json_records(data, config["max_records"]): + flattened = flatten_record(record) + if not flattened or not has_inferred_indicator(flattened): + continue + match = find_best_template(flattened, templates) + mapped_keys = set() + if match is not None: + template_name, relation_matches = match + misp_object = MISPObject(template_name) + for key, relation, attribute_type, value in relation_matches: + if add_object_attribute(misp_object, relation, attribute_type, value): + mapped_keys.add(key) + if misp_object.Attribute: + event.objects.append(misp_object) + if config["include_unmapped_attributes"]: + add_unmapped_attributes(event, flattened, mapped_keys) + + +def load_object_templates(): + objects_path = MISPObject("url").misp_objects_path + templates = [] + for definition_path in sorted(Path(objects_path).glob("*/definition.json")): + with definition_path.open("r", encoding="utf-8") as definition_file: + definition = json.load(definition_file) + attributes = definition.get("attributes") or {} + if attributes: + templates.append( + { + "name": definition["name"], + "attributes": attributes, + "required": set(definition.get("requiredOneOf") or []), + } + ) + return templates + + +def iter_json_records(data, max_records): + emitted = 0 + stack = [("root", data)] + while stack and emitted < max_records: + path, current = stack.pop() + if isinstance(current, dict): + if has_scalar_leaf(current): + emitted += 1 + yield path, current + for key, value in reversed(list(current.items())): + if isinstance(value, (dict, list)): + stack.append((str(key), value)) + elif isinstance(current, list): + for index, value in reversed(list(enumerate(current))): + if isinstance(value, (dict, list)): + stack.append((f"{path}[{index}]", value)) + + +def has_scalar_leaf(value): + if isinstance(value, dict): + return any(not isinstance(v, (dict, list)) and v is not None for v in value.values()) + return False + + +def flatten_record(record): + flattened = [] + for key, value in record.items(): + if isinstance(value, list): + for item in value: + if is_supported_scalar(item): + flattened.append((str(key), item)) + elif is_supported_scalar(value): + flattened.append((str(key), value)) + return flattened + + +def has_inferred_indicator(flattened): + return any(infer_misp_types(value) for _, value in flattened) + + +def find_best_template(flattened, templates): + best = None + for template in templates: + score = 0 + matches = [] + used_relations = set() + for key, value in flattened: + relation_match = best_relation_for_key(key, value, template["attributes"], used_relations) + if relation_match is None: + continue + relation, attribute_type, relation_score = relation_match + score += relation_score + matches.append((key, relation, attribute_type, value)) + used_relations.add(relation) + if not matches: + continue + required_matches = template["required"].intersection(used_relations) + if template["required"] and not required_matches: + score -= 3 + else: + score += len(required_matches) + score += template_name_value_bonus(template["name"], matches) + if score >= MIN_OBJECT_SCORE and (best is None or score > best[0]): + best = (score, template["name"], matches) + if best is None: + return None + return best[1], best[2] + + +def template_name_value_bonus(template_name, matches): + normalized_template_name = normalize(template_name) + normalized_keys = {normalize(key) for key, _, _, _ in matches} + if normalized_template_name in normalized_keys: + return 5 + + template_tokens = {normalize(token) for token in re.split(r"[^A-Za-z0-9]+", template_name) if token} + if len(template_tokens) < 2: + return 0 + + signal_tokens = set(normalized_keys) + for key, _, _, value in matches: + signal_tokens.update(normalize(token) for token in re.split(r"[^A-Za-z0-9]+", key) if token) + signal_tokens.update(normalize(inferred_type) for inferred_type in infer_misp_types(value)) + return 4 if template_tokens.issubset(signal_tokens) else 0 + + +def best_relation_for_key(key, value, attributes, used_relations): + normalized_key = normalize(key) + inferred_types = infer_misp_types(value) + best = None + for relation, definition in attributes.items(): + if relation in used_relations: + continue + attribute_type = definition.get("misp-attribute") + score = 0 + if normalized_key == normalize(relation): + score += 5 + elif normalized_key in {normalize(alias) for alias in _ALIAS_BY_RELATION.get(relation, set())}: + score += 4 + elif attribute_type and normalized_key in { + normalize(alias) for alias in _ALIAS_BY_TYPE.get(attribute_type, set()) + }: + score += 3 + + if attribute_type in inferred_types: + score += 2 + elif inferred_types and attribute_type not in {"text", "comment"} and score == 0: + continue + + if score and (best is None or score > best[2]): + best = (relation, attribute_type, score) + return best + + +def add_object_attribute(misp_object, relation, attribute_type, value): + value = scalar_to_string(value) + if not value: + return False + misp_object.add_attribute(relation, type=attribute_type, value=value) + return True + + +def add_unmapped_attributes(event, flattened, mapped_keys): + for key, value in flattened: + if key in mapped_keys: + continue + inferred_types = infer_misp_types(value) + if not inferred_types: + continue + attribute_type = sorted(inferred_types)[0] + event.add_attribute(attribute_type, scalar_to_string(value), comment=f"Imported from JSON field: {key}") + + +def infer_misp_types(value): + value = scalar_to_string(value) + if not value: + return set() + lowered = value.lower() + if lowered.startswith(("http://", "https://")): + return {"url"} + if _EMAIL_RE.match(value): + return {"email", "email-src", "email-dst"} + if _IP_RE.match(value): + return {"ip-dst", "ip-src"} + if _DOMAIN_RE.match(value): + return {"domain", "hostname"} + if _HASH_RE.match(value) and len(value) in _HASH_TYPES: + return {_HASH_TYPES[len(value)]} + if value.isdigit() and 0 <= int(value) <= 65535: + return {"port"} + return set() + + +def normalize(value): + return re.sub(r"[^a-z0-9]", "", str(value).lower()) + + +def scalar_to_string(value): + if isinstance(value, bool): + return "true" if value else "false" + if value is None: + return "" + return str(value).strip() + + +def is_supported_scalar(value): + return isinstance(value, (str, int, float, bool)) and value is not None + + +def _to_bool(value): + if isinstance(value, bool): + return value + if isinstance(value, str): + return value.lower() in {"1", "true", "yes", "on"} + return bool(value) + + +def introspection(): + modulesetup = dict(mispattributes) + modulesetup["userConfig"] = userConfig + return modulesetup + + +def version(): + moduleinfo["config"] = moduleconfig + return moduleinfo diff --git a/tests/test_generic_json_import.py b/tests/test_generic_json_import.py new file mode 100644 index 00000000..7d806285 --- /dev/null +++ b/tests/test_generic_json_import.py @@ -0,0 +1,66 @@ +import base64 +import json + +from misp_modules.modules.import_mod import generic_json_import + + +def _query(url="https://example.test/indicators.json", config=None): + request = {"data": base64.b64encode(url.encode()).decode()} + if config is not None: + request["config"] = config + return json.dumps(request) + + +def test_handler_maps_json_url_records_to_misp_objects(monkeypatch): + def fake_fetch_json(url, timeout): + assert url == "https://example.test/indicators.json" + assert timeout == 30 + return [ + {"url": "https://example.com/a?b=c", "host": "example.com"}, + {"ip": "198.51.100.10", "port": 443}, + {"filename": "payload.exe", "sha256": "a" * 64}, + ] + + monkeypatch.setattr(generic_json_import, "fetch_json", fake_fetch_json) + + response = generic_json_import.handler(_query()) + + objects = response["results"]["Object"] + object_names = [misp_object["name"] for misp_object in objects] + assert "url" in object_names + assert "ip-port" in object_names + assert "file" in object_names + + +def test_handler_can_import_unmapped_indicator_attributes(monkeypatch): + monkeypatch.setattr( + generic_json_import, "fetch_json", lambda url, timeout: {"indicator_value": "https://example.com"} + ) + + response = generic_json_import.handler(_query(config={"include_unmapped_attributes": True})) + + attributes = response["results"]["Attribute"] + assert attributes[0]["type"] == "url" + assert attributes[0]["value"] == "https://example.com" + + +def test_handler_guesses_templates_per_json_element(monkeypatch): + monkeypatch.setattr( + generic_json_import, + "fetch_json", + lambda url, timeout: { + "feed_name": "mixed indicators", + "indicators": [ + {"url": "https://example.com/a?b=c", "host": "example.com"}, + {"filename": "payload.exe", "sha256": "a" * 64}, + ], + }, + ) + + response = generic_json_import.handler(_query(config={"include_unmapped_attributes": False})) + + objects = response["results"]["Object"] + assert [misp_object["name"] for misp_object in objects] == ["url", "file"] + for misp_object in objects: + relations = {attribute["object_relation"] for attribute in misp_object["Attribute"]} + assert not {"url", "host"}.intersection(relations) or not {"filename", "sha256"}.intersection(relations)