Loom :在 Unity 多线程编程中实现线程间的数据同步,避免非主线程直接操作 Unity 对象。
在本文,笔者将使用 UnityEngine.LowLevel 命名空间下的 PlayerLoop 提供的 API 来重写 Loom 细节 。
前言:
在多线程异步编程中,非 UI 线程不得操作 UI 组件 (Unity中则是不得操作继承 UnityEngin.Object的组件),因此,便需要一个同步上下文的工具在各个“平行”的线程中来回穿插,传递线程执行的结果。
于是,我和 Loom 相遇了,这是一个久远而又美妙的相遇,虽 N 久不用,犹念念不忘。
Loom 译为织机 ,用在线程间数据同步,形如快速穿梭在众多平行线之中的梭子,意境恰如其名~
前段时间写 Security-Camera-Toolkit-For-Unity 时有用到 async /await
语法糖,需要用到线程间数据同步,便写了一个,名曰:TaskSync ,译为:任务同步器。
临近行文,笔者兴起将 TaskSync
重命名为 Loom
,于是本文标题也顺其自然的引入了:老瓶新酒 的说法,下面就看看笔者是如何将 “新酒” 装入如此经典的 “老瓶” 之中的...
实现:
- 使用属性:
RuntimeInitializeOnLoadMethod
在场景载入前安装本工具。
[RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.BeforeSceneLoad)]
static void Install()
{
}
- 使用方法:
PlayerLoop.GetCurrentPlayerLoop()
获得PlayerLoopSystem
var playerloop = PlayerLoop.GetCurrentPlayerLoop();
- 用户自定义一个
PlayerLoopSystem
并插入到第二步中获取的PlayerLoopSystem
中
var loop = new PlayerLoopSystem
{
type = typeof(Loom),
updateDelegate = Update
};
//1. 找到 Update Loop System
int index = Array.FindIndex(playerloop.subSystemList, v => v.type == typeof(UnityEngine.PlayerLoop.Update));
//2. 将咱们的 loop 插入到 Update loop 中
var updateloop = playerloop.subSystemList[index];
var temp = updateloop.subSystemList.ToList();
temp.Add(loop);
updateloop.subSystemList = temp.ToArray();
playerloop.subSystemList[index] = updateloop;
- 使用方法:
PlayerLoop.SetPlayerLoop()
将编辑后的PlayerLoopSystem
设置回 Unity 引擎。
//3. 设置自定义的 Loop 到 Unity 引擎
PlayerLoop.SetPlayerLoop(playerloop);
代码:
Talk is cheap ,show me the code.
// Copyright (c) https://github.com/Bian-Sh
// Licensed under the MIT License.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
#if UNITY_EDITOR
using UnityEditor;
#endif
using UnityEngine;
using UnityEngine.LowLevel;
namespace zFramework.Media.Internal
{
/// <summary>
/// 任务同步器:在主线程中执行 Action 委托
/// <br>原名 TaskSync,但是觉得 Loom(织布机)更有意境</br>
/// </summary>
public static class Loom
{
static SynchronizationContext context;
static readonly ConcurrentQueue<Action> tasks = new ConcurrentQueue<Action>();
[RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.BeforeSceneLoad)]
static void Install()
{
context = SynchronizationContext.Current;
#region 使用 PlayerLoop 在 Unity 主线程的 Update 中更新本任务同步器
var playerloop = PlayerLoop.GetCurrentPlayerLoop();
var loop = new PlayerLoopSystem
{
type = typeof(Loom),
updateDelegate = Update
};
//1. 找到 Update Loop System
int index = Array.FindIndex(playerloop.subSystemList, v => v.type == typeof(UnityEngine.PlayerLoop.Update));
//2. 将咱们的 loop 插入到 Update loop 中
var updateloop = playerloop.subSystemList[index];
var temp = updateloop.subSystemList.ToList();
temp.Add(loop);
updateloop.subSystemList = temp.ToArray();
playerloop.subSystemList[index] = updateloop;
//3. 设置自定义的 Loop 到 Unity 引擎
PlayerLoop.SetPlayerLoop(playerloop);
#if UNITY_EDITOR
//4. 已知:编辑器停止 Play 我们自己插入的 loop 依旧会触发,进入或退出Play 模式先清空 tasks
EditorApplication.playModeStateChanged -= EditorApplication_playModeStateChanged;
EditorApplication.playModeStateChanged += EditorApplication_playModeStateChanged;
static void EditorApplication_playModeStateChanged(PlayModeStateChange obj)
{
if (obj == PlayModeStateChange.ExitingEditMode ||
obj == PlayModeStateChange.ExitingPlayMode)
{
//清空任务列表
while (tasks.TryDequeue(out _)) { }
}
}
#endif
#endregion
}
#if UNITY_EDITOR
//5. 确保编辑器下推送的事件也能被执行
[InitializeOnLoadMethod]
static void EditorForceUpdate()
{
Install();
EditorApplication.update -= ForceEditorPlayerLoopUpdate;
EditorApplication.update += ForceEditorPlayerLoopUpdate;
void ForceEditorPlayerLoopUpdate()
{
if (EditorApplication.isPlayingOrWillChangePlaymode || EditorApplication.isCompiling || EditorApplication.isUpdating)
{
// Not in Edit mode, don't interfere
return;
}
Update();
}
}
#endif
// 将需要在主线程中执行的委托传递进来
public static void Post(Action task)
{
if (SynchronizationContext.Current == context)
{
task?.Invoke();
}
else
{
tasks.Enqueue(task);
}
}
static void Update()
{
while (tasks.TryDequeue(out var task))
{
try
{
task?.Invoke();
}
catch (Exception e)
{
Debug.Log($"{nameof(Loom)}: 封送的任务执行过程中发现异常,请确认: {e}");
}
}
}
}
}
本文的主角,Security-Camera-Toolkit-For-Unity 用到的 Loom 组件托管地址: 点我
用法:
/// <summary>
/// NVR 登录
/// <para>执行登录逻辑之前通过<see cref="INVRStateHandler.OnLogin"/>向名下监控发送事件</para>
/// </summary>
public virtual async Task LoginAsync()
{
foreach (var item in cameras)
{
Loom.Post(() => item.OnLogin(loginHandle));
}
await QueryCameraStatusAsync(true);
}
在实际生产中的使用请 点我
结语:
- 基于PlayerLoop API 实现的 Loom 不依赖
Monobehaviour
组件,无需关注 Loom 生命周期。 - 虽寥寥数行,却也实现了 Editor 下的 线程间通信,并且与播放时的使用不冲突。
- 使用 Lambda 表达式的闭包优势,故而 Action 没有设计参数。
- 使用
ConcurrentQueue
线程安全队列实现多线程共享任务列表,保证了线程安全。 - 笔者仅仅在
PlayerLoopSystem
中的 Update 子系统中插入了自定义的方法 ,各位同学慎重把玩,笔者对用户自己行为造成的损失概不负责。
扩展阅读:
- UniTask.PlayerLoopHelper.cs - 思路参考,相当于是它的精简版本。
- SynchronizationContext - .NET 框架中使用的同步线程间数据的类
- UnitySynchronizationContext - UnityEngine中的同步上下文组件,遥想刚发布时这个组件还存在死锁 bug ,所以呀,看待事物要以发展的眼光。如果愿意,可以扒下此脚本再把ExecuteTasks使用PlayerLoopSystem驱动一下就能使用了。
- PlayerLoop 由实验性的 API 转移到 Lowlevel 命名空间下,代表其趋于稳定,大家可以进一步了解。
补充:
使用如下代码可以输出 Unity 中当前使用的所有的 PlayerLoopSyetem ,建议 LOG 单列显示
using UnityEngine;
using UnityEngine.LowLevel;
public static class Foo
{
[RuntimeInitializeOnLoadMethod( RuntimeInitializeLoadType.BeforeSceneLoad)]
static void Install()
{
int ident = 0;
void ShowSystem(PlayerLoopSystem system)
{
ident++;
foreach (var item in system.subSystemList)
{
Debug.Log($"{new string('\t',ident)}{item .type}");
if (item.subSystemList?.Length>0)
{
ShowSystem(item);
}
}
ident--;
}
var system = PlayerLoop.GetCurrentPlayerLoop();
ShowSystem(system);
}
}
2022年5月8日更新
其实也可以不用这么麻烦,如果不关心使用哪个 Update 来执行投递的任务,完全可以化简成对 UnitySynchronizationContext
简单包装形式就行,至于何时以何种频度执行投递的任务那就是 Unity 的事了,代码如下:
using System;
using System.Threading;
using UnityEngine;
public static class Loom
{
static int threadId;
static SynchronizationContext context;
#if UNITY_EDITOR
[UnityEditor.InitializeOnLoadMethod()]
#endif
[RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.BeforeSceneLoad)]
private static void Install()
{
context = SynchronizationContext.Current;
threadId = Thread.CurrentThread.ManagedThreadId;
}
public static void Post(Action call)
{
if (Thread.CurrentThread.ManagedThreadId == threadId)
{
call?.Invoke();
}
else
{
context.Post(_state => call?.Invoke(), null);
}
}
}
当然,现在 UnitySynchronizationContext
还是 internal 的,可能后期这个类类型就该公开了吧。届时这个轮子就会搁置,权当作为扩展阅读了。
2023年5月12日更新
- Loom 是在逻辑代码中插入 lambda 表达式以切换到主线程,这并不优雅,改成 await 可行吗?
- 在 async 修饰的方法中还是需要使用
await Task.Run(async () => { });
这样也引入了 lambda 表达式,改成 await 可行吗?
答案是:yes!我们完全可以使用自上而下一句一行的同步写法随心切换线程,代码和效果如下图展示。
测试代码 | 效果展示 |
---|---|
PS:ToThreadPoolAsync 改 ToOtherThreadAsync 保证此 API 跟 ToMainThreadAsync 可以互相对举、联想。
那我该如何实现呢?
await ToMainThreadAsync 的实现
- 在 Loom 中插入以下方法
/// <summary>
/// 切换到主线程中执行
/// </summary>
public static SwitchToUnityThreadAwaitable ToMainThreadAsync() => new SwitchToUnityThreadAwaitable();
- 新增一个结构
public struct SwitchToUnityThreadAwaitable
{
public Awaiter GetAwaiter() => new Awaiter();
public struct Awaiter : INotifyCompletion
{
public bool IsCompleted => Loom.IsMainThread;
public void GetResult() { }
public void OnCompleted(Action continuation) => Loom.Post(continuation);
}
}
await ToOtherThreadAsync 的实现
- 在 Loom 中插入以下方法
/// <summary>
/// 切换到线程池中执行
/// </summary>
public static SwitchToThreadPoolAwaitable ToOtherThreadAsync() => new SwitchToThreadPoolAwaitable();
- 新增一个结构
public struct SwitchToThreadPoolAwaitable
{
public Awaiter GetAwaiter() => new Awaiter();
public struct Awaiter : ICriticalNotifyCompletion
{
static readonly WaitCallback switchToCallback = Callback;
public bool IsCompleted => false;
public void GetResult() { }
public void OnCompleted(Action continuation) => ThreadPool.UnsafeQueueUserWorkItem(switchToCallback, continuation);
public void UnsafeOnCompleted(Action continuation) => ThreadPool.UnsafeQueueUserWorkItem(switchToCallback, continuation);
static void Callback(object state)
{
var continuation = (Action)state;
continuation();
}
}
}
由于 Loom 成员发生了变化,下面给出完整代码,拷贝即可食用!
// Copyright (c) https://github.com/Bian-Sh
// Licensed under the MIT License.
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Threading;
#if UNITY_EDITOR
using UnityEditor;
#endif
using UnityEngine;
using UnityEngine.LowLevel;
namespace zFramework.Internal
{
/// <summary>
/// 任务同步器:在主线程中执行 Action 委托
/// <br>原名 TaskSync,但是觉得 Loom(织布机)更有意境</br>
/// </summary>
public static class Loom
{
public static int mainThreadId;
public static bool IsMainThread => Thread.CurrentThread.ManagedThreadId == mainThreadId;
static readonly ConcurrentQueue<Action> tasks = new ConcurrentQueue<Action>();
[RuntimeInitializeOnLoadMethod(RuntimeInitializeLoadType.BeforeSceneLoad)]
static void Install()
{
mainThreadId = Thread.CurrentThread.ManagedThreadId;
#region 使用 PlayerLoop 在 Unity 主线程的 Update 中更新本任务同步器
var playerloop = PlayerLoop.GetCurrentPlayerLoop();
var loop = new PlayerLoopSystem
{
type = typeof(Loom),
updateDelegate = Update
};
//1. 找到 Update Loop System
int index = Array.FindIndex(playerloop.subSystemList, v => v.type == typeof(UnityEngine.PlayerLoop.Update));
//2. 将咱们的 loop 插入到 Update loop 中
var updateloop = playerloop.subSystemList[index];
var temp = updateloop.subSystemList.ToList();
temp.Add(loop);
updateloop.subSystemList = temp.ToArray();
playerloop.subSystemList[index] = updateloop;
//3. 设置自定义的 Loop 到 Unity 引擎
PlayerLoop.SetPlayerLoop(playerloop);
#if UNITY_EDITOR
//4. 已知:编辑器停止 Play 我们自己插入的 loop 依旧会触发,进入或退出Play 模式先清空 tasks
EditorApplication.playModeStateChanged -= EditorApplication_playModeStateChanged;
EditorApplication.playModeStateChanged += EditorApplication_playModeStateChanged;
static void EditorApplication_playModeStateChanged(PlayModeStateChange obj)
{
if (obj == PlayModeStateChange.ExitingEditMode ||
obj == PlayModeStateChange.ExitingPlayMode)
{
//清空任务列表
while (tasks.TryDequeue(out _)) { }
}
}
#endif
#endregion
}
#if UNITY_EDITOR
//5. 确保编辑器下推送的事件也能被执行
[InitializeOnLoadMethod]
static void EditorForceUpdate()
{
Install();
EditorApplication.update -= ForceEditorPlayerLoopUpdate;
EditorApplication.update += ForceEditorPlayerLoopUpdate;
void ForceEditorPlayerLoopUpdate()
{
if (EditorApplication.isPlayingOrWillChangePlaymode || EditorApplication.isCompiling || EditorApplication.isUpdating)
{
// Not in Edit mode, don't interfere
return;
}
Update();
}
}
#endif
/// <summary>
/// 在主线程中执行
/// </summary>
/// <param name="task">要执行的委托</param>
public static void Post(Action task)
{
if (IsMainThread)
{
task?.Invoke();
}
else
{
tasks.Enqueue(task);
}
}
static void Update()
{
while (tasks.TryDequeue(out var task))
{
try
{
task?.Invoke();
}
catch (Exception e)
{
Debug.Log($"{nameof(Loom)}: 封送的任务执行过程中发现异常,请确认: {e}");
}
}
}
/// <summary>
/// 切换到主线程中执行
/// </summary>
public static SwitchToUnityThreadAwaitable ToMainThreadAsync() => new SwitchToUnityThreadAwaitable();
/// <summary>
/// 切换到线程池中执行
/// </summary>
public static SwitchToThreadPoolAwaitable ToOtherThreadAsync() => new SwitchToThreadPoolAwaitable();
}
public struct SwitchToUnityThreadAwaitable
{
public Awaiter GetAwaiter() => new();
public struct Awaiter : INotifyCompletion
{
public bool IsCompleted => Loom.IsMainThread;
public void GetResult() { }
public void OnCompleted(Action continuation) => Loom.Post(continuation);
}
}
public struct SwitchToThreadPoolAwaitable
{
public Awaiter GetAwaiter() => new();
public struct Awaiter : ICriticalNotifyCompletion
{
static readonly WaitCallback switchToCallback = Callback;
public bool IsCompleted => false;
public void GetResult() { }
public void OnCompleted(Action continuation) => ThreadPool.UnsafeQueueUserWorkItem(switchToCallback, continuation);
public void UnsafeOnCompleted(Action continuation) => ThreadPool.UnsafeQueueUserWorkItem(switchToCallback, continuation);
static void Callback(object state)
{
var continuation = (Action)state;
continuation();
}
}
}
}
2023年6月2日 更新
把 ToMainThreadAsync()
方法改属性 ToOtherThread
、ToOtherThreadAsync()
方法改属性 ToOtherThread
,调用更简洁(少key单词,少key符号,版面看着更连贯)。
PS:源码自己改,就不为了两个字符的变革重新写一遍上面的代码了
2023年7月5日 更新
做成 upm 包,方便维护和使用
https://github.com/Bian-Sh/Loom