import argparse
from pprint import pprint
import aiosqlite
import asyncio
import uvloop
import time
import dotenv
from os import environ
from web3 import AsyncWeb3
from web3.providers import WebSocketProvider
# Load environment variables from a local .env file (if present) into os.environ.
dotenv.load_dotenv()
# WebSocket provider URL for Arbitrum One (chainId 42161) via Particle Network.
# NOTE(review): if PROJECT_ID / PROJECT_SERVER_KEY are unset, environ.get()
# yields None and the URL is built with the literal string "None" — the
# failure then only surfaces at connect time. Confirm whether startup
# validation is desired.
ARBITRUM_WS_URL = f"wss://rpc.particle.network/evm-chain?chainId=42161&projectUuid={environ.get('PROJECT_ID')}&projectKey={environ.get('PROJECT_SERVER_KEY')}"
# Module-level async Web3 client; the WebSocket itself is connected explicitly
# in main() via web3.provider.connect().
web3 = AsyncWeb3(WebSocketProvider(ARBITRUM_WS_URL))
async def enable_wal(conn):
    """Switch the SQLite connection to Write-Ahead Logging journal mode.

    WAL permits concurrent readers while a writer is active, which suits this
    indexer's pattern of continuous inserts plus ad-hoc reporting queries.
    """
    cursor = await conn.execute("PRAGMA journal_mode=WAL;")
    await cursor.close()
# Initialize SQLite database and tables asynchronously
async def initialize_db():
    """Open (or create) the SQLite database, enable WAL, and ensure all tables exist.

    Returns the open aiosqlite connection; the caller owns it and must close it.
    All DDL uses CREATE TABLE IF NOT EXISTS, so running this on every startup
    is safe and idempotent.
    """
    conn = await aiosqlite.connect('arbitrum_transactions.db')
    await enable_wal(conn)
    schema = (
        # Every transaction observed on-chain. Monetary fields (value,
        # gas_price) are stored as TEXT because wei amounts can exceed the
        # 64-bit range of SQLite's INTEGER.
        '''
        CREATE TABLE IF NOT EXISTS transactions (
            tx_hash TEXT PRIMARY KEY,
            block_number INTEGER,
            from_address TEXT,
            to_address TEXT,
            value TEXT,
            gas INTEGER,
            gas_price TEXT,
            nonce INTEGER,
            input_data TEXT
        )
        ''',
        # Subset of transactions addressed to monitored contracts.
        '''
        CREATE TABLE IF NOT EXISTS contract_transactions (
            tx_hash TEXT PRIMARY KEY,
            block_number INTEGER,
            from_address TEXT,
            to_address TEXT,
            value TEXT,
            gas INTEGER,
            gas_price TEXT,
            nonce INTEGER,
            input_data TEXT
        )
        ''',
        # The watch list of contract addresses used for filtering.
        '''
        CREATE TABLE IF NOT EXISTS contract_addresses (
            address TEXT PRIMARY KEY
        )
        ''',
        # Per-address counters, with sending and receiving tracked separately.
        '''
        CREATE TABLE IF NOT EXISTS address_activity (
            address TEXT PRIMARY KEY,
            sent_transactions INTEGER DEFAULT 0,
            received_transactions INTEGER DEFAULT 0
        )
        ''',
    )
    async with conn.cursor() as cursor:
        for statement in schema:
            await cursor.execute(statement)
    await conn.commit()
    return conn
# Store a transaction in the 'transactions' table (for all transactions)
async def store_transaction(conn, tx):
    """Insert one transaction into the global `transactions` table and bump
    the per-address activity counters.

    Duplicate hashes are ignored (INSERT OR IGNORE). Committing is deferred
    to the caller so writes can be batched per block.
    """
    row = (
        tx['hash'],
        tx['blockNumber'],
        tx['from'],
        tx['to'],
        str(tx['value']),      # TEXT: wei values can overflow SQLite INTEGER
        tx['gas'],
        str(tx['gasPrice']),   # TEXT for the same reason
        tx['nonce'],
        tx['input'],
    )
    async with conn.cursor() as cursor:
        await cursor.execute('''
            INSERT OR IGNORE INTO transactions
            (tx_hash, block_number, from_address, to_address, value, gas, gas_price, nonce, input_data)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', row)
    # Sender activity is always counted; the receiver only when this is not a
    # contract-creation transaction (tx['to'] is None for those).
    await update_address_activity(conn, tx['from'], sent=True)
    if tx['to'] is not None:
        await update_address_activity(conn, tx['to'], received=True)
# Store a transaction in the 'contract_transactions' table (for specific contracts)
async def store_contract_transaction(conn, tx):
    """Insert one transaction into `contract_transactions` (monitored contracts only).

    Duplicate hashes are ignored (INSERT OR IGNORE); committing is deferred
    to the caller for per-block batching.
    """
    row = (
        tx['hash'],
        tx['blockNumber'],
        tx['from'],
        tx['to'],
        str(tx['value']),      # TEXT: wei amounts exceed SQLite INTEGER range
        tx['gas'],
        str(tx['gasPrice']),
        tx['nonce'],
        tx['input'],
    )
    async with conn.cursor() as cursor:
        await cursor.execute('''
            INSERT OR IGNORE INTO contract_transactions
            (tx_hash, block_number, from_address, to_address, value, gas, gas_price, nonce, input_data)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        ''', row)
# Update transaction count for an address (separate sent and received)
async def update_address_activity(conn, address, sent=False, received=False):
    """Increment per-address activity counters via upsert into address_activity.

    `sent` and `received` are independent flags, so a self-transfer can bump
    both counters in a single call. Committing is deferred to the caller.
    """
    sent_sql = '''
        INSERT INTO address_activity (address, sent_transactions, received_transactions)
        VALUES (?, 1, 0)
        ON CONFLICT(address) DO UPDATE SET sent_transactions = sent_transactions + 1
    '''
    received_sql = '''
        INSERT INTO address_activity (address, sent_transactions, received_transactions)
        VALUES (?, 0, 1)
        ON CONFLICT(address) DO UPDATE SET received_transactions = received_transactions + 1
    '''
    async with conn.cursor() as cursor:
        for flag, statement in ((sent, sent_sql), (received, received_sql)):
            if flag:
                await cursor.execute(statement, (address,))
# Fetch the most active addresses by senders
async def fetch_most_active_senders(conn, n):
    """Return the top `n` (address, sent_transactions) rows, busiest senders first."""
    query = '''
        SELECT address, sent_transactions
        FROM address_activity
        ORDER BY sent_transactions DESC
        LIMIT ?
    '''
    async with conn.cursor() as cursor:
        await cursor.execute(query, (n,))
        return await cursor.fetchall()
# Fetch the most active addresses by receivers
async def fetch_most_active_receivers(conn, n):
    """Return the top `n` (address, received_transactions) rows, busiest receivers first."""
    query = '''
        SELECT address, received_transactions
        FROM address_activity
        ORDER BY received_transactions DESC
        LIMIT ?
    '''
    async with conn.cursor() as cursor:
        await cursor.execute(query, (n,))
        return await cursor.fetchall()
# Fetch the most active addresses by combined sent and received
async def fetch_most_active_both(conn, n):
    """Return the top `n` (address, total_transactions) rows ranked by sent + received."""
    query = '''
        SELECT address, (sent_transactions + received_transactions) AS total_transactions
        FROM address_activity
        ORDER BY total_transactions DESC
        LIMIT ?
    '''
    async with conn.cursor() as cursor:
        await cursor.execute(query, (n,))
        return await cursor.fetchall()
# Add a contract address to the contract_addresses table
async def add_contract_address(conn, address):
    """Register `address` on the contract watch list; duplicates are ignored.

    Unlike the per-block insert helpers, this commits immediately because it
    is driven by a one-off CLI action.
    """
    async with conn.cursor() as cursor:
        await cursor.execute('''
            INSERT OR IGNORE INTO contract_addresses (address)
            VALUES (?)
        ''', (address,))
    await conn.commit()
# Get all contract addresses from the contract_addresses table
async def get_contract_addresses(conn):
    """Return the monitored contract addresses as a set (O(1) membership tests)."""
    async with conn.cursor() as cursor:
        await cursor.execute('SELECT address FROM contract_addresses')
        return {address for (address,) in await cursor.fetchall()}
# Asynchronous function to fetch a block using async WebSocket provider with retry logic
async def fetch_block(block_number, sem, retries=3, delay=0.25):
    """Fetch one block (with full transaction objects), retrying on failure.

    Concurrency is bounded by the semaphore `sem`, which is held for the whole
    retry sequence. Waits `delay` seconds after the first failure, doubling
    each time (exponential backoff). Returns None after `retries` failed
    attempts, printing the last error.
    """
    async with sem:
        backoff = delay
        last_error = None
        for attempt in range(1, retries + 1):
            try:
                return await web3.eth.get_block(block_number, full_transactions=True)
            except Exception as e:
                last_error = e
                if attempt < retries:
                    await asyncio.sleep(backoff)
                    backoff *= 2  # exponential backoff between attempts
        print(f"Failed to fetch block {block_number}: {last_error}")
        return None
# Asynchronous function to handle the indexing of a block's transactions
async def process_block(conn, block, contract_addresses):
    """Persist every transaction in `block` inside one SQLite write transaction.

    All transactions go to the `transactions` table; calls addressed to a
    monitored contract whose calldata contains one of the watched 4-byte
    function selectors are additionally stored in `contract_transactions`
    (and echoed to stdout for visibility).
    """
    watched_selectors = ('ed95fd3e', 'e6ca5ebb', '3804ff06')
    async with conn.cursor() as cursor:
        await cursor.execute('BEGIN')  # one explicit write transaction per block
    for tx in block['transactions']:
        await store_transaction(conn, tx)
        # tx['to'] is None for contract creations, which never match the watch set.
        if tx['to'] in contract_addresses and any(sig in tx['input'] for sig in watched_selectors):
            pprint(tx)
            await store_contract_transaction(conn, tx)
    await conn.commit()  # commit once the whole block is processed
# Function to calculate the average block time
def calculate_average_block_time(block_times):
    """Return the mean of `block_times`, capped at 15 seconds.

    With fewer than two samples there is nothing meaningful to average, so a
    15-second default is returned instead.
    """
    if len(block_times) < 2:
        return 15  # default until enough samples have accumulated
    average = sum(block_times) / len(block_times)
    return min(15.0, average)  # never sleep longer than 15 seconds
# Loop forever to index blocks, calculate block time, and sleep accordingly
async def index_transactions_forever(conn, start_block):
    """Poll the chain forever in 50-block batches, persisting every transaction.

    Block fetches are limited to 10 concurrent requests; database writes stay
    sequential to avoid SQLite write contention. Between batches, sleeps for
    the rolling average duration of the last 50 iterations (capped at 15 s by
    calculate_average_block_time).
    """
    semaphore = asyncio.Semaphore(10)  # at most 10 in-flight block fetches
    previous_iteration = time.monotonic()
    batch_durations = []
    while True:
        # Re-read the watch list every pass so contracts added via the CLI
        # take effect without a restart.
        contract_addresses = await get_contract_addresses(conn)
        print(f"Indexing blocks {start_block} to {start_block + 49}")
        blocks = await asyncio.gather(
            *(fetch_block(number, semaphore) for number in range(start_block, start_block + 50))
        )
        # NOTE(review): fetch failures come back as None and are skipped, yet
        # the cursor still advances by 50 below — those blocks are never
        # retried. Confirm that dropping them is acceptable.
        for block in blocks:
            if block is not None:
                await process_block(conn, block, contract_addresses)
        start_block += 50
        # Keep a rolling window (last 50 samples) of per-batch wall time.
        now = time.monotonic()
        batch_durations.append(now - previous_iteration)
        if len(batch_durations) > 50:
            batch_durations.pop(0)
        previous_iteration = now
        pause = calculate_average_block_time(batch_durations)
        print(f"Sleeping for {pause:.2f} seconds (average block time)")
        await asyncio.sleep(pause)
# Main function with argparse
async def main():
    """CLI entry point for the Arbitrum indexer.

    Connects the WebSocket provider, parses arguments, opens the database,
    and dispatches to the requested action(s). Flags may be combined, but
    --index runs forever, so any report flags listed after it are only
    reached if indexing is interrupted. Closes the DB connection on exit.
    """
    await web3.provider.connect()
    parser = argparse.ArgumentParser(description="Arbitrum transaction indexer")
    parser.add_argument('--index', action='store_true', help="Index transactions from Arbitrum chain")
    parser.add_argument('--start-block', type=int, default=None, help="Start block number to index from")
    parser.add_argument('--top-senders', type=int, default=None, help="Retrieve top N active sending addresses")
    parser.add_argument('--top-receivers', type=int, default=None, help="Retrieve top N active receiving addresses")
    parser.add_argument('--top-both', type=int, default=None,
                        help="Retrieve top N active addresses by total transactions (sent + received)")
    parser.add_argument('--add-contract', type=str, help="Add a contract address to monitor")
    args = parser.parse_args()
    conn = await initialize_db()
    # Add a contract address to the monitoring table
    if args.add_contract:
        await add_contract_address(conn, args.add_contract)
        print(f"Added contract address: {args.add_contract}")
    # Index transactions (runs forever)
    if args.index:
        # NOTE: uvloop.install() was removed from here — swapping the
        # event-loop policy from inside an already-running loop has no effect.
        # Install uvloop before asyncio.run() (in the __main__ guard) instead.
        # BUG FIX: compare against None so `--start-block 0` is honored rather
        # than silently falling back to the current chain head.
        start_block = args.start_block if args.start_block is not None else await web3.eth.block_number
        await index_transactions_forever(conn, start_block)
    # Retrieve most active sending addresses
    if args.top_senders:
        top_senders = await fetch_most_active_senders(conn, args.top_senders)
        print(f"Top {args.top_senders} active sending addresses:")
        for address, count in top_senders:
            # FIX: added the missing comma for consistency with the receiver
            # and combined report lines below.
            print(f"Address: {address}, Sent Transactions: {count}")
    # Retrieve most active receiving addresses
    if args.top_receivers:
        top_receivers = await fetch_most_active_receivers(conn, args.top_receivers)
        print(f"Top {args.top_receivers} active receiving addresses:")
        for address, count in top_receivers:
            print(f"Address: {address}, Received Transactions: {count}")
    # Retrieve most active addresses by total transactions (sent + received)
    if args.top_both:
        top_both = await fetch_most_active_both(conn, args.top_both)
        print(f"Top {args.top_both} active addresses by total transactions (sent + received):")
        for address, total in top_both:
            print(f"Address: {address}, Total Transactions: {total}")
    await conn.close()
if __name__ == '__main__':
    # BUG FIX: install uvloop's event-loop policy BEFORE asyncio.run() starts
    # the loop — calling uvloop.install() from inside a running coroutine (as
    # the original code did under --index) has no effect on the already-running
    # loop.
    uvloop.install()
    # Start the asyncio event loop
    asyncio.run(main())
I haven’t slept in 3 days, but once I wake up, I’ll have a full database of the most active addresses on Arbitrum. The plan? Parse the data, process it, and find the traders worth copy-trading based on historical averages of key metrics like max drawdown, max profit, and average profit. I also need to calculate risk intelligently—haven’t figured out the exact formula yet, but it’s coming.
Blah, blah. Now, the interesting part: I can monitor Arbitrum without any lag by batching the calls to get_block. Unfortunately, I can’t do the same with multicall... or can I? Honestly, I’m too fried to figure that out right now. I’m leaning toward no, but who knows. When I’m less sleep-deprived, it’s definitely worth a deeper look.
For risk, I’ll probably use a mix of these formulas. First, there’s the Sharpe Ratio for adjusting performance against volatility:
$$\text{Sharpe Ratio} = \frac{R - R_f}{\sigma}$$
Where:
$R$ is the trader’s average return
$R_f$ is the risk-free rate (which is basically zero in DeFi)
$\sigma$ is the standard deviation of their returns
Then there’s the good old Max Drawdown—you know, how bad their portfolio tanks:
$$\text{Max Drawdown} = \frac{\text{Peak Value} - \text{Trough Value}}{\text{Peak Value}}$$
And if I want to get fancy with position sizing, I might throw in the Kelly Criterion for good measure:
$$f^* = \frac{p(b+1) - 1}{b}$$
Where:
$p$ is the probability of winning
$b$ is the average win/loss ratio (aka how much profit they make vs. how much they lose on average)
Anyway, that’s the math. But back to the fun part: I’ll soon be dropping some next-level alpha for my paid subscribers. We’re talking anonymous transacting in 2024—how to do it and profit while flying under the radar of blockchain analytics firms. I’ve been refining these strategies for years, and when that Chainalysis video on "tracking XMR" leaked, it confirmed everything I suspected. My protocol for making Monero untraceable works exactly as I thought. I will show you how to deterministically break the link between your XMR and you.
I’ll also be diving into Railgun, how it’s pushing the boundaries of privacy in DeFi, and ways you can make money helping keep this whole privacy thing alive, information you will not find anywhere else. Stay tuned.