Implement custom command channel (not async)

This commit is contained in:
Julian Mutter 2023-11-23 11:54:08 +01:00
parent 0a0fc61522
commit 75e1da0097
4 changed files with 207 additions and 160 deletions

65
Cargo.lock generated
View File

@ -65,19 +65,6 @@ version = "1.0.75"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4668cab20f66d8d020e1fbc0ebe47217433c1b6c8f2040faf858554e394ace6"
[[package]]
name = "async-channel"
version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d37875bd9915b7d67c2f117ea2c30a0989874d0b2cb694fe25403c85763c0c9e"
dependencies = [
"concurrent-queue",
"event-listener",
"event-listener-strategy",
"futures-core",
"pin-project-lite",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -125,12 +112,6 @@ dependencies = [
"target-lexicon",
]
[[package]]
name = "cfg-if"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd"
[[package]]
name = "clap"
version = "4.4.6"
@ -177,24 +158,6 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7"
[[package]]
name = "concurrent-queue"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400"
dependencies = [
"crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.8.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5a22b2d63d4d1dc0b7f1b6b2747dd0088008a9be28b6ddf0b1e7d335e3037294"
dependencies = [
"cfg-if",
]
[[package]]
name = "env_logger"
version = "0.10.1"
@ -224,27 +187,6 @@ dependencies = [
"windows-sys",
]
[[package]]
name = "event-listener"
version = "3.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d93877bcde0eb80ca09131a08d23f0a5c18a620b01db137dba666d18cd9b30c2"
dependencies = [
"concurrent-queue",
"parking",
"pin-project-lite",
]
[[package]]
name = "event-listener-strategy"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d96b852f1345da36d551b9473fa1e2b1eb5c5195585c6c018118bc92a8d91160"
dependencies = [
"event-listener",
"pin-project-lite",
]
[[package]]
name = "field-offset"
version = "0.3.6"
@ -657,7 +599,6 @@ name = "music-reader"
version = "0.1.0"
dependencies = [
"anyhow",
"async-channel",
"cairo-rs",
"clap",
"env_logger",
@ -700,12 +641,6 @@ dependencies = [
"system-deps",
]
[[package]]
name = "parking"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae"
[[package]]
name = "pin-project-lite"
version = "0.2.13"

View File

@ -11,7 +11,6 @@ gio = "0.18.3"
glib = "0.18.3"
clap = { version = "4.4.6", features = ["derive"] }
gtk = { version = "0.7.3", package = "gtk4", features = ["v4_8"] }
async-channel = "2.1.0"
anyhow = "1.0.75"
log = "0.4.20"
env_logger = "0.10.1"

View File

@ -1,17 +1,16 @@
use crate::draw;
use anyhow::{anyhow, bail, Result};
use glib::timeout_future;
use gtk::gdk::Texture;
use log::debug;
use gtk::{gdk::Texture, prelude::TextureExt};
use log::{debug, error};
use poppler::Document;
use std::{
cell::RefCell,
collections::BTreeMap,
rc::Rc,
time::{Duration, Instant},
};
use async_channel::Sender;
use crate::draw;
type PageNumber = usize;
pub type MyPageType = Texture;
@ -19,6 +18,7 @@ pub struct PageCache {
document: Document,
max_num_stored_pages: usize,
pages: BTreeMap<usize, Rc<MyPageType>>,
last_requested_page_number: PageNumber,
}
impl PageCache {
@ -27,92 +27,131 @@ impl PageCache {
document,
max_num_stored_pages,
pages: BTreeMap::new(),
last_requested_page_number: 0,
}
}
pub fn get_page(&self, page_number: usize) -> Option<Rc<MyPageType>> {
pub fn get_page(&mut self, page_number: usize) -> Option<Rc<MyPageType>> {
self.last_requested_page_number = page_number;
self.pages.get(&page_number).map(Rc::clone)
}
pub fn cache_pages(&mut self, page_numbers: Vec<usize>, area_height: i32) {
debug!("Caching pages {:?}", page_numbers);
let begin_of_cashing = Instant::now();
for page_number in page_numbers {
if self.pages.contains_key(&page_number) {
continue;
pub fn get_page_or_cache(&mut self, page_number: usize) -> Result<Rc<MyPageType>> {
if let Some(page) = self.get_page(page_number) {
return Ok(page);
} else {
let _ = self.cache_page(page_number, 100);
if let Some(page) = self.get_page(page_number) {
return Ok(page);
} else {
bail!("Failed caching and retrieving page {}", page_number);
}
}
}
if let Some(page) = self.document.page(page_number as i32) {
let pages = vec![Rc::new(page)];
let texture = draw::draw_pages_to_texture(&pages, area_height);
pub fn cache_page(&mut self, page_number: PageNumber, height: i32) -> bool {
debug!("Caching page {}", page_number);
let begin_of_cashing = Instant::now();
if let Some(page) = self.pages.get(&page_number) {
if page.height() >= height {
debug!("Page already in cache");
return false;
}
}
self.pages.insert(page_number, Rc::new(texture));
if let Some(page) = self.document.page(page_number as i32) {
let pages = vec![Rc::new(page)];
let texture = draw::draw_pages_to_texture(&pages, height);
if self.pages.len() > self.max_num_stored_pages && self.pages.len() > 2 {
let _result = self.remove_most_distant_page(page_number);
}
// Overwrite page with lower resolution if exists
self.pages.insert(page_number, Rc::new(texture));
if self.pages.len() > self.max_num_stored_pages && self.pages.len() > 2 {
let _result = self.remove_most_distant_page();
}
}
debug!(
"done caching in {}ms",
"done caching of page {} in {}ms",
page_number,
begin_of_cashing.elapsed().as_millis()
);
true
}
fn remove_most_distant_page(&mut self, current_page_number: usize) -> Result<(), ()> {
let (min_cached_page_number, min_cached_page) = self.pages.pop_first().ok_or(())?;
let (max_cached_page_number, max_cached_page) = self.pages.pop_last().ok_or(())?;
fn remove_most_distant_page(&mut self) -> anyhow::Result<()> {
let (min_cached_page_number, min_cached_page) = self
.pages
.pop_first()
.ok_or(anyhow!("The cache is empty, cannot remove first page"))?;
let (max_cached_page_number, max_cached_page) = self
.pages
.pop_last()
.ok_or(anyhow!("The cache is empty, cannot remove last page"))?;
if current_page_number.abs_diff(min_cached_page_number)
> current_page_number.abs_diff(max_cached_page_number)
if self
.last_requested_page_number
.abs_diff(min_cached_page_number)
> self
.last_requested_page_number
.abs_diff(max_cached_page_number)
{
self.pages.insert(max_cached_page_number, max_cached_page);
debug!(
"Removed page {} from cache to keep size low...",
min_cached_page_number
);
} else {
self.pages.insert(min_cached_page_number, min_cached_page);
debug!(
"Removed page {} from cache to keep size low...",
max_cached_page_number
);
}
Ok(())
}
fn process_command(&mut self, command: CacheCommand) -> Option<CacheResponse> {
fn process_command(&mut self, command: CacheCommand) -> Result<Option<CacheResponse>> {
debug!("Processing command: {:?}...", command);
match command {
CacheCommand::CachePages { pages, area_height } => {
self.cache_pages(pages, area_height);
None
CacheCommand::Cache(command) => {
self.cache_page(command.page, command.height);
Ok(None)
}
CacheCommand::GetCurrentTwoPages { page_left_number } => {
if let Some(page_left) = self.get_page(page_left_number) {
debug!("got left page");
if let Some(page_right) = self.get_page(page_left_number + 1) {
debug!("got right page");
Some(CacheResponse::TwoPagesRetrieved {
page_left,
page_right,
})
} else {
Some(CacheResponse::SinglePageRetrieved { page: page_left })
}
} else {
debug!("did not get any page");
// TODO: if page left was not empty, this could be because page turning was too quick.
// In this case, just not rendering the current page is okay, but when no more render requests are available, one would want to wait for the caching
None
CacheCommand::Retrieve(command) => match command {
RetrievePagesCommand::GetCurrentTwoPages { page_left_number } => {
let page_left = self.get_page_or_cache(page_left_number)?;
let page_right = self.get_page_or_cache(page_left_number + 1)?;
Ok(Some(CacheResponse::TwoPagesRetrieved {
page_left,
page_right,
}))
}
}
RetrievePagesCommand::GetCurrentPage { page_number } => {
let page = self.get_page_or_cache(page_number)?;
Ok(Some(CacheResponse::SinglePageRetrieved { page }))
}
},
}
}
}
#[derive(Debug)]
pub enum CacheCommand {
CachePages {
pages: Vec<PageNumber>,
area_height: i32,
},
GetCurrentTwoPages {
page_left_number: PageNumber,
},
Cache(CachePageCommand),
Retrieve(RetrievePagesCommand),
}
#[derive(Debug)]
pub struct CachePageCommand {
page: PageNumber,
height: i32,
}
#[derive(Debug)]
pub enum RetrievePagesCommand {
GetCurrentTwoPages { page_left_number: PageNumber },
GetCurrentPage { page_number: PageNumber },
}
pub enum CacheResponse {
@ -125,21 +164,93 @@ pub enum CacheResponse {
},
}
pub fn spawn_async_cache<F>(document: Document, receiver: F) -> Sender<CacheCommand>
pub struct SyncCacheCommandChannel {
retrieve_commands: Vec<RetrievePagesCommand>,
cache_commands: Vec<CachePageCommand>,
}
pub struct SyncCacheCommandSender {
channel: Rc<RefCell<SyncCacheCommandChannel>>,
}
pub struct SyncCacheCommandReceiver {
channel: Rc<RefCell<SyncCacheCommandChannel>>,
}
impl SyncCacheCommandChannel {
pub fn open() -> (SyncCacheCommandSender, SyncCacheCommandReceiver) {
let channel = SyncCacheCommandChannel {
retrieve_commands: Vec::new(),
cache_commands: Vec::new(),
};
let channel = Rc::new(RefCell::new(channel));
let sender = SyncCacheCommandSender {
channel: Rc::clone(&channel),
};
let receiver = SyncCacheCommandReceiver { channel };
(sender, receiver)
}
}
impl SyncCacheCommandSender {
pub fn is_channel_open(&self) -> bool {
Rc::strong_count(&self.channel) > 1
}
pub fn send_retrieve_command(&self, command: RetrievePagesCommand) {
// Make newest message the most important
self.channel.borrow_mut().retrieve_commands.push(command);
}
pub fn send_cache_commands(&self, pages: &[PageNumber], height: i32) {
for &page in pages {
// Make newest message the most important
self.channel
.borrow_mut()
.cache_commands
.push(CachePageCommand { page, height });
}
}
}
impl SyncCacheCommandReceiver {
pub fn is_channel_open(&self) -> bool {
Rc::strong_count(&self.channel) > 1
}
pub fn receive_most_important_command(&self) -> Option<CacheCommand> {
let mut channel = self.channel.borrow_mut();
if let Some(command) = channel.retrieve_commands.pop() {
return Some(CacheCommand::Retrieve(command));
} else if let Some(command) = channel.cache_commands.pop() {
return Some(CacheCommand::Cache(command));
}
None
}
}
pub fn spawn_sync_cache<F>(document: Document, receiver: F) -> SyncCacheCommandSender
where
F: Fn(CacheResponse) + 'static,
{
let (command_sender, command_receiver) = async_channel::unbounded();
let (command_sender, command_receiver) = SyncCacheCommandChannel::open();
let mut cache = PageCache::new(document, 10);
let mut cache = PageCache::new(document, 20);
// Besides the name, it is not in another thread
glib::spawn_future_local(async move {
while let Ok(command) = command_receiver.recv().await {
if let Some(response) = cache.process_command(command) {
// response_sender.send_blocking(response).unwrap();
debug!("Command processed, activating receiver....");
receiver(response);
debug!("receiver done");
while command_receiver.is_channel_open() {
if let Some(command) = command_receiver.receive_most_important_command() {
if let Some(response) = cache.process_command(command).unwrap_or_else(|e| {
error!("Error processing command: {}", e);
None
}) {
// response_sender.send_blocking(response).unwrap();
debug!("Command processed, activating receiver....");
receiver(response);
debug!("receiver done");
}
}
// Add delay to tell gtk to give rendering priority

View File

@ -4,14 +4,13 @@ use std::{
rc::Rc,
};
use async_channel::Sender;
use gtk::{
glib, Application, ApplicationWindow, Box, Button, FileChooserAction, FileChooserDialog,
HeaderBar, Label, Orientation, Picture, ResponseType,
};
use log::debug;
use crate::cache::{self, CacheCommand};
use crate::cache::{self, SyncCacheCommandSender};
use glib::clone;
use gtk::prelude::*;
@ -30,11 +29,11 @@ pub struct Ui {
pub struct DocumentCanvas {
current_page_number: usize,
pub num_pages: Option<usize>,
page_cache_sender: Sender<CacheCommand>,
page_cache_sender: SyncCacheCommandSender,
}
impl DocumentCanvas {
pub fn new(page_cache_sender: Sender<CacheCommand>) -> Self {
pub fn new(page_cache_sender: SyncCacheCommandSender) -> Self {
DocumentCanvas {
current_page_number: 0,
num_pages: None,
@ -55,37 +54,40 @@ impl DocumentCanvas {
}
pub fn cache_initial_pages(&self, area_height: i32) {
self.page_cache_sender
.send_blocking(CacheCommand::CachePages {
pages: vec![self.current_page_number, self.current_page_number + 1],
area_height,
})
.unwrap();
self.page_cache_sender.send_cache_commands(
&vec![self.current_page_number, self.current_page_number + 1],
area_height,
);
}
pub fn cache_surrounding_pages(&self, area_height: i32) {
debug!("Send cache request");
self.page_cache_sender
.send_blocking(CacheCommand::CachePages {
pages: vec![
self.current_page_number.saturating_sub(2),
self.current_page_number.saturating_sub(1),
self.current_page_number,
self.current_page_number + 1,
self.current_page_number + 2,
self.current_page_number + 3,
],
area_height,
})
.unwrap();
self.page_cache_sender.send_cache_commands(
&vec![
self.current_page_number.saturating_sub(2),
self.current_page_number.saturating_sub(1),
self.current_page_number,
self.current_page_number + 1,
self.current_page_number + 2,
self.current_page_number + 3,
],
area_height,
);
}
pub fn request_to_draw_pages(&self) {
self.page_cache_sender
.send_blocking(CacheCommand::GetCurrentTwoPages {
page_left_number: self.current_page_number,
})
.unwrap();
if self.num_pages == Some(1) {
self.page_cache_sender.send_retrieve_command(
cache::RetrievePagesCommand::GetCurrentPage {
page_number: self.current_page_number,
},
)
} else {
self.page_cache_sender.send_retrieve_command(
cache::RetrievePagesCommand::GetCurrentTwoPages {
page_left_number: self.current_page_number,
},
)
}
}
}
@ -279,7 +281,7 @@ pub fn load_document(file: impl AsRef<Path>, ui: Rc<RefCell<Ui>>) {
let document = poppler::Document::from_file(&uri, None).unwrap();
let num_pages = document.n_pages() as usize;
let sender = cache::spawn_async_cache(
let sender = cache::spawn_sync_cache(
document,
clone!(@weak ui => move |cache_response| match cache_response {
cache::CacheResponse::SinglePageRetrieved { page } => {