import time
import os
# from SpeedTest import perform_default_speedtest# 测试硬盘读写速度
def test_disk_speed():filename = "./testfile.bin"# 生成一个1GB大小的测试文件with open(filename, "wb") as f:f.write(os.urandom(1024*1024*1024))# 从磁盘读取文件并记录时间start_time = time.time()with open(filename, "rb") as f:while f.read(1024*1024):passend_time = time.time()# 计算磁盘读取速度speed = os.path.getsize(filename) / (end_time - start_time) / (1024*1024)print("磁盘读取速度: %.2f MB/s" % speed)# 删除测试文件os.remove(filename)# 测试内存读写速度
def test_memory_speed():# 生成一个1GB大小的测试数据data = bytearray(os.urandom(1024*1024*1024))# 从内存读取数据并记录时间start_time = time.time()for i in range(1024):chunk = data[i*1024*1024:(i+1)*1024*1024]end_time = time.time()# 计算内存读取速度speed = len(data) / (end_time - start_time) / (1024*1024)print("内存读取速度: %.2f MB/s" % speed)# 测试网络速度
# def test_network_speed():# 创建 SpeedTest 实例# speedtest = SpeedTest("www.speedtest.cn", debug=1, runs=2)# perform_detault_speedtest("www.speedtest.cn")# st = SpeedTest()# # 测试下载速度# download_speed = st.download() / (1024**2) # 将结果转换成 MB/s# print("下载速度: %.2f MB/s" % download_speed)# # 测试上传速度# upload_speed = st.upload() / (1024**2) # 将结果转换成 MB/s# print("上传速度: %.2f MB/s" % upload_speed)# # 测试延迟# ping = st.ping()# print("延迟: %.2f ms" % ping)if __name__ == "__main__":# test_network_speed()test_disk_speed()test_memory_speed()
下面是 SpeedTest .py文件内容. 用来测试网络速度的. 但是目前似乎有问题. 应该是规则变了 .
后来改用了一个linux的命令行工具进行的网速测试.
#!/usr/bin/env python
# -*- coding: utf-8 -*-from __future__ import print_functionimport argparse
import bisect
import itertools
import logging
import random
import re
import string
import sys
import platformfrom math import sqrt
from threading import currentThread, Thread
from time import timetry:from httplib import HTTPConnection
except ImportError:from http.client import HTTPConnectiontry:from urllib import urlencode
except ImportError:from urllib.parse import urlencode__program__ = 'pyspeedtest'
__version__ = '1.2.7'
__description__ = 'Test your bandwidth speed using Speedtest.net servers.'__supported_formats__ = ('default', 'json', 'xml')class SpeedTest(object):USER_AGENTS = {'Linux': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:41.0) Gecko/20100101 Firefox/41.0','Darwin': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10.10; rv:41.0) Gecko/20100101 Firefox/41.0','Windows': 'Mozilla/5.0 (Windows NT 6.3; WOW64; rv:41.0) Gecko/20100101 Firefox/41.0','Java': 'Java/1.6.0_12',}DOWNLOAD_FILES = ['/speedtest/random350x350.jpg','/speedtest/random500x500.jpg','/speedtest/random1500x1500.jpg']UPLOAD_FILES = [132884,493638]ALPHABET = string.digits + string.ascii_lettersdef __init__(self, host=None, http_debug=0, runs=2):self._host = hostself.http_debug = http_debugself.runs = runs@propertydef host(self):if not self._host:self._host = self.chooseserver()return self._host@host.setterdef host(self, new_host):self._host = new_hostdef connect(self, url):try:connection = HTTPConnection(url)connection.set_debuglevel(self.http_debug)connection.connect()return connectionexcept:raise Exception('Unable to connect to %r' % url)def downloadthread(self, connection, url):connection.request('GET', url, None, {'Connection': 'Keep-Alive'})response = connection.getresponse()self_thread = currentThread()self_thread.downloaded = len(response.read())def download(self):total_downloaded = 0connections = [self.connect(self.host) for i in range(self.runs)]total_start_time = time()for current_file in SpeedTest.DOWNLOAD_FILES:threads = []for run in range(self.runs):thread = Thread(target=self.downloadthread,args=(connections[run],'%s?x=%d' % (current_file, int(time() * 1000))))thread.run_number = run + 1thread.start()threads.append(thread)for thread in threads:thread.join()total_downloaded += thread.downloadedLOG.debug('Run %d for %s finished',thread.run_number, current_file)total_ms = (time() - total_start_time) * 1000for connection in connections:connection.close()LOG.info('Took %d ms to download %d bytes',total_ms, total_downloaded)return total_downloaded * 8000 / total_msdef uploadthread(self, connection, data):url = '/speedtest/upload.php?x=%d' % randint()connection.request('POST', url, data, {'Connection': 'Keep-Alive','Content-Type': 'application/x-www-form-urlencoded'})response = connection.getresponse()reply = response.read().decode('utf-8')self_thread = currentThread()self_thread.uploaded = int(reply.split('=')[1])def upload(self):connections = [self.connect(self.host) for i in range(self.runs)]post_data = [urlencode({'content0': content(s)}) for s in SpeedTest.UPLOAD_FILES]total_uploaded = 0total_start_time = time()for data in post_data:threads = []for run in range(self.runs):thread = Thread(target=self.uploadthread,args=(connections[run], data))thread.run_number = run + 1thread.start()threads.append(thread)for thread in threads:thread.join()LOG.debug('Run %d for %d bytes finished',thread.run_number, thread.uploaded)total_uploaded += thread.uploadedtotal_ms = (time() - total_start_time) * 1000for connection in connections:connection.close()LOG.info('Took %d ms to upload %d bytes',total_ms, total_uploaded)return total_uploaded * 8000 / total_msdef ping(self, server=None):if not server:server = self.hostconnection = self.connect(server)times = []worst = 0for _ in range(5):total_start_time = time()connection.request('GET','/speedtest/latency.txt?x=%d' % randint(),None,{'Connection': 'Keep-Alive'})response = connection.getresponse()response.read()total_ms = time() - total_start_timetimes.append(total_ms)if total_ms > worst:worst = total_mstimes.remove(worst)total_ms = sum(times) * 250 # * 1000 / number of tries (4) = 250connection.close()LOG.debug('Latency for %s - %d', server, total_ms)return total_msdef chooseserver(self):connection = self.connect('www.speedtest.cn')now = int(time() * 1000)# really contribute to speedtest.net OS statistics# maybe they won't block us again...extra_headers = {'Connection': 'Keep-Alive','User-Agent': self.USER_AGENTS.get(platform.system(), self.USER_AGENTS['Linux'])}connection.request('GET', '/speedtest-config.php?x=%d' % now, None, extra_headers)response = connection.getresponse()reply = response.read().decode('utf-8')match = re.search(r'<client ip="([^"]*)" lat="([^"]*)" lon="([^"]*)"', reply)location = Noneif match is None:LOG.info('Failed to retrieve coordinates')return Nonelocation = match.groups()LOG.info('Your IP: %s', location[0])LOG.info('Your latitude: %s', location[1])LOG.info('Your longitude: %s', location[2])connection.request('GET', '/speedtest-servers.php?x=%d' % now, None, extra_headers)response = connection.getresponse()reply = response.read().decode('utf-8')server_list = re.findall(r'<server url="([^"]*)" lat="([^"]*)" lon="([^"]*)"', reply)my_lat = float(location[1])my_lon = float(location[2])sorted_server_list = []for server in server_list:s_lat = float(server[1])s_lon = float(server[2])distance = sqrt(pow(s_lat - my_lat, 2) + pow(s_lon - my_lon, 2))bisect.insort_left(sorted_server_list, (distance, server[0]))best_server = (999999, '')for server in sorted_server_list[:10]:LOG.debug(server[1])match = re.search(r'http://([^/]+)/speedtest/upload\.php', server[1])if match is None:continueserver_host = match.groups()[0]latency = self.ping(server_host)if latency < best_server[0]:best_server = (latency, server_host)if not best_server[1]:raise Exception('Cannot find a test server')LOG.debug('Best server: %s', best_server[1])return best_server[1]def content(length):"""Return alphanumeric string of indicated length."""cycle = itertools.cycle(SpeedTest.ALPHABET)return ''.join(next(cycle) for i in range(length))def init_logging(loglevel=logging.WARNING):"""Initialize program logger."""scriptlogger = logging.getLogger(__program__)# ensure logger is not reconfigured# it would be nice to use hasHandlers here, but that's Python 3 onlyif not scriptlogger.handlers:# set log levelscriptlogger.setLevel(loglevel)# log message formatfmt = '%(name)s:%(levelname)s: %(message)s'# configure terminal logstreamhandler = logging.StreamHandler()streamhandler.setFormatter(logging.Formatter(fmt))scriptlogger.addHandler(streamhandler)def parseargs(args):class SmartFormatter(argparse.HelpFormatter):def _split_lines(self, text, width):"""argparse.RawTextHelpFormatter._split_lines"""if text.startswith('r|'):return text[2:].splitlines()return argparse.HelpFormatter._split_lines(self, text, width)def positive_int(value):try:ivalue = int(value)if ivalue < 0:raise ValueErrorreturn ivalueexcept ValueError:raise argparse.ArgumentTypeError('invalid positive int value: %r' % value)def format_enum(value):if value.lower() not in __supported_formats__:raise argparse.ArgumentTypeError('output format not supported: %r' % value)return valueparser = argparse.ArgumentParser(add_help=False,description=__description__,formatter_class=SmartFormatter,usage='%(prog)s [OPTION]...')parser.add_argument('-d', '--debug',default=0,help='set http connection debug level (default is 0)',metavar='L',type=positive_int)parser.add_argument('-h', '--help',action='help',help=argparse.SUPPRESS)parser.add_argument('-m', '--mode',choices=range(1, 8),default=7,help='''r|test mode: 1 - download2 - upload4 - ping1 + 2 + 4 = 7 - all (default)''',metavar='M',type=int)parser.add_argument('-r', '--runs',default=2,help='use N runs (default is 2)',metavar='N',type=positive_int)parser.add_argument('-s', '--server',help='use specific server',metavar='H')parser.add_argument('-f', '--format',default='default',help='output format ' + str(__supported_formats__),metavar='F',type=format_enum)parser.add_argument('-v', '--verbose',action='store_true',dest='verbose',help='output additional information')parser.add_argument('--version',action='version',version='%(prog)s ' + __version__)return parser.parse_args(args)def perform_default_speedtest(server,debug=0,runs=2, format='default'):speedtest = SpeedTest(server, debug, runs)if format in __supported_formats__:if format == 'default':print('Using server: %s' % speedtest.host)#if mode & 4 == 4:print('Ping: %d ms' % speedtest.ping())#if mode & 1 == 1:print('Download speed: %s' % pretty_speed(speedtest.download()))#if mode & 2 == 2:print('Upload speed: %s' % pretty_speed(speedtest.upload()))else:stats = dict(server=speedtest.host)#if mode & 4 == 4:stats['ping'] = speedtest.ping()#if mode & 1 == 1:stats['download'] = speedtest.download()#if mode & 2 == 2:stats['upload'] = speedtest.upload()if format == 'json':from json import dumpsprint(dumps(stats))elif format == 'xml':from xml.etree.ElementTree import Element, tostringxml = Element('data')for key, val in stats.items():child = Element(key)child.text = str(val)xml.append(child)print(tostring(xml).decode('utf-8'))else:raise Exception('Output format not supported: %s' % format)def perform_speedtest(opts):speedtest = SpeedTest(opts.server, opts.debug, opts.runs)if opts.format in __supported_formats__:if opts.format == 'default':print('Using server: %s' % speedtest.host)if opts.mode & 4 == 4:print('Ping: %d ms' % speedtest.ping())if opts.mode & 1 == 1:print('Download speed: %s' % pretty_speed(speedtest.download()))if opts.mode & 2 == 2:print('Upload speed: %s' % pretty_speed(speedtest.upload()))else:stats = dict(server=speedtest.host)if opts.mode & 4 == 4:stats['ping'] = speedtest.ping()if opts.mode & 1 == 1:stats['download'] = speedtest.download()if opts.mode & 2 == 2:stats['upload'] = speedtest.upload()if opts.format == 'json':from json import dumpsprint(dumps(stats))elif opts.format == 'xml':from xml.etree.ElementTree import Element, tostringxml = Element('data')for key, val in stats.items():child = Element(key)child.text = str(val)xml.append(child)print(tostring(xml).decode('utf-8'))else:raise Exception('Output format not supported: %s' % opts.format)def main(args=None):opts = parseargs(args)init_logging(logging.DEBUG if opts.verbose else logging.WARNING)try:perform_speedtest(opts)except Exception as e:if opts.verbose:LOG.exception(e)else:LOG.error(e)sys.exit(1)def pretty_speed(speed):units = ['bps', 'Kbps', 'Mbps', 'Gbps']unit = 0while speed >= 1024:speed /= 1024unit += 1return '%0.2f %s' % (speed, units[unit])def randint():"""Return a random 12 digit integer."""return random.randint(100000000000, 999999999999)LOG = logging.getLogger(__program__)if __name__ == '__main__':main()
安装网络测试 命令行工具
curl -s https://packagecloud.io/install/repositories/ookla/speedtest-cli/script.rpm.sh | sudo bashsudo yum install speedtest
执行网络测试用命令
/usr/bin/speedtest