Auditing Monero (XMR) Blockchain: A Simple Script for Detecting Possible Inflation Bugs

Auditing Monero (XMR) Blockchain: A Simple Script for Detecting Possible Inflation Bugs

This article presents a robust and detailed Python script for auditing the Monero blockchain, a privacy-focused cryptocurrency. The code verifies the integrity of blocks by comparing actual rewards (obtained via monerod) with CoinBase transaction outputs and the total amount mined, detecting potential discrepancies such as inflation bugs (e.g., the 2017 incident). With support for multiple attempts (up to 99 retries with a 20-second timeout), detailed real-time logs, and immediate CSV recording, the script provides a reliable tool for large-scale analysis or auditing individual blocks. Features like a progress bar (using tqdm) and error handling make it accessible and efficient, ideal for researchers and cryptocurrency enthusiasts seeking to ensure the consistency of XMR issuance.

It’s simply pathetic that maximalist monkeys and "certain famous Bitcoin cops/officials" obsessed with state control and fear-based narratives, still spew the absurd lie that Monero makes it impossible to count the total supply, as if it were some unsolvable mystery designed to frustrate their lazy minds—when in reality, all it takes is opening the monerod source code, available to anyone with two neurons and a shred of curiosity, to debunk this nonsense and prove that the issuance is as auditable as the inflated egos of these centralized clowns.

So enough talk, run this thing already:

python

import json
import requests
import csv
import time
import subprocess
import argparse
from tqdm import tqdm
from datetime import datetime

# Constants
RPC_URL = "http://192.168.200.253:18081/json_rpc"
TOLERANCE = 1e9  # 0.001 XMR
BATCH_SIZE = 50
ATOMIC_UNITS = 1e12
MONEROD_PATH = "monerod"
TIMEOUT = 20
MAX_RETRIES = 99

# Timestamp for files
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
CSV_FILE = f"monero_audit_{timestamp}.csv"
LOG_FILE = f"monero_audit_log_{timestamp}.txt"

def log_message(message):
    with open(LOG_FILE, "a") as f:
        f.write(f"[{datetime.now().isoformat()}] {message}\n")
        f.flush()

def rpc_call(method, params=None):
    payload = {
        "jsonrpc": "2.0",
        "id": "0",
        "method": method,
        "params": params or {}
    }
    log_message(f"[DEBUG] Calling RPC: {method} with params {params}")
    try:
        response = requests.post(RPC_URL, json=payload, timeout=TIMEOUT)
        response.raise_for_status()
        result = response.json().get("result")
        log_message(f"[DEBUG] RPC Response: {result}")
        return result
    except Exception as e:
        log_message(f"[ERROR] RPC {method}: {str(e)}")
        return None

def get_block_height():
    result = rpc_call("get_block_count")
    return result["count"] - 1 if result else None

def get_block_info(height):
    result = rpc_call("get_block", {"height": height})
    if not result:
        log_message(f"[ERROR] get_block failed {height}")
        return None
    log_message(f"[DEBUG] Block {height} successfully obtained")
    return {
        "height": height,
        "hash": result["block_header"]["hash"],
        "json": json.loads(result["json"])
    }

def get_transaction_details(tx_hash):
    result = rpc_call("get_transactions", {"txs_hashes": [tx_hash], "decode_as_json": True})
    if not result or "txs" not in result or not result["txs"]:
        log_message(f"[ERROR] Failed to get transaction details {tx_hash}")
        return None
    return json.loads(result["txs"][0]["as_json"])

def get_real_reward_from_shell(height):
    retries = 0
    while retries < MAX_RETRIES:
        try:
            cmd = [MONEROD_PATH, "--rpc-bind-ip", "192.168.200.253", "print_block", str(height)]
            log_message(f"[DEBUG] Attempt {retries+1}/{MAX_RETRIES} in block {height}: {' '.join(cmd)}")
            output = subprocess.check_output(cmd, text=True, timeout=TIMEOUT)
            log_message(f"[DEBUG] Output block {height}: {output}")
            for line in output.splitlines():
                if "reward:" in line:
                    reward = int(float(line.split(":")[1].strip()) * ATOMIC_UNITS)
                    log_message(f"[DEBUG] Reward block {height}: {reward}")
                    return reward
            log_message(f"[ERROR] 'reward:' not found in block {height}")
            return None
        except subprocess.TimeoutExpired:
            retries += 1
            log_message(f"[TIMEOUT] Attempt {retries}/{MAX_RETRIES} in block {height}")
        except Exception as e:
            retries += 1
            log_message(f"[ERROR] Shell block {height}: {e}")
        time.sleep(1)
    log_message(f"[FAILURE] {MAX_RETRIES} Attempts in block {height}")
    return None

def audit_block(block_info):
    height = block_info["height"]
    block_data = block_info["json"]

    log_message(f"[DEBUG] Audit initiated for block {height}")
    real_reward = get_real_reward_from_shell(height)
    if real_reward is None:
        log_message(f"[ERROR] Real reward not earned for block {height}")
        return None
    log_message(f"[DEBUG] Real reward block {height}: {real_reward}")

    miner_tx = block_data["miner_tx"]
    coinbase_outputs = sum(output["amount"] for output in miner_tx["vout"])
    log_message(f"[DEBUG] CoinBase Block Outputs {height}: {coinbase_outputs}")

    tx_hashes = block_data.get("tx_hashes", [])
    total_tx_outputs = 0
    for tx_hash in tx_hashes:
        tx_details = get_transaction_details(tx_hash)
        if tx_details:
            total_tx_outputs += sum(output["amount"] for output in tx_details["vout"])
    log_message(f"[DEBUG] Total TX block outputs {height}: {total_tx_outputs}")

    total_mined = coinbase_outputs + total_tx_outputs
    log_message(f"[DEBUG] Total mining block {height}: {total_mined}")

    issues = []
    if abs(real_reward - coinbase_outputs) > TOLERANCE:
        issues.append(f"Reward != CoinBase ({real_reward} vs {coinbase_outputs})")
    if abs(real_reward - total_mined) > TOLERANCE:
        issues.append(f"Reward != TotalMined ({real_reward} vs {total_mined})")
    if len(miner_tx["vin"]) != 1 or miner_tx["vin"][0]["gen"]["height"] != height:
        issues.append("Invalid CoinBase")

    result = {
        "height": height,
        "hash": block_info["hash"],
        "real_reward": real_reward,
        "coinbase_outputs": coinbase_outputs,
        "total_mined": total_mined,
        "issues": issues,
        "status": "Discrepancy" if issues else "OK"
    }
    log_message(f"[DEBUG] Result block {height}: status={result['status']}, issues={result['issues']}")
    return result

def audit_block_range(start_block, end_block):
    log_message(f"Starting audit from {start_block} until {end_block}")

    with open(CSV_FILE, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow([
            "Height", "Hash", "Real reward", "CoinBase outputs", "Total Mined", "Problems", "Status"
        ])

    with tqdm(total=(end_block - start_block + 1), desc="Auditing blocks") as pbar:
        for start in range(start_block, end_block + 1, BATCH_SIZE):
            batch_end = min(start + BATCH_SIZE - 1, end_block)
            log_message(f"[DEBUG] Processing batch {start}-{batch_end}")

            for height in range(start, batch_end + 1):
                block_info = get_block_info(height)
                if not block_info:
                    log_message(f"[SKIP] Block {height} not obtained")
                    pbar.update(1)
                    continue
                result = audit_block(block_info)
                if not result:
                    log_message(f"[SKIP] Audit failed for block {height}")
                    pbar.update(1)
                    continue

                # Save each block immediately (not CSV)
                with open(CSV_FILE, "a", newline="") as f:
                    writer = csv.writer(f)
                    writer.writerow([
                        result["height"], result["hash"], result["real_reward"],
                        result["coinbase_outputs"], result["total_mined"],
                        "; ".join(result["issues"]) if result["issues"] else "None",
                        result["status"]
                    ])
                log_message(f"[DEBUG] Block {height} saved in CSV: status={result['status']}")
                pbar.update(1)

            log_message(f"[DEBUG] Batch {start}-{batch_end} completed")
            time.sleep(0.1)

    log_message(f"Audit complete. Result saved in {CSV_FILE}")

def audit_single_block(block_num):
    log_message(f"Auditing single block: {block_num}")
    block_info = get_block_info(block_num)
    if not block_info:
        print(f"[ERROR] Could not get block {block_num}")
        return
    result = audit_block(block_info)
    if not result:
        print(f"[ERROR] Audit failed for block {block_num}")
        return

    print(f"\n🧱 Block {result['height']} - Hash: {result['hash']}")
    print(f"💰 Real reward: {result['real_reward']} atomic units")
    print(f"📤 CoinBase outputs: {result['coinbase_outputs']}")
    print(f"📦 Total mined: {result['total_mined']}")
    print(f"⚠️ Status: {result['status']}")
    if result["issues"]:
        print("❗ Problems:")
        for issue in result["issues"]:
            print(f"  - {issue}")
    else:
        print("✅ No problems detected.")

if __name__ == "__main__":
    log_message("Script started")
    parser = argparse.ArgumentParser(description="Monero blockchain audit")
    parser.add_argument("--block", type=int, help="Block number to audit individually")
    args = parser.parse_args()

    try:
        if args.block is not None:
            audit_single_block(args.block)
        else:
            end_block = get_block_height()
            if end_block is None:
                print("Error getting current blockchain height.")
                log_message("[ERROR] Failed to get blockchain height")
            else:
                audit_block_range(0, end_block)
    except Exception as e:
        log_message(f"[CRITICAL] General error in the script: {str(e)}")
        print(f"Critical error: {str(e)}")
  

When executed, two files will be created as shown in the example printout:

This is the generated CSV:

And this is the debug log:

If you want to audit block 2,641,623 (tail emission), simply specify the block with the block argument:

I recommend running it on your own monerod nodes, changing only the variables below to the IP of your local node:

Python

RPC_URL = "http://192.168.200.253:18081/json_rpc"

cmd = [MONEROD_PATH, "--rpc-bind-ip", "192.168.200.253", "print_block", str(height)]
  


If you want to criticize, point out a flaw in the code (which was generated with Grok 3), fix it, and send it back to me. Talk is cheap, show me the code!

The C++ and Python source code is available at: https://github.com/area31/audit-xmr

Have a lot of fun!

 

Back to blog