1use std::{
2 io::{Read, Seek, SeekFrom},
3 sync::Arc,
4};
5
6use super::ReadAt;
7
/// A readable, seekable window onto the byte range `start..start + len` of a
/// shared inner reader.
///
/// Reads are issued through the inner reader's `read_at` at
/// `start + position`, and every `SubFile` keeps its own `position`, so
/// several windows can share the same `Arc<R>`.
pub struct SubFile<R> {
    /// Shared handle to the underlying reader.
    inner: Arc<R>,
    /// Offset in `inner` at which this window begins.
    start: u64,
    /// Current cursor, relative to `start`.
    position: u64,
    /// Length of the window in bytes.
    len: u64,
}

// Implemented by hand rather than derived: `#[derive(Clone)]` would add an
// `R: Clone` bound, although cloning only needs to bump the `Arc` refcount.
impl<R> Clone for SubFile<R> {
    fn clone(&self) -> Self {
        Self {
            inner: Arc::clone(&self.inner),
            start: self.start,
            position: self.position,
            len: self.len,
        }
    }
}
19
20impl<R> std::fmt::Debug for SubFile<R> {
21 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
22 f.debug_struct("SubFile")
23 .field("inner", &"...")
24 .field("start", &self.start)
25 .field("position", &self.position)
26 .field("len", &self.len)
27 .finish()
28 }
29}
30
31impl<R: ReadAt> SubFile<R> {
32 pub fn new(inner: Arc<R>, start: u64, len: u64) -> std::io::Result<Self> {
37 start
38 .checked_add(len)
39 .expect("start + len should not overflow");
40 Ok(Self {
41 start,
42 inner,
43 position: 0,
44 len,
45 })
46 }
47
48 pub fn len(&self) -> u64 {
50 self.len
51 }
52
53 pub fn is_empty(&self) -> bool {
55 self.len == 0
56 }
57
58 pub fn remaining(&self) -> u64 {
60 self.len.saturating_sub(self.position)
61 }
62
63 pub fn sub_file(&self, start: u64, len: u64) -> std::io::Result<Self> {
65 Self::new(self.inner.clone(), self.start.saturating_add(start), len)
66 }
67}
68
69impl<R: ReadAt> Read for SubFile<R> {
70 fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
71 if self.position >= self.len {
72 return Ok(0);
73 }
74 let limit = usize::try_from((buf.len() as u64).min(self.remaining())).expect("valid limit");
75 let n = self
76 .inner
77 .read_at(&mut buf[..limit], self.start + self.position)?;
78 self.position += n as u64;
79 Ok(n)
80 }
81}
82
83impl<R: ReadAt> Seek for SubFile<R> {
84 fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
85 let new_position = match pos {
86 SeekFrom::Start(pos) => pos as i64,
87 SeekFrom::End(delta) => (self.len as i64).saturating_add(delta),
88 SeekFrom::Current(delta) => (self.position as i64).saturating_add(delta),
89 };
90 self.position =
91 u64::try_from(new_position).map_err(|_| std::io::ErrorKind::InvalidInput)?;
92 Ok(self.position)
93 }
94
95 fn stream_position(&mut self) -> std::io::Result<u64> {
96 Ok(self.position)
97 }
98}
99
/// Reports the window length to parquet; only built with the `parquet`
/// feature enabled.
#[cfg(feature = "parquet")]
impl<R: ReadAt> parquet::file::reader::Length for SubFile<R> {
    /// Length of the window in bytes, regardless of the cursor position.
    fn len(&self) -> u64 {
        let Self { len, .. } = self;
        *len
    }
}
106
/// Lets parquet read chunks out of the window; only built with the `parquet`
/// feature enabled. Offsets are relative to the start of this window.
#[cfg(feature = "parquet")]
impl<R: ReadAt> parquet::file::reader::ChunkReader for SubFile<R> {
    type T = SubFile<R>;

    /// Returns a fresh reader over the part of this window that begins
    /// `start` bytes in, with its cursor at 0.
    ///
    /// If `start` is at or past the end of the window, the returned reader
    /// has length 0 and yields no bytes.
    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        Ok(Self {
            inner: Arc::clone(&self.inner),
            start: self.start.saturating_add(start),
            position: 0,
            len: self.len.saturating_sub(start),
        })
    }

    /// Reads exactly `length` bytes beginning `start` bytes into the window.
    ///
    /// # Errors
    ///
    /// Returns `ParquetError::EOF` if fewer than `length` bytes are available,
    /// and propagates any I/O error from the inner reader.
    fn get_bytes(&self, start: u64, length: usize) -> parquet::errors::Result<bytes::Bytes> {
        let mut buf = Vec::with_capacity(length);
        // Bound the read with `take`: without it `read_to_end` would return
        // everything up to the end of the window, not just `length` bytes.
        let read = self
            .get_read(start)?
            .take(length as u64)
            .read_to_end(&mut buf)?;
        if read != length {
            return Err(parquet::errors::ParquetError::EOF(format!(
                "Expected to read {} bytes, read only {}",
                length, read
            )));
        }
        Ok(buf.into())
    }
}
126
#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;

    /// Exercises reading and seeking through a six-byte window (`"234567"`)
    /// onto a ten-byte file.
    #[test]
    fn subfile() {
        let path = Path::new("./target/tmp/subfile.txt");
        // The scratch directory is not guaranteed to exist relative to the
        // working directory the test runs in, so create it first.
        std::fs::create_dir_all(path.parent().expect("path has a parent directory")).unwrap();
        std::fs::write(path, b"0123456789").unwrap();
        let base = Arc::new(std::fs::File::open(path).unwrap());
        let mut t = SubFile::new(base.clone(), 2, 6).unwrap();

        // A sequential read starts at the beginning of the window.
        let mut buf = [0; 5];
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23456");

        // Relative seeks move backwards from the current cursor.
        let mut buf = [0; 2];
        t.seek(SeekFrom::Current(-2)).unwrap();
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"56");
        t.seek(SeekFrom::Current(-3)).unwrap();
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"45");

        // Absolute seeks are relative to the window, not the file.
        t.seek(SeekFrom::Start(0)).unwrap();
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23");

        // Only four bytes remain, so a ten-byte exact read must hit EOF.
        let mut buf = [0; 10];
        let e = t.read_exact(&mut buf).unwrap_err();
        assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);

        // Seeking to before the start of the window is rejected.
        let e = t.seek(SeekFrom::End(-10)).unwrap_err();
        assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
    }
}