ruzstd/decoding/streaming_decoder.rs
1//! The [StreamingDecoder] wraps a [FrameDecoder] and provides a Read impl that decodes data when necessary
2
3use core::borrow::BorrowMut;
4
5use crate::decoding::errors::FrameDecoderError;
6use crate::decoding::{BlockDecodingStrategy, FrameDecoder};
7#[cfg(not(feature = "std"))]
8use crate::io::ErrorKind;
9use crate::io::{Error, Read};
10
11/// High level Zstandard frame decoder that can be used to decompress a given Zstandard frame.
12///
13/// This decoder implements `io::Read`, so you can interact with it by calling
14/// `io::Read::read_to_end` / `io::Read::read_exact` or passing this to another library / module as a source for the decoded content
15///
16/// If you need more control over how decompression takes place, you can use
17/// the lower level [FrameDecoder], which allows for greater control over how
18/// decompression takes place but the implementor must call
19/// [FrameDecoder::decode_blocks] repeatedly to decode the entire frame.
20///
21/// ## Caveat
22/// [StreamingDecoder] expects the underlying stream to only contain a single frame,
23/// yet the specification states that a single archive may contain multiple frames.
24///
25/// To decode all the frames in a finite stream, the calling code needs to recreate
26/// the instance of the decoder and handle
27/// [crate::decoding::errors::ReadFrameHeaderError::SkipFrame]
28/// errors by skipping forward the `length` amount of bytes, see <https://github.com/KillingSpark/zstd-rs/issues/57>
29///
30/// ```no_run
31/// // `read_to_end` is not implemented by the no_std implementation.
32/// #[cfg(feature = "std")]
33/// {
34/// use std::fs::File;
35/// use std::io::Read;
36/// use ruzstd::decoding::StreamingDecoder;
37///
38/// // Read a Zstandard archive from the filesystem then decompress it into a vec.
39/// let mut f: File = todo!("Read a .zstd archive from somewhere");
40/// let mut decoder = StreamingDecoder::new(f).unwrap();
41/// let mut result = Vec::new();
42/// Read::read_to_end(&mut decoder, &mut result).unwrap();
43/// }
44/// ```
45pub struct StreamingDecoder<READ: Read, DEC: BorrowMut<FrameDecoder>> {
46 pub decoder: DEC,
47 source: READ,
48}
49
50impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
51 pub fn new_with_decoder(
52 mut source: READ,
53 mut decoder: DEC,
54 ) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
55 decoder.borrow_mut().init(&mut source)?;
56 Ok(StreamingDecoder { decoder, source })
57 }
58}
59
60impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
61 pub fn new(
62 mut source: READ,
63 ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
64 let mut decoder = FrameDecoder::new();
65 decoder.init(&mut source)?;
66 Ok(StreamingDecoder { decoder, source })
67 }
68}
69
70impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
71 /// Gets a reference to the underlying reader.
72 pub fn get_ref(&self) -> &READ {
73 &self.source
74 }
75
76 /// Gets a mutable reference to the underlying reader.
77 ///
78 /// It is inadvisable to directly read from the underlying reader.
79 pub fn get_mut(&mut self) -> &mut READ {
80 &mut self.source
81 }
82
83 /// Destructures this object into the inner reader.
84 pub fn into_inner(self) -> READ
85 where
86 READ: Sized,
87 {
88 self.source
89 }
90
91 /// Destructures this object into both the inner reader and [FrameDecoder].
92 pub fn into_parts(self) -> (READ, DEC)
93 where
94 READ: Sized,
95 {
96 (self.source, self.decoder)
97 }
98
99 /// Destructures this object into the inner [FrameDecoder].
100 pub fn into_frame_decoder(self) -> DEC {
101 self.decoder
102 }
103}
104
105impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
106 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
107 let decoder = self.decoder.borrow_mut();
108 if decoder.is_finished() && decoder.can_collect() == 0 {
109 //No more bytes can ever be decoded
110 return Ok(0);
111 }
112
113 // need to loop. The UpToBytes strategy doesn't take any effort to actually reach that limit.
114 // The first few calls can result in just filling the decode buffer but these bytes can not be collected.
115 // So we need to call this until we can actually collect enough bytes
116
117 // TODO add BlockDecodingStrategy::UntilCollectable(usize) that pushes this logic into the decode_blocks function
118 while decoder.can_collect() < buf.len() && !decoder.is_finished() {
119 //More bytes can be decoded
120 let additional_bytes_needed = buf.len() - decoder.can_collect();
121 match decoder.decode_blocks(
122 &mut self.source,
123 BlockDecodingStrategy::UptoBytes(additional_bytes_needed),
124 ) {
125 Ok(_) => { /*Nothing to do*/ }
126 Err(e) => {
127 let err;
128 #[cfg(feature = "std")]
129 {
130 err = Error::other(e);
131 }
132 #[cfg(not(feature = "std"))]
133 {
134 err = Error::new(ErrorKind::Other, alloc::boxed::Box::new(e));
135 }
136 return Err(err);
137 }
138 }
139 }
140
141 decoder.read(buf)
142 }
143}