diff --git a/session/answers.py b/session/answers.py index f07d89c..60496f9 100644 --- a/session/answers.py +++ b/session/answers.py @@ -1,4 +1,4 @@ -''' +""" #markdown # Session Objectives * Learn Schnorr Signatures @@ -1045,4 +1045,4 @@ True 01000000000101a2b85a1372be6bc9a3d53110a4f142819d653fa6d07fbd4367138145030914200000000000ffffffff0184e4000000000000160014f5a74a3131dedb57a092ae86aad3ee3f9b8d721403403b1681a67f40e6767b2db64744ad3f005d3971645135d58a3e1826d5c960bc281ce187bc9270c51ed7833fcf5e8415501862d51b0ebd051917d9878104778f292220cd04c1bf88ca891af152fc57c36523ab59efb16b7ec07caca0cfc4a1f2051d9eac61c0cd04c1bf88ca891af152fc57c36523ab59efb16b7ec07caca0cfc4a1f2051d9e76f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dffaf5548715217f7a892c7c5ff787a97b6e2f123287a1a354fe3ccda09c39d5d7300000000 #endexercise -''' +""" diff --git a/session/complete/block.py b/session/complete/block.py deleted file mode 100644 index 94aa3dc..0000000 --- a/session/complete/block.py +++ /dev/null @@ -1,248 +0,0 @@ -from io import BytesIO -from unittest import TestCase - -from helper import ( - hash256, - int_to_little_endian, - little_endian_to_int, - merkle_root, - read_varint, -) -from tx import Tx - - -GENESIS_BLOCK_HASH = bytes.fromhex('000000000019d6689c085ae165831e934ff763ae46a2a6c172b3f1b60a8ce26f') -TESTNET_GENESIS_BLOCK_HASH = bytes.fromhex('000000000933ea01ad0ee984209779baaec3ced90fa3f408719526f8d77f4943') - - -class Block: - command = b'block' - - def __init__(self, version, prev_block, merkle_root, timestamp, bits, nonce, tx_hashes=None): - self.version = version - self.prev_block = prev_block - self.merkle_root = merkle_root - self.timestamp = timestamp - self.bits = bits - self.nonce = nonce - self.tx_hashes = tx_hashes - self.merkle_tree = None - - @classmethod - def parse_header(cls, s): - '''Takes a byte stream and parses a block. Returns a Block object''' - # s.read(n) will read n bytes from the stream - # version - 4 bytes, little endian, interpret as int - version = little_endian_to_int(s.read(4)) - # prev_block - 32 bytes, little endian (use [::-1] to reverse) - prev_block = s.read(32)[::-1] - # merkle_root - 32 bytes, little endian (use [::-1] to reverse) - merkle_root = s.read(32)[::-1] - # timestamp - 4 bytes, little endian, interpret as int - timestamp = little_endian_to_int(s.read(4)) - # bits - 4 bytes - bits = s.read(4) - # nonce - 4 bytes - nonce = s.read(4) - # initialize class - return cls(version, prev_block, merkle_root, timestamp, bits, nonce) - - @classmethod - def parse(cls, s): - b = cls.parse_header(s) - num_txs = read_varint(s) - tx_hashes = [] - for _ in range(num_txs): - t = Tx.parse(s) - tx_hashes.append(t.hash()) - b.tx_hashes = tx_hashes - return b - - def serialize(self): - '''Returns the 80 byte block header''' - # version - 4 bytes, little endian - result = int_to_little_endian(self.version, 4) - # prev_block - 32 bytes, little endian - result += self.prev_block[::-1] - # merkle_root - 32 bytes, little endian - result += self.merkle_root[::-1] - # timestamp - 4 bytes, little endian - result += int_to_little_endian(self.timestamp, 4) - # bits - 4 bytes - result += self.bits - # nonce - 4 bytes - result += self.nonce - return result - - def hash(self): - '''Returns the hash256 interpreted little endian of the block''' - # serialize - s = self.serialize() - # hash256 - h256 = hash256(s) - # reverse - return h256[::-1] - - def id(self): - '''Human-readable hexadecimal of the block hash''' - return self.hash().hex() - - def bip9(self): - '''Returns whether this block is signaling readiness for BIP9''' - # BIP9 is signalled if the top 3 bits are 001 - # remember version is 32 bytes so right shift 29 (>> 29) and see if - # that is 001 - return self.version >> 29 == 0b001 - - def bip91(self): - '''Returns whether this block is signaling readiness for BIP91''' - # BIP91 is signalled if the 5th bit from the right is 1 - # shift 4 bits to the right and see if the last bit is 1 - return self.version >> 4 & 1 == 1 - - def bip141(self): - '''Returns whether this block is signaling readiness for BIP141''' - # BIP91 is signalled if the 2nd bit from the right is 1 - # shift 1 bit to the right and see if the last bit is 1 - return self.version >> 1 & 1 == 1 - - def target(self): - '''Returns the proof-of-work target based on the bits''' - # last byte is exponent - exponent = self.bits[-1] - # the first three bytes are the coefficient in little endian - coefficient = little_endian_to_int(self.bits[:-1]) - # the formula is: - # coefficient * 256**(exponent-3) - return coefficient * 256**(exponent - 3) - - def difficulty(self): - '''Returns the block difficulty based on the bits''' - # note difficulty is (target of lowest difficulty) / (self's target) - # lowest difficulty has bits that equal 0xffff001d - lowest = 0xffff * 256**(0x1d - 3) - return lowest / self.target() - - def check_pow(self): - '''Returns whether this block satisfies proof of work''' - # get the hash256 of the serialization of this block - h256 = hash256(self.serialize()) - # interpret this hash as a little-endian number - proof = little_endian_to_int(h256) - # return whether this integer is less than the target - return proof < self.target() - - def validate_merkle_root(self): - '''Gets the merkle root of the tx_hashes and checks that it's - the same as the merkle root of this block. - ''' - # reverse all the transaction hashes (self.tx_hashes) - hashes = [h[::-1] for h in self.tx_hashes] - # get the Merkle Root - root = merkle_root(hashes) - # reverse the Merkle Root - # return whether self.merkle root is the same as - # the reverse of the calculated merkle root - return root[::-1] == self.merkle_root - - -class BlockTest(TestCase): - - def test_parse(self): - block_raw = bytes.fromhex('0100000000000000000000000000000000000000000000000000000000000000000000003ba3edfd7a7b12b27ac72c3e67768f617fc81bc3888a51323a9fb8aa4b1e5e4a29ab5f49ffff001d1dac2b7c0101000000010000000000000000000000000000000000000000000000000000000000000000ffffffff4d04ffff001d0104455468652054696d65732030332f4a616e2f32303039204368616e63656c6c6f72206f6e206272696e6b206f66207365636f6e64206261696c6f757420666f722062616e6b73ffffffff0100f2052a01000000434104678afdb0fe5548271967f1a67130b7105cd6a828e03909a67962e0ea1f61deb649f6bc3f4cef38c4f35504e51ec112de5c384df7ba0b8d578a4c702b6bf11d5fac00000000') - stream = BytesIO(block_raw) - block = Block.parse(stream) - self.assertEqual(block.merkle_root, block.tx_hashes[0]) - - def test_parse_header(self): - block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertEqual(block.version, 0x20000002) - want = bytes.fromhex('000000000000000000fd0c220a0a8c3bc5a7b487e8c8de0dfa2373b12894c38e') - self.assertEqual(block.prev_block, want) - want = bytes.fromhex('be258bfd38db61f957315c3f9e9c5e15216857398d50402d5089a8e0fc50075b') - self.assertEqual(block.merkle_root, want) - self.assertEqual(block.timestamp, 0x59a7771e) - self.assertEqual(block.bits, bytes.fromhex('e93c0118')) - self.assertEqual(block.nonce, bytes.fromhex('a4ffd71d')) - - def test_serialize(self): - block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertEqual(block.serialize(), block_raw) - - def test_hash(self): - block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertEqual(block.hash(), bytes.fromhex('0000000000000000007e9e4c586439b0cdbe13b1370bdd9435d76a644d047523')) - - def test_bip9(self): - block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertTrue(block.bip9()) - block_raw = bytes.fromhex('0400000039fa821848781f027a2e6dfabbf6bda920d9ae61b63400030000000000000000ecae536a304042e3154be0e3e9a8220e5568c3433a9ab49ac4cbb74f8df8e8b0cc2acf569fb9061806652c27') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertFalse(block.bip9()) - - def test_bip91(self): - block_raw = bytes.fromhex('1200002028856ec5bca29cf76980d368b0a163a0bb81fc192951270100000000000000003288f32a2831833c31a25401c52093eb545d28157e200a64b21b3ae8f21c507401877b5935470118144dbfd1') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertTrue(block.bip91()) - block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertFalse(block.bip91()) - - def test_bip141(self): - block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertTrue(block.bip141()) - block_raw = bytes.fromhex('0000002066f09203c1cf5ef1531f24ed21b1915ae9abeb691f0d2e0100000000000000003de0976428ce56125351bae62c5b8b8c79d8297c702ea05d60feabb4ed188b59c36fa759e93c0118b74b2618') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertFalse(block.bip141()) - - def test_target(self): - block_raw = bytes.fromhex('020000208ec39428b17323fa0ddec8e887b4a7c53b8c0a0a220cfd0000000000000000005b0750fce0a889502d40508d39576821155e9c9e3f5c3157f961db38fd8b25be1e77a759e93c0118a4ffd71d') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertEqual(block.target(), 0x13ce9000000000000000000000000000000000000000000) - self.assertEqual(int(block.difficulty()), 888171856257) - - def test_check_pow(self): - block_raw = bytes.fromhex('04000000fbedbbf0cfdaf278c094f187f2eb987c86a199da22bbb20400000000000000007b7697b29129648fa08b4bcd13c9d5e60abb973a1efac9c8d573c71c807c56c3d6213557faa80518c3737ec1') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertTrue(block.check_pow()) - block_raw = bytes.fromhex('04000000fbedbbf0cfdaf278c094f187f2eb987c86a199da22bbb20400000000000000007b7697b29129648fa08b4bcd13c9d5e60abb973a1efac9c8d573c71c807c56c3d6213557faa80518c3737ec0') - stream = BytesIO(block_raw) - block = Block.parse_header(stream) - self.assertFalse(block.check_pow()) - - def test_validate_merkle_root(self): - hashes_hex = [ - 'f54cb69e5dc1bd38ee6901e4ec2007a5030e14bdd60afb4d2f3428c88eea17c1', - 'c57c2d678da0a7ee8cfa058f1cf49bfcb00ae21eda966640e312b464414731c1', - 'b027077c94668a84a5d0e72ac0020bae3838cb7f9ee3fa4e81d1eecf6eda91f3', - '8131a1b8ec3a815b4800b43dff6c6963c75193c4190ec946b93245a9928a233d', - 'ae7d63ffcb3ae2bc0681eca0df10dda3ca36dedb9dbf49e33c5fbe33262f0910', - '61a14b1bbdcdda8a22e61036839e8b110913832efd4b086948a6a64fd5b3377d', - 'fc7051c8b536ac87344c5497595d5d2ffdaba471c73fae15fe9228547ea71881', - '77386a46e26f69b3cd435aa4faac932027f58d0b7252e62fb6c9c2489887f6df', - '59cbc055ccd26a2c4c4df2770382c7fea135c56d9e75d3f758ac465f74c025b8', - '7c2bf5687f19785a61be9f46e031ba041c7f93e2b7e9212799d84ba052395195', - '08598eebd94c18b0d59ac921e9ba99e2b8ab7d9fccde7d44f2bd4d5e2e726d2e', - 'f0bb99ef46b029dd6f714e4b12a7d796258c48fee57324ebdc0bbc4700753ab1', - ] - hashes = [bytes.fromhex(x) for x in hashes_hex] - stream = BytesIO(bytes.fromhex('00000020fcb19f7895db08cadc9573e7915e3919fb76d59868a51d995201000000000000acbcab8bcc1af95d8d563b77d24c3d19b18f1486383d75a5085c4e86c86beed691cfa85916ca061a00000000')) - block = Block.parse_header(stream) - block.tx_hashes = hashes - self.assertTrue(block.validate_merkle_root()) diff --git a/session/complete/ecc.py b/session/complete/ecc.py index 13a6349..92fc741 100644 --- a/session/complete/ecc.py +++ b/session/complete/ecc.py @@ -40,7 +40,7 @@ class FieldElement: # this should be the inverse of the == operator return not (self == other) - def __repr__(self): + def __reprv__(self): return f"FieldElement_{self.prime}({self.num})" def __add__(self, other): @@ -220,7 +220,7 @@ class S256Field(FieldElement): class S256Point(Point): def __init__(self, x, y, a=None, b=None): a, b = S256Field(A), S256Field(B) - if type(x) == int: + if isinstance(x, int): super().__init__(x=S256Field(x), y=S256Field(y), a=a, b=b) else: super().__init__(x=x, y=y, a=a, b=b) @@ -247,7 +247,7 @@ class S256Point(Point): def __add__(self, other): """If other is an int, multiplies scalar by generator, adds result to current point""" - if type(other) == int: + if isinstance(other, int): return super().__add__(other * G) else: return super().__add__(other) @@ -276,13 +276,13 @@ class S256Point(Point): # otherwise, convert the x coordinate to Big Endian 32 bytes return int_to_big_endian(self.x.num, 32) - def tweak(self, merkle_root=b''): + def tweak(self, merkle_root=b""): """returns the tweak for use in p2tr if there's no script path""" # take the hash_taptweak of the xonly tweak = hash_taptweak(self.xonly() + merkle_root) return tweak - def tweaked_key(self, merkle_root=b''): + def tweaked_key(self, merkle_root=b""): """Creates the tweaked external key for a particular tweak.""" # Get the tweak from the tweak method tweak = self.tweak(merkle_root) @@ -319,10 +319,11 @@ class S256Point(Point): """Returns the RedeemScript for a p2sh-p2wpkh redemption""" return self.p2wpkh_script().redeem_script() - def p2tr_script(self, merkle_root=b''): + def p2tr_script(self, merkle_root=b""): """Returns the p2tr ScriptPubKey object""" # avoid circular dependency from script import P2TRScriptPubKey + # get the external pubkey external_pubkey = self.tweaked_key(merkle_root) # return the P2TRScriptPubKey object @@ -340,7 +341,7 @@ class S256Point(Point): """Returns the p2sh-p2wpkh base58 address string""" return self.p2wpkh_script().p2sh_address(network) - def p2tr_address(self, merkle_root=b'', network="mainnet"): + def p2tr_address(self, merkle_root=b"", network="mainnet"): """Returns the p2tr bech32m address string""" return self.p2tr_script(merkle_root).address(network) @@ -463,34 +464,52 @@ class TapRootTest(TestCase): hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f" bytes_x = bytes.fromhex(hex_x) point = S256Point.parse(bytes_x) - self.assertEqual(big_endian_to_int(point.tweak()), 67856885919469038205338506436839711332207972226461300386890540598589929564995) + self.assertEqual( + big_endian_to_int(point.tweak()), + 67856885919469038205338506436839711332207972226461300386890540598589929564995, + ) def test_tweaked_key(self): hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f" bytes_x = bytes.fromhex(hex_x) point = S256Point.parse(bytes_x) - self.assertEqual(point.tweaked_key().xonly().hex(), "5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf") + self.assertEqual( + point.tweaked_key().xonly().hex(), + "5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf", + ) def test_p2tr_script(self): hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f" bytes_x = bytes.fromhex(hex_x) point = S256Point.parse(bytes_x) - self.assertEqual(point.p2tr_script().__repr__(), "OP_1 5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf") + self.assertEqual( + point.p2tr_script().__repr__(), + "OP_1 5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf", + ) class SchnorrTest(TestCase): def test_verify(self): msg = sha256(b"I attest to understanding Schnorr Signatures") - sig_raw = bytes.fromhex("f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a") + sig_raw = bytes.fromhex( + "f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a" + ) sig = SchnorrSignature.parse(sig_raw) - point = S256Point.parse(bytes.fromhex("f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f")) + point = S256Point.parse( + bytes.fromhex( + "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f" + ) + ) self.assertTrue(point.verify_schnorr(msg, sig)) def test_sign(self): msg = sha256(b"I attest to understanding Schnorr Signatures") priv = PrivateKey(12345) sig = priv.sign_schnorr(msg) - self.assertEqual(sig.serialize().hex(), "f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a") + self.assertEqual( + sig.serialize().hex(), + "f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a", + ) class Signature: @@ -691,7 +710,7 @@ class PrivateKey: # encode_base58_checksum the whole thing return encode_base58_checksum(prefix + secret_bytes + suffix) - def tweaked_key(self, merkle_root=b''): + def tweaked_key(self, merkle_root=b""): # get the tweak from the point's tweak method tweak = self.point.tweak(merkle_root) # t is the tweak interpreted as big endian @@ -730,7 +749,6 @@ class PrivateKey: class PrivateKeyTest(TestCase): - def test_tweaked_key(self): secret = randint(1, N) priv = PrivateKey(secret) diff --git a/session/complete/helper.py b/session/complete/helper.py index b40af40..ba819a0 100644 --- a/session/complete/helper.py +++ b/session/complete/helper.py @@ -1,8 +1,6 @@ import hashlib -import hmac import re -from base64 import b64decode, b64encode from unittest import TestSuite, TextTestRunner try: diff --git a/session/complete/network.py b/session/complete/network.py deleted file mode 100644 index 0386d8a..0000000 --- a/session/complete/network.py +++ /dev/null @@ -1,486 +0,0 @@ -import socket -import time - -from io import BytesIO -from random import randint -from time import sleep -from unittest import TestCase - -from block import Block -from helper import ( - hash256, - decode_base58, - encode_varint, - int_to_little_endian, - little_endian_to_int, - read_varint, -) -from tx import Tx - -TX_DATA_TYPE = 1 -BLOCK_DATA_TYPE = 2 -FILTERED_BLOCK_DATA_TYPE = 3 -COMPACT_BLOCK_DATA_TYPE = 4 - -NETWORK_MAGIC = b'\xf9\xbe\xb4\xd9' -TESTNET_NETWORK_MAGIC = b'\x0b\x11\x09\x07' - - -class NetworkEnvelope: - - def __init__(self, command, payload, testnet=False): - self.command = command - self.payload = payload - if testnet: - self.magic = TESTNET_NETWORK_MAGIC - else: - self.magic = NETWORK_MAGIC - - def __repr__(self): - return '{}: {}'.format( - self.command.decode('ascii'), - self.payload.hex(), - ) - - @classmethod - def parse(cls, s, testnet=False): - '''Takes a stream and creates a NetworkEnvelope''' - # check the network magic - magic = s.read(4) - if magic == b'': - raise RuntimeError('Connection reset!') - if testnet: - expected_magic = TESTNET_NETWORK_MAGIC - else: - expected_magic = NETWORK_MAGIC - if magic != expected_magic: - raise RuntimeError('magic is not right {} vs {}'.format(magic.hex(), expected_magic.hex())) - # command 12 bytes, strip the trailing 0's using .strip(b'\x00') - command = s.read(12).strip(b'\x00') - # payload length 4 bytes, little endian - payload_length = little_endian_to_int(s.read(4)) - # checksum 4 bytes, first four of hash256 of payload - checksum = s.read(4) - # payload is of length payload_length - payload = s.read(payload_length) - # verify checksum - calculated_checksum = hash256(payload)[:4] - if calculated_checksum != checksum: - raise RuntimeError('checksum does not match') - return cls(command, payload, testnet=testnet) - - def serialize(self): - '''Returns the byte serialization of the entire network message''' - # add the network magic using self.magic - result = self.magic - # command 12 bytes, fill leftover with b'\x00' * (12 - len(self.command)) - result += self.command + b'\x00' * (12 - len(self.command)) - # payload length 4 bytes, little endian - result += int_to_little_endian(len(self.payload), 4) - # checksum 4 bytes, first four of hash256 of payload - result += hash256(self.payload)[:4] - # payload - result += self.payload - return result - - def stream(self): - '''Returns a stream for parsing the payload''' - return BytesIO(self.payload) - - -class NetworkEnvelopeTest(TestCase): - - def test_parse(self): - msg = bytes.fromhex('f9beb4d976657261636b000000000000000000005df6e0e2') - stream = BytesIO(msg) - envelope = NetworkEnvelope.parse(stream) - self.assertEqual(envelope.command, b'verack') - self.assertEqual(envelope.payload, b'') - msg = bytes.fromhex('f9beb4d976657273696f6e0000000000650000005f1a69d2721101000100000000000000bc8f5e5400000000010000000000000000000000000000000000ffffc61b6409208d010000000000000000000000000000000000ffffcb0071c0208d128035cbc97953f80f2f5361746f7368693a302e392e332fcf05050001') - stream = BytesIO(msg) - envelope = NetworkEnvelope.parse(stream) - self.assertEqual(envelope.command, b'version') - self.assertEqual(envelope.payload, msg[24:]) - - def test_serialize(self): - msg = bytes.fromhex('f9beb4d976657261636b000000000000000000005df6e0e2') - stream = BytesIO(msg) - envelope = NetworkEnvelope.parse(stream) - self.assertEqual(envelope.serialize(), msg) - msg = bytes.fromhex('f9beb4d976657273696f6e0000000000650000005f1a69d2721101000100000000000000bc8f5e5400000000010000000000000000000000000000000000ffffc61b6409208d010000000000000000000000000000000000ffffcb0071c0208d128035cbc97953f80f2f5361746f7368693a302e392e332fcf05050001') - stream = BytesIO(msg) - envelope = NetworkEnvelope.parse(stream) - self.assertEqual(envelope.serialize(), msg) - - -class VersionMessage: - command = b'version' - - def __init__(self, version=70015, services=0, timestamp=None, - receiver_services=0, - receiver_ip=b'\x00\x00\x00\x00', receiver_port=8333, - sender_services=0, - sender_ip=b'\x00\x00\x00\x00', sender_port=8333, - nonce=None, user_agent=b'/programmingblockchain:0.1/', - latest_block=0, relay=True): - self.version = version - self.services = services - if timestamp is None: - self.timestamp = int(time.time()) - else: - self.timestamp = timestamp - self.receiver_services = receiver_services - self.receiver_ip = receiver_ip - self.receiver_port = receiver_port - self.sender_services = sender_services - self.sender_ip = sender_ip - self.sender_port = sender_port - if nonce is None: - self.nonce = int_to_little_endian(randint(0, 2**64), 8) - else: - self.nonce = nonce - self.user_agent = user_agent - self.latest_block = latest_block - self.relay = relay - - def serialize(self): - '''Serialize this message to send over the network''' - # version is 4 bytes little endian - result = int_to_little_endian(self.version, 4) - # services is 8 bytes little endian - result += int_to_little_endian(self.services, 8) - # timestamp is 8 bytes little endian - result += int_to_little_endian(self.timestamp, 8) - # receiver services is 8 bytes little endian - result += int_to_little_endian(self.receiver_services, 8) - # IPV4 is 10 00 bytes and 2 ff bytes then receiver ip - result += b'\x00' * 10 + b'\xff\xff' + self.receiver_ip - # receiver port is 2 bytes, little endian - result += int_to_little_endian(self.receiver_port, 2) - # sender services is 8 bytes little endian - result += int_to_little_endian(self.sender_services, 8) - # IPV4 is 10 00 bytes and 2 ff bytes then sender ip - result += b'\x00' * 10 + b'\xff\xff' + self.sender_ip - # sender port is 2 bytes, little endian - result += int_to_little_endian(self.sender_port, 2) - # nonce - result += self.nonce - # useragent is a variable string, so varint first - result += encode_varint(len(self.user_agent)) - result += self.user_agent - # latest block is 4 bytes little endian - result += int_to_little_endian(self.latest_block, 4) - # relay is 00 if false, 01 if true - if self.relay: - result += b'\x01' - else: - result += b'\x00' - return result - - -class VersionMessageTest(TestCase): - - def test_serialize(self): - v = VersionMessage(timestamp=0, nonce=b'\x00' * 8) - self.assertEqual(v.serialize().hex(), '7f11010000000000000000000000000000000000000000000000000000000000000000000000ffff000000008d20000000000000000000000000000000000000ffff000000008d2000000000000000001b2f70726f6772616d6d696e67626c6f636b636861696e3a302e312f0000000001') - - -class VerAckMessage: - command = b'verack' - - def __init__(self): - pass - - @classmethod - def parse(cls, s): - return cls() - - def serialize(self): - return b'' - - -class PingMessage: - command = b'ping' - - def __init__(self, nonce): - self.nonce = nonce - - @classmethod - def parse(cls, s): - nonce = s.read(8) - return cls(nonce) - - def serialize(self): - return self.nonce - - -class PongMessage: - command = b'pong' - - def __init__(self, nonce): - self.nonce = nonce - - def parse(cls, s): - nonce = s.read(8) - return cls(nonce) - - def serialize(self): - return self.nonce - - -class GetHeadersMessage: - command = b'getheaders' - - def __init__(self, version=70015, num_hashes=1, start_block=None, end_block=None): - self.version = version - self.num_hashes = num_hashes - if start_block is None: - raise RuntimeError('a start block is required') - self.start_block = start_block - if end_block is None: - self.end_block = b'\x00' * 32 - else: - self.end_block = end_block - - def serialize(self): - '''Serialize this message to send over the network''' - # protocol version is 4 bytes little-endian - result = int_to_little_endian(self.version, 4) - # number of hashes is a varint - result += encode_varint(self.num_hashes) - # start block is in little-endian - result += self.start_block[::-1] - # end block is also in little-endian - result += self.end_block[::-1] - return result - - -class GetHeadersMessageTest(TestCase): - - def test_serialize(self): - block_hex = '0000000000000000001237f46acddf58578a37e213d2a6edc4884a2fcad05ba3' - gh = GetHeadersMessage(start_block=bytes.fromhex(block_hex)) - self.assertEqual(gh.serialize().hex(), '7f11010001a35bd0ca2f4a88c4eda6d213e2378a5758dfcd6af437120000000000000000000000000000000000000000000000000000000000000000000000000000000000') - - -class HeadersMessage: - command = b'headers' - - def __init__(self, headers): - self.headers = headers - - @classmethod - def parse(cls, s): - # number of headers is in a varint - num_headers = read_varint(s) - # initialize the headers array - headers = [] - # loop through number of headers times - for _ in range(num_headers): - # add a header to the headers array by using Block.parse_header(s) - headers.append(Block.parse_header(s)) - # read the next varint (num_txs) - num_txs = read_varint(s) - # num_txs should be 0 or raise a RuntimeError - if num_txs != 0: - raise RuntimeError('number of txs not 0') - # return a class instance - return cls(headers) - - def is_valid(self): - '''Return whether the headers satisfy proof-of-work and are sequential''' - last_block = None - for h in self.headers: - if not h.check_pow(): - return False - if last_block and h.prev_block != last_block: - return False - last_block = h.hash() - return True - - -class HeadersMessageTest(TestCase): - - def test_parse(self): - hex_msg = '0200000020df3b053dc46f162a9b00c7f0d5124e2676d47bbe7c5d0793a500000000000000ef445fef2ed495c275892206ca533e7411907971013ab83e3b47bd0d692d14d4dc7c835b67d8001ac157e670000000002030eb2540c41025690160a1014c577061596e32e426b712c7ca00000000000000768b89f07044e6130ead292a3f51951adbd2202df447d98789339937fd006bd44880835b67d8001ade09204600' - stream = BytesIO(bytes.fromhex(hex_msg)) - headers = HeadersMessage.parse(stream) - self.assertEqual(len(headers.headers), 2) - for b in headers.headers: - self.assertEqual(b.__class__, Block) - - -class GetDataMessage: - command = b'getdata' - - def __init__(self): - self.data = [] - - def add_data(self, data_type, identifier): - self.data.append((data_type, identifier)) - - def serialize(self): - # start with the number of items as a varint - result = encode_varint(len(self.data)) - for data_type, identifier in self.data: - # data type is 4 bytes little endian - result += int_to_little_endian(data_type, 4) - # identifier needs to be in little endian - result += identifier[::-1] - return result - - -class GetDataMessageTest(TestCase): - - def test_serialize(self): - hex_msg = '020300000030eb2540c41025690160a1014c577061596e32e426b712c7ca00000000000000030000001049847939585b0652fba793661c361223446b6fc41089b8be00000000000000' - get_data = GetDataMessage() - block1 = bytes.fromhex('00000000000000cac712b726e4326e596170574c01a16001692510c44025eb30') - get_data.add_data(FILTERED_BLOCK_DATA_TYPE, block1) - block2 = bytes.fromhex('00000000000000beb88910c46f6b442312361c6693a7fb52065b583979844910') - get_data.add_data(FILTERED_BLOCK_DATA_TYPE, block2) - self.assertEqual(get_data.serialize().hex(), hex_msg) - - -class GenericMessage: - def __init__(self, command, payload): - self.command = command - self.payload = payload - - def serialize(self): - return self.payload - - -class SimpleNode: - - def __init__(self, host, port=None, testnet=False, logging=False): - if port is None: - if testnet: - port = 18333 - else: - port = 8333 - self.testnet = testnet - self.logging = logging - # connect to socket - self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.socket.connect((host, port)) - # create a stream that we can use with the rest of the library - self.stream = self.socket.makefile('rb', None) - - def handshake(self): - '''Do a handshake with the other node. Handshake is sending a version message and getting a verack back.''' - # create a version message - version = VersionMessage() - # send the command - self.send(version) - # wait for a verack message - self.wait_for(VerAckMessage) - - def send(self, message): - '''Send a message to the connected node''' - # create a network envelope - envelope = NetworkEnvelope( - message.command, message.serialize(), testnet=self.testnet) - if self.logging: - print('sending: {}'.format(envelope)) - # send the serialized envelope over the socket using sendall - self.socket.sendall(envelope.serialize()) - - def read(self): - '''Read a message from the socket''' - envelope = NetworkEnvelope.parse(self.stream, testnet=self.testnet) - if self.logging: - print('receiving: {}'.format(envelope)) - return envelope - - def wait_for(self, *message_classes): - '''Wait for one of the messages in the list''' - # initialize the command we have, which should be None - command = None - command_to_class = {m.command: m for m in message_classes} - # loop until the command is in the commands we want - while command not in command_to_class.keys(): - # get the next network message - envelope = self.read() - # set the command to be evaluated - command = envelope.command - # we know how to respond to version and ping, handle that here - if command == VersionMessage.command: - # send verack - self.send(VerAckMessage()) - elif command == PingMessage.command: - # send pong - self.send(PongMessage(envelope.payload)) - # return the envelope parsed as a member of the right message class - return command_to_class[command].parse(envelope.stream()) - - def get_filtered_txs(self, block_hashes): - '''Returns transactions that match the bloom filter''' - from merkleblock import MerkleBlock - # create a getdata message - getdata = GetDataMessage() - # for each block request the filtered block - for block_hash in block_hashes: - # add_data (FILTERED_BLOCK_DATA_TYPE, block_hash) to request the block - getdata.add_data(FILTERED_BLOCK_DATA_TYPE, block_hash) - # send the getdata message - self.send(getdata) - # initialize the results array we'll send back - results = [] - # for each block hash - for block_hash in block_hashes: - # wait for the merkleblock command - mb = self.wait_for(MerkleBlock) - # check that the merkle block's hash is the same as the block hash - if mb.hash() != block_hash: - raise RuntimeError('Wrong block sent') - # check that the merkle block is valid - if not mb.is_valid(): - raise RuntimeError('Merkle Proof is invalid') - # loop through the proved transactions from the Merkle block - for tx_hash in mb.proved_txs(): - # wait for the tx command - tx_obj = self.wait_for(Tx) - # check that the hash matches - if tx_obj.hash() != tx_hash: - raise RuntimeError('Wrong tx sent {} vs {}'.format(tx_hash.hex(), tx_obj.id())) - # add to the results - results.append(tx_obj) - # return the results - return results - - def is_tx_accepted(self, tx_obj): - '''Returns whether a transaction has been accepted on the network''' - # sleep for a second to let everything propagate - sleep(1) - # create a GetDataMessage - get_data = GetDataMessage() - # ask for the tx - get_data.add_data(TX_DATA_TYPE, tx_obj.hash()) - # send the GetDataMessage - self.send(get_data) - # now wait for a response - got_tx = self.wait_for(Tx) - if got_tx.id() == tx_obj.id(): - return True - - -class SimpleNodeTest(TestCase): - - def test_handshake(self): - node = SimpleNode('tbtc.programmingblockchain.com', testnet=True) - node.handshake() - - def test_get_filtered_txs(self): - from bloomfilter import BloomFilter - bf = BloomFilter(30, 5, 90210) - h160 = decode_base58('mseRGXB89UTFVkWJhTRTzzZ9Ujj4ZPbGK5') - bf.add(h160) - node = SimpleNode('tbtc.programmingblockchain.com', testnet=True) - node.handshake() - node.send(bf.filterload()) - block_hash = bytes.fromhex('00000000000377db7fde98411876c53e318a395af7304de298fd47b7c549d125') - txs = node.get_filtered_txs([block_hash]) - self.assertEqual(txs[0].id(), '0c024b9d3aa2ae8faae96603b8d40c88df2fc6bf50b3f446295206f70f3cf6ad') - self.assertEqual(txs[1].id(), '0886537e27969a12478e0d33707bf6b9fe4fdaec8d5d471b5304453b04135e7e') - self.assertEqual(txs[2].id(), '23d4effc88b80fb7dbcc2e6a0b0af9821c6fe3bb4c8dc3b61bcab7c45f0f6888') diff --git a/session/complete/op.py b/session/complete/op.py index d31c882..0c8631d 100644 --- a/session/complete/op.py +++ b/session/complete/op.py @@ -1260,10 +1260,19 @@ OP_CODE_NAMES = { class TapScriptTest(TestCase): def test_opchecksigadd(self): from tx import Tx + hex_tx = "010000000001022373cf02ce7df6500ae46a4a0fbbb1b636d2debed8f2df91e2415627397a34090000000000fdffffff88c23d928893cd3509845516cf8411b7cab2738c054cc5ce7e4bde9586997c770000000000fdffffff0200000000000000002b6a29676d20746170726f6f7420f09fa5952068747470733a2f2f626974636f696e6465766b69742e6f72676e9e1100000000001976a91405070d0290da457409a37db2e294c1ffbc52738088ac04410adf90fd381d4a13c3e73740b337b230701189ed94abcb4030781635f035e6d3b50b8506470a68292a2bc74745b7a5732a28254b5f766f09e495929ec308090b01004620c13e6d193f5d04506723bd67abcc5d31b610395c445ac6744cb0a1846b3aabaeac20b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333cba519c21c0000000000000000000000000000000000000000000000000000000000000000104414636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301004620c13e6d193f5d04506723bd67abcc5d31b610395c445ac6744cb0a1846b3aabaeac20b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333cba519c21c0000000000000000000000000000000000000000000000000000000000000000100000000" stream = BytesIO(bytes.fromhex(hex_tx)) tx_obj = Tx.parse(stream) self.assertTrue(tx_obj.verify()) - stack = [bytes.fromhex("4636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301"),b"",bytes.fromhex("b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333c")] + stack = [ + bytes.fromhex( + "4636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301" + ), + b"", + bytes.fromhex( + "b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333c" + ), + ] op_checksigadd_schnorr(stack, tx_obj, 1) - self.assertEqual(stack, [b'\x01']) + self.assertEqual(stack, [b"\x01"]) diff --git a/session/complete/script.py b/session/complete/script.py index 87ecf05..4acf583 100644 --- a/session/complete/script.py +++ b/session/complete/script.py @@ -1,7 +1,6 @@ from io import BytesIO from bech32 import decode_bech32, encode_bech32_checksum -from ecc import S256Point from helper import ( decode_base58, encode_base58_checksum, @@ -37,7 +36,7 @@ class Script: def __repr__(self): result = "" for command in self.commands: - if type(command) == int: + if isinstance(command, int): if OP_CODE_NAMES.get(command): name = OP_CODE_NAMES.get(command) else: @@ -123,7 +122,7 @@ class Script: # go through each command for command in self.commands: # if the command is an integer, it's an op code - if type(command) == int: + if isinstance(command, int): # turn the command into a single byte integer using int_to_byte result += int_to_byte(command) else: @@ -166,7 +165,7 @@ class Script: op_lookup = OP_CODE_FUNCTIONS while len(commands) > 0: command = commands.pop(0) - if type(command) == int: + if isinstance(command, int): # do what the op code says operation = op_lookup[command] if command in (99, 100): @@ -198,7 +197,7 @@ class Script: if ( len(commands) == 3 and commands[0] == 0xA9 - and type(commands[1]) == bytes + and isinstance(commands[1], bytes) and len(commands[1]) == 20 and commands[2] == 0x87 ): @@ -264,7 +263,9 @@ class Script: external_pubkey = control_block.external_pubkey(tap_script) # the tweak point should be what's on the stack if external_pubkey.parity != control_block.parity: - print(f"bad external pubkey parity {external_pubkey.parity} vs {control_block.parity}") + print( + f"bad external pubkey parity {external_pubkey.parity} vs {control_block.parity}" + ) return False if external_pubkey.xonly() != stack.pop(): print("bad external pubkey") @@ -289,7 +290,7 @@ class Script: len(self.commands) == 5 and self.commands[0] == 0x76 and self.commands[1] == 0xA9 - and type(self.commands[2]) == bytes + and isinstance(self.commands[2], bytes) and len(self.commands[2]) == 20 and self.commands[3] == 0x88 and self.commands[4] == 0xAC @@ -303,7 +304,7 @@ class Script: return ( len(self.commands) == 3 and self.commands[0] == 0xA9 - and type(self.commands[1]) == bytes + and isinstance(self.commands[1], bytes) and len(self.commands[1]) == 20 and self.commands[2] == 0x87 ) @@ -314,7 +315,7 @@ class Script: return ( len(self.commands) == 2 and self.commands[0] == 0x00 - and type(self.commands[1]) == bytes + and isinstance(self.commands[1], bytes) and len(self.commands[1]) == 20 ) @@ -324,7 +325,7 @@ class Script: return ( len(self.commands) == 2 and self.commands[0] == 0x00 - and type(self.commands[1]) == bytes + and isinstance(self.commands[1], bytes) and len(self.commands[1]) == 32 ) @@ -334,7 +335,7 @@ class Script: return ( len(self.commands) == 2 and self.commands[0] == 0x51 - and type(self.commands[1]) == bytes + and isinstance(self.commands[1], bytes) and len(self.commands[1]) == 32 ) @@ -375,7 +376,7 @@ class ScriptPubKey(Script): class P2PKHScriptPubKey(ScriptPubKey): def __init__(self, h160): super().__init__() - if type(h160) != bytes: + if not isinstance(h160, bytes): raise TypeError("To initialize P2PKHScriptPubKey, a hash160 is needed") self.commands = [0x76, 0xA9, h160, 0x88, 0xAC] @@ -394,7 +395,7 @@ class P2PKHScriptPubKey(ScriptPubKey): class P2SHScriptPubKey(ScriptPubKey): def __init__(self, h160): super().__init__() - if type(h160) != bytes: + if not isinstance(h160, bytes): raise TypeError("To initialize P2SHScriptPubKey, a hash160 is needed") self.commands = [0xA9, h160, 0x87] @@ -515,7 +516,7 @@ class SegwitPubKey(ScriptPubKey): class P2WPKHScriptPubKey(SegwitPubKey): def __init__(self, h160): super().__init__() - if type(h160) != bytes: + if not isinstance(h160, bytes): raise TypeError("To initialize P2WPKHScriptPubKey, a hash160 is needed") self.commands = [0x00, h160] @@ -523,7 +524,7 @@ class P2WPKHScriptPubKey(SegwitPubKey): class P2WSHScriptPubKey(SegwitPubKey): def __init__(self, s256): super().__init__() - if type(s256) != bytes: + if not isinstance(s256, bytes): raise TypeError("To initialize P2WSHScriptPubKey, a sha256 is needed") self.commands = [0x00, s256] @@ -533,7 +534,7 @@ class P2TRScriptPubKey(ScriptPubKey): super().__init__() if hasattr(point, "xonly"): raw_point = point.xonly() - elif type(point) == bytes: + elif isinstance(point, bytes): raw_point = point else: raise TypeError( @@ -585,8 +586,8 @@ class WitnessScript(Script): def is_p2wsh_multisig(self): return ( OP_CODE_NAMES[self.commands[-1]] == "OP_CHECKMULTISIG" - and type(self.commands[0]) == int - and type(self.commands[-2]) == int + and isinstance(self.commands[0], int) + and isinstance(self.commands[-2], int) ) def get_quorum(self): diff --git a/session/complete/taproot.py b/session/complete/taproot.py index f1e7f22..6d69b6f 100644 --- a/session/complete/taproot.py +++ b/session/complete/taproot.py @@ -5,27 +5,21 @@ from ecc import S256Point from hash import ( hash_tapbranch, hash_tapleaf, - hash_taptweak, -) -from helper import ( - int_to_byte, -) -from op import ( - encode_minimal_num, - number_to_op_code, ) +from helper import int_to_byte +from op import encode_minimal_num from script import ScriptPubKey from timelock import Locktime, Sequence def locktime_commands(locktime): - assert type(locktime) == Locktime, f"{locktime} needs to be Locktime" + assert isinstance(locktime, Locktime), f"{locktime} needs to be Locktime" # 0xB1 is OP_CLTV, 0x75 is OP_DROP return [encode_minimal_num(locktime), 0xB1, 0x75] def sequence_commands(sequence): - assert type(sequence) == Sequence, f"{sequence} needs to be Sequence" + assert isinstance(sequence, Sequence), f"{sequence} needs to be Sequence" # 0xB2 is OP_CSV, 0x75 is OP_DROP return [encode_minimal_num(sequence), 0xB2, 0x75] @@ -40,7 +34,7 @@ class TapLeaf: def __eq__(self, other): return ( - type(self) == type(other) + type(self) is type(other) and self.tapleaf_version == other.tapleaf_version and self.tap_script == other.tap_script ) @@ -190,32 +184,71 @@ class ControlBlock: class TapRootTest(TestCase): def test_tapleaf_hash(self): - tap_script = TapScript.parse(BytesIO(bytes.fromhex("4420331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeecad20158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16fac"))) + tap_script = TapScript.parse( + BytesIO( + bytes.fromhex( + "4420331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeecad20158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16fac" + ) + ) + ) tap_leaf = TapLeaf(tap_script) - self.assertEqual(tap_leaf.hash().hex(), "d1b3ee8e8c175e5db7e2ff7a87435e8f751d148b77fb1f00e14ff8ffa1c09a40") + self.assertEqual( + tap_leaf.hash().hex(), + "d1b3ee8e8c175e5db7e2ff7a87435e8f751d148b77fb1f00e14ff8ffa1c09a40", + ) def test_tapbranch_hash(self): - pubkey_1 = S256Point.parse(bytes.fromhex("331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec")).xonly() - pubkey_2 = S256Point.parse(bytes.fromhex("158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f")).xonly() + pubkey_1 = S256Point.parse( + bytes.fromhex( + "331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec" + ) + ).xonly() + pubkey_2 = S256Point.parse( + bytes.fromhex( + "158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f" + ) + ).xonly() tap_script_1 = TapScript([pubkey_1, 0xAC]) tap_script_2 = TapScript([pubkey_2, 0xAC]) tap_leaf_1 = TapLeaf(tap_script_1) tap_leaf_2 = TapLeaf(tap_script_2) tap_branch = TapBranch(tap_leaf_1, tap_leaf_2) - self.assertEqual(tap_branch.hash().hex(), "eb792962b250f4a49a572ba7136674a28f2398a49c4c078fecfc839260da6151") + self.assertEqual( + tap_branch.hash().hex(), + "eb792962b250f4a49a572ba7136674a28f2398a49c4c078fecfc839260da6151", + ) def test_control_block(self): - raw_cb = bytes.fromhex("c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e") + raw_cb = bytes.fromhex( + "c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e" + ) cb = ControlBlock.parse(raw_cb) - pubkey = bytes.fromhex("027aa71d9cdb31cd8fe037a6f441e624fe478a2deece7affa840312b14e971a4") + pubkey = bytes.fromhex( + "027aa71d9cdb31cd8fe037a6f441e624fe478a2deece7affa840312b14e971a4" + ) tap_script = TapScript([pubkey, 0xAC]) external_pubkey = cb.external_pubkey(tap_script) - self.assertEqual(external_pubkey.xonly().hex(), "cbe433288ae1eede1f24818f08046d4e647fef808cfbbffc7d10f24a698eecfd") + self.assertEqual( + external_pubkey.xonly().hex(), + "cbe433288ae1eede1f24818f08046d4e647fef808cfbbffc7d10f24a698eecfd", + ) def test_control_block_2(self): - pubkey_1 = S256Point.parse(bytes.fromhex("331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec")) - pubkey_2 = S256Point.parse(bytes.fromhex("158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f")) - pubkey_3 = S256Point.parse(bytes.fromhex("582662e8e47df59489d6756615aa3db3fa3bbaa75a424b9c78036265858f5544")) + pubkey_1 = S256Point.parse( + bytes.fromhex( + "331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec" + ) + ) + pubkey_2 = S256Point.parse( + bytes.fromhex( + "158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f" + ) + ) + pubkey_3 = S256Point.parse( + bytes.fromhex( + "582662e8e47df59489d6756615aa3db3fa3bbaa75a424b9c78036265858f5544" + ) + ) tap_script_1 = TapScript([pubkey_1.xonly(), 0xAC]) tap_script_2 = TapScript([pubkey_2.xonly(), 0xAC]) tap_script_3 = TapScript([pubkey_3.xonly(), 0xAC]) @@ -225,7 +258,11 @@ class TapRootTest(TestCase): tap_branch_1 = TapBranch(tap_leaf_1, tap_leaf_2) tap_root = TapBranch(tap_branch_1, tap_leaf_3) hex_cb = "c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e" - internal_pubkey = S256Point.parse(bytes.fromhex("407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e5")) + internal_pubkey = S256Point.parse( + bytes.fromhex( + "407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e5" + ) + ) cb = tap_root.control_block(internal_pubkey, tap_leaf_2) self.assertEqual(cb.serialize().hex(), hex_cb) diff --git a/session/complete/timelock.py b/session/complete/timelock.py index 71b6518..4190338 100644 --- a/session/complete/timelock.py +++ b/session/complete/timelock.py @@ -42,7 +42,7 @@ class Locktime(int): ) def __lt__(self, other): - if type(other) == int: + if isinstance(other, int): return super().__lt__(other) if self.is_comparable(other): return super().__lt__(other) @@ -108,7 +108,7 @@ class Sequence(int): ) def __lt__(self, other): - if type(other) == int: + if isinstance(other, int): return super().__lt__(other) if self.is_comparable(other): return self & SEQUENCE_MASK < other & SEQUENCE_MASK diff --git a/session/complete/tx.py b/session/complete/tx.py index 84b2a90..367c283 100644 --- a/session/complete/tx.py +++ b/session/complete/tx.py @@ -4,7 +4,6 @@ from urllib.request import Request, urlopen import json from bech32 import decode_bech32 -from ecc import SchnorrSignature from hash import hash_tapsighash from helper import ( big_endian_to_int, diff --git a/session/ecc.py b/session/ecc.py index 13a6349..92fc741 100644 --- a/session/ecc.py +++ b/session/ecc.py @@ -40,7 +40,7 @@ class FieldElement: # this should be the inverse of the == operator return not (self == other) - def __repr__(self): + def __reprv__(self): return f"FieldElement_{self.prime}({self.num})" def __add__(self, other): @@ -220,7 +220,7 @@ class S256Field(FieldElement): class S256Point(Point): def __init__(self, x, y, a=None, b=None): a, b = S256Field(A), S256Field(B) - if type(x) == int: + if isinstance(x, int): super().__init__(x=S256Field(x), y=S256Field(y), a=a, b=b) else: super().__init__(x=x, y=y, a=a, b=b) @@ -247,7 +247,7 @@ class S256Point(Point): def __add__(self, other): """If other is an int, multiplies scalar by generator, adds result to current point""" - if type(other) == int: + if isinstance(other, int): return super().__add__(other * G) else: return super().__add__(other) @@ -276,13 +276,13 @@ class S256Point(Point): # otherwise, convert the x coordinate to Big Endian 32 bytes return int_to_big_endian(self.x.num, 32) - def tweak(self, merkle_root=b''): + def tweak(self, merkle_root=b""): """returns the tweak for use in p2tr if there's no script path""" # take the hash_taptweak of the xonly tweak = hash_taptweak(self.xonly() + merkle_root) return tweak - def tweaked_key(self, merkle_root=b''): + def tweaked_key(self, merkle_root=b""): """Creates the tweaked external key for a particular tweak.""" # Get the tweak from the tweak method tweak = self.tweak(merkle_root) @@ -319,10 +319,11 @@ class S256Point(Point): """Returns the RedeemScript for a p2sh-p2wpkh redemption""" return self.p2wpkh_script().redeem_script() - def p2tr_script(self, merkle_root=b''): + def p2tr_script(self, merkle_root=b""): """Returns the p2tr ScriptPubKey object""" # avoid circular dependency from script import P2TRScriptPubKey + # get the external pubkey external_pubkey = self.tweaked_key(merkle_root) # return the P2TRScriptPubKey object @@ -340,7 +341,7 @@ class S256Point(Point): """Returns the p2sh-p2wpkh base58 address string""" return self.p2wpkh_script().p2sh_address(network) - def p2tr_address(self, merkle_root=b'', network="mainnet"): + def p2tr_address(self, merkle_root=b"", network="mainnet"): """Returns the p2tr bech32m address string""" return self.p2tr_script(merkle_root).address(network) @@ -463,34 +464,52 @@ class TapRootTest(TestCase): hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f" bytes_x = bytes.fromhex(hex_x) point = S256Point.parse(bytes_x) - self.assertEqual(big_endian_to_int(point.tweak()), 67856885919469038205338506436839711332207972226461300386890540598589929564995) + self.assertEqual( + big_endian_to_int(point.tweak()), + 67856885919469038205338506436839711332207972226461300386890540598589929564995, + ) def test_tweaked_key(self): hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f" bytes_x = bytes.fromhex(hex_x) point = S256Point.parse(bytes_x) - self.assertEqual(point.tweaked_key().xonly().hex(), "5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf") + self.assertEqual( + point.tweaked_key().xonly().hex(), + "5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf", + ) def test_p2tr_script(self): hex_x = "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f" bytes_x = bytes.fromhex(hex_x) point = S256Point.parse(bytes_x) - self.assertEqual(point.p2tr_script().__repr__(), "OP_1 5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf") + self.assertEqual( + point.p2tr_script().__repr__(), + "OP_1 5b9cfb912266844a6265820f268052b6c500a94ae498c8b50acc8f1c43db9daf", + ) class SchnorrTest(TestCase): def test_verify(self): msg = sha256(b"I attest to understanding Schnorr Signatures") - sig_raw = bytes.fromhex("f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a") + sig_raw = bytes.fromhex( + "f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a" + ) sig = SchnorrSignature.parse(sig_raw) - point = S256Point.parse(bytes.fromhex("f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f")) + point = S256Point.parse( + bytes.fromhex( + "f01d6b9018ab421dd410404cb869072065522bf85734008f105cf385a023a80f" + ) + ) self.assertTrue(point.verify_schnorr(msg, sig)) def test_sign(self): msg = sha256(b"I attest to understanding Schnorr Signatures") priv = PrivateKey(12345) sig = priv.sign_schnorr(msg) - self.assertEqual(sig.serialize().hex(), "f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a") + self.assertEqual( + sig.serialize().hex(), + "f3626c99fe36167e5fef6b95e5ed6e5687caa4dc828986a7de8f9423c0f77f9bc73091ed86085ce43de0e255b3d0afafc7eee41ddc9970c3dc8472acfcdfd39a", + ) class Signature: @@ -691,7 +710,7 @@ class PrivateKey: # encode_base58_checksum the whole thing return encode_base58_checksum(prefix + secret_bytes + suffix) - def tweaked_key(self, merkle_root=b''): + def tweaked_key(self, merkle_root=b""): # get the tweak from the point's tweak method tweak = self.point.tweak(merkle_root) # t is the tweak interpreted as big endian @@ -730,7 +749,6 @@ class PrivateKey: class PrivateKeyTest(TestCase): - def test_tweaked_key(self): secret = randint(1, N) priv = PrivateKey(secret) diff --git a/session/helper.py b/session/helper.py index b40af40..ba819a0 100644 --- a/session/helper.py +++ b/session/helper.py @@ -1,8 +1,6 @@ import hashlib -import hmac import re -from base64 import b64decode, b64encode from unittest import TestSuite, TextTestRunner try: diff --git a/session/op.py b/session/op.py index d31c882..0c8631d 100644 --- a/session/op.py +++ b/session/op.py @@ -1260,10 +1260,19 @@ OP_CODE_NAMES = { class TapScriptTest(TestCase): def test_opchecksigadd(self): from tx import Tx + hex_tx = "010000000001022373cf02ce7df6500ae46a4a0fbbb1b636d2debed8f2df91e2415627397a34090000000000fdffffff88c23d928893cd3509845516cf8411b7cab2738c054cc5ce7e4bde9586997c770000000000fdffffff0200000000000000002b6a29676d20746170726f6f7420f09fa5952068747470733a2f2f626974636f696e6465766b69742e6f72676e9e1100000000001976a91405070d0290da457409a37db2e294c1ffbc52738088ac04410adf90fd381d4a13c3e73740b337b230701189ed94abcb4030781635f035e6d3b50b8506470a68292a2bc74745b7a5732a28254b5f766f09e495929ec308090b01004620c13e6d193f5d04506723bd67abcc5d31b610395c445ac6744cb0a1846b3aabaeac20b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333cba519c21c0000000000000000000000000000000000000000000000000000000000000000104414636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301004620c13e6d193f5d04506723bd67abcc5d31b610395c445ac6744cb0a1846b3aabaeac20b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333cba519c21c0000000000000000000000000000000000000000000000000000000000000000100000000" stream = BytesIO(bytes.fromhex(hex_tx)) tx_obj = Tx.parse(stream) self.assertTrue(tx_obj.verify()) - stack = [bytes.fromhex("4636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301"),b"",bytes.fromhex("b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333c")] + stack = [ + bytes.fromhex( + "4636070d21adc8280735383102f7a0f5978cea257777a23934dd3b458b79bf388aca218e39e23533a059da173e402c4fc5e3375e1f839efb22e9a5c2a815b07301" + ), + b"", + bytes.fromhex( + "b0e2e48ad7c3d776cf6f2395c504dc19551268ea7429496726c5d5bf72f9333c" + ), + ] op_checksigadd_schnorr(stack, tx_obj, 1) - self.assertEqual(stack, [b'\x01']) + self.assertEqual(stack, [b"\x01"]) diff --git a/session/script.py b/session/script.py index 87ecf05..4acf583 100644 --- a/session/script.py +++ b/session/script.py @@ -1,7 +1,6 @@ from io import BytesIO from bech32 import decode_bech32, encode_bech32_checksum -from ecc import S256Point from helper import ( decode_base58, encode_base58_checksum, @@ -37,7 +36,7 @@ class Script: def __repr__(self): result = "" for command in self.commands: - if type(command) == int: + if isinstance(command, int): if OP_CODE_NAMES.get(command): name = OP_CODE_NAMES.get(command) else: @@ -123,7 +122,7 @@ class Script: # go through each command for command in self.commands: # if the command is an integer, it's an op code - if type(command) == int: + if isinstance(command, int): # turn the command into a single byte integer using int_to_byte result += int_to_byte(command) else: @@ -166,7 +165,7 @@ class Script: op_lookup = OP_CODE_FUNCTIONS while len(commands) > 0: command = commands.pop(0) - if type(command) == int: + if isinstance(command, int): # do what the op code says operation = op_lookup[command] if command in (99, 100): @@ -198,7 +197,7 @@ class Script: if ( len(commands) == 3 and commands[0] == 0xA9 - and type(commands[1]) == bytes + and isinstance(commands[1], bytes) and len(commands[1]) == 20 and commands[2] == 0x87 ): @@ -264,7 +263,9 @@ class Script: external_pubkey = control_block.external_pubkey(tap_script) # the tweak point should be what's on the stack if external_pubkey.parity != control_block.parity: - print(f"bad external pubkey parity {external_pubkey.parity} vs {control_block.parity}") + print( + f"bad external pubkey parity {external_pubkey.parity} vs {control_block.parity}" + ) return False if external_pubkey.xonly() != stack.pop(): print("bad external pubkey") @@ -289,7 +290,7 @@ class Script: len(self.commands) == 5 and self.commands[0] == 0x76 and self.commands[1] == 0xA9 - and type(self.commands[2]) == bytes + and isinstance(self.commands[2], bytes) and len(self.commands[2]) == 20 and self.commands[3] == 0x88 and self.commands[4] == 0xAC @@ -303,7 +304,7 @@ class Script: return ( len(self.commands) == 3 and self.commands[0] == 0xA9 - and type(self.commands[1]) == bytes + and isinstance(self.commands[1], bytes) and len(self.commands[1]) == 20 and self.commands[2] == 0x87 ) @@ -314,7 +315,7 @@ class Script: return ( len(self.commands) == 2 and self.commands[0] == 0x00 - and type(self.commands[1]) == bytes + and isinstance(self.commands[1], bytes) and len(self.commands[1]) == 20 ) @@ -324,7 +325,7 @@ class Script: return ( len(self.commands) == 2 and self.commands[0] == 0x00 - and type(self.commands[1]) == bytes + and isinstance(self.commands[1], bytes) and len(self.commands[1]) == 32 ) @@ -334,7 +335,7 @@ class Script: return ( len(self.commands) == 2 and self.commands[0] == 0x51 - and type(self.commands[1]) == bytes + and isinstance(self.commands[1], bytes) and len(self.commands[1]) == 32 ) @@ -375,7 +376,7 @@ class ScriptPubKey(Script): class P2PKHScriptPubKey(ScriptPubKey): def __init__(self, h160): super().__init__() - if type(h160) != bytes: + if not isinstance(h160, bytes): raise TypeError("To initialize P2PKHScriptPubKey, a hash160 is needed") self.commands = [0x76, 0xA9, h160, 0x88, 0xAC] @@ -394,7 +395,7 @@ class P2PKHScriptPubKey(ScriptPubKey): class P2SHScriptPubKey(ScriptPubKey): def __init__(self, h160): super().__init__() - if type(h160) != bytes: + if not isinstance(h160, bytes): raise TypeError("To initialize P2SHScriptPubKey, a hash160 is needed") self.commands = [0xA9, h160, 0x87] @@ -515,7 +516,7 @@ class SegwitPubKey(ScriptPubKey): class P2WPKHScriptPubKey(SegwitPubKey): def __init__(self, h160): super().__init__() - if type(h160) != bytes: + if not isinstance(h160, bytes): raise TypeError("To initialize P2WPKHScriptPubKey, a hash160 is needed") self.commands = [0x00, h160] @@ -523,7 +524,7 @@ class P2WPKHScriptPubKey(SegwitPubKey): class P2WSHScriptPubKey(SegwitPubKey): def __init__(self, s256): super().__init__() - if type(s256) != bytes: + if not isinstance(s256, bytes): raise TypeError("To initialize P2WSHScriptPubKey, a sha256 is needed") self.commands = [0x00, s256] @@ -533,7 +534,7 @@ class P2TRScriptPubKey(ScriptPubKey): super().__init__() if hasattr(point, "xonly"): raw_point = point.xonly() - elif type(point) == bytes: + elif isinstance(point, bytes): raw_point = point else: raise TypeError( @@ -585,8 +586,8 @@ class WitnessScript(Script): def is_p2wsh_multisig(self): return ( OP_CODE_NAMES[self.commands[-1]] == "OP_CHECKMULTISIG" - and type(self.commands[0]) == int - and type(self.commands[-2]) == int + and isinstance(self.commands[0], int) + and isinstance(self.commands[-2], int) ) def get_quorum(self): diff --git a/session/taproot.py b/session/taproot.py index f1e7f22..6d69b6f 100644 --- a/session/taproot.py +++ b/session/taproot.py @@ -5,27 +5,21 @@ from ecc import S256Point from hash import ( hash_tapbranch, hash_tapleaf, - hash_taptweak, -) -from helper import ( - int_to_byte, -) -from op import ( - encode_minimal_num, - number_to_op_code, ) +from helper import int_to_byte +from op import encode_minimal_num from script import ScriptPubKey from timelock import Locktime, Sequence def locktime_commands(locktime): - assert type(locktime) == Locktime, f"{locktime} needs to be Locktime" + assert isinstance(locktime, Locktime), f"{locktime} needs to be Locktime" # 0xB1 is OP_CLTV, 0x75 is OP_DROP return [encode_minimal_num(locktime), 0xB1, 0x75] def sequence_commands(sequence): - assert type(sequence) == Sequence, f"{sequence} needs to be Sequence" + assert isinstance(sequence, Sequence), f"{sequence} needs to be Sequence" # 0xB2 is OP_CSV, 0x75 is OP_DROP return [encode_minimal_num(sequence), 0xB2, 0x75] @@ -40,7 +34,7 @@ class TapLeaf: def __eq__(self, other): return ( - type(self) == type(other) + type(self) is type(other) and self.tapleaf_version == other.tapleaf_version and self.tap_script == other.tap_script ) @@ -190,32 +184,71 @@ class ControlBlock: class TapRootTest(TestCase): def test_tapleaf_hash(self): - tap_script = TapScript.parse(BytesIO(bytes.fromhex("4420331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeecad20158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16fac"))) + tap_script = TapScript.parse( + BytesIO( + bytes.fromhex( + "4420331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeecad20158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16fac" + ) + ) + ) tap_leaf = TapLeaf(tap_script) - self.assertEqual(tap_leaf.hash().hex(), "d1b3ee8e8c175e5db7e2ff7a87435e8f751d148b77fb1f00e14ff8ffa1c09a40") + self.assertEqual( + tap_leaf.hash().hex(), + "d1b3ee8e8c175e5db7e2ff7a87435e8f751d148b77fb1f00e14ff8ffa1c09a40", + ) def test_tapbranch_hash(self): - pubkey_1 = S256Point.parse(bytes.fromhex("331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec")).xonly() - pubkey_2 = S256Point.parse(bytes.fromhex("158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f")).xonly() + pubkey_1 = S256Point.parse( + bytes.fromhex( + "331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec" + ) + ).xonly() + pubkey_2 = S256Point.parse( + bytes.fromhex( + "158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f" + ) + ).xonly() tap_script_1 = TapScript([pubkey_1, 0xAC]) tap_script_2 = TapScript([pubkey_2, 0xAC]) tap_leaf_1 = TapLeaf(tap_script_1) tap_leaf_2 = TapLeaf(tap_script_2) tap_branch = TapBranch(tap_leaf_1, tap_leaf_2) - self.assertEqual(tap_branch.hash().hex(), "eb792962b250f4a49a572ba7136674a28f2398a49c4c078fecfc839260da6151") + self.assertEqual( + tap_branch.hash().hex(), + "eb792962b250f4a49a572ba7136674a28f2398a49c4c078fecfc839260da6151", + ) def test_control_block(self): - raw_cb = bytes.fromhex("c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e") + raw_cb = bytes.fromhex( + "c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e" + ) cb = ControlBlock.parse(raw_cb) - pubkey = bytes.fromhex("027aa71d9cdb31cd8fe037a6f441e624fe478a2deece7affa840312b14e971a4") + pubkey = bytes.fromhex( + "027aa71d9cdb31cd8fe037a6f441e624fe478a2deece7affa840312b14e971a4" + ) tap_script = TapScript([pubkey, 0xAC]) external_pubkey = cb.external_pubkey(tap_script) - self.assertEqual(external_pubkey.xonly().hex(), "cbe433288ae1eede1f24818f08046d4e647fef808cfbbffc7d10f24a698eecfd") + self.assertEqual( + external_pubkey.xonly().hex(), + "cbe433288ae1eede1f24818f08046d4e647fef808cfbbffc7d10f24a698eecfd", + ) def test_control_block_2(self): - pubkey_1 = S256Point.parse(bytes.fromhex("331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec")) - pubkey_2 = S256Point.parse(bytes.fromhex("158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f")) - pubkey_3 = S256Point.parse(bytes.fromhex("582662e8e47df59489d6756615aa3db3fa3bbaa75a424b9c78036265858f5544")) + pubkey_1 = S256Point.parse( + bytes.fromhex( + "331a8f6a14e1b41a6b523ddb505fbc0662a6446bd42408692497297d3474aeec" + ) + ) + pubkey_2 = S256Point.parse( + bytes.fromhex( + "158a49d62c384c539a453e41a70214cfb85184954ae5c8df4b47eb74d58ff16f" + ) + ) + pubkey_3 = S256Point.parse( + bytes.fromhex( + "582662e8e47df59489d6756615aa3db3fa3bbaa75a424b9c78036265858f5544" + ) + ) tap_script_1 = TapScript([pubkey_1.xonly(), 0xAC]) tap_script_2 = TapScript([pubkey_2.xonly(), 0xAC]) tap_script_3 = TapScript([pubkey_3.xonly(), 0xAC]) @@ -225,7 +258,11 @@ class TapRootTest(TestCase): tap_branch_1 = TapBranch(tap_leaf_1, tap_leaf_2) tap_root = TapBranch(tap_branch_1, tap_leaf_3) hex_cb = "c0407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e576f5c1cdfc8b07dc8edca5bef2b4991201c5a0e18b1dbbcfe00ef2295b8f6dff5dd270ec91aa5644d907059400edfd98e307a6f1c6fe3a2d1d4550674ff6bc6e" - internal_pubkey = S256Point.parse(bytes.fromhex("407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e5")) + internal_pubkey = S256Point.parse( + bytes.fromhex( + "407910a4cfa5fe195ad4844b6069489fcb429f27dff811c65e99f7d776e943e5" + ) + ) cb = tap_root.control_block(internal_pubkey, tap_leaf_2) self.assertEqual(cb.serialize().hex(), hex_cb) diff --git a/session/timelock.py b/session/timelock.py index 71b6518..4190338 100644 --- a/session/timelock.py +++ b/session/timelock.py @@ -42,7 +42,7 @@ class Locktime(int): ) def __lt__(self, other): - if type(other) == int: + if isinstance(other, int): return super().__lt__(other) if self.is_comparable(other): return super().__lt__(other) @@ -108,7 +108,7 @@ class Sequence(int): ) def __lt__(self, other): - if type(other) == int: + if isinstance(other, int): return super().__lt__(other) if self.is_comparable(other): return self & SEQUENCE_MASK < other & SEQUENCE_MASK diff --git a/session/tx.py b/session/tx.py index 84b2a90..367c283 100644 --- a/session/tx.py +++ b/session/tx.py @@ -4,7 +4,6 @@ from urllib.request import Request, urlopen import json from bech32 import decode_bech32 -from ecc import SchnorrSignature from hash import hash_tapsighash from helper import ( big_endian_to_int,