rumqttc/v5/mqttbytes/v5/
pubrel.rs

1use super::*;
2use bytes::{Buf, BufMut, Bytes, BytesMut};
3
/// Reason code carried in a PUBREL packet.
///
/// The discriminants are pinned to the byte values used on the wire (the same values the
/// `code`/`reason` helpers in this module translate to and from), so `reason as u8` yields
/// the encoded reason code rather than an arbitrary declaration-order index.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
#[repr(u8)]
pub enum PubRelReason {
    /// The release succeeded (wire value `0x00`).
    Success = 0x00,
    /// The packet identifier is not known to the receiver (wire value `0x92`, i.e. 146).
    PacketIdentifierNotFound = 0x92,
}
11
12/// QoS2 Publish release, in response to PUBREC packet
13#[derive(Debug, Clone, PartialEq, Eq)]
14pub struct PubRel {
15    pub pkid: u16,
16    pub reason: PubRelReason,
17    pub properties: Option<PubRelProperties>,
18}
19
20impl PubRel {
21    pub fn new(pkid: u16, properties: Option<PubRelProperties>) -> Self {
22        Self {
23            pkid,
24            reason: PubRelReason::Success,
25            properties,
26        }
27    }
28
29    pub fn size(&self) -> usize {
30        // If there are no properties during success, sending reason code is optional
31        if self.reason == PubRelReason::Success && self.properties.is_none() {
32            return 4;
33        }
34
35        let len = self.len();
36        let remaining_len_size = len_len(len);
37
38        1 + remaining_len_size + len
39    }
40
41    fn len(&self) -> usize {
42        let mut len = 2 + 1; // pkid + reason
43
44        // The Reason Code and Property Length can be omitted if the Reason Code is 0x00 (Success)
45        // and there are no Properties. In this case the PUBREL has a Remaining Length of 2.
46        // <https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901144>
47        if self.reason == PubRelReason::Success && self.properties.is_none() {
48            return 2;
49        }
50
51        if let Some(p) = &self.properties {
52            let properties_len = p.len();
53            let properties_len_len = len_len(properties_len);
54            len += properties_len_len + properties_len;
55        } else {
56            len += 1;
57        }
58
59        len
60    }
61
62    pub fn read(fixed_header: FixedHeader, mut bytes: Bytes) -> Result<PubRel, Error> {
63        let variable_header_index = fixed_header.fixed_header_len;
64        bytes.advance(variable_header_index);
65        let pkid = read_u16(&mut bytes)?;
66        if fixed_header.remaining_len == 2 {
67            return Ok(PubRel {
68                pkid,
69                reason: PubRelReason::Success,
70                properties: None,
71            });
72        }
73
74        let ack_reason = read_u8(&mut bytes)?;
75        if fixed_header.remaining_len < 4 {
76            return Ok(PubRel {
77                pkid,
78                reason: reason(ack_reason)?,
79                properties: None,
80            });
81        }
82
83        let properties = PubRelProperties::read(&mut bytes)?;
84        let puback = PubRel {
85            pkid,
86            reason: reason(ack_reason)?,
87            properties,
88        };
89
90        Ok(puback)
91    }
92
93    pub fn write(&self, buffer: &mut BytesMut) -> Result<usize, Error> {
94        let len = self.len();
95        buffer.put_u8(0x62);
96        let count = write_remaining_length(buffer, len)?;
97        buffer.put_u16(self.pkid);
98
99        // If there are no properties during success, sending reason code is optional
100        if self.reason == PubRelReason::Success && self.properties.is_none() {
101            return Ok(4);
102        }
103
104        buffer.put_u8(code(self.reason));
105
106        if let Some(p) = &self.properties {
107            p.write(buffer)?;
108        } else {
109            write_remaining_length(buffer, 0)?;
110        }
111
112        Ok(1 + count + len)
113    }
114}
115
/// Optional properties that may follow the reason code in a PUBREL packet.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PubRelProperties {
    /// Reason string property, if one is present.
    pub reason_string: Option<String>,
    /// User properties as `(key, value)` pairs, kept in the order they were read or added.
    pub user_properties: Vec<(String, String)>,
}
121
122impl PubRelProperties {
123    fn len(&self) -> usize {
124        let mut len = 0;
125
126        if let Some(reason) = &self.reason_string {
127            len += 1 + 2 + reason.len();
128        }
129
130        for (key, value) in self.user_properties.iter() {
131            len += 1 + 2 + key.len() + 2 + value.len();
132        }
133
134        len
135    }
136
137    pub fn read(bytes: &mut Bytes) -> Result<Option<PubRelProperties>, Error> {
138        let mut reason_string = None;
139        let mut user_properties = Vec::new();
140
141        let (properties_len_len, properties_len) = length(bytes.iter())?;
142        bytes.advance(properties_len_len);
143        if properties_len == 0 {
144            return Ok(None);
145        }
146
147        let mut cursor = 0;
148        // read until cursor reaches property length. properties_len = 0 will skip this loop
149        while cursor < properties_len {
150            let prop = read_u8(bytes)?;
151            cursor += 1;
152
153            match property(prop)? {
154                PropertyType::ReasonString => {
155                    let reason = read_mqtt_string(bytes)?;
156                    cursor += 2 + reason.len();
157                    reason_string = Some(reason);
158                }
159                PropertyType::UserProperty => {
160                    let key = read_mqtt_string(bytes)?;
161                    let value = read_mqtt_string(bytes)?;
162                    cursor += 2 + key.len() + 2 + value.len();
163                    user_properties.push((key, value));
164                }
165                _ => return Err(Error::InvalidPropertyType(prop)),
166            }
167        }
168
169        Ok(Some(PubRelProperties {
170            reason_string,
171            user_properties,
172        }))
173    }
174
175    pub fn write(&self, buffer: &mut BytesMut) -> Result<(), Error> {
176        let len = self.len();
177        write_remaining_length(buffer, len)?;
178
179        if let Some(reason) = &self.reason_string {
180            buffer.put_u8(PropertyType::ReasonString as u8);
181            write_mqtt_string(buffer, reason);
182        }
183
184        for (key, value) in self.user_properties.iter() {
185            buffer.put_u8(PropertyType::UserProperty as u8);
186            write_mqtt_string(buffer, key);
187            write_mqtt_string(buffer, value);
188        }
189
190        Ok(())
191    }
192}
193
194/// Connection return code type
195fn reason(num: u8) -> Result<PubRelReason, Error> {
196    let code = match num {
197        0 => PubRelReason::Success,
198        146 => PubRelReason::PacketIdentifierNotFound,
199        num => return Err(Error::InvalidConnectReturnCode(num)),
200    };
201
202    Ok(code)
203}
204
205fn code(reason: PubRelReason) -> u8 {
206    match reason {
207        PubRelReason::Success => 0,
208        PubRelReason::PacketIdentifierNotFound => 146,
209    }
210}
211
#[cfg(test)]
mod test {
    use super::super::test::{USER_PROP_KEY, USER_PROP_VAL};
    use super::*;
    use bytes::BytesMut;
    use pretty_assertions::assert_eq;

    /// `size()`, the value returned by `write()`, and the number of bytes actually written
    /// must all agree.
    #[test]
    fn length_calculation() {
        // The user property is meant to pad the packet so that the remaining-length field
        // needs 2 bytes (presumably the shared fixture value is long enough; verify against
        // the definition of USER_PROP_VAL).
        let packet = PubRel::new(
            1,
            Some(PubRelProperties {
                reason_string: None,
                user_properties: vec![(USER_PROP_KEY.into(), USER_PROP_VAL.into())],
            }),
        );

        let mut sink = BytesMut::new();
        let predicted = packet.size();
        let reported = packet.write(&mut sink).unwrap();
        let written = sink.len();

        assert_eq!(reported, written);
        assert_eq!(predicted, written);
    }
}