https://github.com/mimblewimble/grin
https://github.com/mimblewimble/grin-wallet
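pip3 install flask
pip3 install ecdsa
pip3 install cryptography   (required: the proxy script imports AESGCM from it)
Copy ecdh.py into the corresponding directory (next to the proxy script, so that `from ecdh import ECDH` resolves).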
https://github.com/mimblewimble/grin
https://github.com/mimblewimble/grin-wallet
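pip3 install flask
pip3 install ecdsa
pip3 install cryptography   (required: the proxy script imports AESGCM from it)
Copy ecdh.py into the corresponding directory (next to the proxy script, so that `from ecdh import ECDH` resolves).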
"""
Class for performing Elliptic-curve Diffie-Hellman (ECDH) operations.
"""
from ecdsa.util import number_to_string | |
from ecdsa.ellipticcurve import INFINITY | |
from ecdsa import SigningKey, VerifyingKey | |
# Public names exported by `from ecdh import *`.
__all__ = [
    "ECDH",
    "NoKeyError",
    "NoCurveError",
    "InvalidCurveError",
    "InvalidSharedSecretError",
]
class NoKeyError(Exception):
    """ECDH. Key not found but it is needed for operation."""
class NoCurveError(Exception):
    """ECDH. Curve not set but it is needed for operation."""
class InvalidCurveError(Exception):
    """ECDH. Raised in case the public and private keys use different curves."""
class InvalidSharedSecretError(Exception):
    """ECDH. Raised in case the shared secret we obtained is an INFINITY."""
class ECDH(object):
    """
    Elliptic-curve Diffie-Hellman (ECDH). A key agreement protocol.

    Allows two parties, each having an elliptic-curve public-private key
    pair, to establish a shared secret over an insecure channel.
    """

    def __init__(self, curve=None, private_key=None, public_key=None):
        """
        ECDH init.

        Call can be initialised without parameters, then the first operation
        (loading either key) will set the used curve.
        All parameters must be ultimately set before shared secret
        calculation will be allowed.

        :param curve: curve for operations
        :type curve: Curve
        :param private_key: `my` private key for ECDH
        :type private_key: SigningKey
        :param public_key: `their` public key for ECDH
        :type public_key: VerifyingKey
        :raises InvalidCurveError: a supplied key uses a different curve
            than the one already set
        """
        self.curve = curve
        self.private_key = None
        self.public_key = None
        # Go through the loaders so the curve consistency checks apply.
        if private_key:
            self.load_private_key(private_key)
        if public_key:
            self.load_received_public_key(public_key)

    def _get_shared_secret(self, remote_public_key):
        """
        Multiply the remote public point by the local private scalar.

        :param remote_public_key: `their` public key
        :type remote_public_key: VerifyingKey
        :raises NoKeyError: private key or public key is not set
        :raises InvalidCurveError: the keys and self.curve do not all match
        :raises InvalidSharedSecretError: the resulting point is INFINITY
        :return: x coordinate of the resulting point
        :rtype: int
        """
        if not self.private_key:
            raise NoKeyError(
                "Private key needs to be set to create shared secret"
            )
        if not self.public_key:
            raise NoKeyError(
                "Public key needs to be set to create shared secret"
            )
        if not (
            self.private_key.curve == self.curve == remote_public_key.curve
        ):
            raise InvalidCurveError(
                "Curves for public key and private key is not equal."
            )

        # shared secret = PUBKEYtheirs * PRIVATEKEYours
        result = (
            remote_public_key.pubkey.point
            * self.private_key.privkey.secret_multiplier
        )
        if result == INFINITY:
            raise InvalidSharedSecretError("Invalid shared secret (INFINITY).")

        return result.x()

    def set_curve(self, key_curve):
        """
        Set the working curve for ecdh operations.

        :param key_curve: curve from `curves` module
        :type key_curve: Curve
        """
        self.curve = key_curve

    def generate_private_key(self):
        """
        Generate local private key for ecdh operation with curve that was set.

        :raises NoCurveError: Curve must be set before key generation.
        :return: public (verifying) key from this private key.
        :rtype: VerifyingKey object
        """
        if not self.curve:
            raise NoCurveError("Curve must be set prior to key generation.")
        return self.load_private_key(SigningKey.generate(curve=self.curve))

    def load_private_key(self, private_key):
        """
        Load private key from SigningKey (keys.py) object.

        Needs to have the same curve as was set with set_curve method.
        If curve is not set - it sets from this SigningKey

        :param private_key: Initialised SigningKey class
        :type private_key: SigningKey
        :raises InvalidCurveError: private_key curve not the same as self.curve
        :return: public (verifying) key from this private key.
        :rtype: VerifyingKey object
        """
        if not self.curve:
            self.curve = private_key.curve
        if self.curve != private_key.curve:
            raise InvalidCurveError("Curve mismatch.")
        self.private_key = private_key
        return self.private_key.get_verifying_key()

    def load_private_key_bytes(self, private_key):
        """
        Load private key from byte string.

        Uses current curve and checks if the provided key matches
        the curve of ECDH key agreement.
        Key loads via from_string method of SigningKey class

        :param private_key: private key in bytes string format
        :type private_key: :term:`bytes-like object`
        :raises NoCurveError: Curve must be set before loading.
        :return: public (verifying) key from this private key.
        :rtype: VerifyingKey object
        """
        if not self.curve:
            raise NoCurveError("Curve must be set prior to key load.")
        return self.load_private_key(
            SigningKey.from_string(private_key, curve=self.curve)
        )

    def load_private_key_der(self, private_key_der):
        """
        Load private key from DER byte string.

        Compares the curve of the DER-encoded key with the ECDH set curve,
        uses the former if unset.

        Note, the only DER format supported is the RFC5915
        Look at keys.py:SigningKey.from_der()

        :param private_key_der: string with the DER encoding of private ECDSA key
        :type private_key_der: string
        :raises InvalidCurveError: private_key curve not the same as self.curve
        :return: public (verifying) key from this private key.
        :rtype: VerifyingKey object
        """
        return self.load_private_key(SigningKey.from_der(private_key_der))

    def load_private_key_pem(self, private_key_pem):
        """
        Load private key from PEM string.

        Compares the curve of the PEM-encoded key with the ECDH set curve,
        uses the former if unset.

        Note, the only PEM format supported is the RFC5915
        Look at keys.py:SigningKey.from_pem()
        it needs to have `EC PRIVATE KEY` section

        :param private_key_pem: string with PEM-encoded private ECDSA key
        :type private_key_pem: string
        :raises InvalidCurveError: private_key curve not the same as self.curve
        :return: public (verifying) key from this private key.
        :rtype: VerifyingKey object
        """
        return self.load_private_key(SigningKey.from_pem(private_key_pem))

    def get_public_key(self):
        """
        Provides a public key that matches the local private key.

        Needs to be sent to the remote party.

        :raises NoKeyError: private key has not been generated or loaded
        :return: public (verifying) key from local private key.
        :rtype: VerifyingKey object
        """
        # Fail with the module's own error type instead of an AttributeError
        # on None when no private key has been set yet.
        if not self.private_key:
            raise NoKeyError(
                "Private key needs to be set to get the public key"
            )
        return self.private_key.get_verifying_key()

    def load_received_public_key(self, public_key):
        """
        Load public key from VerifyingKey (keys.py) object.

        Needs to have the same curve as set as current for ecdh operation.
        If curve is not set - it sets it from VerifyingKey.

        :param public_key: Initialised VerifyingKey class
        :type public_key: VerifyingKey
        :raises InvalidCurveError: public_key curve not the same as self.curve
        """
        if not self.curve:
            self.curve = public_key.curve
        if self.curve != public_key.curve:
            raise InvalidCurveError("Curve mismatch.")
        self.public_key = public_key

    def load_received_public_key_bytes(self, public_key_str):
        """
        Load public key from byte string.

        Uses current curve and checks if key length corresponds to
        the current curve.
        Key loads via from_string method of VerifyingKey class

        :param public_key_str: public key in bytes string format
        :type public_key_str: :term:`bytes-like object`
        :raises NoCurveError: Curve must be set before loading.
        """
        # Raw bytes carry no curve information, so the curve has to be known
        # up front (same contract as load_private_key_bytes).
        if not self.curve:
            raise NoCurveError("Curve must be set prior to key load.")
        return self.load_received_public_key(
            VerifyingKey.from_string(public_key_str, self.curve)
        )

    def load_received_public_key_der(self, public_key_der):
        """
        Load public key from DER byte string.

        Compares the curve of the DER-encoded key with the ECDH set curve,
        uses the former if unset.

        Note, the only DER format supported is the RFC5912
        Look at keys.py:VerifyingKey.from_der()

        :param public_key_der: string with the DER encoding of public ECDSA key
        :type public_key_der: string
        :raises InvalidCurveError: public_key curve not the same as self.curve
        """
        return self.load_received_public_key(
            VerifyingKey.from_der(public_key_der)
        )

    def load_received_public_key_pem(self, public_key_pem):
        """
        Load public key from PEM string.

        Compares the curve of the PEM-encoded key with the ECDH set curve,
        uses the former if unset.

        Note, the only PEM format supported is the RFC5912
        Look at keys.py:VerifyingKey.from_pem()

        :param public_key_pem: string with PEM-encoded public ECDSA key
        :type public_key_pem: string
        :raises InvalidCurveError: public_key curve not the same as self.curve
        """
        return self.load_received_public_key(
            VerifyingKey.from_pem(public_key_pem)
        )

    def generate_sharedsecret_bytes(self):
        """
        Generate shared secret from local private key and remote public key.

        The objects needs to have both private key and received public key
        before generation is allowed.

        :raises InvalidCurveError: public_key curve not the same as self.curve
        :raises NoKeyError: public_key or private_key is not set
        :return: shared secret
        :rtype: byte string
        """
        # The secret is the x coordinate of a curve point, i.e. an element of
        # the underlying field, so its byte length is set by the field prime
        # p and not by the group order (the two differ on some curves).
        return number_to_string(
            self.generate_sharedsecret(), self.private_key.curve.curve.p()
        )

    def generate_sharedsecret(self):
        """
        Generate shared secret from local private key and remote public key.

        The objects needs to have both private key and received public key
        before generation is allowed.

        It's the same for local and remote party.
        shared secret(local private key, remote public key ) ==
        shared secret (local public key, remote private key)

        :raises InvalidCurveError: public_key curve not the same as self.curve
        :raises NoKeyError: public_key or private_key is not set
        :return: shared secret
        :rtype: int
        """
        return self._get_shared_secret(self.public_key)
#!/usr/bin/env python3
# README
# This script needs Flask, ecdsa and cryptography installed:
#   pip3 install flask ; pip3 install ecdsa ; pip3 install cryptography
# Copy ecdh.py into the corresponding directory (next to this script).
import os | |
from flask import Flask, request, render_template | |
from ecdsa import SECP256k1 | |
from ecdh import ECDH | |
import json | |
import base64 | |
from cryptography.hazmat.primitives.ciphers.aead import AESGCM | |
app = Flask(__name__)

# Connection settings. The defaults are the values that used to be hard-coded
# here; each one can be overridden through the environment so that secrets do
# not have to live in the source file.
grin = os.environ.get("GRIN_PROXY_API_SECRET", "0VPkJVLRGRNrFweu6rU1")  # basic-auth password for user "grin"
walletServer = os.environ.get("GRIN_PROXY_WALLET_SERVER", 'http://127.0.0.1:3420')
foreignServer = os.environ.get("GRIN_PROXY_FOREIGN_SERVER", 'http://127.0.0.1:3415')
password = os.environ.get("GRIN_PROXY_WALLET_PASSWORD", '123456')  # sent in the open_wallet request

# Session state for the v3 API; filled in on the first /v3/postOwner request.
sharedKey = None  # hex string returned by initSecure()
token = None  # value of result.Ok from the open_wallet call
@app.route('/v2/postForeign', methods=['POST'])
def home1():
    """
    Forward a POST body to the foreign API server and return its reply.

    The target path is taken from the ``url`` request header and appended to
    ``foreignServer``; the request body is posted unchanged with curl using
    basic auth (user ``grin``).

    :return: the text curl wrote to stdout, or a 400 response when the
        ``url`` header or the body is missing/invalid
    """
    import subprocess  # local import keeps this handler self-contained

    url = request.headers.get('url')
    # Only accept a path: anything else (e.g. "@host/...") could redirect the
    # request, together with the basic-auth credentials, to another host.
    if not url or not url.startswith('/'):
        return "missing or invalid 'url' header", 400
    if not request.data:
        return "empty request body", 400

    # Argument list instead of a shell string: the client-supplied url and
    # body can no longer be interpreted by a shell. The body goes through
    # stdin ("@-") so its content is never parsed as a curl option or file.
    completed = subprocess.run(
        [
            "curl",
            "-H", "Content-Type:application/json",
            "-X", "POST",
            foreignServer + url,
            "--data-binary", "@-",
            "--basic",
            "-u", "grin:" + grin,
        ],
        input=request.data,
        stdout=subprocess.PIPE,
    )
    return completed.stdout.decode("utf-8", errors="replace")
@app.route('/v2/postOwner', methods=['POST'])
def v2_post():
    """
    Forward a POST body to the wallet (owner) API server and return its reply.

    The target path is taken from the ``url`` request header and appended to
    ``walletServer``; the request body is posted unchanged with curl using
    basic auth (user ``grin``).

    :return: the text curl wrote to stdout, or a 400 response when the
        ``url`` header or the body is missing/invalid
    """
    import subprocess  # local import keeps this handler self-contained

    url = request.headers.get('url')
    # Only accept a path: anything else (e.g. "@host/...") could redirect the
    # request, together with the basic-auth credentials, to another host.
    if not url or not url.startswith('/'):
        return "missing or invalid 'url' header", 400
    if not request.data:
        return "empty request body", 400

    data = request.data
    print(data)

    # Argument list instead of a shell string: the client-supplied url and
    # body can no longer be interpreted by a shell. The body goes through
    # stdin ("@-") so its content is never parsed as a curl option or file.
    completed = subprocess.run(
        [
            "curl",
            "-H", "Content-Type:application/json",
            "-X", "POST",
            walletServer + url,
            "--data-binary", "@-",
            "--basic",
            "-u", "grin:" + grin,
        ],
        input=data,
        stdout=subprocess.PIPE,
    )
    return completed.stdout.decode("utf-8", errors="replace")
@app.route('/v3/postOwner', methods=['POST'])
def v3_post():
    """
    Forward a JSON-RPC request to the wallet v3 owner API over the
    encrypted channel.

    On first use this sets up the module-level session state: ``sharedKey``
    via initSecure() and ``token`` via an ``open_wallet`` call. The session
    token is then added to the request's ``params`` and the request is sent
    with sendEncryptedRequest().

    :return: the decrypted reply from sendEncryptedRequest(), or a 400
        response when the request has no body
    """
    global sharedKey, token
    if sharedKey is None:
        sharedKey = initSecure()
    if token is None:
        # NOTE(review): "name" is the string 'null', not None -- confirm this
        # is what the wallet API expects.
        open_wallet = {"jsonrpc": "2.0", "method": "open_wallet", "id": 1,
                       "params": {"name": 'null', "password": password}}
        return_result = sendEncryptedRequest(open_wallet, "/v3/owner")
        token = return_result['result']['Ok']

    # Without a body there is nothing to forward; previously this path fell
    # through to returning an unassigned variable.
    if not request.data:
        return "empty request body", 400

    post_json = addTokenToParam(str(request.data, encoding="utf-8"))
    url = request.headers.get('url')
    if url is None:
        # No explicit path given: use sendEncryptedRequest's default endpoint.
        return sendEncryptedRequest(post_json)
    return sendEncryptedRequest(post_json, url)
# def whiteCommand(param): | |
# if param in list: | |
# return True | |
# if param not in list: | |
# return False | |
# ECDH key exchange
def initSecure():
    """
    Perform the ECDH key exchange with the wallet's v3 owner API.

    Generates a fresh SECP256k1 key pair, sends the compressed public key in
    an ``init_secure_api`` JSON-RPC call to ``walletServer + "/v3/owner"``,
    and combines the public key from the reply (``result.Ok``) with the local
    private key.

    :raises subprocess.CalledProcessError: curl exited with an error
    :return: the shared secret as a hex string
    :rtype: str
    """
    import subprocess  # local import keeps this function self-contained

    ecdh = ECDH()
    ecdh.set_curve(SECP256k1)
    ecdh.generate_private_key()
    my_pubkey = ecdh.get_public_key().to_string('compressed').hex()

    init_data = {"jsonrpc": "2.0", "method": "init_secure_api",
                 "params": {"ecdh_pubkey": my_pubkey},
                 "id": 1}

    # Argument list (no shell) with the body on stdin; check=True turns a
    # failed curl call into an explicit error instead of a JSON parse error
    # on empty output.
    completed = subprocess.run(
        [
            "curl",
            "-H", "Content-Type:application/json",
            "-X", "POST",
            walletServer + "/v3/owner",
            "--data-binary", "@-",
            "--basic",
            "-u", "grin:" + grin,
        ],
        input=json.dumps(init_data).encode("utf-8"),
        stdout=subprocess.PIPE,
        check=True,
    )
    reply = json.loads(completed.stdout.decode("utf-8"))
    other_pubkey = reply['result']['Ok']

    ecdh.load_received_public_key_bytes(bytes.fromhex(other_pubkey))
    return ecdh.generate_sharedsecret_bytes().hex()
# Encrypted transport: encrypt the request, decrypt the reply
def sendEncryptedRequest(data, url="/v3/owner"):
    """
    Send ``data`` to the wallet as an ``encrypted_request_v3`` call and
    return the decrypted reply.

    The payload is JSON-encoded, encrypted with AES-GCM under the module-level
    ``sharedKey`` (hex) and a fresh random 12-byte nonce, base64-encoded and
    posted with curl to ``walletServer + url``. The reply's ``result.Ok``
    object (``nonce`` in hex, ``body_enc`` in base64) is decrypted with the
    same key.

    :param data: JSON-serialisable request object
    :param url: path on the wallet server, must start with "/"
    :type url: str
    :raises ValueError: ``url`` is not a path starting with "/"
    :raises subprocess.CalledProcessError: curl exited with an error
    :return: the decrypted, JSON-decoded reply
    """
    import subprocess  # local import keeps this function self-contained

    # url may originate from a request header: only accept a path so the
    # request (and its credentials) always goes to walletServer.
    if not isinstance(url, str) or not url.startswith('/'):
        raise ValueError("url must be a path starting with '/'")

    nonce = os.urandom(12)
    aesgcm = AESGCM(bytes.fromhex(sharedKey))
    ct = aesgcm.encrypt(nonce, bytes(json.dumps(data), encoding="utf8"), None)
    body_enc = base64.standard_b64encode(ct)
    encrypted_request_v3 = {"jsonrpc": "2.0", "method": "encrypted_request_v3",
                            "params": {"nonce": bytes.hex(nonce),
                                       "body_enc": str(body_enc, encoding="utf-8")},
                            "id": 1}

    # Argument list (no shell) with the body on stdin, so url can no longer
    # be interpreted by a shell.
    completed = subprocess.run(
        [
            "curl",
            "-H", "Content-Type:application/json",
            "-X", "POST",
            walletServer + url,
            "--data-binary", "@-",
            "--basic",
            "-u", "grin:" + grin,
        ],
        input=json.dumps(encrypted_request_v3).encode("utf-8"),
        stdout=subprocess.PIPE,
        check=True,
    )
    reply = json.loads(completed.stdout.decode("utf-8"))

    return_info = reply['result']['Ok']
    return_decrypt = aesgcm.decrypt(
        bytes.fromhex(return_info['nonce']),
        base64.standard_b64decode(return_info['body_enc']),
        None,
    )
    return json.loads(str(return_decrypt, encoding="utf-8"))
# Add the session token to the request params
def addTokenToParam(param):
    """
    Parse a JSON-RPC request string and add the session token to its params.

    SECURITY: this used to rewrite ``true``/``false``/``null`` textually and
    then ``eval`` the client-supplied string. That executed arbitrary Python
    sent by the client and also corrupted string values that merely contained
    those words. The body is JSON, so it is now parsed with ``json.loads``.

    :param param: request body as a JSON string; must decode to an object
        with a ``params`` object
    :type param: str
    :raises ValueError: ``param`` is not valid JSON
    :raises KeyError: the request has no ``params`` member
    :return: the decoded request with ``params["token"]`` set to the
        module-level ``token``
    :rtype: dict
    """
    print(param)
    result = json.loads(param)
    result['params']['token'] = token
    return result
if __name__ == '__main__':
    # NOTE(review): host 0.0.0.0 listens on every network interface, and the
    # routes above perform no authentication of their own -- confirm this
    # exposure is intended before deploying.
    app.run(
        host='0.0.0.0',
        port=3330,
        debug=False,
    )