cargo fmt

This commit is contained in:
Jasper Bekkers
2022-02-01 17:20:20 +01:00
committed by Aron Heinecke
parent 139cbbba78
commit 833ee36cb5
6 changed files with 172 additions and 154 deletions
+22 -20
View File
@@ -432,30 +432,32 @@ impl FsEventWatcher {
// channel to pass runloop around
let (rl_tx, rl_rx) = unbounded();
let thread_handle = thread::Builder::new().name("notify-rs fsevents".to_string()).spawn(move || {
let stream = stream.0;
let thread_handle = thread::Builder::new()
.name("notify-rs fsevents".to_string())
.spawn(move || {
let stream = stream.0;
unsafe {
let cur_runloop = cf::CFRunLoopGetCurrent();
unsafe {
let cur_runloop = cf::CFRunLoopGetCurrent();
fs::FSEventStreamScheduleWithRunLoop(
stream,
cur_runloop,
cf::kCFRunLoopDefaultMode,
);
fs::FSEventStreamStart(stream);
fs::FSEventStreamScheduleWithRunLoop(
stream,
cur_runloop,
cf::kCFRunLoopDefaultMode,
);
fs::FSEventStreamStart(stream);
// the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop()
rl_tx
.send(CFSendWrapper(cur_runloop))
.expect("Unable to send runloop to watcher");
// the calling to CFRunLoopRun will be terminated by CFRunLoopStop call in drop()
rl_tx
.send(CFSendWrapper(cur_runloop))
.expect("Unable to send runloop to watcher");
cf::CFRunLoopRun();
fs::FSEventStreamStop(stream);
fs::FSEventStreamInvalidate(stream);
fs::FSEventStreamRelease(stream);
}
});
cf::CFRunLoopRun();
fs::FSEventStreamStop(stream);
fs::FSEventStreamInvalidate(stream);
fs::FSEventStreamRelease(stream);
}
});
// block until runloop has been sent
self.runloop = Some((rl_rx.recv().unwrap().0, thread_handle));
+12 -8
View File
@@ -128,7 +128,9 @@ impl EventLoop {
// Run the event loop.
pub fn run(self) {
thread::Builder::new().name("notify-rs inotify".to_string()).spawn(|| self.event_loop_thread());
thread::Builder::new()
.name("notify-rs inotify".to_string())
.spawn(|| self.event_loop_thread());
}
fn event_loop_thread(mut self) {
@@ -418,14 +420,16 @@ impl EventLoop {
let event_loop_tx = self.event_loop_tx.clone();
let waker = self.event_loop_waker.clone();
let cookie = rename_event.tracker().unwrap(); // unwrap is safe because rename_event is always set with some cookie
thread::Builder::new().name("notify-rs inotify rename".to_string()).spawn(move || {
thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event
thread::Builder::new()
.name("notify-rs inotify rename".to_string())
.spawn(move || {
thread::sleep(Duration::from_millis(10)); // wait up to 10 ms for a subsequent event
// An error here means the other end of the channel was closed, a thing that can
// happen normally.
let _ = event_loop_tx.send(EventLoopMsg::RenameTimeout(cookie));
let _ = waker.wake();
});
// An error here means the other end of the channel was closed, a thing that can
// happen normally.
let _ = event_loop_tx.send(EventLoopMsg::RenameTimeout(cookie));
let _ = waker.wake();
});
}
}
Err(e) => {
+3 -1
View File
@@ -76,7 +76,9 @@ impl EventLoop {
// Run the event loop.
pub fn run(self) {
thread::Builder::new().name("notify-rs kqueue".to_string()).spawn(|| self.event_loop_thread());
thread::Builder::new()
.name("notify-rs kqueue".to_string())
.spawn(|| self.event_loop_thread());
}
fn event_loop_thread(mut self) {
+112 -106
View File
@@ -76,106 +76,113 @@ impl PollWatcher {
let event_handler = self.event_handler.clone();
let event_handler = move |res| emit_event(&event_handler, res);
thread::Builder::new().name("notify-rs poll".to_string()).spawn(move || {
// In order of priority:
// TODO: handle metadata events
// TODO: handle renames
// TODO: DRY it up
thread::Builder::new()
.name("notify-rs poll".to_string())
.spawn(move || {
// In order of priority:
// TODO: handle metadata events
// TODO: handle renames
// TODO: DRY it up
loop {
if !open.load(Ordering::SeqCst) {
break;
}
loop {
if !open.load(Ordering::SeqCst) {
break;
}
if let Ok(mut watches) = watches.lock() {
let current_time = Instant::now();
if let Ok(mut watches) = watches.lock() {
let current_time = Instant::now();
for (
watch,
&mut WatchData {
is_recursive,
ref mut paths,
},
) in watches.iter_mut()
{
match fs::metadata(watch) {
Err(e) => {
let err = Err(Error::io(e).add_path(watch.clone()));
event_handler(err);
continue;
}
Ok(metadata) => {
if !metadata.is_dir() {
let mtime =
FileTime::from_last_modification_time(&metadata).seconds();
match paths.insert(
watch.clone(),
PathData {
mtime,
last_check: current_time,
},
) {
None => {
unreachable!();
}
Some(PathData {
mtime: old_mtime, ..
}) => {
if mtime > old_mtime {
let kind = MetadataKind::WriteTime;
let meta = ModifyKind::Metadata(kind);
let kind = EventKind::Modify(meta);
let ev = Event::new(kind).add_path(watch.clone());
event_handler(Ok(ev));
for (
watch,
&mut WatchData {
is_recursive,
ref mut paths,
},
) in watches.iter_mut()
{
match fs::metadata(watch) {
Err(e) => {
let err = Err(Error::io(e).add_path(watch.clone()));
event_handler(err);
continue;
}
Ok(metadata) => {
if !metadata.is_dir() {
let mtime =
FileTime::from_last_modification_time(&metadata)
.seconds();
match paths.insert(
watch.clone(),
PathData {
mtime,
last_check: current_time,
},
) {
None => {
unreachable!();
}
Some(PathData {
mtime: old_mtime, ..
}) => {
if mtime > old_mtime {
let kind = MetadataKind::WriteTime;
let meta = ModifyKind::Metadata(kind);
let kind = EventKind::Modify(meta);
let ev =
Event::new(kind).add_path(watch.clone());
event_handler(Ok(ev));
}
}
}
}
} else {
let depth = if is_recursive { usize::max_value() } else { 1 };
for entry in WalkDir::new(watch)
.follow_links(true)
.max_depth(depth)
.into_iter()
.filter_map(|e| e.ok())
{
let path = entry.path();
} else {
let depth =
if is_recursive { usize::max_value() } else { 1 };
for entry in WalkDir::new(watch)
.follow_links(true)
.max_depth(depth)
.into_iter()
.filter_map(|e| e.ok())
{
let path = entry.path();
match entry.metadata() {
Err(e) => {
let err = Error::io(e.into())
.add_path(path.to_path_buf());
event_handler(Err(err));
}
Ok(m) => {
let mtime =
FileTime::from_last_modification_time(&m)
.seconds();
match paths.insert(
path.to_path_buf(),
PathData {
mtime,
last_check: current_time,
},
) {
None => {
let kind =
EventKind::Create(CreateKind::Any);
let ev = Event::new(kind)
.add_path(path.to_path_buf());
event_handler(Ok(ev));
}
Some(PathData {
mtime: old_mtime, ..
}) => {
if mtime > old_mtime {
let kind = MetadataKind::WriteTime;
let meta = ModifyKind::Metadata(kind);
let kind = EventKind::Modify(meta);
// TODO add new mtime as attr
match entry.metadata() {
Err(e) => {
let err = Error::io(e.into())
.add_path(path.to_path_buf());
event_handler(Err(err));
}
Ok(m) => {
let mtime =
FileTime::from_last_modification_time(&m)
.seconds();
match paths.insert(
path.to_path_buf(),
PathData {
mtime,
last_check: current_time,
},
) {
None => {
let kind =
EventKind::Create(CreateKind::Any);
let ev = Event::new(kind)
.add_path(path.to_path_buf());
event_handler(Ok(ev));
}
Some(PathData {
mtime: old_mtime, ..
}) => {
if mtime > old_mtime {
let kind = MetadataKind::WriteTime;
let meta =
ModifyKind::Metadata(kind);
let kind = EventKind::Modify(meta);
// TODO add new mtime as attr
let ev = Event::new(kind)
.add_path(path.to_path_buf());
event_handler(Ok(ev));
}
}
}
}
}
@@ -184,27 +191,26 @@ impl PollWatcher {
}
}
}
}
for (_, &mut WatchData { ref mut paths, .. }) in watches.iter_mut() {
let mut removed = Vec::new();
for (path, &PathData { last_check, .. }) in paths.iter() {
if last_check < current_time {
let ev = Event::new(EventKind::Remove(RemoveKind::Any))
.add_path(path.clone());
event_handler(Ok(ev));
removed.push(path.clone());
for (_, &mut WatchData { ref mut paths, .. }) in watches.iter_mut() {
let mut removed = Vec::new();
for (path, &PathData { last_check, .. }) in paths.iter() {
if last_check < current_time {
let ev = Event::new(EventKind::Remove(RemoveKind::Any))
.add_path(path.clone());
event_handler(Ok(ev));
removed.push(path.clone());
}
}
for path in removed {
(*paths).remove(&path);
}
}
for path in removed {
(*paths).remove(&path);
}
}
}
thread::sleep(delay);
}
});
thread::sleep(delay);
}
});
}
fn watch_inner(&mut self, path: &Path, recursive_mode: RecursiveMode) -> Result<()> {
+14 -12
View File
@@ -84,18 +84,20 @@ impl ReadDirectoryChangesServer {
let (action_tx, action_rx) = unbounded();
// it is, in fact, ok to send the semaphore across threads
let sem_temp = wakeup_sem as u64;
thread::Builder::new().name("notify-rs windows".to_string()).spawn(move || {
let wakeup_sem = sem_temp as HANDLE;
let server = ReadDirectoryChangesServer {
rx: action_rx,
event_handler,
meta_tx,
cmd_tx,
watches: HashMap::new(),
wakeup_sem,
};
server.run();
});
thread::Builder::new()
.name("notify-rs windows".to_string())
.spawn(move || {
let wakeup_sem = sem_temp as HANDLE;
let server = ReadDirectoryChangesServer {
rx: action_rx,
event_handler,
meta_tx,
cmd_tx,
watches: HashMap::new(),
wakeup_sem,
};
server.run();
});
action_tx
}
+9 -7
View File
@@ -10,14 +10,16 @@ fn test_race_with_remove_dir() {
{
let tmpdir = tmpdir.path().to_path_buf();
thread::Builder::new().name("notify-rs test-race-with-remove-dir".to_string()).spawn(move || {
let mut watcher = notify::recommended_watcher(move |result| {
eprintln!("received event: {:?}", result);
})
.unwrap();
thread::Builder::new()
.name("notify-rs test-race-with-remove-dir".to_string())
.spawn(move || {
let mut watcher = notify::recommended_watcher(move |result| {
eprintln!("received event: {:?}", result);
})
.unwrap();
watcher.watch(&tmpdir, RecursiveMode::NonRecursive).unwrap();
});
watcher.watch(&tmpdir, RecursiveMode::NonRecursive).unwrap();
});
}
let subdir = tmpdir.path().join("146d921d.tmp");