ESP32CAM人工智能教学15
Flask服务器TCP连接
小智利用Flask在计算机中创建一个虚拟的网页服务器服务器,让ESP32Cam通过WiFi连接,把摄像头拍摄到的图片发送到电脑中,并在电脑中保存成图片文件。
Flask是用Python编写的网页服务程序WebServer。这个程序首先在电脑端用Python编写一个服务器端的程序,并创建一个指向5000端口的网页服务,用于接收ESP32CAM上传的图片数据,并保存成图片文件。
本来这个程序挺简单的,但是在测试程序时一直都过不了。测试程序耗了我三天时间,最后我采用“蚂蚁啃大象”的方法,采用由简及繁的方法,终于攻下难关。本文的测试主要参考博客内容https://blog.51cto.com/u_16213596/7788901
- 第一轮给在线网站发信息
首先我们在计算机中,用Python编写一个客户端的测试程序,然后利用requests驱动库,给网上的一个测试网站发送字符信息,http://httpbin.org/post是网上为我们提供的一个学习测试的网站。
我们在测试程序中,创建一个字典d={‘jpg’,’tupian’},然后把信息发送到测试网页。点击菜单上的Run运行,结果在IDLE Shell窗口中,可以看到(第六行)返回了用户发送给网站的信息”jpg”: ”tupian”。这样表面这个程序的requests的驱动库运行成功了。(当然,运行程序之前,需要在Python中先安装requests的驱动库,安装方法看前一课,可以到清华大学的镜像网站去下载)
- 第二轮给本地Flask服务器发文字、图片信息
我们在Python中安装Flask驱动库,然后编写服务器端的程序test.py。在程序中创建一个一个网页服务,开放计算机的5000端口,并提供一个上传网页,用于接收客户端发送过来的消息。如图所示:客户端访问http://192.168.1.162:5000/updata可以给服务器端发送消息了。
修改客户端程序test02.py,把上传的地址改成本地的Flask服务器的上传网页地址,然后还是给服务器发送消息”jpg”: “tupian”。
接下来是运行测试程序,先运行服务器端的程序test.py,可以看到Flask服务器开启了http://192.168.1.162:5000/。接着运行客户端程序test02.py,可以看到服务器端的IDLE Shell窗口显示一条消息,表示接收到了一条来自客户端的消息,并读取消息字典中名为“jpg”的内容,并组成字符串返回给客户端。在客户端的IDLE Shell窗口中看到了服务器返回的字符串”user”: “tupian”。
接下来这个测试就简单了,首先把客户端中发送的消息“jpg”的内容,更改为一张图片的内容。我们先准备一张图片,命名为333.jpg,和test.py、test02.py一起,都保存在同一个文件夹中(Python的安装目录,我这里是D:\Python312\)。接着,更改服务器端的程序,返回客户端的消息为接收到的数据长度值。
接下来,先运行服务器端的程序,在运行客户端的程序,可以看到在服务器端的Shell看到接收到了数据,在客户端的Shell看到返回的图片文件大小了。这样,这张图片就上传成功了,可以在D:\Python312\中看到两个图片文件:333.jpg是被上传的原图; 555.jpg是上传后保存下的图片, 这个上传过程,就像是左手换右手一样,这两张图的内容是一模一样的。
- 第三轮ESP32Cam读取内存图片发送给Flask
接下来,我们进行的是跨设备之间的传递,我们让ESP32Cam开发板读取内存中的一张图片的数据,然后通过WiFi连接,把图片数据发送到计算机中的Flask服务器中保存下来。
首先,我们在Thonny中连接ESP32Cam,并且下载一个urequests.py文件(如果没有可以复制后面的相关代码内容,然后在这里粘贴上传),上传到ESP32Cam中。
编写ESP32Cam客户端的程序,先连接WiFi,然后连接到服务器上传网页,读取图片内容,上传数据。需要注意的是,这个urequests.py驱动库文件只能传递二进制的数据,无法传递JSON的字符消息,所以在客户端和服务器端的程序,都有相应的小修改。
我们先在Thonny中上传一张图片到ESP32Cam中。打开服务器端的程序,再运行Thonny中的程序,这样就可以看到ESP32Cam内存中的那张图片,上传到了计算机D:\Python312\文件夹中了,在Thonny的Shell窗口中,看到了服务器程序返回来的图片文件大小的数字了。
- 第三轮ESP32Cam摄像头拍摄图片发送给Flask
在这个测试中,我们把发送的图片改为摄像头拍摄到了图片,经过运行程序测试,可以看到摄像头拍摄到的图片,发送到了计算机中的D:\Python312\的文件夹中了。
避坑笔迹:之前一直测试的程序也是这个程序,但是就是一直不成功,一直找不到失败的原因,不知道传送的过程中究竟是哪个环节出现了问题,这个问题一直困扰了我三天时间,经过反反复复的测试,就是不成功。
后来我就采用“蚂蚁啃大象”的办法,从最简单的测试开始,尽可能地砍掉了所有的环节,保留最简单的连接通道(第一轮测试:本地客户端程序发送数据到线上的测试网站,保留最简单、最短传输路径的测试,因为网上的测试网站,基本可以忽略存在问题的可能,绝大的概率是正确的)。等第一轮测试通过后,就慢慢增加测试的传输路径长度,一点一点的向着最终的目标慢慢前进,经过几轮的测试,最终完成了ESP32Cam拍摄照片,利用TCP连接,发送到Flask服务器中的程序。
最后的测试成果,与网上的程序(之前一直不通过的)相比,我在摄像头初始化完成的后面,增加了一个0.2秒的延时,等待摄像头拍摄图片,然后在获取图片的数据,这样程序测试就通过了。
这个就是三天摸索的结果,不过最终还是找到了一个解决问题的办法:当程序出现问题的时候,如何化繁为简地进行测试,从最简单开始,一直到实现全部的预定功能。
如果在测试的时候,我们点击Thonny中的运行按钮,可以成功发送一张照片。第二次运行的时候,出现了错误信息,可以拔下ESP32Cam,重新插如电脑中,重新运行程序右可以了。说明这个程序还是有缺陷的,还需要改进。
第一轮测试,Python编写客户端程序,给在线HTTP服务网站发送信息import requests
import binascii, jsondef send_image():url = 'http://httpbin.org/post'd = {'jpg': 'tupian'}r = requests.post(url, data=d)print(r.text)send_image()/
第二轮测试 在电脑中创建两个Python程序,一个做服务器,一个做客户端,
在Python安装目录中,客服端读取333.jpg发送, 服务器端接收保存成555.jpgtest.py 服务器端:from flask import Flask, request
import binasciiapp = Flask(__name__)@app.route("/updata", methods=["POST","GET"])
def updata():print("保存图片1")a = request.form.get('jpg')with open('555.jpg','wb') as f:val = binascii.a2b_base64(a)f.write(val)return ('user:%d' %len(a))if __name__ == "__main__":app.run('0.0.0.0', 5000)test02.py 客户端import requests
import binascii, jsondef send_image():url = 'http://192.168.1.162:5000/updata'with open('333.jpg','rb') as f:img = f.read()img = binascii.b2a_base64(img)d = {'jpg': img}r = requests.post(url, data=d)print(r.text)send_image()/
第三轮测试 ESP32CAM读取内存中的一张照片,发送到Python创建的Flask服务器端from flask import Flask, request
import binasciiapp = Flask(__name__)@app.route("/updata", methods=["POST","GET"])
def updata():print("保存图片1")a = request.get_data() #直接接收二进制with open('555.jpg','wb') as f:f.write(a)return ('user:%d' %len(a))if __name__ == "__main__":app.run('0.0.0.0', 5000) ESP32Cam 设备作为客户端import time, networkdef connectWiFi():wlan = network.WLAN(network.STA_IF)if wlan.isconnected():wlan.disconnect()wlan.active(True)wlan.connect('ChinaNet-xxVP', '123456789')while not wlan.isconnected():passprint('network config: ', wlan.ifconfig())connectWiFi()import urequests as requestsdef send_image():url = 'http://192.168.1.162:5000/updata'with open('333.jpg', 'rb') as f:img = f.read()r = requests.post(url, data = img)print(r.text)r.closesend_image()第四轮测试 ESP32CAM拍下一张照片,发送到Python创建的Flask服务器端Python服务器端from flask import Flask, request
import binasciiapp = Flask(__name__)@app.route("/updata", methods=["POST","GET"])
def updata():print("保存图片1")a = request.get_data() #直接接收二进制with open('555.jpg','wb') as f:f.write(a)return ('user:%d' %len(a))if __name__ == "__main__":app.run('0.0.0.0', 5000) ESP32Cam客户端 import time, networkdef connectWiFi():wlan = network.WLAN(network.STA_IF)if wlan.isconnected():wlan.disconnect()wlan.active(True)wlan.connect('ChinaNet-xxVP', '123456789')while not wlan.isconnected():passprint('network config: ', wlan.ifconfig())connectWiFi()import camera
import time
import urequests as requestsdef send_image():url = 'http://192.168.1.162:5000/updata'while not camera.init(0):time.sleep(0.2)camera.deinit()time.sleep(0.2)time.sleep(0.2) #稍作延时,等待拍照img = camera.capture() # 获取照片camera.deinit()r = requests.post(url, data = img)print(r.text)r.closesend_image()//
/
保存到ESP32Cam中的 urequests.py 驱动库源代码import usocketclass Response:def __init__(self, f):self.raw = fself.encoding = "utf-8"self._cached = Nonedef close(self):if self.raw:self.raw.close()self.raw = Noneself._cached = None@propertydef content(self):if self._cached is None:self._cached = self.raw.read()self.raw.close()self.raw = Nonereturn self._cached@propertydef text(self):return str(self.content, self.encoding)def json(self):import ujsonreturn ujson.loads(self.content)def request(method, url, data=None, json=None, headers={}, stream=None,params=None):try:proto, dummy, host, path = url.split("/", 3)except ValueError:proto, dummy, host = url.split("/", 2)path = ""if proto == "http:":port = 80elif proto == "https:":import usslport = 443else:raise ValueError("Unsupported protocol: " + proto)if ":" in host:host, port = host.split(":", 1)port = int(port)if params:path = path + "?"for k in params:path = path + '&'+k+'='+params[k]ai = usocket.getaddrinfo(host, port)addr = ai[0][4]s = usocket.socket()s.connect(addr)if proto == "https:":s = ussl.wrap_socket(s)s.write(b"%s /%s HTTP/1.0\r\n" % (method, path))if not "Host" in headers:s.write(b"Host: %s\r\n" % host)# Iterate over keys to avoid tuple allocfor k in headers:s.write(k)s.write(b": ")s.write(headers[k])s.write(b"\r\n")if json is not None:assert data is Noneimport ujsondata = ujson.dumps(json)if data:s.write(b"Content-Length: %d\r\n" % len(data))s.write(b"\r\n")if data:s.write(data)l = s.readline()protover, status, msg = l.split(None, 2)status = int(status)#print(protover, status, msg)while True:l = s.readline()if not l or l == b"\r\n":break#print(l)if l.startswith(b"Transfer-Encoding:"):if b"chunked" in l:raise ValueError("Unsupported " + l)elif l.startswith(b"Location:") and not 200 <= status <= 299:raise NotImplementedError("Redirects not yet supported")resp = Response(s)resp.status_code = statusresp.reason = msg.rstrip()return respdef head(url, **kw):return request("HEAD", url, **kw)def get(url, **kw):return request("GET", url, **kw)def post(url, **kw):return request("POST", url, **kw)def put(url, **kw):return request("PUT", url, **kw)def patch(url, **kw):return request("PATCH", url, **kw)def delete(url, **kw):return request("DELETE", url, **kw)