杭州seo博客seo关键词排名优化推荐
背景
由于当时项目周期紧,引入了上手较快的PLC4X组件。以下是使用该组件过程中遇到的一些问题:
- 关闭连接后NioEventLoop没有释放,导致OOM(内存溢出)
- 设计思想是一个设备一个连接,而不是一个网关一个连接
- 连接断开后客户端无从感知
前两个问题的解决方案请参考上篇文章。最后一个问题虽然可以通过isConnected()方法获取连接状态,但是连接断开后这个状态并不会更新,只能自己写代码实现失败重连。
所以为了解决以上问题,我打算重新封装一个Modbus组件。
步骤
代码如下所示,目前只分享modbus-core相关的代码。
- modbus-core:实现设备读写指令的下发以及应答。
- modbus-app:实现通用的可灵活配置的modbus设备接入层,通过更新配置信息即可快速引入新设备,无需手写代码重启应用。
为了快速实现Modbus组件封装,这里引入了Vert.x框架(基于事件驱动的异步框架,详见官网 https://vertx.io),而不是直接使用原生的Netty框架。
引入jar包
<!-- 目前我这里引入最新的版本(4.4.4) -->
<dependency><groupId>io.vertx</groupId><artifactId>vertx-core</artifactId><version>${vertx.version}</version></dependency>
工具类
ByteUtil
package com.bho.modbus.utils;import java.nio.ByteBuffer;public class ByteUtil {/*** 字节数组转字符串* @param bytes* @return*/public static String bytesToHexString(byte[] bytes) {StringBuffer sb = new StringBuffer(bytes.length);String sTemp;for (int i = 0; i < bytes.length; i++) {sTemp = Integer.toHexString(0xFF & bytes[i]);if (sTemp.length() < 2) {sb.append(0);}sb.append(sTemp.toUpperCase());}return sb.toString();}/*** int整型转字节数组* @param data* @param offset* @param len* @return*/public static byte[] intToBytes(int data, int offset, int len) {ByteBuffer buffer = ByteBuffer.allocate(4);buffer.putInt(data);byte[] bytes = buffer.array();if (len - offset == 4) {return bytes;}byte[] dest = new byte[len];System.arraycopy(bytes, offset, dest, 0, len);return dest;}/*** 字节数组转int整型* @param bytes* @param offset* @param len* @return*/public static int bytesToInt(byte[] bytes, int offset, int len) {ByteBuffer buffer = ByteBuffer.allocate(4);for (int i = len; i < 4; i ++) {buffer.put((byte) 0x00);}for (int i = offset; i < offset + len; i++) {buffer.put(bytes[i]);}buffer.flip();return buffer.getInt();}}
Crc16
package com.bho.modbus.utils;public class Crc16 {/*** 获取CRC16校验码* @param arr_buff* @return*/public static byte[] getCrc16(byte[] arr_buff) {int len = arr_buff.length;// 预置 1 个 16 位的寄存器为十六进制FFFF, 称此寄存器为 CRC寄存器。int crc = 0xFFFF;int i, j;for (i = 0; i < len; i++) {// 把第一个 8 位二进制数据 与 16 位的 CRC寄存器的低 8 位相异或, 把结果放于 CRC寄存器crc = ((crc & 0xFF00) | (crc & 0x00FF) ^ (arr_buff[i] & 0xFF));for (j = 0; j < 8; j++) {// 把 CRC 寄存器的内容右移一位( 朝低位)用 0 填补最高位, 并检查右移后的移出位if ((crc & 0x0001) > 0) {// 如果移出位为 1, CRC寄存器与多项式A001进行异或crc = crc >> 1;crc = crc ^ 0xA001;} else// 如果移出位为 0,再次右移一位crc = crc >> 1;}}return intToBytes(crc);}private static byte[] intToBytes(int value) {byte[] src = new byte[2];src[1] = (byte) ((value >> 8) & 0xFF);src[0] = (byte) (value & 0xFF);return src;}}
实体类
ModbusMode
目前只实现了以下两种通信方式,可根据自己需求加入其它通信方式。
package com.bho.modbus.model;import com.bho.modbus.utils.ByteUtil;
import com.bho.modbus.utils.Crc16;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import lombok.extern.log4j.Log4j2;import java.nio.ByteOrder;@Log4j2
public enum ModbusMode {/*** 【事务ID(2) + 协议标识(2) + 数据长度(2)】 + 从机地址(1) + 功能码(1) + 数据区(N)*/TCP,/*** 从机地址(1) + 功能码(1) + 数据区(N) + 【校验码(2)】**/RTU,;public ByteToMessageDecoder getDecoder() {if (this == ModbusMode.TCP) {return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 4,2, 0, 6, true);}if (this == ModbusMode.RTU){return new LengthFieldBasedFrameDecoder(ByteOrder.BIG_ENDIAN, 65536, 2,1, 2, 0, true);}return null;}public byte[] readData(byte[] bytes) {int len = bytes.length;if (this == ModbusMode.RTU) {byte[] tempArr = new byte[len - 2];System.arraycopy(bytes, 0, tempArr, 0, tempArr.length);byte[] crc16 = Crc16.getCrc16(tempArr);if (crc16[0] != bytes[len -2] || crc16[1] != bytes[len - 1]) {log.error("Modbus receive illegal data:{}", ByteUtil.bytesToHexString(bytes));return null;}if (log.isDebugEnabled()) {log.debug("read data:{}", ByteUtil.bytesToHexString(tempArr));}return tempArr;}if (this == ModbusMode.TCP) {if (log.isDebugEnabled()) {log.debug("read data:{}", ByteUtil.bytesToHexString(bytes));}return bytes;}return null;}public byte[] writeData(byte[] bytes) {if (log.isDebugEnabled()) {log.debug("write data:{}",ByteUtil.bytesToHexString(bytes));}int len = bytes.length;if (this == ModbusMode.RTU) {byte[] crc16 = Crc16.getCrc16(bytes);byte[] tempArr = new byte[len + 2];System.arraycopy(bytes, 0, tempArr, 0, len);tempArr[len] = crc16[0];tempArr[len + 1] = crc16[1];return tempArr;}if (this == ModbusMode.TCP) {byte[] tempArr = new byte[len + 6];tempArr[1] = 0x01;byte[] lenBytes = ByteUtil.intToBytes(len, 2, 2);tempArr[4] = lenBytes[0];tempArr[5] = lenBytes[1];System.arraycopy(bytes, 0, tempArr, 6, len);return tempArr;}return null;}}
ModbusFunc
功能码
package com.bho.modbus.model;/*** Modbus常见功能码*/
public enum ModbusFunc {

    /*
     * Error codes carried by an error response:
     *   01 - illegal function code
     *   02 - illegal register address
     *   03 - illegal data value
     *   04 - slave fault
     */

    /**
     * Read contiguous coil status.
     * <p>Request: function code (1 byte, 0x01), start address (2 bytes, 0x0000-0xffff),
     * coil count (2 bytes, 0x0001-0x07d0 = 2000).
     * <p>Response: function code (1 byte, 0x01), byte count (1 byte, N = coil count / 8,
     * plus 1 when there is a remainder), coil status (N bytes).
     * <p>Error response: function code (1 byte, 0x81), error code (1 byte, 0x01-0x04).
     */
    READ_COILS((byte) 0x01),

    /** Read discrete coil status; same layout as {@link #READ_COILS}. */
    READ_DISCRETE_COILS((byte) 0x02),

    /**
     * Read holding register values.
     * <p>Request: function code (1 byte, 0x03), start address (2 bytes, 0x0000-0xffff),
     * register count (2 bytes, 0x0001-0x007d = 125).
     * <p>Response: function code (1 byte, 0x03), byte count (1 byte, 2N where N is the
     * register count), register values (2N bytes).
     * <p>Error response: function code (1 byte, 0x83), error code (1 byte, 0x01-0x04).
     */
    READ_HOLDING_REGISTERS((byte) 0x03),

    /** Read input register values; same layout as {@link #READ_HOLDING_REGISTERS}. */
    READ_INPUT_REGISTERS((byte) 0x04),

    /**
     * Write a single coil.
     * <p>Request: function code (1 byte, 0x05), start address (2 bytes, 0x0000-0xffff),
     * coil status (2 bytes, 0x0000 or 0xff00).
     * <p>Response: echoes the request (function code, start address, coil status).
     * <p>Error response: function code (1 byte, 0x85), error code (1 byte, 0x01-0x04).
     */
    WRITE_SINGLE_COILS((byte) 0x05),

    /**
     * Write a single holding register.
     * <p>Request: function code (1 byte, 0x06), start address (2 bytes, 0x0000-0xffff),
     * register value (2 bytes, 0x0000-0xffff).
     * <p>Response: echoes the request (function code, start address, register value).
     * <p>Error response: function code (1 byte, 0x86), error code (1 byte, 0x01-0x04).
     */
    WRITE_SINGLE_HOLDING_REGISTERS((byte) 0x06),

    /**
     * Write multiple holding registers.
     * <p>Request: function code (1 byte, 0x10), start address (2 bytes, 0x0000-0xffff),
     * register count (2 bytes, 0x0001-0x007b = 123), byte count (1 byte, 2N where N is the
     * register count), register values (2N bytes, each 0x0000-0xffff).
     * <p>Response: function code (1 byte, 0x10), start address (2 bytes), register count (2 bytes).
     * <p>Error response: function code (1 byte, 0x90), error code (1 byte, 0x01-0x04).
     */
    WRITE_MULTI_HOLDING_REGISTERS((byte) 0x10),

    /**
     * Write multiple coils.
     * <p>Request: function code (1 byte, 0x0F), start address (2 bytes, 0x0000-0xffff),
     * coil count (2 bytes, 0x0001-0x07b0 = 1968), byte count (1 byte, N = coil count / 8,
     * plus 1 when there is a remainder), coil status (N bytes).
     * <p>Response: function code (1 byte, 0x0F), start address (2 bytes), coil count (2 bytes).
     * <p>Error response: function code (1 byte, 0x8F), error code (1 byte, 0x01-0x04).
     */
    WRITE_MULTI_COILS((byte) 0x0F),
    ;

    /** The function code byte placed in a request. Immutable: enum constants are shared singletons. */
    private final byte func;

    ModbusFunc(byte func) {
        this.func = func;
    }

    /**
     * @return the function code byte of this function
     */
    public byte getFunc() {
        return func;
    }
}
ModbusParamConfig
下发指令参数配置信息
package com.bho.modbus.model;import lombok.Data;@Data
public class ModbusParamConfig {private RegisterType registerType;//寄存器类型private int registerAddress;//寄存器地址private String name;//指标名称private DataType dataType;//指标数据类型private int numberSplit;//(除)倍数public enum RegisterType {COIL,HOLDING_REGISTER,INPUT_REGISTER;}public enum DataType {BOOL,FLOAT,INT;}}
SendCmdTask
下发指令任务
package com.bho.modbus.model;import com.alibaba.fastjson.JSONObject;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.Data;

import java.util.List;

/**
 * A queued read or write request for one slave, together with the promise through which
 * the caller receives the outcome. Accessors are generated by Lombok's {@code @Data}.
 */
@Data
public class SendCmdTask {

    /** Lower bound, in seconds, applied to the request timeout. */
    private static final int MIN_REQ_TIMEOUT_SECONDS = 5;

    /** Parameters to read or write. */
    private List<ModbusParamConfig> paramConfigs;

    /** Values to write, keyed by parameter name; required for write tasks. */
    private JSONObject reqParam;

    /** {@code true} for a write task, {@code false} for a read task. */
    private Boolean isWrite;

    /** Id of the target slave. */
    private Integer slaveId;

    /** Request timeout in seconds (never less than 5). */
    private Integer reqTimeout;

    /** Completed with the result, or failed with the error or a timeout. */
    private Promise<JSONObject> promise;

    /** Id of the Vert.x timer that fails {@link #promise} when the timeout elapses. */
    private Long timerId;

    /**
     * Creates the task and immediately starts its timeout timer.
     *
     * @param vertx        Vert.x instance used to schedule the timeout
     * @param paramConfigs parameters to read or write
     * @param reqParam     values to write keyed by parameter name; may be {@code null} for reads
     * @param isWrite      whether this is a write task
     * @param slaveId      id of the target slave
     * @param reqTimeout   timeout in seconds; values below 5 (or {@code null}) are raised to 5
     */
    public SendCmdTask(Vertx vertx, List<ModbusParamConfig> paramConfigs, JSONObject reqParam,
                       Boolean isWrite, Integer slaveId, Integer reqTimeout) {
        this.paramConfigs = paramConfigs;
        this.reqParam = reqParam;
        this.isWrite = isWrite;
        this.slaveId = slaveId;
        this.reqTimeout = reqTimeout == null
                ? MIN_REQ_TIMEOUT_SECONDS
                : Math.max(reqTimeout, MIN_REQ_TIMEOUT_SECONDS);
        Promise<JSONObject> pending = Promise.promise();
        this.promise = pending;
        // Arm the timer with the clamped value (the raw argument could be zero or negative,
        // which the timer rejects) and multiply as long to avoid int overflow.
        this.timerId = vertx.setTimer(this.reqTimeout * 1000L, id -> pending.tryFail("Request timeout"));
    }
}
核心类
package com.bho.modbus.core;import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.SendCmdTask;
import com.bho.modbus.model.ModbusFunc;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.model.ModbusParamConfig;import com.bho.modbus.utils.ByteUtil;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.buffer.impl.BufferImpl;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetSocketImpl;
import lombok.extern.log4j.Log4j2;import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;@Log4j2
public class ModbusConnection {private String ip;//从机IPprivate Integer port;//从机端口private AtomicBoolean isAlive;//从机是否在线private ModbusMode mode;//通讯模式private NetSocket netSocket;//客户端连接private boolean isInitiativeClose;//是否是主动关闭连接private Long failRetryTimerId;//失败重试定时器IDprivate Integer failRetryIntervalSecond;//连接断开后重连间隔时间private Integer reqTimeoutSecond = 1;//请求超时时间private Long queueTimerId;//队列定时器private ConcurrentLinkedQueue<SendCmdTask> writeQueue;//写队列 优先写private ConcurrentLinkedQueue<SendCmdTask> readQueue;//读队列private Map<String, Promise<byte[]>> promiseMap;private Vertx vertx;public ModbusConnection(Vertx vertx, String ip, Integer port, Integer failRetryIntervalSecond, ModbusMode mode) {this.vertx = vertx;this.ip = ip;this.port = port;this.failRetryIntervalSecond = failRetryIntervalSecond;this.mode = mode;this.isAlive = new AtomicBoolean(false);this.writeQueue = new ConcurrentLinkedQueue<>();this.readQueue = new ConcurrentLinkedQueue<>();this.promiseMap = new ConcurrentHashMap<>();consumerTaskQueue(true);}/*** 建立连接* @return*/public Future<Boolean> connect(){NetClient netClient = vertx.createNetClient();return vertx.executeBlocking(b -> {netClient.connect(port, ip).onSuccess(socket -> {log.info("Modbus connect success, ip:{}, port:{}", ip, port);netSocket = socket;isAlive.set(true);b.tryComplete(true);NetSocketImpl netSocketImpl = (NetSocketImpl) socket;netSocketImpl.channelHandlerContext().pipeline().addFirst(mode.getDecoder());socket.handler(buf -> {byte[] bytes = mode.readData(buf.getBytes());if (bytes == null) {return;}int slaveId = ByteUtil.bytesToInt(bytes, 0, 1);int funcNo = ByteUtil.bytesToInt(bytes, 1, 1);int errFuncNo = funcNo - 128;String key = String.format("%s_%s", slaveId, funcNo);String errKey = String.format("%s_%s", slaveId, errFuncNo);if (promiseMap.containsKey(key)) {Promise<byte[]> promise = promiseMap.get(key);byte[] content = new byte[bytes.length - 2];System.arraycopy(bytes, 2, content, 0, content.length);promise.tryComplete(content);} 
else if (promiseMap.containsKey(errKey)) {Promise<byte[]> promise = promiseMap.get(errKey);int data = ByteUtil.bytesToInt(bytes, 2, 1);switch (data) {case 1:promise.tryFail("Illegal function code");break;case 2:promise.tryFail("Illegal register address");break;case 3:promise.tryFail("Illegal data value");break;case 4:promise.tryFail("Slave fault");break;}}});socket.closeHandler(h -> {if (!isInitiativeClose) {log.error("Modbus connect close, ip:{}, port:{}", ip, port);failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, hh -> connect());} else {log.info("Modbus connect close, ip:{}, port:{}", ip, port);}});}).onFailure(err -> {log.error("Modbus connect fail, ip:{}, port:{}, msg:{}", ip, port, err.getMessage());isAlive.set(false);b.fail(err.getMessage());failRetryTimerId = vertx.setTimer(failRetryIntervalSecond * 1000, h -> connect());});});}/*** 是否在线* @return*/public boolean isActive() {return isAlive.get();}/*** 断开连接*/public void close() {isInitiativeClose = true;if (failRetryTimerId != null) {vertx.cancelTimer(failRetryTimerId);}if (queueTimerId != null) {vertx.cancelTimer(queueTimerId);}if (netSocket != null) {netSocket.close();}}/*** 下发读写任务(串行 优先写任务)* 若并行可直接调用executeTask执行任务,无需排队等候一个个消费任务* @param task 读写任务* @return*/public Promise<JSONObject> offerTask(SendCmdTask task) {if (task.getIsWrite()) {writeQueue.offer(task);} else {readQueue.offer(task);}return task.getPromise();}/*** 消费任务队列 500毫秒轮询一次 优先消费写任务* @param delayFlag*/private void consumerTaskQueue(boolean delayFlag){if(delayFlag){queueTimerId = vertx.setTimer(500,id->{consumerTaskQueue(false);});return;}if(writeQueue.isEmpty() && readQueue.isEmpty()){consumerTaskQueue(true);return;}if(!writeQueue.isEmpty()){SendCmdTask sendCmdTask = writeQueue.poll();sendCmdTask.getPromise().future().onComplete(h->{consumerTaskQueue(false);});executeTask(sendCmdTask);return;}if(!readQueue.isEmpty()){SendCmdTask sendCmdTask = 
readQueue.poll();sendCmdTask.getPromise().future().onComplete(h->{consumerTaskQueue(false);});executeTask(sendCmdTask);}}private Future<Void> executeTask(SendCmdTask sendCmdTask){vertx.cancelTimer(sendCmdTask.getTimerId());Future<JSONObject> future;if (sendCmdTask.getIsWrite()) {future = executeWrite(sendCmdTask.getReqParam(), sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());} else {future = executeQuery(sendCmdTask.getParamConfigs(), sendCmdTask.getSlaveId());}return future.onSuccess(res -> sendCmdTask.getPromise().tryComplete(res)).onFailure(err -> sendCmdTask.getPromise().tryFail(err)).map(o -> null);}/*** 写数据* @param reqParam 下发参数* @param paramConfigs 参数配置列表* @param slaveId 从机ID* @return*/private Future<JSONObject> executeWrite(JSONObject reqParam, List<ModbusParamConfig> paramConfigs, Integer slaveId) {if (!isActive()) {return Future.failedFuture("Gateway offline");}boolean isMerge = isMergeSendCmd(paramConfigs);if (isMerge) {int registerAddress = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();Promise<byte[]> promise = Promise.promise();List<String> keyList = paramConfigs.stream().map(ModbusParamConfig::getName).collect(Collectors.toList());return vertx.executeBlocking(h -> {Buffer buffer = getWriteCmd(registerAddress, slaveId, reqParam, keyList, registerType, promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {h.complete(reqParam);}).onFailure(err -> {log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());h.tryFail(err.getMessage());});});}List<Future<Object>> futures = new ArrayList<>();Future blockingFuture = Future.succeededFuture();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();Promise<byte[]> promise = Promise.promise();blockingFuture = blockingFuture.compose(suc -> 
singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig),err -> singleExecuteWrite(slaveId, reqParam, promise, registerType, paramConfig));futures.add(blockingFuture);}return commonReplyResult(futures, paramConfigs);}private Future<Object> singleExecuteWrite(int slaveId, JSONObject reqParam, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {return vertx.executeBlocking(h -> {Buffer buffer = getWriteCmd(paramConfig.getRegisterAddress(), slaveId, reqParam, Arrays.asList(paramConfig.getName()), registerType, promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {h.tryComplete(reqParam.get(paramConfig.getName()));}).onFailure(err -> {log.error("Modbus executeWrite fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",ip, port, slaveId, paramConfig.getName(), err.getMessage());h.tryFail(err.getMessage());});});}/*** 读数据* @param paramConfigs 参数配置列表* @param slaveId 从机ID* @return*/private Future<JSONObject> executeQuery(List<ModbusParamConfig> paramConfigs, Integer slaveId) {if (!isActive()) {return Future.failedFuture("Gateway offline");}boolean isMerge = isMergeSendCmd(paramConfigs);if (isMerge) {int registerAddress = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = paramConfigs.get(0).getRegisterType();int num = paramConfigs.size();Promise<byte[]> promise = Promise.promise();Buffer buffer = getQueryCmd(registerAddress, num, slaveId, registerType, promise);return vertx.executeBlocking(h -> {netSocket.write(buffer);promise.future().onSuccess(buf -> {JSONObject jsonObject = new JSONObject();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);switch (registerType) {case COIL:Integer pow = Double.valueOf(Math.pow(2, i % 8)).intValue();jsonObject.put(paramConfig.getName(), (pow & buf[i / 8 + 1]) == pow);break;case INPUT_REGISTER:case HOLDING_REGISTER:jsonObject.put(paramConfig.getName(), 
getValue(ByteUtil.bytesToInt(buf, i * 2 + 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));break;}}h.complete(jsonObject);}).onFailure(err -> {log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, msg:{}", ip, port, slaveId, err.getMessage());h.tryFail(err.getMessage());});});}List<Future<Object>> futures = new ArrayList<>();Future blockingFuture = Future.succeededFuture();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);ModbusParamConfig.RegisterType registerType = paramConfig.getRegisterType();Promise<byte[]> promise = Promise.promise();blockingFuture = blockingFuture.compose(suc -> singleExecuteQuery(slaveId, promise, registerType, paramConfig),err -> singleExecuteQuery(slaveId, promise, registerType, paramConfig));futures.add(blockingFuture);}return commonReplyResult(futures, paramConfigs);}private Future<Object> singleExecuteQuery(int slaveId, Promise<byte[]> promise, ModbusParamConfig.RegisterType registerType, ModbusParamConfig paramConfig) {return vertx.executeBlocking(h -> {Buffer buffer = getQueryCmd(paramConfig.getRegisterAddress(), 1, slaveId, paramConfig.getRegisterType(), promise);netSocket.write(buffer);promise.future().onSuccess(buf -> {switch (registerType) {case COIL:h.complete(Integer.valueOf(buf[1]) == 1);break;case INPUT_REGISTER:case HOLDING_REGISTER:h.complete(getValue(ByteUtil.bytesToInt(buf, 1, 2), paramConfig.getNumberSplit(), paramConfig.getDataType()));break;}}).onFailure(err -> {log.error("Modbus executeQuery fail, ip:{}, port:{}, slaveId:{}, key:{}, msg:{}",ip, port, slaveId, paramConfig.getName(), err.getMessage());h.tryFail(err.getMessage());});});}/*** 如果所有参数寄存器类型一致并且地址连续 则合并成一条命令下发* @param paramConfigs* @return 是否可以合并下发命令*/private boolean isMergeSendCmd(List<ModbusParamConfig> paramConfigs) {if (paramConfigs.size() == 1) {return false;}int lastPos = paramConfigs.get(0).getRegisterAddress();ModbusParamConfig.RegisterType registerType = 
paramConfigs.get(0).getRegisterType();for (int i = 1; i < paramConfigs.size(); i++) {int curPos = paramConfigs.get(i).getRegisterAddress();if (curPos - lastPos != 1) {return false;}ModbusParamConfig.RegisterType curRegisterType = paramConfigs.get(i).getRegisterType();if (registerType != curRegisterType) {return false;}lastPos = curPos;}return true;}/*** 获取查询数据命令* @param startPos 查询地址* @param num 查询数量* @param slaveId 从机ID* @param registerType 寄存器类型* @param promise* @return*/private Buffer getQueryCmd(int startPos, int num, int slaveId, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {byte[] bytes = new byte[6];bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];switch (registerType) {case COIL:bytes[1] = ModbusFunc.READ_COILS.getFunc();break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.READ_HOLDING_REGISTERS.getFunc();break;case INPUT_REGISTER:bytes[1] = ModbusFunc.READ_INPUT_REGISTERS.getFunc();break;}Integer func = ByteUtil.bytesToInt(bytes, 1, 1);String key = String.format("%s_%s", slaveId, func);byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);bytes[2] = startPosBytes[2];bytes[3] = startPosBytes[3];byte[] numBytes = ByteUtil.intToBytes(num, 0, 4);bytes[4] = numBytes[2];bytes[5] = numBytes[3];Buffer buffer = new BufferImpl();buffer.appendBytes(mode.writeData(bytes));promiseMap.put(key, promise);long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));promise.future().onComplete(res -> {promiseMap.remove(key);vertx.cancelTimer(timeId);});return buffer;}/*** 获取写数据命令* @param startPos 查询地址* @param slaveId 从机ID* @param reqParam 写参数* @param keys 参数列表* @param registerType 寄存器类型* @param promise* @return*/private Buffer getWriteCmd(int startPos, int slaveId, JSONObject reqParam,List<String> keys, ModbusParamConfig.RegisterType registerType, Promise<byte[]> promise) {int len = keys.size() == 1 ? 
6 : (registerType == ModbusParamConfig.RegisterType.HOLDING_REGISTER ?7 + keys.size() * 2 : 7 + Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue());byte[] bytes = new byte[len];bytes[0] = ByteUtil.intToBytes(slaveId, 3, 1)[0];byte[] startPosBytes = ByteUtil.intToBytes(startPos, 0, 4);bytes[2] = startPosBytes[2];bytes[3] = startPosBytes[3];if (keys.size() == 1) {switch (registerType) {case COIL:bytes[1] = ModbusFunc.WRITE_SINGLE_COILS.getFunc();boolean value = reqParam.getBoolean(keys.get(0));if (value) {bytes[4] = (byte) 0xFF;} else {bytes[4] = 0x00;}bytes[5] = 0x00;break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.WRITE_SINGLE_HOLDING_REGISTERS.getFunc();byte[] dataArr = ByteUtil.intToBytes(reqParam.getInteger(keys.get(0)), 2, 2);bytes[4] = dataArr[0];bytes[5] = dataArr[1];break;}} else {byte[] dataNum = ByteUtil.intToBytes(keys.size(), 2, 2);bytes[4] = dataNum[0];bytes[5] = dataNum[1];switch (registerType) {case COIL:bytes[1] = ModbusFunc.WRITE_MULTI_COILS.getFunc();int dataSize = Double.valueOf(Math.ceil(keys.size() / 8.0)).intValue();bytes[6] = ByteUtil.intToBytes(dataSize, 3, 1)[0];for (int i = 0; i < dataSize; i += 2) {int sum = 0;int startIndex = i * 8;int endIndex = (i + 2) * 8;endIndex = endIndex > keys.size() ? keys.size() : endIndex;for (int j = startIndex; j < endIndex; j++) {sum += Double.valueOf(Math.pow(2, j)).intValue() * (reqParam.getBoolean(keys.get(j)) ? 
1 : 0);}byte[] sumArr = ByteUtil.intToBytes(sum, 2, 2);if (i + 8 < keys.size()) {bytes[i + 7] = sumArr[0];bytes[i + 8] = sumArr[1];} else {bytes[i + 7] = sumArr[1];}}break;case HOLDING_REGISTER:bytes[1] = ModbusFunc.WRITE_MULTI_HOLDING_REGISTERS.getFunc();bytes[6] = ByteUtil.intToBytes(keys.size() * 2, 3, 1)[0];for (int i = 0; i < keys.size(); i++) {String paramKey = keys.get(i);Integer value = reqParam.getInteger(paramKey);byte[] dataArr = ByteUtil.intToBytes(value, 2, 2);bytes[i * 2 + 7] = dataArr[0];bytes[i * 2 + 8] = dataArr[1];}break;}}Integer func = ByteUtil.bytesToInt(bytes, 1, 1);String key = String.format("%s_%s", slaveId, func);Buffer buffer = new BufferImpl();buffer.appendBytes(mode.writeData(bytes));promiseMap.put(key, promise);long timeId = vertx.setTimer(reqTimeoutSecond * 1000, h -> promise.tryFail("Request timeout"));promise.future().onComplete(res -> {promiseMap.remove(key);vertx.cancelTimer(timeId);});return buffer;}private Future<JSONObject> commonReplyResult(List<Future<Object >> futures, List<ModbusParamConfig> paramConfigs) {return vertx.executeBlocking(b -> {Future.join(futures).onComplete(h -> {JSONObject okJson = new JSONObject();JSONObject errJson = new JSONObject();for (int i = 0; i < paramConfigs.size(); i++) {ModbusParamConfig paramConfig = paramConfigs.get(i);Future<Object> objectFuture = futures.get(i);if (objectFuture.succeeded()) {okJson.put(paramConfig.getName(), objectFuture.result());} else {errJson.put(paramConfig.getName(), objectFuture.cause().getMessage());}}if (okJson.size() > 0) {b.tryComplete(okJson);} else {b.tryFail(errJson.getString(paramConfigs.get(0).getName()));}});});}private Object getValue(int value, int numberSplit, ModbusParamConfig.DataType dataType) {if (numberSplit == 1) {return value;}Float temp = value * 1f / numberSplit;switch (dataType) {case INT :return Math.round(temp);case FLOAT:return temp;}return temp;}}
测试
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.bho.modbus.model.ModbusMode;
import com.bho.modbus.core.ModbusConnection;
import com.bho.modbus.model.ModbusParamConfig;
import com.bho.modbus.model.SendCmdTask;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import lombok.extern.log4j.Log4j2;import java.util.List;@Log4j2
public class TestModbus {public static final String READ_DATA = "[" +" {" +" \"name\": \"a\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 504," +" \"dataType\": \"FLOAT\"," +" \"numberSplit\": 10" +" }," +" {" +" \"name\": \"b\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 505," +" \"dataType\": \"FLOAT\"," +" \"numberSplit\": 10" +" }," +" {" +" \"name\": \"c\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 506," +" \"dataType\": \"FLOAT\"," +" \"numberSplit\": 10" +" }," +" {" +" \"name\": \"d\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 507," +" \"dataType\": \"INT\"," +" \"numberSplit\": 1" +" }," +" {" +" \"name\": \"e\"," +" \"registerType\": \"HOLDING_REGISTER\"," +" \"registerAddress\": 508," +" \"dataType\": \"INT\"," +" \"numberSplit\": 1" +" }]";private static final String WRITE_DATA = "[" +" {" +" \"name\": \"do0\"," +" \"registerType\": \"COIL\"," +" \"registerAddress\": 20," +" \"dataType\": \"BOOL\"," +" \"numberSplit\": 1" +" }" +" ,{" +" \"name\": \"do1\"," +" \"registerType\": \"COIL\"," +" \"registerAddress\": 21," +" \"dataType\": \"BOOL\"," +" \"numberSplit\": 1" +" }" +"]";public static void main(String[] args) {testReadData();
// testWriteData();;}private static void testWriteData() {Vertx vertx = Vertx.vertx();ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);Future<Boolean> connectFuture = connection.connect();JSONObject reqParam = new JSONObject();reqParam.put("do0", false);reqParam.put("do1", false);List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(WRITE_DATA, ModbusParamConfig.class);connectFuture.onComplete(con -> {if (connectFuture.succeeded()) {SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 21, 10);Promise<JSONObject> promise = connection.offerTask(task);promise.future().onSuccess(suc -> {log.info("read:"+suc);}).onFailure(err -> System.err.println(err.getMessage()));SendCmdTask task2 = new SendCmdTask(vertx, modbusParamConfigs, reqParam, true, 21, 10);Promise<JSONObject> promise2 = connection.offerTask(task2);promise2.future().onSuccess(suc -> {log.info("write:"+suc);}).onFailure(err -> System.err.println(err.getMessage()));} else {System.err.println("gateway offline");}});}private static void testReadData() {Vertx vertx = Vertx.vertx();ModbusConnection connection = new ModbusConnection(vertx,"127.0.0.1", 502, 30, ModbusMode.TCP);Future<Boolean> connectFuture = connection.connect();List<ModbusParamConfig> modbusParamConfigs = JSONArray.parseArray(READ_DATA, ModbusParamConfig.class);connectFuture.onComplete(con -> {if (connection.isActive()) {SendCmdTask task = new SendCmdTask(vertx, modbusParamConfigs, null, false, 2, 10);Promise<JSONObject> promise = connection.offerTask(task);promise.future().onSuccess(suc -> {log.info(suc);}).onFailure(err -> System.err.println(err.getMessage()));} else {System.err.println("gateway offline");}});}
}
运行结果如下:
其实,如果这两个读写示例访问的是同一个网关,它们可以共用一个Modbus连接。
modbus-app配置参数
格式如下:
{"readable": {"devType01": {"ReportData": [{"name" : "xxx","registerType" : "COIL","registerAddress" : 1,"dataType" : "BOOL","numberSplit" : 1}]},"devType02": {"ReportData": [{"name" : "a","registerType" : "HOLDING_REGISTER","registerAddress" : 1,"dataType" : "INT","numberSplit" : 1},{"name" : "b","registerType" : "HOLDING_REGISTER","registerAddress" : 2,"dataType" : "INT","numberSplit" : 10},{"name": "c","registerType": "","dataType": "FLOAT","mbScript": "(a*10000+b)/10"}]}},"writable": {"devType01": {"Control": [{"name": "operation","registerType": "COIL","registerAddress": 21,"dataType": "BOOL","numberSplit": 1}]}},"readDataPeriods": [{"period" : 60,"deviceTypes": ["devType01"]},{"period" : 600,"deviceTypes": ["devType02","devType03"]}]
}
具体怎么实现这边就不过多讲解了…
结束
不保证代码正确,我这边只是大概实现了一下,仅供参考。若有问题,请批评指出,我会虚心接受并积极修复问题。