Edgex Messagebus

EdgeX中messagebus功能整理 #

CoreCommand #

SubscribeCommandRequests #

(interal/core/command/controller/messaging/internal.go)

订阅来自EdgeX service的command请求,并将请求转发给DeviceService。

// SubscribeCommandRequests subscribes command requests from EdgeX service (e.g., Application Service)
// and forwards them to the appropriate Device Service via internal MessageBus
func SubscribeCommandRequests(ctx context.Context, requestTimeout time.Duration, dic *di.Container) errors.EdgeX {
	.....

	go func() {
		for {
			select {
			case <-ctx.Done():
				lc.Infof("Exiting waiting for MessageBus '%s' topic messages", requestCommandTopic)
				return
			case err = <-messageErrors:
				lc.Error(err.Error())
			case requestEnvelope := <-messages:
				processDeviceCommandRequest(messageBus, requestEnvelope, baseTopic, requestTimeout, lc, dic)
			}
		}
	}()


	return nil
}

processDeviceCommandRequest首先将得到的requestEnvelope拆解ReceivedTopic,分别获得deviceName、commandName、method,接着拼接出请求这个device的deviceRequestTopic,向这个device发起请求,将device响应结果返还给一开始请求的服务。

func processDeviceCommandRequest(...) {
	...

	// internal response topic scheme: <ResponseTopicPrefix>/<service-name>/<request-id>
    // 这个应该是还给发给他消息的topic
	internalResponseTopic := common.BuildTopic(baseTopic, common.ResponseTopic, common.CoreCommandServiceKey, requestEnvelope.RequestID)
	topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/")
	length := len(topicLevels)
	...
    
	// expected internal command request/response topic scheme: #/<device>/<command-name>/<method>
	deviceName := topicLevels[length-3]
	commandName, err := url.QueryUnescape(topicLevels[length-2])
	...
	method := topicLevels[length-1]
	...

	topicPrefix := common.BuildTopic(baseTopic, common.CoreCommandDeviceRequestPublishTopic)
	// internal command request topic scheme: <DeviceRequestTopicPrefix>/<device-service>/<device>/<command-name>/<method>
    // 这里做了验证,并且得到请求的devicetopic
	deviceServiceName, deviceRequestTopic, err := validateRequestTopic(topicPrefix, deviceName, commandName, method, dic)
	...
    // 构建需要请求device的topic
	deviceResponseTopicPrefix := common.BuildTopic(baseTopic, common.ResponseTopic, deviceServiceName)

	...
    // 获取device的响应
	response, err := messageBus.Request(requestEnvelope, deviceRequestTopic, deviceResponseTopicPrefix, requestTimeout)
	...

	// original request is from internal MessageBus
    // 返回给最开始请求过来的地方
	err = messageBus.Publish(*response, internalResponseTopic)
	...
}
sequenceDiagram
    Application->>Core-Command: core/command/request/#
    Core-Command->>Application: <ResponseTopicPrefix>/<service-name>/<request-id>
    Core-Command->>Device:<DeviceRequestTopicPrefix>/<device-service>/<device>/<command-name>/<method>
    Device->>Core-Command:<ResopnseTopic>/<deviceServiceName>/<request-id> 

SubscribeCommandQueryRequests #

(interal/core/command/controller/messaging/internal.go)

EdgeX service发起command query请求,来获取命令

// SubscribeCommandQueryRequests subscribes command query requests from EdgeX service (e.g., Application Service)
// via internal MessageBus
func SubscribeCommandQueryRequests(ctx context.Context, dic *di.Container) errors.EdgeX {
	....
	go func() {
		for {
			select {
			case <-ctx.Done():
				lc.Infof("Exiting waiting for MessageBus '%s' topic messages", queryRequestTopic)
				return
			case err = <-messageErrors:
				lc.Error(err.Error())
			case requestEnvelope := <-messages:
				processCommandQueryRequest(messageBus, requestEnvelope, baseTopic, lc, dic)
			}
		}
	}()

	return nil
}
func processCommandQueryRequest(...) {

    ...
	// example topic scheme: /commandquery/request/<device>
	// deviceName is expected to be at last topic level.
    // 拆分topic,得到deviceName
	topicLevels := strings.Split(requestEnvelope.ReceivedTopic, "/")
	deviceName := topicLevels[len(topicLevels)-1]
	if strings.EqualFold(deviceName, common.All) {
		deviceName = common.All
	}
	
    // 根据设备名称,获取command列表
	responseEnvelope, err := getCommandQueryResponseEnvelope(requestEnvelope, deviceName, dic)
	...

	// internal response topic scheme: <ResponseTopicPrefix>/<service-name>/<request-id>
    // 构建响应topic <ResponseTopicPrefix>/<service-name>/<request-id>
	internalQueryResponseTopic := common.BuildTopic(baseTopic, common.ResponseTopic, common.CoreCommandServiceKey, requestEnvelope.RequestID)
	...
	
    // 发回响应
	err = messageBus.Publish(responseEnvelope, internalQueryResponseTopic)
	...
}
// getCommandQueryResponseEnvelope returns the MessageEnvelope containing the DeviceCoreCommand payload bytes
func getCommandQueryResponseEnvelope(requestEnvelope types.MessageEnvelope, deviceName string, dic *di.Container) (types.MessageEnvelope, error) {
	var commandsResponse any
	var err error

	switch deviceName {
	case common.All:
		offset, limit := common.DefaultOffset, common.DefaultLimit
        // 根据QueryParams设置limit和offset
		if requestEnvelope.QueryParams != nil {
			if offsetRaw, ok := requestEnvelope.QueryParams[common.Offset]; ok {
				offset, err = strconv.Atoi(offsetRaw)
				...
			}
			if limitRaw, ok := requestEnvelope.QueryParams[common.Limit]; ok {
				limit, err = strconv.Atoi(limitRaw)
				...
			}
		}
		// 根据limit和offset获取所有command
		commands, totalCounts, edgexError := application.AllCommands(offset, limit, dic)
		...
		
        // 构造返回结果响应
		commandsResponse = responses.NewMultiDeviceCoreCommandsResponse(requestEnvelope.RequestID, "", http.StatusOK, totalCounts, commands)
	default:
        // 根据devicename获取所有command
		commands, edgexError := application.CommandsByDeviceName(deviceName, dic)
		...
		// 构造返回结果响应
		commandsResponse = responses.NewDeviceCoreCommandResponse(requestEnvelope.RequestID, "", http.StatusOK, commands)
	}
	
	responseBytes, err := json.Marshal(commandsResponse)
	...
	
	responseEnvelope, err := types.NewMessageEnvelopeForResponse(responseBytes, requestEnvelope.RequestID, requestEnvelope.CorrelationID, common.ContentTypeJSON)
	...
	
	return responseEnvelope, nil
}
sequenceDiagram
	Application->>Core-Command: core/commandquery/request/#
	Core-Command->>Application: <ResponseTopicPrefix>/<service-name>/<request-id>

CoreData #

SubscribeEvents #

(interal/core/data/controller/messaging/subscriber.go)

当外界增加了event内容以后,会发布一个event相关的topic,CoreData负责订阅这个topic,并调用自身的AddEvent

// SubscribeEvents subscribes to events from message bus
func SubscribeEvents(ctx context.Context, dic *di.Container) errors.EdgeX {
	...

	go func() {
		for {
			select {
			...
			case msgEnvelope := <-messages:
				...
				event := &requests.AddEventRequest{}
				// decoding the large payload may cause memory issues so checking before decoding
				maxEventSize := dataContainer.ConfigurationFrom(dic.Get).MaxEventSize
				edgeXerr := utils.CheckPayloadSize(msgEnvelope.Payload, maxEventSize*1024)
				...
				err = unmarshalPayload(msgEnvelope, event)
				...
                // 验证Event
				err = validateEvent(msgEnvelope.ReceivedTopic, event.Event)
				...
                // 调用app.AddEvent
				err = app.AddEvent(requests.AddEventReqToEventModel(*event), ctx, dic)
				...
			}
		}
	}()

	return nil
}
sequenceDiagram
	Others->>Core-CoreData: events/device/#
	Core-CoreData-->>Core-CoreData: AddEvent

PublishEvent #

(interal/core/data/application/event.go)

当别的应用调用CoreData的httpcontroller使用这个AddEvent的时候,会开一个协程,将这个Add事件放到messageBus上

func (ec *EventController) AddEvent(w http.ResponseWriter, r *http.Request) {
	....
    // 开一个协程,发布event
		go ec.app.PublishEvent(dataBytes, serviceName, profileName, deviceName, sourceName, ctx, ec.dic)
	....
}
// PublishEvent publishes incoming AddEventRequest in the format of []byte through MessageClient
func (a *CoreDataApp) PublishEvent(data []byte, serviceName string, profileName string, deviceName string, sourceName string, ctx context.Context, dic *di.Container) {
	...
    // 构造topic
	publishTopic := common.BuildTopic(basePrefix, common.EventsPublishTopic, CoreDataEventTopicPrefix, serviceName, profileName, deviceName, url.QueryEscape(sourceName))
	...
    // 转换内容
	msgEnvelope := msgTypes.NewMessageEnvelope(data, ctx)
    // 进行发布
	err := msgClient.Publish(msgEnvelope, publishTopic)
	...
}
sequenceDiagram
	Http-->>Core-CoreData: 访问AddEvent的url
	Core-CoreData-->>Core-CoreData: AddEvent
	Core-CoreData->>messageBus: events/core/<service>/<profile>/<device>/<source>

CoreMetaData #

device #

AddDevice #
func AddDevice(d models.Device, ctx context.Context, dic *di.Container) (id string, edgeXerr errors.EdgeX) {
	...
	// 添加device
	addedDevice, err := dbClient.AddDevice(d)
	...

	deviceDTO := dtos.FromDeviceModelToDTO(addedDevice)
    // 开一个协程,发布这个add的系统事件
	go publishSystemEvent(common.DeviceSystemEventType, common.SystemEventActionAdd, d.ServiceName, deviceDTO, ctx, dic)

	return addedDevice.Id, nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问AddDevice的url
	Core-MetaData-->>Core-MetaData: AddDevice
	Core-MetaData->>messageBus: system-events/core-metadata/device/add/<deviceServiceName>
	Core-MetaData-->>Http: 返回响应
DeleteDeviceByName #
// DeleteDeviceByName deletes the device by name
func DeleteDeviceByName(name string, ctx context.Context, dic *di.Container) errors.EdgeX {
	...
    // 删除这个设备
	err = dbClient.DeleteDeviceByName(name)
	...
	deviceDTO := dtos.FromDeviceModelToDTO(device)
    // 开一个协程发送发布这个消息
	go publishSystemEvent(common.DeviceSystemEventType, common.SystemEventActionDelete, device.ServiceName, deviceDTO, ctx, dic)

	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问DeleteDeviceByName的url
	Core-MetaData-->>Core-MetaData: DeleteDeviceByName
	Core-MetaData->>messageBus: system-events/core-metadata/device/delete/<deviceServiceName>
	Core-MetaData-->>Http: 返回响应
PatchDevice #
// PatchDevice executes the PATCH operation with the device DTO to replace the old data
func PatchDevice(dto dtos.UpdateDevice, ctx context.Context, dic *di.Container) errors.EdgeX {
	// Old service name is used for invoking callback
	var oldServiceName string
	if dto.ServiceName != nil && *dto.ServiceName != device.ServiceName {
		oldServiceName = device.ServiceName
	}
    ...
	// 更新device
	err = dbClient.UpdateDevice(device)
    ...
	
    // 如果修改了deviceService
	if oldServiceName != "" {
		go publishSystemEvent(common.DeviceSystemEventType, common.SystemEventActionUpdate, oldServiceName, deviceDTO, ctx, dic)
	}

	go publishSystemEvent(common.DeviceSystemEventType, common.SystemEventActionUpdate, device.ServiceName, deviceDTO, ctx, dic)

	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问PatchDevice的url
	Core-MetaData-->>Core-MetaData: PatchDevice
	opt 如果修改了deviceService
		Core-MetaData->>messageBus: system-events/core-metadata/device/update/<oldDeviceServiceName>
	end
	Core-MetaData->>messageBus: system-events/core-metadata/device/update/<deviceServiceName>
	Core-MetaData-->>Http: 返回响应

devicecommand #

AddDeviceProfileDeviceCommand #
func AddDeviceProfileDeviceCommand(profileName string, deviceCommand models.DeviceCommand, ctx context.Context, dic *di.Container) errors.EdgeX {
    ...
    // 获取deviceProfile
	profile, err := dbClient.DeviceProfileByName(profileName)
	...
    // 更新profile的deviceCommand
	profile.DeviceCommands = append(profile.DeviceCommands, deviceCommand)
	...
    // 更新deviceProfile
	err = dbClient.UpdateDeviceProfile(profile)
    ...
    // 异步发布更新DeviceProfile系统事件
	go publishUpdateDeviceProfileSystemEvent(profileDTO, ctx, dic)

	return nil
}
func publishUpdateDeviceProfileSystemEvent(profileDTO dtos.DeviceProfile, ctx context.Context, dic *di.Container) {
	...
	//Publish general system event regardless of associated devices
	publishSystemEvent(common.DeviceProfileSystemEventType, common.SystemEventActionUpdate, common.CoreMetaDataServiceKey, profileDTO, ctx, dic)
	// Publish system event for each device service
	dsMap := make(map[string]bool)
	for _, d := range devices {
		if _, ok := dsMap[d.ServiceName]; ok {
			// skip the invoked device service
			continue
		}
		dsMap[d.ServiceName] = true

		publishSystemEvent(common.DeviceProfileSystemEventType, common.SystemEventActionUpdate, d.ServiceName, profileDTO, ctx, dic)
	}
}
sequenceDiagram
	Http-->>Core-MetaData: 访问AddDeviceProfileDeviceCommand的url
	Core-MetaData-->>Core-MetaData: AddDeviceProfileDeviceCommand
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/core-metadata
	loop  DevicesByProfileName,更新与profile相关的设备
		Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/<deviceServiceName>
	end
	Core-MetaData-->>Http: 返回响应
PatchDeviceProfileDeviceCommand #
func PatchDeviceProfileDeviceCommand(profileName string, dto dtos.UpdateDeviceCommand, ctx context.Context, dic *di.Container) errors.EdgeX {
	...
    // 根据名字获取profile
	profile, err := dbClient.DeviceProfileByName(profileName)
	...

	// Find matched deviceCommand
    // 找到匹配的deviceCommand
	index := -1
	for i := range profile.DeviceCommands {
		if profile.DeviceCommands[i].Name == *dto.Name {
			index = i
			break
		}
	}
	...
    // 更新profile.DeviceCommands
	requests.ReplaceDeviceCommandModelFieldsWithDTO(&profile.DeviceCommands[index], dto)
	// 更新deviceProfile
	err = dbClient.UpdateDeviceProfile(profile)
	// 开一个协程发布Profile更新,逻辑与上面的相同
	go publishUpdateDeviceProfileSystemEvent(profileDTO, ctx, dic)

	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问PatchDeviceProfileDeviceCommand的url
	Core-MetaData-->>Core-MetaData: PatchDeviceProfileDeviceCommand
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/core-metadata
	loop  DevicesByProfileName,更新与profile相关的设备
		Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/<deviceServiceName>
	end
	Core-MetaData-->>Http: 返回响应
DeleteDeviceCommandByName #
func DeleteDeviceCommandByName(profileName string, commandName string, ctx context.Context, dic *di.Container) errors.EdgeX {
	...
    // 确保这个profile没有绑定的device
	devices, err := dbClient.DevicesByProfileName(0, 1, profileName)
    if err != nil {
		return errors.NewCommonEdgeXWrapper(err)
	} else if len(devices) > 0 {
		return errors.NewCommonEdgeX(errors.KindStatusConflict, "fail to update the device profile when associated device exists", nil)
	}
	...
    // 根据profileName来获取DeviceProfile
	profile, err := dbClient.DeviceProfileByName(profileName)
	...
	// 找到命令后,删除命令
	profile.DeviceCommands = append(profile.DeviceCommands[:index], profile.DeviceCommands[index+1:]...)
    ...
	// 更新profile
	err = dbClient.UpdateDeviceProfile(profile)
	...
    // 发布一个异步的profile更新事件
	go publishUpdateDeviceProfileSystemEvent(profileDTO, ctx, dic)
	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问DeleteDeviceCommandByName的url
	Core-MetaData-->>Core-MetaData: DeleteDeviceCommandByName
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/core-metadata
	loop  DevicesByProfileName,更新与profile相关的设备
		Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/<deviceServiceName>
	end
	Core-MetaData-->>Http: 返回响应

deviceprofile #

AddDeviceProfile #
// The AddDeviceProfile function accepts the new device profile model from the controller functions
// and invokes addDeviceProfile function in the infrastructure layer
func AddDeviceProfile(d models.DeviceProfile, ctx context.Context, dic *di.Container) (id string, err errors.EdgeX) {
	...
    // 添加一个deviceprofile
	addedDeviceProfile, err := dbClient.AddDeviceProfile(d)
	...
    // 发布一个异步的添加deviceprofile的系统事件
	go publishSystemEvent(common.DeviceProfileSystemEventType, common.SystemEventActionAdd, common.CoreMetaDataServiceKey, profileDTO, ctx, dic)

	return addedDeviceProfile.Id, nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问AddDeviceProfile的url
	Core-MetaData-->>Core-MetaData: AddDeviceProfile
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/add/core-metadata
	Core-MetaData-->>Http: 返回响应
UpdateDeviceProfile #
// The UpdateDeviceProfile function accepts the device profile model from the controller functions
// and invokes updateDeviceProfile function in the infrastructure layer
func UpdateDeviceProfile(d models.DeviceProfile, ctx context.Context, dic *di.Container) (err errors.EdgeX) {
	...
    // 更新profile
	err = dbClient.UpdateDeviceProfile(d)
	...
    // 发布一个异步的deviceprofile更新
	go publishUpdateDeviceProfileSystemEvent(profileDTO, ctx, dic)

	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问UpdateDeviceProfile的url
	Core-MetaData-->>Core-MetaData: UpdateDeviceProfile
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/core-metadata
	loop  DevicesByProfileName,更新与profile相关的设备
		Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/<deviceServiceName>
	end
	Core-MetaData-->>Http: 返回响应
DeleteDeviceProfileByName #
// DeleteDeviceProfileByName delete the device profile by name
func DeleteDeviceProfileByName(name string, ctx context.Context, dic *di.Container) errors.EdgeX {
	....
    // 如果设置了deviceprofile禁止删除的化就不能删除
	if strictProfileDeletes {
		return errors.NewCommonEdgeX(errors.KindServiceLocked, "profile deletion is not allowed when StrictDeviceProfileDeletes config is enabled", nil)
	}
	....
	// 根据名字删除profile
	err = dbClient.DeleteDeviceProfileByName(name)
    ....
    // 发布一个异步的deviceprofile删除系统事件
	go publishSystemEvent(common.DeviceProfileSystemEventType, common.SystemEventActionDelete, common.CoreMetaDataServiceKey, profileDTO, ctx, dic)

	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问DeleteDeviceProfileByName的url
	Core-MetaData-->>Core-MetaData: DeleteDeviceProfileByName
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/delete/core-metadata
	Core-MetaData-->>Http: 返回响应
PatchDeviceProfileBasicInfo #
func PatchDeviceProfileBasicInfo(ctx context.Context, dto dtos.UpdateDeviceProfileBasicInfo, dic *di.Container) errors.EdgeX {
	....
    // 更新deviceprofile的BasicInfo
	requests.ReplaceDeviceProfileModelBasicInfoFieldsWithDTO(&deviceProfile, dto)
	err = dbClient.UpdateDeviceProfile(deviceProfile)
	// 发布一个异步的更新profile系统事件
	go publishUpdateDeviceProfileSystemEvent(profileDTO, ctx, dic)

	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问PatchDeviceProfileBasicInfo的url
	Core-MetaData-->>Core-MetaData: PatchDeviceProfileBasicInfo
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/core-metadata
	loop  DevicesByProfileName,更新与profile相关的设备
		Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/<deviceServiceName>
	end
	Core-MetaData-->>Http: 返回响应

deviceresource #

AddDeviceProfileResource #
func AddDeviceProfileResource(profileName string, resource models.DeviceResource, ctx context.Context, dic *di.Container) errors.EdgeX {
    ....
	// 修改profile.DeviceResource
	profile.DeviceResources = append(profile.DeviceResources, resource)
	....
	// 更新profile
	err = dbClient.UpdateDeviceProfile(profile)
    ....
	// 发布一个异步的deviceprofile更新事件
	go publishUpdateDeviceProfileSystemEvent(profileDTO, ctx, dic)

	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问AddDeviceProfileResource的url
	Core-MetaData-->>Core-MetaData: AddDeviceProfileResource
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/core-metadata
	loop  DevicesByProfileName,更新与profile相关的设备
		Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/<deviceServiceName>
	end
	Core-MetaData-->>Http: 返回响应
PatchDeviceProfileResource #
func PatchDeviceProfileResource(profileName string, dto dtos.UpdateDeviceResource, ctx context.Context, dic *di.Container) errors.EdgeX {
	....
	// 找到匹配的resource的index,并对它进行更新
	requests.ReplaceDeviceResourceModelFieldsWithDTO(&profile.DeviceResources[index], dto)
    // 更新profile
	....
    // 发布一个异步的deviceprofile更新事件
	go publishUpdateDeviceProfileSystemEvent(profileDTO, ctx, dic)

	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问PatchDeviceProfileResource的url
	Core-MetaData-->>Core-MetaData: PatchDeviceProfileResource
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/core-metadata
	loop  DevicesByProfileName,更新与profile相关的设备
		Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/<deviceServiceName>
	end
	Core-MetaData-->>Http: 返回响应
DeleteDeviceResourceByName #
func DeleteDeviceResourceByName(profileName string, resourceName string, ctx context.Context, dic *di.Container) errors.EdgeX {
	....
    // 如果不允许更改的话,就停止
	if strictProfileChanges {
		return errors.NewCommonEdgeX(errors.KindServiceLocked, "profile change is not allowed when StrictDeviceProfileChanges config is enabled", nil)
	}
	....
	// Check the associated Device existence
    // 检查相关的Device是否存在
	devices, err := dbClient.DevicesByProfileName(0, 1, profileName)
	if err != nil {
		return errors.NewCommonEdgeXWrapper(err)
	} else if len(devices) > 0 {
		return errors.NewCommonEdgeX(errors.KindStatusConflict, "fail to update the device profile when associated device exists", nil)
	}
	// 删除name对应的index的资源
	profile.DeviceResources = append(profile.DeviceResources[:index], profile.DeviceResources[index+1:]...)
	// 更新deviceprofile
	err = dbClient.UpdateDeviceProfile(profile)
	// 发布一个异步的deviceprofile更新事件
	go publishUpdateDeviceProfileSystemEvent(profileDTO, ctx, dic)
	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问DeleteDeviceResourceByName的url
	Core-MetaData-->>Core-MetaData: DeleteDeviceResourceByName
	Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/core-metadata
	loop  DevicesByProfileName,更新与profile相关的设备
		Core-MetaData->>messageBus: system-events/core-metadata/deviceprofile/update/<deviceServiceName>
	end
	Core-MetaData-->>Http: 返回响应

deviceservice #

AddDeviceService #
// The AddDeviceService function accepts the new device service model from the controller function
// and then invokes AddDeviceService function of infrastructure layer to add new device service
func AddDeviceService(d models.DeviceService, ctx context.Context, dic *di.Container) (id string, err errors.EdgeX) {
	....
    // 添加DeviceService
	addedDeviceService, err := dbClient.AddDeviceService(d)
	....
    // 发布一个添加deviceService系统事件
	go publishSystemEvent(common.DeviceServiceSystemEventType, common.SystemEventActionAdd, d.Name, DeviceServiceDTO, ctx, dic)
	return addedDeviceService.Id, nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问AddDeviceService的url
	Core-MetaData-->>Core-MetaData: AddDeviceService
	Core-MetaData->>messageBus: system-events/core-metadata/deviceservice/add/<deviceServicename>
	Core-MetaData-->>Http: 返回响应
PatchDeviceService #
// PatchDeviceService executes the PATCH operation with the device service DTO to replace the old data
func PatchDeviceService(dto dtos.UpdateDeviceService, ctx context.Context, dic *di.Container) errors.EdgeX {
	....
    // 更新deviceservice
	requests.ReplaceDeviceServiceModelFieldsWithDTO(&deviceService, dto)

	err = dbClient.UpdateDeviceService(deviceService)
	// 发布一个更新deviceService系统事件
	go publishSystemEvent(common.DeviceServiceSystemEventType, common.SystemEventActionUpdate, deviceService.Name, DeviceServiceDTO, ctx, dic)
	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问PatchDeviceService的url
	Core-MetaData-->>Core-MetaData: PatchDeviceService
	Core-MetaData->>messageBus: system-events/core-metadata/deviceservice/update/<deviceServicename>
	Core-MetaData-->>Http: 返回响应
DeleteDeviceServiceByName #
func DeleteDeviceServiceByName(name string, ctx context.Context, dic *di.Container) errors.EdgeX {
	....
    // 删除deviceService
	err = dbClient.DeleteDeviceServiceByName(name)
	....
    // 发布一个删除deviceService系统事件
	go publishSystemEvent(common.DeviceServiceSystemEventType, common.SystemEventActionDelete, deviceService.Name, DeviceServiceDTO, ctx, dic)
	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问DeleteDeviceServiceByName的url
	Core-MetaData-->>Core-MetaData: DeleteDeviceServiceByName
	Core-MetaData->>messageBus: system-events/core-metadata/deviceservice/delete/<deviceServicename>
	Core-MetaData-->>Http: 返回响应

provisionwatcher #

AddProvisionWatcher #
// AddProvisionWatcher function accepts the new provision watcher model from the controller function
// and then invokes AddProvisionWatcher function of infrastructure layer to add new device service
func AddProvisionWatcher(pw models.ProvisionWatcher, ctx context.Context, dic *di.Container) (id string, err errors.EdgeX) {
	....
    // 添加ProvisionWatcher
	addProvisionWatcher, err := dbClient.AddProvisionWatcher(pw)
	....
    // 发布一个添加provisionWatcher系统事件
	go publishSystemEvent(common.ProvisionWatcherSystemEventType, common.SystemEventActionAdd, pw.DiscoveredDevice.ServiceName, dtos.FromProvisionWatcherModelToDTO(pw), ctx, dic)
	return addProvisionWatcher.Id, nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问AddProvisionWatcher的url
	Core-MetaData-->>Core-MetaData: AddProvisionWatcher
	Core-MetaData->>messageBus: system-events/core-metadata/provisionwatcher/add/<pw.DiscoveredDevice.ServiceName>
	Core-MetaData-->>Http: 返回响应
DeleteProvisionWatcherByName #
// DeleteProvisionWatcherByName deletes the provision watcher by name
func DeleteProvisionWatcherByName(ctx context.Context, name string, dic *di.Container) errors.EdgeX {
	....
    // 删除ProvisionWatcher
	err = dbClient.DeleteProvisionWatcherByName(pw.Name)
	....
    // 发布一个删除provisionWatcher系统事件
	go publishSystemEvent(common.ProvisionWatcherSystemEventType, common.SystemEventActionDelete, pw.DiscoveredDevice.ServiceName, dtos.FromProvisionWatcherModelToDTO(pw), ctx, dic)
	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问AddProvisionWatcher的url
	Core-MetaData-->>Core-MetaData: AddProvisionWatcher
	Core-MetaData->>messageBus: system-events/core-metadata/provisionwatcher/delete/<pw.DiscoveredDevice.ServiceName>
	Core-MetaData-->>Http: 返回响应
PatchProvisionWatcher #
// PatchProvisionWatcher executes the PATCH operation with the provisionWatcher DTO to replace the old data
func PatchProvisionWatcher(ctx context.Context, dto dtos.UpdateProvisionWatcher, dic *di.Container) errors.EdgeX {
	....
    // 获取oldServiceName
	if dto.DiscoveredDevice.ServiceName != nil && *dto.DiscoveredDevice.ServiceName != pw.DiscoveredDevice.ServiceName {
		oldServiceName = pw.DiscoveredDevice.ServiceName
	}
	// 更新ProvisionWatcher
	err = dbClient.UpdateProvisionWatcher(pw)
	// 如果有旧的ServiceName
    // 向oldServiceName也发布一个事件
	if oldServiceName != "" {
		go publishSystemEvent(common.ProvisionWatcherSystemEventType, common.SystemEventActionUpdate, oldServiceName, dtos.FromProvisionWatcherModelToDTO(pw), ctx, dic)
	}
    // 发布一个更新事件
	go publishSystemEvent(common.ProvisionWatcherSystemEventType, common.SystemEventActionUpdate, pw.DiscoveredDevice.ServiceName, dtos.FromProvisionWatcherModelToDTO(pw), ctx, dic)
	return nil
}
sequenceDiagram
	Http-->>Core-MetaData: 访问PatchProvisionWatcher的url
	Core-MetaData-->>Core-MetaData: PatchProvisionWatcher
	opt 如果修改了deviceService
		Core-MetaData->>messageBus: system-events/core-metadata/provisionwatcher/update/<oldDeviceServiceName>
	end
	Core-MetaData->>messageBus: system-events/core-metadata/provisionwatcher/update/<deviceServiceName>
	Core-MetaData-->>Http: 返回响应