Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions devolutions-session/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ optional = true
features = [
"Win32_Foundation",
"Win32_System_Shutdown",
"Win32_UI_Accessibility",
"Win32_UI_WindowsAndMessaging",
"Win32_UI_Shell",
"Win32_System_Console",
Expand Down
1 change: 1 addition & 0 deletions devolutions-session/src/dvc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,5 +40,6 @@ pub mod now_message_dissector;
pub mod process;
pub mod rdm;
pub mod task;
pub mod window_monitor;

mod env;
3 changes: 3 additions & 0 deletions devolutions-session/src/dvc/process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,9 @@ pub enum ServerChannelEvent {
session_id: u32,
error: ExecError,
},
WindowRecordingEvent {
message: now_proto_pdu::OwnedNowSessionWindowRecEventMsg,
},
}

pub struct WinApiProcessCtx {
Expand Down
113 changes: 94 additions & 19 deletions devolutions-session/src/dvc/task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ use now_proto_pdu::{
NowExecBatchMsg, NowExecCancelRspMsg, NowExecCapsetFlags, NowExecDataMsg, NowExecDataStreamKind, NowExecMessage,
NowExecProcessMsg, NowExecPwshMsg, NowExecResultMsg, NowExecRunMsg, NowExecStartedMsg, NowExecWinPsMsg, NowMessage,
NowMsgBoxResponse, NowProtoError, NowProtoVersion, NowRdmMessage, NowSessionCapsetFlags, NowSessionMessage,
NowSessionMsgBoxReqMsg, NowSessionMsgBoxRspMsg, NowStatusError, NowSystemCapsetFlags, NowSystemMessage,
SetKbdLayoutOption,
NowSessionMsgBoxRspMsg, NowSessionWindowRecStartMsg, NowStatusError, NowSystemCapsetFlags, NowSystemMessage,
OwnedNowExecResultMsg, OwnedNowMessage, OwnedNowSessionMsgBoxReqMsg, OwnedNowSessionWindowRecEventMsg,
SetKbdLayoutOption, WindowRecStartFlags,
};
use tokio::select;
use tokio::sync::mpsc::{self, Receiver, Sender};
Expand Down Expand Up @@ -39,6 +40,7 @@ use crate::dvc::fs::TmpFileGuard;
use crate::dvc::io::run_dvc_io;
use crate::dvc::process::{ExecError, ServerChannelEvent, WinApiProcess, WinApiProcessBuilder};
use crate::dvc::rdm::RdmMessageProcessor;
use crate::dvc::window_monitor::{WindowMonitorConfig, run_window_monitor};

// One minute heartbeat interval by default
const DEFAULT_HEARTBEAT_INTERVAL: core::time::Duration = core::time::Duration::from_secs(60);
Expand Down Expand Up @@ -107,8 +109,8 @@ impl Task for DvcIoTask {
}

async fn process_messages(
mut read_rx: Receiver<NowMessage<'static>>,
dvc_tx: WinapiSignaledSender<NowMessage<'static>>,
mut read_rx: Receiver<OwnedNowMessage>,
dvc_tx: WinapiSignaledSender<OwnedNowMessage>,
mut shutdown_signal: devolutions_gateway_task::ShutdownSignal,
) -> anyhow::Result<()> {
let (io_notification_tx, mut task_rx) = mpsc::channel(100);
Expand Down Expand Up @@ -230,6 +232,11 @@ async fn process_messages(

handle_exec_error(&dvc_tx, session_id, error).await;
}
ServerChannelEvent::WindowRecordingEvent { message } => {
if let Err(error) = handle_window_recording_event(&dvc_tx, message).await {
error!(%error, "Failed to handle window recording event");
}
}
ServerChannelEvent::CloseChannel => {
info!("Received close channel notification, shutting down...");

Expand Down Expand Up @@ -266,7 +273,8 @@ fn default_server_caps() -> NowChannelCapsetMsg {
NowSessionCapsetFlags::LOCK
| NowSessionCapsetFlags::LOGOFF
| NowSessionCapsetFlags::MSGBOX
| NowSessionCapsetFlags::SET_KBD_LAYOUT,
| NowSessionCapsetFlags::SET_KBD_LAYOUT
| NowSessionCapsetFlags::WINDOW_RECORDING,
)
.with_exec_capset(
NowExecCapsetFlags::STYLE_RUN
Expand All @@ -285,18 +293,22 @@ enum ProcessMessageAction {
}

struct MessageProcessor {
dvc_tx: WinapiSignaledSender<NowMessage<'static>>,
dvc_tx: WinapiSignaledSender<OwnedNowMessage>,
io_notification_tx: Sender<ServerChannelEvent>,
#[allow(dead_code)] // Not yet used.
capabilities: NowChannelCapsetMsg,
sessions: HashMap<u32, WinApiProcess>,
rdm_handler: RdmMessageProcessor,
/// Shutdown signal sender for window monitoring task.
window_monitor_shutdown_tx: Option<tokio::sync::oneshot::Sender<()>>,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

suggestion: Add inline comment explaining why you use a oneshot channel over other obvious synchronisation primitives such as Notify. From what I’ve seen in the code, you are after the "exactly once" semantic that Notify does not provide.

/// Handle for the window monitor task.
window_monitor_handle: Option<tokio::task::JoinHandle<()>>,
}

impl MessageProcessor {
pub(crate) fn new(
capabilities: NowChannelCapsetMsg,
dvc_tx: WinapiSignaledSender<NowMessage<'static>>,
dvc_tx: WinapiSignaledSender<OwnedNowMessage>,
io_notification_tx: Sender<ServerChannelEvent>,
) -> Self {
let rdm_handler = RdmMessageProcessor::new(dvc_tx.clone());
Expand All @@ -306,6 +318,8 @@ impl MessageProcessor {
capabilities,
sessions: HashMap::new(),
rdm_handler,
window_monitor_shutdown_tx: None,
window_monitor_handle: None,
}
}

Expand All @@ -330,10 +344,7 @@ impl MessageProcessor {
Ok(())
}

pub(crate) async fn process_message(
&mut self,
message: NowMessage<'static>,
) -> anyhow::Result<ProcessMessageAction> {
pub(crate) async fn process_message(&mut self, message: OwnedNowMessage) -> anyhow::Result<ProcessMessageAction> {
match message {
NowMessage::Channel(NowChannelMessage::Capset(client_caps)) => {
return Ok(ProcessMessageAction::Restart(client_caps));
Expand Down Expand Up @@ -470,6 +481,14 @@ impl MessageProcessor {
error!(%error, "Failed to set keyboard layout");
}
}
NowMessage::Session(NowSessionMessage::WindowRecStart(start_msg)) => {
if let Err(error) = self.start_window_recording(start_msg).await {
error!(%error, "Failed to start window recording");
}
}
NowMessage::Session(NowSessionMessage::WindowRecStop(_stop_msg)) => {
self.stop_window_recording().await;
}
NowMessage::System(NowSystemMessage::Shutdown(shutdown_msg)) => {
let mut current_process_token =
Process::current_process().token(TOKEN_ADJUST_PRIVILEGES | TOKEN_QUERY)?;
Expand Down Expand Up @@ -773,6 +792,56 @@ impl MessageProcessor {

self.sessions.clear();
}

async fn start_window_recording(&mut self, start_msg: NowSessionWindowRecStartMsg) -> anyhow::Result<()> {
// Stop any existing window recording first.
self.stop_window_recording().await;

info!("Starting window recording");

let track_title_changes = start_msg.flags.contains(WindowRecStartFlags::TRACK_TITLE_CHANGE);

// Create shutdown channel for window monitor.
let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();

// Store shutdown sender so we can stop monitoring later.
self.window_monitor_shutdown_tx = Some(shutdown_tx);

// Spawn window monitor task.
let event_tx = self.io_notification_tx.clone();
let poll_interval = start_msg.poll_interval;
let window_monitor_handle = tokio::task::spawn(async move {
let mut config = WindowMonitorConfig::new(event_tx, track_title_changes, shutdown_rx);

// Only set custom poll interval if specified (non-zero).
if poll_interval > 0 {
config = config.with_poll_interval_ms(u64::from(poll_interval));
}

if let Err(error) = run_window_monitor(config).await {
error!(%error, "Window monitor failed");
}
});

self.window_monitor_handle = Some(window_monitor_handle);

Ok(())
}

async fn stop_window_recording(&mut self) {
if let Some(shutdown_tx) = self.window_monitor_shutdown_tx.take() {
info!("Stopping window recording");
// Send shutdown signal (ignore errors if receiver was already dropped).
let _ = shutdown_tx.send(());

// Wait for the task to finish.
if let Some(handle) = self.window_monitor_handle.take()
&& let Err(error) = handle.await
{
error!(%error, "Window monitor task panicked");
}
}
}
}

fn append_ps_args(args: &mut Vec<String>, msg: &NowExecWinPsMsg<'_>) {
Expand Down Expand Up @@ -859,7 +928,7 @@ fn append_pwsh_args(args: &mut Vec<String>, msg: &NowExecPwshMsg<'_>) {
}
}

fn show_message_box<'a>(request: &NowSessionMsgBoxReqMsg<'static>) -> NowSessionMsgBoxRspMsg<'a> {
fn show_message_box<'a>(request: &OwnedNowSessionMsgBoxReqMsg) -> NowSessionMsgBoxRspMsg<'a> {
info!("Processing message box request `{}`", request.request_id());

let title = WideString::from(request.title().unwrap_or("Devolutions Session"));
Expand Down Expand Up @@ -913,10 +982,7 @@ fn show_message_box<'a>(request: &NowSessionMsgBoxReqMsg<'static>) -> NowSession
NowSessionMsgBoxRspMsg::new_success(request.request_id(), NowMsgBoxResponse::new(message_box_response))
}

async fn process_msg_box_req(
request: NowSessionMsgBoxReqMsg<'static>,
dvc_tx: WinapiSignaledSender<NowMessage<'static>>,
) {
async fn process_msg_box_req(request: OwnedNowSessionMsgBoxReqMsg, dvc_tx: WinapiSignaledSender<OwnedNowMessage>) {
let response = show_message_box(&request).into_owned();

if !request.is_response_expected() {
Expand All @@ -928,7 +994,7 @@ async fn process_msg_box_req(
}
}

fn make_status_error_failsafe(session_id: u32, error: NowStatusError) -> NowExecResultMsg<'static> {
fn make_status_error_failsafe(session_id: u32, error: NowStatusError) -> OwnedNowExecResultMsg {
NowExecResultMsg::new_error(session_id, error)
.unwrap_or_else(|error| {
warn!(%error, "Now status error message do not fit into NOW-PROTO error message; sending error without message");
Expand All @@ -937,7 +1003,7 @@ fn make_status_error_failsafe(session_id: u32, error: NowStatusError) -> NowExec
})
}

fn make_generic_error_failsafe(session_id: u32, code: u32, message: String) -> NowExecResultMsg<'static> {
fn make_generic_error_failsafe(session_id: u32, code: u32, message: String) -> OwnedNowExecResultMsg {
let error = NowStatusError::new_generic(code);

error
Expand All @@ -950,7 +1016,16 @@ fn make_generic_error_failsafe(session_id: u32, code: u32, message: String) -> N
})
}

async fn handle_exec_error(dvc_tx: &WinapiSignaledSender<NowMessage<'static>>, session_id: u32, error: ExecError) {
async fn handle_window_recording_event(
dvc_tx: &WinapiSignaledSender<OwnedNowMessage>,
message: OwnedNowSessionWindowRecEventMsg,
) -> anyhow::Result<()> {
dvc_tx.send(NowMessage::from(message.into_owned())).await?;

Ok(())
}

async fn handle_exec_error(dvc_tx: &WinapiSignaledSender<OwnedNowMessage>, session_id: u32, error: ExecError) {
let msg = match error {
ExecError::NowStatus(status) => {
warn!(%session_id, %status, "Process execution failed with NOW-PROTO error");
Expand Down
Loading