using System; using System.Threading; using System.Threading.Tasks; namespace Swan.Threading { /// /// Provides a base implementation for application workers. /// /// public abstract class TimerWorkerBase : WorkerBase { private readonly Object _syncLock = new Object(); private readonly Timer _timer; private Boolean _isTimerAlive = true; /// /// Initializes a new instance of the class. /// /// The name. /// The execution interval. protected TimerWorkerBase(String name, TimeSpan period) : base(name, period) => // Instantiate the timer that will be used to schedule cycles this._timer = new Timer(this.ExecuteTimerCallback, this, Timeout.Infinite, Timeout.Infinite); /// public override Task StartAsync() { lock(this._syncLock) { if(this.WorkerState == WorkerState.Paused || this.WorkerState == WorkerState.Waiting) { return this.ResumeAsync(); } if(this.WorkerState != WorkerState.Created) { return Task.FromResult(this.WorkerState); } if(this.IsStopRequested) { return Task.FromResult(this.WorkerState); } Task task = this.QueueStateChange(StateChangeRequest.Start); this.Interrupt(); return task; } } /// public override Task PauseAsync() { lock(this._syncLock) { if(this.WorkerState != WorkerState.Running && this.WorkerState != WorkerState.Waiting) { return Task.FromResult(this.WorkerState); } if(this.IsStopRequested) { return Task.FromResult(this.WorkerState); } Task task = this.QueueStateChange(StateChangeRequest.Pause); this.Interrupt(); return task; } } /// public override Task ResumeAsync() { lock(this._syncLock) { if(this.WorkerState == WorkerState.Created) { return this.StartAsync(); } if(this.WorkerState != WorkerState.Paused && this.WorkerState != WorkerState.Waiting) { return Task.FromResult(this.WorkerState); } if(this.IsStopRequested) { return Task.FromResult(this.WorkerState); } Task task = this.QueueStateChange(StateChangeRequest.Resume); this.Interrupt(); return task; } } /// public override Task StopAsync() { lock(this._syncLock) { if(this.WorkerState == WorkerState.Stopped || this.WorkerState == WorkerState.Created) { this.WorkerState = WorkerState.Stopped; return Task.FromResult(this.WorkerState); } Task task = this.QueueStateChange(StateChangeRequest.Stop); this.Interrupt(); return task; } } /// /// Schedules a new cycle for execution. The delay is given in /// milliseconds. Passing a delay of 0 means a new cycle should be executed /// immediately. /// /// The delay. protected void ScheduleCycle(Int32 delay) { lock(this._syncLock) { if(!this._isTimerAlive) { return; } _ = this._timer.Change(delay, Timeout.Infinite); } } /// protected override void Dispose(Boolean disposing) { base.Dispose(disposing); lock(this._syncLock) { if(!this._isTimerAlive) { return; } this._isTimerAlive = false; this._timer.Dispose(); } } /// /// Cancels the current token and schedules a new cycle immediately. /// private void Interrupt() { lock(this._syncLock) { if(this.WorkerState == WorkerState.Stopped) { return; } this.CycleCancellation.Cancel(); this.ScheduleCycle(0); } } /// /// Executes the worker cycle control logic. /// This includes processing state change requests, /// the execution of use cycle code, /// and the scheduling of new cycles. /// private void ExecuteWorkerCycle() { this.CycleStopwatch.Restart(); lock(this._syncLock) { if(this.IsDisposing || this.IsDisposed) { this.WorkerState = WorkerState.Stopped; // Cancel any awaiters try { this.StateChangedEvent.Set(); } catch { /* Ignore */ } return; } // Prevent running another instance of the cycle if(this.CycleCompletedEvent.IsSet == false) { return; } // Lock the cycle and capture relevant state valid for this cycle this.CycleCompletedEvent.Reset(); } CancellationToken interruptToken = this.CycleCancellation.Token; WorkerState initialWorkerState = this.WorkerState; // Process the tasks that are awaiting if(this.ProcessStateChangeRequests()) { return; } try { if(initialWorkerState == WorkerState.Waiting && !interruptToken.IsCancellationRequested) { // Mark the state as Running this.WorkerState = WorkerState.Running; // Call the execution logic this.ExecuteCycleLogic(interruptToken); } } catch(Exception ex) { this.OnCycleException(ex); } finally { // Update the state this.WorkerState = initialWorkerState == WorkerState.Paused ? WorkerState.Paused : WorkerState.Waiting; lock(this._syncLock) { // Signal the cycle has been completed so new cycles can be executed this.CycleCompletedEvent.Set(); // Schedule a new cycle this.ScheduleCycle(!interruptToken.IsCancellationRequested ? this.ComputeCycleDelay(initialWorkerState) : 0); } } } /// /// Represents the callback that is executed when the ticks. /// /// The state -- this contains the worker. private void ExecuteTimerCallback(Object state) => this.ExecuteWorkerCycle(); /// /// Queues a transition in worker state for processing. Returns a task that can be awaited /// when the operation completes. /// /// The request. /// The awaitable task. private Task QueueStateChange(StateChangeRequest request) { lock(this._syncLock) { if(this.StateChangeTask != null) { return this.StateChangeTask; } Task waitingTask = new Task(() => { this.StateChangedEvent.Wait(); lock(this._syncLock) { this.StateChangeTask = null; return this.WorkerState; } }); this.StateChangeTask = waitingTask; this.StateChangedEvent.Reset(); this.StateChangeRequests[request] = true; waitingTask.Start(); this.CycleCancellation.Cancel(); return waitingTask; } } /// /// Processes the state change queue by checking pending events and scheduling /// cycle execution accordingly. The is also updated. /// /// Returns true if the execution should be terminated. false otherwise. private Boolean ProcessStateChangeRequests() { lock(this._syncLock) { WorkerState currentState = this.WorkerState; Boolean hasRequest = false; Int32 schedule = 0; // Update the state according to request priority if(this.StateChangeRequests[StateChangeRequest.Stop] || this.IsDisposing || this.IsDisposed) { hasRequest = true; this.WorkerState = WorkerState.Stopped; schedule = this.StateChangeRequests[StateChangeRequest.Stop] ? Timeout.Infinite : 0; } else if(this.StateChangeRequests[StateChangeRequest.Pause]) { hasRequest = true; this.WorkerState = WorkerState.Paused; schedule = Timeout.Infinite; } else if(this.StateChangeRequests[StateChangeRequest.Start] || this.StateChangeRequests[StateChangeRequest.Resume]) { hasRequest = true; this.WorkerState = WorkerState.Waiting; } // Signals all state changes to continue // as a command has been handled. if(hasRequest) { this.ClearStateChangeRequests(schedule, currentState, this.WorkerState); } return hasRequest; } } /// /// Signals all state change requests to set. /// /// The cycle schedule. /// The previous worker state. /// The new worker state. private void ClearStateChangeRequests(Int32 schedule, WorkerState oldState, WorkerState newState) { lock(this._syncLock) { // Mark all events as completed this.StateChangeRequests[StateChangeRequest.Start] = false; this.StateChangeRequests[StateChangeRequest.Pause] = false; this.StateChangeRequests[StateChangeRequest.Resume] = false; this.StateChangeRequests[StateChangeRequest.Stop] = false; this.StateChangedEvent.Set(); this.CycleCompletedEvent.Set(); this.OnStateChangeProcessed(oldState, newState); this.ScheduleCycle(schedule); } } } }