Source code for dposlib.ark.crypto

# -*- coding: utf-8 -*-
# © Toons

import base58
import hashlib

from dposlib import BytesIO, PY3
from dposlib.ark import secp256k1
from dposlib.blockchain import cfg
from dposlib.util.bin import hexlify, unhexlify, pack, pack_bytes
from dposlib.ark.secp256k1 import schnorr, ecdsa

if PY3:
    unicode = str


[docs]def getKeys(secret): """ Generate keyring containing secp256k1 keys-apir and wallet import format (WIF). Args: secret (:class:`str`, :class:`bytes` or :class:`int`): anything that could issue a private key on secp256k1 curve Returns: :class:`dict`: public, private and WIF keys """ if isinstance(secret, (str, bytes, unicode)): try: seed = unhexlify(secret) except Exception: seed = secp256k1.hash_sha256(secret) else: seed = secp256k1.bytes_from_int(secret) publicKey = secp256k1.PublicKey.from_seed(seed) return { "publicKey": hexlify(publicKey.encode()), "privateKey": hexlify(seed), "wif": getWIF(seed) }
[docs]def getMultiSignaturePublicKey(minimum, *publicKeys): """ Compute ARK multi signature public key according to `ARK AIP #18 <https://github.com/ArkEcosystem/AIPs/blob/master/AIPS/aip-18\ .md>`_. Args: minimum (:class:`int`): minimum signature required publicKeys (:class:`list of str`): public key list Returns: :class:`str`: the multisignature public key """ if 2 > minimum > len(publicKeys): raise ValueError("min signatures value error") secret = "%02x" % minimum P = secp256k1.PublicKey.from_secret( ("0" if len(secret) % 2 else "") + secret ) for publicKey in publicKeys: P = P + secp256k1.PublicKey.decode(unhexlify(publicKey)) return hexlify(P.encode())
[docs]def getAddressFromSecret(secret, marker=None): """ Compute ARK address from secret. Args: secret (:class:`str`): secret string marker (:class:`int`): network marker (optional) Returns: :class:`str`: the address """ return getAddress(getKeys(secret)["publicKey"], marker)
[docs]def getAddress(publicKey, marker=None): """ Compute ARK address from publicKey. Args: publicKey (:class:`str`): public key marker (:class:`int`): network marker (optional) Returns: :class:`str`: the address """ if marker and isinstance(marker, int): marker = hex(marker)[2:] else: marker = None ripemd160 = hashlib.new('ripemd160', unhexlify(publicKey)).digest()[:20] seed = unhexlify(cfg.marker if not marker else marker) + ripemd160 b58 = base58.b58encode_check(seed) return b58.decode('utf-8') if isinstance(b58, bytes) else b58
[docs]def getWIF(seed): """ Compute WIF address from seed. Args: seed (:class:`bytes`): a sha256 sequence bytes Returns: :class:`str`: WIF address """ if hasattr(cfg, "wif"): seed = unhexlify(cfg.wif) + seed[:32] + b"\x01" # \x01 -> compressed b58 = base58.b58encode_check(seed) return str(b58.decode('utf-8') if isinstance(b58, bytes) else b58)
[docs]def wifSignature(tx, wif): """ Generate transaction signature using private key. Args: tx (:class:`dict` or :class:`Transaction`): transaction description wif (:class:`str`): wif key Returns: :class:`str`: signature """ return wifSignatureFromBytes(getBytes(tx), wif)
[docs]def wifSignatureFromBytes(data, wif): """ Generate signature from data using WIF key. Args: data (:class:`bytes`): bytes sequence wif (:class:`str`): wif key Returns: :class:`str`: signature """ seed = base58.b58decode_check( str(wif) if not isinstance(wif, bytes) else wif )[1:33] return getSignatureFromBytes(data, hexlify(seed))
[docs]def getSignature(tx, privateKey, **options): """ Generate transaction signature using private key. Args: tx (:class:`dict` or :class:`Transaction`): transaction description privateKey (:class:`str`): private key as hex string Keyword args: exclude_sig (:class:`bool`): exclude signature during tx serialization [defalut: True] exclude_multi_sig(:class:`bool`): exclude signatures during tx serialization [defalut: True] exclude_second_sig(:class:`bool`): exclude second signatures during tx serialization [defalut: True] Returns: :class:`str`: signature """ return getSignatureFromBytes(getBytes(tx, **options), privateKey)
[docs]def getSignatureFromBytes(data, privateKey): """ Generate signature from data using private key. Args: data (:class:`bytes`): bytes sequence privateKey (:class:`str`): private key as hex string Returns: :class:`str`: signature as hex string """ secret0 = unhexlify(privateKey) msg = secp256k1.hash_sha256(data) if bytearray(data)[0] == 0xff: return hexlify(schnorr.bcrypto410_sign(msg, secret0)) else: return hexlify(ecdsa.rfc6979_sign(msg, secret0, canonical=True))
[docs]def verifySignature(value, publicKey, signature): """ Verify signature. Args: value (:class:`str`): value as hex string publicKey (:class:`str`): public key as hex string signature (:class:`str`): signature as hex string Returns: :class:`bool`: true if signature matches the public key """ return verifySignatureFromBytes(unhexlify(value), publicKey, signature)
[docs]def verifySignatureFromBytes(data, publicKey, signature): """ Verify signature. Args: data (:class:`bytes`): data publicKey (:class:`str`): public key as hex string signature (:class:`str`): signature as hex string Returns: :class:`bool`: true if signature matches the public key """ pubkey = unhexlify(publicKey) msg = secp256k1.hash_sha256(data) sig = unhexlify(signature) if len(signature) == 128: return schnorr.bcrypto410_verify(msg, pubkey, sig) else: return ecdsa.verify(msg, pubkey, sig)
[docs]def getId(tx): """ Generate transaction id. Args: tx (:class:`dict` or :class:`Transaction`): transaction object Returns: :class:`str`: id as hex string """ return getIdFromBytes(getBytes(tx, exclude_multi_sig=False))
[docs]def getIdFromBytes(data): """ Generate data id. Args: data (:class:`bytes`): data as bytes sequence Returns: :class:`str`: id as hex string """ return hexlify(secp256k1.hash_sha256(data))
[docs]def getBytes(tx, **options): """ Hash transaction. Args: tx (:class:`dict` or :class:`Transaction`): transaction object Keyword args: exclude_sig (:class:`bool`): exclude signature during tx serialization [defalut: True] exclude_multi_sig(:class:`bool`): exclude signatures during tx serialization [defalut: True] exclude_second_sig(:class:`bool`): exclude second signatures during tx serialization [defalut: True] Returns: :class:`bytes`: bytes sequence """ if tx.get("version", 0x01) >= 0x02: return serialize(tx, **options) buf = BytesIO() # write type and timestamp pack("<BI", buf, (tx["type"], int(tx["timestamp"]))) # write senderPublicKey as bytes in buffer if "senderPublicKey" in tx: pack_bytes(buf, unhexlify(tx["senderPublicKey"])) # if there is a requesterPublicKey if "requesterPublicKey" in tx: pack_bytes(buf, unhexlify(tx["requesterPublicKey"])) # if there is a recipientId or tx not a second secret nor a multi # singature registration if tx.get("recipientId", False) and tx["type"] not in [1, 4]: recipientId = tx["recipientId"] recipientId = base58.b58decode_check( str(recipientId) if not isinstance(recipientId, bytes) else recipientId ) else: recipientId = b"\x00" * 21 pack_bytes(buf, recipientId) # deal with vendorField values if "vendorFieldHex" in tx: vendorField = unhexlify(tx["vendorFieldHex"]) else: value = tx.get("vendorField", b"") if not isinstance(value, bytes): value = value.encode("utf-8") vendorField = value vendorField = vendorField[:64].ljust(64, b"\x00") pack_bytes(buf, vendorField) # write amount and fee value pack("<QQ", buf, (tx.get("amount", 0), tx["fee"])) # if there is asset data if tx.get("asset", False): asset, typ = tx["asset"], tx["type"] if typ == 1 and "signature" in asset: pack_bytes(buf, unhexlify(asset["signature"]["publicKey"])) elif typ == 2 and "delegate" in asset: pack_bytes(buf, asset["delegate"]["username"].encode("utf-8")) elif typ == 3 and "votes" in asset: pack_bytes(buf, "".join(asset["votes"]).encode("utf-8")) else: raise Exception("transaction type %s not implemented" % typ) # if there is a signature if "signature" in tx and not options.get("exclude_sig", False): pack_bytes(buf, unhexlify(tx["signature"])) # if there is a second signature if not options.get("exclude_second_sig", False): if tx.get("signSignature", False): pack_bytes(buf, unhexlify(tx["signSignature"])) elif tx.get("secondSignature", False): pack_bytes(buf, unhexlify(tx["secondSignature"])) result = buf.getvalue() buf.close() return result
# Reference: # - https://github.com/ArkEcosystem/AIPs/blob/master/AIPS/aip-11.md # - https://github.com/ArkEcosystem/AIPs/blob/master/AIPS/aip-102.md
[docs]def serialize(tx, version=None, **options): """ Serialize transaction. Args: tx (:class:`dict` or :class:`Transaction`): transaction object Returns: :class:`bytes`: bytes sequence """ buf = BytesIO() version = tx.get("version", 0x01) if not version else version # deal with vendorField value if "vendorFieldHex" in tx: vendorField = unhexlify(dict.pop(tx, "vendorFieldHex")) dict.__setitem__(tx, "vendorField", vendorField.decode("utf-8")) else: vendorField = tx.get("vendorField", "") if not isinstance(vendorField, bytes): vendorField = vendorField.encode("utf-8") # "vendorFieldLength" = 255 since height 8,128,000 vendorField = vendorField[:255] # common part pack("<BBB", buf, (0xff, version, cfg.pubkeyHash)) if version >= 0x02: pack("<IHQ", buf, (tx.get("typeGroup", 1), tx["type"], tx["nonce"],)) else: pack("<BI", buf, (tx["type"], tx["timestamp"],)) pack_bytes(buf, unhexlify(tx["senderPublicKey"])) pack("<QB", buf, (tx["fee"], len(vendorField))) pack_bytes(buf, vendorField) # custom part pack_bytes(buf, serializePayload(tx)) # signatures part if "signature" in tx and not options.get("exclude_sig", False): pack_bytes(buf, unhexlify(tx["signature"])) if not options.get("exclude_second_sig", False): if "signSignature" in tx: pack_bytes(buf, unhexlify(tx["signSignature"])) elif "secondSignature" in tx: pack_bytes(buf, unhexlify(tx["secondSignature"])) if "signatures" in tx and not options.get("exclude_multi_sig", False): if version == 0x01: pack("<B", buf, (0xff,)) pack_bytes(buf, b"".join([unhexlify(sig) for sig in tx["signatures"]])) # id part if "id" in tx: pack_bytes(buf, unhexlify(tx["id"])) result = buf.getvalue() buf.close() return result
def serializePayload(tx): asset = tx.get("asset", {}) buf = BytesIO() _type = tx["type"] # transfer transaction if _type == 0: try: recipientId = str(tx["recipientId"]) if not isinstance( tx["recipientId"], bytes ) else \ tx["recipientId"] recipientId = base58.b58decode_check(recipientId) except Exception: raise Exception("no recipientId defined") pack("<QI", buf, ( int(tx.get("amount", 0)), int(tx.get("expiration", 0)), )) pack_bytes(buf, recipientId) # secondSignature registration elif _type == 1: if "signature" in asset: secondPublicKey = asset["signature"]["publicKey"] else: raise Exception("no secondSecret or secondPublicKey given") pack_bytes(buf, unhexlify(secondPublicKey)) # delegate registration elif _type == 2: username = asset.get("delegate", {}).get("username", False) if username: length = len(username) if 3 <= length <= 255: pack("<B", buf, (length, )) pack_bytes(buf, username.encode("utf-8")) else: raise Exception("bad username length [3-255]: %s" % username) else: raise Exception("no username defined") # vote elif _type == 3: delegatePublicKeys = asset.get("votes", False) if delegatePublicKeys: pack("<B", buf, (len(delegatePublicKeys), )) for delegatePublicKey in delegatePublicKeys: delegatePublicKey = delegatePublicKey.replace("+", "01")\ .replace("-", "00") pack_bytes(buf, unhexlify(delegatePublicKey)) else: raise Exception("no up/down vote given") # Multisignature registration elif _type == 4: multiSignature = asset.get("multiSignature", False) if multiSignature: pack( "<BB", buf, (multiSignature["min"], len(multiSignature["publicKeys"])) ) pack_bytes( buf, b"".join( [unhexlify(sig) for sig in multiSignature["publicKeys"]] ) ) # IPFS elif _type == 5: try: ipfs = str(asset["ipfs"]) if not isinstance( asset["ipfs"], bytes ) else asset["ipfs"] data = base58.b58decode(ipfs) except Exception as e: raise Exception("bad ipfs autentification\n%r" % e) pack_bytes(buf, data) # multipayment elif _type == 6: try: items = [ (p["amount"], base58.b58decode_check( str(p["recipientId"]) if not isinstance( p["recipientId"], bytes ) else p["recipientId"] )) for p in asset.get("payments", {}) ] except Exception: raise Exception("error in recipientId address list") result = pack("<H", buf, (len(items), )) for amount, address in items: pack("<Q", buf, (amount, )) pack_bytes(buf, address) # delegate resignation elif _type == 7: pass # HTLC lock elif _type == 8: try: recipientId = str(tx["recipientId"]) if not isinstance( tx["recipientId"], bytes ) else tx["recipientId"] recipientId = base58.b58decode_check(recipientId) except Exception: raise Exception("no recipientId defined") lock = asset.get("lock", False) expiration = lock.get("expiration", False) if not lock or not expiration: raise Exception("no lock nor expiration data found") pack("<Q", buf, (int(tx.get("amount", 0)),)) pack_bytes(buf, unhexlify(lock["secretHash"])) pack("<BI", buf, [int(expiration["type"]), int(expiration["value"])]) pack_bytes(buf, recipientId) # HTLC claim elif _type == 9: claim = asset.get("claim", False) if not claim: raise Exception("no claim data found") pack_bytes(buf, unhexlify(claim["lockTransactionId"])) pack_bytes(buf, claim["unlockSecret"].encode("utf-8")) # HTLC refund elif _type == 10: refund = asset.get("refund", False) if not refund: raise Exception("no refund data found") pack_bytes(buf, unhexlify(refund["lockTransactionId"])) else: raise Exception("Unknown transaction type %d" % tx["type"]) result = buf.getvalue() buf.close() return result
[docs]def checkTransaction(tx, secondPublicKey=None, multiPublicKeys=[]): """ Verify transaction validity. Args: tx (:class:`dict` or :class:`Transaction`): transaction object secondPublicKey (:class:`str`): second public key to use if needed multiPublicKeys (:class:`list`): owners public keys (sorted according to associated type-4-tx asset) Returns: :class:`bool`: true if transaction is valid """ checks = [] version = tx.get("version", 0x01) publicKey = tx["senderPublicKey"] if tx["type"] == 4: multiPublicKeys = tx["asset"]["multiSignature"]["publicKeys"] # pure python dict serializer def _ser(t, v, **opt): return \ serialize(t, version=v, **opt) if v >= 0x02 else \ getBytes(t, **opt) # create a local copy of tx tx = dict(**tx) # id check # remove id from tx if any and then compare id_ = tx.pop("id", False) if id_: checks.append(getIdFromBytes(_ser(tx, version)) == id_) signature = tx.pop("signature", False) signSignature = tx.pop("signSignature", tx.pop("secondSignature", False)) signatures = tx.pop("signatures", []) # multiple signature check if len(multiPublicKeys) and len(signatures): serialized = _ser(tx, version) for sig in signatures: idx, sig = int(sig[0:2], 16), sig[2:] checks.append(verifySignatureFromBytes( serialized, multiPublicKeys[idx], sig )) tx["signatures"] = signatures if signature: # sender signature check checks.append(verifySignatureFromBytes( _ser(tx, version), publicKey, signature )) # sender second signature check if signSignature and secondPublicKey: # add signature before check tx["signature"] = signature checks.append(verifySignatureFromBytes( _ser(tx, version), secondPublicKey, signSignature )) return False not in checks