0

I want to get the pending transactions of a contract address. I have tried many ways, but none of them worked.

Method 1: this seems to be good at picking up pending transactions, but I can't get any transaction from my address and I don't know why. Please help me.

def main():
    """Create a filter on pending transactions and poll it with no delay."""
    pending_filter = web3.eth.filter('pending')
    log_loop(pending_filter, 0)

def log_loop(block_filter, poll_interval):
    """Poll ``block_filter`` forever, printing entries whose sender matches.

    Args:
        block_filter: Filter object exposing ``get_new_entries()``; each entry
            is passed to ``web3.eth.getTransaction`` as a transaction id.
        poll_interval: Seconds to sleep between polls (``0`` means no delay).

    This function never returns.
    """
    import time  # local import: the snippet has no import block of its own

    while True:
        for event in block_filter.get_new_entries():
            # ``my_contract`` is a placeholder and must be set to the watched
            # address before running this snippet.
            # NOTE(review): a contract address is normally the 'to' of a
            # transaction rather than its 'from' -- confirm which field is
            # intended here.
            if web3.eth.getTransaction(event)['from'] == my_contract:
                print(event)
        # Honour the requested interval instead of polling in a tight loop.
        time.sleep(poll_interval)

Method 2: this helps me get transactions from my address, but all the transactions it returns are confirmed, not pending.

def main():
    """Build a filter restricted to ``contract_address`` and start polling it."""
    # Both bounds set to 'pending': per the author this does not work and
    # returns nothing.
    filter_params = {
        'fromBlock': 'pending',
        'toBlock': 'pending',
        'address': contract_address,
    }
    block_filter = web3.eth.filter(filter_params)

    # Other parameter sets the author tried, with the result they observed:
    #   {'fromBlock': 0, 'toBlock': 'pending', 'address': contract_address}
    #       -> returns confirmed transactions, not pending
    #   {'fromBlock': 'pending', 'toBlock': 'latest', 'address': contract_address}
    #       -> returns confirmed transactions, not pending
    #   {'fromBlock': 'latest', 'toBlock': 'pending', 'address': contract_address}
    #       -> returns error "from > to"
    #   {'address': contract_address}
    #       -> returns confirmed transactions, not pending
    log_loop(block_filter, 0)

def log_loop(block_filter, poll_interval):
    """Poll ``block_filter`` forever and print every new entry.

    Args:
        block_filter: Filter object exposing ``get_new_entries()``.
        poll_interval: Seconds to sleep between polls (``0`` means no delay).

    This function never returns on its own; it only stops if
    ``get_new_entries()`` or ``print`` raises.
    """
    import time  # local import: the snippet has no import block of its own

    while True:
        for event in block_filter.get_new_entries():
            print(event)
        # Honour the requested interval instead of polling in a tight loop.
        time.sleep(poll_interval)
TylerH
  • 20,799
  • 66
  • 75
  • 101
Mike
  • 53
  • 1
  • 8

1 Answer

2

You can subscribe to new pending transactions using a WebSocket connection to your node. For more information, see https://geth.ethereum.org/docs/rpc/pubsub

That way you will receive the txHashes of all pending transactions. After that, you can filter them and process only those sent to the desired token address. One way is to call web3.eth.get_transaction() for each received txHash. It returns a dict with the transaction details. Use the 'to' key of this dict to check whether the pending transaction was sent to the desired contract address. Below is a (slightly modified) version of the solution suggested at: https://community.infura.io/t/web3-py-how-to-subscribe-to-pending-ethereum-transactions-in-python/5409

import asyncio
import json
import requests
import websockets
from web3 import Web3


# Web3 client over an HTTP provider. ``node_http_url`` is not defined in this
# snippet and must be supplied by the reader.
web3 = Web3(Web3.HTTPProvider(node_http_url))
# Address to watch, converted to checksum form; replace the '0x....'
# placeholder with the real address.
watched_address = Web3.toChecksumAddress('0x....')

async def get_event():
    """Subscribe to pending transactions and report those sent to ``watched_address``.

    Opens a WebSocket to ``node_ws_url``, sends an ``eth_subscribe`` request
    for ``newPendingTransactions``, then looks up every announced hash with
    ``web3.eth.get_transaction`` and prints the ones whose ``'to'`` field
    equals ``watched_address``.

    Returns:
        None, once the WebSocket connection is closed, so that the caller can
        decide whether to reconnect.
    """
    # Local import keeps the snippet self-contained; web3 is already a
    # dependency of this file.
    from web3.exceptions import TransactionNotFound

    async with websockets.connect(node_ws_url) as ws:
        await ws.send('{"jsonrpc": "2.0", "id": 1, "method": "eth_subscribe", "params": ["newPendingTransactions"]}')
        subscription_response = await ws.recv()
        print(subscription_response)

        while True:
            try:
                message = await asyncio.wait_for(ws.recv(), timeout=15)
            except asyncio.TimeoutError:
                # No notification within 15 seconds; keep listening.
                continue
            except websockets.ConnectionClosed:
                # Swallowing this would make every later recv() fail at once
                # and spin forever, so leave the loop instead.
                print("WebSocket connection closed")
                return

            try:
                txHash = json.loads(message)['params']['result']
            except (ValueError, KeyError, TypeError):
                # Not a subscription notification in the expected shape.
                continue
            print(txHash)

            try:
                # NOTE: this is a synchronous call made from a coroutine, so
                # the event loop is blocked while it runs.
                tx = web3.eth.get_transaction(txHash)
            except TransactionNotFound:
                # The node used for lookups does not know this hash.
                continue
            except Exception as exc:
                # Best effort: report the failure and move on to the next hash.
                print(f"Could not fetch transaction {txHash}: {exc}")
                continue

            if tx['to'] == watched_address:
                print("Pending transaction found with the following details:")
                print({
                    "hash": txHash,
                    "from": tx["from"],
                    "value": web3.fromWei(tx["value"], 'ether')
                })

if __name__ == "__main__":
    # asyncio.run() creates and closes a fresh event loop for each call, which
    # avoids calling asyncio.get_event_loop() when no loop is running (that
    # usage is deprecated in recent Python versions).
    while True:
        # Start a new subscription session whenever get_event() returns.
        asyncio.run(get_event())
 

If you want to decode tx['input'], you can use eth-abi: https://github.com/ethereum/eth-abi

Or, if you have the contract ABI, you can create a web3 contract object and call decode_function_input():

# Contract object for the watched address. ``watched_address_abi`` is not
# defined in this snippet and must be supplied by the reader.
wa_contract = web3.eth.contract(address=watched_address, abi=watched_address_abi)  
# Decode the call data of ``tx``; presumably yields the called function object
# and its decoded arguments -- confirm against the web3.py documentation.
func_obj, func_params = wa_contract.decode_function_input(tx['input'])
nstoichkov
  • 41
  • 1
  • 4