一、项目应用场景:
不断更新展示设备状态数据,所以有2种做法:轮训和长连接推送
二、项目页面:
三、设计方案
这里选择的设计方案是websocket长连接
用一个SingleThreadScheduledExecutor()创建单线程周期性更新设备状态数据,更新完的数据放到缓存中,然后在推送map中查找当前在线的连接,推送缓存中最新的数据。在连接下线时从map中移除,如果所有连接都断开,关闭更新线程。executeMap存放多种查询条件对应的更新线程。
这样做的好处是更新和推送分开处理,只需要一个更新线程更新数据。全部断开时不用一直写入缓存。
四、开发代码
import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.websocket.OnClose;
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import com.kaicom.mes.monitor.service.EquipmentStatusService;
import com.kaicom.mes.util.GsonUtil;
import io.swagger.annotations.Api;
/**
* @Author: BillYu
* @Description:
* @Date: Created in 09:57 2019-07-03.
*/
@ServerEndpoint(value = "/websocket/equipmentStatus")
@Component
@Api(description = "设备状态")
public class EquipmentStatusWebsocket {
private static final Logger logger = LoggerFactory.getLogger(EquipmentStatusWebsocket.class);
/**
* 这里使用静态,让 service 属于类
*/
private static EquipmentStatusService statusService;
/**
* 注入的时候,给类的 service 注入
*/
@Autowired
public void setStatusService(EquipmentStatusService statusService) {
EquipmentStatusWebsocket.statusService = statusService;
}
//concurrent包的线程安全Set,用来存放每个客户端对应的MyWebSocket对象。
private static CopyOnWriteArraySet<EquipmentStatusWebsocket> webSocketSet = new CopyOnWriteArraySet<>();
private static ConcurrentHashMap<String, ScheduledExecutorService> executeMap = new ConcurrentHashMap<>();
private static ConcurrentHashMap<String, ScheduledExecutorService> noticeMap = new ConcurrentHashMap<>();
/**
* 与某个客户端的连接会话,需要通过它来给客户端发送数据
*/
private Session session;
/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
this.session = session;
//加入set中
webSocketSet.add(this);
logger.info("有新连接加入!当前在线人数为" + getOnlineCount());
try {
sendMessage("已打开长连接");
String key = session.getQueryString();
//双重锁检查
if (executeMap.get(key) == null) {
synchronized (EquipmentStatusWebsocket.class) {
if (executeMap.get(key) == null) {
// //更新线程
ScheduledExecutorService updateService = Executors.newSingleThreadScheduledExecutor();
String workshop = session.getRequestParameterMap().get("workshop").get(0);
String equipmentName = session.getRequestParameterMap().get("equipmentName").get(0);
String sessionId = session.getId();
updateService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
logger.info(Thread.currentThread()+"update ...");
try {
statusService.getEquipmentStatusStatistics(workshop,equipmentName );
} catch (Exception e) {
logger.error(e.getMessage());
}
}
}, 0, 10, TimeUnit.SECONDS);
executeMap.put(key, updateService);
}
}
}
//推送线程
String workshop = session.getRequestParameterMap().get("workshop").get(0);
String location = session.getRequestParameterMap().get("equipmentName").get(0);
ScheduledExecutorService pushService = Executors.newSingleThreadScheduledExecutor();
pushService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
logger.info("send msg");
try {
sendMessage(statusService.fetchCacheStatus(workshop,location));
} catch (IOException e) {
logger.error(e.getMessage());
}
}
}, 0, 10, TimeUnit.SECONDS);
noticeMap.put(session.getId(),pushService);
} catch (IOException e) {
logger.error("IO异常");
}
}
/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose() {
//从set中删除
webSocketSet.remove(this);
logger.info("有一连接关闭!当前在线人数为" + getOnlineCount());
//清除消息线程
if(noticeMap.get(session.getId())!=null){
noticeMap.get(session.getId()).shutdownNow();
noticeMap.remove(session.getId());
}
//清除更新数据线程
synchronized (EquipmentStatusWebsocket.class) {
Boolean needRemove = true;
for (EquipmentStatusWebsocket statusSocket : webSocketSet) {
//是否有同条件的查询
if (session.getQueryString().equals(statusSocket.session.getQueryString())) {
needRemove = false;
break;
}
}
if (needRemove) {
executeMap.get(session.getQueryString()).shutdownNow();
executeMap.remove(session.getQueryString());
}
}
}
/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
logger.info("来自客户端[" + session.getUserProperties().get("ClientIP") + "]的消息:" + message);
}
public Boolean sendClientMessage(String sessionId, String message) {
boolean isSend = false;
for (EquipmentStatusWebsocket statusWebsocket : webSocketSet) {
if (statusWebsocket.session.getId().equals(sessionId)) {
try {
statusWebsocket.sendMessage(message);
isSend = true;
} catch (IOException e) {
logger.error("推送异常:" + e.getMessage() + "sessionId:" + sessionId + "message:" + message);
isSend = false;
}
}
}
return isSend;
}
public void sendMessage(String message) throws IOException {
this.session.getBasicRemote().sendText(message);
}
/**
* 群发自定义消息
*/
public static void sendInfo(String message) throws IOException {
for (EquipmentStatusWebsocket statusWebsocket : webSocketSet) {
try {
statusWebsocket.sendMessage(message);
} catch (IOException e) {
logger.error("推送异常:" + e.getMessage() + "message:" + message);
continue;
}
}
}
public static int getOnlineCount() {
return webSocketSet.size();
}
}
//这里分为 更新线程和推送线程