2021/2/3 #
Crd #
device #
DeviceSpec #
// DeviceSpec defines the desired state of Device
type DeviceSpec struct {
// Description is serialized as "description" and omitted when empty.
Description string `json:"description,omitempty"`
// Admin state (locked/unlocked)
AdminState AdminState `json:"adminState,omitempty"`
// Operating state (enabled/disabled)
OperatingState OperatingState `json:"operatingState,omitempty"`
// A map of supported protocols for the given device
Protocols map[string]ProtocolProperties `json:"protocols,omitempty"`
// Other labels applied to the device to help with searching
Labels []string `json:"labels,omitempty"`
// Device service specific location.
// NOTE(review): an earlier comment described this as interface{}, but the
// field is declared as a plain string here.
Location string `json:"location,omitempty"`
// Associated Device Service - One per device
Service string `json:"service"`
// Associated Device Profile - Describes the device
Profile string `json:"profile"`
// TODO support the following field
// A list of auto-generated events coming from the device
// AutoEvents []AutoEvent `json:"autoEvents"`
// DeviceProperties holds the desired state of the device properties.
// NOTE(review): the map key is presumably the property name - confirm.
DeviceProperties map[string]DesiredPropertyState `json:"deviceProperties,omitempty"`
}
DeviceStatus #
// DeviceStatus defines the observed state of Device
type DeviceStatus struct {
// Time (milliseconds) that the device last provided any feedback or
// responded to any request
LastConnected int64 `json:"lastConnected,omitempty"`
// Time (milliseconds) that the device reported data to the core
// microservice
LastReported int64 `json:"lastReported,omitempty"`
// AddedToEdgeX indicates whether the object has been successfully
// created on EdgeX Foundry
AddedToEdgeX bool `json:"addedToEdgeX,omitempty"`
// DeviceProperties holds the observed state of the device properties.
DeviceProperties map[string]ActualPropertyState `json:"deviceProperties,omitempty"`
// Id is serialized as "id" and omitted when empty.
Id string `json:"id,omitempty"`
}
DesiredPropertyState #
// DesiredPropertyState describes the value a device property should have.
type DesiredPropertyState struct {
// Name of the device property
Name string `json:"name"`
// PutURL is the URL of the PUT request used to modify the property
PutURL string `json:"putURL,omitempty"`
// DesiredValue is the value the property should have
DesiredValue string `json:"desiredValue"`
}
ActualPropertyState #
// ActualPropertyState describes the value a device property currently has.
type ActualPropertyState struct {
// Name of the device property
Name string `json:"name"`
// GetURL is the URL of the GET request used to query the property
GetURL string `json:"getURL,omitempty"`
// ActualValue is the value of the property
ActualValue string `json:"actualValue"`
}
deviceprofile #
DeviceResource #
// DeviceResource is one resource entry of a device profile.
type DeviceResource struct {
// Description of the resource
Description string `json:"description"`
// Name of the resource
Name string `json:"name"`
// Tag of the resource
Tag string `json:"tag,omitempty"`
// Property configuration of the resource
Properties ProfileProperty `json:"properties"`
// Attributes is a free-form string-to-string map.
Attributes map[string]string `json:"attributes,omitempty"`
}
ProfileProperty #
// ProfileProperty pairs a property value definition with its units.
type ProfileProperty struct {
// Value of the property
Value PropertyValue `json:"value"`
// Units that accompany the value.
Units Units `json:"units,omitempty"`
}
PropertyValue #
// PropertyValue describes how a property value is typed, bounded and
// transformed. Every field is carried as a string.
type PropertyValue struct {
// ValueDescriptor Type of property after transformations
Type string `json:"type,omitempty"`
// Read/Write Permissions set for this property
ReadWrite string `json:"readWrite,omitempty"`
// Minimum value that can be get/set from this property
Minimum string `json:"minimum,omitempty"`
// Maximum value that can be get/set from this property
Maximum string `json:"maximum,omitempty"`
// Default value set to this property if no argument is passed
DefaultValue string `json:"defaultValue,omitempty"`
// Size of this property in its type
// (i.e. bytes for numeric types, characters for string types)
Size string `json:"size,omitempty"`
// Mask to be applied prior to get/set of property
Mask string `json:"mask,omitempty"`
// Shift to be applied after masking, prior to get/set of property
Shift string `json:"shift,omitempty"`
// Multiplicative factor to be applied after shifting, prior to get/set of property
Scale string `json:"scale,omitempty"`
// Additive factor to be applied after multiplying, prior to get/set of property
Offset string `json:"offset,omitempty"`
// Base for property to be applied to, leave 0 for no power operation
// (i.e. base ^ property: 2 ^ 10)
Base string `json:"base,omitempty"`
// Required value of the property, set for checking error state. Failing an
// assertion condition will mark the device with an error state
Assertion string `json:"assertion,omitempty"`
// Precision is serialized as "precision" and omitted when empty.
Precision string `json:"precision,omitempty"`
// FloatEncoding indicates the representation of floating value of reading.
// It should be 'Base64' or 'eNotation'
FloatEncoding string `json:"floatEncoding,omitempty"`
// MediaType is serialized as "mediaType" and omitted when empty.
MediaType string `json:"mediaType,omitempty"`
}
Units #
// Units is the units part of a ProfileProperty. All three fields are optional
// strings that are omitted from JSON when empty.
type Units struct {
Type string `json:"type,omitempty"`
ReadWrite string `json:"readWrite,omitempty"`
DefaultValue string `json:"defaultValue,omitempty"`
}
Command #
// Command is a named command made of a Get and a Put action.
type Command struct {
// EdgeXId is a unique identifier used by EdgeX Foundry, such as a UUID
EdgeXId string `json:"id,omitempty"`
// Command name (unique on the profile)
Name string `json:"name,omitempty"`
// Get Command
Get Get `json:"get,omitempty"`
// Put Command
Put Put `json:"put,omitempty"`
}
Action #
// Action holds the fields shared by the Get and Put types, which embed it.
type Action struct {
// Path used by service for action on a device or sensor
Path string `json:"path,omitempty"`
// Responses from get or put requests to service
Responses []Response `json:"responses,omitempty"`
// Url for requests from command service
URL string `json:"url,omitempty"`
}
Put #
// Put is the Put half of a Command.
type Put struct {
// Action is embedded and supplies the path, responses and URL fields.
Action `json:",inline"`
// ParameterNames is the list of parameter names of the request
ParameterNames []string `json:"parameterNames,omitempty"`
}
Get #
// Get is the Get half of a Command.
type Get struct {
// Action is embedded and supplies the path, responses and URL fields.
Action `json:",omitempty"`
}
Response #
// Response for a Get or Put request to a service
type Response struct {
// Response code
Code string `json:"code,omitempty"`
// Description of the response
Description string `json:"description,omitempty"`
// List of values the response is expected to return
ExpectedValues []string `json:"expectedValues,omitempty"`
}
ResourceOperation #
// ResourceOperation is one entry of the Get or Set list of a ProfileResource.
type ResourceOperation struct {
Index string `json:"index,omitempty"`
Operation string `json:"operation,omitempty"`
// Deprecated: replaced by DeviceResource.
Object string `json:"object,omitempty"`
// The replacement of Object field
DeviceResource string `json:"deviceResource,omitempty"`
Parameter string `json:"parameter,omitempty"`
// Deprecated: replaced by DeviceCommand.
Resource string `json:"resource,omitempty"`
// The replacement of Resource field
DeviceCommand string `json:"deviceCommand,omitempty"`
Secondary []string `json:"secondary,omitempty"`
Mappings map[string]string `json:"mappings,omitempty"`
}
ProfileResource #
// ProfileResource groups the Get and Set resource operations under one name.
// DeviceProfileSpec stores these in its DeviceCommands list.
type ProfileResource struct {
Name string `json:"name,omitempty"`
Get []ResourceOperation `json:"get,omitempty"`
Set []ResourceOperation `json:"set,omitempty"`
}
DeviceProfileSpec #
// DeviceProfileSpec defines the desired state of a DeviceProfile.
type DeviceProfileSpec struct {
// Description of the device profile
Description string `json:"description,omitempty"`
// Manufacturer of the device
Manufacturer string `json:"manufacturer,omitempty"`
// Model of the device
Model string `json:"model,omitempty"`
// EdgeXLabels used to search for groups of profiles on EdgeX Foundry
EdgeXLabels []string `json:"labels,omitempty"`
// List of device resources
DeviceResources []DeviceResource `json:"deviceResources,omitempty"`
// TODO support the following field
// Profile resources, serialized as "deviceCommands"
DeviceCommands []ProfileResource `json:"deviceCommands,omitempty"`
// Commands, serialized as "coreCommands"
CoreCommands []Command `json:"coreCommands,omitempty"`
}
DeviceProfileStatus #
// DeviceProfileStatus defines the observed state of a DeviceProfile.
type DeviceProfileStatus struct {
// EdgeXId is the corresponding id on EdgeX
EdgeXId string `json:"id,omitempty"`
// AddedToEdgeX tells whether the profile has already been added to EdgeX
AddedToEdgeX bool `json:"addedToEdgeX,omitempty"`
}
deviceservice #
Addressable #
// Addressable describes how a service can be reached. DeviceServiceSpec
// carries one in its Addressable field.
type Addressable struct {
// ID is a unique identifier for the Addressable, such as a UUID
Id string `json:"id,omitempty"`
// Name is a unique name given to the Addressable
Name string `json:"name,omitempty"`
// Protocol for the address (HTTP/TCP)
Protocol string `json:"protocol,omitempty"`
// Method for connecting (i.e. POST)
HTTPMethod string `json:"method,omitempty"`
// Address of the addressable
Address string `json:"address,omitempty"`
// Port for the address
Port int `json:"port,omitempty"`
// Path for callbacks
Path string `json:"path,omitempty"`
// For message bus protocols
Publisher string `json:"publisher,omitempty"`
// User id for authentication
User string `json:"user,omitempty"`
// Password of the user for authentication for the addressable
Password string `json:"password,omitempty"`
// Topic for message bus addressables
Topic string `json:"topic,omitempty"`
}
DeviceServiceSpec #
// DeviceServiceSpec defines the desired state of a DeviceService.
type DeviceServiceSpec struct {
// Description of the spec
Description string `json:"description,omitempty"`
// the Id assigned by the EdgeX foundry
// TODO store this field in the status
Id string `json:"id,omitempty"`
// time in milliseconds that the device last connected to the core
// TODO store this field in the status
LastConnected int64 `json:"lastConnected,omitempty"`
// time in milliseconds that the device last reported data to the core
// TODO store this field in the status
LastReported int64 `json:"lastReported,omitempty"`
// operational state - either enabled or disabled
OperatingState OperatingState `json:"operatingState,omitempty"`
// tags or other labels applied to the device service for search or other
// identification needs on the EdgeX Foundry
Labels []string `json:"labels,omitempty"`
// address (MQTT topic, HTTP address, serial bus, etc.) for reaching
// the service
Addressable Addressable `json:"addressable,omitempty"`
// Device Service Admin State (locked/unlocked)
AdminState AdminState `json:"adminState,omitempty"`
}
DeviceServiceStatus #
// DeviceServiceStatus defines the observed state of a DeviceService.
type DeviceServiceStatus struct {
// AddedToEdgeX tells whether the service has been added to EdgeX
AddedToEdgeX bool `json:"addedToEdgeX,omitempty"`
}
valueDescriptor #
ValueDescriptorSpec #
// ValueDescriptorSpec defines the desired state of a ValueDescriptor. Every
// field is optional and omitted from JSON when empty.
type ValueDescriptorSpec struct {
// Id is assigned by the ValueDescriptor controller after the object has
// been added to EdgeX.
Id string `json:"id,omitempty"`
// NOTE(review): Created, Modified and Origin look like timestamps; their
// unit is not visible here - confirm.
Created int64 `json:"created,omitempty"`
Description string `json:"description,omitempty"`
Modified int64 `json:"modified,omitempty"`
Origin int64 `json:"origin,omitempty"`
Min string `json:"min,omitempty"`
Max string `json:"max,omitempty"`
DefaultValue string `json:"defaultValue,omitempty"`
Type string `json:"type,omitempty"`
UomLabel string `json:"uomLabel,omitempty"`
Formatting string `json:"formatting,omitempty"`
Labels []string `json:"labels,omitempty"`
MediaType string `json:"mediaType,omitempty"`
FloatEncoding string `json:"floatEncoding,omitempty"`
}
ValueDescriptorStatus #
// ValueDescriptorStatus defines the observed state of a ValueDescriptor.
type ValueDescriptorStatus struct {
// AddedToEdgeX indicates whether the object has been successfully
// created on EdgeX Foundry
AddedToEdgeX bool `json:"addedToEdgeX,omitempty"`
}
Controller #
device_controller #
// Reconcile synchronizes a Device object with EdgeX Foundry.
//
// When Status.AddedToEdgeX is set, every entry of Spec.DeviceProperties is
// read back with getActualPropertyState and recorded in
// Status.DeviceProperties. If the observed value differs from the desired
// one, the desired value is sent to the property's PutURL with an HTTP PUT;
// the status is then written back with a status update.
//
// When Status.AddedToEdgeX is not set, the device is looked up on EdgeX by
// name. If it is missing it is created there, the returned id and the
// AddedToEdgeX flag are stored in the status, and the request is requeued.
func (r *DeviceReconciler) Reconcile(
	ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("device", req.NamespacedName)
	var d devicev1alpha1.Device
	// Fetch the Device named by the request.
	if err := r.Get(ctx, req.NamespacedName, &d); err != nil {
		return ctrl.Result{}, err
	}
	log.Info("Reconciling the Device object", "Device", d.GetName(), "AddedToEdgeX", d.Status.AddedToEdgeX)

	if d.Status.AddedToEdgeX {
		// the device has been added to the EdgeX foundry,
		// check if each device property are in the desired state
		//
		// One HTTP client is shared by every PUT issued during this pass
		// instead of building a new one per property.
		httpClient := resty.New()
		for _, dps := range d.Spec.DeviceProperties {
			log.Info("getting the actual property state", "property", dps.Name)
			// Read the value the property currently has.
			aps, err := getActualPropertyState(dps.Name, &d, r.CoreCommandClient)
			if err != nil {
				return ctrl.Result{}, err
			}
			log.Info("got the actual property state",
				"property name", aps.Name,
				"property getURL", aps.GetURL,
				"property actual value", aps.ActualValue)
			if d.Status.DeviceProperties == nil {
				d.Status.DeviceProperties = map[string]devicev1alpha1.ActualPropertyState{}
			}
			// Record the observed value before attempting any change.
			d.Status.DeviceProperties[aps.Name] = aps

			if dps.DesiredValue == aps.ActualValue {
				continue
			}
			log.Info("the desired value and the actual value are different",
				"desired value", dps.DesiredValue,
				"actual value", aps.ActualValue)

			// The PUT target may be left empty in the spec; in that case it
			// is resolved through the core command client.
			if dps.PutURL == "" {
				putURL, err := getPutURL(d.GetName(), dps.Name, r.CoreCommandClient)
				if err != nil {
					return ctrl.Result{}, err
				}
				dps.PutURL = putURL
				log.Info("get the desired property putURL",
					"property", dps.Name, "putURL", putURL)
			}

			// set the device property to desired state
			log.Info("setting the property to desired value", "property", dps.Name)
			// NOTE(review): name and value are interpolated into the JSON
			// body without escaping; a quote in either breaks the payload.
			rep, err := httpClient.R().
				SetHeader("Content-Type", "application/json").
				SetBody([]byte(fmt.Sprintf(`{"%s": "%s"}`, dps.Name, dps.DesiredValue))).
				Put(dps.PutURL)
			if err != nil {
				return ctrl.Result{}, err
			}
			if rep.StatusCode() != http.StatusOK {
				// A rejected PUT leaves the recorded actual value untouched;
				// make the rejection visible instead of dropping it.
				log.Info("fail to set the property to desired value",
					"property", dps.Name, "status code", rep.StatusCode())
				continue
			}
			log.Info("successfully set the property to desired value", "property", dps.Name)
			log.Info("setting the actual property value to desired value", "property", dps.Name)
			// if the device property has been successfully set, we will
			// update the Device.Status.DeviceProperties[name] as well.
			// The map is known to be non-nil here (it was initialized above).
			oldAps, exist := d.Status.DeviceProperties[dps.Name]
			if !exist {
				d.Status.DeviceProperties[dps.Name] = devicev1alpha1.ActualPropertyState{
					Name:        dps.Name,
					ActualValue: dps.DesiredValue,
				}
				continue
			}
			oldAps.ActualValue = dps.DesiredValue
			d.Status.DeviceProperties[dps.Name] = oldAps
			log.Info("set the actual property value to desired value", "property", dps.Name)
		}
		return ctrl.Result{}, r.Status().Update(ctx, &d)
	}

	// The device has not been marked as added yet: check whether it already
	// exists on EdgeX.
	log.Info("Checking if device already exist on the EdgeX", "device", d.GetName())
	_, err := r.GetDeviceByName(d.GetName())
	if err == nil {
		// NOTE(review): AddedToEdgeX is left false in this branch, so the
		// property synchronization above never runs for a device that
		// already exists on EdgeX - confirm this is intended.
		log.Info("Device already exists on EdgeX")
		return ctrl.Result{}, nil
	}
	if !clis.IsNotFoundErr(err) {
		// Keep the cause in the log; the request is deliberately not retried.
		log.Error(err, "fail to visit the EdgeX core-metadata-service")
		return ctrl.Result{}, nil
	}

	// Not found on EdgeX: create it there.
	log.Info("Adding device to the EdgeX", "device", d.GetName())
	edgeXId, err := r.AddDevice(toEdgeXDevice(d))
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("Fail to add Device to EdgeX: %v", err)
	}
	log.Info("Successfully add Device to EdgeX",
		"Device", d.GetName(), "EdgeXId", edgeXId)
	// Remember the EdgeX id and mark the device as added.
	d.Status.Id = edgeXId
	d.Status.AddedToEdgeX = true
	return ctrl.Result{Requeue: true}, r.Status().Update(ctx, &d)
}
deviceprofile_controller #
// Reconcile makes sure the DeviceProfile named by req exists on EdgeX.
//
// The profile is fetched from Kubernetes and then looked up on EdgeX by name.
// If it already exists nothing is done. If the lookup fails for a reason
// other than "not found", the error is logged and the request is dropped.
// Otherwise the profile is added to EdgeX, and the returned id and the
// AddedToEdgeX flag are written back to the object.
func (r *DeviceProfileReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("deviceprofile", req.NamespacedName)
	var dp devicev1alpha1.DeviceProfile
	// Fetch the DeviceProfile from Kubernetes.
	if err := r.Get(ctx, req.NamespacedName, &dp); err != nil {
		return ctrl.Result{}, err
	}

	// Look the profile up on EdgeX.
	_, err := r.GetDeviceProfileByName(dp.GetName())
	if err == nil {
		log.Info(
			"DeviceProfile already exists on EdgeX")
		return ctrl.Result{}, nil
	}
	if !clis.IsNotFoundErr(err) {
		// Keep the cause in the log; the request is deliberately not retried.
		log.Error(err, "Fail to visit the EdgeX core-metadata-service")
		return ctrl.Result{}, nil
	}

	// Not found: add the DeviceProfile to EdgeX.
	edgeXId, err := r.AddDeviceProfile(toEdgeXDeviceProfile(dp))
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("Fail to add DeviceProfile to Edgex: %v", err)
	}
	log.V(4).Info("Successfully add DeviceProfile to EdgeX",
		"DeviceProfile", dp.GetName(), "EdgeXId", edgeXId)

	// Record the EdgeX id and the added flag.
	// NOTE(review): the status flag is written through r.Update rather than
	// r.Status().Update; if the CRD enables the status subresource the flag
	// would not be persisted - confirm.
	dp.Spec.Id = edgeXId
	dp.Status.AddedToEdgeX = true
	return ctrl.Result{}, r.Update(ctx, &dp)
}
deviceservice_controller #
// Reconcile makes sure the DeviceService named by req exists on EdgeX.
//
// The service is fetched from Kubernetes and looked up on EdgeX by name. If
// it is missing, its Addressable is created first and the DeviceService
// second; the ids returned by EdgeX and the AddedToEdgeX flag are then
// written back to the object.
func (r *DeviceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("deviceservice", req.NamespacedName)
	var ds devicev1alpha1.DeviceService
	// Fetch the DeviceService from Kubernetes.
	if err := r.Get(ctx, req.NamespacedName, &ds); err != nil {
		return ctrl.Result{}, err
	}

	// Look the service up on EdgeX.
	_, err := r.GetDeviceServiceByName(ds.GetName())
	if err == nil {
		log.Info(
			"DeviceService already exists on EdgeX")
		return ctrl.Result{}, nil
	}
	if !clis.IsNotFoundErr(err) {
		// The request is deliberately not retried.
		log.Error(err, "fail to visit the EdgeX core-metadata-service")
		return ctrl.Result{}, nil
	}

	// 1. create the addressable
	add := toEdgeXAddressable(ds.Spec.Addressable)
	_, err = r.GetAddressableByName(add.Name)
	if err == nil {
		// NOTE(review): when the addressable already exists the
		// DeviceService itself is never created - confirm this is intended.
		log.Info(
			"Addressable already exists on EdgeX")
		return ctrl.Result{}, nil
	}
	// NOTE(review): any lookup error, not only "not found", leads to the
	// creation attempt below.
	addrEdgeXId, err := r.AddAddressable(add)
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("Fail to add addressable to EdgeX: %v", err)
	}
	log.V(4).Info("Successfully add the Addressable to EdgeX",
		"Addressable", add.Name, "EdgeXId", addrEdgeXId)
	ds.Spec.Addressable.Id = addrEdgeXId

	// 2. create the DeviceService
	dsEdgeXId, err := r.AddDeviceService(toEdgexDeviceService(ds))
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("Fail to add DeviceService to EdgeX: %v", err)
	}
	log.V(4).Info("Successfully add DeviceService to EdgeX",
		"DeviceService", ds.GetName(), "EdgeXId", dsEdgeXId)
	ds.Spec.Id = dsEdgeXId
	ds.Status.AddedToEdgeX = true
	return ctrl.Result{}, r.Update(ctx, &ds)
}
valuedescriptor_controller #
// Reconcile makes sure the ValueDescriptor named by req exists on EdgeX.
//
// The descriptor is fetched from Kubernetes and looked up on EdgeX by name.
// If it is missing it is created there, and the returned id and the
// AddedToEdgeX flag are written back to the object.
func (r *ValueDescriptorReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("valuedescriptor", req.NamespacedName)
	var vd devicev1alpha1.ValueDescriptor
	// Fetch the ValueDescriptor from Kubernetes.
	if err := r.Get(ctx, req.NamespacedName, &vd); err != nil {
		return ctrl.Result{}, err
	}

	// 1. check if the Edgex code-data has the corresponding ValueDescriptor
	// NOTE this version does not support valuedescriptor update
	_, err := r.GetValueDescriptorByName(vd.GetName())
	if err == nil {
		log.Info("ValueDescriptor already exists on EdgeX")
		return ctrl.Result{}, nil
	}
	if !clis.IsNotFoundErr(err) {
		// Keep the cause in the log; the request is deliberately not retried.
		log.Error(err, "Fail to visit the Edgex core-data-service")
		return ctrl.Result{}, nil
	}

	// 2. create one if the ValueDescriptor doesnot exist
	edgexId, err := r.AddValueDescript(toEdgexValue(vd))
	if err != nil {
		return ctrl.Result{}, fmt.Errorf("Fail to add ValueDescriptor to Edgex: %v", err)
	}
	log.V(4).Info("Successfully add ValueDescriptor to Edgex",
		"ValueDescriptor", vd.GetName(), "EdgexId", edgexId)
	vd.Spec.Id = edgexId
	vd.Status.AddedToEdgeX = true
	return ctrl.Result{}, r.Update(ctx, &vd)
}
Syncer #
device_syncer #
// Run starts the periodic device synchronization and blocks until stop is
// closed or receives a value.
//
// Every syncPeriod the devices known to EdgeX and the Device objects known to
// Kubernetes are listed, findNewDevices picks the ones that need creating,
// and createDevices creates them. A failure in any step is logged and the
// round is abandoned; the next round starts after another syncPeriod.
func (ds *DeviceSyncer) Run(stop <-chan struct{}) {
	ds.log.Info("starting the DeviceSyncer...")
	go func() {
		for {
			// Wait one sync period, but leave as soon as stop fires so the
			// goroutine does not outlive Run.
			select {
			case <-stop:
				return
			case <-time.After(ds.syncPeriod):
			}

			// list devices on edgex foundry
			eDevs, err := ds.ListDevices()
			if err != nil {
				ds.log.Error(err, "fail to list the devices object on the EdgeX Foundry")
				continue
			}
			// list devices on Kubernetes
			var kDevs devv1.DeviceList
			if err := ds.List(context.TODO(), &kDevs); err != nil {
				ds.log.Error(err, "fail to list the devices object on the Kubernetes")
				continue
			}
			// NOTE(review): createDevices receives ds.Client, the client
			// used above to list the Kubernetes objects, so the new devices
			// are presumably created on the Kubernetes side - confirm.
			newKDevs := findNewDevices(eDevs, kDevs.Items)
			if len(newKDevs) == 0 {
				// Only report "not found" when there really was nothing new.
				ds.log.V(5).Info("new devices not found")
				continue
			}
			if err := createDevices(ds.log, ds.Client, newKDevs); err != nil {
				ds.log.Error(err, "fail to create devices")
			}
		}
	}()
	<-stop
	ds.log.Info("stopping the device syncer")
}
deviceprofile_syncer #
// Run starts the periodic device profile synchronization and blocks until
// stop is closed or receives a value.
//
// Every syncPeriod the profiles known to EdgeX and the DeviceProfile objects
// known to Kubernetes are listed, findNewDeviceProfile picks the ones that
// need creating, and createDeviceProfile creates them. A failure in any step
// is logged and the round is abandoned.
func (ds *DeviceProfileSyncer) Run(stop <-chan struct{}) {
	ds.log.Info("starting the DeviceProfileSyncer...")
	go func() {
		for {
			// Wait one sync period, but leave as soon as stop fires so the
			// goroutine does not outlive Run.
			select {
			case <-stop:
				return
			case <-time.After(ds.syncPeriod):
			}

			// list device profiles on edgex foundry
			eDevs, err := ds.ListDeviceProfile()
			if err != nil {
				ds.log.Error(err, "fail to list the deviceprofile object on the EdgeX Foundry")
				continue
			}
			// list device profiles on Kubernetes
			var kDevs devv1.DeviceProfileList
			if err := ds.List(context.TODO(), &kDevs); err != nil {
				ds.log.Error(err, "fail to list the deviceprofile object on the Kubernetes")
				continue
			}
			// NOTE(review): createDeviceProfile receives ds.Client, the
			// client used above to list the Kubernetes objects - confirm the
			// direction of the synchronization.
			newKDevs := findNewDeviceProfile(eDevs, kDevs.Items)
			if len(newKDevs) == 0 {
				// Only report "not found" when there really was nothing new.
				ds.log.V(5).Info("new deviceprofile not found")
				continue
			}
			if err := createDeviceProfile(ds.log, ds.Client, newKDevs); err != nil {
				ds.log.Error(err, "fail to create devices profile")
			}
		}
	}()
	<-stop
	ds.log.Info("stopping the deviceprofile syncer")
}
deviceservice_syncer #
// Run starts the periodic device service synchronization and blocks until
// stop is closed or receives a value.
//
// Every syncPeriod the services known to EdgeX and the DeviceService objects
// known to Kubernetes are listed, findNewDeviceService picks the ones that
// need creating, and createDeviceService creates them. A failure in any step
// is logged and the round is abandoned.
func (ds *DeviceServiceSyncer) Run(stop <-chan struct{}) {
	ds.log.Info("starting the DeviceServiceSyncer...")
	go func() {
		for {
			// Wait one sync period, but leave as soon as stop fires so the
			// goroutine does not outlive Run.
			select {
			case <-stop:
				return
			case <-time.After(ds.syncPeriod):
			}

			// list deviceservice on edgex foundry
			eDevs, err := ds.ListDeviceServices()
			if err != nil {
				ds.log.Error(err, "fail to list the deviceservice object on the EdgeX Foundry")
				continue
			}
			// list deviceservice on Kubernetes
			var kDevs devv1.DeviceServiceList
			if err := ds.List(context.TODO(), &kDevs); err != nil {
				ds.log.Error(err, "fail to list the deviceservice object on the Kubernetes")
				continue
			}
			// NOTE(review): createDeviceService receives ds.Client, the
			// client used above to list the Kubernetes objects - confirm the
			// direction of the synchronization.
			newKDevs := findNewDeviceService(eDevs, kDevs.Items)
			if len(newKDevs) == 0 {
				// Only report "not found" when there really was nothing new.
				ds.log.V(5).Info("new deviceservice not found")
				continue
			}
			if err := createDeviceService(ds.log, ds.Client, newKDevs); err != nil {
				ds.log.Error(err, "fail to create deviceservice")
			}
		}
	}()
	<-stop
	// The message used to name the device syncer, a copy-paste slip.
	ds.log.Info("stopping the deviceservice syncer")
}
2021/9/7 #
Crd修改 #
DeviceProfileSpec删除EdgeXLabels,添加NodePool、Labels
// DeviceProfileSpec defines the desired state of a DeviceProfile.
type DeviceProfileSpec struct {
// NodePool specifies which nodePool the deviceProfile belongs to
NodePool string `json:"nodePool,omitempty"`
// Description is serialized as "description" and omitted when empty.
Description string `json:"description,omitempty"`
// Manufacturer of the device
Manufacturer string `json:"manufacturer,omitempty"`
// Model of the device
Model string `json:"model,omitempty"`
// Labels used to search for groups of profiles on EdgeX Foundry
Labels []string `json:"labels,omitempty"`
// DeviceResources lists the resources of the profile.
DeviceResources []DeviceResource `json:"deviceResources,omitempty"`
// TODO support the following field
DeviceCommands []ProfileResource `json:"deviceCommands,omitempty"`
// CoreCommands lists the commands of the profile.
CoreCommands []Command `json:"coreCommands,omitempty"`
}
修改DeviceProfileStatus为EdgeId、Synced两个属性
// DeviceProfileStatus defines the observed state of a DeviceProfile.
type DeviceProfileStatus struct {
// EdgeId is serialized as "id" and omitted when empty.
EdgeId string `json:"id,omitempty"`
// Synced is a boolean flag serialized under the key "Synced".
// NOTE(review): the JSON key starts with a capital letter, unlike every
// other tag in these types - confirm this is intended.
Synced bool `json:"Synced,omitempty"`
}
deviceprofile_client #
// EdgexDeviceProfile is a device profile client. It embeds a resty HTTP
// client and a logger, and stores the host and port that its methods use to
// build request URLs.
type EdgexDeviceProfile struct {
*resty.Client
Host string
Port int
logr.Logger
}
const (
// DeviceProfilePath is the URL path appended to "http://host:port" by the
// device profile client.
DeviceProfilePath = "/api/v1/deviceprofile"
// EdgeXObjectName is a string key for the EdgeX object name.
// NOTE(review): presumably used as a label or annotation key - confirm.
EdgeXObjectName = "device-controller/edgex-object.name"
)
// NewEdgexDeviceProfile builds a device profile client for the given host
// and port. The client gets a fresh resty HTTP client and logs through log.
func NewEdgexDeviceProfile(host string, port int, log logr.Logger) *EdgexDeviceProfile {
	edp := new(EdgexDeviceProfile)
	// The embedded resty client issues the HTTP requests.
	edp.Client = resty.New()
	edp.Host = host
	edp.Port = port
	edp.Logger = log
	return edp
}
// getListDeviceProfileURL builds the URL used to list device profiles.
//
// Without selectors the result is "http://host:port" followed by
// DeviceProfilePath. A single label selector appends "/label/<value>". Field
// selectors may only use the keys "manufacturer" and "model"; each one
// appends "/<key>/<value>", always manufacturer first.
//
// An error is returned when more than one label is given, when labels are
// combined with field selectors, or when a field selector key is unknown.
// The URL returned alongside an error is not meant to be used.
func getListDeviceProfileURL(host string, port int, opts devcli.ListOptions) (string, error) {
	url := fmt.Sprintf("http://%s:%d%s", host, port, DeviceProfilePath)
	if len(opts.LabelSelector) > 1 {
		return url, fmt.Errorf("Multiple labels: list only support one label")
	}
	// The second operand used to test LabelSelector again, which rejected
	// every label query; the guard is about mixing labels with fields.
	if len(opts.LabelSelector) > 0 && len(opts.FieldSelector) > 0 {
		return url, fmt.Errorf("Multi list options: list action can't use 'label' with 'manufacturer' or 'model'")
	}
	for _, v := range opts.LabelSelector {
		url = fmt.Sprintf("%s/label/%s", url, v)
	}

	listParameters := []string{"manufacturer", "model"}
	// Reject unknown keys before touching the URL.
	for k := range opts.FieldSelector {
		if !strutil.IsInStringLst(listParameters, k) {
			return url, fmt.Errorf("Invaild list options: %s", k)
		}
	}
	// Append the segments in the fixed listParameters order. Ranging over
	// the map directly would order them randomly when both keys are set.
	for _, k := range listParameters {
		if v, ok := opts.FieldSelector[k]; ok {
			url = fmt.Sprintf("%s/%s/%s", url, k, v)
		}
	}
	return url, nil
}
// List fetches the device profiles from EdgeX and returns them converted to
// Kubernetes DeviceProfile objects.
//
// The request URL is derived from opts by getListDeviceProfileURL. An error
// is returned when the URL cannot be built, the request fails, EdgeX answers
// with a status other than 200, or the body cannot be decoded.
func (cdc *EdgexDeviceProfile) List(ctx context.Context, opts devcli.ListOptions) ([]v1alpha1.DeviceProfile, error) {
	cdc.V(5).Info("will list DeviceProfiles")
	lp, err := getListDeviceProfileURL(cdc.Host, cdc.Port, opts)
	if err != nil {
		return nil, err
	}
	resp, err := cdc.R().EnableTrace().Get(lp)
	if err != nil {
		return nil, err
	}
	// Report a rejected request as such instead of as a JSON decoding error.
	if resp.StatusCode() != http.StatusOK {
		return nil, fmt.Errorf("list edgex deviceProfile err: %s", string(resp.Body()))
	}
	dps := []models.DeviceProfile{}
	if err := json.Unmarshal(resp.Body(), &dps); err != nil {
		return nil, err
	}
	// Convert each EdgeX profile to its Kubernetes form. Indexing hands the
	// converter the slice element itself rather than the loop variable.
	deviceProfiles := make([]v1alpha1.DeviceProfile, len(dps))
	for i := range dps {
		deviceProfiles[i] = toKubeDeviceProfile(&dps[i])
	}
	return deviceProfiles, nil
}
// Get fetches the device profile called name from EdgeX and returns it
// converted to a Kubernetes DeviceProfile.
//
// An "Item not found" error is returned when EdgeX answers 404 or with the
// literal not-found body. Any other non-200 answer, a failed request, or an
// undecodable body also yields an error.
func (cdc *EdgexDeviceProfile) Get(ctx context.Context, name string, opts devcli.GetOptions) (*v1alpha1.DeviceProfile, error) {
	cdc.V(5).Info("will get DeviceProfiles", "DeviceProfile", name)
	var dp models.DeviceProfile
	getURL := fmt.Sprintf("http://%s:%d%s/name/%s", cdc.Host, cdc.Port, DeviceProfilePath, name)
	resp, err := cdc.R().Get(getURL)
	if err != nil {
		return nil, err
	}
	// The body comparison is kept for compatibility; the status code covers
	// servers that word the message differently.
	if resp.StatusCode() == http.StatusNotFound || string(resp.Body()) == "Item not found\n" {
		return nil, errors.New("Item not found")
	}
	if resp.StatusCode() != http.StatusOK {
		return nil, fmt.Errorf("get edgex deviceProfile err: %s", string(resp.Body()))
	}
	if err = json.Unmarshal(resp.Body(), &dp); err != nil {
		return nil, err
	}
	// Convert the EdgeX object to the Kubernetes DeviceProfile form.
	kubedp := toKubeDeviceProfile(&dp)
	return &kubedp, nil
}
// Create posts deviceProfile to EdgeX.
//
// The profile is converted with ToEdgeXDeviceProfile, marshalled to JSON and
// sent with POST. On a 200 answer the response body is stored in
// Status.EdgeId, Status.Synced is set, and the same pointer that was passed
// in is returned. Any other outcome returns a nil profile and an error.
func (cdc *EdgexDeviceProfile) Create(ctx context.Context, deviceProfile *v1alpha1.DeviceProfile, opts devcli.CreateOptions) (*v1alpha1.DeviceProfile, error) {
	payload, err := json.Marshal(ToEdgeXDeviceProfile(deviceProfile))
	if err != nil {
		return nil, err
	}

	endpoint := fmt.Sprintf("http://%s:%d%s", cdc.Host, cdc.Port, DeviceProfilePath)
	resp, err := cdc.R().SetBody(payload).Post(endpoint)
	switch {
	case err != nil:
		return nil, err
	case resp.StatusCode() != http.StatusOK:
		// The response body is assumed to carry the error message.
		return nil, fmt.Errorf("create edgex deviceProfile err: %s", string(resp.Body()))
	}

	// Record the outcome in the status of the caller's object.
	deviceProfile.Status.EdgeId = string(resp.Body())
	deviceProfile.Status.Synced = true
	return deviceProfile, nil
}
// Update is not implemented yet.
// TODO: implement; for now it performs no request and returns a nil profile
// together with a nil error, so callers must not dereference the result.
func (cdc *EdgexDeviceProfile) Update(ctx context.Context, deviceProfile *v1alpha1.DeviceProfile, opts devcli.UpdateOptions) (*v1alpha1.DeviceProfile, error) {
return nil, nil
}
// Delete removes the device profile called name from EdgeX.
//
// It sends a DELETE to the profile's "/name/<name>" URL. A failed request or
// any answer other than 200 is returned as an error.
func (cdc *EdgexDeviceProfile) Delete(ctx context.Context, name string, opts devcli.DeleteOptions) error {
	cdc.V(5).Info("will delete the DeviceProfile", "DeviceProfile", name)

	endpoint := fmt.Sprintf("http://%s:%d%s/name/%s", cdc.Host, cdc.Port, DeviceProfilePath, name)
	resp, err := cdc.R().Delete(endpoint)
	switch {
	case err != nil:
		return err
	case resp.StatusCode() != http.StatusOK:
		// The response body is assumed to carry the error message.
		return fmt.Errorf("delete edgex deviceProfile err: %s", string(resp.Body()))
	}
	return nil
}
deviceprofile_controller #
// DeviceProfileReconciler reconciles a DeviceProfile object
type DeviceProfileReconciler struct {
// Client is the embedded Kubernetes client used to read and update objects.
client.Client
Log logr.Logger
Scheme *runtime.Scheme
// edgeClient talks to EdgeX (list, create and delete of device profiles).
edgeClient devcli.DeviceProfileInterface
// NodePool is compared with Spec.NodePool; profiles belonging to another
// node pool are ignored by Reconcile.
NodePool string
}
//+kubebuilder:rbac:groups=device.openyurt.io,resources=deviceprofiles,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=device.openyurt.io,resources=deviceprofiles/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=device.openyurt.io,resources=deviceprofiles/finalizers,verbs=update
// Reconcile make changes to a deviceprofile object in EdgeX based on it in Kubernetes
//
// Profiles whose Spec.NodePool differs from the reconciler's NodePool are
// ignored. For the others the matching EdgeX profile is looked up by name:
//   - if the Kubernetes object is being deleted, the EdgeX profile (if any)
//     is deleted and the finalizer is removed;
//   - otherwise the finalizer is added when missing, and the profile is
//     created on EdgeX when it does not exist there yet;
//   - if it exists but its spec differs, a message is logged, because
//     updating from Kubernetes to EdgeX is not supported.
func (r *DeviceProfileReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	// finalizer keeps the Kubernetes object alive until the EdgeX profile
	// has been removed.
	const finalizer = "devicecontroller.openyurt.io"

	log := r.Log.WithValues("deviceprofile", req.NamespacedName)
	var curdp devicev1alpha1.DeviceProfile
	// Fetch the current DeviceProfile; a missing object is not an error.
	if err := r.Get(ctx, req.NamespacedName, &curdp); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Profiles of another node pool are not this controller's business.
	if curdp.Spec.NodePool != r.NodePool {
		return ctrl.Result{}, nil
	}
	dpName := util.GetEdgeNameTrimNodePool(curdp.GetName(), r.NodePool)

	// Find the EdgeX profile carrying that name, if any. The reconcile
	// context is passed on so every call shares one cancellation scope.
	var prevdp devicev1alpha1.DeviceProfile
	var exist bool
	edps, err := r.edgeClient.List(ctx, devcli.ListOptions{})
	if err != nil {
		return ctrl.Result{}, err
	}
	for _, edp := range edps {
		if strings.ToLower(edp.Name) == dpName {
			prevdp = edp
			exist = true
			break
		}
	}

	if !curdp.ObjectMeta.DeletionTimestamp.IsZero() {
		// The Kubernetes object is being deleted: remove the EdgeX
		// counterpart first, then release the finalizer.
		if exist {
			if err := r.edgeClient.Delete(ctx, prevdp.Name, devcli.DeleteOptions{}); err != nil {
				return ctrl.Result{}, fmt.Errorf("Fail to delete DeviceProfile on Edgex: %v", err)
			}
			log.Info("Successfully delete DeviceProfile on EdgeX", "DeviceProfile", prevdp.Name)
		}
		controllerutil.RemoveFinalizer(&curdp, finalizer)
		return ctrl.Result{}, r.Update(ctx, &curdp)
	}

	if !controllerutil.ContainsFinalizer(&curdp, finalizer) {
		controllerutil.AddFinalizer(&curdp, finalizer)
		if err = r.Update(ctx, &curdp); err != nil {
			// A conflict means the object changed underneath us; retry.
			if apierrors.IsConflict(err) {
				return ctrl.Result{Requeue: true}, nil
			}
			return ctrl.Result{}, err
		}
	}

	if !exist {
		// Not on EdgeX yet: create it and persist the status Create set.
		created, err := r.edgeClient.Create(ctx, &curdp, devcli.CreateOptions{})
		if err != nil {
			return ctrl.Result{}, fmt.Errorf("Fail to add DeviceProfile to Edgex: %v", err)
		}
		log.Info("Successfully add DeviceProfile to EdgeX",
			"DeviceProfile", created.GetName(), "EdgeId", created.Status.EdgeId)
		return ctrl.Result{}, r.Status().Update(ctx, created)
	}

	// NodePool is cleared on the local copy before comparing the specs.
	// NOTE(review): presumably because profiles listed from EdgeX carry no
	// NodePool - confirm.
	curdp.Spec.NodePool = ""
	if !reflect.DeepEqual(curdp.Spec, prevdp.Spec) {
		// TODO
		log.Info("controller doesn't support update deviceprofile from Kubernetes to EdgeX")
	}
	return ctrl.Result{}, nil
}
deviceprofile_syncer #
// Run starts the periodic device profile synchronization and blocks until
// stop is closed or receives a value.
//
// Every syncPeriod the profiles on EdgeX are listed and stamped with the
// syncer's NodePool, and the DeviceProfile objects on Kubernetes are listed.
// Profiles found on EdgeX without a Kubernetes counterpart are created, and
// synced Kubernetes profiles that no longer exist on EdgeX are deleted.
// Updating existing profiles is not implemented yet.
func (ds *DeviceProfileSyncer) Run(stop <-chan struct{}) {
	ds.log.Info("starting the DeviceProfileSyncer...")
	go func() {
		for {
			// Wait one sync period, but leave as soon as stop fires so the
			// goroutine does not outlive Run.
			select {
			case <-stop:
				return
			case <-time.After(ds.syncPeriod):
			}

			// list devices on edgex foundry
			eDevs, err := ds.edgeClient.List(context.Background(), devcli.ListOptions{})
			if err != nil {
				ds.log.Error(err, "fail to list the deviceprofile object on the EdgeX Foundry")
				continue
			}
			// Stamp the EdgeX profiles with this syncer's node pool.
			addNodePoolField(eDevs, ds.NodePool)

			// list devices on Kubernetes
			var kDevs devicev1alpha1.DeviceProfileList
			if err := ds.List(context.TODO(), &kDevs); err != nil {
				ds.log.Error(err, "fail to list the deviceprofile object on the Kubernetes")
				continue
			}

			// newKDevs are EdgeX profiles without a Kubernetes counterpart,
			// which is why they are created through ds.Client, the client
			// that listed the Kubernetes objects above.
			newKDevs, updateKDevs := findNewUpdateDeviceProfile(eDevs, kDevs.Items)
			if len(newKDevs) != 0 {
				if err := createDeviceProfile(ds.log, ds.Client, newKDevs, ds.NodePool); err != nil {
					ds.log.Error(err, "fail to create device profiles")
					continue
				}
			}

			// update the device profiles according EdgeX
			if len(updateKDevs) != 0 {
				// TODO
			}

			// delete the device profiles on Kubernetes but not on Egdex
			deleteKDevs := findDeleteDeviceProfile(eDevs, kDevs.Items)
			if len(deleteKDevs) != 0 {
				if err := deleteDeviceProfile(ds.log, ds.Client, deleteKDevs); err != nil {
					ds.log.Error(err, "fail to delete device profiles")
				}
			}
		}
	}()
	<-stop
	ds.log.Info("stopping the deviceprofile syncer")
}
// addNodePoolField sets Spec.NodePool of every profile in edgeXDevs to
// NodePoolName. The slice elements are modified in place.
func addNodePoolField(edgeXDevs []devicev1alpha1.DeviceProfile, NodePoolName string) {
	for idx := 0; idx < len(edgeXDevs); idx++ {
		profile := &edgeXDevs[idx]
		profile.Spec.NodePool = NodePoolName
	}
}
// findNewUpdateDeviceProfile compares the EdgeX profiles with the Kubernetes
// ones and returns two slices:
//   - the EdgeX profiles that have no counterpart on Kubernetes (to be created),
//   - copies of the Kubernetes profiles whose Spec differs from the matching
//     EdgeX profile, with Spec replaced by the EdgeX one (to be updated).
//
// A Kubernetes profile matches an EdgeX profile when the lower-cased EdgeX
// name equals util.GetEdgeNameTrimNodePool(kubeName, kubeNodePool).
func findNewUpdateDeviceProfile(edgeXDevs, kubeDevs []devicev1alpha1.DeviceProfile) ([]devicev1alpha1.DeviceProfile, []devicev1alpha1.DeviceProfile) {
	var toCreate, toUpdate []devicev1alpha1.DeviceProfile
	for _, edgeProfile := range edgeXDevs {
		edgeName := strings.ToLower(edgeProfile.Name)
		matched := false
		for _, kubeProfile := range kubeDevs {
			if edgeName != util.GetEdgeNameTrimNodePool(kubeProfile.Name, kubeProfile.Spec.NodePool) {
				continue
			}
			matched = true
			// kubeProfile is a per-iteration copy, so kubeDevs itself is untouched.
			if !reflect.DeepEqual(edgeProfile.Spec, kubeProfile.Spec) {
				kubeProfile.Spec = edgeProfile.Spec
				toUpdate = append(toUpdate, kubeProfile)
			}
			break
		}
		if matched {
			continue
		}
		// Present on EdgeX only.
		toCreate = append(toCreate, edgeProfile)
	}
	return toCreate, toUpdate
}
// findDeleteDeviceProfile returns the Kubernetes profiles that have no
// counterpart on EdgeX and whose Status.Synced is true. Profiles that were
// never synced are left alone.
func findDeleteDeviceProfile(edgeXDevs, kubeDevs []devicev1alpha1.DeviceProfile) []devicev1alpha1.DeviceProfile {
	var stale []devicev1alpha1.DeviceProfile
	for _, kubeProfile := range kubeDevs {
		found := false
		for i := range edgeXDevs {
			if strings.ToLower(edgeXDevs[i].Name) == util.GetEdgeNameTrimNodePool(kubeProfile.Name, kubeProfile.Spec.NodePool) {
				found = true
				break
			}
		}
		if found || !kubeProfile.Status.Synced {
			continue
		}
		stale = append(stale, kubeProfile)
	}
	return stale
}
// getKubeNameWithPrefix builds the Kubernetes object name for an edge object.
// With a non-empty NodePoolName the result is "<NodePoolName>-<edgeName>";
// otherwise edgeName is returned unchanged.
func getKubeNameWithPrefix(edgeName, NodePoolName string) string {
	if NodePoolName != "" {
		return fmt.Sprintf("%s-%s", NodePoolName, edgeName)
	}
	return edgeName
}
// createDeviceProfile creates every profile of edgeXDevs on Kubernetes.
//
// Each profile is renamed with getKubeNameWithPrefix before creation. A
// profile that already exists is logged and skipped; any other create error,
// or a failure of the following status update, aborts the loop and is
// returned.
func createDeviceProfile(log logr.Logger, cli client.Client, edgeXDevs []devicev1alpha1.DeviceProfile, NodePoolName string) error {
	for i := range edgeXDevs {
		// Work on a copy so the caller's slice keeps the original names.
		profile := edgeXDevs[i]
		profile.SetName(getKubeNameWithPrefix(profile.GetName(), NodePoolName))

		err := cli.Create(context.TODO(), &profile)
		switch {
		case err == nil:
			// created, fall through to the status update below
		case apierrors.IsAlreadyExists(err):
			log.Info("DeviceProfile already exist on Kubernetes", "deviceprofile", strings.ToLower(profile.Name))
			continue
		default:
			return err
		}

		if err := cli.Status().Update(context.TODO(), &profile); err != nil {
			return err
		}
		log.Info("Successfully create DeviceProfile to Kubernetes", "DeviceProfile", profile.GetName())
	}
	return nil
}
// deleteDeviceProfile deletes every profile of kubeDevs from Kubernetes.
// A profile that is already gone is logged and skipped; any other error
// stops the loop and is returned.
func deleteDeviceProfile(log logr.Logger, cli client.Client, kubeDevs []devicev1alpha1.DeviceProfile) error {
	for i := range kubeDevs {
		profile := kubeDevs[i]
		err := cli.Delete(context.TODO(), &profile)
		switch {
		case err == nil:
			log.Info("Successfully delete DeviceProfile on Kubernetes", "DeviceProfile", profile.GetName())
		case apierrors.IsNotFound(err):
			log.Info("DeviceProfile doesn't exist on Kubernetes", "deviceprofile", profile.Name)
		default:
			return err
		}
	}
	return nil
}
2021/9/21 #
Crd修改 #
DeviceServiceSpec删除Id、LastConnected、LastReported属性,添加Managed、NodePool属性
// DeviceServiceSpec defines the desired state of a DeviceService.
type DeviceServiceSpec struct {
// Description is free-form information describing the device service.
Description string `json:"description,omitempty"`
// OperatingState is the operational state - either enabled or disabled.
OperatingState OperatingState `json:"operatingState,omitempty"`
// Labels are tags or other labels applied to the device service for search
// or other identification needs on the EdgeX Foundry.
Labels []string `json:"labels,omitempty"`
// Addressable is the address (MQTT topic, HTTP address, serial bus, etc.)
// for reaching the service.
Addressable Addressable `json:"addressable,omitempty"`
// AdminState is the desired admin state of the device service.
AdminState AdminState `json:"adminState,omitempty"`
// Managed tells whether the cloud manages this deviceService:
// true means the cloud can update the related fields,
// false means the cloud can't update the fields.
Managed bool `json:"managed,omitempty"`
// NodePool indicates which nodePool the deviceService comes from.
NodePool string `json:"nodePool,omitempty"`
}
DeviceServiceStatus添加Synced、EdgeId、LastConnected、LastReported、AdminState、Conditions字段,并且添加get和set Conditions方法
// DeviceServiceStatus defines the observed state of a DeviceService.
type DeviceServiceStatus struct {
// Synced indicates whether the device already exists on both OpenYurt and edge platform
Synced bool `json:"synced,omitempty"`
// EdgeId is the Id assigned by the edge platform.
EdgeId string `json:"edgeId,omitempty"`
// LastConnected is a time in milliseconds. NOTE(review): the original comment
// here was a copy of the LastReported one; presumably this is the last time
// the service was connected to - confirm against the EdgeX model.
LastConnected int64 `json:"lastConnected,omitempty"`
// LastReported is the time in milliseconds that the device last reported data to the core.
LastReported int64 `json:"lastReported,omitempty"`
// AdminState is the observed admin state of the device service.
AdminState AdminState `json:"adminState,omitempty"`
// Conditions holds the current deviceService state.
// +optional
Conditions clusterv1.Conditions `json:"conditions,omitempty"`
}
// SetConditions replaces the conditions stored in the DeviceService status
// with the given list.
func (ds *DeviceService) SetConditions(conditions clusterv1.Conditions) {
ds.Status.Conditions = conditions
}
// GetConditions returns the conditions stored in the DeviceService status.
func (ds *DeviceService) GetConditions() clusterv1.Conditions {
return ds.Status.Conditions
}
deviceservice_client #
// EdgexDeviceServiceClient talks to EdgeX over HTTP to manage deviceServices
// and addressables.
type EdgexDeviceServiceClient struct {
// Embedded resty client used to issue the HTTP requests.
*resty.Client
// CoreMetaClient provides the Host and Port used to build request URLs.
CoreMetaClient ClientURL
// Embedded logger used by the client methods.
logr.Logger
}
// NewEdgexDeviceServiceClient returns a client that uses a fresh resty client,
// the given core-metadata address and the given logger.
func NewEdgexDeviceServiceClient(coreMetaClient ClientURL, log logr.Logger) *EdgexDeviceServiceClient {
	cli := new(EdgexDeviceServiceClient)
	cli.Client = resty.New()
	cli.CoreMetaClient = coreMetaClient
	cli.Logger = log
	return cli
}
// Create sends a POST request to EdgeX to add a new deviceService.
//
// The Kubernetes object is converted with toEdgexDeviceService, marshalled to
// JSON and posted to DeviceServicePath. On an HTTP 200 response a deep copy of
// deviceservice is returned with Status.EdgeId set to the response body; any
// other status code is reported as an error.
//
// ctx and options are currently unused.
func (eds *EdgexDeviceServiceClient) Create(ctx context.Context, deviceservice *v1alpha1.DeviceService, options edgeCli.CreateOptions) (*v1alpha1.DeviceService, error) {
	edgeDS := toEdgexDeviceService(deviceservice)
	eds.V(5).Info("will add the DeviceServices",
		"DeviceService", edgeDS.Name)

	payload, err := json.Marshal(&edgeDS)
	if err != nil {
		return nil, err
	}

	endpoint := fmt.Sprintf("http://%s:%d%s",
		eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath)
	resp, err := eds.R().SetBody(payload).Post(endpoint)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode() != http.StatusOK {
		return nil, fmt.Errorf("create deviceService on edgex foundry failed, the response is : %s", resp.Body())
	}

	// The response body carries the id assigned by EdgeX.
	created := deviceservice.DeepCopy()
	created.Status.EdgeId = string(resp.Body())
	return created, nil
}
// Delete sends a DELETE request to EdgeX to remove the deviceService with the
// given name. The call succeeds only when the response body is exactly
// "true"; any other body is returned as the error text.
//
// ctx and option are currently unused.
func (eds *EdgexDeviceServiceClient) Delete(ctx context.Context, name string, option edgeCli.DeleteOptions) error {
	eds.V(5).Info("will delete the DeviceService",
		"DeviceService", name)

	endpoint := fmt.Sprintf("http://%s:%d%s/name/%s",
		eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath, name)
	resp, err := eds.R().Delete(endpoint)
	if err != nil {
		return err
	}
	if body := string(resp.Body()); body != "true" {
		return errors.New(body)
	}
	return nil
}
// Update sets the admin and/or operating state of a deviceService on EdgeX,
// addressing it by its edge-side name (see getEdgeDeviceServiceName).
//
// A non-empty Spec.AdminState is PUT to ".../adminstate/<state>" and a
// non-empty Spec.OperatingState to ".../opstate/<state>", in that order. The
// first request that fails, or that returns a status other than HTTP 200,
// aborts the update. On success ds itself is returned.
//
// A nil ds is a no-op and yields (nil, nil). ctx and options are currently
// unused.
// TODO support to update other fields
func (eds *EdgexDeviceServiceClient) Update(ctx context.Context, ds *v1alpha1.DeviceService, options edgeCli.UpdateOptions) (*v1alpha1.DeviceService, error) {
	// Guard first: the name lookup below needs a non-nil object.
	if ds == nil {
		return nil, nil
	}

	actualDSName := getEdgeDeviceServiceName(ds)
	putBaseURL := fmt.Sprintf("http://%s:%d%s/name/%s",
		eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath, actualDSName)

	// putState issues one state-changing PUT through the shared client and
	// maps a non-200 answer to an error.
	putState := func(url string) error {
		rep, err := eds.R().SetHeader("Content-Type", "application/json").Put(url)
		if err != nil {
			return err
		}
		if rep.StatusCode() != http.StatusOK {
			return fmt.Errorf("failed to update deviceService: %s, get response: %s", actualDSName, string(rep.Body()))
		}
		return nil
	}

	if ds.Spec.AdminState != "" {
		if err := putState(fmt.Sprintf("%s/adminstate/%s", putBaseURL, ds.Spec.AdminState)); err != nil {
			return nil, err
		}
	}
	if ds.Spec.OperatingState != "" {
		if err := putState(fmt.Sprintf("%s/opstate/%s", putBaseURL, ds.Spec.OperatingState)); err != nil {
			return nil, err
		}
	}
	return ds, nil
}
// Get queries EdgeX for the deviceService with the given name and returns it
// converted to the Kubernetes representation.
//
// When the response body is "Item not found\n" or starts with "no item found"
// an error with the text "Item not found" is returned. On every error path the
// returned pointer is non-nil but refers to an empty DeviceService; in
// particular a response that cannot be decoded is no longer converted.
//
// ctx and options are currently unused.
func (eds *EdgexDeviceServiceClient) Get(ctx context.Context, name string, options edgeCli.GetOptions) (*v1alpha1.DeviceService, error) {
	eds.V(5).Info("will get DeviceServices",
		"DeviceService", name)
	var ds v1alpha1.DeviceService

	getURL := fmt.Sprintf("http://%s:%d%s/name/%s",
		eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath, name)
	resp, err := eds.R().Get(getURL)
	if err != nil {
		return &ds, err
	}

	body := resp.Body()
	if string(body) == "Item not found\n" ||
		strings.HasPrefix(string(body), "no item found") {
		return &ds, errors.New("Item not found")
	}

	// Decode into the EdgeX model first; only a fully decoded model is converted.
	var dp models.DeviceService
	if err := json.Unmarshal(body, &dp); err != nil {
		return &ds, err
	}
	ds = toKubeDeviceService(dp)
	return &ds, nil
}
// List returns all deviceService objects found on the edge platform,
// converted to the Kubernetes representation.
//
// When options.LabelSelector contains the key "label", "/label/<value>" is
// appended to the request URL; no other selector key is looked at.
// The Hanoi version currently supports only a single label and does not support other filters
//
// ctx is currently unused. The result is nil when EdgeX returns no items.
func (eds *EdgexDeviceServiceClient) List(ctx context.Context, options edgeCli.ListOptions) ([]v1alpha1.DeviceService, error) {
	eds.V(5).Info("will list DeviceServices")

	lp := fmt.Sprintf("http://%s:%d%s",
		eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, DeviceServicePath)
	// Indexing a nil selector is safe, so one lookup covers both checks.
	if label, ok := options.LabelSelector["label"]; ok {
		lp = strings.Join([]string{lp, "label", label}, "/")
	}

	resp, err := eds.R().EnableTrace().Get(lp)
	if err != nil {
		return nil, err
	}

	// Decode into the EdgeX models, then convert each one.
	edgeServices := []models.DeviceService{}
	if err := json.Unmarshal(resp.Body(), &edgeServices); err != nil {
		return nil, err
	}
	var res []v1alpha1.DeviceService
	for i := range edgeServices {
		res = append(res, toKubeDeviceService(edgeServices[i]))
	}
	return res, nil
}
// CreateAddressable sends a POST request to EdgeX to add a new addressable.
//
// The addressable is converted with toEdgeXAddressable, marshalled to JSON and
// posted to AddressablePath. On an HTTP 200 response a deep copy of
// addressable is returned with Id set to the response body. Any other status
// code is reported as an error, matching Create, so that an error message is
// never stored as the addressable Id.
//
// ctx and options are currently unused.
func (eds *EdgexDeviceServiceClient) CreateAddressable(ctx context.Context, addressable *v1alpha1.Addressable, options edgeCli.CreateOptions) (*v1alpha1.Addressable, error) {
	as := toEdgeXAddressable(addressable)
	eds.V(5).Info("will add the Addressables",
		"Addressable", as.Name)

	dpJson, err := json.Marshal(&as)
	if err != nil {
		return nil, err
	}

	postPath := fmt.Sprintf("http://%s:%d%s",
		eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath)
	resp, err := eds.R().
		SetBody(dpJson).Post(postPath)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode() != http.StatusOK {
		return nil, fmt.Errorf("create addressable on edgex foundry failed, the response is : %s", resp.Body())
	}

	// The response body carries the id assigned by EdgeX.
	createdAddr := addressable.DeepCopy()
	createdAddr.Id = string(resp.Body())
	return createdAddr, nil
}
// DeleteAddressable sends a DELETE request to EdgeX to remove the addressable
// with the given name. The call succeeds only when the response body is
// exactly "true"; any other body is returned as the error text.
//
// ctx and options are currently unused.
func (eds *EdgexDeviceServiceClient) DeleteAddressable(ctx context.Context, name string, options edgeCli.DeleteOptions) error {
	eds.V(5).Info("will delete the Addressable",
		"Addressable", name)

	endpoint := fmt.Sprintf("http://%s:%d%s/name/%s",
		eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath, name)
	resp, err := eds.R().Delete(endpoint)
	if err != nil {
		return err
	}
	if body := string(resp.Body()); body != "true" {
		return errors.New(body)
	}
	return nil
}
// UpdateAddressable is meant to update the addressable on edgex foundry.
//
// It is not implemented yet: the arguments are ignored, no request is sent
// and (nil, nil) is always returned.
func (eds *EdgexDeviceServiceClient) UpdateAddressable(ctx context.Context, device *v1alpha1.Addressable, options edgeCli.UpdateOptions) (*v1alpha1.Addressable, error) {
// Placeholder: not finished in this version.
return nil, nil
}
// GetAddressable queries EdgeX for the addressable with the given name and
// returns it converted to the Kubernetes representation.
//
// When the response body is "Item not found\n" an error with the text
// "Item not found" is returned. On every error path the returned pointer is
// non-nil but refers to an empty Addressable; in particular a response that
// cannot be decoded is no longer converted.
//
// ctx and options are currently unused.
func (eds *EdgexDeviceServiceClient) GetAddressable(ctx context.Context, name string, options edgeCli.GetOptions) (*v1alpha1.Addressable, error) {
	eds.V(5).Info("will get Addressables",
		"Addressable", name)
	var addressable v1alpha1.Addressable

	getURL := fmt.Sprintf("http://%s:%d%s/name/%s",
		eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath, name)
	resp, err := eds.R().Get(getURL)
	if err != nil {
		return &addressable, err
	}

	body := resp.Body()
	if string(body) == "Item not found\n" {
		return &addressable, errors.New("Item not found")
	}

	// Decode into the EdgeX model first; only a fully decoded model is converted.
	var maddr models.Addressable
	if err := json.Unmarshal(body, &maddr); err != nil {
		return &addressable, err
	}
	addressable = toKubeAddressable(maddr)
	return &addressable, nil
}
// ListAddressables returns all addressable objects found on the edge
// platform, converted to the Kubernetes representation.
//
// ctx and options are currently unused. The result is nil when EdgeX returns
// no items.
func (eds *EdgexDeviceServiceClient) ListAddressables(ctx context.Context, options edgeCli.ListOptions) ([]v1alpha1.Addressable, error) {
	eds.V(5).Info("will list Addressables")

	endpoint := fmt.Sprintf("http://%s:%d%s",
		eds.CoreMetaClient.Host, eds.CoreMetaClient.Port, AddressablePath)
	resp, err := eds.R().EnableTrace().Get(endpoint)
	if err != nil {
		return nil, err
	}

	// Decode into the EdgeX models, then convert each one.
	edgeAddrs := []models.Addressable{}
	if err := json.Unmarshal(resp.Body(), &edgeAddrs); err != nil {
		return nil, err
	}
	var res []v1alpha1.Addressable
	for _, edgeAddr := range edgeAddrs {
		res = append(res, toKubeAddressable(edgeAddr))
	}
	return res, nil
}
deviceservice_controller #
// Reconcile brings one DeviceService object in line with the edge platform.
//
// Steps:
//  0. Load the object; a missing object is not an error. Objects whose
//     Spec.NodePool differs from r.NodePool are ignored.
//  1. Handle deletion / finalizer bookkeeping (reconcileDeleteDeviceService).
//  2. If the object is not yet synced, create it on the edge platform.
//  3. Otherwise, if it is managed by the cloud, push its fields to the edge.
//
// A conflict error from step 2 or 3 requeues the request without reporting an
// error. When the function returns, a deferred call refreshes the summary
// condition and writes the status back.
func (r *DeviceServiceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("deviceService", req.NamespacedName)
	var ds devicev1alpha1.DeviceService
	if err := r.Get(ctx, req.NamespacedName, &ds); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// If objects doesn't belong to the edge platform to which the controller is connected, the controller does not handle events for that object
	if ds.Spec.NodePool != r.NodePool {
		return ctrl.Result{}, nil
	}
	log.V(4).Info("Reconciling the DeviceService object", "DeviceService", ds.GetName())

	// Update deviceService conditions on the way out.
	defer func() {
		conditions.SetSummary(&ds,
			conditions.WithConditions(
				devicev1alpha1.DeviceServiceSyncedCondition, devicev1alpha1.DeviceServiceManagingCondition),
		)
		err := r.Status().Update(ctx, &ds)
		if client.IgnoreNotFound(err) != nil {
			// Key/value arguments must come in pairs; supply the object name.
			log.Error(err, "update deviceService conditions failed", "deviceService", ds.GetName())
		}
	}()

	// 1. Handle the deviceService deletion event
	if err := r.reconcileDeleteDeviceService(ctx, &ds); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	if !ds.ObjectMeta.DeletionTimestamp.IsZero() {
		// The object is being deleted; nothing left to synchronize.
		return ctrl.Result{}, nil
	}

	var reconcileErr error
	switch {
	case !ds.Status.Synced:
		// 2. Synchronize OpenYurt deviceService to edge platform
		reconcileErr = r.reconcileCreateDeviceService(ctx, &ds, log)
	case ds.Spec.Managed:
		// 3. If the deviceService has been synchronized and is managed by the cloud, reconcile the deviceService fields
		reconcileErr = r.reconcileUpdateDeviceService(ctx, &ds, log)
	}
	if reconcileErr != nil {
		if apierrors.IsConflict(reconcileErr) {
			return ctrl.Result{Requeue: true}, nil
		}
		return ctrl.Result{}, reconcileErr
	}
	return ctrl.Result{}, nil
}
// SetupWithManager prepares the reconciler and registers it with mgr.
//
// It creates the EdgeX deviceService client (host "edgex-core-metadata", port
// 48081), resolves the node pool via util.GetNodePool, registers a field index
// on "spec.nodePool" for DeviceService objects, and finally builds a
// controller that watches DeviceService.
func (r *DeviceServiceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	r.deviceServiceCli = edgexCli.NewEdgexDeviceServiceClient(
		edgexCli.ClientURL{Host: "edgex-core-metadata", Port: 48081}, r.Log)

	nodePool, err := util.GetNodePool(mgr.GetConfig())
	if err != nil {
		return err
	}
	r.NodePool = nodePool

	// register the filter field for deviceService: the index maps each object
	// to its Spec.NodePool so list calls can select by "spec.nodePool".
	nodePoolOf := func(rawObj client.Object) []string {
		deviceService := rawObj.(*devicev1alpha1.DeviceService)
		return []string{deviceService.Spec.NodePool}
	}
	err = mgr.GetFieldIndexer().IndexField(context.TODO(), &devicev1alpha1.DeviceService{}, "spec.nodePool", nodePoolOf)
	if err != nil {
		return err
	}

	builder := ctrl.NewControllerManagedBy(mgr).For(&devicev1alpha1.DeviceService{})
	return builder.Complete(r)
}
// reconcileDeleteDeviceService does the finalizer bookkeeping for ds.
//
// While the object is not being deleted (zero DeletionTimestamp) and carries
// no finalizer, devicev1alpha1.DeviceServiceFinalizer is added with a merge
// patch. Once the object is being deleted, the finalizer list is emptied with
// a merge patch and the deviceService is then deleted on the edge platform; a
// not-found answer from the edge platform is not treated as an error.
//
// NOTE(review): the finalizer is removed before the edge-side delete, so a
// failing edge delete may not be retried once Kubernetes has removed the
// object - confirm this ordering is intended.
func (r *DeviceServiceReconciler) reconcileDeleteDeviceService(ctx context.Context, ds *devicev1alpha1.DeviceService) error {
	// gets the actual name of deviceService on the edge platform from the Label of the device
	edgeDeviceServiceName := ds.ObjectMeta.Labels[EdgeXObjectName]

	// patchFinalizers overwrites metadata.finalizers with the given list.
	patchFinalizers := func(finalizers []string) error {
		patchData, err := json.Marshal(map[string]interface{}{
			"metadata": map[string]interface{}{
				"finalizers": finalizers,
			},
		})
		if err != nil {
			return err
		}
		return r.Patch(ctx, ds, client.RawPatch(types.MergePatchType, patchData))
	}

	if ds.ObjectMeta.DeletionTimestamp.IsZero() {
		// Not being deleted: make sure the finalizer is in place.
		if len(ds.GetFinalizers()) == 0 {
			return patchFinalizers([]string{devicev1alpha1.DeviceServiceFinalizer})
		}
		return nil
	}

	// Being deleted: release the object in OpenYurt ...
	if err := patchFinalizers([]string{}); err != nil {
		return err
	}
	// ... and delete the deviceService object on edge platform. The caller's
	// context is forwarded instead of a nil context.
	err := r.deviceServiceCli.Delete(ctx, edgeDeviceServiceName, edgeInterface.DeleteOptions{})
	if err != nil && !clis.IsNotFoundErr(err) {
		return err
	}
	return nil
}
// reconcileCreateDeviceService synchronizes a not-yet-synced deviceService to
// the edge platform.
//
//   - If the deviceService already exists on the edge platform, only the
//     OpenYurt status (Synced, EdgeId) is updated.
//   - Otherwise its addressable is looked up on the edge platform and created
//     there when missing, the updated Spec is written back to Kubernetes, the
//     deviceService is created on the edge platform and the status is updated.
//
// Failures talking to the edge platform during the addressable or create
// steps mark DeviceServiceSyncedCondition false and are returned.
//
// NOTE(review): a non-"not found" error from the initial existence check is
// only logged and nil is returned, so the request is not retried by the error
// path - confirm this is intended.
func (r *DeviceServiceReconciler) reconcileCreateDeviceService(ctx context.Context, ds *devicev1alpha1.DeviceService, log logr.Logger) error {
	// get the actual name of deviceService on the Edge platform from the Label of the device
	edgeDeviceName := ds.ObjectMeta.Labels[EdgeXObjectName]
	log.V(4).Info("Checking if deviceService already exist on the edge platform", "deviceService", ds.GetName())

	// Checking if deviceService already exist on the edge platform.
	// The caller's context is forwarded to the edge client instead of nil.
	edgeDs, err := r.deviceServiceCli.Get(ctx, edgeDeviceName, edgeInterface.GetOptions{})
	if err == nil {
		// a. If object exists, the status of the device on OpenYurt is updated
		log.V(4).Info("DeviceService already exists on edge platform")
		ds.Status.Synced = true
		ds.Status.EdgeId = edgeDs.Status.EdgeId
		return r.Status().Update(ctx, ds)
	}
	if !clis.IsNotFoundErr(err) {
		log.V(4).Error(err, "fail to visit the edge platform")
		return nil
	}

	// b. If object does not exist, a request is sent to the edge platform to create a new deviceService and related addressable
	addressable := ds.Spec.Addressable
	as, err := r.deviceServiceCli.GetAddressable(ctx, addressable.Name, edgeInterface.GetOptions{})
	switch {
	case err == nil:
		// Reuse the addressable that is already on the edge platform.
		log.V(4).Info("Addressable already exists on edge platform")
		ds.Spec.Addressable = *as
	case clis.IsNotFoundErr(err):
		// Missing on the edge platform: create it and record the assigned id.
		createdAddr, err := r.deviceServiceCli.CreateAddressable(ctx, &addressable, edgeInterface.CreateOptions{})
		if err != nil {
			conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceSyncedCondition, "failed to add addressable to EdgeX", clusterv1.ConditionSeverityWarning, err.Error())
			return fmt.Errorf("failed to add addressable to edge platform: %v", err)
		}
		log.V(4).Info("Successfully add the Addressable to edge platform",
			"Addressable", addressable.Name, "EdgeId", createdAddr.Id)
		ds.Spec.Addressable.Id = createdAddr.Id
	default:
		log.V(4).Error(err, "fail to visit the edge platform core-metatdata-service")
		conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceSyncedCondition, "failed to visit the EdgeX core-metadata-service", clusterv1.ConditionSeverityWarning, err.Error())
		return err
	}

	// Persist the addressable information in the Kubernetes object.
	if err = r.Update(ctx, ds); err != nil {
		return err
	}

	// Create the deviceService on the edge platform.
	createdDs, err := r.deviceServiceCli.Create(ctx, ds, edgeInterface.CreateOptions{})
	if err != nil {
		log.V(4).Error(err, "failed to create deviceService on edge platform")
		conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceSyncedCondition, "failed to add DeviceService to EdgeX", clusterv1.ConditionSeverityWarning, err.Error())
		return fmt.Errorf("fail to add DeviceService to edge platform: %v", err)
	}
	log.V(4).Info("Successfully add DeviceService to Edge Platform",
		"DeviceService", ds.GetName(), "EdgeId", createdDs.Status.EdgeId)

	ds.Status.EdgeId = createdDs.Status.EdgeId
	ds.Status.Synced = true
	conditions.MarkTrue(ds, devicev1alpha1.DeviceServiceSyncedCondition)
	return r.Status().Update(ctx, ds)
}
// reconcileUpdateDeviceService pushes the desired AdminState of ds to the edge
// platform and then records it in the OpenYurt status.
//
// The request sent to the edge platform is built from a deep copy of ds in
// which OperatingState is always cleared, and AdminState is cleared too unless
// Spec.AdminState is non-empty and differs from Status.AdminState. A failure
// on either side marks DeviceServiceManagingCondition false and is returned;
// on success the condition is marked true.
func (r *DeviceServiceReconciler) reconcileUpdateDeviceService(ctx context.Context, ds *devicev1alpha1.DeviceService, log logr.Logger) error {
	// 1. reconciling the AdminState field of deviceService
	newDeviceServiceStatus := ds.Status.DeepCopy()
	updateDeviceService := ds.DeepCopy()
	// do not update deviceService's OperatingState
	updateDeviceService.Spec.OperatingState = ""

	if ds.Spec.AdminState != "" && ds.Spec.AdminState != ds.Status.AdminState {
		// The desired state becomes the new observed state once the edge accepts it.
		newDeviceServiceStatus.AdminState = ds.Spec.AdminState
	} else {
		// Nothing to change: an empty AdminState is skipped by the edge client.
		updateDeviceService.Spec.AdminState = ""
	}

	// The caller's context is forwarded to the edge client instead of nil.
	_, err := r.deviceServiceCli.Update(ctx, updateDeviceService, edgeInterface.UpdateOptions{})
	if err != nil {
		conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceManagingCondition, "failed to update AdminState of deviceService on edge platform", clusterv1.ConditionSeverityWarning, err.Error())
		return err
	}

	// 2. update the device status on OpenYurt
	ds.Status = *newDeviceServiceStatus
	if err = r.Status().Update(ctx, ds); err != nil {
		conditions.MarkFalse(ds, devicev1alpha1.DeviceServiceManagingCondition, "failed to update status of deviceService on openyurt", clusterv1.ConditionSeverityWarning, err.Error())
		return err
	}
	conditions.MarkTrue(ds, devicev1alpha1.DeviceServiceManagingCondition)
	return nil
}
deviceservice_syncer #
// Run starts the periodic DeviceService synchronization and blocks until stop
// is closed.
//
// Every ds.syncPeriod the background loop:
//  1. gets the deviceServices on the edge platform and on OpenYurt,
//  2. works out which ones must be created, deleted or refreshed,
//  3. creates on OpenYurt those that exist only on the edge platform,
//  4. deletes the redundant ones from OpenYurt,
//  5. updates the status of the ones present on both sides.
//
// A failing step ends the current round; the next round starts after another
// syncPeriod. The background goroutine exits when stop is closed, so it does
// not outlive Run.
func (ds *DeviceServiceSyncer) Run(stop <-chan struct{}) {
	ds.log.V(1).Info("starting the DeviceServiceSyncer...")
	go func() {
		for {
			// Wait for the next sync tick, or leave as soon as stop is closed.
			select {
			case <-stop:
				return
			case <-time.After(ds.syncPeriod):
			}

			// 1. get deviceServices on edge platform and OpenYurt
			edgeDeviceServices, kubeDeviceServices, err := ds.getAllDeviceServices()
			if err != nil {
				ds.log.V(3).Error(err, "fail to list the deviceServices")
				continue
			}

			// 2. find the deviceServices that need to be synchronized
			redundantEdgeDeviceServices, redundantKubeDeviceServices, syncedDeviceServices :=
				ds.findDiffDeviceServices(edgeDeviceServices, kubeDeviceServices)
			ds.log.V(1).Info("The number of deviceServices waiting for synchronization",
				"Edge deviceServices should be added to OpenYurt", len(redundantEdgeDeviceServices),
				"OpenYurt deviceServices that should be deleted", len(redundantKubeDeviceServices),
				"DeviceServices that should be synchronized", len(syncedDeviceServices))

			// 3. create deviceServices on OpenYurt which are exists in edge platform but not in OpenYurt
			if err := ds.syncEdgeToKube(redundantEdgeDeviceServices); err != nil {
				ds.log.V(3).Error(err, "fail to create deviceServices on OpenYurt")
				continue
			}

			// 4. delete redundant deviceServices on OpenYurt
			if err := ds.deleteDeviceServices(redundantKubeDeviceServices); err != nil {
				ds.log.V(3).Error(err, "fail to delete redundant deviceServices on OpenYurt")
				continue
			}

			// 5. update deviceService status on OpenYurt
			if err := ds.updateDeviceServices(syncedDeviceServices); err != nil {
				ds.log.Error(err, "fail to update deviceServices")
				continue
			}
			ds.log.V(1).Info("One round of DeviceService synchronization is complete")
		}
	}()
	<-stop
	ds.log.V(1).Info("stopping the deviceService syncer")
}
// getAllDeviceServices lists the DeviceServices that exist on the edge
// platform and the ones that exist on OpenYurt for this syncer's node pool.
//
// Both results are maps keyed by the value of the EdgeXObjectName label:
//
//	edgeDeviceServices: map[actualName]DeviceService
//	kubeDeviceServices: map[actualName]DeviceService
//
// On a listing error the (empty) maps are returned together with the error.
func (ds *DeviceServiceSyncer) getAllDeviceServices() (
	map[string]devicev1alpha1.DeviceService, map[string]devicev1alpha1.DeviceService, error) {
	edgeDeviceServices := map[string]devicev1alpha1.DeviceService{}
	kubeDeviceServices := map[string]devicev1alpha1.DeviceService{}

	// 1. list deviceServices on edge platform. A real context is passed
	// instead of nil.
	eDevSs, err := ds.deviceServiceCli.List(context.TODO(), iotcli.ListOptions{})
	if err != nil {
		ds.log.V(4).Error(err, "fail to list the deviceServices object on the edge platform")
		return edgeDeviceServices, kubeDeviceServices, err
	}

	// 2. list deviceServices on OpenYurt (filter objects belonging to edgeServer)
	var kDevSs devicev1alpha1.DeviceServiceList
	listOptions := client.MatchingFields{"spec.nodePool": ds.NodePool}
	if err = ds.List(context.TODO(), &kDevSs, listOptions); err != nil {
		ds.log.V(4).Error(err, "fail to list the deviceServices object on the Kubernetes")
		return edgeDeviceServices, kubeDeviceServices, err
	}

	// Index both lists by the edge-side name stored in the label.
	for i := range eDevSs {
		deviceServicesName := eDevSs[i].Labels[EdgeXObjectName]
		edgeDeviceServices[deviceServicesName] = eDevSs[i]
	}
	for i := range kDevSs.Items {
		deviceServicesName := kDevSs.Items[i].Labels[EdgeXObjectName]
		kubeDeviceServices[deviceServicesName] = kDevSs.Items[i]
	}
	return edgeDeviceServices, kubeDeviceServices, nil
}
// findDiffDeviceServices sorts the deviceServices into three maps, all keyed
// by the value of the EdgeXObjectName label:
//
//   - redundantEdgeDeviceServices: present on the edge platform only; values
//     are prepared with completeCreateContent (to be created on OpenYurt),
//   - redundantKubeDeviceServices: present on OpenYurt only and already marked
//     Synced (to be deleted from OpenYurt),
//   - syncedDeviceServices: present on both sides; values are prepared with
//     completeUpdateContent (status to be refreshed on OpenYurt).
func (ds *DeviceServiceSyncer) findDiffDeviceServices(
	edgeDeviceService map[string]devicev1alpha1.DeviceService, kubeDeviceService map[string]devicev1alpha1.DeviceService) (
	redundantEdgeDeviceServices map[string]*devicev1alpha1.DeviceService, redundantKubeDeviceServices map[string]*devicev1alpha1.DeviceService, syncedDeviceServices map[string]*devicev1alpha1.DeviceService) {

	redundantEdgeDeviceServices = make(map[string]*devicev1alpha1.DeviceService)
	redundantKubeDeviceServices = make(map[string]*devicev1alpha1.DeviceService)
	syncedDeviceServices = make(map[string]*devicev1alpha1.DeviceService)

	// Pass 1: walk the edge-side objects.
	for _, edgeDS := range edgeDeviceService {
		edName := edgeDS.Labels[EdgeXObjectName]
		ed := edgeDS // private copy whose address can be handed out
		if kd, exists := kubeDeviceService[edName]; exists {
			// Known on both sides: refresh.
			syncedDeviceServices[edName] = ds.completeUpdateContent(&kd, &ed)
		} else {
			// Unknown to OpenYurt: create.
			redundantEdgeDeviceServices[edName] = ds.completeCreateContent(&ed)
		}
	}

	// Pass 2: walk the OpenYurt objects that were synced before.
	for _, kubeDS := range kubeDeviceService {
		if !kubeDS.Status.Synced {
			continue
		}
		kdName := kubeDS.Labels[EdgeXObjectName]
		if _, exists := edgeDeviceService[kdName]; exists {
			continue
		}
		kd := kubeDS // private copy; its address is stored in the map
		redundantKubeDeviceServices[kdName] = &kd
	}

	return redundantEdgeDeviceServices, redundantKubeDeviceServices, syncedDeviceServices
}
// syncEdgeToKube creates on OpenYurt the deviceServices that exist on the
// edge platform but not on OpenYurt. An object that already exists is skipped;
// any other create error is logged and returned, ending the loop.
func (ds *DeviceServiceSyncer) syncEdgeToKube(edgeDevs map[string]*devicev1alpha1.DeviceService) error {
	for _, edgeDS := range edgeDevs {
		err := ds.Client.Create(context.TODO(), edgeDS)
		if err == nil {
			continue
		}
		if apierrors.IsAlreadyExists(err) {
			ds.log.V(5).Info("DeviceService already exist on Kubernetes",
				"DeviceService", strings.ToLower(edgeDS.Name))
			continue
		}
		ds.log.Info("created deviceService failed:",
			"DeviceService", strings.ToLower(edgeDS.Name))
		return err
	}
	return nil
}
// deleteDeviceServices deletes redundant deviceServices on OpenYurt.
//
// An object that is already gone (NotFound) counts as deleted, matching
// deleteDeviceProfile, so that it does not abort the remaining deletions. Any
// other error is logged and returned, ending the loop.
func (ds *DeviceServiceSyncer) deleteDeviceServices(redundantKubeDeviceServices map[string]*devicev1alpha1.DeviceService) error {
	for i := range redundantKubeDeviceServices {
		if err := ds.Client.Delete(context.TODO(), redundantKubeDeviceServices[i]); err != nil {
			if apierrors.IsNotFound(err) {
				ds.log.V(5).Info("DeviceService doesn't exist on Kubernetes",
					"DeviceService", redundantKubeDeviceServices[i].Name)
				continue
			}
			ds.log.V(5).Error(err, "fail to delete the DeviceService on Kubernetes",
				"DeviceService", redundantKubeDeviceServices[i].Name)
			return err
		}
	}
	return nil
}
// updateDeviceServices writes the status of each given deviceService to
// OpenYurt. Objects with an empty ResourceVersion are skipped, update
// conflicts are logged and skipped, and any other error is logged and
// returned, ending the loop.
func (ds *DeviceServiceSyncer) updateDeviceServices(syncedDeviceServices map[string]*devicev1alpha1.DeviceService) error {
	for key := range syncedDeviceServices {
		synced := syncedDeviceServices[key]
		if synced.ObjectMeta.ResourceVersion == "" {
			continue
		}
		err := ds.Client.Status().Update(context.TODO(), synced)
		switch {
		case err == nil:
			// status written
		case apierrors.IsConflict(err):
			ds.log.V(5).Info("update Conflicts",
				"DeviceService", synced.Name)
		default:
			ds.log.V(5).Error(err, "fail to update the DeviceService on Kubernetes",
				"DeviceService", synced.Name)
			return err
		}
	}
	return nil
}
// completeCreateContent completes the content of the deviceService which will
// be created on OpenYurt. It returns a deep copy of edgeDS whose Spec.NodePool
// is the syncer's node pool, whose Name is prefixed with "<NodePool>-" and
// whose Spec.Managed is false; edgeDS itself is not modified.
func (ds *DeviceServiceSyncer) completeCreateContent(edgeDS *devicev1alpha1.DeviceService) *devicev1alpha1.DeviceService {
	created := edgeDS.DeepCopy()
	created.Name = ds.NodePool + "-" + created.Name
	created.Spec.NodePool = ds.NodePool
	created.Spec.Managed = false
	return created
}
// completeUpdateContent completes the content of the deviceService which will
// be updated on OpenYurt. It returns a deep copy of kubeDS whose
// Status.LastConnected, Status.LastReported and Status.AdminState are taken
// from edgeDS; neither argument is modified.
func (ds *DeviceServiceSyncer) completeUpdateContent(kubeDS *devicev1alpha1.DeviceService, edgeDS *devicev1alpha1.DeviceService) *devicev1alpha1.DeviceService {
	refreshed := kubeDS.DeepCopy()
	status := &refreshed.Status
	status.LastConnected, status.LastReported = edgeDS.Status.LastConnected, edgeDS.Status.LastReported
	status.AdminState = edgeDS.Status.AdminState
	return refreshed
}
2021/9/22 #
Crd修改 #
DeviceSpec添加了Managed和NodePool两个字段
// DeviceSpec defines the desired state of Device.
type DeviceSpec struct {
// Description is free-form information describing the device.
Description string `json:"description,omitempty"`
// AdminState is the desired admin state (locked/unlocked).
AdminState AdminState `json:"adminState,omitempty"`
// OperatingState is the desired operating state (enabled/disabled).
OperatingState OperatingState `json:"operatingState,omitempty"`
// Protocols is a map of supported protocols for the given device.
Protocols map[string]ProtocolProperties `json:"protocols,omitempty"`
// Labels are other labels applied to the device to help with searching.
Labels []string `json:"labels,omitempty"`
// Location is the device service specific location. NOTE(review): the
// original comment mentioned interface{}, but the field is a plain string.
Location string `json:"location,omitempty"`
// Service is the associated Device Service - one per device.
Service string `json:"service"`
// Profile is the associated Device Profile - describes the device.
Profile string `json:"profile"`
// Managed tells whether the cloud manages this device:
// true means the cloud can update the related fields,
// false means the cloud can't update the fields.
Managed bool `json:"managed,omitempty"`
// NodePool indicates which nodePool the device comes from.
NodePool string `json:"nodePool,omitempty"`
// TODO support the following field
// A list of auto-generated events coming from the device
// AutoEvents []AutoEvent `json:"autoEvents"`
// DeviceProperties represents the expected state of the device's properties.
DeviceProperties map[string]DesiredPropertyState `json:"deviceProperties,omitempty"`
}
DeviceStatus中添加Synced、EdgeId、AdminState、OperatingState、Conditions字段
// DeviceStatus defines the observed state of Device.
type DeviceStatus struct {
	// LastConnected is the time (milliseconds) that the device last provided
	// any feedback or responded to any request.
	LastConnected int64 `json:"lastConnected,omitempty"`
	// LastReported is the time (milliseconds) that the device reported data
	// to the core microservice.
	LastReported int64 `json:"lastReported,omitempty"`
	// Synced indicates whether the device already exists on both OpenYurt and
	// the edge platform.
	Synced bool `json:"synced,omitempty"`
	// DeviceProperties represents the actual state of the device's properties.
	DeviceProperties map[string]ActualPropertyState `json:"deviceProperties,omitempty"`
	// EdgeId is the identifier of the device on the edge platform.
	// NOTE(review): it is filled from the edge platform's create/get
	// responses by the client code — confirm the exact format.
	EdgeId string `json:"edgeId,omitempty"`
	// AdminState is the observed admin state (locked/unlocked).
	AdminState AdminState `json:"adminState,omitempty"`
	// OperatingState is the observed operating state (enabled/disabled).
	OperatingState OperatingState `json:"operatingState,omitempty"`
	// Conditions holds the current device state.
	// +optional
	Conditions clusterv1.Conditions `json:"conditions,omitempty"`
}
device_client #
// Create sends a POST request to EdgeX core-metadata to add a new device.
//
// The OpenYurt device is first converted to its EdgeX representation and
// serialized to JSON. On an HTTP 200 response a deep copy of the input device
// is returned with Status.EdgeId set to the raw response body. Any transport
// error, or any status other than 200, is reported as an error and no device
// is returned.
func (efc *EdgexDeviceClient) Create(ctx context.Context, device *devicev1alpha1.Device, options edgeCli.CreateOptions) (*devicev1alpha1.Device, error) {
	// Convert the Kubernetes-side device into the EdgeX model.
	edgeDevice := toEdgeXDevice(device)
	efc.V(5).Info("will add the Devices", "Device", edgeDevice.Name)

	payload, err := json.Marshal(&edgeDevice)
	if err != nil {
		return nil, err
	}

	// Build and issue the POST request.
	endpoint := fmt.Sprintf("http://%s:%d%s", efc.CoreMetaClient.Host, efc.CoreMetaClient.Port, DevicePath)
	resp, err := efc.R().SetBody(payload).Post(endpoint)
	if err != nil {
		return nil, err
	}
	if resp.StatusCode() != http.StatusOK {
		return nil, fmt.Errorf("create device on edgex foundry failed, the response is : %s", resp.Body())
	}

	// On success hand back a copy of the Kubernetes device that carries the
	// identifier returned by EdgeX.
	created := device.DeepCopy()
	created.Status.EdgeId = string(resp.Body())
	return created, nil
}
// Delete sends a DELETE request to EdgeX core-metadata to remove the device
// with the given name. A transport error is returned as is; a response status
// other than 200 is returned as an error whose message is the response body.
func (efc *EdgexDeviceClient) Delete(ctx context.Context, name string, options edgeCli.DeleteOptions) error {
	efc.V(5).Info("will delete the Device", "Device", name)

	// The device is addressed by name under the device path.
	endpoint := fmt.Sprintf("http://%s:%d%s/name/%s",
		efc.CoreMetaClient.Host, efc.CoreMetaClient.Port, DevicePath, name)

	resp, err := efc.R().Delete(endpoint)
	switch {
	case err != nil:
		return err
	case resp.StatusCode() != http.StatusOK:
		return errors.New(string(resp.Body()))
	default:
		return nil
	}
}
// Update sets the admin and/or operating state of a device on EdgeX, looking
// the device up by its unique edge name.
//
// Only the non-empty Spec.AdminState and Spec.OperatingState fields are sent.
// When device is nil, or when neither field is set, nothing is sent and
// (nil, nil) is returned. On success the input device is returned unchanged.
// A transport error or a response status other than 200 is returned as an
// error.
//
// TODO support to update other fields
func (efc *EdgexDeviceClient) Update(ctx context.Context, device *devicev1alpha1.Device, options edgeCli.UpdateOptions) (*devicev1alpha1.Device, error) {
	// Guard against nil before the device is inspected for its edge name.
	if device == nil {
		return nil, nil
	}

	actualDeviceName := getEdgeDeviceName(device)
	putURL := fmt.Sprintf("http://%s:%d%s/name/%s",
		efc.CoreMetaClient.Host, efc.CoreMetaClient.Port, DevicePath, actualDeviceName)

	// Collect only the fields that actually need to change.
	updateData := make(map[string]string, 2)
	if device.Spec.AdminState != "" {
		updateData["adminState"] = string(device.Spec.AdminState)
	}
	if device.Spec.OperatingState != "" {
		updateData["operatingState"] = string(device.Spec.OperatingState)
	}
	if len(updateData) == 0 {
		return nil, nil
	}

	data, err := json.Marshal(updateData)
	if err != nil {
		return nil, err
	}

	rep, err := resty.New().R().
		SetHeader("Content-Type", "application/json").
		SetBody(data).
		Put(putURL)
	if err != nil {
		return nil, err
	}
	if rep.StatusCode() != http.StatusOK {
		return nil, fmt.Errorf("failed to update device: %s, get response: %s", actualDeviceName, string(rep.Body()))
	}
	return device, nil
}
// Get queries EdgeX core-metadata for the device with the given name and
// returns it converted to the OpenYurt Device type.
//
// A non-nil *Device is always returned. On a transport error, or when the
// response body is exactly "Item not found\n", the returned device is the
// zero value and the error is set. A JSON decoding error is returned together
// with whatever toKubeDevice produced from the partially decoded model.
func (efc *EdgexDeviceClient) Get(ctx context.Context, deviceName string, options edgeCli.GetOptions) (*devicev1alpha1.Device, error) {
	efc.V(5).Info("will get Devices", "Device", deviceName)

	kubeDevice := new(devicev1alpha1.Device)

	// Build the GET address for the named device.
	endpoint := fmt.Sprintf("http://%s:%d%s/name/%s",
		efc.CoreMetaClient.Host, efc.CoreMetaClient.Port, DevicePath, deviceName)
	resp, err := efc.R().Get(endpoint)
	if err != nil {
		return kubeDevice, err
	}

	body := resp.Body()
	// EdgeX answers with this literal body when the device is unknown.
	if string(body) == "Item not found\n" {
		return kubeDevice, errors.New("Item not found")
	}

	// Decode the EdgeX model, then convert it to the Kubernetes type.
	var edgeDevice models.Device
	decodeErr := json.Unmarshal(body, &edgeDevice)
	*kubeDevice = toKubeDevice(edgeDevice)
	return kubeDevice, decodeErr
}
// List returns all device objects on the edge platform, converted to the
// OpenYurt Device type.
//
// The Hanoi version currently supports only a single label and does not
// support other filters: when options.LabelSelector contains the key "label",
// its value is appended to the request path as ".../label/<value>".
// The result is nil when the platform reports no devices.
func (efc *EdgexDeviceClient) List(ctx context.Context, options edgeCli.ListOptions) ([]devicev1alpha1.Device, error) {
	// Base URL of the device list endpoint.
	listURL := fmt.Sprintf("http://%s:%d%s",
		efc.CoreMetaClient.Host, efc.CoreMetaClient.Port, DevicePath)

	// Narrow the query by label when a selector was supplied; indexing a nil
	// map is safe and simply reports the key as absent.
	if label, ok := options.LabelSelector["label"]; ok {
		listURL = strings.Join([]string{listURL, "label", label}, "/")
	}

	resp, err := efc.R().EnableTrace().Get(listURL)
	if err != nil {
		return nil, err
	}

	// Decode the EdgeX-side device list.
	var edgeDevices []models.Device
	if err := json.Unmarshal(resp.Body(), &edgeDevices); err != nil {
		return nil, err
	}

	// Convert every EdgeX device into its Kubernetes counterpart.
	var kubeDevices []devicev1alpha1.Device
	for i := range edgeDevices {
		kubeDevices = append(kubeDevices, toKubeDevice(edgeDevices[i]))
	}
	return kubeDevices, nil
}
// GetPropertyState reads the current value of one device property from the
// edge platform.
//
// The GET URL is taken from d.Status.DeviceProperties[propertyName] when it is
// recorded there; otherwise it is looked up among the commands that the edge
// platform reports for the device. The URL is then queried and the value is
// extracted from the returned event.
//
// On success the returned state carries Name, GetURL and ActualValue. An error
// is returned when the command list cannot be fetched, when no command matches
// propertyName, when the GET request fails, or when the response cannot be
// decoded.
func (efc *EdgexDeviceClient) GetPropertyState(ctx context.Context, propertyName string, d *devicev1alpha1.Device, options edgeCli.GetOptions) (*devicev1alpha1.ActualPropertyState, error) {
	edgeName := getEdgeDeviceName(d)

	// 1. Resolve the GET URL of the property, preferring the one already
	// recorded in the device status.
	propertyGetURL := ""
	if known, exist := d.Status.DeviceProperties[propertyName]; exist && known.GetURL != "" {
		propertyGetURL = known.GetURL
	} else {
		commandRep, err := efc.GetCommandResponseByName(edgeName)
		if err != nil {
			return &devicev1alpha1.ActualPropertyState{}, err
		}
		// The command whose name equals the property name owns the URL.
		for _, cmd := range commandRep.Commands {
			if cmd.Name == propertyName {
				propertyGetURL = cmd.Get.URL
				break
			}
		}
		if propertyGetURL == "" {
			return nil, fmt.Errorf("this property %s is not exist", propertyName)
		}
	}

	// 2. Fetch the actual value through the GET URL.
	resp, err := getPropertyState(propertyGetURL)
	if err != nil {
		return nil, err
	}
	var event models.Event
	if err := json.Unmarshal(resp.Body(), &event); err != nil {
		return &devicev1alpha1.ActualPropertyState{}, err
	}

	return &devicev1alpha1.ActualPropertyState{
		Name:        propertyName,
		GetURL:      propertyGetURL,
		ActualValue: getPropertyValueFromEvent(propertyName, event),
	}, nil
}
// getPropertyState issues a GET request to getURL and maps the well-known
// failure status codes (400, 404, 423, 500) to descriptive errors. The
// response is returned in every case; for any other status code the error is
// nil.
func getPropertyState(getURL string) (*resty.Response, error) {
	resp, err := resty.New().R().Get(getURL)
	if err != nil {
		return resp, err
	}

	switch resp.StatusCode() {
	case http.StatusBadRequest:
		err = errors.New("request is in an invalid state")
	case http.StatusNotFound:
		err = errors.New("the requested resource does not exist")
	case http.StatusLocked:
		err = errors.New("the device is locked (AdminState) or down (OperatingState)")
	case http.StatusInternalServerError:
		err = errors.New("an unexpected error occurred on the server")
	}
	return resp, err
}
// UpdatePropertyState sets one device property on the edge platform to the
// desired value recorded in d.Spec.DeviceProperties[propertyName].
//
// When the desired state carries no PutURL, the URL is looked up among the
// commands that the edge platform reports for the device. The request body is
// the JSON object {"<name>": "<desired value>"}.
//
// An error is returned when the property is not present in the spec, when the
// PUT URL cannot be resolved, when the request fails, when the status is not
// 200, or when a 200 response body mentions "execWriteCmd" (the edge platform
// answers 200 with a description in the body for illegal parameters such as
// out-of-range values).
func (efc *EdgexDeviceClient) UpdatePropertyState(ctx context.Context, propertyName string, d *devicev1alpha1.Device, options edgeCli.UpdateOptions) error {
	actualDeviceName := getEdgeDeviceName(d)

	dps, ok := d.Spec.DeviceProperties[propertyName]
	if !ok {
		return fmt.Errorf("property %s is not defined in the device spec", propertyName)
	}
	if dps.PutURL == "" {
		putURL, err := efc.getPropertyPutURL(actualDeviceName, dps.Name)
		if err != nil {
			return err
		}
		dps.PutURL = putURL
	}

	// Marshal instead of formatting by hand so that quotes, backslashes and
	// control characters in the name or value are escaped and the body is
	// always valid JSON.
	body, err := json.Marshal(map[string]string{dps.Name: dps.DesiredValue})
	if err != nil {
		return err
	}

	// set the device property to desired state
	efc.V(5).Info("setting the property to desired value", "property", dps.Name)
	rep, err := resty.New().R().
		SetHeader("Content-Type", "application/json").
		SetBody(body).
		Put(dps.PutURL)
	if err != nil {
		return err
	}
	if rep.StatusCode() != http.StatusOK {
		return fmt.Errorf("failed to set property: %s, get response: %s", dps.Name, string(rep.Body()))
	}
	// If the parameters are illegal, such as out of range, the 200 status code
	// is also returned, but the description appears in the body.
	if respBody := string(rep.Body()); strings.Contains(respBody, "execWriteCmd") {
		return fmt.Errorf("failed to set property: %s, get response: %s", dps.Name, respBody)
	}
	return nil
}
// getPropertyPutURL returns the PUT URL used to set a device property's value.
// It fetches the commands the edge platform reports for deviceName and returns
// the Put.URL of the first command whose name equals cmdName. An error is
// returned when the command list cannot be fetched or no command matches.
func (efc *EdgexDeviceClient) getPropertyPutURL(deviceName, cmdName string) (string, error) {
	commandRep, err := efc.GetCommandResponseByName(deviceName)
	if err != nil {
		return "", err
	}

	commands := commandRep.Commands
	for i := range commands {
		if commands[i].Name == cmdName {
			return commands[i].Put.URL, nil
		}
	}
	return "", errors.New("corresponding command is not found")
}
// ListPropertiesState gets the desired and actual property information of a
// device from the edge platform.
//
// For every command the platform reports for the device, the first returned
// map receives a DesiredPropertyState holding only Name and PutURL (no
// DesiredValue), and the second receives an ActualPropertyState holding Name,
// GetURL and, when the value could be read, ActualValue. A property whose GET
// request fails is still listed, just without a value. Both maps are non-nil
// even when an error is returned; in that case they may be partially filled.
func (efc *EdgexDeviceClient) ListPropertiesState(ctx context.Context, device *devicev1alpha1.Device, options edgeCli.ListOptions) (map[string]devicev1alpha1.DesiredPropertyState, map[string]devicev1alpha1.ActualPropertyState, error) {
	desired := map[string]devicev1alpha1.DesiredPropertyState{}
	actual := map[string]devicev1alpha1.ActualPropertyState{}

	commandRep, err := efc.GetCommandResponseByName(getEdgeDeviceName(device))
	if err != nil {
		return desired, actual, err
	}

	for _, cmd := range commandRep.Commands {
		// DesiredPropertyState only stores the basic information and does not
		// set DesiredValue.
		desired[cmd.Name] = devicev1alpha1.DesiredPropertyState{Name: cmd.Name, PutURL: cmd.Put.URL}

		state := devicev1alpha1.ActualPropertyState{Name: cmd.Name, GetURL: cmd.Get.URL}
		resp, getErr := getPropertyState(cmd.Get.URL)
		if getErr != nil {
			// The read failed: record the property without an actual value.
			actual[cmd.Name] = state
			continue
		}

		// The read succeeded: extract the actual value from the event.
		var event models.Event
		if err := json.Unmarshal(resp.Body(), &event); err != nil {
			return desired, actual, err
		}
		state.ActualValue = getPropertyValueFromEvent(cmd.Name, event)
		actual[cmd.Name] = state
	}
	return desired, actual, nil
}
// getPropertyValueFromEvent resolves the actual property value from an event.
// It returns the Value of the first reading whose Name equals propertyName, or
// the empty string when the event contains no such reading.
func getPropertyValueFromEvent(propertyName string, modelEvent models.Event) string {
	readings := modelEvent.Readings
	for i := range readings {
		if readings[i].Name == propertyName {
			return readings[i].Value
		}
	}
	return ""
}
// GetCommandResponseByName gets all commands supported by the device with the
// given name from the EdgeX core-command service.
//
// An error is returned on a transport failure, when the response body contains
// "Item not found", or when the body cannot be decoded; the returned
// CommandResponse is the zero value for the first two.
func (efc *EdgexDeviceClient) GetCommandResponseByName(deviceName string) (
	models.CommandResponse, error) {
	efc.V(5).Info("will get CommandResponses", "CommandResponse", deviceName)

	var commandRep models.CommandResponse
	endpoint := fmt.Sprintf("http://%s:%d%s/name/%s",
		efc.CoreCommandClient.Host, efc.CoreCommandClient.Port, CommandResponsePath, deviceName)

	resp, err := efc.R().Get(endpoint)
	if err != nil {
		return commandRep, err
	}

	body := resp.Body()
	if strings.Contains(string(body), "Item not found") {
		return commandRep, errors.New("Item not found")
	}

	decodeErr := json.Unmarshal(body, &commandRep)
	return commandRep, decodeErr
}
device_controller #
//+kubebuilder:rbac:groups=device.openyurt.io,resources=devices,verbs=get;list;watch;create;update;patch;delete
//+kubebuilder:rbac:groups=device.openyurt.io,resources=devices/status,verbs=get;update;patch
//+kubebuilder:rbac:groups=device.openyurt.io,resources=devices/finalizers,verbs=update

// Reconcile drives one Device object towards its desired state.
//
// The object is fetched by the request's namespaced name; objects that no
// longer exist, or that belong to a different node pool than this controller,
// are ignored. For the remaining objects the steps are:
//  1. handle deletion (or add the finalizer when the object is not being
//     deleted);
//  2. if the device has not been synced yet, create it on the edge platform;
//  3. otherwise, if the device is managed by the cloud, reconcile its state
//     and properties.
//
// Conflict errors from steps 2 and 3 lead to a requeue instead of an error.
// On every exit path after the node-pool check, the summary condition is
// recomputed and the status is written back.
func (r *DeviceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	log := r.Log.WithValues("device", req.NamespacedName)

	// Fetch the Device object named in the request.
	var d devicev1alpha1.Device
	if err := r.Get(ctx, req.NamespacedName, &d); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}

	// If objects doesn't belong to the Edge platform to which the controller
	// is connected, the controller does not handle events for that object.
	if d.Spec.NodePool != r.NodePool {
		return ctrl.Result{}, nil
	}
	log.V(4).Info("Reconciling the Device object", "Device", d.GetName())

	// Update the conditions for device when the reconcile round ends.
	defer func() {
		conditions.SetSummary(&d,
			conditions.WithConditions(devicev1alpha1.DeviceSyncedCondition, devicev1alpha1.DeviceManagingCondition),
		)
		err := r.Status().Update(ctx, &d)
		// Not-found and conflict results are expected here and not reported.
		if client.IgnoreNotFound(err) == nil || apierrors.IsConflict(err) {
			return
		}
		log.Info("err", "Conditions", d.Status.Conditions)
		log.Error(err, "update device conditions failed")
	}()

	// 1. Handle the device deletion event: objects being deleted are removed
	// from the edge platform, the others get a finalizer.
	if err := r.reconcileDeleteDevice(ctx, &d, log); err != nil {
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	if !d.ObjectMeta.DeletionTimestamp.IsZero() {
		// The object is being deleted; nothing else to do.
		return ctrl.Result{}, nil
	}

	switch {
	case !d.Status.Synced:
		// 2. Synchronize OpenYurt device objects to edge platform.
		err := r.reconcileCreateDevice(ctx, &d, log)
		switch {
		case err == nil:
			return ctrl.Result{}, nil
		case apierrors.IsConflict(err):
			return ctrl.Result{Requeue: true}, nil
		default:
			return ctrl.Result{}, err
		}
	case d.Spec.Managed:
		// 3. If the device has been synchronized and is managed by the cloud,
		// reconcile the device properties.
		err := r.reconcileUpdateDevice(ctx, &d, log)
		switch {
		case err == nil:
			// fall out to the common return below
		case apierrors.IsConflict(err):
			return ctrl.Result{RequeueAfter: time.Second * 2}, nil
		default:
			return ctrl.Result{}, err
		}
	}
	return ctrl.Result{}, nil
}
// SetupWithManager sets up the controller with the Manager.
//
// It creates the EdgeX device client (which wraps both the core-metadata and
// the core-command service), records the node pool the controller is deployed
// to, registers a "spec.nodePool" field index on Device objects, and finally
// registers the reconciler for Device objects with the first-update event
// filter.
func (r *DeviceReconciler) SetupWithManager(mgr ctrl.Manager) error {
	// Two microservices are accessed; both are wrapped by the device client.
	metaURL := edgexCli.ClientURL{Host: "edgex-core-metadata", Port: 48081}
	cmdURL := edgexCli.ClientURL{Host: "edgex-core-command", Port: 48082}
	r.deviceCli = edgexCli.NewEdgexDeviceClient(metaURL, cmdURL, r.Log)

	// Gets the nodePool to which deviceController is deployed.
	pool, err := util.GetNodePool(mgr.GetConfig())
	if err != nil {
		return err
	}
	r.NodePool = pool

	// Register the filter field for device: the index maps every Device to
	// its Spec.NodePool so that List calls can select devices with
	// client.MatchingFields{"spec.nodePool": ...}.
	byNodePool := func(rawObj client.Object) []string {
		return []string{rawObj.(*devicev1alpha1.Device).Spec.NodePool}
	}
	err = mgr.GetFieldIndexer().IndexField(context.TODO(), &devicev1alpha1.Device{}, "spec.nodePool", byNodePool)
	if err != nil {
		return err
	}

	builder := ctrl.NewControllerManagedBy(mgr).
		For(&devicev1alpha1.Device{}).
		WithEventFilter(genFirstUpdateFilter("device", r.Log))
	return builder.Complete(r)
}
// reconcileDeleteDevice handles the finalizer life cycle of a Device.
//
// When the object is not being deleted and carries no finalizer, the device
// finalizer is added with a merge patch. When the object is being deleted, the
// device is first removed from the edge platform (a not-found answer is
// tolerated) and the finalizers are then cleared so that the object can be
// removed from OpenYurt.
//
// The caller's ctx is forwarded to the edge client instead of a nil context.
func (r *DeviceReconciler) reconcileDeleteDevice(ctx context.Context, d *devicev1alpha1.Device, log logr.Logger) error {
	if d.ObjectMeta.DeletionTimestamp.IsZero() {
		// Not being deleted: make sure the finalizer is present.
		if len(d.GetFinalizers()) != 0 {
			return nil
		}
		// Marshalling a literal map of strings cannot fail, so the error is
		// deliberately ignored.
		patchData, _ := json.Marshal(map[string]interface{}{
			"metadata": map[string]interface{}{
				"finalizers": []string{devicev1alpha1.DeviceFinalizer},
			},
		})
		return r.Patch(ctx, d, client.RawPatch(types.MergePatchType, patchData))
	}

	// gets the actual name of the device on the Edge platform from the Label
	// of the device
	edgeDeviceName := d.ObjectMeta.Labels[EdgeXObjectName]

	// delete the device object on the edge platform
	if err := r.deviceCli.Delete(ctx, edgeDeviceName, iotcli.DeleteOptions{}); err != nil && !clis.IsNotFoundErr(err) {
		return err
	}

	// delete the device in OpenYurt: clearing the finalizers lets the API
	// server finish the deletion. Marshalling a literal map cannot fail.
	patchData, _ := json.Marshal(map[string]interface{}{
		"metadata": map[string]interface{}{
			"finalizers": []string{},
		},
	})
	return r.Patch(ctx, d, client.RawPatch(types.MergePatchType, patchData))
}
// reconcileCreateDevice synchronizes a not-yet-synced OpenYurt Device to the
// edge platform.
//
//   - If the device already exists on the edge platform, only the OpenYurt
//     status (EdgeId, Synced) is updated.
//   - If it does not exist, it is created on the edge platform and the status
//     is updated from the created object.
//   - If the edge platform cannot be reached for any other reason, the synced
//     condition is marked false and nil is returned.
//
// On success the synced condition is marked true and the status is written
// back to OpenYurt; the error of that write is returned.
//
// The caller's ctx is forwarded to the edge client instead of a nil context.
func (r *DeviceReconciler) reconcileCreateDevice(ctx context.Context, d *devicev1alpha1.Device, log logr.Logger) error {
	// get the actual name of the device on the Edge platform from the Label of
	// the device
	edgeDeviceName := d.ObjectMeta.Labels[EdgeXObjectName]
	newDeviceStatus := d.Status.DeepCopy()

	// Checking if device already exist on the edge platform
	log.V(4).Info("Checking if device already exist on the edge platform", "device", d.GetName())
	edgeDevice, err := r.deviceCli.Get(ctx, edgeDeviceName, iotcli.GetOptions{})
	switch {
	case err == nil:
		// a. If object exists, the status of the device on OpenYurt is updated
		log.V(4).Info("Device already exists on edge platform")
		newDeviceStatus.EdgeId = edgeDevice.Status.EdgeId
		newDeviceStatus.Synced = true
	case clis.IsNotFoundErr(err):
		// b. If the object does not exist, a request is sent to the edge
		// platform to create a new device
		log.V(4).Info("Adding device to the edge platform", "device", d.GetName())
		createdEdgeObj, err := r.deviceCli.Create(ctx, d, iotcli.CreateOptions{})
		if err != nil {
			conditions.MarkFalse(d, devicev1alpha1.DeviceSyncedCondition, "failed to create device on edge platform", clusterv1.ConditionSeverityWarning, err.Error())
			return fmt.Errorf("fail to add Device to edge platform: %v", err)
		}
		log.V(4).Info("Successfully add Device to edge platform",
			"EdgeDeviceName", edgeDeviceName, "EdgeId", createdEdgeObj.Status.EdgeId)
		newDeviceStatus.EdgeId = createdEdgeObj.Status.EdgeId
		newDeviceStatus.Synced = true
	default:
		// c. Any other error: record it in the condition and give up for this
		// round without reporting an error to the caller.
		log.V(4).Info("failed to visit the edge platform")
		conditions.MarkFalse(d, devicev1alpha1.DeviceSyncedCondition, "failed to visit the EdgeX core-metadata-service", clusterv1.ConditionSeverityWarning, "")
		return nil
	}

	d.Status = *newDeviceStatus
	conditions.MarkTrue(d, devicev1alpha1.DeviceSyncedCondition)
	// Persist the refreshed status on OpenYurt.
	return r.Status().Update(ctx, d)
}
// reconcileUpdateDevice reconciles a device that already exists on the edge
// platform and is managed by the cloud.
//
//  1. AdminState and OperatingState are pushed to the edge platform when the
//     spec sets them and they differ from the status.
//  2. When the resulting state is enabled and unlocked, every device property
//     is reconciled.
//  3. The new status is written back to OpenYurt.
//
// An error is returned when the edge update fails, when the status write
// fails, or when at least one property failed to reconcile; the managing
// condition is marked accordingly.
//
// The caller's ctx is forwarded to the edge client instead of a nil context.
func (r *DeviceReconciler) reconcileUpdateDevice(ctx context.Context, d *devicev1alpha1.Device, log logr.Logger) error {
	// the device has been added to the edge platform, check if each device
	// property are in the desired state
	newDeviceStatus := d.Status.DeepCopy()
	// This list is used to hold the names of properties that failed to reconcile
	var failedPropertyNames []string

	// 1. reconciling the AdminState and OperatingState field of device
	log.V(3).Info("reconciling the AdminState and OperatingState field of device")
	// updateDevice carries only the state fields that need to change; fields
	// that are already in the desired state are blanked so the client skips
	// them.
	updateDevice := d.DeepCopy()
	if d.Spec.AdminState != "" && d.Spec.AdminState != d.Status.AdminState {
		newDeviceStatus.AdminState = d.Spec.AdminState
	} else {
		updateDevice.Spec.AdminState = ""
	}
	if d.Spec.OperatingState != "" && d.Spec.OperatingState != d.Status.OperatingState {
		newDeviceStatus.OperatingState = d.Spec.OperatingState
	} else {
		updateDevice.Spec.OperatingState = ""
	}
	if _, err := r.deviceCli.Update(ctx, updateDevice, iotcli.UpdateOptions{}); err != nil {
		conditions.MarkFalse(d, devicev1alpha1.DeviceManagingCondition, "failed to update AdminState or OperatingState of device on edge platform", clusterv1.ConditionSeverityWarning, err.Error())
		return err
	}

	// 2. reconciling the device properties' value
	log.V(3).Info("reconciling the device properties")
	// property updates are made only when the device is operational and unlocked
	if newDeviceStatus.OperatingState == devicev1alpha1.Enabled && newDeviceStatus.AdminState == devicev1alpha1.UnLocked {
		newDeviceStatus, failedPropertyNames = r.reconcileDeviceProperties(d, newDeviceStatus, log)
	}
	d.Status = *newDeviceStatus

	// 3. update the device status on OpenYurt
	log.V(3).Info("update the device status")
	if err := r.Status().Update(ctx, d); err != nil {
		conditions.MarkFalse(d, devicev1alpha1.DeviceManagingCondition, "failed to update status of device on openyurt", clusterv1.ConditionSeverityWarning, err.Error())
		return err
	}
	if len(failedPropertyNames) != 0 {
		// Report the properties that could not be brought to the desired value.
		err := fmt.Errorf("the following device properties failed to reconcile: %v", failedPropertyNames)
		conditions.MarkFalse(d, devicev1alpha1.DeviceManagingCondition, err.Error(), clusterv1.ConditionSeverityInfo, "")
		return err
	}

	conditions.MarkTrue(d, devicev1alpha1.DeviceManagingCondition)
	return nil
}
// reconcileDeviceProperties updates the actual property values of the device
// on the edge platform and returns the latest status together with the names
// of the properties that failed to update.
//
// For every property in d.Spec.DeviceProperties with a non-empty DesiredValue,
// the actual value is read from the edge platform and recorded in a copy of
// deviceStatus. When it differs from the desired value, the property is set on
// the edge platform and the recorded value is replaced by the desired one.
// Properties whose read or write fails are collected in the returned list and
// logged. deviceStatus itself is not modified.
//
// The function has no context parameter, so context.TODO() is passed to the
// edge client rather than a nil context.
func (r *DeviceReconciler) reconcileDeviceProperties(d *devicev1alpha1.Device, deviceStatus *devicev1alpha1.DeviceStatus, log logr.Logger) (*devicev1alpha1.DeviceStatus, []string) {
	newDeviceStatus := deviceStatus.DeepCopy()
	// This list is used to hold the names of properties that failed to reconcile
	var failedPropertyNames []string

	log.V(3).Info("reconciling the device properties' value")
	for _, desiredProperty := range d.Spec.DeviceProperties {
		// Properties without a desired value are not managed.
		if desiredProperty.DesiredValue == "" {
			continue
		}
		propertyName := desiredProperty.Name

		// 1. gets the actual property value of the current device from edge platform
		log.V(4).Info("getting the actual property state", "property", propertyName)
		actualProperty, err := r.deviceCli.GetPropertyState(context.TODO(), propertyName, d, iotcli.GetOptions{})
		if err != nil {
			log.V(4).Error(err, "failed to get property state", "propertyName", propertyName)
			failedPropertyNames = append(failedPropertyNames, propertyName)
			continue
		}
		log.V(4).Info("got the actual property state",
			"property name", propertyName,
			"property getURL", actualProperty.GetURL,
			"property actual value", actualProperty.ActualValue)

		// The status map may still be nil when this is the first property.
		if newDeviceStatus.DeviceProperties == nil {
			newDeviceStatus.DeviceProperties = map[string]devicev1alpha1.ActualPropertyState{}
		}
		newDeviceStatus.DeviceProperties[propertyName] = *actualProperty

		// 2. set the device attribute in the edge platform to the expected value
		if desiredProperty.DesiredValue == actualProperty.ActualValue {
			continue
		}
		log.V(4).Info("the desired value and the actual value are different",
			"desired value", desiredProperty.DesiredValue,
			"actual value", actualProperty.ActualValue)
		if err := r.deviceCli.UpdatePropertyState(context.TODO(), propertyName, d, iotcli.UpdateOptions{}); err != nil {
			log.V(4).Error(err, "failed to update property", "propertyName", propertyName)
			failedPropertyNames = append(failedPropertyNames, propertyName)
			continue
		}
		log.V(4).Info("successfully set the property to desired value", "property", propertyName)

		// The write succeeded, so the desired value is now the actual value.
		newDeviceStatus.DeviceProperties[propertyName] = devicev1alpha1.ActualPropertyState{
			Name:        propertyName,
			GetURL:      actualProperty.GetURL,
			ActualValue: desiredProperty.DesiredValue,
		}
	}
	return newDeviceStatus, failedPropertyNames
}
device_syncer #
// Run starts the periodic device synchronization and blocks until stop is
// closed (or receives a value).
//
// A background goroutine waits ds.syncPeriod between rounds. Each round lists
// the devices on both sides, works out the differences, creates the missing
// devices on OpenYurt, deletes the redundant ones and refreshes the status of
// the devices present on both sides. A failing step aborts the current round
// only. The goroutine terminates once stop is closed, so it does not outlive
// the syncer.
func (ds *DeviceSyncer) Run(stop <-chan struct{}) {
	ds.log.V(1).Info("starting the DeviceSyncer...")
	go func() {
		for {
			// Wait for the next round, or leave when the syncer is stopped.
			select {
			case <-stop:
				return
			case <-time.After(ds.syncPeriod):
			}

			// 1. get device on edge platform and OpenYurt
			edgeDevices, kubeDevices, err := ds.getAllDevices()
			if err != nil {
				ds.log.V(3).Error(err, "fail to list the devices")
				continue
			}

			// 2. find the device that need to be synchronized
			redundantEdgeDevices, redundantKubeDevices, syncedDevices := ds.findDiffDevice(edgeDevices, kubeDevices)
			ds.log.V(1).Info("The number of devices waiting for synchronization",
				"Edge device should be added to OpenYurt", len(redundantEdgeDevices),
				"OpenYurt device that should be deleted", len(redundantKubeDevices),
				"Devices that should be synchronized", len(syncedDevices))

			// 3. create device on OpenYurt which are exists in edge platform but not in OpenYurt
			if err := ds.syncEdgeToKube(redundantEdgeDevices); err != nil {
				ds.log.V(3).Error(err, "fail to create devices on OpenYurt")
				continue
			}

			// 4. delete redundant device on OpenYurt
			if err := ds.deleteDevices(redundantKubeDevices); err != nil {
				ds.log.V(3).Error(err, "fail to delete redundant devices on OpenYurt")
				continue
			}

			// 5. update device status on OpenYurt
			if err := ds.updateDevices(syncedDevices); err != nil {
				ds.log.Error(err, "fail to update devices status")
				continue
			}
			ds.log.V(1).Info("One round of Device synchronization is complete")
		}
	}()

	<-stop
	ds.log.V(1).Info("stopping the device syncer")
}
// getAllDevices returns the devices that exist on the edge platform and the
// devices that exist on OpenYurt for this syncer's node pool.
//
// Both results are keyed by the actual (edge) device name:
//
//	edgeDevice: map[actualName]device
//	kubeDevice: map[actualName]device
//
// Both maps are non-nil even when an error is returned; in that case they are
// empty.
func (ds *DeviceSyncer) getAllDevices() (map[string]devicev1alpha1.Device, map[string]devicev1alpha1.Device, error) {
	edgeDevice := map[string]devicev1alpha1.Device{}
	kubeDevice := map[string]devicev1alpha1.Device{}

	// 1. list devices on edge platform; no caller context is available, so
	// context.TODO() is passed rather than a nil context.
	eDevs, err := ds.deviceCli.List(context.TODO(), edgeCli.ListOptions{})
	if err != nil {
		ds.log.V(4).Error(err, "fail to list the devices object on the Edge Platform")
		return edgeDevice, kubeDevice, err
	}

	// 2. list devices on OpenYurt (filter objects belonging to edgeServer)
	var kDevs devicev1alpha1.DeviceList
	listOptions := client.MatchingFields{"spec.nodePool": ds.NodePool}
	if err = ds.List(context.TODO(), &kDevs, listOptions); err != nil {
		ds.log.V(4).Error(err, "fail to list the devices object on the OpenYurt")
		return edgeDevice, kubeDevice, err
	}

	// 3. index both lists by the actual device name
	for i := range eDevs {
		edgeDevice[getActualName(&eDevs[i])] = eDevs[i]
	}
	for i := range kDevs.Items {
		kubeDevice[getActualName(&kDevs.Items[i])] = kDevs.Items[i]
	}
	return edgeDevice, kubeDevice, nil
}
// findDiffDevice compares the devices on the edge platform with the devices on
// OpenYurt and returns the devices that need to be added, deleted and updated.
// All three results are keyed by the actual (edge) device name.
//
//   - redundantEdgeDevices: devices that exist on the edge platform but not on
//     OpenYurt, already completed for creation on OpenYurt.
//   - redundantKubeDevices: devices that are marked as synced on OpenYurt but
//     no longer exist on the edge platform.
//   - syncedDevices: devices that exist on both sides, with the OpenYurt copy
//     refreshed from the edge copy.
//
// Every edge device is classified using the element currently being iterated,
// so the result does not depend on the map key matching the device's label.
func (ds *DeviceSyncer) findDiffDevice(
	edgeDevice map[string]devicev1alpha1.Device, kubeDevice map[string]devicev1alpha1.Device) (
	redundantEdgeDevices map[string]*devicev1alpha1.Device, redundantKubeDevices map[string]*devicev1alpha1.Device, syncedDevices map[string]*devicev1alpha1.Device) {
	redundantEdgeDevices = map[string]*devicev1alpha1.Device{}
	redundantKubeDevices = map[string]*devicev1alpha1.Device{}
	syncedDevices = map[string]*devicev1alpha1.Device{}

	for n := range edgeDevice {
		// Work on a per-iteration copy so that taking its address is safe.
		ed := edgeDevice[n]
		edName := getActualName(&ed)

		kd, exists := kubeDevice[edName]
		if !exists {
			// Present on the edge platform only: schedule it for creation.
			redundantEdgeDevices[edName] = ds.completeCreateContent(&ed)
			continue
		}
		// Present on both sides: schedule a status refresh.
		syncedDevices[edName] = ds.completeUpdateContent(&kd, &ed)
	}

	for n := range kubeDevice {
		kd := kubeDevice[n]
		// Devices that were never synced are not expected on the edge
		// platform yet, so they are not candidates for deletion.
		if !kd.Status.Synced {
			continue
		}
		kdName := getActualName(&kd)
		if _, exists := edgeDevice[kdName]; !exists {
			redundantKubeDevices[kdName] = &kd
		}
	}
	return redundantEdgeDevices, redundantKubeDevices, syncedDevices
}
// syncEdgeToKube creates on OpenYurt the devices that exist on the edge
// platform but not on OpenYurt. A device that already exists is skipped; any
// other creation error stops the loop and is returned.
func (ds *DeviceSyncer) syncEdgeToKube(edgeDevs map[string]*devicev1alpha1.Device) error {
	for name := range edgeDevs {
		dev := edgeDevs[name]
		err := ds.Client.Create(context.TODO(), dev)
		if err == nil || apierrors.IsAlreadyExists(err) {
			continue
		}
		ds.log.V(5).Info("created device failed:",
			"device", strings.ToLower(dev.Name))
		return err
	}
	return nil
}
// deleteDevices deletes the redundant devices on OpenYurt. The first deletion
// error is logged and returned, leaving the remaining devices untouched.
func (ds *DeviceSyncer) deleteDevices(redundantKubeDevices map[string]*devicev1alpha1.Device) error {
	for _, dev := range redundantKubeDevices {
		err := ds.Client.Delete(context.TODO(), dev)
		if err == nil {
			continue
		}
		ds.log.V(5).Error(err, "fail to delete the Device on OpenYurt",
			"device", dev.Name)
		return err
	}
	return nil
}
// updateDevices writes the status of the devices that exist on both sides back
// to OpenYurt. A conflict is logged and skipped; any other error stops the
// loop and is returned.
func (ds *DeviceSyncer) updateDevices(syncedDevices map[string]*devicev1alpha1.Device) error {
	for _, dev := range syncedDevices {
		err := ds.Client.Status().Update(context.TODO(), dev)
		switch {
		case err == nil:
			// updated, move on to the next device
		case apierrors.IsConflict(err):
			ds.log.Info("----Conflict")
		default:
			return err
		}
	}
	return nil
}
// 下面两个函数是用在kubernetes和edgex两个平台的设备的互相转换的
// completeCreateContent prepares the device object that will be created on
// OpenYurt from a device found on the edge platform. It works on a deep copy:
// the node pool is set to the syncer's node pool, the object name is prefixed
// with "<nodePool>-", and the device is marked as not managed by the cloud.
func (ds *DeviceSyncer) completeCreateContent(edgeDevice *devicev1alpha1.Device) *devicev1alpha1.Device {
	created := edgeDevice.DeepCopy()
	created.Name = ds.NodePool + "-" + created.Name
	created.Spec.NodePool = ds.NodePool
	created.Spec.Managed = false
	return created
}
// completeUpdateContent prepares the device object that will be updated on
// OpenYurt. It works on a deep copy of kubeDevice and refreshes its status
// from the edge side: LastConnected, LastReported, AdminState and
// OperatingState come from edgeDevice, and DeviceProperties comes from the
// property states listed on the edge platform.
//
// When the property states cannot be listed, the error is logged and the
// previously recorded DeviceProperties are kept, so that a transient failure
// does not overwrite them with a partial or empty result.
func (ds *DeviceSyncer) completeUpdateContent(kubeDevice *devicev1alpha1.Device, edgeDevice *devicev1alpha1.Device) *devicev1alpha1.Device {
	updatedDevice := kubeDevice.DeepCopy()

	// No caller context is available here, so context.TODO() is passed rather
	// than a nil context.
	_, aps, err := ds.deviceCli.ListPropertiesState(context.TODO(), updatedDevice, edgeCli.ListOptions{})

	// update device status
	updatedDevice.Status.LastConnected = edgeDevice.Status.LastConnected
	updatedDevice.Status.LastReported = edgeDevice.Status.LastReported
	updatedDevice.Status.AdminState = edgeDevice.Status.AdminState
	updatedDevice.Status.OperatingState = edgeDevice.Status.OperatingState

	if err != nil {
		ds.log.V(3).Error(err, "fail to list the properties state of the device", "device", updatedDevice.Name)
		return updatedDevice
	}
	updatedDevice.Status.DeviceProperties = aps
	return updatedDevice
}
// getActualName returns the value of the EdgeXObjectName label of the device,
// or the empty string when the label is not set.
func getActualName(d *devicev1alpha1.Device) string {
	labels := d.Labels
	return labels[EdgeXObjectName]
}