2

I have my app consisting of service and http server.

  • Service has some things going on with OS api, waiting for events, etc. and it has loop for this purposes as well as async db writes. I start this service with async function.
  • Server(written with rocket) uses async request hadnles as well because I'm currently using SeaORM which uses async.

Problem: When I hit my server with request, it never starts async tasks within handler, unless event in my service loop is fired. When event is fired in my service, handler finishes well but on the next request its the same.

I tried to:

  • Use tokio::spawn, tokio::task::spawn, both of them were working exactly the same way (blocked execution)
  • As far as I know I can't spawn regular thread, because I wouldn't be able to .await anyways.
  • Also I tried to mark main with #[rocket::main(worker_threads = 4)] which should make more async threads? But It's still the same.

How can I overcome this? I can think of is to just use another ORM like diesel which is not async, and as I don't currently use async anywhere else beside ORM it will work, but I don't think this is a good solution. And another thought is to add ticks to my loop, so it won't be stuck until service event is fired, but this also looks weird and handles latency will still depend on this.

Minimal reproducible example:

extern crate rocket;

use std::sync::mpsc::{channel, Sender};

use once_cell::sync::OnceCell;
use rocket::serde::{json::Json, Deserialize, Serialize};
use rocket::State;
use sea_orm::{entity::prelude::*, Database, Set};
use sea_orm::{DbBackend, Schema};
use tokio::join;
use windows::{
    w,
    Win32::Foundation::HWND,
    Win32::UI::{
        Accessibility::{SetWinEventHook, HWINEVENTHOOK},
        WindowsAndMessaging::{MessageBoxW, EVENT_SYSTEM_FOREGROUND, MB_OK},
    },
};

thread_local! {
    static TX: OnceCell<Sender<RawWindowEvent>>= OnceCell::new()
}

#[rocket::main]
async fn main() {
    let db = Database::connect("sqlite://data.db?mode=rwc")
        .await
        .unwrap();

    let builder = db.get_database_backend();

    let stmt = builder.build(
        Schema::new(DbBackend::Sqlite)
            .create_table_from_entity(Entity)
            .if_not_exists(),
    );

    db.execute(stmt).await.unwrap();

    let server = rocket::build()
        .manage(db.clone())
        .mount("/", routes![get_events])
        .launch();

    let service = tokio::spawn(service(db.clone()));

    join!(server, service);
}

#[get("/event")]
async fn get_events(db: &State<DatabaseConnection>) -> Json<Vec<Model>> {
    let db = db as &DatabaseConnection;

    let events = Entity::find().all(db).await.unwrap();

    Json(events)
}

extern "system" fn win_event_hook_callback(
    child_id: HWINEVENTHOOK,
    hook_handle: u32,
    event_id: HWND,
    window_handle: i32,
    object_id: i32,
    thread_id: u32,
    timestamp: u32,
) -> () {
    let event = RawWindowEvent {
        child_id,
        hook_handle,
        event_id,
        window_handle,
        object_id,
        thread_id,
        timestamp,
    };

    TX.with(|f| {
        let tx: &Sender<RawWindowEvent> = f.get().unwrap();

        tx.send(event).unwrap();
    });
}

async fn service(db: DatabaseConnection) {
    let (tx, cx) = channel::<RawWindowEvent>();

    std::thread::spawn(move || {
        TX.with(|f| f.set(tx)).unwrap();

        let hook = unsafe {
            SetWinEventHook(
                EVENT_SYSTEM_FOREGROUND,
                EVENT_SYSTEM_FOREGROUND,
                None,
                Some(win_event_hook_callback),
                0,
                0,
                0,
            )
        };

        let _ = unsafe { MessageBoxW(None, w!("Text"), w!("Text"), MB_OK) };
    });

    loop {
        let event = cx.recv();

        if (event.is_err()) {
            break;
        }

        let event = event.unwrap();

        // There goes some event processing with another windows api calls or simple calculations...

        let record = ActiveModel {
            timestamp: Set(event.timestamp),
            ..Default::default()
        };

        Entity::insert(record).exec(&db).await.unwrap();
    }
}

#[derive(Clone, Debug, PartialEq, Eq, DeriveEntityModel, Serialize, Deserialize)]
#[serde(crate = "rocket::serde")]
#[sea_orm(table_name = "event")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: i32,
    pub timestamp: u32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {}

impl ActiveModelBehavior for ActiveModel {}

#[derive(Debug, Clone, Copy)]
pub struct RawWindowEvent {
    pub child_id: HWINEVENTHOOK,
    pub hook_handle: u32,
    pub event_id: HWND,
    pub window_handle: i32,
    pub object_id: i32,
    pub thread_id: u32,
    pub timestamp: u32,
}

Dependencies in Cargo.toml:

[dependencies]
dotenv = "0.15.0"

tokio = { version = "1.28.2", features = ["full"] }

sea-orm = { version = "^0.11", features = [ "sqlx-sqlite", "runtime-tokio-native-tls", "macros" ] }

rocket = {version = "0.5.0-rc.3", features = ["json"]}

once_cell = "1.17.1"

[dependencies.windows]
version = "0.48.0"
features = [
    "Win32_Foundation",
    "Win32_UI_Accessibility",
    "Win32_UI_WindowsAndMessaging",
    "Win32_System_Threading",
    "Win32_System_ProcessStatus"
]
dalvi
  • 27
  • 5
  • 2
    You're going to need to be more specific about "some things going on"; this sounds like you have wrapped a blocking call into a `Future`, which would not *actually* do anything. Simply wrapping a blocking process into a future doesn't magically make it async - if it is blocking, it'll need to be fired on a thread, or changed to be non-blocking. – Sébastien Renauld Jun 25 '23 at 12:31
  • 1
    It sounds like you're blocking the runtime somewhere, but we need code to be sure. Please post a [Minimal, Reproducible Example](https://stackoverflow.com/help/minimal-reproducible-example). – Chayim Friedman Jun 25 '23 at 12:31
  • @SébastienRenauld, yes, it should be blocking because I use `loop` which waits for channel message. Channel message then processed and written into database with async function. Based on your answer: How can I make my service run into another thread if its async function? – dalvi Jun 25 '23 at 12:44
  • @ChayimFriedman, updated my question with example – dalvi Jun 25 '23 at 12:56

1 Answers1

3

You're using a synchronous channel, and by that you're blocking the runtime. Use the channel defined in tokio: tokio::sync::mpsc.

Chayim Friedman
  • 47,971
  • 5
  • 48
  • 77
  • Well, this should solve my problem but I can't send message to channel with this within `win_event_hook_callback`, because I can't mark this function as async, neither use async closure with `move |f| async move {...}`, because `event` value may outlive closure. Any thoughts? – dalvi Jun 25 '23 at 13:17
  • 1
    @dalvi `tokio::sync::mpsc::UnboundedSender` (which is the equivalent of the channel you're currently using) has a non-async `send()`. – Chayim Friedman Jun 25 '23 at 13:38
  • And this works! Thank you! – dalvi Jun 25 '23 at 13:42