文章目录
- 加密货币平台项目介绍
- 技术栈
- 1. 用户管理
- 2. 新闻和帖子管理
- 3. 加密货币数据
- 4. 对话获取
- 5. 数据获取
- 服务端代码
- 完整代码
加密货币平台项目介绍
这个项目是一个基于 Flask 和 MongoDB 的深度学习应用程序,通过爬虫爬取加密货币行情和介绍信息,并根据新的数据使用LSTM去预测行情价格。展示涵盖了用户管理、新闻获取、加密货币数据处理、对话获取和处理、帖子管理等多个功能。
技术栈
- 后端: Flask 提供了一个轻量级的网页服务器和后端API。
- 前端: 使用angular+echart进行界面构建和图表绘制。
- 数据库: 使用 MongoDB 作为数据库,用于存储用户信息、新闻、帖子和其他数据。
- 数据处理: 对于加密货币价格预测,使用 LSTM 模型处理历史市场数据。
- 代理支持: 在获取外部数据时支持通过代理访问。
1. 用户管理
- 注册(/signin): 允许新用户创建账户。系统会检查用户名是否已存在,并对密码进行加密保存。
- 登录(/login): 用户可以登录到系统。该接口验证用户名和密码的正确性。
- 密码修改(/modify): 用户可以修改他们的密码。
- 删除用户(/dele): 提供用户删除自己账户的功能。
2. 新闻和帖子管理
- 获取卡片详情(/card-details/<card_id>): 根据卡片ID获取加密货币相关新闻的详细信息。
- 上传帖子(/uploadpost): 允许用户上传新的帖子,包括标题、内容、日期等信息。
- 获取所有帖子(/getposts): 可以获取平台上所有用户上传的帖子。
- 更新帖子喜欢数(/updatelikes) 和 更新评论数(/updatecomments): 这两个接口允许用户更新帖子的喜欢数和评论数。
3. 加密货币数据
- 获取市场数据(/market): 从外部API获取实时的加密货币市场数据。
- 获取特定加密货币数据(/cryptos): 允许用户根据特定加密货币获取历史市场数据。
- 价格预测(/predict): 使用 LSTM 模型预测特定加密货币的未来价格。
4. 对话获取
- 获取对话(/dialog): 提供一个接口来获取存储的对话数据。
5. 数据获取
- 获取数据(/fetch_data 和 /fetch_data2): 这两个接口用于从指定的URL获取数据,支持代理设置。
服务端代码
flask的服务端代码如下
import json
import bcrypt
from bson import json_util
from flask import Flask, request, jsonify, session
from pymongo import MongoClient
from flask_cors import CORS
import requests
from predict_price import lstm_predictor# 连接到 MongoDB 服务器
mongo_client = MongoClient("mongodb://localhost:27017/")# 选择数据库和集合
db_name = "crypto_db"
collection_name = "users"
collection_name2 = "news"
collection_name3 = "crypto"
collection_name4 = "dialog"
collection_name5 = "post"
db = mongo_client[db_name]
collection = db[collection_name]
collection2 = db[collection_name2]
collection3 = db[collection_name3]
collection4 = db[collection_name4]
collection5 = db[collection_name5]
# 如果集合为空,则插入初始化记录
if collection.estimated_document_count() == 0:initial_record = {"usr": "admin", "pwd": "admin"}collection.insert_one(initial_record)# 初始化 Flask 应用
app = Flask(__name__)
CORS(app)def need_proxy():url = "https://www.google.com"try:response = requests.get(url, timeout=1)# 检查响应状态是否正常(状态码 200)if response.status_code == 200:return 1else:return 0except Exception as e:# print(f"Error accessing Google: {e}")return 0# 路由 - 注册
@app.route('/signin', methods=['POST'])
def signin():user_data = request.get_json()username = user_data.get('usr')password = user_data.get('pwd')print(username, password)if username and password:# 检查用户名是否已存在existing_user = collection.find_one({"usr": username})if existing_user:return jsonify({"status": "fail", "message": "Username already exists."})# 哈希密码并插入新用户password = password.encode('utf-8')hashed_password = bcrypt.hashpw(password, bcrypt.gensalt())new_user = {"usr": username, "pwd": hashed_password}collection.insert_one(new_user)return jsonify({"status": "success", "message": "User registered successfully."})else:return jsonify({"status": "fail", "message": "Username or password missing."})# 路由 - 登录
@app.route('/login', methods=['POST'])
def login():user_data = request.get_json()username = user_data.get('usr')password = user_data.get('pwd')if username and password:# 查找用户existing_user = collection.find_one({"usr": username})if existing_user:# 验证密码password = password.encode('utf-8')if bcrypt.checkpw(password, existing_user['pwd']):return jsonify({"status": "success", "message": "Login successful."})else:return jsonify({"status": "fail", "message": "Incorrect password."})else:return jsonify({"status": "fail", "message": "User not found."})else:return jsonify({"status": "fail", "message": "Username or password missing."})@app.route('/card-details/<card_id>', methods=['GET'])
def get_card_details(card_id):card = collection2.find_one({'_id': int(card_id)})# print(card)if card:card_dict = json_util.loads(json_util.dumps(card)) # 将结果转换为 dict 类型card_dict.pop('_id') # 删除 '_id' 键,因为它无法被 JSON 序列化return jsonify(card_dict)else:return jsonify({'error': 'Card not found'}), 404@app.route('/dialog', methods=['GET'])
def get_dialogue():dialogue_data = collection4.find_one()if dialogue_data:dialogue_data.pop('_id')return dialogue_dataelse:return jsonify({"error": "Data not found"}), 404def fetch_market_data():url = "https://www.okx.com/priapi/v5/market/mult-cup-tickers?t=1682245351541&ccys=BTC,ETH,USDT,BNB,USDC,XRP,ADA,OKB,DOGE,MATIC,SOL,DOT,LTC,SHIB,TRX,AVAX,DAI,WBTC,UNI,LINK,TON,LEO,ATOM,XMR,ETC,XLM,ICP,BCH,FIL,TUSD"# 设置代理proxies = {"http": "http://127.0.0.1:10809","https": "http://127.0.0.1:10809",}res = need_proxy()try:if res:response = requests.get(url)else:response = requests.get(url, proxies=proxies)if response.status_code == 200:return response.json()else:print(f"Error fetching market data: {response.status_code}")return 0except Exception as e:print(f"Error fetching market data: {e}")return 0@app.route('/market', methods=['GET'])
def get_market_data():market_data = fetch_market_data()# print(market_data)return jsonify(market_data['data'][:9])def fetch_market_data2(coin):if coin == 'USDT':url = 'https://www.okx.com/priapi/v5/market/index-candles?t=1682316274275&instId=USDT-USD&limit=1000&bar=1D'else:url = "https://www.okx.com/priapi/v5/market/candles?t=1682307645213&instId=" + coin + "-USDT&limit=1000&bar=1D"# 设置代理proxies = {"http": "http://127.0.0.1:10809","https": "http://127.0.0.1:10809",}res = need_proxy()try:if res:response = requests.get(url)else:response = requests.get(url, proxies=proxies)if response.status_code == 200:return response.json()else:print(f"Error fetching market data: {response.status_code}")return 0except Exception as e:print(f"Error fetching market data: {e}")return 0@app.route('/cryptos', methods=['POST'])
def get_cryptos():data = request.get_json()coin = data.get('coin')market_data = fetch_market_data2(coin)# save the data to local,filename is coin+timestapfilename = "datasets/" + coin + ".csv"with open(filename, 'w') as f:# 保存每个元素的第一个和第六个元素,即时间和收盘价存为csv文件f.write('time,price\n')for i in market_data['data']:f.write(str(i[0]) + ',' + str(i[5]) + '\n')return jsonify(market_data['data'][::-1])@app.route('/modify', methods=['POST'])
def modify():user_data = request.get_json()username = user_data.get('usr')password = user_data.get('pwd')if username and password:# 查找用户existing_user = collection.find_one({"usr": username})if existing_user:# 哈希新密码并更新用户密码new_password = password.encode('utf-8')hashed_new_password = bcrypt.hashpw(new_password, bcrypt.gensalt())collection.update_one({"usr": username}, {"$set": {"pwd": hashed_new_password}})return jsonify({"status": "success", "message": "Password updated successfully."})else:return jsonify({"status": "fail", "message": "Username not found."})else:return jsonify({"status": "fail", "message": "Username, or new password missing."})@app.route('/dele', methods=['POST'])
def dele():user_data = request.get_json()username = user_data.get('usr')if username:# 查找用户existing_user = collection.find_one({"usr": username})if existing_user:collection.delete_one({"usr": username})return jsonify({"status": "success", "message": "User deleted successfully."})else:return jsonify({"status": "fail", "message": "Username not found."})else:return jsonify({"status": "fail", "message": "Username missing."})# post
@app.route('/uploadpost', methods=['POST'])
def upload_post():post_data = request.get_json()title = post_data.get('title')content = post_data.get('content')date = post_data.get('date')summary = post_data.get('summary')likes = post_data.get('likes')comments = post_data.get('comments')if title and content and date and summary:new_post = {"title": title,"content": content,"date": date,"summary": summary,"likes": likes,"comments": comments}collection5.insert_one(new_post)print(new_post)return jsonify({"status": "success", "message": "Post saved successfully."})else:return jsonify({"status": "fail", "message": "Post data missing or incomplete."})@app.route('/getposts', methods=['GET'])
def get_all_posts():try:posts = list(collection5.find({}))for post in posts:post['_id'] = str(post['_id'])return jsonify({"status": "success", "data": posts})except Exception as e:print(f"Error retrieving posts: {e}")return jsonify({"status": "fail", "message": "Error retrieving posts."})@app.route('/updatelikes', methods=['POST'])
def updatelikes():post_data = request.get_json()posttitle = post_data.get('title')new_likes = post_data.get('likes')if posttitle and new_likes is not None:result = collection5.update_one({"title": posttitle}, {"$set": {"likes": new_likes}})if result.modified_count > 0:return jsonify({"status": "success", "message": "Post likes updated successfully."})else:return jsonify({"status": "fail", "message": "Post not found or likes not updated."})else:return jsonify({"status": "fail", "message": "Post title or new likes missing."})@app.route('/updatecomments', methods=['POST'])
def updatecomments():post_data = request.get_json()posttitle = post_data.get('title')new_comments = post_data.get('comments')if posttitle and new_comments is not None:result = collection5.update_one({"title": posttitle}, {"$set": {"comments": new_comments}})if result.modified_count > 0:return jsonify({"status": "success", "message": "Post comments updated successfully."})else:return jsonify({"status": "fail", "message": "Post not found or comments not updated."})else:return jsonify({"status": "fail", "message": "Post title or new comments missing."})@app.route('/fetch_data', methods=['POST'])
def fetch_data():data = request.jsonurl = data['url']print(url)proxies = {"http": "http://127.0.0.1:10809","https": "http://127.0.0.1:10809",}response = requests.get(url, proxies=proxies)xml_data = json.loads(response.text)return xml_data["data"]@app.route('/fetch_data2', methods=['POST'])
def fetch_data2():data = request.jsonurl = data['url']print(url)proxies = {"http": "http://127.0.0.1:10809","https": "http://127.0.0.1:10809",}response = requests.get(url, proxies=proxies)xml_data = json.loads(response.text)return jsonify({"data": xml_data["data"]["introduce"]})@app.route('/predict', methods=['POST'])
def predict():data = request.jsoncoin = data['coin']return jsonify({"data": lstm_predictor(coin)})if __name__ == '__main__':app.run(debug=True)
完整代码
整个项目为加密货币爱好者提供了一个综合平台,不仅支持基本的用户管理和社交功能,还整合了市场数据获取和分析,以及数据预测功能,是一个多功能的加密货币社区应用。
需要获取源码的可以关注公众号"一颗程序树",点击菜单栏的免费源码-源码集合的页面中输入关键词加密货币即可