rumqttc/v5/mqttbytes/v5/
disconnect.rs

1use std::convert::{TryFrom, TryInto};
2
3use bytes::{BufMut, Bytes, BytesMut};
4
5use super::*;
6
7use super::{property, PropertyType};
8
/// Reason Code carried in the variable header of a DISCONNECT packet.
///
/// The enum is `#[repr(u8)]`, so `code as u8` yields the value that is put on
/// the wire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum DisconnectReasonCode {
    /// Normal close of the connection; the Will Message must not be sent.
    NormalDisconnection = 0x00,
    /// The Client wants to disconnect, but asks the Server to publish its
    /// Will Message anyway.
    DisconnectWithWillMessage = 0x04,
    /// The connection is closed and the sender either does not want to reveal
    /// why, or no other Reason Code fits.
    UnspecifiedError = 0x80,
    /// A received packet does not conform to the specification.
    MalformedPacket = 0x81,
    /// A packet arrived unexpectedly or out of order.
    ProtocolError = 0x82,
    /// A received packet is valid, but this implementation cannot process it.
    ImplementationSpecificError = 0x83,
    /// The request is not authorized.
    NotAuthorized = 0x87,
    /// The Server is busy and cannot keep processing requests from this Client.
    ServerBusy = 0x89,
    /// The Server is shutting down.
    ServerShuttingDown = 0x8B,
    /// No packet was received for 1.5 times the Keep Alive time, so the
    /// connection is closed.
    KeepAliveTimeout = 0x8D,
    /// Another connection using the same ClientID has connected, which caused
    /// this connection to be closed.
    SessionTakenOver = 0x8E,
    /// The Topic Filter is correctly formed, but is not accepted by this Server.
    TopicFilterInvalid = 0x8F,
    /// The Topic Name is correctly formed, but is not accepted by this Client
    /// or Server.
    TopicNameInvalid = 0x90,
    /// More than Receive Maximum publications were received for which no
    /// PUBACK or PUBCOMP has been sent.
    ReceiveMaximumExceeded = 0x93,
    /// A PUBLISH packet carried a Topic Alias greater than the Maximum Topic
    /// Alias announced in the CONNECT or CONNACK packet.
    TopicAliasInvalid = 0x94,
    /// The packet is larger than the Maximum Packet Size for this Client or
    /// Server.
    PacketTooLarge = 0x95,
    /// The received data rate is too high.
    MessageRateTooHigh = 0x96,
    /// A limit imposed by the implementation or by an administrator has been
    /// exceeded.
    QuotaExceeded = 0x97,
    /// The connection is closed due to an administrative action.
    AdministrativeAction = 0x98,
    /// The payload format does not match the one specified by the Payload
    /// Format Indicator.
    PayloadFormatInvalid = 0x99,
    /// The Server does not support retained messages.
    RetainNotSupported = 0x9A,
    /// The Client used a QoS greater than the Maximum QoS given in the CONNACK.
    QoSNotSupported = 0x9B,
    /// The Client should temporarily use another Server.
    UseAnotherServer = 0x9C,
    /// The Server has moved; the Client should permanently change its server
    /// location.
    ServerMoved = 0x9D,
    /// The Server does not support Shared Subscriptions.
    SharedSubscriptionNotSupported = 0x9E,
    /// The connection is closed because the connection rate is too high.
    ConnectionRateExceeded = 0x9F,
    /// The maximum connection time authorized for this connection has been
    /// exceeded.
    MaximumConnectTime = 0xA0,
    /// The Server does not support Subscription Identifiers; the subscription
    /// is not accepted.
    SubscriptionIdentifiersNotSupported = 0xA1,
    /// The Server does not support Wildcard Subscriptions; the subscription
    /// is not accepted.
    WildcardSubscriptionsNotSupported = 0xA2,
}
71
72impl TryFrom<u8> for DisconnectReasonCode {
73    type Error = Error;
74
75    fn try_from(value: u8) -> Result<Self, Self::Error> {
76        let rc = match value {
77            0x00 => Self::NormalDisconnection,
78            0x04 => Self::DisconnectWithWillMessage,
79            0x80 => Self::UnspecifiedError,
80            0x81 => Self::MalformedPacket,
81            0x82 => Self::ProtocolError,
82            0x83 => Self::ImplementationSpecificError,
83            0x87 => Self::NotAuthorized,
84            0x89 => Self::ServerBusy,
85            0x8B => Self::ServerShuttingDown,
86            0x8D => Self::KeepAliveTimeout,
87            0x8E => Self::SessionTakenOver,
88            0x8F => Self::TopicFilterInvalid,
89            0x90 => Self::TopicNameInvalid,
90            0x93 => Self::ReceiveMaximumExceeded,
91            0x94 => Self::TopicAliasInvalid,
92            0x95 => Self::PacketTooLarge,
93            0x96 => Self::MessageRateTooHigh,
94            0x97 => Self::QuotaExceeded,
95            0x98 => Self::AdministrativeAction,
96            0x99 => Self::PayloadFormatInvalid,
97            0x9A => Self::RetainNotSupported,
98            0x9B => Self::QoSNotSupported,
99            0x9C => Self::UseAnotherServer,
100            0x9D => Self::ServerMoved,
101            0x9E => Self::SharedSubscriptionNotSupported,
102            0x9F => Self::ConnectionRateExceeded,
103            0xA0 => Self::MaximumConnectTime,
104            0xA1 => Self::SubscriptionIdentifiersNotSupported,
105            0xA2 => Self::WildcardSubscriptionsNotSupported,
106            other => return Err(Error::InvalidConnectReturnCode(other)),
107        };
108
109        Ok(rc)
110    }
111}
112
/// Optional properties that may accompany a DISCONNECT packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DisconnectProperties {
    /// Session Expiry Interval, expressed in seconds.
    pub session_expiry_interval: Option<u32>,

    /// Human-readable explanation of why the connection is being closed.
    pub reason_string: Option<String>,

    /// User properties as `(key, value)` pairs, in the order they appear.
    pub user_properties: Vec<(String, String)>,

    /// String the Client can use to identify another Server to connect to.
    pub server_reference: Option<String>,
}
127
128#[derive(Debug, Clone, PartialEq, Eq)]
129pub struct Disconnect {
130    /// Disconnect Reason Code
131    pub reason_code: DisconnectReasonCode,
132
133    /// Disconnect Properties
134    pub properties: Option<DisconnectProperties>,
135}
136
137impl DisconnectProperties {
138    // pub fn new() -> Self {
139    //     Self {
140    //         session_expiry_interval: None,
141    //         reason_string: None,
142    //         user_properties: Vec::new(),
143    //         server_reference: None,
144    //     }
145    // }
146
147    fn len(&self) -> usize {
148        let mut length = 0;
149
150        if self.session_expiry_interval.is_some() {
151            length += 1 + 4;
152        }
153
154        if let Some(reason) = &self.reason_string {
155            length += 1 + 2 + reason.len();
156        }
157
158        for (key, value) in self.user_properties.iter() {
159            length += 1 + 2 + key.len() + 2 + value.len();
160        }
161
162        if let Some(server_reference) = &self.server_reference {
163            length += 1 + 2 + server_reference.len();
164        }
165
166        length
167    }
168
169    pub fn extract(bytes: &mut Bytes) -> Result<Option<Self>, Error> {
170        let (properties_len_len, properties_len) = length(bytes.iter())?;
171
172        bytes.advance(properties_len_len);
173
174        if properties_len == 0 {
175            return Ok(None);
176        }
177
178        let mut session_expiry_interval = None;
179        let mut reason_string = None;
180        let mut user_properties = Vec::new();
181        let mut server_reference = None;
182
183        let mut cursor = 0;
184
185        // read until cursor reaches property length. properties_len = 0 will skip this loop
186        while cursor < properties_len {
187            let prop = read_u8(bytes)?;
188            cursor += 1;
189
190            match property(prop)? {
191                PropertyType::SessionExpiryInterval => {
192                    session_expiry_interval = Some(read_u32(bytes)?);
193                    cursor += 4;
194                }
195                PropertyType::ReasonString => {
196                    let reason = read_mqtt_string(bytes)?;
197                    cursor += 2 + reason.len();
198                    reason_string = Some(reason);
199                }
200                PropertyType::UserProperty => {
201                    let key = read_mqtt_string(bytes)?;
202                    let value = read_mqtt_string(bytes)?;
203                    cursor += 2 + key.len() + 2 + value.len();
204                    user_properties.push((key, value));
205                }
206                PropertyType::ServerReference => {
207                    let reference = read_mqtt_string(bytes)?;
208                    cursor += 2 + reference.len();
209                    server_reference = Some(reference);
210                }
211                _ => return Err(Error::InvalidPropertyType(prop)),
212            }
213        }
214
215        let properties = Self {
216            session_expiry_interval,
217            reason_string,
218            user_properties,
219            server_reference,
220        };
221
222        Ok(Some(properties))
223    }
224
225    fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
226        let length = self.len();
227        write_remaining_length(buffer, length)?;
228
229        if let Some(session_expiry_interval) = self.session_expiry_interval {
230            buffer.put_u8(PropertyType::SessionExpiryInterval as u8);
231            buffer.put_u32(session_expiry_interval);
232        }
233
234        if let Some(reason) = &self.reason_string {
235            buffer.put_u8(PropertyType::ReasonString as u8);
236            write_mqtt_string(buffer, reason);
237        }
238
239        for (key, value) in self.user_properties.iter() {
240            buffer.put_u8(PropertyType::UserProperty as u8);
241            write_mqtt_string(buffer, key);
242            write_mqtt_string(buffer, value);
243        }
244
245        if let Some(reference) = &self.server_reference {
246            buffer.put_u8(PropertyType::ServerReference as u8);
247            write_mqtt_string(buffer, reference);
248        }
249
250        Ok(())
251    }
252}
253
254impl Disconnect {
255    pub fn new(reason: DisconnectReasonCode) -> Self {
256        Self {
257            reason_code: reason,
258            properties: None,
259        }
260    }
261
262    fn len(&self) -> usize {
263        if self.reason_code == DisconnectReasonCode::NormalDisconnection
264            && self.properties.is_none()
265        {
266            return 2; // Packet type + 0x00
267        }
268
269        let mut length = 0;
270
271        if let Some(properties) = &self.properties {
272            length += 1; // Disconnect Reason Code
273
274            let properties_len = properties.len();
275            let properties_len_len = len_len(properties_len);
276            length += properties_len_len + properties_len;
277        } else {
278            length += 1;
279        }
280
281        length
282    }
283
284    pub fn size(&self) -> usize {
285        let len = self.len();
286        if len == 2 {
287            return len;
288        }
289
290        let remaining_len_size = len_len(len);
291
292        1 + remaining_len_size + len
293    }
294
295    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<Self, Error> {
296        let packet_type = fixed_header.byte1 >> 4;
297        let flags = fixed_header.byte1 & 0b0000_1111;
298
299        bytes.advance(fixed_header.fixed_header_len);
300
301        if packet_type != PacketType::Disconnect as u8 {
302            return Err(Error::InvalidPacketType(packet_type));
303        };
304
305        if flags != 0x00 {
306            return Err(Error::MalformedPacket);
307        };
308
309        if fixed_header.remaining_len == 0 {
310            return Ok(Self::new(DisconnectReasonCode::NormalDisconnection));
311        }
312
313        let reason_code = read_u8(&mut bytes)?;
314
315        let disconnect = Self {
316            reason_code: reason_code.try_into()?,
317            properties: DisconnectProperties::extract(&mut bytes)?,
318        };
319
320        Ok(disconnect)
321    }
322
323    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
324        buffer.put_u8(0xE0);
325
326        let length = self.len();
327
328        if length == 2 {
329            buffer.put_u8(0x00);
330            return Ok(length);
331        }
332
333        let len_len = write_remaining_length(buffer, length)?;
334
335        buffer.put_u8(self.reason_code as u8);
336
337        if let Some(properties) = &self.properties {
338            properties.write(buffer)?;
339        } else {
340            write_remaining_length(buffer, 0)?;
341        }
342
343        Ok(1 + len_len + length)
344    }
345}
346
#[cfg(test)]
mod test {
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::parse_fixed_header;
    use super::{Disconnect, DisconnectProperties, DisconnectReasonCode};

    /// Decodes one DISCONNECT packet from `raw`, panicking on any failure.
    fn decode(raw: &[u8]) -> Disconnect {
        let mut stream = BytesMut::new();
        stream.extend_from_slice(raw);

        let header = parse_fixed_header(stream.iter()).unwrap();
        let frame = stream.split_to(header.frame_length()).freeze();

        Disconnect::read(header, frame).unwrap()
    }

    /// Encodes `packet` into a fresh buffer, panicking on any failure.
    fn encode(packet: &Disconnect) -> BytesMut {
        let mut out = BytesMut::new();
        packet.write(&mut out).unwrap();
        out
    }

    #[test]
    fn disconnect1_parsing_works() {
        let raw = [
            0xE0, // Packet type
            0x00, // Remaining length
        ];

        let parsed = decode(&raw[..]);

        assert_eq!(
            parsed,
            Disconnect::new(DisconnectReasonCode::NormalDisconnection)
        );
    }

    #[test]
    fn disconnect1_encoding_works() {
        let packet = Disconnect::new(DisconnectReasonCode::NormalDisconnection);
        let expected = [
            0xE0, // Packet type
            0x00, // Remaining length
        ];

        let encoded = encode(&packet);

        assert_eq!(&encoded[..], &expected[..]);
    }

    /// A DISCONNECT packet that sets every supported property.
    fn sample2() -> Disconnect {
        Disconnect {
            reason_code: DisconnectReasonCode::UnspecifiedError,
            properties: Some(DisconnectProperties {
                session_expiry_interval: Some(1234),
                reason_string: Some("test".to_owned()),
                user_properties: vec![("test".to_owned(), "test".to_owned())],
                server_reference: Some("test".to_owned()),
            }),
        }
    }

    /// Wire representation of `sample2`.
    fn sample_bytes2() -> Vec<u8> {
        vec![
            0xE0, // Packet type
            0x22, // Remaining length
            0x80, // Disconnect Reason Code
            0x20, // Properties length
            0x11, 0x00, 0x00, 0x04, 0xd2, // Session expiry interval
            0x1F, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // Reason string
            0x26, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, 0x00, 0x04, 0x74, 0x65, 0x73,
            0x74, // User properties
            0x1C, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74, // server reference
        ]
    }

    #[test]
    fn disconnect2_parsing_works() {
        let raw = sample_bytes2();

        let parsed = decode(&raw[..]);

        assert_eq!(parsed, sample2());
    }

    #[test]
    fn disconnect2_encoding_works() {
        let expected = sample_bytes2();

        let encoded = encode(&sample2());

        assert_eq!(&encoded[..], &expected[..]);
    }

    #[test]
    fn length_calculation() {
        // The user property is meant to pad the packet past ~128 bytes so the
        // remaining-length field needs two bytes (depends on the shared
        // USER_PROP_KEY / USER_PROP_VAL constants defined outside this file).
        let padded_props = DisconnectProperties {
            session_expiry_interval: None,
            reason_string: None,
            user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            server_reference: None,
        };

        let mut packet = Disconnect::new(DisconnectReasonCode::NormalDisconnection);
        packet.properties = Some(padded_props);

        let mut sink = BytesMut::new();
        let reported_by_size = packet.size();
        let reported_by_write = packet.write(&mut sink).unwrap();
        let actually_written = sink.len();

        assert_eq!(reported_by_write, actually_written);
        assert_eq!(reported_by_size, actually_written);
    }
}