tokio/fs/file.rs
1//! Types for working with [`File`].
2//!
3//! [`File`]: File
4
5use crate::fs::{asyncify, OpenOptions};
6use crate::io::blocking::{Buf, DEFAULT_MAX_BUF_SIZE};
7use crate::io::{AsyncRead, AsyncSeek, AsyncWrite, ReadBuf};
8use crate::sync::Mutex;
9
10use std::cmp;
11use std::fmt;
12use std::fs::{Metadata, Permissions};
13use std::future::Future;
14use std::io::{self, Seek, SeekFrom};
15use std::path::Path;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{ready, Context, Poll};
19
20#[cfg(test)]
21use super::mocks::JoinHandle;
22#[cfg(test)]
23use super::mocks::MockFile as StdFile;
24#[cfg(test)]
25use super::mocks::{spawn_blocking, spawn_mandatory_blocking};
26#[cfg(not(test))]
27use crate::blocking::JoinHandle;
28#[cfg(not(test))]
29use crate::blocking::{spawn_blocking, spawn_mandatory_blocking};
30#[cfg(not(test))]
31use std::fs::File as StdFile;
32
33/// A reference to an open file on the filesystem.
34///
35/// This is a specialized version of [`std::fs::File`] for usage from the
36/// Tokio runtime.
37///
38/// An instance of a `File` can be read and/or written depending on what options
39/// it was opened with. Files also implement [`AsyncSeek`] to alter the logical
40/// cursor that the file contains internally.
41///
42/// A file will not be closed immediately when it goes out of scope if there
43/// are any IO operations that have not yet completed. To ensure that a file is
44/// closed immediately when it is dropped, you should call [`flush`] before
45/// dropping it. Note that this does not ensure that the file has been fully
46/// written to disk; the operating system might keep the changes around in an
47/// in-memory buffer. See the [`sync_all`] method for telling the OS to write
48/// the data to disk.
49///
50/// Reading and writing to a `File` is usually done using the convenience
51/// methods found on the [`AsyncReadExt`] and [`AsyncWriteExt`] traits.
52///
53/// [`AsyncSeek`]: trait@crate::io::AsyncSeek
54/// [`flush`]: fn@crate::io::AsyncWriteExt::flush
55/// [`sync_all`]: fn@crate::fs::File::sync_all
56/// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
57/// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
58///
59/// # Examples
60///
61/// Create a new file and asynchronously write bytes to it:
62///
63/// ```no_run
64/// use tokio::fs::File;
65/// use tokio::io::AsyncWriteExt; // for write_all()
66///
67/// # async fn dox() -> std::io::Result<()> {
68/// let mut file = File::create("foo.txt").await?;
69/// file.write_all(b"hello, world!").await?;
70/// # Ok(())
71/// # }
72/// ```
73///
74/// Read the contents of a file into a buffer:
75///
76/// ```no_run
77/// use tokio::fs::File;
78/// use tokio::io::AsyncReadExt; // for read_to_end()
79///
80/// # async fn dox() -> std::io::Result<()> {
81/// let mut file = File::open("foo.txt").await?;
82///
83/// let mut contents = vec![];
84/// file.read_to_end(&mut contents).await?;
85///
86/// println!("len = {}", contents.len());
87/// # Ok(())
88/// # }
89/// ```
90pub struct File {
91 std: Arc<StdFile>,
92 inner: Mutex<Inner>,
93 max_buf_size: usize,
94}
95
96struct Inner {
97 state: State,
98
99 /// Errors from writes/flushes are returned in write/flush calls. If a write
100 /// error is observed while performing a read, it is saved until the next
101 /// write / flush call.
102 last_write_err: Option<io::ErrorKind>,
103
104 pos: u64,
105}
106
107#[derive(Debug)]
108enum State {
109 Idle(Option<Buf>),
110 Busy(JoinHandle<(Operation, Buf)>),
111}
112
113#[derive(Debug)]
114enum Operation {
115 Read(io::Result<usize>),
116 Write(io::Result<()>),
117 Seek(io::Result<u64>),
118}
119
120impl File {
121 /// Attempts to open a file in read-only mode.
122 ///
123 /// See [`OpenOptions`] for more details.
124 ///
125 /// # Errors
126 ///
127 /// This function will return an error if called from outside of the Tokio
128 /// runtime or if path does not already exist. Other errors may also be
129 /// returned according to `OpenOptions::open`.
130 ///
131 /// # Examples
132 ///
133 /// ```no_run
134 /// use tokio::fs::File;
135 /// use tokio::io::AsyncReadExt;
136 ///
137 /// # async fn dox() -> std::io::Result<()> {
138 /// let mut file = File::open("foo.txt").await?;
139 ///
140 /// let mut contents = vec![];
141 /// file.read_to_end(&mut contents).await?;
142 ///
143 /// println!("len = {}", contents.len());
144 /// # Ok(())
145 /// # }
146 /// ```
147 ///
148 /// The [`read_to_end`] method is defined on the [`AsyncReadExt`] trait.
149 ///
150 /// [`read_to_end`]: fn@crate::io::AsyncReadExt::read_to_end
151 /// [`AsyncReadExt`]: trait@crate::io::AsyncReadExt
152 pub async fn open(path: impl AsRef<Path>) -> io::Result<File> {
153 let path = path.as_ref().to_owned();
154 let std = asyncify(|| StdFile::open(path)).await?;
155
156 Ok(File::from_std(std))
157 }
158
159 /// Opens a file in write-only mode.
160 ///
161 /// This function will create a file if it does not exist, and will truncate
162 /// it if it does.
163 ///
164 /// See [`OpenOptions`] for more details.
165 ///
166 /// # Errors
167 ///
168 /// Results in an error if called from outside of the Tokio runtime or if
169 /// the underlying [`create`] call results in an error.
170 ///
171 /// [`create`]: std::fs::File::create
172 ///
173 /// # Examples
174 ///
175 /// ```no_run
176 /// use tokio::fs::File;
177 /// use tokio::io::AsyncWriteExt;
178 ///
179 /// # async fn dox() -> std::io::Result<()> {
180 /// let mut file = File::create("foo.txt").await?;
181 /// file.write_all(b"hello, world!").await?;
182 /// # Ok(())
183 /// # }
184 /// ```
185 ///
186 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
187 ///
188 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
189 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
190 pub async fn create(path: impl AsRef<Path>) -> io::Result<File> {
191 let path = path.as_ref().to_owned();
192 let std_file = asyncify(move || StdFile::create(path)).await?;
193 Ok(File::from_std(std_file))
194 }
195
196 /// Opens a file in read-write mode.
197 ///
198 /// This function will create a file if it does not exist, or return an error
199 /// if it does. This way, if the call succeeds, the file returned is guaranteed
200 /// to be new.
201 ///
202 /// This option is useful because it is atomic. Otherwise between checking
203 /// whether a file exists and creating a new one, the file may have been
204 /// created by another process (a TOCTOU race condition / attack).
205 ///
206 /// This can also be written using `File::options().read(true).write(true).create_new(true).open(...)`.
207 ///
208 /// See [`OpenOptions`] for more details.
209 ///
210 /// # Examples
211 ///
212 /// ```no_run
213 /// use tokio::fs::File;
214 /// use tokio::io::AsyncWriteExt;
215 ///
216 /// # async fn dox() -> std::io::Result<()> {
217 /// let mut file = File::create_new("foo.txt").await?;
218 /// file.write_all(b"hello, world!").await?;
219 /// # Ok(())
220 /// # }
221 /// ```
222 ///
223 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
224 ///
225 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
226 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
227 pub async fn create_new<P: AsRef<Path>>(path: P) -> std::io::Result<File> {
228 Self::options()
229 .read(true)
230 .write(true)
231 .create_new(true)
232 .open(path)
233 .await
234 }
235
236 /// Returns a new [`OpenOptions`] object.
237 ///
238 /// This function returns a new `OpenOptions` object that you can use to
239 /// open or create a file with specific options if `open()` or `create()`
240 /// are not appropriate.
241 ///
242 /// It is equivalent to `OpenOptions::new()`, but allows you to write more
243 /// readable code. Instead of
244 /// `OpenOptions::new().append(true).open("example.log")`,
245 /// you can write `File::options().append(true).open("example.log")`. This
246 /// also avoids the need to import `OpenOptions`.
247 ///
248 /// See the [`OpenOptions::new`] function for more details.
249 ///
250 /// # Examples
251 ///
252 /// ```no_run
253 /// use tokio::fs::File;
254 /// use tokio::io::AsyncWriteExt;
255 ///
256 /// # async fn dox() -> std::io::Result<()> {
257 /// let mut f = File::options().append(true).open("example.log").await?;
258 /// f.write_all(b"new line\n").await?;
259 /// # Ok(())
260 /// # }
261 /// ```
262 #[must_use]
263 pub fn options() -> OpenOptions {
264 OpenOptions::new()
265 }
266
267 /// Converts a [`std::fs::File`] to a [`tokio::fs::File`](File).
268 ///
269 /// # Examples
270 ///
271 /// ```no_run
272 /// // This line could block. It is not recommended to do this on the Tokio
273 /// // runtime.
274 /// let std_file = std::fs::File::open("foo.txt").unwrap();
275 /// let file = tokio::fs::File::from_std(std_file);
276 /// ```
277 pub fn from_std(std: StdFile) -> File {
278 File {
279 std: Arc::new(std),
280 inner: Mutex::new(Inner {
281 state: State::Idle(Some(Buf::with_capacity(0))),
282 last_write_err: None,
283 pos: 0,
284 }),
285 max_buf_size: DEFAULT_MAX_BUF_SIZE,
286 }
287 }
288
289 /// Attempts to sync all OS-internal metadata to disk.
290 ///
291 /// This function will attempt to ensure that all in-core data reaches the
292 /// filesystem before returning.
293 ///
294 /// # Examples
295 ///
296 /// ```no_run
297 /// use tokio::fs::File;
298 /// use tokio::io::AsyncWriteExt;
299 ///
300 /// # async fn dox() -> std::io::Result<()> {
301 /// let mut file = File::create("foo.txt").await?;
302 /// file.write_all(b"hello, world!").await?;
303 /// file.sync_all().await?;
304 /// # Ok(())
305 /// # }
306 /// ```
307 ///
308 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
309 ///
310 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
311 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
312 pub async fn sync_all(&self) -> io::Result<()> {
313 let mut inner = self.inner.lock().await;
314 inner.complete_inflight().await;
315
316 let std = self.std.clone();
317 asyncify(move || std.sync_all()).await
318 }
319
320 /// This function is similar to `sync_all`, except that it may not
321 /// synchronize file metadata to the filesystem.
322 ///
323 /// This is intended for use cases that must synchronize content, but don't
324 /// need the metadata on disk. The goal of this method is to reduce disk
325 /// operations.
326 ///
327 /// Note that some platforms may simply implement this in terms of `sync_all`.
328 ///
329 /// # Examples
330 ///
331 /// ```no_run
332 /// use tokio::fs::File;
333 /// use tokio::io::AsyncWriteExt;
334 ///
335 /// # async fn dox() -> std::io::Result<()> {
336 /// let mut file = File::create("foo.txt").await?;
337 /// file.write_all(b"hello, world!").await?;
338 /// file.sync_data().await?;
339 /// # Ok(())
340 /// # }
341 /// ```
342 ///
343 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
344 ///
345 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
346 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
347 pub async fn sync_data(&self) -> io::Result<()> {
348 let mut inner = self.inner.lock().await;
349 inner.complete_inflight().await;
350
351 let std = self.std.clone();
352 asyncify(move || std.sync_data()).await
353 }
354
355 /// Truncates or extends the underlying file, updating the size of this file to become size.
356 ///
357 /// If the size is less than the current file's size, then the file will be
358 /// shrunk. If it is greater than the current file's size, then the file
359 /// will be extended to size and have all of the intermediate data filled in
360 /// with 0s.
361 ///
362 /// # Errors
363 ///
364 /// This function will return an error if the file is not opened for
365 /// writing.
366 ///
367 /// # Examples
368 ///
369 /// ```no_run
370 /// use tokio::fs::File;
371 /// use tokio::io::AsyncWriteExt;
372 ///
373 /// # async fn dox() -> std::io::Result<()> {
374 /// let mut file = File::create("foo.txt").await?;
375 /// file.write_all(b"hello, world!").await?;
376 /// file.set_len(10).await?;
377 /// # Ok(())
378 /// # }
379 /// ```
380 ///
381 /// The [`write_all`] method is defined on the [`AsyncWriteExt`] trait.
382 ///
383 /// [`write_all`]: fn@crate::io::AsyncWriteExt::write_all
384 /// [`AsyncWriteExt`]: trait@crate::io::AsyncWriteExt
385 pub async fn set_len(&self, size: u64) -> io::Result<()> {
386 let mut inner = self.inner.lock().await;
387 inner.complete_inflight().await;
388
389 let mut buf = match inner.state {
390 State::Idle(ref mut buf_cell) => buf_cell.take().unwrap(),
391 _ => unreachable!(),
392 };
393
394 let seek = if !buf.is_empty() {
395 Some(SeekFrom::Current(buf.discard_read()))
396 } else {
397 None
398 };
399
400 let std = self.std.clone();
401
402 inner.state = State::Busy(spawn_blocking(move || {
403 let res = if let Some(seek) = seek {
404 (&*std).seek(seek).and_then(|_| std.set_len(size))
405 } else {
406 std.set_len(size)
407 }
408 .map(|()| 0); // the value is discarded later
409
410 // Return the result as a seek
411 (Operation::Seek(res), buf)
412 }));
413
414 let (op, buf) = match inner.state {
415 State::Idle(_) => unreachable!(),
416 State::Busy(ref mut rx) => rx.await?,
417 };
418
419 inner.state = State::Idle(Some(buf));
420
421 match op {
422 Operation::Seek(res) => res.map(|pos| {
423 inner.pos = pos;
424 }),
425 _ => unreachable!(),
426 }
427 }
428
429 /// Queries metadata about the underlying file.
430 ///
431 /// # Examples
432 ///
433 /// ```no_run
434 /// use tokio::fs::File;
435 ///
436 /// # async fn dox() -> std::io::Result<()> {
437 /// let file = File::open("foo.txt").await?;
438 /// let metadata = file.metadata().await?;
439 ///
440 /// println!("{:?}", metadata);
441 /// # Ok(())
442 /// # }
443 /// ```
444 pub async fn metadata(&self) -> io::Result<Metadata> {
445 let std = self.std.clone();
446 asyncify(move || std.metadata()).await
447 }
448
449 /// Creates a new `File` instance that shares the same underlying file handle
450 /// as the existing `File` instance. Reads, writes, and seeks will affect both
451 /// File instances simultaneously.
452 ///
453 /// # Examples
454 ///
455 /// ```no_run
456 /// use tokio::fs::File;
457 ///
458 /// # async fn dox() -> std::io::Result<()> {
459 /// let file = File::open("foo.txt").await?;
460 /// let file_clone = file.try_clone().await?;
461 /// # Ok(())
462 /// # }
463 /// ```
464 pub async fn try_clone(&self) -> io::Result<File> {
465 self.inner.lock().await.complete_inflight().await;
466 let std = self.std.clone();
467 let std_file = asyncify(move || std.try_clone()).await?;
468 Ok(File::from_std(std_file))
469 }
470
471 /// Destructures `File` into a [`std::fs::File`]. This function is
472 /// async to allow any in-flight operations to complete.
473 ///
474 /// Use `File::try_into_std` to attempt conversion immediately.
475 ///
476 /// # Examples
477 ///
478 /// ```no_run
479 /// use tokio::fs::File;
480 ///
481 /// # async fn dox() -> std::io::Result<()> {
482 /// let tokio_file = File::open("foo.txt").await?;
483 /// let std_file = tokio_file.into_std().await;
484 /// # Ok(())
485 /// # }
486 /// ```
487 pub async fn into_std(mut self) -> StdFile {
488 self.inner.get_mut().complete_inflight().await;
489 Arc::try_unwrap(self.std).expect("Arc::try_unwrap failed")
490 }
491
492 /// Tries to immediately destructure `File` into a [`std::fs::File`].
493 ///
494 /// # Errors
495 ///
496 /// This function will return an error containing the file if some
497 /// operation is in-flight.
498 ///
499 /// # Examples
500 ///
501 /// ```no_run
502 /// use tokio::fs::File;
503 ///
504 /// # async fn dox() -> std::io::Result<()> {
505 /// let tokio_file = File::open("foo.txt").await?;
506 /// let std_file = tokio_file.try_into_std().unwrap();
507 /// # Ok(())
508 /// # }
509 /// ```
510 pub fn try_into_std(mut self) -> Result<StdFile, Self> {
511 match Arc::try_unwrap(self.std) {
512 Ok(file) => Ok(file),
513 Err(std_file_arc) => {
514 self.std = std_file_arc;
515 Err(self)
516 }
517 }
518 }
519
520 /// Changes the permissions on the underlying file.
521 ///
522 /// # Platform-specific behavior
523 ///
524 /// This function currently corresponds to the `fchmod` function on Unix and
525 /// the `SetFileInformationByHandle` function on Windows. Note that, this
526 /// [may change in the future][changes].
527 ///
528 /// [changes]: https://doc.rust-lang.org/std/io/index.html#platform-specific-behavior
529 ///
530 /// # Errors
531 ///
532 /// This function will return an error if the user lacks permission change
533 /// attributes on the underlying file. It may also return an error in other
534 /// os-specific unspecified cases.
535 ///
536 /// # Examples
537 ///
538 /// ```no_run
539 /// use tokio::fs::File;
540 ///
541 /// # async fn dox() -> std::io::Result<()> {
542 /// let file = File::open("foo.txt").await?;
543 /// let mut perms = file.metadata().await?.permissions();
544 /// perms.set_readonly(true);
545 /// file.set_permissions(perms).await?;
546 /// # Ok(())
547 /// # }
548 /// ```
549 pub async fn set_permissions(&self, perm: Permissions) -> io::Result<()> {
550 let std = self.std.clone();
551 asyncify(move || std.set_permissions(perm)).await
552 }
553
554 /// Set the maximum buffer size for the underlying [`AsyncRead`] / [`AsyncWrite`] operation.
555 ///
556 /// Although Tokio uses a sensible default value for this buffer size, this function would be
557 /// useful for changing that default depending on the situation.
558 ///
559 /// # Examples
560 ///
561 /// ```no_run
562 /// use tokio::fs::File;
563 /// use tokio::io::AsyncWriteExt;
564 ///
565 /// # async fn dox() -> std::io::Result<()> {
566 /// let mut file = File::open("foo.txt").await?;
567 ///
568 /// // Set maximum buffer size to 8 MiB
569 /// file.set_max_buf_size(8 * 1024 * 1024);
570 ///
571 /// let mut buf = vec![1; 1024 * 1024 * 1024];
572 ///
573 /// // Write the 1 GiB buffer in chunks up to 8 MiB each.
574 /// file.write_all(&mut buf).await?;
575 /// # Ok(())
576 /// # }
577 /// ```
578 pub fn set_max_buf_size(&mut self, max_buf_size: usize) {
579 self.max_buf_size = max_buf_size;
580 }
581}
582
583impl AsyncRead for File {
584 fn poll_read(
585 self: Pin<&mut Self>,
586 cx: &mut Context<'_>,
587 dst: &mut ReadBuf<'_>,
588 ) -> Poll<io::Result<()>> {
589 ready!(crate::trace::trace_leaf(cx));
590 let me = self.get_mut();
591 let inner = me.inner.get_mut();
592
593 loop {
594 match inner.state {
595 State::Idle(ref mut buf_cell) => {
596 let mut buf = buf_cell.take().unwrap();
597
598 if !buf.is_empty() {
599 buf.copy_to(dst);
600 *buf_cell = Some(buf);
601 return Poll::Ready(Ok(()));
602 }
603
604 let std = me.std.clone();
605
606 let max_buf_size = cmp::min(dst.remaining(), me.max_buf_size);
607 inner.state = State::Busy(spawn_blocking(move || {
608 // SAFETY: the `Read` implementation of `std` does not
609 // read from the buffer it is borrowing and correctly
610 // reports the length of the data written into the buffer.
611 let res = unsafe { buf.read_from(&mut &*std, max_buf_size) };
612 (Operation::Read(res), buf)
613 }));
614 }
615 State::Busy(ref mut rx) => {
616 let (op, mut buf) = ready!(Pin::new(rx).poll(cx))?;
617
618 match op {
619 Operation::Read(Ok(_)) => {
620 buf.copy_to(dst);
621 inner.state = State::Idle(Some(buf));
622 return Poll::Ready(Ok(()));
623 }
624 Operation::Read(Err(e)) => {
625 assert!(buf.is_empty());
626
627 inner.state = State::Idle(Some(buf));
628 return Poll::Ready(Err(e));
629 }
630 Operation::Write(Ok(())) => {
631 assert!(buf.is_empty());
632 inner.state = State::Idle(Some(buf));
633 continue;
634 }
635 Operation::Write(Err(e)) => {
636 assert!(inner.last_write_err.is_none());
637 inner.last_write_err = Some(e.kind());
638 inner.state = State::Idle(Some(buf));
639 }
640 Operation::Seek(result) => {
641 assert!(buf.is_empty());
642 inner.state = State::Idle(Some(buf));
643 if let Ok(pos) = result {
644 inner.pos = pos;
645 }
646 continue;
647 }
648 }
649 }
650 }
651 }
652 }
653}
654
655impl AsyncSeek for File {
656 fn start_seek(self: Pin<&mut Self>, mut pos: SeekFrom) -> io::Result<()> {
657 let me = self.get_mut();
658 let inner = me.inner.get_mut();
659
660 match inner.state {
661 State::Busy(_) => Err(io::Error::new(
662 io::ErrorKind::Other,
663 "other file operation is pending, call poll_complete before start_seek",
664 )),
665 State::Idle(ref mut buf_cell) => {
666 let mut buf = buf_cell.take().unwrap();
667
668 // Factor in any unread data from the buf
669 if !buf.is_empty() {
670 let n = buf.discard_read();
671
672 if let SeekFrom::Current(ref mut offset) = pos {
673 *offset += n;
674 }
675 }
676
677 let std = me.std.clone();
678
679 inner.state = State::Busy(spawn_blocking(move || {
680 let res = (&*std).seek(pos);
681 (Operation::Seek(res), buf)
682 }));
683 Ok(())
684 }
685 }
686 }
687
688 fn poll_complete(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<u64>> {
689 ready!(crate::trace::trace_leaf(cx));
690 let inner = self.inner.get_mut();
691
692 loop {
693 match inner.state {
694 State::Idle(_) => return Poll::Ready(Ok(inner.pos)),
695 State::Busy(ref mut rx) => {
696 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
697 inner.state = State::Idle(Some(buf));
698
699 match op {
700 Operation::Read(_) => {}
701 Operation::Write(Err(e)) => {
702 assert!(inner.last_write_err.is_none());
703 inner.last_write_err = Some(e.kind());
704 }
705 Operation::Write(_) => {}
706 Operation::Seek(res) => {
707 if let Ok(pos) = res {
708 inner.pos = pos;
709 }
710 return Poll::Ready(res);
711 }
712 }
713 }
714 }
715 }
716 }
717}
718
719impl AsyncWrite for File {
720 fn poll_write(
721 self: Pin<&mut Self>,
722 cx: &mut Context<'_>,
723 src: &[u8],
724 ) -> Poll<io::Result<usize>> {
725 ready!(crate::trace::trace_leaf(cx));
726 let me = self.get_mut();
727 let inner = me.inner.get_mut();
728
729 if let Some(e) = inner.last_write_err.take() {
730 return Poll::Ready(Err(e.into()));
731 }
732
733 loop {
734 match inner.state {
735 State::Idle(ref mut buf_cell) => {
736 let mut buf = buf_cell.take().unwrap();
737
738 let seek = if !buf.is_empty() {
739 Some(SeekFrom::Current(buf.discard_read()))
740 } else {
741 None
742 };
743
744 let n = buf.copy_from(src, me.max_buf_size);
745 let std = me.std.clone();
746
747 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
748 let res = if let Some(seek) = seek {
749 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
750 } else {
751 buf.write_to(&mut &*std)
752 };
753
754 (Operation::Write(res), buf)
755 })
756 .ok_or_else(|| {
757 io::Error::new(io::ErrorKind::Other, "background task failed")
758 })?;
759
760 inner.state = State::Busy(blocking_task_join_handle);
761
762 return Poll::Ready(Ok(n));
763 }
764 State::Busy(ref mut rx) => {
765 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
766 inner.state = State::Idle(Some(buf));
767
768 match op {
769 Operation::Read(_) => {
770 // We don't care about the result here. The fact
771 // that the cursor has advanced will be reflected in
772 // the next iteration of the loop
773 continue;
774 }
775 Operation::Write(res) => {
776 // If the previous write was successful, continue.
777 // Otherwise, error.
778 res?;
779 continue;
780 }
781 Operation::Seek(_) => {
782 // Ignore the seek
783 continue;
784 }
785 }
786 }
787 }
788 }
789 }
790
791 fn poll_write_vectored(
792 self: Pin<&mut Self>,
793 cx: &mut Context<'_>,
794 bufs: &[io::IoSlice<'_>],
795 ) -> Poll<Result<usize, io::Error>> {
796 ready!(crate::trace::trace_leaf(cx));
797 let me = self.get_mut();
798 let inner = me.inner.get_mut();
799
800 if let Some(e) = inner.last_write_err.take() {
801 return Poll::Ready(Err(e.into()));
802 }
803
804 loop {
805 match inner.state {
806 State::Idle(ref mut buf_cell) => {
807 let mut buf = buf_cell.take().unwrap();
808
809 let seek = if !buf.is_empty() {
810 Some(SeekFrom::Current(buf.discard_read()))
811 } else {
812 None
813 };
814
815 let n = buf.copy_from_bufs(bufs, me.max_buf_size);
816 let std = me.std.clone();
817
818 let blocking_task_join_handle = spawn_mandatory_blocking(move || {
819 let res = if let Some(seek) = seek {
820 (&*std).seek(seek).and_then(|_| buf.write_to(&mut &*std))
821 } else {
822 buf.write_to(&mut &*std)
823 };
824
825 (Operation::Write(res), buf)
826 })
827 .ok_or_else(|| {
828 io::Error::new(io::ErrorKind::Other, "background task failed")
829 })?;
830
831 inner.state = State::Busy(blocking_task_join_handle);
832
833 return Poll::Ready(Ok(n));
834 }
835 State::Busy(ref mut rx) => {
836 let (op, buf) = ready!(Pin::new(rx).poll(cx))?;
837 inner.state = State::Idle(Some(buf));
838
839 match op {
840 Operation::Read(_) => {
841 // We don't care about the result here. The fact
842 // that the cursor has advanced will be reflected in
843 // the next iteration of the loop
844 continue;
845 }
846 Operation::Write(res) => {
847 // If the previous write was successful, continue.
848 // Otherwise, error.
849 res?;
850 continue;
851 }
852 Operation::Seek(_) => {
853 // Ignore the seek
854 continue;
855 }
856 }
857 }
858 }
859 }
860 }
861
862 fn is_write_vectored(&self) -> bool {
863 true
864 }
865
866 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
867 ready!(crate::trace::trace_leaf(cx));
868 let inner = self.inner.get_mut();
869 inner.poll_flush(cx)
870 }
871
872 fn poll_shutdown(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
873 ready!(crate::trace::trace_leaf(cx));
874 self.poll_flush(cx)
875 }
876}
877
878impl From<StdFile> for File {
879 fn from(std: StdFile) -> Self {
880 Self::from_std(std)
881 }
882}
883
884impl fmt::Debug for File {
885 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
886 fmt.debug_struct("tokio::fs::File")
887 .field("std", &self.std)
888 .finish()
889 }
890}
891
892#[cfg(unix)]
893impl std::os::unix::io::AsRawFd for File {
894 fn as_raw_fd(&self) -> std::os::unix::io::RawFd {
895 self.std.as_raw_fd()
896 }
897}
898
899#[cfg(unix)]
900impl std::os::unix::io::AsFd for File {
901 fn as_fd(&self) -> std::os::unix::io::BorrowedFd<'_> {
902 unsafe {
903 std::os::unix::io::BorrowedFd::borrow_raw(std::os::unix::io::AsRawFd::as_raw_fd(self))
904 }
905 }
906}
907
908#[cfg(unix)]
909impl std::os::unix::io::FromRawFd for File {
910 unsafe fn from_raw_fd(fd: std::os::unix::io::RawFd) -> Self {
911 StdFile::from_raw_fd(fd).into()
912 }
913}
914
915cfg_windows! {
916 use crate::os::windows::io::{AsRawHandle, FromRawHandle, RawHandle, AsHandle, BorrowedHandle};
917
918 impl AsRawHandle for File {
919 fn as_raw_handle(&self) -> RawHandle {
920 self.std.as_raw_handle()
921 }
922 }
923
924 impl AsHandle for File {
925 fn as_handle(&self) -> BorrowedHandle<'_> {
926 unsafe {
927 BorrowedHandle::borrow_raw(
928 AsRawHandle::as_raw_handle(self),
929 )
930 }
931 }
932 }
933
934 impl FromRawHandle for File {
935 unsafe fn from_raw_handle(handle: RawHandle) -> Self {
936 StdFile::from_raw_handle(handle).into()
937 }
938 }
939}
940
941impl Inner {
942 async fn complete_inflight(&mut self) {
943 use std::future::poll_fn;
944
945 poll_fn(|cx| self.poll_complete_inflight(cx)).await;
946 }
947
948 fn poll_complete_inflight(&mut self, cx: &mut Context<'_>) -> Poll<()> {
949 ready!(crate::trace::trace_leaf(cx));
950 match self.poll_flush(cx) {
951 Poll::Ready(Err(e)) => {
952 self.last_write_err = Some(e.kind());
953 Poll::Ready(())
954 }
955 Poll::Ready(Ok(())) => Poll::Ready(()),
956 Poll::Pending => Poll::Pending,
957 }
958 }
959
960 fn poll_flush(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), io::Error>> {
961 if let Some(e) = self.last_write_err.take() {
962 return Poll::Ready(Err(e.into()));
963 }
964
965 let (op, buf) = match self.state {
966 State::Idle(_) => return Poll::Ready(Ok(())),
967 State::Busy(ref mut rx) => ready!(Pin::new(rx).poll(cx))?,
968 };
969
970 // The buffer is not used here
971 self.state = State::Idle(Some(buf));
972
973 match op {
974 Operation::Read(_) => Poll::Ready(Ok(())),
975 Operation::Write(res) => Poll::Ready(res),
976 Operation::Seek(_) => Poll::Ready(Ok(())),
977 }
978 }
979}
980
981#[cfg(test)]
982mod tests;