ruzstd/decoding/
streaming_decoder.rs

1//! The [StreamingDecoder] wraps a [FrameDecoder] and provides a Read impl that decodes data when necessary
2
3use core::borrow::BorrowMut;
4
5use crate::decoding::errors::FrameDecoderError;
6use crate::decoding::{BlockDecodingStrategy, FrameDecoder};
7use crate::io::{Error, ErrorKind, Read};
8
9/// High level Zstandard frame decoder that can be used to decompress a given Zstandard frame.
10///
11/// This decoder implements `io::Read`, so you can interact with it by calling
12/// `io::Read::read_to_end` / `io::Read::read_exact` or passing this to another library / module as a source for the decoded content
13///
14/// If you need more control over how decompression takes place, you can use
15/// the lower level [FrameDecoder], which allows for greater control over how
16/// decompression takes place but the implementor must call
17/// [FrameDecoder::decode_blocks] repeatedly to decode the entire frame.
18///
19/// ## Caveat
20/// [StreamingDecoder] expects the underlying stream to only contain a single frame,
21/// yet the specification states that a single archive may contain multiple frames.
22///
23/// To decode all the frames in a finite stream, the calling code needs to recreate
24/// the instance of the decoder and handle
25/// [crate::decoding::errors::ReadFrameHeaderError::SkipFrame]
26/// errors by skipping forward the `length` amount of bytes, see <https://github.com/KillingSpark/zstd-rs/issues/57>
27///
28/// ```no_run
29/// // `read_to_end` is not implemented by the no_std implementation.
30/// #[cfg(feature = "std")]
31/// {
32///     use std::fs::File;
33///     use std::io::Read;
34///     use ruzstd::decoding::StreamingDecoder;
35///
36///     // Read a Zstandard archive from the filesystem then decompress it into a vec.
37///     let mut f: File = todo!("Read a .zstd archive from somewhere");
38///     let mut decoder = StreamingDecoder::new(f).unwrap();
39///     let mut result = Vec::new();
40///     Read::read_to_end(&mut decoder, &mut result).unwrap();
41/// }
42/// ```
43pub struct StreamingDecoder<READ: Read, DEC: BorrowMut<FrameDecoder>> {
44    pub decoder: DEC,
45    source: READ,
46}
47
48impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
49    pub fn new_with_decoder(
50        mut source: READ,
51        mut decoder: DEC,
52    ) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
53        decoder.borrow_mut().init(&mut source)?;
54        Ok(StreamingDecoder { decoder, source })
55    }
56}
57
58impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
59    pub fn new(
60        mut source: READ,
61    ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
62        let mut decoder = FrameDecoder::new();
63        decoder.init(&mut source)?;
64        Ok(StreamingDecoder { decoder, source })
65    }
66}
67
68impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
69    /// Gets a reference to the underlying reader.
70    pub fn get_ref(&self) -> &READ {
71        &self.source
72    }
73
74    /// Gets a mutable reference to the underlying reader.
75    ///
76    /// It is inadvisable to directly read from the underlying reader.
77    pub fn get_mut(&mut self) -> &mut READ {
78        &mut self.source
79    }
80
81    /// Destructures this object into the inner reader.
82    pub fn into_inner(self) -> READ
83    where
84        READ: Sized,
85    {
86        self.source
87    }
88
89    /// Destructures this object into both the inner reader and [FrameDecoder].
90    pub fn into_parts(self) -> (READ, DEC)
91    where
92        READ: Sized,
93    {
94        (self.source, self.decoder)
95    }
96
97    /// Destructures this object into the inner [FrameDecoder].
98    pub fn into_frame_decoder(self) -> DEC {
99        self.decoder
100    }
101}
102
103impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
104    fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
105        let decoder = self.decoder.borrow_mut();
106        if decoder.is_finished() && decoder.can_collect() == 0 {
107            //No more bytes can ever be decoded
108            return Ok(0);
109        }
110
111        // need to loop. The UpToBytes strategy doesn't take any effort to actually reach that limit.
112        // The first few calls can result in just filling the decode buffer but these bytes can not be collected.
113        // So we need to call this until we can actually collect enough bytes
114
115        // TODO add BlockDecodingStrategy::UntilCollectable(usize) that pushes this logic into the decode_blocks function
116        while decoder.can_collect() < buf.len() && !decoder.is_finished() {
117            //More bytes can be decoded
118            let additional_bytes_needed = buf.len() - decoder.can_collect();
119            match decoder.decode_blocks(
120                &mut self.source,
121                BlockDecodingStrategy::UptoBytes(additional_bytes_needed),
122            ) {
123                Ok(_) => { /*Nothing to do*/ }
124                Err(e) => {
125                    let err;
126                    #[cfg(feature = "std")]
127                    {
128                        err = Error::new(ErrorKind::Other, e);
129                    }
130                    #[cfg(not(feature = "std"))]
131                    {
132                        err = Error::new(ErrorKind::Other, alloc::boxed::Box::new(e));
133                    }
134                    return Err(err);
135                }
136            }
137        }
138
139        decoder.read(buf)
140    }
141}