omf/file/
sub_file.rs

1use std::{
2    io::{Read, Seek, SeekFrom},
3    sync::Arc,
4};
5
6use super::ReadAt;
7
/// A seek-able sub-file with a start and end point within a larger file.
///
/// The underlying reader is shared through an [`Arc`], so cloning a `SubFile` only clones
/// the handle; every clone keeps its own cursor position.
pub struct SubFile<R> {
    /// The shared underlying reader that this sub-file reads from.
    inner: Arc<R>,
    /// Start of the sub-file within `inner`.
    start: u64,
    /// The current file cursor position within the sub-file.
    position: u64,
    /// The length of the sub-file in bytes.
    len: u64,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add an `R: Clone`
// bound, which is unnecessary because only the `Arc` handle is cloned, never `R` itself.
impl<R> Clone for SubFile<R> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            start: self.start,
            position: self.position,
            len: self.len,
        }
    }
}
19
20impl<R> std::fmt::Debug for SubFile<R> {
21    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22        f.debug_struct("SubFile")
23            .field("inner", &"...")
24            .field("start", &self.start)
25            .field("position", &self.position)
26            .field("len", &self.len)
27            .finish()
28    }
29}
30
31impl<R: ReadAt> SubFile<R> {
32    /// Creates a sub-file from seek-able object.
33    ///
34    /// This new file will its start and zero position at the current position of `inner` and
35    /// extend up to `len` bytes.
36    pub fn new(inner: Arc<R>, start: u64, len: u64) -> std::io::Result<Self> {
37        start
38            .checked_add(len)
39            .expect("start + len should not overflow");
40        Ok(Self {
41            start,
42            inner,
43            position: 0,
44            len,
45        })
46    }
47
48    /// Returns the total length of the sub-file, ignoring the current position.
49    pub fn len(&self) -> u64 {
50        self.len
51    }
52
53    /// Returns true if the file is empty.
54    pub fn is_empty(&self) -> bool {
55        self.len == 0
56    }
57
58    /// Returns the number of bytes remaining in the sub-file.
59    pub fn remaining(&self) -> u64 {
60        self.len.saturating_sub(self.position)
61    }
62
63    /// Returns a new sub-file that is a sub-range of this one.
64    pub fn sub_file(&self, start: u64, len: u64) -> std::io::Result<Self> {
65        Self::new(self.inner.clone(), self.start.saturating_add(start), len)
66    }
67}
68
69impl<R: ReadAt> Read for SubFile<R> {
70    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
71        if self.position >= self.len {
72            return Ok(0);
73        }
74        let limit = usize::try_from((buf.len() as u64).min(self.remaining())).expect("valid limit");
75        let n = self
76            .inner
77            .read_at(&mut buf[..limit], self.start + self.position)?;
78        self.position += n as u64;
79        Ok(n)
80    }
81}
82
83impl<R: ReadAt> Seek for SubFile<R> {
84    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
85        let new_position = match pos {
86            SeekFrom::Start(pos) => pos as i64,
87            SeekFrom::End(delta) => (self.len as i64).saturating_add(delta),
88            SeekFrom::Current(delta) => (self.position as i64).saturating_add(delta),
89        };
90        self.position =
91            u64::try_from(new_position).map_err(|_| std::io::ErrorKind::InvalidInput)?;
92        Ok(self.position)
93    }
94
95    fn stream_position(&mut self) -> std::io::Result<u64> {
96        Ok(self.position)
97    }
98}
99
#[cfg(feature = "parquet")]
impl<R: ReadAt> parquet::file::reader::Length for SubFile<R> {
    /// Reports the sub-file's total length in bytes; the cursor position is not taken
    /// into account.
    fn len(&self) -> u64 {
        let Self { len, .. } = self;
        *len
    }
}
106
#[cfg(feature = "parquet")]
impl<R: ReadAt> parquet::file::reader::ChunkReader for SubFile<R> {
    type T = SubFile<R>;

    /// Returns an independent reader over this sub-file from offset `start` to its end.
    ///
    /// The returned reader shares the underlying file and has its own cursor at zero. A
    /// `start` at or beyond the end of this sub-file yields an empty reader.
    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        Ok(Self {
            inner: self.inner.clone(),
            start: self.start.saturating_add(start),
            position: 0,
            len: self.len.saturating_sub(start),
        })
    }

    /// Reads exactly `length` bytes beginning at offset `start` of this sub-file.
    ///
    /// # Errors
    ///
    /// Returns an error if the underlying read fails, or an EOF error if fewer than
    /// `length` bytes are available from `start`.
    fn get_bytes(&self, start: u64, length: usize) -> parquet::errors::Result<bytes::Bytes> {
        let mut buf = Vec::with_capacity(length);
        // Limit the read to the requested range; without `take` the rest of the sub-file
        // would be read and returned regardless of `length`.
        let limit = u64::try_from(length).unwrap_or(u64::MAX);
        let read = self.get_read(start)?.take(limit).read_to_end(&mut buf)?;
        if read != length {
            return Err(parquet::errors::ParquetError::EOF(format!(
                "Expected to read {} bytes, read only {}",
                length, read
            )));
        }
        Ok(buf.into())
    }
}
126
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;

    /// Reads and seeks within a 6-byte sub-file ("234567") of a 10-byte file on disk.
    #[test]
    fn subfile() {
        let path = Path::new("./target/tmp/subfile.txt");
        // The scratch directory may not exist yet (e.g. on a fresh checkout), in which
        // case writing the fixture would fail before the test proper begins.
        std::fs::create_dir_all(path.parent().expect("fixture path has a parent directory"))
            .unwrap();
        std::fs::write(path, b"0123456789").unwrap();
        let base = Arc::new(std::fs::File::open(path).unwrap());
        let mut t = SubFile::new(base, 2, 6).unwrap();

        // Sequential read from the start of the sub-file.
        let mut buf = [0; 5];
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23456");

        // Relative seeks backwards from the cursor.
        let mut buf = [0; 2];
        t.seek(SeekFrom::Current(-2)).unwrap();
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"56");
        t.seek(SeekFrom::Current(-3)).unwrap();
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"45");

        // Absolute seek back to the start of the sub-file.
        t.seek(SeekFrom::Start(0)).unwrap();
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23");

        // Only four bytes remain, so a ten-byte exact read must hit end-of-file.
        let mut buf = [0; 10];
        let e = t.read_exact(&mut buf).unwrap_err();
        assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);

        // Seeking to before the start of the sub-file is rejected.
        let e = t.seek(SeekFrom::End(-10)).unwrap_err();
        assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
    }
}