KubeEdge分析-topic整理
前言
在《KubeEdge分析-mapper与deviceTwin交互流程》,已经分析了部分topic以及twin与mapper的交互,但是没有对topic进行一个完整的整理,
这里我们整理下目前所有的topic以及所需的消息格式。
结论
内容比较多,所以先把结论提前
topic | 发布者(publish) | 订阅者(subscribe) | 用途简介 | mapper是否必须实现 |
---|---|---|---|---|
$hw/events/node/+/membership/updated | edgecore | mapper | 订阅设备列表的变化 | 建议实现 |
$hw/events/node/+/membership/get | mapper | edgecore | 查询设备列表 | 建议实现 |
$hw/events/node/+/membership/get/result | edgecore | mapper | 获取查询设备列表的结果 | 建议实现 |
$hw/events/device/+/updated | edgecore | mapper | 订阅设备属性描述的变化 | 否 |
$hw/events/device/+/twin/update/result | edgecore | mapper | 获取设备属性更新是否成功 | 建议实现 |
$hw/events/device/+/twin/update/delta | edgecore | mapper | 获取设备属性更新的值 | 是 |
$hw/events/device/+/twin/update/document | edgecore | mapper | 获取设备属性更新的操作记录 | 否 |
$hw/events/device/+/twin/get/result | edgecore | mapper、apps | 返回获取设备属性的值 | 否 |
$hw/events/device/+/twin/update | mapper | edgecore | 通知设备属性的值更新 | 是 |
$hw/events/device/+/twin/get | mapper,apps | edgecore | 获取设备属性的值 | 否 |
$hw/events/device/+/state/update | mapper | edgecore | 通知设备状态更新 | 建议实现 |
$hw/events/device/+/state/update/result | edgecore | mapper | 获取设备状态更新结果 | 否 |
$ke/events/device/+/data/update | mapper | apps | 获取设备属性的时序数据 | 是 |
$hw/events/upload/# | 暂无 | edgecore | 转发云端 | 否 |
SYS/dis/upload_records | 暂无 | edgecore | 转发云端 | 否 |
topics
之前的文档中就提过,在官方文档中的message_topics的topic列表和eventbus文档中的是不一样的
- message_topics文档
1. "$hw/events/node/+/membership/get"
2. "$hw/events/device/+/state/update"
3. "$hw/events/device/+/twin/+"
4. "$hw/events/upload/#"
5. "SYS/dis/upload_records"
6. "$ke/events/+/device/data/update"
- eventbus文档
- $hw/events/upload/#
- SYS/dis/upload_records
- SYS/dis/upload_records/+
- $hw/event/node/+/membership/get
- $hw/event/node/+/membership/get/+
- $hw/events/device/+/state/update
- $hw/events/device/+/state/update/+
- $hw/event/device/+/twin/+
下面我们就根据代码来确认一下
eventbus代码中的topic
在edgecore中,只有eventbus会和mqtt broker进行交互,所以,eventbus中代码中的topic应该是最准确的。
见edge\pkg\eventbus\mqtt\client.go
const UploadTopic = "SYS/dis/upload_records"
var (
// MQTTHub client
MQTTHub *Client
// GroupID stands for group id
GroupID string
// ConnectedTopic to send connect event
ConnectedTopic = "$hw/events/connected/%s"
// DisconnectedTopic to send disconnect event
DisconnectedTopic = "$hw/events/disconnected/%s"
// MemberGet to get membership device
MemberGet = "$hw/events/edgeGroup/%s/membership/get"
// MemberGetRes to get membership device
MemberGetRes = "$hw/events/edgeGroup/%s/membership/get/result"
// MemberDetail which edge-client should be pub when service start
MemberDetail = "$hw/events/edgeGroup/%s/membership/detail"
// MemberDetailRes MemberDetail topic resp
MemberDetailRes = "$hw/events/edgeGroup/%s/membership/detail/result"
// MemberUpdate updating of the twin
MemberUpdate = "$hw/events/edgeGroup/%s/membership/updated"
// GroupUpdate updates a edgegroup
GroupUpdate = "$hw/events/edgeGroup/%s/updated"
// GroupAuthGet get temperary aksk from cloudhub
GroupAuthGet = "$hw/events/edgeGroup/%s/authInfo/get"
// GroupAuthGetRes temperary aksk from cloudhub
GroupAuthGetRes = "$hw/events/edgeGroup/%s/authInfo/get/result"
// SubTopics which edge-client should be sub
SubTopics = []string{
"$hw/events/upload/#",
"$hw/events/device/+/state/update",
"$hw/events/device/+/twin/+",
"$hw/events/node/+/membership/get",
UploadTopic,
}
)
devicetwin中的topic
// LifeCycleConnectETPrefix the topic prefix for connected event
LifeCycleConnectETPrefix = "$hw/events/connected/"
// LifeCycleDisconnectETPrefix the topic prefix for disconnected event
LifeCycleDisconnectETPrefix = "$hw/events/disconnected/"
// MemETPrefix the topic prefix for membership event
MemETPrefix = "$hw/events/node/"
// MemETUpdateSuffix the topic suffix for membership updated event
MemETUpdateSuffix = "/membership/updated"
// MemETDetailSuffix the topic suffix for membership detail
MemETDetailSuffix = "/membership/detail"
// MemETDetailResultSuffix the topic suffix for membership detail event
MemETDetailResultSuffix = "/membership/detail/result"
// MemETGetSuffix the topic suffix for membership get
MemETGetSuffix = "/membership/get"
// MemETGetResultSuffix the topic suffix for membership get event
MemETGetResultSuffix = "/membership/get/result"
// DeviceETPrefix the topic prefix for device event
DeviceETPrefix = "$hw/events/device/"
// TwinETUpdateSuffix the topic suffix for twin update event
TwinETUpdateSuffix = "/twin/update"
// TwinETUpdateResultSuffix the topic suffix for twin update result event
TwinETUpdateResultSuffix = "/twin/update/result"
// TwinETGetSuffix the topic suffix for twin get
TwinETGetSuffix = "/twin/get"
// TwinETGetResultSuffix the topic suffix for twin get event
TwinETGetResultSuffix = "/twin/get/result"
// TwinETCloudSyncSuffix the topic suffix for twin sync event
TwinETCloudSyncSuffix = "/twin/cloud_updated"
// TwinETEdgeSyncSuffix the topic suffix for twin sync event
TwinETEdgeSyncSuffix = "/twin/edge_updated"
// TwinETDeltaSuffix the topic suffix for twin delta event
TwinETDeltaSuffix = "/twin/update/delta"
// TwinETDocumentSuffix the topic suffix for twin document event
TwinETDocumentSuffix = "/twin/update/document"
// DeviceETUpdatedSuffix the topic suffix for device updated event
DeviceETUpdatedSuffix = "/updated"
// DeviceETStateUpdateSuffix the topic suffix for device state update event
DeviceETStateUpdateSuffix = "/state/update"
// DeviceETStateGetSuffix the topipc suffix for device state get event
DeviceETStateGetSuffix = "/state/get"
本来觉得是eventbus中定义了所有topic的,但是发现devicetwin中又定义了一些,这里有些应该是和eventbus中重复的。
分析完源码回过头来看,eventbus是支持其他module告诉他要处理些什么topic的,所以这里是有可能device twinm module告诉eventbus要订阅、发布哪些topic。
eventbus订阅
eventbus代码中定义了很多topic,但是这些topic目前有没有用到,需要一一确认。
InitInternalTopics(edge\pkg\eventbus\mqtt\server.go)中会对SubTopics中topic都进行订阅
// InitInternalTopics sets internal topics to server by default.
func (m *Server) InitInternalTopics() {
for _, v := range SubTopics {
m.tree.Set(v, packet.Subscription{Topic: v, QOS: packet.QOS(m.qos)})
klog.Infof("Subscribe internal topic to %s", v)
}
}
eventbus处理
// OnSubMessageReceived msg received callback
func OnSubMessageReceived(client MQTT.Client, message MQTT.Message) {
klog.Infof("OnSubMessageReceived receive msg from topic: %s", message.Topic())
// for "$hw/events/device/+/twin/+", "$hw/events/node/+/membership/get", send to twin
// for other, send to hub
// for topic, no need to base64 topic
var target string
resource := base64.URLEncoding.EncodeToString([]byte(message.Topic()))
if strings.HasPrefix(message.Topic(), "$hw/events/device") || strings.HasPrefix(message.Topic(), "$hw/events/node") {
target = modules.TwinGroup
} else {
target = modules.HubGroup
if message.Topic() == UploadTopic {
resource = UploadTopic
}
}
// routing key will be $hw.<project_id>.events.user.bus.response.cluster.<cluster_id>.node.<node_id>.<base64_topic>
msg := model.NewMessage("").BuildRouter(modules.BusGroup, "user",
resource, "response").FillBody(string(message.Payload()))
klog.Info(fmt.Sprintf("received msg from mqttserver, deliver to %s with resource %s", target, resource))
beehiveContext.SendToGroup(target, *msg)
}
eventbus内部mqtt
eventbus内部是有个mqtt server的,也就是说eventbus可以不依赖外部的mqttbroker来执行。
通过设置mqttMode为0来指定使用内部的mqttbroker,默认是2.
因此,eventbus中的edge\pkg\eventbus\mqtt\server.go,是不用看的,不要被相关代码干扰。
module name与group
由于整个beehive框架都是依赖module name 和group来分发消息的,而edgecore中涉及的module又特别多,所以有必要梳理一下:
模块 | module name | module group |
---|---|---|
devicetwin | DeviceTwinModuleName | TwinGroup |
edged | EdgedModuleName | EdgedGroup |
edgehub | ModuleNameEdgeHub | HubGroup |
edgestream | ModuleNameEdgeHub | GroupNameEdgeStream |
eventbus | "eventbus" | BusGroup |
metamanager | MetaManagerModuleName | MetaGroup |
servicebus | "servicebus" | BusGroup |
stub | "stubCloudHub" | MetaGroup |
这里看,其实还是挺乱的,有些直接用来字符安,有的定义了常量
订阅的topic梳理
SYS/dis/upload_records
从message处理方法看,对这个topic的消息,收到后的处理是把resource设置为“SYS/dis/upload_records”,targe设置为“HubGroup”,然后通过beehive送到对应的模块处理.
HubGroup的消息会被beehive发送给edgehub(edge\pkg\edgehub\edgehub.go)
go eh.routeToEdge()
go eh.routeToCloud()
go eh.keepalive()
edgehub主要就启了这3个goroutine,
- routeToEdge, 这个是从websocket中取消息,然后根据消息的group通过beehive,分发到对应的模块中取处理
- routeToCloud,这个是从beehive中取EdgeHub的消息(根据ModuleNameEdgeHub),去掉的消息,通过sendToCloud发送到云端(这里每次发送回起一个KeepChannel,等待云端的返回,超时后,会删掉这个通道)
- keepalive就是定时往云端发心跳了。
SYS/dis/upload_records小结
综上,可以看出,SYS/dis/upload_records收到的消息,会通过edgehub送到云上。
但是目前的代码中,是没有哪个组件会发送这个topic的,云端也没有对应的处理流程(待定)。
$hw/events/upload/#
从eventbus的OnSubMessageReceived中可以看出,除了“hw/events/node”的开头的topic,其他topic都是丢给edgehub处理的。
和之前的SYS/dis/upload_records相比,区别在于source的设置,两者的resource是不一样的。resource会影响最终的消息路由,但是在edge节点的处理上,是没啥区别的,这个resource影响最终云端收到消息后的处理(后续再仔细分析)
$hw/events/upload/#小结
综上,$hw/events/upload/#也是直接把消息送到云端。
$hw/events/device/+/state/update
从之前的分析中可知,这个topic就是交给device twin模块来处理的,
func initEventActionMap() {
EventActionMap = make(map[string]map[string]string)
EventActionMap[dtcommon.MemETPrefix] = make(map[string]string)
EventActionMap[dtcommon.DeviceETPrefix] = make(map[string]string)
EventActionMap[dtcommon.MemETPrefix][dtcommon.MemETDetailResultSuffix] = dtcommon.MemDetailResult
EventActionMap[dtcommon.MemETPrefix][dtcommon.MemETUpdateSuffix] = dtcommon.MemUpdated
EventActionMap[dtcommon.MemETPrefix][dtcommon.MemETGetSuffix] = dtcommon.MemGet
EventActionMap[dtcommon.DeviceETPrefix][dtcommon.DeviceETStateGetSuffix] = dtcommon.DeviceStateGet
EventActionMap[dtcommon.DeviceETPrefix][dtcommon.DeviceETUpdatedSuffix] = dtcommon.DeviceUpdated
EventActionMap[dtcommon.DeviceETPrefix][dtcommon.DeviceETStateUpdateSuffix] = dtcommon.DeviceStateUpdate
EventActionMap[dtcommon.DeviceETPrefix][dtcommon.TwinETUpdateSuffix] = dtcommon.TwinUpdate
EventActionMap[dtcommon.DeviceETPrefix][dtcommon.TwinETCloudSyncSuffix] = dtcommon.TwinCloudSync
EventActionMap[dtcommon.DeviceETPrefix][dtcommon.TwinETGetSuffix] = dtcommon.TwinGet
}
dt的actionmap中,定义了相关操作是DeviceStateUpdate,这个又对应了dealDeviceStateUpdate操作。
dealDeviceStateUpdate会同时更新cloud和edge,这里也先待定,暂时不展开分析
$hw/events/device/+/twin/+
这里有个坑,topic写的是twin/+,从edge\pkg\devicetwin\dtcommon\common.go可以看出,这个twin/+能匹配的消息包括:
- /twin/update
- /twin/get
和上面类似,这个topic分别对应的是EventActionMap中的DeviceETUpdatedSuffix和TwinETGetSuffix,也就是分别对应了dealTwinUpdate和dealTwinGet两个方法
$hw/events/device/+/twin/get
dealTwinGet --> DealGetTwin -->BuildDeviceTwinResult
//BuildDeviceTwinResult build device twin result, 0:get,1:update,2:sync
func BuildDeviceTwinResult(baseMessage BaseMessage, twins map[string]*MsgTwin, dealType int) ([]byte, error) {
result := make(map[string]*MsgTwin)
if dealType == 0 {
for k, v := range twins {
if v == nil {
result[k] = nil
continue
}
if v.Metadata != nil && strings.Compare(v.Metadata.Type, "deleted") == 0 {
continue
}
twin := *v
twin.ActualVersion = nil
twin.ExpectedVersion = nil
result[k] = &twin
}
} else {
result = twins
}
payload, err := json.Marshal(DeviceTwinResult{BaseMessage: baseMessage, Twin: result})
if err != nil {
return []byte(""), err
}
return payload, nil
}
这里这里的dealType传入的是0,所以这里是从数据库中查出数据里,放到twin中,放的时候,ActualVersion和ExpectedVersion都会被设置为nil.
不过我觉得mapper是没有必要去发送这个get请求的,这里应该是为其他第三方的应用准备的,当第三方应用需要数据时,就发一个twin get消息给edgecore。
$hw/events/device/+/twin/update
dealTwinUpdate-->Updated-->DealDeviceTwin-->DealMsgTwin-->dealUpdateResult
-->dealDocument
-->dealDelta
-->dealSyncResult
DealDeviceTwin是个很长的方法,之前《KubeEdge分析-mapper与deviceTwin交互流程》 中有详细分析过,这里就不详细分析了,简要概况就是要更新edge的数据库,然后发送到云端
$hw/events/node/+/membership/get
这个topic根据之前的分析,是交给twin模块来处理的,最终处理的方法是
dealMembershipGet --> dealMembershipGetInner --> dealMembershipGetInner-->BuildMembershipGetResult
func BuildMembershipGetResult(baseMessage BaseMessage, devices []*Device) ([]byte, error) {
result := make([]Device, 0, len(devices))
for _, v := range devices {
result = append(result, Device{
ID: v.ID,
Name: v.Name,
Description: v.Description,
State: v.State,
LastOnline: v.LastOnline,
Attributes: v.Attributes})
}
payload, err := json.Marshal(MembershipGetResult{BaseMessage: baseMessage, Devices: result})
if err != nil {
return []byte(""), err
}
return payload, nil
}
这里就是返回节点上的device列表
$hw/events/connected/%s
代码中只有Twin模块有对相关topic的处理,在event模块中,并没找到订阅相关的消息。
在twin中的处理流程如下:
runDeviceTwin-->distributeMsg-->classifyMsg-->dtcommon.LifeCycle-->dealLifeCycle
$hw/events/connected/%s小结
从名称上看,这个topic是用来mapper发现了新增设备,上报给edgecore的,但是从实现上看,并没有对这个topic进行处理。
$hw/events/disconnected/%s
和上面的connected topic类似,目前应该也没有监听
其他
// MemberGet to get membership device
MemberGet = "$hw/events/edgeGroup/%s/membership/get"
// MemberGetRes to get membership device
MemberGetRes = "$hw/events/edgeGroup/%s/membership/get/result"
// MemberDetail which edge-client should be pub when service start
MemberDetail = "$hw/events/edgeGroup/%s/membership/detail"
// MemberDetailRes MemberDetail topic resp
MemberDetailRes = "$hw/events/edgeGroup/%s/membership/detail/result"
// MemberUpdate updating of the twin
MemberUpdate = "$hw/events/edgeGroup/%s/membership/updated"
// GroupUpdate updates a edgegroup
GroupUpdate = "$hw/events/edgeGroup/%s/updated"
// GroupAuthGet get temperary aksk from cloudhub
GroupAuthGet = "$hw/events/edgeGroup/%s/authInfo/get"
// GroupAuthGetRes temperary aksk from cloudhub
GroupAuthGetRes = "$hw/events/edgeGroup/%s/authInfo/get/result"
这堆topic目前看也都是没有监听的,所以edgecore应该是不会进行处理的
发布的topic梳理
之前分析的都是edgecore监听的topic,现在看看edgecore会publish哪些topic
pubCloudMsgToEdge-->publish-->pubMQTT
func (eb *eventbus) pubCloudMsgToEdge() {
for {
select {
case <-beehiveContext.Done():
klog.Warning("EventBus PubCloudMsg To Edge stop")
return
default:
}
accessInfo, err := beehiveContext.Receive(eb.Name())
if err != nil {
klog.Errorf("Fail to get a message from channel: %v", err)
continue
}
operation := accessInfo.GetOperation()
resource := accessInfo.GetResource()
switch operation {
case "subscribe":
eb.subscribe(resource)
klog.Infof("Edge-hub-cli subscribe topic to %s", resource)
case "message":
body, ok := accessInfo.GetContent().(map[string]interface{})
if !ok {
klog.Errorf("Message is not map type")
return
}
message := body["message"].(map[string]interface{})
topic := message["topic"].(string)
payload, _ := json.Marshal(&message)
eb.publish(topic, payload)
case "publish":
topic := resource
var ok bool
// cloud and edge will send different type of content, need to check
payload, ok := accessInfo.GetContent().([]byte)
if !ok {
content := accessInfo.GetContent().(string)
payload = []byte(content)
}
eb.publish(topic, payload)
case "get_result":
if resource != "auth_info" {
klog.Info("Skip none auth_info get_result message")
return
}
topic := fmt.Sprintf("$hw/events/node/%s/authInfo/get/result", eventconfig.Config.NodeName)
payload, _ := json.Marshal(accessInfo.GetContent())
eb.publish(topic, payload)
default:
klog.Warningf("Action not found")
}
}
}
从beehive中取出消息,然后消息中的Operation来执行相关动作。Operation是定义在Router中的,设置operation有两个方法,一个是BuildMsg,另一个是BuildModelMessage。
BuildMsg调用的地方比较多,但都是metamanager调用的,metamanager主要是处理K8S的一些原生对象的,所以这里和eventbus无关。
./edge/pkg/eventbus
//constant defining node connection types
const (
ResourceTypeNodeConnection = "node/connection"
OperationNodeConnection = "publish"
SourceNodeConnection = "edgehub"
)
//BuildMsg returns message object with router and content details
func BuildMsg(group, parentID, sourceName, resource, operation string, content interface{}) *model.Message {
msg := model.NewMessage(parentID).BuildRouter(sourceName, group, resource, operation).FillBody(content)
return msg
}
//BuildModelMessage build mode messages
func (dtc *DTContext) BuildModelMessage(group string, parentID string, resource string, operation string, content interface{}) *model.Message {
msg := model.NewMessage(parentID)
msg.BuildRouter(modules.TwinGroup, group, resource, operation)
msg.Content = content
return msg
}
BuildModelMessage,有被发往云上的,也有发到busgroup的,这里就只看发到busgroup的。
目前看到的代码中的发往bus group的operation,都是"publish"类型的,所以目前会走到以下代码中
case "publish":
topic := resource
var ok bool
// cloud and edge will send different type of content, need to check
payload, ok := accessInfo.GetContent().([]byte)
if !ok {
content := accessInfo.GetContent().(string)
payload = []byte(content)
}
eb.publish(topic, payload)
这里梳理的所有通过BuildModelMessage
发布的topic列表
Membership update
topic := dtcommon.MemETPrefix + context.NodeName + dtcommon.MemETUpdateSuffix
$hw/events/node/+/membership/updated
当有设备添加或者删除的时候,会触发这个topic,mapper可以通过订阅这个topic来了解需要采集的设备的情况
Membership get result
topic := dtcommon.MemETPrefix + context.NodeName + dtcommon.MemETGetResultSuffix
$hw/events/node/+/membership/get/result
这个topic是响应membership get的请求,也就是mapper向edgecore查询有哪些设备
device updated
topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.DeviceETUpdatedSuffix
"$hw/events/device/+/updated"
这个topic是用来更新device model中的属性描述的(不是属性的值),也就是添加了一个新的属性或者更新一个属性的描述。
测试了更新设备model,发现并不能触发device updated的逻辑,从源码中看,目前只有设备的方法中有相关内容,但是设备首次添加,又不会触发这个update,所以不知道什么场景下才能触发了
twin update result
topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.TwinETUpdateResultSuffix
"$hw/events/device/+/twin/update/result"
这个topic是用来响应twin/update的,目前看,代码中用到的地方都是update失败的情况,也就是mapper向edgecore发送更新,twin/update/result返回更新结果
twin delta
topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.TwinETDeltaSuffix
"$hw/events/device/+/twin/update/delta"
这个topic是云端向mapper来发起更新property的值用的,比如云端想设置设备的某个属性值为1
twin Document
topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.TwinETDocumentSuffix
"$hw/events/device/+/twin/update/document"
这个topic是更新twin的时候附带的,document没有仔细研究,猜测这里放的是类似日志的东西,也就是上一个值是啥,这次更新为啥。
get twin result
topic := dtcommon.DeviceETPrefix + deviceID + dtcommon.TwinETGetResultSuffix
"$hw/events/device/+/twin/get/result"
这个topic是相应twin/get请求的,也就是mapper或者其他第三方应用可以通过twin/get topic来向edgecore查询device property的值,然后edgecore通过/twin/get/result把结果返回。
topic的内容
- $hw/events/device/sensor-tag-instance-12/twin/get
{
"event_id": "",
"timestamp": 1598527389
}
- $hw/events/node/edgenode1/membership/get/result
{
"event_id": "",
"timestamp": 1598527389979,
"devices": [{
"id": "sensor-tag-instance-10",
"name": "sensor-tag-instance-10",
"description": "TISimplelinkSensorTag"
}, {
"id": "sensor-tag-instance-11",
"name": "sensor-tag-instance-11",
"description": "TISimplelinkSensorTag"
}, {
"id": "camera",
"name": "camera",
"description": "camera"
} ]
}
- $hw/events/device/sensor-tag-instance-12/twin/update
{
"event_id": "",
"timestamp": 1598515893,
"twin": {
"temperature": {
"actual": {
"value": "111",
"metadata": {
"timestamp": 1598515893
}
},
"metadata": {
"type": "int"
}
}
}
}
- $hw/events/device/sensor-tag-instance-12/twin/update/delta
{
"event_id": "ccb9dbeb-f5c0-4f2b-9cb7-f023b688f586",
"timestamp": 1598515893657,
"twin": {
"temperature": {
"expected": {
"value": "2",
"metadata": {
"timestamp": 1598513321701
}
},
"actual": {
"value": "111",
"metadata": {
"timestamp": 1598515893648
}
},
"optional": false,
"metadata": {
"type": "int"
}
}
},
"delta": {
"temperature": "2"
}
}
- $hw/events/device/sensor-tag-instance-12/twin/update/document
{
"event_id": "",
"timestamp": 1598515893647,
"twin": {
"temperature": {
"last": {
"expected": {
"value": "2",
"metadata": {
"timestamp": 1598513321701
}
},
"optional": false,
"metadata": {
"type": "int"
}
},
"current": {
"expected": {
"value": "2",
"metadata": {
"timestamp": 1598513321701
}
},
"actual": {
"value": "111",
"metadata": {
"timestamp": 1598515893648
}
},
"optional": false,
"metadata": {
"type": "int"
}
}
}
}
}
- $hw/events/node/edgenode1/membership/updated
{
"event_id": "16ebbe71-5b04-4e17-ae37-80091bafaffc",
"timestamp": 1598513321706,
"added_devices": [{
"id": "sensor-tag-instance-12",
"name": "sensor-tag-instance-12",
"description": "TISimplelinkSensorTag",
"twin": {
"temperature": {
"expected": {
"value": "2",
"metadata": {
"timestamp": 1598513321688
}
},
"optional": false,
"metadata": {
"type": "int"
}
}
}
}],
"removed_devices": null
}
- $hw/events/node/edgenode1/membership/get
{
"event_id": "",
"timestamp": 1598527389
}
- $hw/events/device/sensor-tag-instance-12/twin/get/result
{
"event_id": "",
"timestamp": 1598527389979,
"twin": {
"temperature": {
"expected": {
"value": "2",
"metadata": {
"timestamp": 1598513321701
}
},
"actual": {
"value": "111",
"metadata": {
"timestamp": 1598515893648
}
},
"optional": false,
"metadata": {
"type": "int"
}
}
}
}
- $hw/events/device/sensor-tag-instance-12/state/update
{
"event_id": "",
"timestamp": 1598527716,
"state": "online"
}
- $hw/events/device/sensor-tag-instance-12/state/update/result
{
"event_id": "c85f7e9c-4251-4386-99ce-869af0051af7",
"timestamp": 1598527716708,
"device": {
"name": "sensor-tag-instance-12",
"state": "online",
"last_online": "2020-08-27 07:28:36"
}
}
- $ke/events/device/sensor-tag-instance-12/data/update
{
"event_id": "123e4567-e89b-12d3-a456-426655440000",
"timestamp": 1597213444,
"data": {
"propertyName1": {
"value": "123",
"metadata": {
"timestamp": 1597213444, //+optional
"type": "int"
}
},
"propertyName2": {
"value": "456",
"metadata": {
"timestamp": 1597213444,
"type": "int"
}
}
}
}
- $hw/events/upload/
目前看,这个topic看上去对内容没有限制,传什么都可以,随便测试了些数据,从edgecore能看到以下日志:
Aug 27 07:54:17 edgenode1 edgecore[23779]: I0827 07:54:17.441902 23779 client.go:94] OnSubMessageReceived receive msg from topic: $hw/events/upload/
Aug 27 07:54:17 edgenode1 edgecore[23779]: I0827 07:54:17.441950 23779 client.go:111] received msg from mqttserver, deliver to hub with resource JGh3L2V2ZW50cy91cGxvYWQv
- SYS/dis/upload_records
目前看,这个topic看上去对内容也没有限制,传什么都可以,随便测试了些数据,从edgecore能看到以下日志:
Aug 27 07:54:17 edgenode1 edgecore[23779]: I0827 07:54:17.441963 23779 client.go:94] OnSubMessageReceived receive msg from topic: SYS/dis/upload_records
Aug 27 07:54:17 edgenode1 edgecore[23779]: I0827 07:54:17.441972 23779 client.go:111] received msg from mqttserver, deliver to hub with resource SYS/dis/upload_records