1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
use std::{
    fs::File,
    io::{Read, Seek, SeekFrom},
    sync::Arc,
};

/// A seek-able sub-file with a start and end point within a larger file.
///
/// The sub-file covers the `len` bytes of `inner` that begin at byte offset `start`. The
/// underlying file handle is held in an [`Arc`], so several sub-files can share one handle
/// while each keeps its own cursor in `position`.
#[derive(Debug)]
pub struct SubFile {
    /// Shared handle to the larger file that contains this sub-file.
    inner: Arc<File>,
    /// Start of the sub-file within `inner`.
    start: u64,
    /// The current file cursor position within the sub-file.
    position: u64,
    /// The length of the sub-file in bytes.
    len: u64,
}

impl SubFile {
    /// Creates a sub-file covering `len` bytes of `inner`, beginning at byte offset `start`.
    ///
    /// The cursor of the new sub-file is at position zero, which corresponds to offset `start`
    /// within `inner`. The requested range is not compared against the actual size of `inner`.
    ///
    /// # Errors
    ///
    /// Returns an error of kind [`std::io::ErrorKind::InvalidInput`] if `start + len` does not
    /// fit in a `u64`.
    pub fn new(inner: Arc<File>, start: u64, len: u64) -> std::io::Result<Self> {
        // Rejecting an unrepresentable end offset up front means `start + position` cannot
        // overflow later for any position inside the sub-file.
        if start.checked_add(len).is_none() {
            return Err(std::io::Error::new(
                std::io::ErrorKind::InvalidInput,
                "start + len should not overflow",
            ));
        }
        Ok(Self {
            inner,
            start,
            position: 0,
            len,
        })
    }

    /// Returns the total length of the sub-file, ignoring the current position.
    pub fn len(&self) -> u64 {
        self.len
    }

    /// Returns true if the file is empty.
    pub fn is_empty(&self) -> bool {
        self.len == 0
    }

    /// Returns the number of bytes remaining in the sub-file.
    ///
    /// This is zero when the cursor is at or beyond the end of the sub-file.
    fn remaining(&self) -> u64 {
        self.len.saturating_sub(self.position)
    }
}

impl Read for SubFile {
    /// Reads up to `buf.len()` bytes from the current position, never crossing the end of the
    /// sub-file, and advances the cursor by the number of bytes actually read.
    ///
    /// Returns `Ok(0)` once the cursor is at or beyond the end of the sub-file.
    fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
        let available = self.remaining();
        if available == 0 {
            return Ok(0);
        }
        // Cap the request at the bytes left in the sub-file. If `available` does not fit in a
        // `usize` it is necessarily larger than the buffer, so the buffer length wins.
        let wanted = usize::try_from(available).map_or(buf.len(), |left| left.min(buf.len()));
        let window = &mut buf[..wanted];
        // Translate the sub-file cursor into an absolute offset within the underlying file.
        let offset = self.start + self.position;
        let count = read_at(&self.inner, window, offset)?;
        self.position += count as u64;
        Ok(count)
    }
}

impl Seek for SubFile {
    fn seek(&mut self, pos: SeekFrom) -> std::io::Result<u64> {
        let new_position = match pos {
            SeekFrom::Start(pos) => pos as i64,
            SeekFrom::End(delta) => (self.len as i64).saturating_add(delta),
            SeekFrom::Current(delta) => (self.position as i64).saturating_add(delta),
        };
        self.position =
            u64::try_from(new_position).map_err(|_| std::io::ErrorKind::InvalidInput)?;
        Ok(self.position)
    }

    fn stream_position(&mut self) -> std::io::Result<u64> {
        Ok(self.position)
    }
}

#[cfg(feature = "parquet")]
impl parquet::file::reader::Length for SubFile {
    /// Reports the length of the sub-file in bytes, independent of the cursor position.
    fn len(&self) -> u64 {
        // Resolves to the inherent `SubFile::len` (inherent items take precedence over trait
        // items in path lookup), which returns the stored length.
        SubFile::len(self)
    }
}

// Chunk access is delegated to the `ChunkReader` implementation of the underlying `File`, with
// offsets shifted by the start of the sub-file.
//
// NOTE(review): neither method limits the request to the sub-file's length; presumably callers
// stay within the length reported through `Length` — confirm against the parquet reader.
#[cfg(feature = "parquet")]
impl parquet::file::reader::ChunkReader for SubFile {
    type T = <std::fs::File as parquet::file::reader::ChunkReader>::T;

    /// Returns a reader over the underlying file positioned `start` bytes into the sub-file.
    fn get_read(&self, start: u64) -> parquet::errors::Result<Self::T> {
        let absolute = self.start.saturating_add(start);
        self.inner.get_read(absolute)
    }

    /// Fetches `length` bytes from the underlying file, beginning `start` bytes into the
    /// sub-file.
    fn get_bytes(&self, start: u64, length: usize) -> parquet::errors::Result<bytes::Bytes> {
        let absolute = self.start.saturating_add(start);
        self.inner.get_bytes(absolute, length)
    }
}

/// Reads bytes from `file` into `buf`, starting at the absolute byte `offset`.
///
/// Returns the number of bytes read.
///
/// On Windows this moves the file cursor, whereas the Unix implementation leaves it alone, so
/// only call this from code that does not depend on the file cursor.
#[cfg(windows)]
fn read_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
    std::os::windows::fs::FileExt::seek_read(file, buf, offset)
}

/// Reads bytes from `file` into `buf`, starting at the absolute byte `offset`.
///
/// Returns the number of bytes read.
#[cfg(unix)]
fn read_at(file: &std::fs::File, buf: &mut [u8], offset: u64) -> std::io::Result<usize> {
    std::os::unix::fs::FileExt::read_at(file, buf, offset)
}

#[cfg(test)]
mod tests {
    use std::path::Path;

    use super::*;

    /// Reads and seeks within a six-byte sub-file ("234567") of a ten-byte file.
    #[test]
    fn subfile() {
        let path = Path::new("./target/tmp/subfile.txt");
        // The scratch directory is not guaranteed to exist when the test runs, so create it
        // rather than failing on the write below.
        std::fs::create_dir_all(path.parent().expect("path has a parent directory")).unwrap();
        std::fs::write(path, b"0123456789").unwrap();
        let base = Arc::new(File::open(path).unwrap());
        let mut t = SubFile::new(base.clone(), 2, 6).unwrap();

        // Reading starts at the beginning of the sub-file, not of the underlying file.
        let mut buf = [0; 5];
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23456");

        // Relative seeks move the sub-file cursor backwards.
        let mut buf = [0; 2];
        t.seek(SeekFrom::Current(-2)).unwrap();
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"56");
        t.seek(SeekFrom::Current(-3)).unwrap();
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"45");

        // Absolute seeks are relative to the start of the sub-file.
        t.seek(SeekFrom::Start(0)).unwrap();
        t.read_exact(&mut buf).unwrap();
        assert_eq!(&buf, b"23");

        // Only four bytes remain, so a ten-byte read hits the end of the sub-file even though
        // the underlying file is longer.
        let mut buf = [0; 10];
        let e = t.read_exact(&mut buf).unwrap_err();
        assert_eq!(e.kind(), std::io::ErrorKind::UnexpectedEof);

        // Seeking to before the start of the sub-file is rejected.
        let e = t.seek(SeekFrom::End(-10)).unwrap_err();
        assert_eq!(e.kind(), std::io::ErrorKind::InvalidInput);
    }
}