文章目录
- 前端代码
- api
- vue界面
- 后端
- model
- websocket
- config
- resource
龙年到了,先祝福各位龙年快乐,事业有成!
最近在搞webrtc,想到【前后端的那些事】好久都没有更新了,所以打算先把最近编写的小demo发出来。
p2p webrtc的demo在编写的时需要编写人员以不同的客户端角度出发编写代码,因此对编码造成一定的障碍,详细的介绍文章不是特别好写,所以我打算先把demo代码先分享出来,后续再进一步整理
效果
前端代码
api
/src/api/webrtc.ts
export const SIGNAL_TYPE_JOIN = "join";
export const SIGNAL_TYPE_RESP_JOIN = "resp-join"; // 告知加入者对方是谁
export const SIGNAL_TYPE_LEAVE = "leave";
export const SIGNAL_TYPE_NEW_PEER = "new-peer";
export const SIGNAL_TYPE_PEER_LEAVE = "peer-leave";
export const SIGNAL_TYPE_OFFER = "offer";
export const SIGNAL_TYPE_ANSWER = "answer";
export const SIGNAL_TYPE_CANDIDATE = "candidate";export class Message {userId: string;roomId: string;remoteUserId: string;data: any;cmd: string;constructor() {this.roomId = "1";}
}export default {SIGNAL_TYPE_JOIN,SIGNAL_TYPE_RESP_JOIN,SIGNAL_TYPE_LEAVE,SIGNAL_TYPE_NEW_PEER,SIGNAL_TYPE_PEER_LEAVE,SIGNAL_TYPE_OFFER,SIGNAL_TYPE_ANSWER,SIGNAL_TYPE_CANDIDATE
}
vue界面
/src/views/welecome/index.vue
<script setup lang="ts">
import { ref, onMounted } from "vue";
import {Message,SIGNAL_TYPE_JOIN,SIGNAL_TYPE_NEW_PEER,SIGNAL_TYPE_RESP_JOIN,SIGNAL_TYPE_OFFER,SIGNAL_TYPE_ANSWER,SIGNAL_TYPE_CANDIDATE
} from "@/api/webrtc";// 链接websocket
const userId = ref<string>(Math.random().toString(36).substr(2));
const remoteUserId = ref<string>();
const ws = new WebSocket("ws://localhost:1000/ws/" + userId.value);const localVideo = ref<HTMLVideoElement>();
const localStream = ref<MediaStream>();const remoteVideo = ref<HTMLVideoElement>();
const remoteStream = ref<MediaStream>();const pc = ref<RTCPeerConnection>();onMounted(() => {localVideo.value = document.querySelector("#localVideo");remoteVideo.value = document.querySelector("#remoteVideo");
})ws.onopen = (ev: Event) => {console.log("连接成功 userId = " + userId.value);
}ws.onmessage = (ev: MessageEvent) => {const data = JSON.parse(ev.data);if (data.cmd === SIGNAL_TYPE_NEW_PEER) {handleNewPeer(data);} else if (data.cmd === SIGNAL_TYPE_RESP_JOIN) {handleRespJoin(data);} else if (data.cmd === SIGNAL_TYPE_OFFER) {handleRemoteOffer(data);} else if (data.cmd === SIGNAL_TYPE_ANSWER) {handleRemoteAnswer(data);} else if (data.cmd === SIGNAL_TYPE_CANDIDATE) {handleRemoteCandidate(data);}
}ws.onclose = (ev) => {console.log("连接关闭 userId = " + userId.value);
}const handleRemoteCandidate = (msg : Message) => {console.log("handleRemoteCandidate...");// 保存远程cadidatepc.value.addIceCandidate(msg.data);
}/*** 处理远端发送来的answer */
const handleRemoteAnswer = (msg : Message) => {console.log("handleRemoteAnswer...");// 保存远端发送的answer(offer)pc.value.setRemoteDescription(msg.data);
}/*** 处理对端发送过来的offer, 并且发送answer(offer) * @param msg */
const handleRemoteOffer = async (msg : Message) => {console.log("handleRemoteOffer...");// 存储对端的offerpc.value.setRemoteDescription(msg.data);// 创建自己的offer(answer)const answer = await pc.value.createAnswer();// 保存本地offerpc.value.setLocalDescription(answer);// 转发answerconst answerMsg = new Message();answerMsg.userId = userId.value;answerMsg.remoteUserId = remoteUserId.value;answerMsg.cmd = SIGNAL_TYPE_ANSWER;answerMsg.data = answer;console.log("发送answer...");ws.send(JSON.stringify(answerMsg));
}/** * 创建offer,设置本地offer并且发送给对端 */
const handleNewPeer = async (msg : Message) => {console.log("handleNewPeer...");// 存储对端用户idremoteUserId.value = msg.remoteUserId;// todo:// 创建offerconst offer = await pc.value.createOffer()// 本地存储offerpc.value.setLocalDescription(offer);// 转发offerconst offerMsg = new Message();offerMsg.userId = userId.value;offerMsg.remoteUserId = remoteUserId.value;offerMsg.data = offer;offerMsg.cmd = SIGNAL_TYPE_OFFER;console.log("发送offer...");ws.send(JSON.stringify(offerMsg));
}const handleRespJoin = (msg: Message) => {console.log("handleRespJoin...");console.log(msg);remoteUserId.value = msg.remoteUserId;
}const join = async () => {// 初始化视频流console.log(navigator.mediaDevices);const stream = await navigator.mediaDevices.getUserMedia({audio: true,video: true})localVideo.value!.srcObject = streamlocalVideo.value!.play()localStream.value = stream;// 创建pccreatePeerConn();// 加入房间doJoin();
}const doJoin = () => {// 创建信息对象const message = new Message();message.cmd = SIGNAL_TYPE_JOIN;message.userId = userId.value;const msg = JSON.stringify(message);// send messagews.send(msg);
}/*** 创建peerConnection*/
const createPeerConn = () => {pc.value = new RTCPeerConnection();// 将本地流的控制权交给pc// const tracks = localStream.value.getTracks()// for (const track of tracks) {// pc.value.addTrack(track); // } localStream.value.getTracks().forEach(track => {pc.value.addTrack(track, localStream.value);});pc.value.onicecandidate = (event : RTCPeerConnectionIceEvent) => {if (event.candidate) {// 发送candidateconst msg = new Message();msg.data = event.candidate;msg.userId = userId.value;msg.remoteUserId = remoteUserId.value;msg.cmd = SIGNAL_TYPE_CANDIDATE;console.log("onicecandidate...");console.log(msg);ws.send(JSON.stringify(msg));} else {console.log('candidate is null');}}pc.value.ontrack = (event: RTCTrackEvent) => {console.log("handleRemoteStream add...");// 添加远程的streamremoteVideo.value.srcObject = event.streams[0];remoteStream.value = event.streams[0];}pc.value.onconnectionstatechange = () => {if(pc != null) {console.info("ConnectionState -> " + pc.value.connectionState);}};pc.value.oniceconnectionstatechange = () => {if(pc != null) {console.info("IceConnectionState -> " + pc.value.iceConnectionState);}
}
}
</script><template><el-button @click="join">加入</el-button><div id="videos"><video id="localVideo" autoplay muted playsinline>本地窗口</video><video id="remoteVideo" autoplay playsinline>远端窗口</video></div>
</template>
后端
model
Client.java
import lombok.Data;import javax.websocket.Session;@Data
public class Client {private String userId;private String roomId;private Session session;
}
Message.java
import lombok.Data;@Data
public class Message {private String userId;private String remoteUserId;private Object data;private String roomId;private String cmd;@Overridepublic String toString() {return "Message{" +"userId='" + userId + '\'' +", remoteUserId='" + remoteUserId + '\'' +", roomId='" + roomId + '\'' +", cmd='" + cmd + '\'' +'}';}
}
Constant.java
public interface Constant {String SIGNAL_TYPE_JOIN = "join";String SIGNAL_TYPE_RESP_JOIN = "resp-join";String SIGNAL_TYPE_LEAVE = "leave";String SIGNAL_TYPE_NEW_PEER = "new-peer";String SIGNAL_TYPE_PEER_LEAVE = "peer-leave";String SIGNAL_TYPE_OFFER = "offer";String SIGNAL_TYPE_ANSWER = "answer";String SIGNAL_TYPE_CANDIDATE = "candidate";
}
websocket
WebSocket.java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fgbg.webrtc.model.Client;
import com.fgbg.webrtc.model.Message;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;import static com.fgbg.webrtc.model.Constant.*;@Component
@Slf4j
@ServerEndpoint("/ws/{userId}")
public class WebSocket {//与某个客户端的连接会话,需要通过它来给客户端发送数据private Client client;// 存储用户private static Map<String, Client> clientMap = new ConcurrentHashMap<>();// 存储房间private static Map<String, Set<String>> roomMap = new ConcurrentHashMap<>();// 为了简化逻辑, 只有一个房间->1号房间static {roomMap.put("1", new HashSet<String>());}private ObjectMapper objectMapper = new ObjectMapper();@OnOpenpublic void onOpen(Session session, @PathParam(value="userId")String userId) {log.info("userId = " + userId + " 加入房间1");Client client = new Client();client.setRoomId("1");client.setSession(session);client.setUserId(userId);this.client = client;clientMap.put(userId, client);}@OnClosepublic void onClose() {String userId = client.getUserId();clientMap.remove(userId);roomMap.get("1").remove(userId);log.info("userId = " + userId + " 退出房间1");}@OnMessagepublic void onMessage(String message) throws JsonProcessingException {// 反序列化messagelog.info("userId = " + client.getUserId() + " 收到消息");Message msg = objectMapper.readValue(message, Message.class);switch (msg.getCmd()) {case SIGNAL_TYPE_JOIN:handleJoin(message, msg);break;case SIGNAL_TYPE_OFFER:handleOffer(message, msg);break;case SIGNAL_TYPE_ANSWER:handleAnswer(message, msg);break;case SIGNAL_TYPE_CANDIDATE:handleCandidate(message, msg);break;}}/*** 转发candidate* @param message* @param msg*/private void handleCandidate(String message, Message msg) throws JsonProcessingException {System.out.println("handleCandidate msg = " + msg);String remoteId = msg.getRemoteUserId();sendMsgByUserId(msg, remoteId);}/*** 转发answer* @param message* @param msg*/private void handleAnswer(String message, Message msg) throws JsonProcessingException {System.out.println("handleAnswer msg = " + msg);String remoteId = msg.getRemoteUserId();sendMsgByUserId(msg, remoteId);}/*** 转发offer* @param message* @param msg*/private void handleOffer(String message, Message msg) throws JsonProcessingException {System.out.println("handleOffer msg = " + msg);String remoteId = msg.getRemoteUserId();sendMsgByUserId(msg, remoteId);}/*** 处理加入房间逻辑* @param message* @param msg*/private void handleJoin(String message, Message msg) throws JsonProcessingException {String roomId = msg.getRoomId();String userId = msg.getUserId();System.out.println("userId = " + msg.getUserId() + " join 房间" + roomId);// 添加到房间内Set<String> room = roomMap.get(roomId);room.add(userId);if (room.size() == 2) {String remoteId = null;for (String id : room) {if (!id.equals(userId)) {remoteId = id;}}// 通知两个客户端// resp-joinMessage respJoinMsg = new Message();respJoinMsg.setUserId(userId);respJoinMsg.setRemoteUserId(remoteId);respJoinMsg.setCmd(SIGNAL_TYPE_RESP_JOIN);sendMsgByUserId(respJoinMsg, userId);// new-peerMessage newPeerMsg = new Message();newPeerMsg.setUserId(remoteId);newPeerMsg.setRemoteUserId(userId);newPeerMsg.setCmd(SIGNAL_TYPE_NEW_PEER);sendMsgByUserId(newPeerMsg, remoteId);}else if (room.size() > 2) {log.error("房间号" + roomId + " 人数过多");return;}}/*** 根据远端用户id, 转发信息*/private void sendMsgByUserId(Message msg, String remoteId) throws JsonProcessingException {Client client = clientMap.get(remoteId);client.getSession().getAsyncRemote().sendText(objectMapper.writeValueAsString(msg));System.out.println("信息转发: " + msg);}
}
config
WebSocketConfig.java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {/*** 注入ServerEndpointExporter,* 这个bean会自动注册使用了@ServerEndpoint注解声明的Websocket endpoint*/@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}}
resource
application.yml
server:port: 1000