formatting cleanup

This commit is contained in:
Jimmy Song 2023-10-17 19:59:21 -05:00
parent 47ae475b33
commit 994358c5ea
17 changed files with 252 additions and 862 deletions

View File

@ -1,4 +1,4 @@
'''
"""
#markdown
# Session Objectives
* Learn Schnorr Signatures
@ -1045,4 +1045,4 @@ True
01000000000101a2b85a1372be6bc9a3d53110a4f142819d653fa6d07fbd4367138145030914200000000000ffffffff0184e4000000000000160014f5a74a3131dedb57a092ae86aad3ee3f9b8d721403403b1681a67f40e6767b2db64744ad3f005d3971645135d58a3e1826d5c960bc281ce187bc9270c51ed7833fcf5e8415501862d51b0ebd051917d9878104778f292220cd04c1bf88ca891af152fc57c36523ab59efb16b7ec07caca0cfc4a1f2051d9eac61c0cd04c1bf88ca891af152fc57c36523ab59efb16b7ec07caca0cfc4a1f2051d9e76f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dffaf5548715217f7a892c7c5ff787a97b6e2f123287a1a354fe3ccda09c39d5d7300000000
#endexercise
'''
"""

View File

@ -1,248 +0,0 @@
from io import BytesIO
from unittest import TestCase
from helper import (
hash256,
int_to_little_endian,
little_endian_to_int,
merkle_root,
read_varint,
)
from tx import Tx
GENESIS_BLOCK_HASH = bytes.fromhex('000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f')
TESTNET_GENESIS_BLOCK_HASH = bytes.fromhex('000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943')
class Block:
command = b'block'
def __init__(self, version, prev_block, merkle_root, timestamp, bits, nonce, tx_hashes=None):
self.version = version
self.prev_block = prev_block
self.merkle_root = merkle_root
self.timestamp = timestamp
self.bits = bits
self.nonce = nonce
self.tx_hashes = tx_hashes
self.merkle_tree = None
@classmethod
def parse_header(cls, s):
'''Takes a byte stream and parses a block. Returns a Block object'''
# s.read(n) will read n bytes from the stream
# version - 4 bytes, little endian, interpret as int
version = little_endian_to_int(s.read(4))
# prev_block - 32 bytes, little endian (use [::-1] to reverse)
prev_block = s.read(32)[::-1]
# merkle_root - 32 bytes, little endian (use [::-1] to reverse)
merkle_root = s.read(32)[::-1]
# timestamp - 4 bytes, little endian, interpret as int
timestamp = little_endian_to_int(s.read(4))
# bits - 4 bytes
bits = s.read(4)
# nonce - 4 bytes
nonce = s.read(4)
# initialize class
return cls(version, prev_block, merkle_root, timestamp, bits, nonce)
@classmethod
def parse(cls, s):
b = cls.parse_header(s)
num_txs = read_varint(s)
tx_hashes = []
for _ in range(num_txs):
t = Tx.parse(s)
tx_hashes.append(t.hash())
b.tx_hashes = tx_hashes
return b
def serialize(self):
'''Returns the 80 byte block header'''
# version - 4 bytes, little endian
result = int_to_little_endian(self.version, 4)
# prev_block - 32 bytes, little endian
result += self.prev_block[::-1]
# merkle_root - 32 bytes, little endian
result += self.merkle_root[::-1]
# timestamp - 4 bytes, little endian
result += int_to_little_endian(self.timestamp, 4)
# bits - 4 bytes
result += self.bits
# nonce - 4 bytes
result += self.nonce
return result
def hash(self):
'''Returns the hash256 interpreted little endian of the block'''
# serialize
s = self.serialize()
# hash256
h256 = hash256(s)
# reverse
return h256[::-1]
def id(self):
'''Human-readable hexadecimal of the block hash'''
return self.hash().hex()
def bip9(self):
'''Returns whether this block is signaling readiness for BIP9'''
# BIP9 is signalled if the top 3 bits are 001
# remember version is 32 bytes so right shift 29 (>> 29) and see if
# that is 001
return self.version >> 29 == 0b001
def bip91(self):
'''Returns whether this block is signaling readiness for BIP91'''
# BIP91 is signalled if the 5th bit from the right is 1
# shift 4 bits to the right and see if the last bit is 1
return self.version >> 4 & 1 == 1
def bip141(self):
'''Returns whether this block is signaling readiness for BIP141'''
# BIP91 is signalled if the 2nd bit from the right is 1
# shift 1 bit to the right and see if the last bit is 1
return self.version >> 1 & 1 == 1
def target(self):
'''Returns the proof-of-work target based on the bits'''
# last byte is exponent
exponent = self.bits[-1]
# the first three bytes are the coefficient in little endian
coefficient = little_endian_to_int(self.bits[:-1])
# the formula is:
# coefficient * 256**(exponent-3)
return coefficient * 256**(exponent - 3)
def difficulty(self):
'''Returns the block difficulty based on the bits'''
# note difficulty is (target of lowest difficulty) / (self's target)
# lowest difficulty has bits that equal 0xffff001d
lowest = 0xffff * 256**(0x1d - 3)
return lowest / self.target()
def check_pow(self):
'''Returns whether this block satisfies proof of work'''
# get the hash256 of the serialization of this block
h256 = hash256(self.serialize())
# interpret this hash as a little-endian number
proof = little_endian_to_int(h256)
# return whether this integer is less than the target
return proof < self.target()
def validate_merkle_root(self):
'''Gets the merkle root of the tx_hashes and checks that it's
the same as the merkle root of this block.
'''
# reverse all the transaction hashes (self.tx_hashes)
hashes = [h[::-1] for h in self.tx_hashes]
# get the Merkle Root
root = merkle_root(hashes)
# reverse the Merkle Root
# return whether self.merkle root is the same as
# the reverse of the calculated merkle root
return root[::-1] == self.merkle_root
class BlockTest(TestCase):
def test_parse(self):
block_raw = bytes.fromhex('0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000')
stream = BytesIO(block_raw)
block = Block.parse(stream)
self.assertEqual(block.merkle_root, block.tx_hashes[0])
def test_parse_header(self):
block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertEqual(block.version, 0x20000002)
want = bytes.fromhex('000000000000000000fd0c220a0a8c3bc5a7b487e8c8de0dfa2373b12894c38e')
self.assertEqual(block.prev_block, want)
want = bytes.fromhex('be258bfd38db61f957315c3f9e9c5e15216857398d50402d5089a8e0fc50075b')
self.assertEqual(block.merkle_root, want)
self.assertEqual(block.timestamp, 0x59a7771e)
self.assertEqual(block.bits, bytes.fromhex('e93c0118'))
self.assertEqual(block.nonce, bytes.fromhex('a4ffd71d'))
def test_serialize(self):
block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertEqual(block.serialize(), block_raw)
def test_hash(self):
block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertEqual(block.hash(), bytes.fromhex('0000000000000000007e9e4c586439b0cdbe13b1370bdd9435d76a644d047523'))
def test_bip9(self):
block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertTrue(block.bip9())
block_raw = bytes.fromhex('0400000039fa821848781f027a2e6dfabbf6bda920d9ae61b63400030000000000000000ecae536a304042e3154be0e3e9a8220e5568c3433a9ab49ac4cbb74f8df8e8b0cc2acf569fb9061806652c27')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertFalse(block.bip9())
def test_bip91(self):
block_raw = bytes.fromhex('1200002028856ec5bca29cf76980d368b0a163a0bb81fc192951270100000000000000003288f32a2831833c31a25401c52093eb545d28157e200a64b21b3ae8f21c507401877b5935470118144dbfd1')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertTrue(block.bip91())
block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertFalse(block.bip91())
def test_bip141(self):
block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertTrue(block.bip141())
block_raw = bytes.fromhex('0000002066f09203c1cf5ef1531f24ed21b1915ae9abeb691f0d2e0100000000000000003de0976428ce56125351bae62c5b8b8c79d8297c702ea05d60feabb4ed188b59c36fa759e93c0118b74b2618')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertFalse(block.bip141())
def test_target(self):
block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertEqual(block.target(), 0x13ce9000000000000000000000000000000000000000000)
self.assertEqual(int(block.difficulty()), 888171856257)
def test_check_pow(self):
block_raw = bytes.fromhex('04000000fbedbbf0cfdaf278c094f187f2eb987c86a199da22bbb20400000000000000007b7697b29129648fa08b4bcd13c9d5e60abb973a1efac9c8d573c71c807c56c3d6213557faa80518c3737ec1')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertTrue(block.check_pow())
block_raw = bytes.fromhex('04000000fbedbbf0cfdaf278c094f187f2eb987c86a199da22bbb20400000000000000007b7697b29129648fa08b4bcd13c9d5e60abb973a1efac9c8d573c71c807c56c3d6213557faa80518c3737ec0')
stream = BytesIO(block_raw)
block = Block.parse_header(stream)
self.assertFalse(block.check_pow())
def test_validate_merkle_root(self):
hashes_hex = [
'f54cb69e5dc1bd38ee6901e4ec2007a5030e14bdd60afb4d2f3428c88eea17c1',
'c57c2d678da0a7ee8cfa058f1cf49bfcb00ae21eda966640e312b464414731c1',
'b027077c94668a84a5d0e72ac0020bae3838cb7f9ee3fa4e81d1eecf6eda91f3',
'8131a1b8ec3a815b4800b43dff6c6963c75193c4190ec946b93245a9928a233d',
'ae7d63ffcb3ae2bc0681eca0df10dda3ca36dedb9dbf49e33c5fbe33262f0910',
'61a14b1bbdcdda8a22e61036839e8b110913832efd4b086948a6a64fd5b3377d',
'fc7051c8b536ac87344c5497595d5d2ffdaba471c73fae15fe9228547ea71881',
'77386a46e26f69b3cd435aa4faac932027f58d0b7252e62fb6c9c2489887f6df',
'59cbc055ccd26a2c4c4df2770382c7fea135c56d9e75d3f758ac465f74c025b8',
'7c2bf5687f19785a61be9f46e031ba041c7f93e2b7e9212799d84ba052395195',
'08598eebd94c18b0d59ac921e9ba99e2b8ab7d9fccde7d44f2bd4d5e2e726d2e',
'f0bb99ef46b029dd6f714e4b12a7d796258c48fee57324ebdc0bbc4700753ab1',
]
hashes = [bytes.fromhex(x) for x in hashes_hex]
stream = BytesIO(bytes.fromhex('00000020fcb19f7895db08cadc9573e7915e3919fb76d59868a51d995201000000000000acbcab8bcc1af95d8d563b77d24c3d19b18f1486383d75a5085c4e86c86beed691cfa85916ca061a00000000'))
block = Block.parse_header(stream)
block.tx_hashes = hashes
self.assertTrue(block.validate_merkle_root())

View File

@ -40,7 +40,7 @@ class FieldElement:
# this should be the inverse of the == operator
return not (self == other)
def __repr__(self):
def __reprv__(self):
return f"FieldElement_{self.prime}({self.num})"
def __add__(self, other):
@ -220,7 +220,7 @@ class S256Field(FieldElement):
class S256Point(Point):
def __init__(self, x, y, a=None, b=None):
a, b = S256Field(A), S256Field(B)
if type(x) == int:
if isinstance(x, int):
super().__init__(x=S256Field(x), y=S256Field(y), a=a, b=b)
else:
super().__init__(x=x, y=y, a=a, b=b)
@ -247,7 +247,7 @@ class S256Point(Point):
def __add__(self, other):
"""If other is an int, multiplies scalar by generator, adds result to current point"""
if type(other) == int:
if isinstance(other, int):
return super().__add__(other * G)
else:
return super().__add__(other)
@ -276,13 +276,13 @@ class S256Point(Point):
# otherwise, convert the x coordinate to Big Endian 32 bytes
return int_to_big_endian(self.x.num, 32)
def tweak(self, merkle_root=b''):
def tweak(self, merkle_root=b""):
"""returns the tweak for use in p2tr if there's no script path"""
# take the hash_taptweak of the xonly
tweak = hash_taptweak(self.xonly() + merkle_root)
return tweak
def tweaked_key(self, merkle_root=b''):
def tweaked_key(self, merkle_root=b""):
"""Creates the tweaked external key for a particular tweak."""
# Get the tweak from the tweak method
tweak = self.tweak(merkle_root)
@ -319,10 +319,11 @@ class S256Point(Point):
"""Returns the RedeemScript for a p2sh-p2wpkh redemption"""
return self.p2wpkh_script().redeem_script()
def p2tr_script(self, merkle_root=b''):
def p2tr_script(self, merkle_root=b""):
"""Returns the p2tr ScriptPubKey object"""
# avoid circular dependency
from script import P2TRScriptPubKey
# get the external pubkey
external_pubkey = self.tweaked_key(merkle_root)
# return the P2TRScriptPubKey object
@ -340,7 +341,7 @@ class S256Point(Point):
"""Returns the p2sh-p2wpkh base58 address string"""
return self.p2wpkh_script().p2sh_address(network)
def p2tr_address(self, merkle_root=b'', network="mainnet"):
def p2tr_address(self, merkle_root=b"", network="mainnet"):
"""Returns the p2tr bech32m address string"""
return self.p2tr_script(merkle_root).address(network)
@ -463,34 +464,52 @@ class TapRootTest(TestCase):
hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"
bytes_x = bytes.fromhex(hex_x)
point = S256Point.parse(bytes_x)
self.assertEqual(big_endian_to_int(point.tweak()), 67856885919469038205338506436839711332207972226461300386890540598589929564995)
self.assertEqual(
big_endian_to_int(point.tweak()),
67856885919469038205338506436839711332207972226461300386890540598589929564995,
)
def test_tweaked_key(self):
hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"
bytes_x = bytes.fromhex(hex_x)
point = S256Point.parse(bytes_x)
self.assertEqual(point.tweaked_key().xonly().hex(), "5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf")
self.assertEqual(
point.tweaked_key().xonly().hex(),
"5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf",
)
def test_p2tr_script(self):
hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"
bytes_x = bytes.fromhex(hex_x)
point = S256Point.parse(bytes_x)
self.assertEqual(point.p2tr_script().__repr__(), "OP_1 5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf")
self.assertEqual(
point.p2tr_script().__repr__(),
"OP_1 5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf",
)
class SchnorrTest(TestCase):
def test_verify(self):
msg = sha256(b"I attest to understanding Schnorr Signatures")
sig_raw = bytes.fromhex("f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a")
sig_raw = bytes.fromhex(
"f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a"
)
sig = SchnorrSignature.parse(sig_raw)
point = S256Point.parse(bytes.fromhex("f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"))
point = S256Point.parse(
bytes.fromhex(
"f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"
)
)
self.assertTrue(point.verify_schnorr(msg, sig))
def test_sign(self):
msg = sha256(b"I attest to understanding Schnorr Signatures")
priv = PrivateKey(12345)
sig = priv.sign_schnorr(msg)
self.assertEqual(sig.serialize().hex(), "f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a")
self.assertEqual(
sig.serialize().hex(),
"f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a",
)
class Signature:
@ -691,7 +710,7 @@ class PrivateKey:
# encode_base58_checksum the whole thing
return encode_base58_checksum(prefix + secret_bytes + suffix)
def tweaked_key(self, merkle_root=b''):
def tweaked_key(self, merkle_root=b""):
# get the tweak from the point's tweak method
tweak = self.point.tweak(merkle_root)
# t is the tweak interpreted as big endian
@ -730,7 +749,6 @@ class PrivateKey:
class PrivateKeyTest(TestCase):
def test_tweaked_key(self):
secret = randint(1, N)
priv = PrivateKey(secret)

View File

@ -1,8 +1,6 @@
import hashlib
import hmac
import re
from base64 import b64decode, b64encode
from unittest import TestSuite, TextTestRunner
try:

View File

@ -1,486 +0,0 @@
import socket
import time
from io import BytesIO
from random import randint
from time import sleep
from unittest import TestCase
from block import Block
from helper import (
hash256,
decode_base58,
encode_varint,
int_to_little_endian,
little_endian_to_int,
read_varint,
)
from tx import Tx
TX_DATA_TYPE = 1
BLOCK_DATA_TYPE = 2
FILTERED_BLOCK_DATA_TYPE = 3
COMPACT_BLOCK_DATA_TYPE = 4
NETWORK_MAGIC = b'\xf9\xbe\xb4\xd9'
TESTNET_NETWORK_MAGIC = b'\x0b\x11\x09\x07'
class NetworkEnvelope:
def __init__(self, command, payload, testnet=False):
self.command = command
self.payload = payload
if testnet:
self.magic = TESTNET_NETWORK_MAGIC
else:
self.magic = NETWORK_MAGIC
def __repr__(self):
return '{}: {}'.format(
self.command.decode('ascii'),
self.payload.hex(),
)
@classmethod
def parse(cls, s, testnet=False):
'''Takes a stream and creates a NetworkEnvelope'''
# check the network magic
magic = s.read(4)
if magic == b'':
raise RuntimeError('Connection reset!')
if testnet:
expected_magic = TESTNET_NETWORK_MAGIC
else:
expected_magic = NETWORK_MAGIC
if magic != expected_magic:
raise RuntimeError('magic is not right {} vs {}'.format(magic.hex(), expected_magic.hex()))
# command 12 bytes, strip the trailing 0's using .strip(b'\x00')
command = s.read(12).strip(b'\x00')
# payload length 4 bytes, little endian
payload_length = little_endian_to_int(s.read(4))
# checksum 4 bytes, first four of hash256 of payload
checksum = s.read(4)
# payload is of length payload_length
payload = s.read(payload_length)
# verify checksum
calculated_checksum = hash256(payload)[:4]
if calculated_checksum != checksum:
raise RuntimeError('checksum does not match')
return cls(command, payload, testnet=testnet)
def serialize(self):
'''Returns the byte serialization of the entire network message'''
# add the network magic using self.magic
result = self.magic
# command 12 bytes, fill leftover with b'\x00' * (12 - len(self.command))
result += self.command + b'\x00' * (12 - len(self.command))
# payload length 4 bytes, little endian
result += int_to_little_endian(len(self.payload), 4)
# checksum 4 bytes, first four of hash256 of payload
result += hash256(self.payload)[:4]
# payload
result += self.payload
return result
def stream(self):
'''Returns a stream for parsing the payload'''
return BytesIO(self.payload)
class NetworkEnvelopeTest(TestCase):
def test_parse(self):
msg = bytes.fromhex('f9beb4d976657261636b000000000000000000005df6e0e2')
stream = BytesIO(msg)
envelope = NetworkEnvelope.parse(stream)
self.assertEqual(envelope.command, b'verack')
self.assertEqual(envelope.payload, b'')
msg = bytes.fromhex('f9beb4d976657273696f6e0000000000650000005f1a69d2721101000100000000000000bc8f5e5400000000010000000000000000000000000000000000ffffc61b6409208d010000000000000000000000000000000000ffffcb0071c0208d128035cbc97953f80f2f5361746f7368693a302e392e332fcf05050001')
stream = BytesIO(msg)
envelope = NetworkEnvelope.parse(stream)
self.assertEqual(envelope.command, b'version')
self.assertEqual(envelope.payload, msg[24:])
def test_serialize(self):
msg = bytes.fromhex('f9beb4d976657261636b000000000000000000005df6e0e2')
stream = BytesIO(msg)
envelope = NetworkEnvelope.parse(stream)
self.assertEqual(envelope.serialize(), msg)
msg = bytes.fromhex('f9beb4d976657273696f6e0000000000650000005f1a69d2721101000100000000000000bc8f5e5400000000010000000000000000000000000000000000ffffc61b6409208d010000000000000000000000000000000000ffffcb0071c0208d128035cbc97953f80f2f5361746f7368693a302e392e332fcf05050001')
stream = BytesIO(msg)
envelope = NetworkEnvelope.parse(stream)
self.assertEqual(envelope.serialize(), msg)
class VersionMessage:
command = b'version'
def __init__(self, version=70015, services=0, timestamp=None,
receiver_services=0,
receiver_ip=b'\x00\x00\x00\x00', receiver_port=8333,
sender_services=0,
sender_ip=b'\x00\x00\x00\x00', sender_port=8333,
nonce=None, user_agent=b'/programmingblockchain:0.1/',
latest_block=0, relay=True):
self.version = version
self.services = services
if timestamp is None:
self.timestamp = int(time.time())
else:
self.timestamp = timestamp
self.receiver_services = receiver_services
self.receiver_ip = receiver_ip
self.receiver_port = receiver_port
self.sender_services = sender_services
self.sender_ip = sender_ip
self.sender_port = sender_port
if nonce is None:
self.nonce = int_to_little_endian(randint(0, 2**64), 8)
else:
self.nonce = nonce
self.user_agent = user_agent
self.latest_block = latest_block
self.relay = relay
def serialize(self):
'''Serialize this message to send over the network'''
# version is 4 bytes little endian
result = int_to_little_endian(self.version, 4)
# services is 8 bytes little endian
result += int_to_little_endian(self.services, 8)
# timestamp is 8 bytes little endian
result += int_to_little_endian(self.timestamp, 8)
# receiver services is 8 bytes little endian
result += int_to_little_endian(self.receiver_services, 8)
# IPV4 is 10 00 bytes and 2 ff bytes then receiver ip
result += b'\x00' * 10 + b'\xff\xff' + self.receiver_ip
# receiver port is 2 bytes, little endian
result += int_to_little_endian(self.receiver_port, 2)
# sender services is 8 bytes little endian
result += int_to_little_endian(self.sender_services, 8)
# IPV4 is 10 00 bytes and 2 ff bytes then sender ip
result += b'\x00' * 10 + b'\xff\xff' + self.sender_ip
# sender port is 2 bytes, little endian
result += int_to_little_endian(self.sender_port, 2)
# nonce
result += self.nonce
# useragent is a variable string, so varint first
result += encode_varint(len(self.user_agent))
result += self.user_agent
# latest block is 4 bytes little endian
result += int_to_little_endian(self.latest_block, 4)
# relay is 00 if false, 01 if true
if self.relay:
result += b'\x01'
else:
result += b'\x00'
return result
class VersionMessageTest(TestCase):
def test_serialize(self):
v = VersionMessage(timestamp=0, nonce=b'\x00' * 8)
self.assertEqual(v.serialize().hex(), '7f11010000000000000000000000000000000000000000000000000000000000000000000000ffff000000008d20000000000000000000000000000000000000ffff000000008d2000000000000000001b2f70726f6772616d6d696e67626c6f636b636861696e3a302e312f0000000001')
class VerAckMessage:
command = b'verack'
def __init__(self):
pass
@classmethod
def parse(cls, s):
return cls()
def serialize(self):
return b''
class PingMessage:
command = b'ping'
def __init__(self, nonce):
self.nonce = nonce
@classmethod
def parse(cls, s):
nonce = s.read(8)
return cls(nonce)
def serialize(self):
return self.nonce
class PongMessage:
command = b'pong'
def __init__(self, nonce):
self.nonce = nonce
def parse(cls, s):
nonce = s.read(8)
return cls(nonce)
def serialize(self):
return self.nonce
class GetHeadersMessage:
command = b'getheaders'
def __init__(self, version=70015, num_hashes=1, start_block=None, end_block=None):
self.version = version
self.num_hashes = num_hashes
if start_block is None:
raise RuntimeError('a start block is required')
self.start_block = start_block
if end_block is None:
self.end_block = b'\x00' * 32
else:
self.end_block = end_block
def serialize(self):
'''Serialize this message to send over the network'''
# protocol version is 4 bytes little-endian
result = int_to_little_endian(self.version, 4)
# number of hashes is a varint
result += encode_varint(self.num_hashes)
# start block is in little-endian
result += self.start_block[::-1]
# end block is also in little-endian
result += self.end_block[::-1]
return result
class GetHeadersMessageTest(TestCase):
def test_serialize(self):
block_hex = '0000000000000000001237f46acddf58578a37e213d2a6edc4884a2fcad05ba3'
gh = GetHeadersMessage(start_block=bytes.fromhex(block_hex))
self.assertEqual(gh.serialize().hex(), '7f11010001a35bd0ca2f4a88c4eda6d213e2378a5758dfcd6af437120000000000000000000000000000000000000000000000000000000000000000000000000000000000')
class HeadersMessage:
command = b'headers'
def __init__(self, headers):
self.headers = headers
@classmethod
def parse(cls, s):
# number of headers is in a varint
num_headers = read_varint(s)
# initialize the headers array
headers = []
# loop through number of headers times
for _ in range(num_headers):
# add a header to the headers array by using Block.parse_header(s)
headers.append(Block.parse_header(s))
# read the next varint (num_txs)
num_txs = read_varint(s)
# num_txs should be 0 or raise a RuntimeError
if num_txs != 0:
raise RuntimeError('number of txs not 0')
# return a class instance
return cls(headers)
def is_valid(self):
'''Return whether the headers satisfy proof-of-work and are sequential'''
last_block = None
for h in self.headers:
if not h.check_pow():
return False
if last_block and h.prev_block != last_block:
return False
last_block = h.hash()
return True
class HeadersMessageTest(TestCase):
def test_parse(self):
hex_msg = '0200000020df3b053dc46f162a9b00c7f0d5124e2676d47bbe7c5d0793a500000000000000ef445fef2ed495c275892206ca533e7411907971013ab83e3b47bd0d692d14d4dc7c835b67d8001ac157e670000000002030eb2540c41025690160a1014c577061596e32e426b712c7ca00000000000000768b89f07044e6130ead292a3f51951adbd2202df447d98789339937fd006bd44880835b67d8001ade09204600'
stream = BytesIO(bytes.fromhex(hex_msg))
headers = HeadersMessage.parse(stream)
self.assertEqual(len(headers.headers), 2)
for b in headers.headers:
self.assertEqual(b.__class__, Block)
class GetDataMessage:
command = b'getdata'
def __init__(self):
self.data = []
def add_data(self, data_type, identifier):
self.data.append((data_type, identifier))
def serialize(self):
# start with the number of items as a varint
result = encode_varint(len(self.data))
for data_type, identifier in self.data:
# data type is 4 bytes little endian
result += int_to_little_endian(data_type, 4)
# identifier needs to be in little endian
result += identifier[::-1]
return result
class GetDataMessageTest(TestCase):
def test_serialize(self):
hex_msg = '020300000030eb2540c41025690160a1014c577061596e32e426b712c7ca00000000000000030000001049847939585b0652fba793661c361223446b6fc41089b8be00000000000000'
get_data = GetDataMessage()
block1 = bytes.fromhex('00000000000000cac712b726e4326e596170574c01a16001692510c44025eb30')
get_data.add_data(FILTERED_BLOCK_DATA_TYPE, block1)
block2 = bytes.fromhex('00000000000000beb88910c46f6b442312361c6693a7fb52065b583979844910')
get_data.add_data(FILTERED_BLOCK_DATA_TYPE, block2)
self.assertEqual(get_data.serialize().hex(), hex_msg)
class GenericMessage:
def __init__(self, command, payload):
self.command = command
self.payload = payload
def serialize(self):
return self.payload
class SimpleNode:
def __init__(self, host, port=None, testnet=False, logging=False):
if port is None:
if testnet:
port = 18333
else:
port = 8333
self.testnet = testnet
self.logging = logging
# connect to socket
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((host, port))
# create a stream that we can use with the rest of the library
self.stream = self.socket.makefile('rb', None)
def handshake(self):
'''Do a handshake with the other node. Handshake is sending a version message and getting a verack back.'''
# create a version message
version = VersionMessage()
# send the command
self.send(version)
# wait for a verack message
self.wait_for(VerAckMessage)
def send(self, message):
'''Send a message to the connected node'''
# create a network envelope
envelope = NetworkEnvelope(
message.command, message.serialize(), testnet=self.testnet)
if self.logging:
print('sending: {}'.format(envelope))
# send the serialized envelope over the socket using sendall
self.socket.sendall(envelope.serialize())
def read(self):
'''Read a message from the socket'''
envelope = NetworkEnvelope.parse(self.stream, testnet=self.testnet)
if self.logging:
print('receiving: {}'.format(envelope))
return envelope
def wait_for(self, *message_classes):
'''Wait for one of the messages in the list'''
# initialize the command we have, which should be None
command = None
command_to_class = {m.command: m for m in message_classes}
# loop until the command is in the commands we want
while command not in command_to_class.keys():
# get the next network message
envelope = self.read()
# set the command to be evaluated
command = envelope.command
# we know how to respond to version and ping, handle that here
if command == VersionMessage.command:
# send verack
self.send(VerAckMessage())
elif command == PingMessage.command:
# send pong
self.send(PongMessage(envelope.payload))
# return the envelope parsed as a member of the right message class
return command_to_class[command].parse(envelope.stream())
def get_filtered_txs(self, block_hashes):
'''Returns transactions that match the bloom filter'''
from merkleblock import MerkleBlock
# create a getdata message
getdata = GetDataMessage()
# for each block request the filtered block
for block_hash in block_hashes:
# add_data (FILTERED_BLOCK_DATA_TYPE, block_hash) to request the block
getdata.add_data(FILTERED_BLOCK_DATA_TYPE, block_hash)
# send the getdata message
self.send(getdata)
# initialize the results array we'll send back
results = []
# for each block hash
for block_hash in block_hashes:
# wait for the merkleblock command
mb = self.wait_for(MerkleBlock)
# check that the merkle block's hash is the same as the block hash
if mb.hash() != block_hash:
raise RuntimeError('Wrong block sent')
# check that the merkle block is valid
if not mb.is_valid():
raise RuntimeError('Merkle Proof is invalid')
# loop through the proved transactions from the Merkle block
for tx_hash in mb.proved_txs():
# wait for the tx command
tx_obj = self.wait_for(Tx)
# check that the hash matches
if tx_obj.hash() != tx_hash:
raise RuntimeError('Wrong tx sent {} vs {}'.format(tx_hash.hex(), tx_obj.id()))
# add to the results
results.append(tx_obj)
# return the results
return results
def is_tx_accepted(self, tx_obj):
'''Returns whether a transaction has been accepted on the network'''
# sleep for a second to let everything propagate
sleep(1)
# create a GetDataMessage
get_data = GetDataMessage()
# ask for the tx
get_data.add_data(TX_DATA_TYPE, tx_obj.hash())
# send the GetDataMessage
self.send(get_data)
# now wait for a response
got_tx = self.wait_for(Tx)
if got_tx.id() == tx_obj.id():
return True
class SimpleNodeTest(TestCase):
def test_handshake(self):
node = SimpleNode('tbtc.programmingblockchain.com', testnet=True)
node.handshake()
def test_get_filtered_txs(self):
from bloomfilter import BloomFilter
bf = BloomFilter(30, 5, 90210)
h160 = decode_base58('mseRGXB89UTFVkWJhTRTzzZ9Ujj4ZPbGK5')
bf.add(h160)
node = SimpleNode('tbtc.programmingblockchain.com', testnet=True)
node.handshake()
node.send(bf.filterload())
block_hash = bytes.fromhex('00000000000377db7fde98411876c53e318a395af7304de298fd47b7c549d125')
txs = node.get_filtered_txs([block_hash])
self.assertEqual(txs[0].id(), '0c024b9d3aa2ae8faae96603b8d40c88df2fc6bf50b3f446295206f70f3cf6ad')
self.assertEqual(txs[1].id(), '0886537e27969a12478e0d33707bf6b9fe4fdaec8d5d471b5304453b04135e7e')
self.assertEqual(txs[2].id(), '23d4effc88b80fb7dbcc2e6a0b0af9821c6fe3bb4c8dc3b61bcab7c45f0f6888')

View File

@ -1260,10 +1260,19 @@ OP_CODE_NAMES = {
class TapScriptTest(TestCase):
def test_opchecksigadd(self):
from tx import Tx
hex_tx = "010000000001022373cf02ce7df6500ae46a4a0fbbb1b636d2debed8f2df91e2415627397a34090000000000fdffffff88c23d928893cd3509845516cf8411b7cab2738c054cc5ce7e4bde9586997c770000000000fdffffff0200000000000000002b6a29676d20746170726f6f7420f09fa5952068747470733a2f2f626974636f696e6465766b69742e6f72676e9e1100000000001976a91405070d0290da457409a37db2e294c1ffbc52738088ac04410adf90fd381d4a13c3e73740b337b230701189ed94abcb4030781635f035e6d3b50b8506470a68292a2bc74745b7a5732a28254b5f766f09e495929ec308090b01004620c13e6d193f5d04506723bd67abcc5d31b610395c445ac6744cb0a1846b3aabaeac20b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333cba519c21c0000000000000000000000000000000000000000000000000000000000000000104414636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301004620c13e6d193f5d04506723bd67abcc5d31b610395c445ac6744cb0a1846b3aabaeac20b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333cba519c21c0000000000000000000000000000000000000000000000000000000000000000100000000"
stream = BytesIO(bytes.fromhex(hex_tx))
tx_obj = Tx.parse(stream)
self.assertTrue(tx_obj.verify())
stack = [bytes.fromhex("4636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301"),b"",bytes.fromhex("b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333c")]
stack = [
bytes.fromhex(
"4636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301"
),
b"",
bytes.fromhex(
"b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333c"
),
]
op_checksigadd_schnorr(stack, tx_obj, 1)
self.assertEqual(stack, [b'\x01'])
self.assertEqual(stack, [b"\x01"])

View File

@ -1,7 +1,6 @@
from io import BytesIO
from bech32 import decode_bech32, encode_bech32_checksum
from ecc import S256Point
from helper import (
decode_base58,
encode_base58_checksum,
@ -37,7 +36,7 @@ class Script:
def __repr__(self):
result = ""
for command in self.commands:
if type(command) == int:
if isinstance(command, int):
if OP_CODE_NAMES.get(command):
name = OP_CODE_NAMES.get(command)
else:
@ -123,7 +122,7 @@ class Script:
# go through each command
for command in self.commands:
# if the command is an integer, it's an op code
if type(command) == int:
if isinstance(command, int):
# turn the command into a single byte integer using int_to_byte
result += int_to_byte(command)
else:
@ -166,7 +165,7 @@ class Script:
op_lookup = OP_CODE_FUNCTIONS
while len(commands) > 0:
command = commands.pop(0)
if type(command) == int:
if isinstance(command, int):
# do what the op code says
operation = op_lookup[command]
if command in (99, 100):
@ -198,7 +197,7 @@ class Script:
if (
len(commands) == 3
and commands[0] == 0xA9
and type(commands[1]) == bytes
and isinstance(commands[1], bytes)
and len(commands[1]) == 20
and commands[2] == 0x87
):
@ -264,7 +263,9 @@ class Script:
external_pubkey = control_block.external_pubkey(tap_script)
# the tweak point should be what's on the stack
if external_pubkey.parity != control_block.parity:
print(f"bad external pubkey parity {external_pubkey.parity} vs {control_block.parity}")
print(
f"bad external pubkey parity {external_pubkey.parity} vs {control_block.parity}"
)
return False
if external_pubkey.xonly() != stack.pop():
print("bad external pubkey")
@ -289,7 +290,7 @@ class Script:
len(self.commands) == 5
and self.commands[0] == 0x76
and self.commands[1] == 0xA9
and type(self.commands[2]) == bytes
and isinstance(self.commands[2], bytes)
and len(self.commands[2]) == 20
and self.commands[3] == 0x88
and self.commands[4] == 0xAC
@ -303,7 +304,7 @@ class Script:
return (
len(self.commands) == 3
and self.commands[0] == 0xA9
and type(self.commands[1]) == bytes
and isinstance(self.commands[1], bytes)
and len(self.commands[1]) == 20
and self.commands[2] == 0x87
)
@ -314,7 +315,7 @@ class Script:
return (
len(self.commands) == 2
and self.commands[0] == 0x00
and type(self.commands[1]) == bytes
and isinstance(self.commands[1], bytes)
and len(self.commands[1]) == 20
)
@ -324,7 +325,7 @@ class Script:
return (
len(self.commands) == 2
and self.commands[0] == 0x00
and type(self.commands[1]) == bytes
and isinstance(self.commands[1], bytes)
and len(self.commands[1]) == 32
)
@ -334,7 +335,7 @@ class Script:
return (
len(self.commands) == 2
and self.commands[0] == 0x51
and type(self.commands[1]) == bytes
and isinstance(self.commands[1], bytes)
and len(self.commands[1]) == 32
)
@ -375,7 +376,7 @@ class ScriptPubKey(Script):
class P2PKHScriptPubKey(ScriptPubKey):
def __init__(self, h160):
super().__init__()
if type(h160) != bytes:
if not isinstance(h160, bytes):
raise TypeError("To initialize P2PKHScriptPubKey, a hash160 is needed")
self.commands = [0x76, 0xA9, h160, 0x88, 0xAC]
@ -394,7 +395,7 @@ class P2PKHScriptPubKey(ScriptPubKey):
class P2SHScriptPubKey(ScriptPubKey):
def __init__(self, h160):
super().__init__()
if type(h160) != bytes:
if not isinstance(h160, bytes):
raise TypeError("To initialize P2SHScriptPubKey, a hash160 is needed")
self.commands = [0xA9, h160, 0x87]
@ -515,7 +516,7 @@ class SegwitPubKey(ScriptPubKey):
class P2WPKHScriptPubKey(SegwitPubKey):
def __init__(self, h160):
super().__init__()
if type(h160) != bytes:
if not isinstance(h160, bytes):
raise TypeError("To initialize P2WPKHScriptPubKey, a hash160 is needed")
self.commands = [0x00, h160]
@ -523,7 +524,7 @@ class P2WPKHScriptPubKey(SegwitPubKey):
class P2WSHScriptPubKey(SegwitPubKey):
def __init__(self, s256):
super().__init__()
if type(s256) != bytes:
if not isinstance(s256, bytes):
raise TypeError("To initialize P2WSHScriptPubKey, a sha256 is needed")
self.commands = [0x00, s256]
@ -533,7 +534,7 @@ class P2TRScriptPubKey(ScriptPubKey):
super().__init__()
if hasattr(point, "xonly"):
raw_point = point.xonly()
elif type(point) == bytes:
elif isinstance(point, bytes):
raw_point = point
else:
raise TypeError(
@ -585,8 +586,8 @@ class WitnessScript(Script):
def is_p2wsh_multisig(self):
return (
OP_CODE_NAMES[self.commands[-1]] == "OP_CHECKMULTISIG"
and type(self.commands[0]) == int
and type(self.commands[-2]) == int
and isinstance(self.commands[0], int)
and isinstance(self.commands[-2], int)
)
def get_quorum(self):

View File

@ -5,27 +5,21 @@ from ecc import S256Point
from hash import (
hash_tapbranch,
hash_tapleaf,
hash_taptweak,
)
from helper import (
int_to_byte,
)
from op import (
encode_minimal_num,
number_to_op_code,
)
from helper import int_to_byte
from op import encode_minimal_num
from script import ScriptPubKey
from timelock import Locktime, Sequence
def locktime_commands(locktime):
assert type(locktime) == Locktime, f"{locktime} needs to be Locktime"
assert isinstance(locktime, Locktime), f"{locktime} needs to be Locktime"
# 0xB1 is OP_CLTV, 0x75 is OP_DROP
return [encode_minimal_num(locktime), 0xB1, 0x75]
def sequence_commands(sequence):
assert type(sequence) == Sequence, f"{sequence} needs to be Sequence"
assert isinstance(sequence, Sequence), f"{sequence} needs to be Sequence"
# 0xB2 is OP_CSV, 0x75 is OP_DROP
return [encode_minimal_num(sequence), 0xB2, 0x75]
@ -40,7 +34,7 @@ class TapLeaf:
def __eq__(self, other):
return (
type(self) == type(other)
type(self) is type(other)
and self.tapleaf_version == other.tapleaf_version
and self.tap_script == other.tap_script
)
@ -190,32 +184,71 @@ class ControlBlock:
class TapRootTest(TestCase):
def test_tapleaf_hash(self):
tap_script = TapScript.parse(BytesIO(bytes.fromhex("4420331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeecad20158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16fac")))
tap_script = TapScript.parse(
BytesIO(
bytes.fromhex(
"4420331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeecad20158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16fac"
)
)
)
tap_leaf = TapLeaf(tap_script)
self.assertEqual(tap_leaf.hash().hex(), "d1b3ee8e8c175e5db7e2ff7a87435e8f751d148b77fb1f00e14ff8ffa1c09a40")
self.assertEqual(
tap_leaf.hash().hex(),
"d1b3ee8e8c175e5db7e2ff7a87435e8f751d148b77fb1f00e14ff8ffa1c09a40",
)
def test_tapbranch_hash(self):
pubkey_1 = S256Point.parse(bytes.fromhex("331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec")).xonly()
pubkey_2 = S256Point.parse(bytes.fromhex("158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f")).xonly()
pubkey_1 = S256Point.parse(
bytes.fromhex(
"331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec"
)
).xonly()
pubkey_2 = S256Point.parse(
bytes.fromhex(
"158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f"
)
).xonly()
tap_script_1 = TapScript([pubkey_1, 0xAC])
tap_script_2 = TapScript([pubkey_2, 0xAC])
tap_leaf_1 = TapLeaf(tap_script_1)
tap_leaf_2 = TapLeaf(tap_script_2)
tap_branch = TapBranch(tap_leaf_1, tap_leaf_2)
self.assertEqual(tap_branch.hash().hex(), "eb792962b250f4a49a572ba7136674a28f2398a49c4c078fecfc839260da6151")
self.assertEqual(
tap_branch.hash().hex(),
"eb792962b250f4a49a572ba7136674a28f2398a49c4c078fecfc839260da6151",
)
def test_control_block(self):
raw_cb = bytes.fromhex("c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e")
raw_cb = bytes.fromhex(
"c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e"
)
cb = ControlBlock.parse(raw_cb)
pubkey = bytes.fromhex("027aa71d9cdb31cd8fe037a6f441e624fe478a2deece7affa840312b14e971a4")
pubkey = bytes.fromhex(
"027aa71d9cdb31cd8fe037a6f441e624fe478a2deece7affa840312b14e971a4"
)
tap_script = TapScript([pubkey, 0xAC])
external_pubkey = cb.external_pubkey(tap_script)
self.assertEqual(external_pubkey.xonly().hex(), "cbe433288ae1eede1f24818f08046d4e647fef808cfbbffc7d10f24a698eecfd")
self.assertEqual(
external_pubkey.xonly().hex(),
"cbe433288ae1eede1f24818f08046d4e647fef808cfbbffc7d10f24a698eecfd",
)
def test_control_block_2(self):
pubkey_1 = S256Point.parse(bytes.fromhex("331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec"))
pubkey_2 = S256Point.parse(bytes.fromhex("158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f"))
pubkey_3 = S256Point.parse(bytes.fromhex("582662e8e47df59489d6756615aa3db3fa3bbaa75a424b9c78036265858f5544"))
pubkey_1 = S256Point.parse(
bytes.fromhex(
"331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec"
)
)
pubkey_2 = S256Point.parse(
bytes.fromhex(
"158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f"
)
)
pubkey_3 = S256Point.parse(
bytes.fromhex(
"582662e8e47df59489d6756615aa3db3fa3bbaa75a424b9c78036265858f5544"
)
)
tap_script_1 = TapScript([pubkey_1.xonly(), 0xAC])
tap_script_2 = TapScript([pubkey_2.xonly(), 0xAC])
tap_script_3 = TapScript([pubkey_3.xonly(), 0xAC])
@ -225,7 +258,11 @@ class TapRootTest(TestCase):
tap_branch_1 = TapBranch(tap_leaf_1, tap_leaf_2)
tap_root = TapBranch(tap_branch_1, tap_leaf_3)
hex_cb = "c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e"
internal_pubkey = S256Point.parse(bytes.fromhex("407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e5"))
internal_pubkey = S256Point.parse(
bytes.fromhex(
"407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e5"
)
)
cb = tap_root.control_block(internal_pubkey, tap_leaf_2)
self.assertEqual(cb.serialize().hex(), hex_cb)

View File

@ -42,7 +42,7 @@ class Locktime(int):
)
def __lt__(self, other):
if type(other) == int:
if isinstance(other, int):
return super().__lt__(other)
if self.is_comparable(other):
return super().__lt__(other)
@ -108,7 +108,7 @@ class Sequence(int):
)
def __lt__(self, other):
if type(other) == int:
if isinstance(other, int):
return super().__lt__(other)
if self.is_comparable(other):
return self & SEQUENCE_MASK < other & SEQUENCE_MASK

View File

@ -4,7 +4,6 @@ from urllib.request import Request, urlopen
import json
from bech32 import decode_bech32
from ecc import SchnorrSignature
from hash import hash_tapsighash
from helper import (
big_endian_to_int,

View File

@ -40,7 +40,7 @@ class FieldElement:
# this should be the inverse of the == operator
return not (self == other)
def __repr__(self):
def __reprv__(self):
return f"FieldElement_{self.prime}({self.num})"
def __add__(self, other):
@ -220,7 +220,7 @@ class S256Field(FieldElement):
class S256Point(Point):
def __init__(self, x, y, a=None, b=None):
a, b = S256Field(A), S256Field(B)
if type(x) == int:
if isinstance(x, int):
super().__init__(x=S256Field(x), y=S256Field(y), a=a, b=b)
else:
super().__init__(x=x, y=y, a=a, b=b)
@ -247,7 +247,7 @@ class S256Point(Point):
def __add__(self, other):
"""If other is an int, multiplies scalar by generator, adds result to current point"""
if type(other) == int:
if isinstance(other, int):
return super().__add__(other * G)
else:
return super().__add__(other)
@ -276,13 +276,13 @@ class S256Point(Point):
# otherwise, convert the x coordinate to Big Endian 32 bytes
return int_to_big_endian(self.x.num, 32)
def tweak(self, merkle_root=b''):
def tweak(self, merkle_root=b""):
"""returns the tweak for use in p2tr if there's no script path"""
# take the hash_taptweak of the xonly
tweak = hash_taptweak(self.xonly() + merkle_root)
return tweak
def tweaked_key(self, merkle_root=b''):
def tweaked_key(self, merkle_root=b""):
"""Creates the tweaked external key for a particular tweak."""
# Get the tweak from the tweak method
tweak = self.tweak(merkle_root)
@ -319,10 +319,11 @@ class S256Point(Point):
"""Returns the RedeemScript for a p2sh-p2wpkh redemption"""
return self.p2wpkh_script().redeem_script()
def p2tr_script(self, merkle_root=b''):
def p2tr_script(self, merkle_root=b""):
"""Returns the p2tr ScriptPubKey object"""
# avoid circular dependency
from script import P2TRScriptPubKey
# get the external pubkey
external_pubkey = self.tweaked_key(merkle_root)
# return the P2TRScriptPubKey object
@ -340,7 +341,7 @@ class S256Point(Point):
"""Returns the p2sh-p2wpkh base58 address string"""
return self.p2wpkh_script().p2sh_address(network)
def p2tr_address(self, merkle_root=b'', network="mainnet"):
def p2tr_address(self, merkle_root=b"", network="mainnet"):
"""Returns the p2tr bech32m address string"""
return self.p2tr_script(merkle_root).address(network)
@ -463,34 +464,52 @@ class TapRootTest(TestCase):
hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"
bytes_x = bytes.fromhex(hex_x)
point = S256Point.parse(bytes_x)
self.assertEqual(big_endian_to_int(point.tweak()), 67856885919469038205338506436839711332207972226461300386890540598589929564995)
self.assertEqual(
big_endian_to_int(point.tweak()),
67856885919469038205338506436839711332207972226461300386890540598589929564995,
)
def test_tweaked_key(self):
hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"
bytes_x = bytes.fromhex(hex_x)
point = S256Point.parse(bytes_x)
self.assertEqual(point.tweaked_key().xonly().hex(), "5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf")
self.assertEqual(
point.tweaked_key().xonly().hex(),
"5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf",
)
def test_p2tr_script(self):
hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"
bytes_x = bytes.fromhex(hex_x)
point = S256Point.parse(bytes_x)
self.assertEqual(point.p2tr_script().__repr__(), "OP_1 5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf")
self.assertEqual(
point.p2tr_script().__repr__(),
"OP_1 5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf",
)
class SchnorrTest(TestCase):
def test_verify(self):
msg = sha256(b"I attest to understanding Schnorr Signatures")
sig_raw = bytes.fromhex("f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a")
sig_raw = bytes.fromhex(
"f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a"
)
sig = SchnorrSignature.parse(sig_raw)
point = S256Point.parse(bytes.fromhex("f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"))
point = S256Point.parse(
bytes.fromhex(
"f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f"
)
)
self.assertTrue(point.verify_schnorr(msg, sig))
def test_sign(self):
msg = sha256(b"I attest to understanding Schnorr Signatures")
priv = PrivateKey(12345)
sig = priv.sign_schnorr(msg)
self.assertEqual(sig.serialize().hex(), "f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a")
self.assertEqual(
sig.serialize().hex(),
"f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a",
)
class Signature:
@ -691,7 +710,7 @@ class PrivateKey:
# encode_base58_checksum the whole thing
return encode_base58_checksum(prefix + secret_bytes + suffix)
def tweaked_key(self, merkle_root=b''):
def tweaked_key(self, merkle_root=b""):
# get the tweak from the point's tweak method
tweak = self.point.tweak(merkle_root)
# t is the tweak interpreted as big endian
@ -730,7 +749,6 @@ class PrivateKey:
class PrivateKeyTest(TestCase):
def test_tweaked_key(self):
secret = randint(1, N)
priv = PrivateKey(secret)

View File

@ -1,8 +1,6 @@
import hashlib
import hmac
import re
from base64 import b64decode, b64encode
from unittest import TestSuite, TextTestRunner
try:

View File

@ -1260,10 +1260,19 @@ OP_CODE_NAMES = {
class TapScriptTest(TestCase):
def test_opchecksigadd(self):
from tx import Tx
hex_tx = "010000000001022373cf02ce7df6500ae46a4a0fbbb1b636d2debed8f2df91e2415627397a34090000000000fdffffff88c23d928893cd3509845516cf8411b7cab2738c054cc5ce7e4bde9586997c770000000000fdffffff0200000000000000002b6a29676d20746170726f6f7420f09fa5952068747470733a2f2f626974636f696e6465766b69742e6f72676e9e1100000000001976a91405070d0290da457409a37db2e294c1ffbc52738088ac04410adf90fd381d4a13c3e73740b337b230701189ed94abcb4030781635f035e6d3b50b8506470a68292a2bc74745b7a5732a28254b5f766f09e495929ec308090b01004620c13e6d193f5d04506723bd67abcc5d31b610395c445ac6744cb0a1846b3aabaeac20b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333cba519c21c0000000000000000000000000000000000000000000000000000000000000000104414636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301004620c13e6d193f5d04506723bd67abcc5d31b610395c445ac6744cb0a1846b3aabaeac20b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333cba519c21c0000000000000000000000000000000000000000000000000000000000000000100000000"
stream = BytesIO(bytes.fromhex(hex_tx))
tx_obj = Tx.parse(stream)
self.assertTrue(tx_obj.verify())
stack = [bytes.fromhex("4636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301"),b"",bytes.fromhex("b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333c")]
stack = [
bytes.fromhex(
"4636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301"
),
b"",
bytes.fromhex(
"b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333c"
),
]
op_checksigadd_schnorr(stack, tx_obj, 1)
self.assertEqual(stack, [b'\x01'])
self.assertEqual(stack, [b"\x01"])

View File

@ -1,7 +1,6 @@
from io import BytesIO
from bech32 import decode_bech32, encode_bech32_checksum
from ecc import S256Point
from helper import (
decode_base58,
encode_base58_checksum,
@ -37,7 +36,7 @@ class Script:
def __repr__(self):
result = ""
for command in self.commands:
if type(command) == int:
if isinstance(command, int):
if OP_CODE_NAMES.get(command):
name = OP_CODE_NAMES.get(command)
else:
@ -123,7 +122,7 @@ class Script:
# go through each command
for command in self.commands:
# if the command is an integer, it's an op code
if type(command) == int:
if isinstance(command, int):
# turn the command into a single byte integer using int_to_byte
result += int_to_byte(command)
else:
@ -166,7 +165,7 @@ class Script:
op_lookup = OP_CODE_FUNCTIONS
while len(commands) > 0:
command = commands.pop(0)
if type(command) == int:
if isinstance(command, int):
# do what the op code says
operation = op_lookup[command]
if command in (99, 100):
@ -198,7 +197,7 @@ class Script:
if (
len(commands) == 3
and commands[0] == 0xA9
and type(commands[1]) == bytes
and isinstance(commands[1], bytes)
and len(commands[1]) == 20
and commands[2] == 0x87
):
@ -264,7 +263,9 @@ class Script:
external_pubkey = control_block.external_pubkey(tap_script)
# the tweak point should be what's on the stack
if external_pubkey.parity != control_block.parity:
print(f"bad external pubkey parity {external_pubkey.parity} vs {control_block.parity}")
print(
f"bad external pubkey parity {external_pubkey.parity} vs {control_block.parity}"
)
return False
if external_pubkey.xonly() != stack.pop():
print("bad external pubkey")
@ -289,7 +290,7 @@ class Script:
len(self.commands) == 5
and self.commands[0] == 0x76
and self.commands[1] == 0xA9
and type(self.commands[2]) == bytes
and isinstance(self.commands[2], bytes)
and len(self.commands[2]) == 20
and self.commands[3] == 0x88
and self.commands[4] == 0xAC
@ -303,7 +304,7 @@ class Script:
return (
len(self.commands) == 3
and self.commands[0] == 0xA9
and type(self.commands[1]) == bytes
and isinstance(self.commands[1], bytes)
and len(self.commands[1]) == 20
and self.commands[2] == 0x87
)
@ -314,7 +315,7 @@ class Script:
return (
len(self.commands) == 2
and self.commands[0] == 0x00
and type(self.commands[1]) == bytes
and isinstance(self.commands[1], bytes)
and len(self.commands[1]) == 20
)
@ -324,7 +325,7 @@ class Script:
return (
len(self.commands) == 2
and self.commands[0] == 0x00
and type(self.commands[1]) == bytes
and isinstance(self.commands[1], bytes)
and len(self.commands[1]) == 32
)
@ -334,7 +335,7 @@ class Script:
return (
len(self.commands) == 2
and self.commands[0] == 0x51
and type(self.commands[1]) == bytes
and isinstance(self.commands[1], bytes)
and len(self.commands[1]) == 32
)
@ -375,7 +376,7 @@ class ScriptPubKey(Script):
class P2PKHScriptPubKey(ScriptPubKey):
def __init__(self, h160):
super().__init__()
if type(h160) != bytes:
if not isinstance(h160, bytes):
raise TypeError("To initialize P2PKHScriptPubKey, a hash160 is needed")
self.commands = [0x76, 0xA9, h160, 0x88, 0xAC]
@ -394,7 +395,7 @@ class P2PKHScriptPubKey(ScriptPubKey):
class P2SHScriptPubKey(ScriptPubKey):
def __init__(self, h160):
super().__init__()
if type(h160) != bytes:
if not isinstance(h160, bytes):
raise TypeError("To initialize P2SHScriptPubKey, a hash160 is needed")
self.commands = [0xA9, h160, 0x87]
@ -515,7 +516,7 @@ class SegwitPubKey(ScriptPubKey):
class P2WPKHScriptPubKey(SegwitPubKey):
def __init__(self, h160):
super().__init__()
if type(h160) != bytes:
if not isinstance(h160, bytes):
raise TypeError("To initialize P2WPKHScriptPubKey, a hash160 is needed")
self.commands = [0x00, h160]
@ -523,7 +524,7 @@ class P2WPKHScriptPubKey(SegwitPubKey):
class P2WSHScriptPubKey(SegwitPubKey):
def __init__(self, s256):
super().__init__()
if type(s256) != bytes:
if not isinstance(s256, bytes):
raise TypeError("To initialize P2WSHScriptPubKey, a sha256 is needed")
self.commands = [0x00, s256]
@ -533,7 +534,7 @@ class P2TRScriptPubKey(ScriptPubKey):
super().__init__()
if hasattr(point, "xonly"):
raw_point = point.xonly()
elif type(point) == bytes:
elif isinstance(point, bytes):
raw_point = point
else:
raise TypeError(
@ -585,8 +586,8 @@ class WitnessScript(Script):
def is_p2wsh_multisig(self):
return (
OP_CODE_NAMES[self.commands[-1]] == "OP_CHECKMULTISIG"
and type(self.commands[0]) == int
and type(self.commands[-2]) == int
and isinstance(self.commands[0], int)
and isinstance(self.commands[-2], int)
)
def get_quorum(self):

View File

@ -5,27 +5,21 @@ from ecc import S256Point
from hash import (
hash_tapbranch,
hash_tapleaf,
hash_taptweak,
)
from helper import (
int_to_byte,
)
from op import (
encode_minimal_num,
number_to_op_code,
)
from helper import int_to_byte
from op import encode_minimal_num
from script import ScriptPubKey
from timelock import Locktime, Sequence
def locktime_commands(locktime):
assert type(locktime) == Locktime, f"{locktime} needs to be Locktime"
assert isinstance(locktime, Locktime), f"{locktime} needs to be Locktime"
# 0xB1 is OP_CLTV, 0x75 is OP_DROP
return [encode_minimal_num(locktime), 0xB1, 0x75]
def sequence_commands(sequence):
assert type(sequence) == Sequence, f"{sequence} needs to be Sequence"
assert isinstance(sequence, Sequence), f"{sequence} needs to be Sequence"
# 0xB2 is OP_CSV, 0x75 is OP_DROP
return [encode_minimal_num(sequence), 0xB2, 0x75]
@ -40,7 +34,7 @@ class TapLeaf:
def __eq__(self, other):
return (
type(self) == type(other)
type(self) is type(other)
and self.tapleaf_version == other.tapleaf_version
and self.tap_script == other.tap_script
)
@ -190,32 +184,71 @@ class ControlBlock:
class TapRootTest(TestCase):
def test_tapleaf_hash(self):
tap_script = TapScript.parse(BytesIO(bytes.fromhex("4420331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeecad20158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16fac")))
tap_script = TapScript.parse(
BytesIO(
bytes.fromhex(
"4420331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeecad20158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16fac"
)
)
)
tap_leaf = TapLeaf(tap_script)
self.assertEqual(tap_leaf.hash().hex(), "d1b3ee8e8c175e5db7e2ff7a87435e8f751d148b77fb1f00e14ff8ffa1c09a40")
self.assertEqual(
tap_leaf.hash().hex(),
"d1b3ee8e8c175e5db7e2ff7a87435e8f751d148b77fb1f00e14ff8ffa1c09a40",
)
def test_tapbranch_hash(self):
pubkey_1 = S256Point.parse(bytes.fromhex("331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec")).xonly()
pubkey_2 = S256Point.parse(bytes.fromhex("158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f")).xonly()
pubkey_1 = S256Point.parse(
bytes.fromhex(
"331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec"
)
).xonly()
pubkey_2 = S256Point.parse(
bytes.fromhex(
"158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f"
)
).xonly()
tap_script_1 = TapScript([pubkey_1, 0xAC])
tap_script_2 = TapScript([pubkey_2, 0xAC])
tap_leaf_1 = TapLeaf(tap_script_1)
tap_leaf_2 = TapLeaf(tap_script_2)
tap_branch = TapBranch(tap_leaf_1, tap_leaf_2)
self.assertEqual(tap_branch.hash().hex(), "eb792962b250f4a49a572ba7136674a28f2398a49c4c078fecfc839260da6151")
self.assertEqual(
tap_branch.hash().hex(),
"eb792962b250f4a49a572ba7136674a28f2398a49c4c078fecfc839260da6151",
)
def test_control_block(self):
raw_cb = bytes.fromhex("c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e")
raw_cb = bytes.fromhex(
"c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e"
)
cb = ControlBlock.parse(raw_cb)
pubkey = bytes.fromhex("027aa71d9cdb31cd8fe037a6f441e624fe478a2deece7affa840312b14e971a4")
pubkey = bytes.fromhex(
"027aa71d9cdb31cd8fe037a6f441e624fe478a2deece7affa840312b14e971a4"
)
tap_script = TapScript([pubkey, 0xAC])
external_pubkey = cb.external_pubkey(tap_script)
self.assertEqual(external_pubkey.xonly().hex(), "cbe433288ae1eede1f24818f08046d4e647fef808cfbbffc7d10f24a698eecfd")
self.assertEqual(
external_pubkey.xonly().hex(),
"cbe433288ae1eede1f24818f08046d4e647fef808cfbbffc7d10f24a698eecfd",
)
def test_control_block_2(self):
pubkey_1 = S256Point.parse(bytes.fromhex("331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec"))
pubkey_2 = S256Point.parse(bytes.fromhex("158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f"))
pubkey_3 = S256Point.parse(bytes.fromhex("582662e8e47df59489d6756615aa3db3fa3bbaa75a424b9c78036265858f5544"))
pubkey_1 = S256Point.parse(
bytes.fromhex(
"331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec"
)
)
pubkey_2 = S256Point.parse(
bytes.fromhex(
"158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f"
)
)
pubkey_3 = S256Point.parse(
bytes.fromhex(
"582662e8e47df59489d6756615aa3db3fa3bbaa75a424b9c78036265858f5544"
)
)
tap_script_1 = TapScript([pubkey_1.xonly(), 0xAC])
tap_script_2 = TapScript([pubkey_2.xonly(), 0xAC])
tap_script_3 = TapScript([pubkey_3.xonly(), 0xAC])
@ -225,7 +258,11 @@ class TapRootTest(TestCase):
tap_branch_1 = TapBranch(tap_leaf_1, tap_leaf_2)
tap_root = TapBranch(tap_branch_1, tap_leaf_3)
hex_cb = "c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e"
internal_pubkey = S256Point.parse(bytes.fromhex("407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e5"))
internal_pubkey = S256Point.parse(
bytes.fromhex(
"407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e5"
)
)
cb = tap_root.control_block(internal_pubkey, tap_leaf_2)
self.assertEqual(cb.serialize().hex(), hex_cb)

View File

@ -42,7 +42,7 @@ class Locktime(int):
)
def __lt__(self, other):
if type(other) == int:
if isinstance(other, int):
return super().__lt__(other)
if self.is_comparable(other):
return super().__lt__(other)
@ -108,7 +108,7 @@ class Sequence(int):
)
def __lt__(self, other):
if type(other) == int:
if isinstance(other, int):
return super().__lt__(other)
if self.is_comparable(other):
return self & SEQUENCE_MASK < other & SEQUENCE_MASK

View File

@ -4,7 +4,6 @@ from urllib.request import Request, urlopen
import json
from bech32 import decode_bech32
from ecc import SchnorrSignature
from hash import hash_tapsighash
from helper import (
big_endian_to_int,