I have an async function (let's call it my_async_fn) that should support cancellation (I want to allow the caller to use tokio::time::timeout or futures::future::select with my_async_fn). I also need to perform some cleanup, regardless of whether my_async_fn was cancelled or not (I need to remove some contents in a file that my_async_fn writes).
So my approach was to write a future that forwards polling to an inner future and implements Drop by calling a provided FnMut:
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

trait OnDropFutureExt
where
    Self: Future + Sized,
{
    fn on_drop<D: FnMut()>(self, on_drop: D) -> OnDropFuture<Self, D>;
}

impl<F: Future> OnDropFutureExt for F {
    fn on_drop<D: FnMut()>(self, on_drop: D) -> OnDropFuture<Self, D> {
        OnDropFuture {
            inner: self,
            on_drop,
        }
    }
}

struct OnDropFuture<F: Future, D: FnMut()> {
    inner: F,
    on_drop: D,
}

impl<F: Future, D: FnMut()> OnDropFuture<F, D> {
    // See: https://doc.rust-lang.org/std/pin/#pinning-is-structural-for-field
    fn get_mut_inner(self: Pin<&mut Self>) -> Pin<&mut F> {
        unsafe { self.map_unchecked_mut(|s| &mut s.inner) }
    }

    // See: https://doc.rust-lang.org/std/pin/#pinning-is-not-structural-for-field
    fn get_mut_on_drop(self: Pin<&mut Self>) -> &mut D {
        unsafe { &mut self.get_unchecked_mut().on_drop }
    }
}

impl<F: Future, D: FnMut()> Future for OnDropFuture<F, D> {
    type Output = F::Output;

    // Forward polling to the pinned inner future.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        self.get_mut_inner().poll(cx)
    }
}

impl<F: Future, D: FnMut()> Drop for OnDropFuture<F, D> {
    fn drop(&mut self) {
        // See: https://doc.rust-lang.org/std/pin/#drop-implementation
        inner_drop(unsafe { Pin::new_unchecked(self) });

        fn inner_drop<F: Future, D: FnMut()>(this: Pin<&mut OnDropFuture<F, D>>) {
            this.get_mut_on_drop()();
        }
    }
}
This allows me to call my_async_fn like this:
my_async_fn().on_drop(|| clean_up_file(&file_path)).await?;
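To convince myself that the callback fires both on completion and on cancellation, I put together a minimal std-only sketch. Note the assumptions: it uses a simplified, boxed stand-in (BoxedOnDrop, a hypothetical name) instead of the OnDropFuture above so that no unsafe is needed, a hand-rolled no-op waker instead of a real runtime, and std::future::ready / std::future::pending in place of my_async_fn. "Cancellation" here simply means dropping the future while it is still pending.

```rust
use std::cell::Cell;
use std::future::{self, Future};
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Simplified stand-in for `OnDropFuture` (hypothetical): the inner future is
/// boxed and pinned on the heap, so no unsafe pin projection is required.
struct BoxedOnDrop<D: FnMut()> {
    inner: Pin<Box<dyn Future<Output = ()>>>,
    on_drop: D,
}

// `inner` is pinned behind its own box and `on_drop` is never pinned,
// so moving the wrapper itself is fine.
impl<D: FnMut()> Unpin for BoxedOnDrop<D> {}

impl<D: FnMut()> Future for BoxedOnDrop<D> {
    type Output = ();

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
        self.get_mut().inner.as_mut().poll(cx)
    }
}

impl<D: FnMut()> Drop for BoxedOnDrop<D> {
    fn drop(&mut self) {
        (self.on_drop)();
    }
}

/// Waker that does nothing; enough to poll a future by hand.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn poll_once<F: Future + Unpin>(fut: &mut F) -> Poll<F::Output> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    Pin::new(fut).poll(&mut cx)
}

/// Polls the wrapper once, drops it, and returns how often the callback ran.
/// `completes == false` models cancellation: the future is dropped while pending.
fn cleanups_after(completes: bool) -> usize {
    let count = Rc::new(Cell::new(0usize));
    let counter = Rc::clone(&count);
    let inner: Pin<Box<dyn Future<Output = ()>>> = if completes {
        Box::pin(future::ready(()))
    } else {
        Box::pin(future::pending::<()>())
    };
    let mut fut = BoxedOnDrop {
        inner,
        on_drop: move || counter.set(counter.get() + 1),
    };
    let polled = poll_once(&mut fut);
    assert_eq!(polled.is_ready(), completes);
    drop(fut); // completed or cancelled, the wrapper is dropped here either way
    count.get()
}
```

Both cleanups_after(true) and cleanups_after(false) return 1, i.e. the callback runs exactly once in either case.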
Now I have a few questions:
- Since I can't propagate an error from clean_up_file, I guess the best I can do is to just log if I failed to clean up the file and continue. Or is there a better option?
- In https://doc.rust-lang.org/std/pin/#pinning-is-structural-for-field it says:
You must make sure that you uphold the Drop guarantee: once your struct is pinned, the memory that contains the content is not overwritten or deallocated without calling the content’s destructors. This can be tricky, as witnessed by VecDeque: the destructor of VecDeque can fail to call drop on all elements if one of the destructors panics. This violates the Drop guarantee, because it can lead to elements being deallocated without their destructor being called. (VecDeque has no pinning projections, so this does not cause unsoundness.)
My future does have a pinning projection for the inner future, so do I have to make sure that on_drop never panics? Should I add something like std::panic::catch_unwind? How would I do that, since mutable references are not UnwindSafe?
- Since drop is a sync function, I would implement clean_up_file as a sync function as well. It shouldn't block for too long to clean up the file, so that shouldn't be a problem, right? Or is it better to use futures::executor::block_on? Would that avoid blocking the current thread and allow other tasks to run while waiting for the file to get cleaned up?
- Ideally I would want on_drop to be a FnOnce(), since it is only called once anyway. Is it safe to change the type of on_drop to ManuallyDrop<D> and implement Drop like this:
impl<F: Future, D: FnOnce()> Drop for OnDropFuture<F, D> {
    fn drop(&mut self) {
        // See: https://doc.rust-lang.org/std/pin/#drop-implementation
        inner_drop(unsafe { Pin::new_unchecked(self) });

        fn inner_drop<F: Future, D: FnOnce()>(this: Pin<&mut OnDropFuture<F, D>>) {
            let on_drop = unsafe { ManuallyDrop::take(this.get_mut_on_drop()) };
            on_drop()
        }
    }
}
According to https://stackoverflow.com/a/74914046/4149050 this should be safe, right?
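For the catch_unwind and FnOnce questions, this is roughly the alternative I have in mind, as a sketch only. Assumptions: it is shown on a plain guard struct (RunOnDrop, a hypothetical name) rather than on the future wrapper, it stores the callback in an Option<D> instead of ManuallyDrop<D> so that no unsafe is needed to move it out, and it wraps the call in std::panic::AssertUnwindSafe to get around the UnwindSafe bound. I am not sure whether asserting unwind safety like this is actually justified for an arbitrary callback.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

/// Hypothetical guard: runs a `FnOnce` callback exactly once when dropped.
struct RunOnDrop<D: FnOnce()> {
    on_drop: Option<D>,
}

impl<D: FnOnce()> RunOnDrop<D> {
    fn new(on_drop: D) -> Self {
        RunOnDrop {
            on_drop: Some(on_drop),
        }
    }
}

impl<D: FnOnce()> Drop for RunOnDrop<D> {
    fn drop(&mut self) {
        // `take` moves the callback out through `&mut self`, leaving `None`.
        if let Some(on_drop) = self.on_drop.take() {
            // Assumption: nothing observes state the callback may have left
            // half-updated, so asserting unwind safety is acceptable here.
            if catch_unwind(AssertUnwindSafe(on_drop)).is_err() {
                // The error cannot be propagated from `drop`, so only log it.
                eprintln!("cleanup callback panicked; continuing");
            }
        }
    }
}

/// Returns how many times the callback ran after the guard went out of scope.
fn times_called_on_drop() -> u32 {
    let mut calls = 0;
    {
        let _guard = RunOnDrop::new(|| calls += 1);
    } // `_guard` is dropped here and runs the callback
    calls
}

/// Returns `true` if dropping a guard with a panicking callback returns normally.
fn drop_survives_panicking_callback() -> bool {
    let guard = RunOnDrop::new(|| panic!("cleanup failed"));
    drop(guard); // the panic is caught inside `Drop::drop`
    true
}
```

With the default panic = "unwind" setting, times_called_on_drop() returns 1 and drop_survives_panicking_callback() returns true (the panic message is still printed by the panic hook). With panic = "abort", catch_unwind cannot catch anything, so this would not help.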