Redis发布与订阅功能publish、subscribe、psubscribe等命令组成
通过指向subscribe命令,客户端可以订阅一个或多个频道,每当有其他客户端向被订阅的频道发送消息,频道的所有订阅者都会收到消息
比如客户端A,B,C都执行了命令:
subscribe "new.it"
此时某个客户端执行命令:
publish "new.it" "hello"
那么3个客户端都会受到消息
频道的订阅与退订
struct redisServer {
//保存频道的订阅关系 key为频道名称,value为链表
dict *pubsub_channels;
}
{"subscribe",subscribeCommand,-2,"rpslt",0,NULL,0,0,0,0,0}
//建立绑定关系
int pubsubSubscribeChannel(redisClient *c, robj *channel) {
dictEntry *de;
list *clients = NULL;
int retval = 0;
//添加数据到server中
if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) {
retval = 1;
incrRefCount(channel);
/* Add the client to the channel -> list of clients hash table */
de = dictFind(server.pubsub_channels,channel);
if (de == NULL) {
clients = listCreate();
dictAdd(server.pubsub_channels,channel,clients);
incrRefCount(channel);
} else {
clients = dictGetVal(de);
}
listAddNodeTail(clients,c);
}
/* Notify the client */
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.subscribebulk);
addReplyBulk(c,channel);
addReplyLongLong(c,clientSubscriptionsCount(c));
return retval;
}
//发送消息
PUBLISH <channel> <message>
{"publish",publishCommand,3,"pltrF",0,NULL,0,0,0,0,0}
void publishCommand(redisClient *c) {
//查找订阅者
int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
if (server.cluster_enabled)
clusterPropagatePublish(c->argv[1],c->argv[2]);
else
forceCommandPropagation(c,REDIS_PROPAGATE_REPL);
addReplyLongLong(c,receivers);
}
//发布一条消息
int pubsubPublishMessage(robj *channel, robj *message) {
int receivers = 0;
dictEntry *de;
listNode *ln;
listIter li;
//查找当前频道下的所有客户端
de = dictFind(server.pubsub_channels,channel);
if (de) {
list *list = dictGetVal(de);
listNode *ln;
listIter li;
listRewind(list,&li);
while ((ln = listNext(&li)) != NULL) {
redisClient *c = ln->value;
addReply(c,shared.mbulkhdr[3]);
addReply(c,shared.messagebulk);
addReplyBulk(c,channel);
addReplyBulk(c,message);
receivers++;
}
}
//模式匹配发送数据
if (listLength(server.pubsub_patterns)) {
listRewind(server.pubsub_patterns,&li);
channel = getDecodedObject(channel);
while ((ln = listNext(&li)) != NULL) {
pubsubPattern *pat = ln->value;
if (stringmatchlen((char*)pat->pattern->ptr,
(int)sdslen(pat->pattern->ptr), WIN_PORT_FIX /* cast (int) */
(char*)channel->ptr,
(int)sdslen(channel->ptr),0)) { WIN_PORT_FIX /* cast (int) */
addReply(pat->client,shared.mbulkhdr[4]);
addReply(pat->client,shared.pmessagebulk);
addReplyBulk(pat->client,pat->pattern);
addReplyBulk(pat->client,channel);
addReplyBulk(pat->client,message);
receivers++;
}
}
decrRefCount(channel);
}
return receivers;
}
//如果是cluster模式,发送cluster消息
void clusterSendPublish(clusterLink *link, robj *channel, robj *message) {
unsigned char buf[sizeof(clusterMsg)], *payload;
clusterMsg *hdr = (clusterMsg*) buf;
uint32_t totlen;
uint32_t channel_len, message_len;
channel = getDecodedObject(channel);
message = getDecodedObject(message);
channel_len = (uint32_t)sdslen(channel->ptr); WIN_PORT_FIX /* cast (uint32_t) */
message_len = (uint32_t)sdslen(message->ptr); WIN_PORT_FIX /* cast (uint32_t) */
clusterBuildMessageHdr(hdr,CLUSTERMSG_TYPE_PUBLISH);
totlen = sizeof(clusterMsg)-sizeof(union clusterMsgData);
totlen += sizeof(clusterMsgDataPublish) - 8 + channel_len + message_len;
hdr->data.publish.msg.channel_len = htonl(channel_len);
hdr->data.publish.msg.message_len = htonl(message_len);
hdr->totlen = htonl(totlen);
/* Try to use the local buffer if possible */
if (totlen < sizeof(buf)) {
payload = buf;
} else {
payload = zmalloc(totlen);
memcpy(payload,hdr,sizeof(*hdr));
hdr = (clusterMsg*) payload;
}
memcpy(hdr->data.publish.msg.bulk_data,channel->ptr,sdslen(channel->ptr));
memcpy(hdr->data.publish.msg.bulk_data+sdslen(channel->ptr),
message->ptr,sdslen(message->ptr));
//一般广播还是集群广播
if (link)
clusterSendMessage(link,payload,totlen);
else
clusterBroadcastMessage(payload,totlen);
decrRefCount(channel);
decrRefCount(message);
if (payload != buf) zfree(payload);
}
查看订阅命令
//列出所有的频道信息
PUBSUB channels
{"pubsub",pubsubCommand,-2,"pltrR",0,NULL,0,0,0,0,0},
void pubsubCommand(redisClient *c) {
if (!strcasecmp(c->argv[1]->ptr,"channels") &&
(c->argc == 2 || c->argc ==3))
{
/* PUBSUB CHANNELS [<pattern>] */
sds pat = (c->argc == 2) ? NULL : c->argv[2]->ptr;
dictIterator *di = dictGetIterator(server.pubsub_channels);
dictEntry *de;
PORT_LONG mblen = 0;
void *replylen;
replylen = addDeferredMultiBulkLength(c);
while((de = dictNext(di)) != NULL) {
robj *cobj = dictGetKey(de);
sds channel = cobj->ptr;
if (!pat || stringmatchlen(pat, (int)sdslen(pat), WIN_PORT_FIX /* cast (int) */
channel, (int)sdslen(channel),0)) WIN_PORT_FIX /* cast (int) */
{
addReplyBulk(c,cobj);
mblen++;
}
}
dictReleaseIterator(di);
setDeferredMultiBulkLength(c,replylen,mblen);
} else if (!strcasecmp(c->argv[1]->ptr,"numsub") && c->argc >= 2) {
/* PUBSUB NUMSUB [Channel_1 ... Channel_N] */
int j;
addReplyMultiBulkLen(c,(c->argc-2)*2);
for (j = 2; j < c->argc; j++) {
list *l = dictFetchValue(server.pubsub_channels,c->argv[j]);
addReplyBulk(c,c->argv[j]);
addReplyLongLong(c,l ? listLength(l) : 0);
}
} else if (!strcasecmp(c->argv[1]->ptr,"numpat") && c->argc == 2) {
/* PUBSUB NUMPAT */
addReplyLongLong(c,listLength(server.pubsub_patterns));
} else {
addReplyErrorFormat(c,
"Unknown PUBSUB subcommand or wrong number of arguments for '%s'",
(char*)c->argv[1]->ptr);
}
}