namespace Swan.Threading { using System; using System.Threading; using System.Threading.Tasks; /// /// Provides a base implementation for application workers. /// /// public abstract class TimerWorkerBase : WorkerBase { private readonly object _syncLock = new object(); private readonly Timer _timer; private bool _isTimerAlive = true; /// /// Initializes a new instance of the class. /// /// The name. /// The execution interval. protected TimerWorkerBase(string name, TimeSpan period) : base(name, period) { // Instantiate the timer that will be used to schedule cycles _timer = new Timer( ExecuteTimerCallback, this, Timeout.Infinite, Timeout.Infinite); } /// public override Task StartAsync() { lock (_syncLock) { if (WorkerState == WorkerState.Paused || WorkerState == WorkerState.Waiting) return ResumeAsync(); if (WorkerState != WorkerState.Created) return Task.FromResult(WorkerState); if (IsStopRequested) return Task.FromResult(WorkerState); var task = QueueStateChange(StateChangeRequest.Start); Interrupt(); return task; } } /// public override Task PauseAsync() { lock (_syncLock) { if (WorkerState != WorkerState.Running && WorkerState != WorkerState.Waiting) return Task.FromResult(WorkerState); if (IsStopRequested) return Task.FromResult(WorkerState); var task = QueueStateChange(StateChangeRequest.Pause); Interrupt(); return task; } } /// public override Task ResumeAsync() { lock (_syncLock) { if (WorkerState == WorkerState.Created) return StartAsync(); if (WorkerState != WorkerState.Paused && WorkerState != WorkerState.Waiting) return Task.FromResult(WorkerState); if (IsStopRequested) return Task.FromResult(WorkerState); var task = QueueStateChange(StateChangeRequest.Resume); Interrupt(); return task; } } /// public override Task StopAsync() { lock (_syncLock) { if (WorkerState == WorkerState.Stopped || WorkerState == WorkerState.Created) { WorkerState = WorkerState.Stopped; return Task.FromResult(WorkerState); } var task = QueueStateChange(StateChangeRequest.Stop); Interrupt(); return task; } } /// /// Schedules a new cycle for execution. The delay is given in /// milliseconds. Passing a delay of 0 means a new cycle should be executed /// immediately. /// /// The delay. protected void ScheduleCycle(int delay) { lock (_syncLock) { if (!_isTimerAlive) return; _timer.Change(delay, Timeout.Infinite); } } /// protected override void Dispose(bool disposing) { base.Dispose(disposing); lock (_syncLock) { if (!_isTimerAlive) return; _isTimerAlive = false; _timer.Dispose(); } } /// /// Cancels the current token and schedules a new cycle immediately. /// private void Interrupt() { lock (_syncLock) { if (WorkerState == WorkerState.Stopped) return; CycleCancellation.Cancel(); ScheduleCycle(0); } } /// /// Executes the worker cycle control logic. /// This includes processing state change requests, /// the execution of use cycle code, /// and the scheduling of new cycles. /// private void ExecuteWorkerCycle() { CycleStopwatch.Restart(); lock (_syncLock) { if (IsDisposing || IsDisposed) { WorkerState = WorkerState.Stopped; // Cancel any awaiters try { StateChangedEvent.Set(); } catch { /* Ignore */ } return; } // Prevent running another instance of the cycle if (CycleCompletedEvent.IsSet == false) return; // Lock the cycle and capture relevant state valid for this cycle CycleCompletedEvent.Reset(); } var interruptToken = CycleCancellation.Token; var initialWorkerState = WorkerState; // Process the tasks that are awaiting if (ProcessStateChangeRequests()) return; try { if (initialWorkerState == WorkerState.Waiting && !interruptToken.IsCancellationRequested) { // Mark the state as Running WorkerState = WorkerState.Running; // Call the execution logic ExecuteCycleLogic(interruptToken); } } catch (Exception ex) { OnCycleException(ex); } finally { // Update the state WorkerState = initialWorkerState == WorkerState.Paused ? WorkerState.Paused : WorkerState.Waiting; lock (_syncLock) { // Signal the cycle has been completed so new cycles can be executed CycleCompletedEvent.Set(); // Schedule a new cycle ScheduleCycle(!interruptToken.IsCancellationRequested ? ComputeCycleDelay(initialWorkerState) : 0); } } } /// /// Represents the callback that is executed when the ticks. /// /// The state -- this contains the worker. private void ExecuteTimerCallback(object state) => ExecuteWorkerCycle(); /// /// Queues a transition in worker state for processing. Returns a task that can be awaited /// when the operation completes. /// /// The request. /// The awaitable task. private Task QueueStateChange(StateChangeRequest request) { lock (_syncLock) { if (StateChangeTask != null) return StateChangeTask; var waitingTask = new Task(() => { StateChangedEvent.Wait(); lock (_syncLock) { StateChangeTask = null; return WorkerState; } }); StateChangeTask = waitingTask; StateChangedEvent.Reset(); StateChangeRequests[request] = true; waitingTask.Start(); CycleCancellation.Cancel(); return waitingTask; } } /// /// Processes the state change queue by checking pending events and scheduling /// cycle execution accordingly. The is also updated. /// /// Returns true if the execution should be terminated. false otherwise. private bool ProcessStateChangeRequests() { lock (_syncLock) { var currentState = WorkerState; var hasRequest = false; var schedule = 0; // Update the state according to request priority if (StateChangeRequests[StateChangeRequest.Stop] || IsDisposing || IsDisposed) { hasRequest = true; WorkerState = WorkerState.Stopped; schedule = StateChangeRequests[StateChangeRequest.Stop] ? Timeout.Infinite : 0; } else if (StateChangeRequests[StateChangeRequest.Pause]) { hasRequest = true; WorkerState = WorkerState.Paused; schedule = Timeout.Infinite; } else if (StateChangeRequests[StateChangeRequest.Start] || StateChangeRequests[StateChangeRequest.Resume]) { hasRequest = true; WorkerState = WorkerState.Waiting; } // Signals all state changes to continue // as a command has been handled. if (hasRequest) { ClearStateChangeRequests(schedule, currentState, WorkerState); } return hasRequest; } } /// /// Signals all state change requests to set. /// /// The cycle schedule. /// The previous worker state. /// The new worker state. private void ClearStateChangeRequests(int schedule, WorkerState oldState, WorkerState newState) { lock (_syncLock) { // Mark all events as completed StateChangeRequests[StateChangeRequest.Start] = false; StateChangeRequests[StateChangeRequest.Pause] = false; StateChangeRequests[StateChangeRequest.Resume] = false; StateChangeRequests[StateChangeRequest.Stop] = false; StateChangedEvent.Set(); CycleCompletedEvent.Set(); OnStateChangeProcessed(oldState, newState); ScheduleCycle(schedule); } } } }