-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathipfs_example.py
More file actions
33 lines (30 loc) · 1 KB
/
ipfs_example.py
File metadata and controls
33 lines (30 loc) · 1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
from nodepulse import NodePulse
import requests
def check_ipfs_gateway(node):
# Test CID that should always be available
test_cid = "QmWnfdZkwWJxabDUbimrtaweYF8u9TaESDBM8xvRxxbQxv"
try:
response = requests.get(
f"{node}/ipfs/{test_cid}",
headers={'Range': 'bytes=0-0'}, # Only get first byte
timeout=5
)
if response.status_code in [200, 206]: # 206 for partial content
print(f"✅ IPFS gateway {node} is healthy")
return True
else:
print(f"❌ IPFS gateway {node} returned status {response.status_code}")
return False
except Exception as e:
print(f"❌ Error checking IPFS gateway {node}: {str(e)}")
return False
# Initialize NodePulse for IPFS
node_pulse = NodePulse(
node_type='ipfs',
network='mainnet',
node_count=3,
update_interval=10000 # Set to 10 seconds for testing
)
# Get a node and test it
node = node_pulse.get_node()
check_ipfs_gateway(node)