banner

I think you could use ZeroMQ in a Python script to listen to the daemon for new transactions that include your desired address. Then, when you see a transaction with your address, you can filter by the output amount, maybe using an API to convert BTC to USDT. You’ll need a running Bitcoin full node, with bitcoind launched with the flag -zmqpubrawtx=tcp://127.0.0.1:28332. I think something like the following should work.

import zmq
import json
import subprocess
import requests


# Endpoint on which bitcoind publishes raw transactions. The daemon must be
# launched with `-zmqpubrawtx=tcp://127.0.0.1:28332` for anything to arrive.
ZMQ_ADDRESS: str = "tcp://127.0.0.1:28332"

# Bitcoin address whose incoming payments are monitored.
TARGET_ADDRESS: str = "your-bitcoin-address"

# Inclusive bounds, expressed in USDT, on the output value worth reporting.
MIN_USDT: int = 100
MAX_USDT: int = 115

# URL of the service queried for the BTC price (placeholder to be filled in).
PRICE_API: str = "your-preferred-API"

def get_btc_price():
    """Fetch the latest BTC price in USD.

    Queries ``PRICE_API`` and reads the price from the
    ``bpi -> USD -> rate`` field of the JSON reply, dropping any thousands
    separators before converting it to a number.

    Returns:
        The price as a float, or ``None`` if the request fails or times out,
        the server answers with an HTTP error status, or the reply does not
        contain a usable rate. The failure reason is printed.
    """
    try:
        # Without a timeout a stalled price server would block the caller
        # indefinitely.
        response = requests.get(PRICE_API, timeout=10)
        # Turn 4xx/5xx answers into an exception rather than parsing an
        # error page as if it were price data.
        response.raise_for_status()
        rate = response.json()["bpi"]["USD"]["rate"]
        # str() lets this accept both a formatted string such as "43,123.45"
        # and a plain JSON number.
        return float(str(rate).replace(",", ""))
    except (requests.RequestException, KeyError, TypeError, ValueError) as e:
        print(f"Error fetching BTC price: {e}")
        return None

def decode_tx(raw_tx):
    """Decode a raw transaction using bitcoin-cli.

    Args:
        raw_tx: The serialized transaction as a hex string.

    Returns:
        The object parsed from the JSON printed by
        ``bitcoin-cli decoderawtransaction``, or ``None`` if the input is not
        a non-empty hex string, the command cannot be started, it exits with
        a non-zero status, or its output is empty or not valid JSON. The
        failure reason is printed.
    """
    # Reject anything that is not plain hex before spawning a process: it can
    # never decode, and a value starting with "-" would be taken by
    # bitcoin-cli as a command-line option instead of data.
    hex_digits = set("0123456789abcdefABCDEF")
    if not isinstance(raw_tx, str) or not raw_tx or not set(raw_tx) <= hex_digits:
        print("Error decoding transaction: input is not a hex string")
        return None

    try:
        result = subprocess.run(
            ["bitcoin-cli", "decoderawtransaction", raw_tx],
            capture_output=True,
            text=True,
        )
    except (OSError, ValueError) as e:
        # OSError: bitcoin-cli missing or not executable.
        # ValueError: output could not be decoded as text.
        print(f"Error decoding transaction: {e}")
        return None

    if result.returncode != 0:
        # Surface whatever the tool wrote to stderr instead of silently
        # returning None.
        print(f"Error decoding transaction: {result.stderr.strip()}")
        return None

    if not result.stdout:
        return None

    try:
        return json.loads(result.stdout)
    except ValueError as e:  # json.JSONDecodeError is a ValueError
        print(f"Error decoding transaction: {e}")
        return None

def process_transaction(raw_tx):
    """Report outputs of a transaction that pay TARGET_ADDRESS in the USDT range.

    The transaction is decoded with ``decode_tx`` and the values of all
    outputs paying ``TARGET_ADDRESS`` are collected. Only if at least one such
    output exists is the BTC price fetched with ``get_btc_price``; every
    matching output whose value converts to an amount within
    ``MIN_USDT``..``MAX_USDT`` (inclusive) is then printed.

    Args:
        raw_tx: The raw transaction, passed unchanged to ``decode_tx``.

    Returns:
        None. Nothing is printed if decoding fails, no output pays the target
        address, or the price cannot be fetched.
    """
    tx_data = decode_tx(raw_tx)
    if not tx_data:
        return

    matching_amounts = []
    for vout in tx_data.get("vout", []):
        script = vout.get("scriptPubKey", {})
        # Older Bitcoin Core releases list destinations under "addresses";
        # newer ones report a single "address" field instead. Accept both so
        # the match does not depend on the node version.
        destinations = list(script.get("addresses", []))
        if "address" in script:
            destinations.append(script["address"])
        if TARGET_ADDRESS in destinations:
            matching_amounts.append(vout["value"])

    if not matching_amounts:
        return

    # Fetch the price only after a payment to the target was found, so the
    # price API is not queried once for every unrelated transaction.
    btc_price = get_btc_price()
    if not btc_price:
        return

    for amount_btc in matching_amounts:
        amount_usdt = amount_btc * btc_price
        if MIN_USDT <= amount_usdt <= MAX_USDT:
            print("New Transaction Detected!")
            print(f"TXID: {tx_data['txid']}")
            print(f"Amount: {amount_btc} BTC (~{amount_usdt:.2f} USDT)")
            print(f"From: Unknown (check inputs), To: {TARGET_ADDRESS}")

def listen_zmq():
    """Subscribe to bitcoind's raw-transaction feed and process each message.

    Connects a SUB socket to ``ZMQ_ADDRESS``, subscribes to the ``rawtx``
    topic and loops forever, passing every received transaction to
    ``process_transaction`` as a hex string. Errors raised while receiving or
    processing a message are printed and the loop continues. The socket and
    context are released when the loop is left (for example on Ctrl-C).
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    try:
        socket.connect(ZMQ_ADDRESS)
        socket.setsockopt_string(zmq.SUBSCRIBE, "rawtx")

        print("Listening for new transactions...")

        while True:
            try:
                # bitcoind publishes multipart messages: topic, payload and a
                # sequence number. The payload is the binary serialized
                # transaction, so it must be read as bytes and hex-encoded
                # rather than decoded as a UTF-8 string.
                frames = socket.recv_multipart()
                if len(frames) < 2 or frames[0] != b"rawtx":
                    continue
                process_transaction(frames[1].hex())
            except Exception as e:
                print(f"Error receiving transaction: {e}")
    finally:
        # Release the ZMQ resources however the loop ends.
        socket.close(linger=0)
        context.term()

# Start the listener only when this file is executed as a script, not when it
# is imported as a module.
if __name__ == "__main__":
    listen_zmq()

banner

Converter

Source: CurrencyRate
Top Selling Multipurpose WP Theme

Newsletter

Subscribe to my newsletter for new blog posts, tips & new photos. Let's stay updated!

banner

Leave a Comment

Layer 1
Your Crypto & Blockchain Beacon

CryptoInsightful

Welcome to CryptoInsightful.com, your trusted source for in-depth analysis, news, and insights into the world of cryptocurrencies, blockchain technology, NFTs (Non-Fungible Tokens), and cybersecurity. Our mission is to empower you with the knowledge and understanding you need to navigate the rapidly evolving landscape of digital assets and emerging technologies.