(以下内容已经整合进《SONiC入门指南》的 SAI介绍 和 Syncd-SAI工作流 两节中。)
Syncd容器 是SONiC中专门负责管理ASIC的容器,其中核心进程syncd
负责与Redis数据库沟通,加载SAI并与其交互,以完成ASIC的初始化,配置和状态上报的处理等等。
由于SONiC中大量的工作流最后都需要通过Syncd和SAI来和ASIC进行交互,所以这一部分也就成为了这些工作流的公共部分,所以,在展开其他工作流之前,我们先来看一下Syncd和SAI是如何工作的。
1. Syncd启动流程
syncd
进程的入口在syncd_main.cpp
中的syncd_main
函数,其启动的整体流程大致分为两部分。
第一部分是创建各个对象,并进行初始化:
sequenceDiagram
autonumber
participant SDM as syncd_main
participant SD as Syncd
participant SAI as VendorSai
SDM->>+SD: 调用构造函数
SD->>SD: 加载和解析命令行参数和配置文件
SD->>SD: 创建数据库相关对象,如:<br/>ASIC_DB Connector和FlexCounterManager
SD->>SD: 创建MDIO IPC服务器
SD->>SD: 创建SAI上报处理逻辑
SD->>SD: 创建RedisSelectableChannel用于接收Redis通知
SD->>-SAI: 初始化SAI
第二个部分是启动主循环,并且处理初始化事件:
sequenceDiagram
autonumber
participant SDM as syncd_main
participant SD as Syncd
participant SAI as VendorSai
participant NP as NotificationProcessor
participant MIS as MdioIpcServer
SDM->>+SD: 启动主线程循环
SD->>NP: 启动SAI上报处理线程
NP->>NP: 开始通知处理循环
SD->>MIS: 启动MDIO IPC服务器线程
MIS->>MIS: 开始MDIO IPC服务器事件循环
SD->>SD: 初始化并启动事件分发机制,开始主循环
loop 处理事件
alt 如果是创建Switch的事件或者是WarmBoot
SD->>SAI: 创建Switch对象,设置通知回调
else 如果是其他事件
SD->>SD: 处理事件
end
end
SD->>-SDM: 退出主循环返回
然后我们再从代码的角度来更加仔细的看一下这个流程。
1.1. syncd_main函数
syncd_main
函数本身非常简单,主要逻辑就是创建Syncd对象,然后调用其run
方法:
1 2 3 4 5 6 7 8 int syncd_main (int argc, char **argv) { auto vendorSai = std::make_shared <VendorSai>(); auto syncd = std::make_shared <Syncd>(vendorSai, commandLineOptions, isWarmStart); syncd->run (); return EXIT_SUCCESS; }
其中,Syncd
对象的构造函数负责初始化Syncd
中的各个功能,而run
方法则负责启动Syncd的主循环。
1.2. Syncd构造函数
Syncd
对象的构造函数负责创建或初始化Syncd
中的各个功能,比如用于连接数据库的对象,统计管理,和ASIC通知的处理逻辑等等,其主要代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 Syncd::Syncd ( _In_ std::shared_ptr<sairedis::SaiInterface> vendorSai, _In_ std::shared_ptr<CommandLineOptions> cmd, _In_ bool isWarmStart): m_vendorSai (vendorSai), ... { ... auto ccc = sairedis::ContextConfigContainer::loadFromFile (m_commandLineOptions->m_contextConfig.c_str ()); m_contextConfig = ccc->get (m_commandLineOptions->m_globalContext); ... m_manager = std::make_shared <FlexCounterManager>(m_vendorSai, m_contextConfig->m_dbCounters); m_dbAsic = std::make_shared <swss::DBConnector>(m_contextConfig->m_dbAsic, 0 ); m_mdioIpcServer = std::make_shared <MdioIpcServer>(m_vendorSai, m_commandLineOptions->m_globalContext); m_selectableChannel = std::make_shared <sairedis::RedisSelectableChannel>(m_dbAsic, ASIC_STATE_TABLE, REDIS_TABLE_GETRESPONSE, TEMP_PREFIX, modifyRedis); m_notifications = std::make_shared <RedisNotificationProducer>(m_contextConfig->m_dbAsic); m_client = std::make_shared <RedisClient>(m_dbAsic); m_processor = std::make_shared <NotificationProcessor>(m_notifications, m_client, std::bind (&Syncd::syncProcessNotification, this , _1)); m_handler = std::make_shared <NotificationHandler>(m_processor); m_sn.onFdbEvent = std::bind (&NotificationHandler::onFdbEvent, m_handler.get (), _1, _2); m_sn.onNatEvent = std::bind (&NotificationHandler::onNatEvent, m_handler.get (), _1, _2); m_handler->setSwitchNotifications (m_sn.getSwitchNotifications ()); ... sai_status_t status = vendorSai->initialize (0 , &m_test_services); ... }
1.3. SAI的初始化与VendorSai
为了有一个更加直观的理解,我们拿一小部分代码来展示一下SAI的接口定义和初始化的方法,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 typedef struct _sai_apis_t { sai_switch_api_t * switch_api; sai_port_api_t * port_api; ... } sai_apis_t ; typedef struct _sai_switch_api_t { sai_create_switch_fn create_switch; sai_remove_switch_fn remove_switch; sai_set_switch_attribute_fn set_switch_attribute; sai_get_switch_attribute_fn get_switch_attribute; ... } sai_switch_api_t ; typedef struct _sai_port_api_t { sai_create_port_fn create_port; sai_remove_port_fn remove_port; sai_set_port_attribute_fn set_port_attribute; sai_get_port_attribute_fn get_port_attribute; ... } sai_port_api_t ;
其中,sai_apis_t
结构体是SAI所有模块的接口的集合,其中每个成员都是一个特定模块的接口列表的指针。我们用sai_switch_api_t
来举例,它定义了SAI Switch模块的所有接口,我们在inc/saiswitch.h
中可以看到它的定义。同样的,我们在inc/saiport.h
中可以看到SAI Port模块的接口定义。
SAI的初始化其实就是想办法获取上面这些函数指针,这样我们就可以通过SAI的接口来操作ASIC了。
参与SAI初始化的主要函数有两个,他们都定义在inc/sai.h
中:
sai_api_initialize
:初始化SAI
sai_api_query
:传入SAI的API的类型,获取对应的接口列表
虽然大部分厂商的SAI实现是闭源的,但是mellanox却开源了自己的SAI实现,所以这里我们可以借助其更加深入的理解SAI是如何工作的。
比如,sai_api_initialize
函数其实就是简单的设置设置两个全局变量,然后返回SAI_STATUS_SUCCESS
:
1 2 3 4 5 6 7 8 9 10 11 12 sai_status_t sai_api_initialize (_In_ uint64_t flags, _In_ const sai_service_method_table_t * services) { if (g_initialized) { return SAI_STATUS_FAILURE; } memcpy (&g_mlnx_services, services, sizeof (g_mlnx_services)); g_initialized = true ; return SAI_STATUS_SUCCESS; }
初始化完成后,我们就可以使用sai_api_query
函数,通过传入API的类型来查询对应的接口列表,而每一个接口列表其实都是一个全局变量:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 sai_status_t sai_api_query (_In_ sai_api_t sai_api_id, _Out_ void ** api_method_table) { if (!g_initialized) { return SAI_STATUS_UNINITIALIZED; } ... return sai_api_query_eth (sai_api_id, api_method_table); } sai_status_t sai_api_query_eth (_In_ sai_api_t sai_api_id, _Out_ void ** api_method_table) { switch (sai_api_id) { case SAI_API_BRIDGE: *(const sai_bridge_api_t **)api_method_table = &mlnx_bridge_api; return SAI_STATUS_SUCCESS; case SAI_API_SWITCH: *(const sai_switch_api_t **)api_method_table = &mlnx_switch_api; return SAI_STATUS_SUCCESS; ... default : if (sai_api_id >= (sai_api_t )SAI_API_EXTENSIONS_RANGE_END) { return SAI_STATUS_INVALID_PARAMETER; } else { return SAI_STATUS_NOT_IMPLEMENTED; } } } const sai_bridge_api_t mlnx_bridge_api = { mlnx_create_bridge, mlnx_remove_bridge, mlnx_set_bridge_attribute, mlnx_get_bridge_attribute, ... }; const sai_switch_api_t mlnx_switch_api = { mlnx_create_switch, mlnx_remove_switch, mlnx_set_switch_attribute, mlnx_get_switch_attribute, ... };
Syncd
使用VendorSai
来对SAI的所有API进行封装,方便上层调用。其初始化过程也非常直接,基本就是对上面两个函数的直接调用和错误处理,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 sai_status_t VendorSai::initialize ( _In_ uint64_t flags, _In_ const sai_service_method_table_t *service_method_table) { ... memcpy (&m_service_method_table, service_method_table, sizeof (m_service_method_table)); auto status = sai_api_initialize (flags, service_method_table); if (status == SAI_STATUS_SUCCESS) { memset (&m_apis, 0 , sizeof (m_apis)); int failed = sai_metadata_apis_query (sai_api_query, &m_apis); ... } ... return status; }
当获取好所有的SAI API之后,我们就可以通过VendorSai
对象来调用SAI的API了。当前调用SAI的API方式主要有两种。
第一种是通过sai_object_type_into_t
来调用,它类似于为所有的SAI Object实现了一个虚表,如下:
1 2 3 4 5 6 7 8 9 10 11 12 sai_status_t VendorSai::set ( _In_ sai_object_type_t objectType, _In_ sai_object_id_t objectId, _In_ const sai_attribute_t *attr) { ... auto info = sai_metadata_get_object_type_info (objectType); sai_object_meta_key_t mk = { .objecttype = objectType, .objectkey = { .key = { .object_id = objectId } } }; return info->set (&mk, attr); }
另外一种是通过保存在VendorSai
对象中的m_apis
来调用,这种方式更加直接,但是调用前需要先根据SAI Object的类型来调用不同的API。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 sai_status_t VendorSai::getStatsExt ( _In_ sai_object_type_t object_type, _In_ sai_object_id_t object_id, _In_ uint32_t number_of_counters, _In_ const sai_stat_id_t *counter_ids, _In_ sai_stats_mode_t mode, _Out_ uint64_t *counters) { sai_status_t (*ptr)( _In_ sai_object_id_t port_id, _In_ uint32_t number_of_counters, _In_ const sai_stat_id_t *counter_ids, _In_ sai_stats_mode_t mode, _Out_ uint64_t *counters); switch ((int )object_type) { case SAI_OBJECT_TYPE_PORT: ptr = m_apis.port_api->get_port_stats_ext; break ; case SAI_OBJECT_TYPE_ROUTER_INTERFACE: ptr = m_apis.router_interface_api->get_router_interface_stats_ext; break ; case SAI_OBJECT_TYPE_POLICER: ptr = m_apis.policer_api->get_policer_stats_ext; break ; ... default : SWSS_LOG_ERROR ("not implemented, FIXME" ); return SAI_STATUS_FAILURE; } return ptr (object_id, number_of_counters, counter_ids, mode, counters); }
可以明显看出,第一种调用方式代码要精炼和直观许多。
1.4. Syncd主循环
Syncd
的主循环也是使用的SONiC中标准的事件分发 机制:在启动时,Syncd
会将所有用于事件处理的Selectable
对象注册到用于获取事件的Select
对象中,然后在主循环中调用Select
的select
方法,等待事件的发生。核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 void Syncd::run () { volatile bool runMainLoop = true ; std ::shared_ptr <swss::Select> s = std ::make_shared<swss::Select>(); onSyncdStart(m_commandLineOptions->m_startType == SAI_START_TYPE_WARM_BOOT); m_processor->startNotificationsProcessingThread(); for (auto & sw: m_switches) { m_mdioIpcServer->setSwitchId(sw.second->getRid()); } m_mdioIpcServer->startMdioThread(); s->addSelectable(m_selectableChannel.get()); s->addSelectable(m_restartQuery.get()); s->addSelectable(m_flexCounter.get()); s->addSelectable(m_flexCounterGroup.get()); while (runMainLoop) { swss::Selectable *sel = NULL ; int result = s->select(&sel); ... if (sel == m_restartQuery.get()) { } else if (sel == m_flexCounter.get()) { processFlexCounterEvent(*(swss::ConsumerTable*)sel); } else if (sel == m_flexCounterGroup.get()) { processFlexCounterGroupEvent(*(swss::ConsumerTable*)sel); } else if (sel == m_selectableChannel.get()) { processEvent(*m_selectableChannel.get()); } else { SWSS_LOG_ERROR("select failed: %d" , result); } ... } ... }
其中,m_selectableChannel
就是主要负责处理Redis数据库中的事件的对象。它使用ProducerTable / ConsumerTable 的方式与Redis数据库进行交互,所以,所有orchagent
发送过来的操作都会以三元组的形式保存在Redis中的list中,等待Syncd
的处理。其核心定义如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 class RedisSelectableChannel : public SelectableChannel{ public : RedisSelectableChannel ( _In_ std::shared_ptr<swss::DBConnector> dbAsic, _In_ const std::string& asicStateTable, _In_ const std::string& getResponseTable, _In_ const std::string& tempPrefix, _In_ bool modifyRedis); public : virtual bool empty () override ; ... public : virtual int getFd () override ; virtual uint64_t readData () override ; ... private : std::shared_ptr<swss::DBConnector> m_dbAsic; std::shared_ptr<swss::ConsumerTable> m_asicState; std::shared_ptr<swss::ProducerTable> m_getResponse; ... };
另外,在主循环启动时,Syncd
还会额外启动两个线程:
用于接收ASIC上报通知的通知处理线程:m_processor->startNotificationsProcessingThread();
用于处理MDIO通信的MDIO IPC处理线程:m_mdioIpcServer->startMdioThread();
它们的细节我们在初始化的部分不做过多展开,等后面介绍相关工作流时再来详细介绍。
1.5. 创建Switch对象,初始化通知机制
在主循环启动后,Syncd
就会开始调用SAI的API来创建Switch对象,这里的入口有两个,一个是ASIC_DB收到创建Switch的通知,另外一个是Warm Boot时,Syncd
来主动调用,但是创建Switch这一步的内部流程都类似。
在这一步中间,有一个很重要的步骤,就是初始化SAI内部实现中的通知回调,将我们之前已经创建好的通知处理逻辑传递给SAI的实现,比如FDB的事件等等。这些回调函数会被当做Switch的属性(Attributes)通过参数的形式传给SAI的create_switch
方法,SAI的实现会将其保存起来,这样就可以在事件发生时调用回调函数,来通知Syncd
了。这里的核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 sai_status_t Syncd::processQuadEvent ( _In_ sai_common_api_t api, _In_ const swss::KeyOpFieldsValuesTuple &kco) { sai_object_meta_key_t metaKey; ... SaiAttributeList list (metaKey.objecttype, values, false ) ; sai_attribute_t *attr_list = list.get_attr_list (); uint32_t attr_count = list.get_attr_count (); if (metaKey.objecttype == SAI_OBJECT_TYPE_SWITCH && (api == SAI_COMMON_API_CREATE || api == SAI_COMMON_API_SET)) { m_handler->updateNotificationsPointers (attr_count, attr_list); } if (isInitViewMode ()) { sai_status_t status = processQuadEventInInitViewMode (metaKey.objecttype, strObjectId, api, attr_count, attr_list); syncUpdateRedisQuadEvent (status, api, kco); return status; } ... } void NotificationHandler::updateNotificationsPointers (_In_ uint32_t attr_count, _In_ sai_attribute_t *attr_list) const { for (uint32_t index = 0 ; index < attr_count; ++index) { ... sai_attribute_t &attr = attr_list[index]; switch (attr.id) { ... case SAI_SWITCH_ATTR_SHUTDOWN_REQUEST_NOTIFY: attr.value.ptr = (void *)m_switchNotifications.on_switch_shutdown_request; break ; case SAI_SWITCH_ATTR_FDB_EVENT_NOTIFY: attr.value.ptr = (void *)m_switchNotifications.on_fdb_event; break ; ... } ... } } void Syncd::onSwitchCreateInInitViewMode (_In_ sai_object_id_t switchVid, _In_ uint32_t attr_count, _In_ const sai_attribute_t *attr_list) { if (m_switches.find (switchVid) == m_switches.end ()) { sai_object_id_t switchRid; sai_status_t status; status = m_vendorSai->create (SAI_OBJECT_TYPE_SWITCH, &switchRid, 0 , attr_count, attr_list); ... m_switches[switchVid] = std::make_shared <SaiSwitch>(switchVid, switchRid, m_client, m_translator, m_vendorSai); m_mdioIpcServer->setSwitchId (switchRid); ... } ... }
从Mellanox的SAI实现,我们可以看到其具体的保存的方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 static sai_status_t mlnx_create_switch (_Out_ sai_object_id_t * switch_id, _In_ uint32_t attr_count, _In_ const sai_attribute_t *attr_list) { ... status = find_attrib_in_list (attr_count, attr_list, SAI_SWITCH_ATTR_SWITCH_STATE_CHANGE_NOTIFY, &attr_val, &attr_idx); if (!SAI_ERR (status)) { g_notification_callbacks.on_switch_state_change = (sai_switch_state_change_notification_fn)attr_val->ptr; } status = find_attrib_in_list (attr_count, attr_list, SAI_SWITCH_ATTR_SHUTDOWN_REQUEST_NOTIFY, &attr_val, &attr_idx); if (!SAI_ERR (status)) { g_notification_callbacks.on_switch_shutdown_request = (sai_switch_shutdown_request_notification_fn)attr_val->ptr; } status = find_attrib_in_list (attr_count, attr_list, SAI_SWITCH_ATTR_FDB_EVENT_NOTIFY, &attr_val, &attr_idx); if (!SAI_ERR (status)) { g_notification_callbacks.on_fdb_event = (sai_fdb_event_notification_fn)attr_val->ptr; } status = find_attrib_in_list (attr_count, attr_list, SAI_SWITCH_ATTR_PORT_STATE_CHANGE_NOTIFY, &attr_val, &attr_idx); if (!SAI_ERR (status)) { g_notification_callbacks.on_port_state_change = (sai_port_state_change_notification_fn)attr_val->ptr; } status = find_attrib_in_list (attr_count, attr_list, SAI_SWITCH_ATTR_PACKET_EVENT_NOTIFY, &attr_val, &attr_idx); if (!SAI_ERR (status)) { g_notification_callbacks.on_packet_event = (sai_packet_event_notification_fn)attr_val->ptr; } ... }
2. ASIC状态更新
ASIC状态更新是Syncd
中最重要的工作流之一,当orchagent
发现任何变化并开始修改ASIC_DB时,就会触发该工作流,通过SAI来对ASIC进行更新。在了解了Syncd
的主循环之后,理解ASIC状态更新的工作流就很简单了。
所有的步骤都发生在主线程一个线程中,顺序执行,总结成时序图如下:
sequenceDiagram
autonumber
participant SD as Syncd
participant RSC as RedisSelectableChannel
participant SAI as VendorSai
participant R as Redis
loop 主线程循环
SD->>RSC: 收到epoll通知,通知获取所有到来的消息
RSC->>R: 通过ConsumerTable获取所有到来的消息
critical 给Syncd加锁
loop 所有收到的消息
SD->>RSC: 获取一个消息
SD->>SD: 解析消息,获取操作类型和操作对象
SD->>SAI: 调用对应的SAI API,更新ASIC
SD->>RSC: 发送调用结果给Redis
RSC->>R: 将调用结果写入Redis
end
end
end
首先,orchagent
通过Redis发送过来的操作会被RedisSelectableChannel
对象接收,然后在主循环中被处理。当Syncd
处理到m_selectableChannel
时,就会调用processEvent
方法来处理该操作。这几步的核心代码我们上面介绍主循环时已经介绍过了,这里就不再赘述。
然后,processEvent
会根据其中的操作类型,调用对应的SAI的API来对ASIC进行更新。其逻辑是一个巨大的switch-case语句,如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 void Syncd::processEvent (_In_ sairedis::SelectableChannel& consumer) { std::lock_guard<std::mutex> lock (m_mutex) ; do { swss::KeyOpFieldsValuesTuple kco; consumer.pop (kco, isInitViewMode ()); processSingleEvent (kco); } while (!consumer.empty ()); } sai_status_t Syncd::processSingleEvent (_In_ const swss::KeyOpFieldsValuesTuple &kco) { auto & op = kfvOp (kco); ... if (op == REDIS_ASIC_STATE_COMMAND_CREATE) return processQuadEvent (SAI_COMMON_API_CREATE, kco); if (op == REDIS_ASIC_STATE_COMMAND_REMOVE) return processQuadEvent (SAI_COMMON_API_REMOVE, kco); ... } sai_status_t Syncd::processQuadEvent ( _In_ sai_common_api_t api, _In_ const swss::KeyOpFieldsValuesTuple &kco) { const std::string& key = kfvKey (kco); const std::string& strObjectId = key.substr (key.find (":" ) + 1 ); sai_object_meta_key_t metaKey; sai_deserialize_object_meta_key (key, metaKey); auto & values = kfvFieldsValues (kco); SaiAttributeList list (metaKey.objecttype, values, false ) ; sai_attribute_t *attr_list = list.get_attr_list (); uint32_t attr_count = list.get_attr_count (); ... auto info = sai_metadata_get_object_type_info (metaKey.objecttype); sai_status_t status; if (info->isnonobjectid) { status = processEntry (metaKey, api, attr_count, attr_list); } else { status = processOid (metaKey.objecttype, strObjectId, api, attr_count, attr_list); } if (api == SAI_COMMON_API_GET) { sai_object_id_t switchVid = VidManager::switchIdQuery(metaKey.objectkey.key.object_id); sendGetResponse (metaKey.objecttype, strObjectId, switchVid, status, attr_count, attr_list); ... } else { sendApiResponse (api, status); } syncUpdateRedisQuadEvent (status, api, kco); return status; } sai_status_t Syncd::processEntry (_In_ sai_object_meta_key_t metaKey, _In_ sai_common_api_t api, _In_ uint32_t attr_count, _In_ sai_attribute_t *attr_list) { ... switch (api) { case SAI_COMMON_API_CREATE: return m_vendorSai->create (metaKey, SAI_NULL_OBJECT_ID, attr_count, attr_list); case SAI_COMMON_API_REMOVE: return m_vendorSai->remove (metaKey); ... default : SWSS_LOG_THROW ("api %s not supported" , sai_serialize_common_api (api).c_str ()); } }
3. ASIC状态变更上报
反过来,当ASIC状态发生任何变化,或者需要上报数据,它也会通过SAI来通知我们,此时Syncd会监听这些通知,然后通过ASIC_DB上报给orchagent。其主要工作流如下:
sequenceDiagram
participant SAI as SAI Impl
participant NP as NotificationProcessor
participant SD as Syncd
participant RNP as RedisNotificationProducer
participant R as Redis
loop SAI实现事件处理线程
SAI->>SAI: 通过ASIC SDK获取事件
SAI->>SAI: 解析事件,并转换成SAI通知对象
SAI->>NP: 将通知对象序列化,<br/>并发送给通知处理线程的队列中
end
loop 通知处理线程消息循环
NP->>NP: 从队列中获取通知
NP->>SD: 获取Syncd锁
critical 给Syncd加锁
NP->>NP: 反序列化通知对象,并做一些处理
NP->>RNP: 重新序列化通知对象,并请求发送
RNP->>R: 将通知以NotificationProducer<br/>的形式写入ASIC_DB
end
end
这里我们也来看一下具体的实现。为了更加深入的理解,我们还是借助开源的Mellanox的SAI实现来进行分析。
最开始,SAI的实现需要接受到ASIC的通知,这一步是通过ASIC的SDK来实现的,Mellanox的SAI会创建一个事件处理线程(event_thread),然后使用select
函数来获取并处理ASIC发送过来的通知,核心代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 static void event_thread_func (void *context) {#define MAX_PACKET_SIZE MAX(g_resource_limits.port_mtu_max, SX_HOST_EVENT_BUFFER_SIZE_MAX) sx_status_t status; sx_api_handle_t api_handle; sx_user_channel_t port_channel, callback_channel; fd_set descr_set; int ret_val; sai_object_id_t switch_id = (sai_object_id_t )context; sai_port_oper_status_notification_t port_data; sai_fdb_event_notification_data_t *fdb_events = NULL ; sai_attribute_t *attr_list = NULL ; ... if (SX_STATUS_SUCCESS != (status = sx_api_open (sai_log_cb, &api_handle))) { if (g_notification_callbacks.on_switch_shutdown_request) { g_notification_callbacks.on_switch_shutdown_request (switch_id); } return ; } if (SX_STATUS_SUCCESS != (status = sx_api_host_ifc_open (api_handle, &port_channel.channel.fd))) { goto out; } ... port_channel.type = SX_USER_CHANNEL_TYPE_FD; if (SX_STATUS_SUCCESS != (status = sx_api_host_ifc_trap_id_register_set (api_handle, SX_ACCESS_CMD_REGISTER, DEFAULT_ETH_SWID, SX_TRAP_ID_PUDE, &port_channel))) { goto out; } ... for (uint32_t ii = 0 ; ii < (sizeof (mlnx_trap_ids) / sizeof (*mlnx_trap_ids)); ii++) { status = sx_api_host_ifc_trap_id_register_set (api_handle, SX_ACCESS_CMD_REGISTER, DEFAULT_ETH_SWID, mlnx_trap_ids[ii], &callback_channel); } while (!event_thread_asked_to_stop) { FD_ZERO (&descr_set); FD_SET (port_channel.channel.fd.fd, &descr_set); FD_SET (callback_channel.channel.fd.fd, &descr_set); ... ret_val = select (FD_SETSIZE, &descr_set, NULL , NULL , &timeout); if (ret_val > 0 ) { if (FD_ISSET (port_channel.channel.fd.fd, &descr_set)) { if (g_notification_callbacks.on_port_state_change) { g_notification_callbacks.on_port_state_change (1 , &port_data); } } if (FD_ISSET (callback_channel.channel.fd.fd, &descr_set)) { packet_size = MAX_PACKET_SIZE; if (SX_STATUS_SUCCESS != (status = sx_lib_host_ifc_recv (&callback_channel.channel.fd, p_packet, &packet_size, receive_info))) { goto out; } if (SX_TRAP_ID_BFD_PACKET_EVENT == receive_info->trap_id) { const struct bfd_packet_event *event = (const struct bfd_packet_event*)p_packet; status = mlnx_switch_bfd_packet_handle (event); continue ; } if (receive_info->trap_id == SX_TRAP_ID_FDB_EVENT) { trap_name = "FDB event" ; } else if (SAI_STATUS_SUCCESS != (status = mlnx_translate_sdk_trap_to_sai (receive_info->trap_id, &trap_name, &trap_oid))) { continue ; } if (SX_TRAP_ID_FDB_EVENT == receive_info->trap_id) { if (g_notification_callbacks.on_fdb_event) { g_notification_callbacks.on_fdb_event (event_count, fdb_events); } continue ; } status = mlnx_get_hostif_packet_data (receive_info, &attrs_num, callback_data); if (g_notification_callbacks.on_packet_event) { g_notification_callbacks.on_packet_event (switch_id, packet_size, p_packet, attrs_num, callback_data); } } } } out: ... }
接下来,我们用FDB事件来举例,当ASIC收到FDB事件,就会被上面的事件处理循环获取到,并调用g_notification_callbacks.on_fdb_event
函数来处理。这个函数接下来就会调用到Syncd
初始化时设置好的NotificationHandler::onFdbEvent
函数,这个函数会将该事件序列化后,通过消息队列转发给通知处理线程来进行处理:
1 2 3 4 5 6 void NotificationHandler::onFdbEvent (_In_ uint32_t count, _In_ const sai_fdb_event_notification_data_t *data) { std::string s = sai_serialize_fdb_event_ntf (count, data); enqueueNotification (SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT, s); }
而此时通知处理线程会被唤醒,从消息队列中取出该事件,然后通过Syncd
获取到Syncd
的锁,再开始处理该通知:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 void NotificationProcessor::ntf_process_function () { std::mutex ntf_mutex; std::unique_lock<std::mutex> ulock (ntf_mutex) ; while (m_runThread) { m_cv.wait (ulock); swss::KeyOpFieldsValuesTuple item; while (m_notificationQueue->tryDequeue (item)) { processNotification (item); } } } void Syncd::syncProcessNotification (_In_ const swss::KeyOpFieldsValuesTuple& item) { std::lock_guard<std::mutex> lock (m_mutex) ; m_processor->syncProcessNotification (item); }
接下来就是事件的分发和处理了,syncProcessNotification
函数是一系列的if-else
语句,根据事件的类型,调用不同的处理函数来处理该事件:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 void NotificationProcessor::syncProcessNotification ( _In_ const swss::KeyOpFieldsValuesTuple& item) { std::string notification = kfvKey (item); std::string data = kfvOp (item); if (notification == SAI_SWITCH_NOTIFICATION_NAME_SWITCH_STATE_CHANGE) { handle_switch_state_change (data); } else if (notification == SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT) { handle_fdb_event (data); } else if ... } else { SWSS_LOG_ERROR ("unknown notification: %s" , notification.c_str ()); } }
而每个事件处理函数都类似,他们会对发送过来的事件进行反序列化,然后调用真正的处理逻辑发送通知,比如,fdb事件对应的handle_fdb_event
函数和process_on_fdb_event
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 void NotificationProcessor::handle_fdb_event (_In_ const std::string &data) { uint32_t count; sai_fdb_event_notification_data_t *fdbevent = NULL ; sai_deserialize_fdb_event_ntf (data, count, &fdbevent); process_on_fdb_event (count, fdbevent); sai_deserialize_free_fdb_event_ntf (count, fdbevent); } void NotificationProcessor::process_on_fdb_event ( _In_ uint32_t count, _In_ sai_fdb_event_notification_data_t *data) { for (uint32_t i = 0 ; i < count; i++) { sai_fdb_event_notification_data_t *fdb = &data[i]; fdb->fdb_entry.switch_id = m_translator->translateRidToVid (fdb->fdb_entry.switch_id, SAI_NULL_OBJECT_ID); fdb->fdb_entry.bv_id = m_translator->translateRidToVid (fdb->fdb_entry.bv_id, fdb->fdb_entry.switch_id, true ); m_translator->translateRidToVid (SAI_OBJECT_TYPE_FDB_ENTRY, fdb->fdb_entry.switch_id, fdb->attr_count, fdb->attr, true ); ... } std::string s = sai_serialize_fdb_event_ntf (count, data); sendNotification (SAI_SWITCH_NOTIFICATION_NAME_FDB_EVENT, s); }
具体发送事件的逻辑就非常直接了,最终就是通过NotificationProducer 来发送通知到ASIC_DB中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 void NotificationProcessor::sendNotification (_In_ const std::string& op, _In_ const std::string& data) { std::vector<swss::FieldValueTuple> entry; sendNotification (op, data, entry); } void NotificationProcessor::sendNotification (_In_ const std::string& op, _In_ const std::string& data, _In_ std::vector<swss::FieldValueTuple> entry) { m_notifications->send (op, data, entry); } void RedisNotificationProducer::send (_In_ const std::string& op, _In_ const std::string& data, _In_ const std::vector<swss::FieldValueTuple>& values) { std::vector<swss::FieldValueTuple> vals = values; m_notificationProducer->send (op, data, vals); }
到此,Syncd
中的通知上报的流程就结束了。
4. 参考资料
SONiC Architecture
Github repo: sonic-sairedis
Github repo: Nvidia (Mellanox) SAI implementation
原创文章,转载请标明出处: Soul Orbit 本文链接地址: SONiC学习笔记(五):Syncd-SAI工作流