ruzstd/streaming_decoder.rs
1use core::borrow::BorrowMut;
2
3use crate::frame_decoder::{BlockDecodingStrategy, FrameDecoder, FrameDecoderError};
4use crate::io::{Error, ErrorKind, Read};
5
6/// High level Zstandard frame decoder that can be used to decompress a given Zstandard frame.
7///
8/// This decoder implements `io::Read`, so you can interact with it by calling
9/// `io::Read::read_to_end` / `io::Read::read_exact` or passing this to another library / module as a source for the decoded content
10///
11/// If you need more control over how decompression takes place, you can use
12/// the lower level [FrameDecoder], which allows for greater control over how
13/// decompression takes place but the implementor must call
14/// [FrameDecoder::decode_blocks] repeatedly to decode the entire frame.
15///
16/// ## Caveat
17/// [StreamingDecoder] expects the underlying stream to only contain a single frame,
18/// yet the specification states that a single archive may contain multiple frames.
19///
20/// To decode all the frames in a finite stream, the calling code needs to recreate
21/// the instance of the decoder and handle
22/// [crate::frame::ReadFrameHeaderError::SkipFrame]
23/// errors by skipping forward the `length` amount of bytes, see <https://github.com/KillingSpark/zstd-rs/issues/57>
24///
25/// ```no_run
26/// // `read_to_end` is not implemented by the no_std implementation.
27/// #[cfg(feature = "std")]
28/// {
29/// use std::fs::File;
30/// use std::io::Read;
31/// use ruzstd::{StreamingDecoder};
32///
33/// // Read a Zstandard archive from the filesystem then decompress it into a vec.
34/// let mut f: File = todo!("Read a .zstd archive from somewhere");
35/// let mut decoder = StreamingDecoder::new(f).unwrap();
36/// let mut result = Vec::new();
37/// Read::read_to_end(&mut decoder, &mut result).unwrap();
38/// }
39/// ```
40pub struct StreamingDecoder<READ: Read, DEC: BorrowMut<FrameDecoder>> {
41 pub decoder: DEC,
42 source: READ,
43}
44
45impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
46 pub fn new_with_decoder(
47 mut source: READ,
48 mut decoder: DEC,
49 ) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
50 decoder.borrow_mut().init(&mut source)?;
51 Ok(StreamingDecoder { decoder, source })
52 }
53}
54
55impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
56 pub fn new(
57 mut source: READ,
58 ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
59 let mut decoder = FrameDecoder::new();
60 decoder.init(&mut source)?;
61 Ok(StreamingDecoder { decoder, source })
62 }
63}
64
65impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
66 /// Gets a reference to the underlying reader.
67 pub fn get_ref(&self) -> &READ {
68 &self.source
69 }
70
71 /// Gets a mutable reference to the underlying reader.
72 ///
73 /// It is inadvisable to directly read from the underlying reader.
74 pub fn get_mut(&mut self) -> &mut READ {
75 &mut self.source
76 }
77
78 /// Destructures this object into the inner reader.
79 pub fn into_inner(self) -> READ
80 where
81 READ: Sized,
82 {
83 self.source
84 }
85
86 /// Destructures this object into both the inner reader and [FrameDecoder].
87 pub fn into_parts(self) -> (READ, DEC)
88 where
89 READ: Sized,
90 {
91 (self.source, self.decoder)
92 }
93
94 /// Destructures this object into the inner [FrameDecoder].
95 pub fn into_frame_decoder(self) -> DEC {
96 self.decoder
97 }
98}
99
100impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
101 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
102 let decoder = self.decoder.borrow_mut();
103 if decoder.is_finished() && decoder.can_collect() == 0 {
104 //No more bytes can ever be decoded
105 return Ok(0);
106 }
107
108 // need to loop. The UpToBytes strategy doesn't take any effort to actually reach that limit.
109 // The first few calls can result in just filling the decode buffer but these bytes can not be collected.
110 // So we need to call this until we can actually collect enough bytes
111
112 // TODO add BlockDecodingStrategy::UntilCollectable(usize) that pushes this logic into the decode_blocks function
113 while decoder.can_collect() < buf.len() && !decoder.is_finished() {
114 //More bytes can be decoded
115 let additional_bytes_needed = buf.len() - decoder.can_collect();
116 match decoder.decode_blocks(
117 &mut self.source,
118 BlockDecodingStrategy::UptoBytes(additional_bytes_needed),
119 ) {
120 Ok(_) => { /*Nothing to do*/ }
121 Err(e) => {
122 let err;
123 #[cfg(feature = "std")]
124 {
125 err = Error::new(ErrorKind::Other, e);
126 }
127 #[cfg(not(feature = "std"))]
128 {
129 err = Error::new(ErrorKind::Other, alloc::boxed::Box::new(e));
130 }
131 return Err(err);
132 }
133 }
134 }
135
136 decoder.read(buf)
137 }
138}