定时任务框架的实现
前言
基本上每个语言都有自己的定时任务实现方式,为什么我们还需要自己封装一个呢?主要是现在的定时任务框架还是不够满足我们的需求,那我们要的定时框架有哪些需求呢?
- 完善的日志支持
- 任务添加的日志
- 任务执行的日志(包含执行时间,是否出错等信息)
- 任务取消的日志
- 任务被多线程执行,提高执行效率
- 对系统时间的调整敏感
我们发现没有一个定时系统能够满足这些需求,所以我们就封装了一个。
实现思路
- 这个实现思路任务是有一定的执行延迟,最大的延迟的是等待的
最小时间间隔
- 需要有一个高效的数据结构存储任务队列,能够快速获取最近需要执行的任务
- 线程池的数量可定义,或者交给系统的线程池执行
- 自定义的系统可以方便在需要的地方记录日志
一、任务类的封装
对于在系统中可以执行的定时任务,我们进行了一些基础的封装,定义了一个定时任务的基本要求,也方便定时系统的统一调度
- 定时任务都会分配一个全局的
任务Id
- 定时任务必须指定一个
名称
- 定时任务必须指定
执行时间
- 定时任务实现
IComparable
,可以比较进行排序
public class SimpleTimerTask : IComparable<SimpleTimerTask>
{
/// <summary>
/// 任务Id自增
/// </summary>
protected static int Id;
/// <summary>
/// 任务名称
/// </summary>
public string Name { get; private set; }
/// <summary>
/// 执行时间
/// </summary>
public long ExecutionTime { get; private set; }
/// <summary>
/// 任务Id
/// </summary>
public int TaskId { get; private set; }
/// <summary>
/// 任务取消标志
/// </summary>
private volatile bool cancelled;
/// <summary>
/// 任务执行标志
/// </summary>
private volatile bool executed;
/// <summary>
/// 任务执行器
/// </summary>
protected Action _action;
public SimpleTimerTask(string name, long executionTime)
{
Name = name;
ExecutionTime = executionTime;
TaskId = NextId();
}
public SimpleTimerTask(string name, long executionTime, Action action)
{
Name = name;
ExecutionTime = executionTime;
TaskId = NextId();
_action = action;
}
public bool Cancel()
{
if (executed)
{
return false;
}
cancelled = true;
return true;
}
internal bool CanExecute()
{
if (cancelled)
{
return false;
}
executed = true;
return true;
}
internal Action GetTask() => _action;
private int NextId()
{
return Interlocked.Increment(ref Id);
}
public int CompareTo(SimpleTimerTask? other)
{
if (null == other)
{
return 1;
}
return (int)(ExecutionTime - other.ExecutionTime);
}
}
二、高效的任务队列
如果这套定时系统要高效的运行,一定要找到一个高效存储任务的数据结构。按照系统需求我们发现最小堆
是最适合的数据结构,它有以下优势:
- 有序插入的复杂度是O(nlogn)
- 获取最近的需要执行的定时任务是O(1)
- 最小堆基于数组实现,访问效率高
一般各大编程语言的SDK都提供了优先队列
的实现,可惜C#
没有提供,这里提供一个类似JDK优先队列的实现
public class PriorityQueue<T> : IEnumerable<T>
{
/// <summary>
/// 默认最小堆容量
/// </summary>
private const int DEFAULT_INITIAL_CAPACITY = 11;
/// <summary>
/// 内部数组
/// </summary>
private T[] _array;
/// <summary>
/// 最小堆的大小
/// </summary>
public int Count { get; set; }
/// <summary>
/// 比较器
/// </summary>
private readonly IComparer<T> _comparer;
/// <summary>
/// 版本号
/// </summary>
private int _version;
public PriorityQueue() : this (DEFAULT_INITIAL_CAPACITY)
{
}
public PriorityQueue(int capacity) : this (capacity, Comparer<T>.Default)
{
}
public PriorityQueue(int capacity, IComparer<T> comparer)
{
if (capacity < 1)
{
throw new ArgumentException($"{nameof(capacity)} must greater than one");
}
_array = new T[capacity];
_comparer = comparer;
}
/// <summary>
/// 将元素压入堆
/// </summary>
/// <param name="item"></param>
/// <exception cref="ArgumentException"></exception>
public void Enqueue(T item)
{
if (item == null)
{
throw new ArgumentException("item can't be null");
}
var i = Count;
if (i >= _array.Length)
{
GrowCapacity(i + 1);
}
_version++;
SiftUp(i, item);
Count += 1;
}
/// <summary>
/// 取最小堆堆顶
/// </summary>
/// <returns></returns>
public T Dequeue()
{
return TryDequeue(out var result) ? result : default;
}
/// <summary>
/// 取最小堆堆顶
/// </summary>
/// <param name="result"></param>
/// <returns></returns>
public bool TryDequeue([MaybeNullWhen(false)] out T result)
{
if (Count == 0)
{
result = default;
return false;
}
_version++;
var s = --Count; // 最后一个元素位置
result = _array[0]; // 取堆顶
var x = _array[s]; // 获取当前堆的最后一个元素
_array[s] = default; // 最后位置置空
if (s != 0)
{
SiftDown(0, x); // 当前堆的最后一个元素下沉
}
return true;
}
/// <summary>
/// 查看最小堆对顶
/// </summary>
/// <returns></returns>
public T Peek()
{
return TryPeek(out var result) ? result : default;
}
/// <summary>
/// 尝试查看最小堆堆顶
/// </summary>
/// <param name="result"></param>
/// <returns></returns>
public bool TryPeek([MaybeNullWhen(false)] out T result)
{
if (Count == 0)
{
result = default;
return false;
}
result = _array[0];
return true;
}
/// <summary>
/// 上浮
/// </summary>
/// <param name="k"></param>
/// <param name="x"></param>
/// <exception cref="NotImplementedException"></exception>
private void SiftUp(int k, T x)
{
while (k > 0)
{
var parent = (k - 1) >> 1; // k / 2
var e = _array[parent];
if (_comparer.Compare(x, e) >= 0)
{
break;
}
_array[k] = e;
k = parent;
}
_array[k] = x;
}
/// <summary>
/// 下沉
/// </summary>
/// <param name="k"></param>
/// <param name="x"></param>
private void SiftDown(int k, T x)
{
var half = Count >> 1;
while (k < half)
{
var child = (k << 1) + 1; // left child
var c = _array[child];
var right = child + 1; // right child
if (right < Count && _comparer.Compare(c, _array[right]) > 0)
{
// 左节点大于右节点,取右节点
c = _array[right];
child = right;
}
if (_comparer.Compare(x, c) <= 0)
{
break;
}
_array[k] = c;
k = child;
}
_array[k] = x;
}
/// <summary>
/// 容量扩充
/// </summary>
/// <param name="minCapacity"></param>
private void GrowCapacity(int minCapacity)
{
var oldCapacity = _array.Length;
// double size if small, else grow 50%
var newCapacity = oldCapacity + (oldCapacity < 64 ? oldCapacity + 2 : oldCapacity >> 1);
newCapacity = Math.Max(newCapacity, minCapacity);
var newArray = new T[newCapacity];
Array.Copy(_array, newArray, oldCapacity);
_array = newArray;
}
public IEnumerator<T> GetEnumerator()
{
return new Enumerator(this);
}
IEnumerator IEnumerable.GetEnumerator()
{
return GetEnumerator();
}
public struct Enumerator : IEnumerator<T>
{
private readonly PriorityQueue<T> _queue;
private int _index;
private readonly int _version;
private readonly int _size;
private T _current;
public Enumerator(PriorityQueue<T> queue)
{
_queue = queue;
_index = 0;
_version = queue._version;
_current = default;
_size = queue.Count;
}
public bool MoveNext()
{
var localQueue = _queue;
if (localQueue._version == _version && _index < _size)
{
_current = localQueue._array[_index];
_index++;
return true;
}
if (_version != localQueue._version)
{
throw new InvalidOperationException();
}
_index = _size + 1;
_current = default;
return false;
}
public void Reset()
{
_index = 0;
_current = default;
}
public T Current => _current;
object IEnumerator.Current => Current;
public void Dispose()
{
}
}
}
三、实现定时系统
定时系统基本按照最开始的流程图就可以实现,这里给出最主要的添加任务和主循环实现
private readonly PriorityQueue<SimpleTimerTask> _taskList;
/// <summary>
/// 添加定时任务
/// </summary>
/// <param name="task"></param>
public void Schedule(SimpleTimerTask task)
{
lock (_lockObj)
{
_taskList.Enqueue(task);
}
TimerLogger.Log.Info("{0}#add#{1}#{2}#{3}", "simple", task.TaskId, task.Name, task.ExecutionTime);
}
void MainLoop()
{
if (_taskList.Count <= 0)
{
return;
}
var currTime = TimeUtil.GetTimestamp(TimeUtil.FastDateTimeNow);
lock (_lockObj)
{
if (_taskList.Count > 0)
{
while (_taskList.TryPeek(out var task))
{
if (task.ExecutionTime <= currTime)
{
_taskList.Dequeue();
if (task.CanExecute())
{
// 执行任务
Task.Factory.StartNew(task.GetTask(), CancellationToken.None, TaskCreationOptions.None, GetTaskScheduler(task.TaskId));
}
}
else
{
break;
}
}
}
}
}
-
MainLoop
是一个被线程周期性调用的函数,调度代码如下:
public static (Task, CancellationTokenSource) CreateLoopTask(Action action, int interval = 200)
{
void Function()
{
while (true)
{
try
{
action();
}
catch (Exception e)
{
Log.LogFactory.Default.Error(e, "loop task error");
}
try
{
System.Threading.Thread.Sleep(interval);
}
catch
{
// ignored
}
}
}
var cts = new CancellationTokenSource();
// return (Task.Run(Function, cts.Token), cts);
return (Task.Factory.StartNew(Function, cts.Token, TaskCreationOptions.LongRunning, FixedTaskScheduler.Current), cts);
}
结语
定时任务系统的基本原理是比较简单的,但也是十分可靠的!对于游戏来说,里面的时间系统还可以基于Mock,可以被加速,有很大的想象空间