Bivash Nayak
19 Dec
19Dec


 Daily Threat Intel by CyberDudeBivash

Zero-days, exploit breakdowns, IOCs, detection rules & mitigation playbooks. Follow on LinkedIn | Apps & Security Tools | WWW.CYBERDUDEBIVASH.COM | CYBERDUDEBIVASH PVT LTD


#!/usr/bin/env python3
"""
MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)
Defensive usage: query metadata for triage, threat intel, IOC enrichment.

Features:
- Query by hash (sha256/md5/sha1)
- Get recent samples
- Search by tag or signature
- Save results to JSON and optional CSV
- Optional download (OFF by default) for controlled lab-only use

Docs: https://bazaar.abuse.ch/api/
"""

from __future__ import annotations

import argparse
import csv
import json
import os
import time
from typing import Any, Dict, List, Optional

import requests

# Base endpoint of the MalwareBazaar API; every query in this script is POSTed here.
API_URL = "https://mb-api.abuse.ch/api/v1/"


def post_api(payload: Dict[str, Any], timeout: int = 20) -> Dict[str, Any]:
    """POST a form-encoded query to the MalwareBazaar API and return the decoded JSON.

    Args:
        payload: Form fields of the query, e.g.
            ``{"query": "get_info", "hash": "<hash>"}``.
        timeout: Timeout in seconds passed to ``requests.post``.

    Returns:
        The decoded JSON body of the response.

    Raises:
        requests.HTTPError: If the server answers with a 4xx/5xx status.
        requests.RequestException: On connection errors or timeouts.
        ValueError: If the response body is not valid JSON.
    """
    # The abuse.ch API documentation describes an ``Auth-Key`` request header.
    # The environment variable name is this script's own convention; when it
    # is unset the request is sent exactly as before (no extra header).
    headers: Dict[str, str] = {}
    auth_key = os.environ.get("MALWAREBAZAAR_AUTH_KEY", "").strip()
    if auth_key:
        headers["Auth-Key"] = auth_key

    r = requests.post(API_URL, data=payload, headers=headers, timeout=timeout)
    r.raise_for_status()
    return r.json()


def write_json(path: str, obj: Any) -> None:
    """Write *obj* to *path* as indented UTF-8 JSON.

    The parent directory of *path* is created when it does not exist yet.
    Non-ASCII characters are written as-is (``ensure_ascii=False``) rather
    than as ``\\uXXXX`` escapes.

    Args:
        path: Destination file path; an existing file is overwritten.
        obj: Any JSON-serializable object.

    Raises:
        TypeError: If *obj* contains values ``json`` cannot serialize.
        OSError: If the directory or file cannot be created or written.
    """
    # ``os.path.dirname`` is "" for a bare file name; fall back to the CWD.
    parent_dir = os.path.dirname(path) or "."
    os.makedirs(parent_dir, exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, indent=2, ensure_ascii=False)


def write_csv(path: str, rows: List[Dict[str, Any]], field_order: Optional[List[str]] = None) -> None:
    """Write *rows* to *path* as a UTF-8 CSV file with a header line.

    Nothing is written (and no directory is created) when *rows* is empty.
    List and dict values are flattened to JSON strings so each fits in a
    single CSV cell.

    Args:
        path: Destination file path; its parent directory is created if
            missing and an existing file is overwritten.
        rows: Records to write, one dict per CSV line.
        field_order: Exact columns to emit, in order. When ``None``, a fixed
            list of common MalwareBazaar keys comes first, followed by every
            other key found in *any* row, in order of first appearance.
            The caller's list is never modified.
    """
    if not rows:
        return
    os.makedirs(os.path.dirname(path) or ".", exist_ok=True)

    if field_order is None:
        # Stable, well-known columns first so output is comparable across runs.
        columns = [
            "sha256_hash", "md5_hash", "sha1_hash", "file_name", "file_type",
            "file_type_mime", "file_size", "first_seen", "last_seen",
            "reporter", "signature", "tags", "intelligence",
        ]
        # Scan every row, not just the first: records are not guaranteed to
        # share one key set, and a key missing here would be silently dropped.
        known = set(columns)
        for row in rows:
            for key in row:
                if key not in known:
                    known.add(key)
                    columns.append(key)
    else:
        # Copy so the caller's list is left untouched.
        columns = list(field_order)

    with open(path, "w", newline="", encoding="utf-8") as f:
        w = csv.DictWriter(f, fieldnames=columns)
        w.writeheader()
        for row in rows:
            record = {}
            for key in columns:
                value = row.get(key, "")
                # Flatten nested structures; csv would otherwise write repr().
                if isinstance(value, (list, dict)):
                    value = json.dumps(value, ensure_ascii=False)
                record[key] = value
            w.writerow(record)


def normalize_rows(resp: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Extract the list of result records from a MalwareBazaar response.

    MalwareBazaar usually returns ``{"query_status": "ok", "data": [{...}, ...]}``
    or a ``query_status`` other than ``"ok"``.

    Args:
        resp: Decoded JSON response.

    Returns:
        The dict entries of ``resp["data"]`` when ``query_status`` is
        ``"ok"``; a single dict under ``data`` is wrapped in a one-element
        list. An empty list is returned for any other status, for a
        missing or unexpected ``data`` value, or when *resp* is not a dict.
    """
    # Guard against a JSON body that is not an object (e.g. ``null`` or a list).
    if not isinstance(resp, dict) or resp.get("query_status") != "ok":
        return []

    data = resp.get("data")
    if isinstance(data, dict):
        return [data]
    if isinstance(data, list):
        # Callers use ``row.get(...)``; drop anything that is not a record.
        return [item for item in data if isinstance(item, dict)]
    return []


def download_sample(sha256: str, out_dir: str, timeout: int = 60) -> str:
    """Lab-only: download the sample zip for *sha256* from MalwareBazaar.

    Sends ``query=get_file`` with ``sha256_hash=<sha256>`` and stores the raw
    response body as ``<out_dir>/<sha256>.zip``.

    Args:
        sha256: SHA-256 of the sample as 64 hexadecimal characters.
        out_dir: Directory to store the archive in; created if missing.
        timeout: Timeout in seconds passed to ``requests.post``.

    Returns:
        Path of the written zip file.

    Raises:
        ValueError: If *sha256* is not a 64-character hex string. This is
            checked before any directory is created or request is sent.
        RuntimeError: If the API answers with a JSON body instead of a file.
        requests.HTTPError: If the server answers with a 4xx/5xx status.
    """
    # The hash becomes part of a file name and may originate from an API
    # response, so reject anything that is not plain hex (no separators).
    digest = sha256.strip() if isinstance(sha256, str) else ""
    if len(digest) != 64 or any(ch not in "0123456789abcdefABCDEF" for ch in digest):
        raise ValueError(f"Not a valid SHA-256 hex digest: {sha256!r}")

    os.makedirs(out_dir, exist_ok=True)
    payload = {"query": "get_file", "sha256_hash": digest}

    # Optional ``Auth-Key`` header (see the abuse.ch API documentation); the
    # environment variable name is this script's own convention.
    headers = {}
    auth_key = os.environ.get("MALWAREBAZAAR_AUTH_KEY", "").strip()
    if auth_key:
        headers["Auth-Key"] = auth_key

    r = requests.post(API_URL, data=payload, headers=headers, timeout=timeout)
    r.raise_for_status()

    # API returns raw file content for get_file. A JSON body here is assumed
    # to be an error report rather than the sample, so refuse to save it
    # under a ``.zip`` name.
    content_type = r.headers.get("Content-Type", "").lower()
    if "json" in content_type or r.content[:1] == b"{":
        detail = r.content[:200].decode("utf-8", errors="replace")
        raise RuntimeError(f"MalwareBazaar did not return a file for {digest}: {detail}")

    out_path = os.path.join(out_dir, f"{digest}.zip")
    with open(out_path, "wb") as f:
        f.write(r.content)
    return out_path


def _save_outputs(args: argparse.Namespace, resp: Dict[str, Any], rows: List[Dict[str, Any]]) -> None:
    """Write the JSON (and optional CSV) output for a command and report it."""
    write_json(args.out, resp)
    print(f"[CYBERDUDEBIVASH] Saved JSON: {args.out}")
    if args.csv:
        if rows:
            write_csv(args.csv, rows)
            print(f"[CYBERDUDEBIVASH] Saved CSV: {args.csv}")
        else:
            # write_csv() writes nothing for an empty result; don't claim it did.
            print(f"[CYBERDUDEBIVASH] No rows returned; CSV not written: {args.csv}")


def _apply_limit(resp: Dict[str, Any], rows: List[Dict[str, Any]], limit: int):
    """Truncate *rows* to *limit* entries and rebuild the response around them.

    A limit of zero or less means "no limit" (a negative value would
    otherwise slice entries off the end of the list).
    """
    if rows and limit and limit > 0:
        rows = rows[:limit]
        resp = {"query_status": "ok", "data": rows, "note": "client_side_limit_applied"}
    return resp, rows


def main() -> int:
    """Parse the command line, run the selected MalwareBazaar query, save results.

    Sub-commands: ``hash``, ``recent``, ``tag`` and ``signature``. Each one
    writes the API response to ``--out`` as JSON and, when ``--csv`` is given
    and rows were returned, the rows as CSV.

    Returns:
        0 on success, 2 if no known sub-command was selected.

    Raises:
        SystemExit: From argparse on invalid arguments, or when ``--download``
            is requested but the response has no usable ``sha256_hash``.
    """
    ap = argparse.ArgumentParser(description="MalwareBazaar API Quick-Start (CYBERDUDEBIVASH EDITION)")
    sub = ap.add_subparsers(dest="cmd", required=True)

    # hash
    p_hash = sub.add_parser("hash", help="Query by hash (sha256/md5/sha1)")
    p_hash.add_argument("--value", required=True, help="Hash value to search")
    p_hash.add_argument("--out", default="out/mb_hash.json", help="Output JSON path")
    p_hash.add_argument("--csv", default="", help="Optional CSV output path")
    p_hash.add_argument("--download", action="store_true", help="(LAB ONLY) Download sample zip (OFF by default)")
    p_hash.add_argument("--download-dir", default="out/downloads", help="Download directory (when --download)")

    # recent
    p_recent = sub.add_parser("recent", help="Get recent samples")
    p_recent.add_argument("--limit", type=int, default=50, help="Number of recent items (practical limit applies)")
    p_recent.add_argument("--out", default="out/mb_recent.json", help="Output JSON path")
    p_recent.add_argument("--csv", default="", help="Optional CSV output path")

    # tag
    p_tag = sub.add_parser("tag", help="Search by tag (e.g. 'stealer', 'ransomware')")
    p_tag.add_argument("--value", required=True, help="Tag to search")
    p_tag.add_argument("--limit", type=int, default=50, help="Limit (best-effort)")
    p_tag.add_argument("--out", default="out/mb_tag.json", help="Output JSON path")
    p_tag.add_argument("--csv", default="", help="Optional CSV output path")

    # signature
    p_sig = sub.add_parser("signature", help="Search by signature (family)")
    p_sig.add_argument("--value", required=True, help="Signature/family to search")
    p_sig.add_argument("--limit", type=int, default=50, help="Limit (best-effort)")
    p_sig.add_argument("--out", default="out/mb_signature.json", help="Output JSON path")
    p_sig.add_argument("--csv", default="", help="Optional CSV output path")

    args = ap.parse_args()

    # Run
    if args.cmd == "hash":
        resp = post_api({"query": "get_info", "hash": args.value})
        rows = normalize_rows(resp)
        # Report the saved files before the optional download so the user
        # still learns where the metadata went if the download aborts.
        _save_outputs(args, resp, rows)

        # Optional lab-only download: choose sha256 from response if available
        if args.download:
            if not rows:
                raise SystemExit("No data returned; cannot download.")
            sha256 = rows[0].get("sha256_hash")
            if not sha256:
                raise SystemExit("No sha256_hash in response; cannot download.")
            path = download_sample(sha256, args.download_dir)
            print(f"[CYBERDUDEBIVASH] Downloaded sample ZIP to: {path}")
        return 0

    if args.cmd == "recent":
        # Per the MalwareBazaar API docs, get_recent only accepts the
        # selectors "time" (last 60 minutes) and "100" (latest 100
        # additions); an arbitrary number is not a valid selector. Fetch the
        # latest 100 and apply --limit on the client.
        resp = post_api({"query": "get_recent", "selector": "100"})
        rows = normalize_rows(resp)
        if 0 < args.limit < len(rows):
            resp, rows = _apply_limit(resp, rows, args.limit)
        _save_outputs(args, resp, rows)
        return 0

    if args.cmd in ("tag", "signature"):
        if args.cmd == "tag":
            # MalwareBazaar supports: query=get_taginfo, tag=...
            payload = {"query": "get_taginfo", "tag": args.value}
        else:
            # MalwareBazaar supports: query=get_siginfo, signature=...
            payload = {"query": "get_siginfo", "signature": args.value}
        if args.limit > 0:
            # Both queries take an optional ``limit`` (documented maximum
            # 1000); without it the server applies its own default cap.
            payload["limit"] = str(min(args.limit, 1000))
        resp = post_api(payload)
        rows = normalize_rows(resp)
        # best-effort limit client-side as well
        resp, rows = _apply_limit(resp, rows, args.limit)
        _save_outputs(args, resp, rows)
        return 0

    # Unreachable while the sub-command is required; kept as a safety net.
    return 2


# Script entry point: propagate main()'s return value as the process exit code.
if __name__ == "__main__":
    raise SystemExit(main())


Real-time usage examples 

1) Hash enrichment (SOC IOC triage)

python malbaz_quickstart.py hash --value <SHA256_OR_MD5_OR_SHA1> --out out/hash.json --csv out/hash.csv

2) Pull latest samples for daily hunting

python malbaz_quickstart.py recent --limit 100 --out out/recent.json --csv out/recent.csv

3) Hunt by tag (e.g., “stealer”, “ransomware”)

python malbaz_quickstart.py tag --value stealer --limit 50 --out out/tag_stealer.json

4) Search by malware family/signature

python malbaz_quickstart.py signature --value "AgentTesla" --limit 50 --out out/agenttesla.json

5) (LAB ONLY) Download sample zip for isolated sandbox

python malbaz_quickstart.py hash --value <SHA256> --download --download-dir out/downloads

SOC best-practice notes (CYBERDUDEBIVASH authority)

  • Use this script for metadata enrichment in your IOC pipeline, not “auto-detonation.”
  • Keep downloads OFF by default and only enable in a sandbox environment.
  • Ship JSON/CSV to SIEM, then correlate with:
    • endpoint process telemetry
    • DNS / proxy logs
    • authentication anomalies
    • email/phishing events


#cyberdudebivash #CyberDudeBivash #MalwareBazaar #ThreatIntel #MalwareAnalysis
#SOC #ThreatHunting #DFIR #IncidentResponse #DetectionEngineering
#IOC #YARA #ReverseEngineering #SecurityAutomation #PythonSecurity
#SIEM #Splunk #Elastic #CyberDefense #CyberSecurity
Comments
* The email will not be published on the website.