Communication layer
On top of the Redis encapsulation and table abstraction is the SONiC communication layer, which provides four different PubSub encapsulations for inter-service communication, depending on the requirements.
SubscriberStateTable
The most straightforward one is SubscriberStateTable.
It is built on the keyspace notification mechanism that comes with Redis [4]: whenever the value of any key in the database changes, Redis publishes two event notifications, an <op> event on the channel __keyspace@<db-id>__:<key> and a <key> event on the channel __keyevent@<db-id>__:<op>. For example, deleting a key named foo in database 0 triggers the following two notifications:
PUBLISH __keyspace@0__:foo del
PUBLISH __keyevent@0__:del foo
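To see these notifications firsthand outside of SONiC, we can subscribe to them directly. The following is a minimal sketch using hiredis, not SONiC code: it assumes a local Redis reachable on 127.0.0.1:6379 (in SONiC the database normally listens on a unix socket), and the PORT_TABLE key pattern is just an illustrative example:

// Minimal illustration (not SONiC code): watch keyspace events with hiredis.
#include <cstdio>
#include <hiredis/hiredis.h>

int main()
{
    redisContext *ctx = redisConnect("127.0.0.1", 6379);   // assumed local Redis
    if (ctx == nullptr || ctx->err) return 1;

    // Keyspace notifications are off by default; "K" = keyspace events,
    // "E" = keyevent events, "A" = all event classes.
    redisReply *reply = (redisReply *)redisCommand(ctx, "CONFIG SET notify-keyspace-events KEA");
    freeReplyObject(reply);

    // Subscribe to every keyspace event in database 0 for keys under PORT_TABLE.
    reply = (redisReply *)redisCommand(ctx, "PSUBSCRIBE __keyspace@0__:PORT_TABLE*");
    freeReplyObject(reply);

    // Each notification arrives as ["pmessage", <pattern>, <channel>, <op>].
    while (redisGetReply(ctx, (void **)&reply) == REDIS_OK) {
        if (reply->type == REDIS_REPLY_ARRAY && reply->elements == 4)
            printf("channel=%s op=%s\n", reply->element[2]->str, reply->element[3]->str);
        freeReplyObject(reply);
    }

    redisFree(ctx);
    return 0;
}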
The SubscriberStateTable listens for the first kind of event notification and then calls the corresponding callback function. The class diagram of the classes most directly related to it is shown below; you can see that it inherits from ConsumerTableBase, since it is a consumer of Redis messages:
At initialization time, we can see how it subscribes to the Redis event notifications:
// File: sonic-swss-common - common/subscriberstatetable.cpp
SubscriberStateTable::SubscriberStateTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri), m_table(db, tableName)
{
m_keyspace = "__keyspace@";
m_keyspace += to_string(db->getDbId()) + "__:" + tableName + m_table.getTableNameSeparator() + "*";
psubscribe(m_db, m_keyspace);
// ...
}
Its event reception and dispatch are mainly handled by two functions:
- readData(): reads the pending events from Redis and appends them to the event queue in ConsumerTableBase.
- pops(): takes the raw events out of the queue, parses them, and hands them back to the caller through the output parameter.
// File: sonic-swss-common - common/subscriberstatetable.cpp
uint64_t SubscriberStateTable::readData()
{
// ...
reply = nullptr;
int status;
do {
status = redisGetReplyFromReader(m_subscribe->getContext(), reinterpret_cast<void**>(&reply));
if(reply != nullptr && status == REDIS_OK) {
m_keyspace_event_buffer.emplace_back(make_shared<RedisReply>(reply));
}
} while(reply != nullptr && status == REDIS_OK);
// ...
return 0;
}
void SubscriberStateTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string& /*prefix*/)
{
vkco.clear();
// ...
// Pop from m_keyspace_event_buffer, which is filled by readData()
while (auto event = popEventBuffer()) {
KeyOpFieldsValuesTuple kco;
// Parsing here ...
vkco.push_back(kco);
}
m_keyspace_event_buffer.clear();
}
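To show how these pieces are used from a caller's point of view, here is a minimal sketch of consuming state changes with SubscriberStateTable together with swss::Select. This is illustrative only, not SONiC code; the database name, table name and constructor defaults are assumptions that should be checked against sonic-swss-common:

// Illustrative sketch: watch the PORT table in CONFIG_DB for changes.
#include <deque>
#include <iostream>
#include "dbconnector.h"            // swss::DBConnector
#include "subscriberstatetable.h"   // swss::SubscriberStateTable
#include "select.h"                 // swss::Select / swss::Selectable
#include "table.h"                  // kfvKey / kfvOp / kfvFieldsValues helpers

using namespace swss;

int main()
{
    DBConnector db("CONFIG_DB", 0);              // assumed DB name
    SubscriberStateTable portTable(&db, "PORT"); // assumed table name
    Select s;
    s.addSelectable(&portTable);

    while (true) {
        Selectable *sel = nullptr;
        if (s.select(&sel) != Select::OBJECT)
            continue;

        // readData() has already buffered the raw keyspace events; pops() parses them.
        std::deque<KeyOpFieldsValuesTuple> entries;
        portTable.pops(entries);

        for (const auto &entry : entries) {
            std::cout << kfvOp(entry) << " " << kfvKey(entry) << std::endl;
            for (const auto &fv : kfvFieldsValues(entry))
                std::cout << "  " << fvField(fv) << " = " << fvValue(fv) << std::endl;
        }
    }
}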
NotificationProducer / NotificationConsumer
When it comes to message communication, message queues naturally come to mind, and that is exactly our second communication channel -- [NotificationProducer](https://github.com/sonic-net/sonic-swss-common/blob/master/common/notificationproducer.h) and [NotificationConsumer](https://github.com/sonic-net/sonic-swss-common/blob/master/common/notificationconsumer.h).
This communication channel is implemented on top of Redis's native PubSub and is essentially a wrapper around the PUBLISH and SUBSCRIBE commands. Because it is very limited, it is only used for the simplest notification scenarios, such as the timeout check and restart check in orchagent, and not for passing user configuration and data.
In this channel, the Producer, i.e. the sender of the message, does two things: first, it packs the message into JSON format; second, it calls Redis's PUBLISH command to send the message out. Since the PUBLISH command can only carry a single message, the op and data fields of the request are inserted at the front of values, and the buildJson function is then called to pack everything into a JSON array:
int64_t swss::NotificationProducer::send(const std::string &op, const std::string &data, std::vector<FieldValueTuple> &values)
{
// Pack the op and data into values array, then pack everything into a JSON string as the message
FieldValueTuple opdata(op, data);
values.insert(values.begin(), opdata);
std::string msg = JSon::buildJson(values);
values.erase(values.begin());
// Publish message to Redis channel
RedisCommand command;
command.format("PUBLISH %s %s", m_channel.c_str(), msg.c_str());
// ...
RedisReply reply = m_pipe->push(command);
reply.checkReplyType(REDIS_REPLY_INTEGER);
return reply.getReply<long long int>();
}
The receiver simply uses the SUBSCRIBE command to receive all notifications:
void swss::NotificationConsumer::subscribe()
{
// ...
m_subscribe = new DBConnector(m_db->getDbId(),
m_db->getContext()->unix_sock.path,
NOTIFICATION_SUBSCRIBE_TIMEOUT);
// ...
// Subscribe to Redis channel
std::string s = "SUBSCRIBE " + m_channel;
RedisReply r(m_subscribe, s, REDIS_REPLY_ARRAY);
}
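As a rough usage sketch, the two ends can be wired up as below. This is illustrative only: the channel name and payload are made up, and the assumption that NotificationConsumer subscribes in its constructor (so it can be created before the message is sent) should be verified against sonic-swss-common:

// Illustrative sketch: send and receive a one-shot notification.
#include <iostream>
#include <string>
#include <vector>
#include "dbconnector.h"
#include "notificationproducer.h"
#include "notificationconsumer.h"
#include "select.h"
#include "table.h"

using namespace swss;

int main()
{
    // Receiver side: create it first so the SUBSCRIBE is issued before we publish.
    DBConnector recvDb("APPL_DB", 0);
    NotificationConsumer consumer(&recvDb, "EXAMPLE_CHANNEL");

    // Sender side: pack (op, data, values) into JSON and PUBLISH it.
    DBConnector sendDb("APPL_DB", 0);
    NotificationProducer producer(&sendDb, "EXAMPLE_CHANNEL");
    std::vector<FieldValueTuple> values = { {"detail", "hello"} };
    producer.send("my-op", "my-data", values);

    // Wait for the notification and unpack the (op, data, values) triple again.
    Select s;
    s.addSelectable(&consumer);
    Selectable *sel = nullptr;
    if (s.select(&sel) == Select::OBJECT) {
        std::string op, data;
        std::vector<FieldValueTuple> fvs;
        consumer.pop(op, data, fvs);
        std::cout << "op=" << op << " data=" << data << std::endl;
    }
    return 0;
}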
ProducerTable / ConsumerTable
As we can see, the NotificationProducer/Consumer implementation is simple and crude, but because of the limitations of the underlying API [8] it is not suitable for passing data. SONiC therefore provides another, very similar message-queue-based communication channel: ProducerTable and ConsumerTable.
The difference from Notification is that the message published to the channel is trivial (a single character "G"), while all the data is stored in a List, which removes Notification's limit on message size. In SONiC it is mainly used by FlexCounter, the syncd service, and ASIC_DB:
- Message format: each message is a (Key, FieldValuePairs, Op) triple. Expressed in JSON, it has the format shown below. Here Key is the key of the data in the Table; since the data being operated on is a Hash, Field and Value are the fields and values of that Hash, which means a single message can operate on many fields at once:
[ "Key", "[\"Field1\", \"Value1\", \"Field2\", \"Value2\", ...]", "Op" ]
- Enqueue: ProducerTable uses a Lua script to atomically write the message triple into the message queue (Key = <table-name>_KEY_VALUE_OP_QUEUE) and to publish an update notification to a dedicated channel (Key = <table-name>_CHANNEL).
- Pop: ConsumerTable also uses a Lua script to atomically read message triples from the message queue and, while reading them, apply the requested changes to the database.
Note: the atomicity of Lua scripts and MULTI/EXEC in Redis is not the same as atomicity (A) in the usual database ACID sense; it is actually closer to isolation (I) in ACID. Redis guarantees that no other command runs while the commands inside a Lua script are executing, but it does not guarantee that every command in the script succeeds. For example, if the second command in a script fails, the first command is still committed; the remaining commands simply never run. See the Redis documentation for more details [5] [6].
The main class diagram is shown below. Here we can see m_shaEnque in ProducerTable and m_shaPop in ConsumerTable: these are the SHAs returned when the two Lua scripts mentioned above are loaded, and they allow us to invoke the scripts atomically later with Redis's EVALSHA command:
The core logic of ProducerTable is as follows; note how the values are packed into JSON and how EVALSHA is used to invoke the Lua script:
// File: sonic-swss-common - common/producertable.cpp
ProducerTable::ProducerTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
// ...
{
string luaEnque =
"redis.call('LPUSH', KEYS[1], ARGV[1], ARGV[2], ARGV[3]);"
"redis.call('PUBLISH', KEYS[2], ARGV[4]);";
m_shaEnque = m_pipe->loadRedisScript(luaEnque);
}
void ProducerTable::set(const string &key, const vector<FieldValueTuple> &values, const string &op, const string &prefix)
{
enqueueDbChange(key, JSon::buildJson(values), "S" + op, prefix);
}
void ProducerTable::del(const string &key, const string &op, const string &prefix)
{
enqueueDbChange(key, "{}", "D" + op, prefix);
}
void ProducerTable::enqueueDbChange(const string &key, const string &value, const string &op, const string& /* prefix */)
{
RedisCommand command;
command.format(
"EVALSHA %s 2 %s %s %s %s %s %s",
m_shaEnque.c_str(),
getKeyValueOpQueueTableName().c_str(),
getChannelName(m_pipe->getDbId()).c_str(),
key.c_str(),
value.c_str(),
op.c_str(),
"G");
m_pipe->push(command, REDIS_REPLY_NIL);
}
The ConsumerTable side is a bit more involved, because it supports many op types; its logic therefore lives in a separate Lua file (common/consumer_table_pops.lua). We will not paste it here; interested readers can look it up themselves.
// File: sonic-swss-common - common/consumertable.cpp
ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBatchSize, int pri)
: ConsumerTableBase(db, tableName, popBatchSize, pri)
, TableName_KeyValueOpQueues(tableName)
, m_modifyRedis(true)
{
std::string luaScript = loadLuaScript("consumer_table_pops.lua");
m_shaPop = loadRedisScript(db, luaScript);
// ...
}
void ConsumerTable::pops(deque<KeyOpFieldsValuesTuple> &vkco, const string &prefix)
{
// Note that here we are processing the messages in bulk with POP_BATCH_SIZE!
RedisCommand command;
command.format(
"EVALSHA %s 2 %s %s %d %d",
m_shaPop.c_str(),
getKeyValueOpQueueTableName().c_str(),
(prefix+getTableName()).c_str(),
POP_BATCH_SIZE,
m_modifyRedis);
RedisReply r(m_db, command, REDIS_REPLY_ARRAY);
vkco.clear();
// Parse and pack the messages in bulk
// ...
}
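Putting both sides together, a round trip through this channel looks roughly like the sketch below. This is only an illustration: the table and key names are made up (in SONiC the real producer is typically orchagent writing ASIC_STATE into ASIC_DB, with syncd consuming it), and the constructor overloads are assumptions to be checked against sonic-swss-common:

// Illustrative sketch: one set() on the producer side, one pops() on the consumer side.
#include <deque>
#include <iostream>
#include <vector>
#include "dbconnector.h"
#include "producertable.h"
#include "consumertable.h"
#include "select.h"
#include "table.h"

using namespace swss;

int main()
{
    DBConnector db("ASIC_DB", 0);

    // Consumer first, so the "G" notification published by set() below is not missed.
    ConsumerTable consumer(&db, "EXAMPLE_TABLE");
    Select s;
    s.addSelectable(&consumer);

    // Producer: enqueue one (key, field/value pairs, op) triple via the Lua script.
    ProducerTable producer(&db, "EXAMPLE_TABLE");
    std::vector<FieldValueTuple> values = { {"field1", "value1"}, {"field2", "value2"} };
    producer.set("example-key", values);

    // Consumer: pop the queued triples in one batch; pops() also applies them to the DB.
    Selectable *sel = nullptr;
    if (s.select(&sel) == Select::OBJECT) {
        std::deque<KeyOpFieldsValuesTuple> entries;
        consumer.pops(entries);
        for (const auto &e : entries)
            std::cout << kfvOp(e) << " " << kfvKey(e) << std::endl;
    }
    return 0;
}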
ProducerStateTable / ConsumerStateTable
Although ProducerTable/ConsumerTable is intuitive and preserves message order, each message can only carry a single key and requires JSON serialization. In many cases, however, we do not need the ordering guarantee but do need more throughput. To optimize for performance, SONiC introduces the fourth and most commonly used communication channel: ProducerStateTable and ConsumerStateTable.
Unlike ProducerTable, ProducerStateTable stores messages in a Hash instead of a List. This no longer guarantees message order, but it is a big performance win: first, we save the overhead of JSON serialization; second, if the same field under the same key is changed several times, only the last change needs to be kept, so all the change messages for that key are merged into one, avoiding a lot of unnecessary message processing.
The underlying implementation of ProducerStateTable/ConsumerStateTable is also somewhat more complex than that of ProducerTable/ConsumerTable. The main class diagram of the related classes is shown below. We can again see that Lua scripts are invoked through EVALSHA: m_shaSet and m_shaDel hold the scripts used to store and publish messages, while m_shaPop holds the script used to fetch messages on the other side:
When delivering messages:
- First, each message is stored in two parts: one is the KEY_SET, which records which keys have been modified and is stored as a Set under the key <table-name>_KEY_SET; the other is the content of every modified key, which is stored as a Hash under the temporary key _<redis-key-name>.
- Then, after storing the message, if the Producer finds that the key is new, it calls the PUBLISH command to notify the <table-name>_CHANNEL@<db-id> channel that a new key has appeared:

// File: sonic-swss-common - common/producerstatetable.cpp
ProducerStateTable::ProducerStateTable(RedisPipeline *pipeline, const string &tableName, bool buffered)
    : TableBase(tableName, SonicDBConfig::getSeparator(pipeline->getDBConnector()))
    , TableName_KeySet(tableName)
    // ...
{
    string luaSet =
        "local added = redis.call('SADD', KEYS[2], ARGV[2])\n"
        "for i = 0, #KEYS - 3 do\n"
        "    redis.call('HSET', KEYS[3 + i], ARGV[3 + i * 2], ARGV[4 + i * 2])\n"
        "end\n"
        "if added > 0 then \n"
        "    redis.call('PUBLISH', KEYS[1], ARGV[1])\n"
        "end\n";
    m_shaSet = m_pipe->loadRedisScript(luaSet);
    // ...
}
- Finally, the Consumer subscribes to the <table-name>_CHANNEL@<db-id> channel with the SUBSCRIBE command. Whenever a new message arrives, it uses a Lua script that calls HGETALL to fetch the modified keys, read out their values, and write them into the database for real:

// File: sonic-swss-common - common/consumerstatetable.cpp
ConsumerStateTable::ConsumerStateTable(DBConnector *db, const std::string &tableName, int popBatchSize, int pri)
    : ConsumerTableBase(db, tableName, popBatchSize, pri)
    , TableName_KeySet(tableName)
{
    std::string luaScript = loadLuaScript("consumer_state_table_pops.lua");
    m_shaPop = loadRedisScript(db, luaScript);
    // ...

    subscribe(m_db, getChannelName(m_db->getDbId()));
    // ...
}
To make this easier to understand, let's walk through an example: enabling port Ethernet0:
- First, from the command line we run config interface startup Ethernet0 to bring Ethernet0 up. This causes portmgrd to send a state update to APPL_DB through its ProducerStateTable, as follows:

EVALSHA "<hash-of-set-lua>" "6"
    "PORT_TABLE_CHANNEL@0" "PORT_TABLE_KEY_SET"
    "_PORT_TABLE:Ethernet0" "_PORT_TABLE:Ethernet0" "_PORT_TABLE:Ethernet0" "_PORT_TABLE:Ethernet0"
    "G" "Ethernet0"
    "alias" "Ethernet5/1" "index" "5" "lanes" "9,10,11,12" "speed" "40000"

Internally, this command creates and publishes the message by running the following commands:

SADD "PORT_TABLE_KEY_SET" "_PORT_TABLE:Ethernet0"
HSET "_PORT_TABLE:Ethernet0" "alias" "Ethernet5/1"
HSET "_PORT_TABLE:Ethernet0" "index" "5"
HSET "_PORT_TABLE:Ethernet0" "lanes" "9,10,11,12"
HSET "_PORT_TABLE:Ethernet0" "speed" "40000"
PUBLISH "PORT_TABLE_CHANNEL@0" "_PORT_TABLE:Ethernet0"

So the message finally ends up stored in APPL_DB in the following form:

PORT_TABLE_KEY_SET:
  _PORT_TABLE:Ethernet0

_PORT_TABLE:Ethernet0:
  alias: Ethernet5/1
  index: 5
  lanes: 9,10,11,12
  speed: 40000
- When the ConsumerStateTable receives the notification, it also calls EVALSHA to run its Lua script, as follows:

EVALSHA "<hash-of-pop-lua>" "3" "PORT_TABLE_KEY_SET" "PORT_TABLE:" "PORT_TABLE_DEL_SET" "8192" "_"

Similar to the Producer side, this script runs the commands below: it reads the key stored in PORT_TABLE_KEY_SET, i.e. _PORT_TABLE:Ethernet0, then reads the Hash it refers to and writes it into PORT_TABLE:Ethernet0, and finally deletes _PORT_TABLE:Ethernet0 from both the database and PORT_TABLE_KEY_SET.

SPOP "PORT_TABLE_KEY_SET" "_PORT_TABLE:Ethernet0"
HGETALL "_PORT_TABLE:Ethernet0"
HSET "PORT_TABLE:Ethernet0" "alias" "Ethernet5/1"
HSET "PORT_TABLE:Ethernet0" "index" "5"
HSET "PORT_TABLE:Ethernet0" "lanes" "9,10,11,12"
HSET "PORT_TABLE:Ethernet0" "speed" "40000"
DEL "_PORT_TABLE:Ethernet0"

Only at this point is the data update truly complete.
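On the receiving end, an orchagent-style daemon sees all of this as a single batched pops() call. The sketch below only illustrates the calling pattern and is not SONiC code; as before, the constructor defaults are assumptions:

// Illustrative sketch: consume PORT_TABLE updates from APPL_DB.
#include <deque>
#include <iostream>
#include "dbconnector.h"
#include "consumerstatetable.h"
#include "select.h"
#include "table.h"

using namespace swss;

int main()
{
    DBConnector applDb("APPL_DB", 0);
    ConsumerStateTable portTable(&applDb, "PORT_TABLE");

    Select s;
    s.addSelectable(&portTable);

    while (true) {
        Selectable *sel = nullptr;
        if (s.select(&sel) != Select::OBJECT)
            continue;

        // pops() runs the m_shaPop Lua script shown above: it pops the keys from
        // PORT_TABLE_KEY_SET, moves _PORT_TABLE:<key> into PORT_TABLE:<key>, and
        // returns the parsed (key, op, field/value) tuples to the caller.
        std::deque<KeyOpFieldsValuesTuple> entries;
        portTable.pops(entries);

        for (const auto &entry : entries)
            std::cout << kfvOp(entry) << " " << kfvKey(entry)
                      << " (" << kfvFieldsValues(entry).size() << " fields)" << std::endl;
    }
}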
References
1. SONiC Architecture
2. Github repo: sonic-swss
3. Github repo: sonic-swss-common
4. Redis keyspace notifications
5. Redis Transactions
6. Redis Atomicity with Lua
7. Redis hashes
8. Redis client handling