水一篇,
BirdTalk服务端基本快写完了,开始写一个完整的客户端测试;
决定从python入手,因为与其他功能对接时候或者写机器人客服,脚本用的比较多;
直接上代码,原理参考之前的文档。
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.backends import default_backend
import os
import struct
import base64
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMACclass ECDHKeyExchange:def __init__(self):self.private_key = Noneself.public_key = Noneself.shared_key = Noneself.key_print = 0def generate_key_pair(self):"""Generate an ECDH key pair."""self.private_key = ec.generate_private_key(ec.SECP256R1(), default_backend())self.public_key = self.private_key.public_key()print("Key pair generated.")def export_public_key(self, filename):"""Export the generated public key to a file."""if self.public_key is None:raise ValueError("Public key not generated yet.")pem = self.public_key.public_bytes(encoding=serialization.Encoding.PEM,format=serialization.PublicFormat.SubjectPublicKeyInfo)if filename != "":with open(filename, 'wb') as f:f.write(pem)print(f"Public key saved to {filename}.")return pem.decode('utf-8')def get_public_key(self):if self.public_key is None:raise ValueError("Public key not generated yet.")pem = self.public_key.public_bytes(encoding=serialization.Encoding.PEM,format=serialization.PublicFormat.SubjectPublicKeyInfo)return pemdef exchange_keys_from_file(self, peer_public_key_filename):"""Exchange keys and generate the shared key using the peer's public key."""if self.private_key is None:raise ValueError("Private key not generated yet.")with open(peer_public_key_filename, 'rb') as f:peer_public_key_pem = f.read()peer_public_key = serialization.load_pem_public_key(peer_public_key_pem, backend=default_backend())self.shared_key = self.private_key.exchange(ec.ECDH(), peer_public_key)print("Shared key generated.")def exchange_keys(self, peer_public_key_pem):"""Exchange keys and generate the shared key using the peer's public key PEM string."""if self.private_key is None:raise ValueError("Private key not generated yet.")peer_public_key = serialization.load_pem_public_key(peer_public_key_pem.encode('utf-8'), backend=default_backend())self.shared_key = self.private_key.exchange(ec.ECDH(), peer_public_key)print("Shared key generated.")def get_shared_key(self):if self.shared_key is None:raise ValueError("Shared key not generated yet.")return self.shared_keydef save_shared_key(self, filename):"""Save the shared key to a file."""if self.shared_key is None:raise ValueError("Shared key not generated yet.")with open(filename, 'wb') as f:f.write(self.shared_key)print(f"Shared key saved to {filename}.")def export_shared_key_base64(self):"""Export the shared key as a base64 encoded string."""if self.shared_key is None:raise ValueError("Shared key not generated yet.")return base64.b64encode(self.shared_key).decode('utf-8')def load_shared_key(self, filename):"""Load the shared key from a file."""with open(filename, 'rb') as f:self.shared_key = f.read()print(f"Shared key loaded from {filename}.")def save_key_print(self, filename):"""Save the shared key to a file."""if self.key_print is None:raise ValueError("Shared key not generated yet.")with open(filename, 'wb') as f:data = str(self.key_print).encode('utf-8')f.write(data)print(f"key print saved to {filename}.")def load_key_print(self, filename):"""Load the shared key from a file."""with open(filename, 'rb') as f:data = f.read()str = data.decode('utf-8')self.key_print = int(str)print(f"key print loaded from {filename}.")return self.key_printdef delete_key_file(self, filename):"""Delete a key file."""if os.path.exists(filename):os.remove(filename)print(f"File {filename} deleted.")else:print(f"File {filename} does not exist.")def get_int64_print(self):if self.shared_key is None:raise ValueError("Shared key not generated yet.")"""Convert a byte array to a 64-bit integer."""# 检查字节数组长度是否足够if len(self.shared_key) < 8:raise ValueError("Insufficient bytes to convert to int64")# 将字节数组转换为 int64self.key_print = struct.unpack('<q', self.shared_key[:8])[0] # 使用 little-endian 格式return self.key_printdef encrypt_aes_ctr(self, plaintext):if self.shared_key is None:raise ValueError("Shared key not generated yet.")# 生成随机的初始化向量(IV)iv = os.urandom(16) # 初始化向量长度为 16 字节# 创建 AES-CTR 算法对象algorithm = algorithms.AES(self.shared_key)mode = modes.CTR(iv)cipher = Cipher(algorithm, mode, backend=default_backend())# 使用加密器加密数据encryptor = cipher.encryptor()encrypted_data = encryptor.update(plaintext) + encryptor.finalize()# 将随机初始化向量和加密后的数据拼接在一起ciphertext = iv + encrypted_datareturn ciphertextdef decrypt_aes_ctr(self, ciphertext):"""Decrypt ciphertext using AES-CTR with the shared key."""if self.shared_key is None:raise ValueError("Shared key not generated yet.")# 从密文中提取初始化向量(IV)iv = ciphertext[:16] # 初始化向量长度为 16 字节encrypted_data = ciphertext[16:]# 创建 AES-CTR 算法对象algorithm = algorithms.AES(self.shared_key)mode = modes.CTR(iv)cipher = Cipher(algorithm, mode, backend=default_backend())# 使用解密器解密数据decryptor = cipher.decryptor()plaintext = decryptor.update(encrypted_data) + decryptor.finalize()return plaintext
###############################################################
def test_create():# 实例化两个 ECDHKeyExchange 对象,模拟两个参与方alice = ECDHKeyExchange()bob = ECDHKeyExchange()# Alice 生成密钥对并导出公钥alice.generate_key_pair()alice_pub_key = alice.export_public_key("alice_public_key.pem")print(alice_pub_key)# Bob 生成密钥对并导出公钥bob.generate_key_pair()bob_pub_key = bob.export_public_key("bob_public_key.pem")print(bob_pub_key)# Alice 和 Bob 交换公钥并生成共享密钥alice.exchange_keys(bob_pub_key)bob.exchange_keys(alice_pub_key)# 保存共享密钥alice.save_shared_key("alice_shared_key.bin")bob.save_shared_key("bob_shared_key.bin")# 加载共享密钥alice.load_shared_key("alice_shared_key.bin")bob.load_shared_key("bob_shared_key.bin")print(alice.get_int64_print())print(bob.get_int64_print())alice.save_key_print("alice_key_print.txt")keyprint = alice.load_key_print("alice_key_print.txt")print(keyprint)# 删除密钥文件# alice.delete_key_file("alice_public_key.pem")# alice.delete_key_file("alice_shared_key.bin")# bob.delete_key_file("bob_public_key.pem")# bob.delete_key_file("bob_shared_key.bin")def test_load():alice = ECDHKeyExchange()alice.load_shared_key("alice_shared_key.bin")#print(alice.get_shared_key())key_print = alice.load_key_print("alice_key_print.txt")print(key_print)tm = '123456789412'ciper = alice.encrypt_aes_ctr(tm.encode('utf-8'))plain = alice.decrypt_aes_ctr(ciper)print(plain)# 示例使用
if __name__ == "__main__":test_load()