网络协议-MQTT

MQTT是什么?

MQTT是基于发布订阅模式的轻量级消息协议

点击我查看中文协议规范

需要实现的关键点如下

/**
 *
 *  实现点1. 开发一个MQTT库需要提供如下命令:
     Connect :当一个TCP/IP套接字在服务器端和客户端连接建立时需要使用的命令。
     publish  : 是由客户端向服务端发送,告诉服务器端自己感兴趣的Topic。每一个publishMessage 都会与一个Topic的名字联系在一起。
     pubRec:   是publish命令的响应,只不过使用了2级QoS协议。它是2级QoS协议的第二条消息
     pubRel:    是2级QoS协议的第三条消息
     publComp: 是2级QoS协议的第四条消息
     subscribe: 允许一个客户端注册自已感兴趣的Topic 名字,发布到这些Topic的消息会以publish Message的形式由服务器端发送给客户端。
     unsubscribe:  从客户端到服务器端,退订一个Topic。
     Ping: 有客户端向服务器端发送的“are you alive”的消息。
     disconnect:断开这个TCP/IP协议。
     
     实现点2. 字典树(trie tree),用于存放topic.
     
     实现点3. 安全(security), acl控制(读写权限)
     
     实现点4. 消息的投递(post office). {磁盘存储, 内存存储, 网络存储}
     
     实现点5. 会话的控制(session)
     
 * Created by acer01 on 2018/12/5/005.
 */
public class MqttServerChannelHandler extends AbstractChannelHandler<MqttMessage> {
    private ChannelHandlerContext context;
    private MqttExchanger exchanger;
    public MqttServerChannelHandler(MqttExchanger exchanger) {
        super(true);
        this.exchanger = exchanger;
        logger.info(NamespaceUtil.newIdName(getClass())+"====");
    }

    /**
     * 服务端事件 - 有链接接入
     * @param request
     * @return
     */
    private MqttConnAckMessage onConnect(MqttConnectMessage request){
        MqttQoS qoS = request.fixedHeader().qosLevel();
        MqttConnectPayload payload = request.payload();
        MqttConnectVariableHeader variableHeader = request.variableHeader();

        MqttFixedHeader responseFixedHeader = new MqttFixedHeader(MqttMessageType.CONNACK, false, qoS, false, 0);
        MqttConnAckVariableHeader responseVariableHeader = new MqttConnAckVariableHeader(MqttConnectReturnCode.CONNECTION_ACCEPTED,true);
        MqttConnAckMessage response = new MqttConnAckMessage(responseFixedHeader,responseVariableHeader);

        exchanger.addSession(payload.clientIdentifier(),payload.userName(),
                payload.passwordInBytes(),payload.willMessageInBytes(),
                payload.willTopic(),MqttQoS.valueOf(variableHeader.willQos()),
                context.channel());
        return response;
    }

    /**
     * 客户端事件 - 链接应答
     * @param request
     * @return
     */
    private MqttMessage onConnectAck(MqttConnAckMessage request){
        return null;
    }

    /**
     * 服务端事件 - 有新消息发布
     * @param request
     * @return
     */
    private MqttPubAckMessage onPublish(MqttPublishMessage request){
        MqttQoS qoS = request.fixedHeader().qosLevel();
        if(qoS != MqttQoS.AT_LEAST_ONCE && qoS != MqttQoS.EXACTLY_ONCE){
            return null;
        }

        MqttFixedHeader responseFixedHeader = new MqttFixedHeader(MqttMessageType.PUBACK, false, qoS, false, 0);
        MqttMessageIdVariableHeader responseVariableHeader = MqttMessageIdVariableHeader.from(request.variableHeader().packetId());
        MqttPubAckMessage response = new MqttPubAckMessage(responseFixedHeader, responseVariableHeader);

        ByteBuf payload = request.payload();
        byte[] data = new byte[payload.readableBytes()];
        payload.readBytes(data);
        exchanger.addPublish(context.channel(),request.variableHeader().packetId(),request.variableHeader().topicName(),data);
        return response;
    }

    /**
     * 客户端事件 - 接到服务端的发布结果
     * @param request
     * @return
     */
    private MqttMessage onPubAck(MqttPubAckMessage request){
        return null;
    }

    /**
     * 服务端事件 - 2级QOS 接到客户端对发布结果的回应
     * @param request
     * @return
     */
    private MqttMessage onPubrec(MqttMessage request){
        MqttFixedHeader mqttFixedHeader = new MqttFixedHeader(MqttMessageType.PUBREL,false,request.fixedHeader().qosLevel(),false,0);
        MqttMessageIdVariableHeader responseVariableHeader = null;
        if(request.variableHeader() instanceof MqttMessageIdVariableHeader){
            responseVariableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) request.variableHeader()).messageId());
        }
        MqttPubAckMessage mqttPubAckMessage = new MqttPubAckMessage(mqttFixedHeader,responseVariableHeader);
        return mqttPubAckMessage;
    }

    /**
     * 客户端事件 - 2级QOS 释放发布,将不再次重复进行发布
     * @param request
     * @return
     */
    private MqttMessage onPubrel(MqttMessage request){
        return null;
    }

    /**
     * 服务端事件 - 2级QOS 接到确认客户端释放完毕 ()
     * @param request
     * @return
     */
    private MqttMessage onPubcomp(MqttMessage request){
        return null;
    }

    /**
     * 服务端事件 - 有客户端订阅
     * @param request
     * @return
     */
    private MqttSubAckMessage onSubscribe(MqttSubscribeMessage request){
        MqttMessageIdVariableHeader requestHeader = request.variableHeader();
        List<MqttTopicSubscription> subscriptions = request.payload().topicSubscriptions();
        int[] grantedQoSLevels = new int[subscriptions.size()];
        int i = 0;
        for(MqttTopicSubscription topicSubscription : subscriptions){
            grantedQoSLevels[i] = topicSubscription.qualityOfService().value();
            i++;
        }

        MqttSubAckPayload responsePayload = new MqttSubAckPayload(grantedQoSLevels);
        MqttFixedHeader responseFixedHeader = new MqttFixedHeader(MqttMessageType.SUBACK,false, request.fixedHeader().qosLevel(), false, 0);
        MqttMessageIdVariableHeader responseVariableHeader = MqttMessageIdVariableHeader.from(requestHeader.messageId());
        MqttSubAckMessage response = new MqttSubAckMessage(responseFixedHeader,responseVariableHeader,responsePayload);

        exchanger.addSubscribe(context.channel(),subscriptions);
        return response;
    }

    /**
     * 客户端事件 - 接到服务端的订阅成功应答
     * @param request
     * @return
     */
    private MqttMessage onSubAck(MqttSubAckMessage request){
        return null;
    }

    /**
     * 服务端事件 - 接到停止订阅
     * @param request
     * @return
     */
    private MqttUnsubAckMessage onUnsubscribe(MqttUnsubscribeMessage request){
        exchanger.removeSubscribe(context.channel(),request.payload().topics());

        MqttFixedHeader responseFixedHeader = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, request.fixedHeader().qosLevel(), false, 0);
        MqttMessageIdVariableHeader responseVariableHeader = MqttMessageIdVariableHeader.from(request.variableHeader().messageId());
        MqttUnsubAckMessage response = new MqttUnsubAckMessage(responseFixedHeader,responseVariableHeader);
        return response;
    }

    /**
     * 客户端事件- 停止订阅的应答
     * @param request
     * @return
     */
    private MqttMessage onUnsubAck(MqttUnsubAckMessage request){
        return null;
    }

    /**
     * 服务端事件- 客户端的ping
     * @param request
     * @return
     */
    private MqttMessage onPingReq(MqttMessage request){
        MqttMessageIdVariableHeader responseVariableHeader = null;
        if(request.variableHeader() instanceof MqttMessageIdVariableHeader){
            responseVariableHeader = MqttMessageIdVariableHeader.from(((MqttMessageIdVariableHeader) request.variableHeader()).messageId());
        }
        MqttFixedHeader responseFixedHeader = new MqttFixedHeader(MqttMessageType.PINGRESP, false,MqttQoS.AT_MOST_ONCE, false, 0);
        MqttMessage response = new MqttMessage(responseFixedHeader,responseVariableHeader);
        return response;
    }

    /**
     * 客户端事件- ping响应
     * @param request
     * @return
     */
    private MqttMessage onPingResp(MqttMessage request){
        return null;
    }

    /**
     * 服务端事件- 客户端请求断开
     * @param request
     * @return
     */
    private MqttMessage onDisconnect(MqttMessage request){
        return null;
    }

    @Override
    protected void onMessageReceived(ChannelHandlerContext ctx, MqttMessage request) throws Exception {
        this.context = ctx;
        MqttMessage responseMessage = null;
        switch (request.fixedHeader().messageType()) {
            //客户端到服务端的连接请求
            case CONNECT:{
                MqttConnAckMessage response = onConnect((MqttConnectMessage) request);
                responseMessage = response;
                break;
            }
            //服务端对连接请求的响应
            case CONNACK:{
                MqttMessage response = onConnectAck((MqttConnAckMessage) request);
                responseMessage = response;
                break;
            }
            //发布消息
            case PUBLISH:{
                MqttPubAckMessage response = onPublish((MqttPublishMessage) request);
                responseMessage = response;
                break;
            }
            //对发布消息的回应
            case PUBACK:{
                MqttMessage response = onPubAck((MqttPubAckMessage) request);
                responseMessage = response;
                break;
            }
            //收到发布消息(保证传输part1)
            case PUBREC:{
                MqttMessage response = onPubrec(request);
                responseMessage = response;
                break;
            }
            //释放发布消息(保证传输part2)
            case PUBREL:{
                MqttMessage response = onPubrel(request);
                responseMessage = response;
                break;
            }
            //完成发布消息(保证传输part3)
            case PUBCOMP:{
                MqttMessage response = onPubcomp(request);
                responseMessage = response;
                break;
            }
            //客户端订阅请求
            case SUBSCRIBE:{
                MqttSubAckMessage response = onSubscribe((MqttSubscribeMessage) request);
                responseMessage = response;
                break;
            }
            //订阅请求的回应
            case SUBACK:{
                MqttMessage response = onSubAck((MqttSubAckMessage) request);
                responseMessage = response;
                break;
            }
            //停止订阅请求
            case UNSUBSCRIBE:{
                MqttUnsubAckMessage response = onUnsubscribe((MqttUnsubscribeMessage) request);
                responseMessage = response;
                break;
            }
            //停止订阅请求响应
            case UNSUBACK:{
                MqttMessage response = onUnsubAck((MqttUnsubAckMessage) request);
                responseMessage = response;
                break;
            }
            //Ping请求(保持连接)
            case PINGREQ:{
                MqttMessage response = onPingReq(request);
                responseMessage = response;
                break;
            }
            //Ping响应
            case PINGRESP:{
                MqttMessage response = onPingResp(request);
                responseMessage = response;
                break;
            }
            //客户端正在断开
            case DISCONNECT:{
                MqttMessage response = onDisconnect(request);
                responseMessage = response;
                break;
            }
            default: {
                //保留字段 0或15 reserved
                break;
            }
        }

        if(responseMessage != null){
            ctx.writeAndFlush(responseMessage);
        }
    }
}

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

备案信息公示
京ICP备18003381号
京ICP备18003381号-1