Daily Threat Intel by CyberDudeBivash
Zero-days, exploit breakdowns, IOCs, detection rules & mitigation playbooks.
Follow on LinkedIn | Apps & Security Tools
WWW.CYBERDUDEBIVASH.COM
CYBERDUDEBIVASH PVT LTD
#!/usr/bin/env python3
"""
MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)
Defensive usage: query metadata for triage, threat intel, IOC enrichment.

Features:
- Query by hash (sha256/md5/sha1)
- Get recent samples
- Search by tag or signature
- Save results to JSON and optional CSV
- Optional download (OFF by default) for controlled lab-only use

Docs: https://bazaar.abuse.ch/api/
"""
from __future__ import annotations
import argparseimport csvimport jsonimport osimport timefrom typing import Any, Dict, List, Optional
import requests
# MalwareBazaar API endpoint; the request helpers in this script POST to it.
API_URL = "https://mb-api.abuse.ch/api/v1/"
def post_api(payload: Dict[str, Any], timeout: int = 20) -> Dict[str, Any]:
    """POST *payload* to the MalwareBazaar endpoint and return the decoded reply.

    Args:
        payload: Fields sent as the request's ``data`` (e.g. ``{"query": ...}``).
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        The response body parsed as JSON.

    Raises:
        requests.HTTPError: Raised by ``raise_for_status()`` on an error status.
    """
    response = requests.post(API_URL, data=payload, timeout=timeout)
    # Fail on HTTP-level errors before trying to decode the body.
    response.raise_for_status()
    decoded = response.json()
    return decoded
def write_json(path: str, obj: Any) -> None:
    """Dump *obj* to *path* as indented UTF-8 JSON.

    The directory part of *path* is created if it does not exist yet; a bare
    file name is written relative to the current directory. Non-ASCII
    characters are written as-is rather than escaped.

    Args:
        path: Destination file path (overwritten if present).
        obj: Any JSON-serializable object.
    """
    parent_dir = os.path.dirname(path)
    if not parent_dir:
        # A bare file name has no directory component; use the cwd.
        parent_dir = "."
    os.makedirs(parent_dir, exist_ok=True)

    with open(path, mode="w", encoding="utf-8") as handle:
        json.dump(obj, handle, ensure_ascii=False, indent=2)
def write_csv(path: str, rows: List[Dict[str, Any]], field_order: Optional[List[str]] = None) -> None:
    """Write *rows* to a UTF-8 CSV file at *path*, one record per row.

    Nothing is written, and no directory is created, when *rows* is empty.
    List and dict values are flattened to JSON text so they fit in one cell;
    keys missing from a row are written as empty cells.

    Args:
        path: Destination CSV path; its parent directory is created if needed.
        rows: Records to write.
        field_order: Exact columns to emit, in order. When omitted, a fixed
            set of common MalwareBazaar keys comes first, followed by every
            other key found in *any* row, in first-seen order. The caller's
            list is never modified.
    """
    if not rows:
        return
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)

    if field_order is None:
        # Common MalwareBazaar keys first so the leading columns stay stable.
        columns = [
            "sha256_hash", "md5_hash", "sha1_hash",
            "file_name", "file_type", "file_type_mime",
            "file_size", "first_seen", "last_seen",
            "reporter", "signature", "tags", "intelligence",
        ]
        # Scan every row, not just the first: a key that only appears in a
        # later record would otherwise be dropped from the output silently.
        known = set(columns)
        for row in rows:
            for key in row:
                if key not in known:
                    known.add(key)
                    columns.append(key)
    else:
        # Work on a copy so the caller's list is left untouched.
        columns = list(field_order)

    with open(path, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=columns)
        writer.writeheader()
        for row in rows:
            record = {}
            for key in columns:
                value = row.get(key, "")
                # Nested structures cannot be a CSV cell; store them as JSON.
                if isinstance(value, (list, dict)):
                    value = json.dumps(value, ensure_ascii=False)
                record[key] = value
            writer.writerow(record)
def normalize_rows(resp: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pull the result rows out of an API response as a list.

    A response whose ``query_status`` is not ``"ok"`` yields an empty list.
    Otherwise ``data`` is returned unchanged when it is a list, wrapped in a
    one-element list when it is a single dict, and replaced by an empty list
    for any other type (including a missing ``data`` key).
    """
    succeeded = resp.get("query_status") == "ok"
    payload = resp.get("data") if succeeded else None

    if isinstance(payload, dict):
        # Single record: normalise to the list shape callers iterate over.
        return [payload]
    return payload if isinstance(payload, list) else []
def download_sample(sha256: str, out_dir: str, timeout: int = 60) -> str:
    """Lab-only: download a sample archive from MalwareBazaar.

    Sends ``query=get_file`` with ``sha256_hash=<sha256>`` and stores the
    response body as ``<out_dir>/<sha256>.zip``.

    Args:
        sha256: SHA-256 of the sample, as 64 hexadecimal characters.
        out_dir: Directory for the archive; created if it does not exist.
        timeout: Timeout in seconds handed to ``requests.post``.

    Returns:
        Path of the written ZIP file.

    Raises:
        ValueError: If *sha256* is not a 64-character hexadecimal string.
        requests.HTTPError: Raised by ``raise_for_status()`` on an error status.
        RuntimeError: If the response body is not a ZIP archive.
    """
    # The hash usually comes from an API response and becomes part of a file
    # name, so reject anything that is not plain hex before touching the disk
    # or the network (this also rules out path separators and "..").
    is_hex = set(sha256.lower()) <= set("0123456789abcdef")
    if len(sha256) != 64 or not is_hex:
        raise ValueError(f"Not a valid SHA-256 hex digest: {sha256!r}")

    os.makedirs(out_dir, exist_ok=True)
    payload = {"query": "get_file", "sha256_hash": sha256}
    r = requests.post(API_URL, data=payload, timeout=timeout)
    r.raise_for_status()

    # On success the body is the raw archive. A failed lookup can still come
    # back with HTTP 200 and an error document instead (see the API docs), and
    # saving that as "<hash>.zip" would look like a successful download.
    # ZIP files start with the "PK" signature, so check for it first.
    body = r.content
    if not body.startswith(b"PK"):
        snippet = body[:200].decode("utf-8", errors="replace")
        raise RuntimeError(
            f"MalwareBazaar did not return a ZIP archive for {sha256}: {snippet}"
        )

    out_path = os.path.join(out_dir, f"{sha256}.zip")
    with open(out_path, "wb") as f:
        f.write(body)
    return out_path
def _build_parser() -> argparse.ArgumentParser:
    """Build the CLI parser with the hash/recent/tag/signature sub-commands."""
    ap = argparse.ArgumentParser(description="MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)")
    sub = ap.add_subparsers(dest="cmd", required=True)

    # hash
    p_hash = sub.add_parser("hash", help="Query by hash (sha256/md5/sha1)")
    p_hash.add_argument("--value", required=True, help="Hash value to search")
    p_hash.add_argument("--out", default="out/mb_hash.json", help="Output JSON path")
    p_hash.add_argument("--csv", default="", help="Optional CSV output path")
    p_hash.add_argument("--download", action="store_true", help="(LAB ONLY) Download sample zip (OFF by default)")
    p_hash.add_argument("--download-dir", default="out/downloads", help="Download directory (when --download)")

    # recent
    p_recent = sub.add_parser("recent", help="Get recent samples")
    p_recent.add_argument("--limit", type=int, default=50, help="Number of recent items (practical limit applies)")
    p_recent.add_argument("--out", default="out/mb_recent.json", help="Output JSON path")
    p_recent.add_argument("--csv", default="", help="Optional CSV output path")

    # tag
    p_tag = sub.add_parser("tag", help="Search by tag (e.g. 'stealer', 'ransomware')")
    p_tag.add_argument("--value", required=True, help="Tag to search")
    p_tag.add_argument("--limit", type=int, default=50, help="Limit (best-effort)")
    p_tag.add_argument("--out", default="out/mb_tag.json", help="Output JSON path")
    p_tag.add_argument("--csv", default="", help="Optional CSV output path")

    # signature
    p_sig = sub.add_parser("signature", help="Search by signature (family)")
    p_sig.add_argument("--value", required=True, help="Signature/family to search")
    p_sig.add_argument("--limit", type=int, default=50, help="Limit (best-effort)")
    p_sig.add_argument("--out", default="out/mb_signature.json", help="Output JSON path")
    p_sig.add_argument("--csv", default="", help="Optional CSV output path")

    return ap


def _limited_query(query: str, field: str, value: str, limit: int) -> Dict[str, Any]:
    """Run a tag/signature style query, asking the server to honour *limit*."""
    payload = {"query": query, field: value}
    if limit > 0:
        # Send the limit to the server as well (the API documents a ``limit``
        # field capped at 1000); trimming client-side alone can never return
        # more rows than the server's default page.
        payload["limit"] = str(min(limit, 1000))
    return post_api(payload)


def _apply_client_limit(resp: Dict[str, Any], limit: int) -> Dict[str, Any]:
    """Return *resp* with its rows trimmed to at most *limit* entries.

    Responses without rows (errors, empty results) and non-positive limits
    are passed through unchanged.
    """
    rows = normalize_rows(resp)
    # ``limit > 0`` rather than plain truthiness: a negative value would
    # otherwise slice from the wrong end of the list.
    if rows and limit > 0:
        return {"query_status": "ok", "data": rows[:limit], "note": "client_side_limit_applied"}
    return resp


def _write_outputs(args: argparse.Namespace, resp: Dict[str, Any], rows: List[Dict[str, Any]]) -> None:
    """Write the JSON result and, when ``--csv`` was given, the CSV rows."""
    write_json(args.out, resp)
    if args.csv:
        write_csv(args.csv, rows)


def _print_saved(args: argparse.Namespace, rows: List[Dict[str, Any]]) -> None:
    """Report which output files were produced."""
    print(f"[CYBERDUDEBIVASH] Saved JSON: {args.out}")
    if args.csv:
        if rows:
            print(f"[CYBERDUDEBIVASH] Saved CSV: {args.csv}")
        else:
            # write_csv() writes nothing for an empty result, so do not
            # claim a file was saved.
            print(f"[CYBERDUDEBIVASH] No rows returned; CSV not written: {args.csv}")


def main() -> int:
    """Parse the command line, run the selected query and save its results.

    Returns:
        0 after a handled sub-command, 2 if the sub-command is not recognised.

    Raises:
        SystemExit: When ``--download`` is requested but the response has no
            rows or no ``sha256_hash`` (argparse also exits on bad arguments).
    """
    args = _build_parser().parse_args()

    if args.cmd == "hash":
        resp = post_api({"query": "get_info", "hash": args.value})
        rows = normalize_rows(resp)
        _write_outputs(args, resp, rows)

        # Optional lab-only download: choose sha256 from response if available
        if args.download:
            if not rows:
                raise SystemExit("No data returned; cannot download.")
            sha256 = rows[0].get("sha256_hash")
            if not sha256:
                raise SystemExit("No sha256_hash in response; cannot download.")
            path = download_sample(sha256, args.download_dir)
            print(f"[CYBERDUDEBIVASH] Downloaded sample ZIP to: {path}")

        _print_saved(args, rows)
        return 0

    if args.cmd == "recent":
        # get_recent only accepts the selectors "time" and "100" (see the API
        # docs), so an arbitrary --limit cannot be sent as the selector.
        # Fetch the latest 100 and trim client-side instead.
        resp = post_api({"query": "get_recent", "selector": "100"})
    elif args.cmd == "tag":
        # MalwareBazaar supports: query=get_taginfo, tag=...
        resp = _limited_query("get_taginfo", "tag", args.value, args.limit)
    elif args.cmd == "signature":
        # MalwareBazaar supports: query=get_siginfo, signature=...
        resp = _limited_query("get_siginfo", "signature", args.value, args.limit)
    else:
        return 2

    # best-effort limit client-side
    resp = _apply_client_limit(resp, args.limit)
    rows = normalize_rows(resp)
    _write_outputs(args, resp, rows)
    _print_saved(args, rows)
    return 0
if __name__ == "__main__":
    # Run the CLI and hand main()'s return value to the interpreter as the
    # process exit status.
    exit_code = main()
    raise SystemExit(exit_code)
python malbaz_quickstart.py hash --value <SHA256_OR_MD5_OR_SHA1> --out out/hash.json --csv out/hash.csv
python malbaz_quickstart.py recent --limit 100 --out out/recent.json --csv out/recent.csv
python malbaz_quickstart.py tag --value stealer --limit 50 --out out/tag_stealer.json
python malbaz_quickstart.py signature --value "AgentTesla" --limit 50 --out out/agenttesla.json
python malbaz_quickstart.py hash --value <SHA256> --download --download-dir out/downloads
#cyberdudebivash #CyberDudeBivash #MalwareBazaar #ThreatIntel #MalwareAnalysis
#SOC #ThreatHunting #DFIR #IncidentResponse #DetectionEngineering
#IOC #YARA #ReverseEngineering #SecurityAutomation #PythonSecurity
#SIEM #Splunk #Elastic #CyberDefense #CyberSecurity