Auditing Monero (XMR) Blockchain: A Simple Script for Detecting Possible Inflation Bugs
Share
This article presents a robust and detailed Python script for auditing the Monero blockchain, a privacy-focused cryptocurrency. The code verifies the integrity of blocks by comparing actual rewards (obtained via monerod) with CoinBase transaction outputs and the total amount mined, detecting potential discrepancies such as inflation bugs (e.g., the 2017 incident). With support for multiple attempts (up to 99 retries with a 20-second timeout), detailed real-time logs, and immediate CSV recording, the script provides a reliable tool for large-scale analysis or auditing individual blocks. Features like a progress bar (using tqdm) and error handling make it accessible and efficient, ideal for researchers and cryptocurrency enthusiasts seeking to ensure the consistency of XMR issuance.
It’s simply pathetic that maximalist monkeys and "certain famous Bitcoin cops/officials" obsessed with state control and fear-based narratives still spew the absurd lie that Monero makes it impossible to count the total supply, as if it were some unsolvable mystery designed to frustrate their lazy minds—when in reality, all it takes is opening the monerod source code, available to anyone with two neurons and a shred of curiosity, to debunk this nonsense and prove that the issuance is as auditable as the inflated egos of these centralized clowns.
So enough talk, run this thing already:
import json
import requests
import csv
import time
import subprocess
import argparse
from tqdm import tqdm
from datetime import datetime
# Constants
# JSON-RPC endpoint of the monerod node; rpc_call() posts every request here.
RPC_URL = "http://192.168.200.253:18081/json_rpc"
# Largest difference, in atomic units, that audit_block() accepts between the
# compared amounts before it records a discrepancy.
TOLERANCE = 1e9 # 0.001 XMR
# Number of consecutive heights audit_block_range() groups into one batch
# (a batch is logged as a unit and followed by a short pause).
BATCH_SIZE = 50
# Atomic units per 1 XMR; used to convert the reward printed by monerod.
ATOMIC_UNITS = 1e12
# Executable launched by get_real_reward_from_shell() to run `print_block`.
MONEROD_PATH = "monerod"
# Timeout in seconds, applied to both the HTTP request in rpc_call() and the
# monerod subprocess in get_real_reward_from_shell().
TIMEOUT = 20
# Maximum number of monerod invocations attempted per block.
MAX_RETRIES = 99
# Timestamp for files
# Computed once at start-up so the CSV and the log of one run share a suffix.
timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
CSV_FILE = f"monero_audit_{timestamp}.csv"
LOG_FILE = f"monero_audit_log_{timestamp}.txt"
def log_message(message):
    """Append *message* to LOG_FILE as one line prefixed with an ISO timestamp.

    The log file is opened in append mode on every call and flushed before it
    is closed again.
    """
    with open(LOG_FILE, "a") as log_handle:
        entry = f"[{datetime.now().isoformat()}] {message}\n"
        log_handle.write(entry)
        log_handle.flush()
def rpc_call(method, params=None):
    """Send one JSON-RPC 2.0 request to RPC_URL and return its ``result``.

    Args:
        method: Name of the JSON-RPC method to invoke.
        params: Optional parameter object; an empty object is sent when this
            is None or empty.

    Returns:
        The value of the ``result`` member of the response, or None when the
        request fails, the HTTP status is an error, the body is not a JSON
        object, or the server answers with a JSON-RPC ``error`` member.
        Every failure is written to the log; nothing is raised to the caller.
    """
    payload = {
        "jsonrpc": "2.0",
        "id": "0",
        "method": method,
        "params": params or {},
    }
    log_message(f"[DEBUG] Calling RPC: {method} with params {params}")
    try:
        response = requests.post(RPC_URL, json=payload, timeout=TIMEOUT)
        response.raise_for_status()
        body = response.json()
    except (requests.RequestException, ValueError) as e:
        # ValueError covers invalid JSON bodies on requests versions whose
        # decode error does not derive from RequestException.
        log_message(f"[ERROR] RPC {method}: {str(e)}")
        return None

    if not isinstance(body, dict):
        log_message(f"[ERROR] RPC {method}: unexpected response body {body!r}")
        return None

    # A JSON-RPC failure arrives with HTTP 200 and an "error" member; report
    # it explicitly instead of logging a bare "RPC Response: None".
    error = body.get("error")
    if error is not None:
        log_message(f"[ERROR] RPC {method}: {error}")
        return None

    result = body.get("result")
    log_message(f"[DEBUG] RPC Response: {result}")
    return result
def get_block_height():
    """Return the ``count`` reported by ``get_block_count`` minus one.

    Returns None when the RPC call yields no usable result.
    """
    count_reply = rpc_call("get_block_count")
    if not count_reply:
        return None
    return count_reply["count"] - 1
def get_block_info(height):
    """Fetch the block at *height* through the ``get_block`` RPC method.

    Returns:
        A dict with the keys ``height`` (the argument), ``hash`` (taken from
        the response's block header) and ``json`` (the response's ``json``
        string decoded into Python objects), or None when the RPC call
        returns nothing.
    """
    reply = rpc_call("get_block", {"height": height})
    if reply:
        log_message(f"[DEBUG] Block {height} successfully obtained")
        block_hash = reply["block_header"]["hash"]
        decoded_block = json.loads(reply["json"])
        return {"height": height, "hash": block_hash, "json": decoded_block}
    log_message(f"[ERROR] get_block failed {height}")
    return None
def get_transaction_details(tx_hash):
    """Return the decoded JSON of the transaction identified by *tx_hash*.

    Args:
        tx_hash: Transaction hash as found in a block's ``tx_hashes`` list.

    Returns:
        The transaction as a Python object decoded from the ``as_json`` field
        of the first entry in ``txs``, or None when the call fails, the
        response has no transactions, or ``as_json`` is missing or is not
        valid JSON. Failures are logged, never raised.
    """
    # NOTE(review): the Monero daemon RPC documentation lists get_transactions
    # as a plain HTTP endpoint (/get_transactions) rather than a JSON-RPC
    # method -- confirm that this call can succeed through RPC_URL at all.
    result = rpc_call("get_transactions", {"txs_hashes": [tx_hash], "decode_as_json": True})
    if not result or "txs" not in result or not result["txs"]:
        log_message(f"[ERROR] Failed to get transaction details {tx_hash}")
        return None
    try:
        return json.loads(result["txs"][0]["as_json"])
    except (KeyError, TypeError, ValueError) as e:
        # A missing or empty "as_json" used to raise out of the audit loop and
        # abort the whole run; treat it like any other failed lookup instead.
        log_message(f"[ERROR] Failed to decode transaction {tx_hash}: {e}")
        return None
def get_real_reward_from_shell(height):
    """Read the reward of block *height* from ``monerod print_block``.

    The command is run up to MAX_RETRIES times, each attempt limited to
    TIMEOUT seconds, with a one-second pause after a failed attempt. Only
    launching the process is retried; once output has been captured it is
    parsed exactly once, because a parse failure is deterministic and
    re-running monerod cannot fix it.

    Args:
        height: Block height to query.

    Returns:
        The reward in atomic units as an int, or None when every attempt
        failed, no line containing ``reward:`` was printed, or the value on
        that line could not be parsed. All failures are logged.
    """
    from decimal import Decimal, InvalidOperation

    cmd = [MONEROD_PATH, "--rpc-bind-ip", "192.168.200.253", "print_block", str(height)]
    output = None
    for attempt in range(1, MAX_RETRIES + 1):
        log_message(f"[DEBUG] Attempt {attempt}/{MAX_RETRIES} in block {height}: {' '.join(cmd)}")
        try:
            output = subprocess.check_output(cmd, text=True, timeout=TIMEOUT)
            break
        except subprocess.TimeoutExpired:
            log_message(f"[TIMEOUT] Attempt {attempt}/{MAX_RETRIES} in block {height}")
        except (subprocess.SubprocessError, OSError, UnicodeError) as e:
            log_message(f"[ERROR] Shell block {height}: {e}")
        time.sleep(1)

    if output is None:
        log_message(f"[FAILURE] {MAX_RETRIES} Attempts in block {height}")
        return None

    log_message(f"[DEBUG] Output block {height}: {output}")
    for line in output.splitlines():
        if "reward:" not in line:
            continue
        reward_text = line.split(":", 1)[1].strip()
        try:
            # Decimal keeps the printed value exact; going through float can
            # land just below the true product and truncate one unit short.
            reward = int(Decimal(reward_text) * int(ATOMIC_UNITS))
        except (InvalidOperation, ValueError, OverflowError):
            log_message(f"[ERROR] Unparseable reward in block {height}: {line!r}")
            return None
        log_message(f"[DEBUG] Reward block {height}: {reward}")
        return reward

    log_message(f"[ERROR] 'reward:' not found in block {height}")
    return None
def audit_block(block_info):
    """Compare a block's reward with the amounts recorded in the block.

    Args:
        block_info: Dict as produced by get_block_info(), providing the keys
            ``height``, ``hash`` and ``json`` (the decoded block).

    Returns:
        A dict with the keys ``height``, ``hash``, ``real_reward``,
        ``coinbase_outputs``, ``total_mined``, ``issues`` (list of strings)
        and ``status`` ("OK" when ``issues`` is empty, otherwise
        "Discrepancy"); or None when the reward could not be obtained.
    """
    height = block_info["height"]
    block_data = block_info["json"]
    log_message(f"[DEBUG] Audit initiated for block {height}")

    real_reward = get_real_reward_from_shell(height)
    if real_reward is None:
        log_message(f"[ERROR] Real reward not earned for block {height}")
        return None
    log_message(f"[DEBUG] Real reward block {height}: {real_reward}")

    miner_tx = block_data["miner_tx"]
    coinbase_outputs = sum(output["amount"] for output in miner_tx["vout"])
    log_message(f"[DEBUG] CoinBase Block Outputs {height}: {coinbase_outputs}")

    # NOTE(review): this adds the outputs of the block's ordinary transactions
    # to the "total mined" figure. Those outputs are presumably funded by
    # existing inputs rather than newly issued -- confirm this is intended.
    # Transactions whose details cannot be fetched contribute nothing.
    tx_hashes = block_data.get("tx_hashes", [])
    total_tx_outputs = 0
    for tx_hash in tx_hashes:
        tx_details = get_transaction_details(tx_hash)
        if tx_details:
            total_tx_outputs += sum(output["amount"] for output in tx_details["vout"])
    log_message(f"[DEBUG] Total TX block outputs {height}: {total_tx_outputs}")

    total_mined = coinbase_outputs + total_tx_outputs
    log_message(f"[DEBUG] Total mining block {height}: {total_mined}")

    issues = []
    if abs(real_reward - coinbase_outputs) > TOLERANCE:
        issues.append(f"Reward != CoinBase ({real_reward} vs {coinbase_outputs})")
    if abs(real_reward - total_mined) > TOLERANCE:
        issues.append(f"Reward != TotalMined ({real_reward} vs {total_mined})")

    # A malformed coinbase input (no "vin", or no "gen" entry) must be
    # reported as an issue; indexing it directly raised KeyError and aborted
    # the run on exactly the input this check exists to detect.
    vin = miner_tx.get("vin") or []
    gen = vin[0].get("gen") if len(vin) == 1 and isinstance(vin[0], dict) else None
    gen_height = gen.get("height") if isinstance(gen, dict) else None
    if gen_height != height:
        issues.append("Invalid CoinBase")

    result = {
        "height": height,
        "hash": block_info["hash"],
        "real_reward": real_reward,
        "coinbase_outputs": coinbase_outputs,
        "total_mined": total_mined,
        "issues": issues,
        "status": "Discrepancy" if issues else "OK",
    }
    log_message(f"[DEBUG] Result block {height}: status={result['status']}, issues={result['issues']}")
    return result
def audit_block_range(start_block, end_block):
    """Audit every block from *start_block* to *end_block* (inclusive).

    A header row and then one row per successfully audited block are written
    to CSV_FILE. Blocks that cannot be fetched or audited are logged as
    skipped and produce no row. Heights are processed in batches of
    BATCH_SIZE, with a 0.1 second pause after each batch, and a tqdm progress
    bar advances once per height.

    Args:
        start_block: First height to audit.
        end_block: Last height to audit.
    """
    log_message(f"Starting audit from {start_block} until {end_block}")
    # The CSV is opened once for the whole run and flushed after every row, so
    # each result still reaches the file immediately without reopening it for
    # every single block.
    with open(CSV_FILE, "w", newline="") as f, \
            tqdm(total=(end_block - start_block + 1), desc="Auditing blocks") as pbar:
        writer = csv.writer(f)
        writer.writerow([
            "Height", "Hash", "Real reward", "CoinBase outputs", "Total Mined", "Problems", "Status"
        ])
        f.flush()
        for start in range(start_block, end_block + 1, BATCH_SIZE):
            batch_end = min(start + BATCH_SIZE - 1, end_block)
            log_message(f"[DEBUG] Processing batch {start}-{batch_end}")
            for height in range(start, batch_end + 1):
                try:
                    block_info = get_block_info(height)
                    if not block_info:
                        log_message(f"[SKIP] Block {height} not obtained")
                        continue
                    result = audit_block(block_info)
                    if not result:
                        log_message(f"[SKIP] Audit failed for block {height}")
                        continue
                    # Save each block to the CSV immediately
                    writer.writerow([
                        result["height"], result["hash"], result["real_reward"],
                        result["coinbase_outputs"], result["total_mined"],
                        "; ".join(result["issues"]) if result["issues"] else "None",
                        result["status"]
                    ])
                    f.flush()
                    log_message(f"[DEBUG] Block {height} saved in CSV: status={result['status']}")
                finally:
                    # Single place that advances the bar, whatever the outcome.
                    pbar.update(1)
            log_message(f"[DEBUG] Batch {start}-{batch_end} completed")
            time.sleep(0.1)
    log_message(f"Audit complete. Result saved in {CSV_FILE}")
def audit_single_block(block_num):
    """Audit the block at height *block_num* and print a summary to stdout.

    Prints an error line and returns early when the block cannot be fetched
    or the audit yields no result. Otherwise prints the height, hash, reward,
    coinbase outputs, total mined and status, followed by the list of
    detected problems or a confirmation that there are none.
    """
    log_message(f"Auditing single block: {block_num}")

    fetched = get_block_info(block_num)
    if not fetched:
        print(f"[ERROR] Could not get block {block_num}")
        return

    outcome = audit_block(fetched)
    if not outcome:
        print(f"[ERROR] Audit failed for block {block_num}")
        return

    summary_lines = (
        f"\n🧱 Block {outcome['height']} - Hash: {outcome['hash']}",
        f"💰 Real reward: {outcome['real_reward']} atomic units",
        f"📤 CoinBase outputs: {outcome['coinbase_outputs']}",
        f"📦 Total mined: {outcome['total_mined']}",
        f"⚠️ Status: {outcome['status']}",
    )
    for summary_line in summary_lines:
        print(summary_line)

    problems = outcome["issues"]
    if not problems:
        print("✅ No problems detected.")
        return
    print("❗ Problems:")
    for problem in problems:
        print(f" - {problem}")
if __name__ == "__main__":
    # Script entry point: with --block, audit that single height; without it,
    # audit every block from height 0 up to the height reported by the node.
    log_message("Script started")
    parser = argparse.ArgumentParser(description="Monero blockchain audit")
    parser.add_argument(
        "--block",
        type=int,
        help="Block number to audit individually",
    )
    args = parser.parse_args()
    try:
        if args.block is None:
            end_block = get_block_height()
            if end_block is not None:
                audit_block_range(0, end_block)
            else:
                print("Error getting current blockchain height.")
                log_message("[ERROR] Failed to get blockchain height")
        else:
            audit_single_block(args.block)
    except Exception as e:
        # Last-resort boundary: record the failure in the log and on stdout.
        log_message(f"[CRITICAL] General error in the script: {str(e)}")
        print(f"Critical error: {str(e)}")
When executed, two files will be created as shown in the example printout:

This is the generated CSV:

And this is the debug log:


If you want to audit a single block, such as block 2,641,623 (tail emission), pass its height with the --block argument (for example, --block 2641623):

I recommend running it on your own monerod nodes, changing only the variables below to the IP of your local node:
RPC_URL = "http://192.168.200.253:18081/json_rpc"
cmd = [MONEROD_PATH, "--rpc-bind-ip", "192.168.200.253", "print_block", str(height)]
If you want to criticize, point out a flaw in the code (which was generated with Grok 3), fix it, and send it back to me. Talk is cheap, show me the code!
The C++ and Python source code is available at: https://github.com/area31/audit-xmr
Have a lot of fun!