这里先看一下异步发布和订阅的代码部分,后面再详细讲解原理
redis_publisher.h文件
// // Created by gary on 2021/4/28. // #ifndef REDIS_REDIS_PUBLISHER_H #define REDIS_REDIS_PUBLISHER_H #include <stdlib.h> #include <hiredis/async.h> #include <hiredis/adapters/libevent.h> #include <string> #include <vector> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #include <boost/tr1/functional.hpp> #include <event.h>
/*
解释:对于头文件semaphore.h信号量机制,后续再进一步理解;
boost/tr1/functional.hpp是1.6.4之前的版本有的部分
对于头文件event.h也是后面再去详细讲解
*/ using namespace std; class CRedisPublisher { public: CRedisPublisher(); ~CRedisPublisher(); bool init(); bool uninit(); bool connect(); bool disconnect(); bool publish(const string &channel_name, const string &message); private: //连接回调 static void connect_callback(const redisAsyncContext *redis_context, int status); //断开连接的回调 static void disconnect_callback(const redisAsyncContext *redis_context, int status); //执行命令回调 static void command_callback(redisAsyncContext* redis_context, void *reply, void *privdata); //事件分发线程函数 static void *event_thread(void *data); void *event_proc(); private: //libevent事件对象 event_base *_event_base; //事件线程ID pthread_t _event_thread; //事件线程的信号量 sem_t _event_sem; //hiredis异步对象 redisAsyncContext *_redis_context; }; #endif //REDIS_REDIS_PUBLISHER_H
针对对象redisAsyncContext的解释:
/* Context for an async connection to Redis */ typedef struct redisAsyncContext { /* Hold the regular context, so it can be realloc‘ed. */ redisContext c; /* Setup error flags so they can be used directly. */ int err; char *errstr; /* Not used by hiredis */ void *data; /* Event library data and hooks */ struct { void *data; /* Hooks that are called when the library expects to start * reading/writing. These functions should be idempotent. */
//幂等(idempotent、idempotence)是一个数学与计算机学概念,常见于抽象代数中。在编程中一个幂等操作的特点是其任意多次执行
//所产生的影响均与一次执行的影响相同。幂等函数,或幂等方法,是指可以使用相同参数重复执行,并能获得相同结果的函数。 void (*addRead)(void *privdata); void (*delRead)(void *privdata); void (*addWrite)(void *privdata); void (*delWrite)(void *privdata); void (*cleanup)(void *privdata); } ev; /* Called when either the connection is terminated due to an error or per * user request. The status is set accordingly (REDIS_OK, REDIS_ERR). */ redisDisconnectCallback *onDisconnect; /* Called when the first write event was received. */ redisConnectCallback *onConnect; /* Regular command callbacks */ redisCallbackList replies; /* Subscription callbacks */ struct { redisCallbackList invalid; struct dict *channels; struct dict *patterns; } sub; } redisAsyncContext;
对于对象redisContext
/* Context for a connection to Redis */ typedef struct redisContext { int err; /* Error flags, 0 when there is no error */ char errstr[128]; /* String representation of error when applicable */ int fd; int flags; char *obuf; /* Write buffer */ redisReader *reader; /* Protocol reader */ enum redisConnectionType connection_type; struct timeval *timeout; struct { char *host; char *source_addr; int port; } tcp; struct { char *path; } unix_sock; } redisContext;
针对redisConnectionType的解释
enum redisConnectionType { REDIS_CONN_TCP, REDIS_CONN_UNIX, };
对于事件event_base *_event_base;的解释:
使用libevent函数之前需要分配一个或者多个event_base结构体。每个event_base结构体持有一个事件集合,可以检测以确定哪个事件是激活的。
? 如果设置event_base使用锁,则可以安全地在多个线程中访问它。然而,其事件循环只能运行在一个线程中。如果需要用多个线程检测IO,则需要为每个线程使用一个event_base。
每个event_base都有一种用于检测哪种事件已经就绪的“方法”,或者说后端。可以识别的方法有
redis_publisher.cpp
// // Created by gary on 2021/4/28. // #include <stddef.h> #include <assert.h> #include <string.h> #include <iostream> #include "redis_publisher.h" using namespace std; CRedisPublisher::CRedisPublisher() :_event_base(0), _event_thread(0), _redis_context(0) {} CRedisPublisher::~CRedisPublisher() {} bool CRedisPublisher::init() { //initialize the event
// Create and return a new event_base to use with the rest of Libevent.
//@return a new event_base on success, or NULL on failure. _event_base = event_base_new();//创建libevent对象 if(NULL == _event_base) { cout << "Create redis event failed." << endl; return false; } memset(&_event_sem, 0, sizeof(_event_sem)); /*
sem_init() 初始化一个定位在 sem 的匿名信号量。value 参数指定信号量的初始值。 pshared 参数指明信号量是由进程内线程共享,
还是由进程之间共享。如果 pshared 的值为 0,那么信号量将被进程内的线程共享,并且应该放置在这个进程的所有线程都可见的
地址上(如全局变量,或者堆上动态分配的变量)。
*/ int ret = sem_init(&_event_sem, 0, 0); if(ret != 0) { cout << "Init sem failed\n"; return false; } return true; } bool CRedisPublisher::uninit() { _event_base = NULL;
//sem_destroy() 销毁由sem指向的匿名信号量 sem_destroy(&_event_sem); return true; } bool CRedisPublisher::connect() { //connect redis _redis_context = redisAsyncConnect("127.0.0.1", 6379);//异步连接到redis服务器上,使用默认端口 if(NULL == _redis_context) { cout << "Connect redis failed\n"; return false; } if(_redis_context->err) { cout << "Connect redis error: " << _redis_context->err << ", " <<_redis_context->errstr << "\n"; return false; } //attach the event
//异步连接功能,用redisLibeventAttach
与libevent绑定:对hiredis进行libevent事件注册,这里的base就是上一步创建的事件处理实例 redisLibeventAttach(_redis_context, _event_base); //创建事件处理线程 int ret = pthread_create(&_event_thread, 0, &CRedisPublisher::event_thread, this); if(ret != 0) { cout << "Create event thread failed.\n"; disconnect(); return false; } //设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态 redisAsyncSetConnectCallback(_redis_context, &CRedisPublisher::connect_callback); //设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连 redisAsyncSetDisconnectCallback(_redis_context, &CRedisPublisher::disconnect_callback); //启动事件线程
//sem_post函数的作用是给信号量的值加上一个“1”,它是一个“原子操作”---即同时对同一个信号量做加“1”操作的两个线程是不会冲突的;
//当有线程阻塞在这个信号量上时,调用这个函数会使其中一个线程不在阻塞,选择机制是由线程的调度策略决定的。
//sem_post() 成功时返回 0;错误时,信号量的值没有更改,-1 被返回,并设置 errno 来指明错误。 sem_post(&_event_sem); return true; } bool CRedisPublisher::disconnect() { if(_redis_context) { redisAsyncDisconnect(_redis_context); redisAsyncFree(_redis_context); _redis_context = NULL; } return true; } bool CRedisPublisher::publish(const string &channel_name, const string &message) {
//异步命令的发送方式和同步很像,区别在于异步发送函数执行后只能得到该命令是否成功过加入发送队列的返回,而无法确定这个命令是否发送成功以及命令的返回。 int ret = redisAsyncCommand(_redis_context, &CRedisPublisher::command_callback, this, "PUBLISH %s %s", channel_name.c_str(), message.c_str()); if(REDIS_ERR == ret) { cout << "Publish command failed: " << ret << "\n"; return false; } return true; } void CRedisPublisher::connect_callback(const redisAsyncContext *redis_context, int status) { if(status != REDIS_OK) { cout << "Error: " << redis_context->errstr << "\n"; } else { cout << "Redis connnected!\n"; } } void CRedisPublisher::disconnect_callback(const redisAsyncContext *redis_context, int status) { if(status != REDIS_OK) { cout << "Error: " << redis_context->errstr << "\n"; } } //消息接收回调函数 void CRedisPublisher::command_callback(redisAsyncContext *redis_context, void *reply, void *privdata) { cout << "Command callback.\n"; } void *CRedisPublisher::event_thread(void *data) { if(NULL == data) { cout <<"error!\n"; assert(false); return NULL; } CRedisPublisher *self_this = reinterpret_cast<CRedisPublisher *>(data); return self_this->event_proc(); } void *CRedisPublisher::event_proc() {
/*sem_wait() 减小(锁定)由sem指定的信号量的值.如果信号量的值比0大,
那么进行减一的操作,函数立即返回.
如果信号量当前为0值,那么调用就会一直阻塞直到或者是信号量变得可以进行减一的操作
(例如,信号量的值比0大),或者是信号处理程序中断调用*/ sem_wait(&_event_sem); //开启事件分发,event_base_dispatch会阻塞
//Event dispatching loop
//
// This loop will run the event base until either there are no more pending or
// active, or until something calls event_base_loopbreak() or
// event_base_loopexit().
event_base_dispatch(_event_base); return NULL; }
pub_main.cpp
// // Created by gary on 2021/4/28. // #include "redis_publisher.h" int main(int argc, char *argv[]) { CRedisPublisher publisher; bool ret = publisher.init(); if (!ret) { printf("Init failed.\n"); return 0; } ret = publisher.connect(); if (!ret) { printf("connect failed."); return 0; } while (true) { publisher.publish("test-channel", "Test message by gary"); sleep(1); } publisher.disconnect(); publisher.uninit(); return 0; }
redis_subscriber.h
// // Created by gary on 2021/4/28. // #ifndef REDIS_REDIS_SUBSCRIBER_H #define REDIS_REDIS_SUBSCRIBER_H #include <iostream> #include <stdlib.h> #include <hiredis/async.h> #include <hiredis/adapters/libevent.h> #include <string> #include <vector> #include <unistd.h> #include <pthread.h> #include <semaphore.h> #include <boost/tr1/functional.hpp> class CRedisSubscriber { public: // 回调函数对象类型,当接收到消息后调用回调把消息发送出去 //使用std::tr1::function对象代替虚函数 //在 C++的TR1中(Technology Report)中包含一个function模板类和bind模板函数,使用它们可以实现类似函数指针的 // 功能,但却却比函数指针更加灵活,特别是函数指向类的非静态成员函数时。可以参考Scott Meyers. <<Effective C++ (3rd Edition)>>. Item 35. typedef std::tr1::function<void(const char *, const char *, int )> NotifyMessageFn; CRedisSubscriber(); ~CRedisSubscriber(); bool init(const NotifyMessageFn &fn);//传入回调对象 bool uninit(); bool connect(); bool disconnect(); //可以多次调用,订阅多个频道 bool subscribe(const std::string &channel_name); private: //连接回调 static void connect_callback(const redisAsyncContext *redis_context, int status); //断开连接的回调 static void disconnect_callback(const redisAsyncContext *redis_context, int status); //执行命令回调 static void command_callback(redisAsyncContext *redis_context, void *reply, void *privdata); //事件分发线程函数 static void *event_thread(void *data); void *event_proc(); private: //libevent事件对象 event_base *_event_base; //事件线程ID pthread_t _event_thread; //事件线程的信号量 sem_t _event_sem; //hiredis异步对象 redisAsyncContext *_redis_context; //通知外层的回调函数对象 NotifyMessageFn _notify_message_fn; }; #endif //REDIS_REDIS_SUBSCRIBER_H
redis_subscriber.cpp
// // Created by gary on 2021/4/28. // #include <stddef.h> #include <assert.h> #include <string.h> #include <iostream> #include "redis_subscriber.h" using namespace std; CRedisSubscriber::CRedisSubscriber() :_event_base(0), _event_thread(0), _redis_context(0) {} CRedisSubscriber::~CRedisSubscriber() {} bool CRedisSubscriber::init(const CRedisSubscriber::NotifyMessageFn &fn) { //initialize the event _notify_message_fn = fn; _event_base = event_base_new(); if(NULL == _event_base) { cout << "Create redis event failed\n"; return false; } memset(&_event_sem, 0, sizeof(_event_sem)); int ret = sem_init(&_event_sem, 0, 0); if (ret != 0) { printf(": Init sem failed.\n"); return false; } return true; } bool CRedisSubscriber::uninit() { _event_base = NULL; sem_destroy(&_event_sem); return true; } bool CRedisSubscriber::connect() { // connect redis _redis_context = redisAsyncConnect("127.0.0.1", 6379); // 异步连接到redis服务器上,使用默认端口 if (NULL == _redis_context) { printf(": Connect redis failed.\n"); return false; } if (_redis_context->err) { printf(": Connect redis error: %d, %s\n", _redis_context->err, _redis_context->errstr); // 输出错误信息 return false; } // attach the event redisLibeventAttach(_redis_context, _event_base); // 将事件绑定到redis context上,使设置给redis的回调跟事件关联 // 创建事件处理线程 int ret = pthread_create(&_event_thread, 0, &CRedisSubscriber::event_thread, this); if (ret != 0) { printf(": create event thread failed.\n"); disconnect(); return false; } // 设置连接回调,当异步调用连接后,服务器处理连接请求结束后调用,通知调用者连接的状态 redisAsyncSetConnectCallback(_redis_context, &CRedisSubscriber::connect_callback); // 设置断开连接回调,当服务器断开连接后,通知调用者连接断开,调用者可以利用这个函数实现重连 redisAsyncSetDisconnectCallback(_redis_context, &CRedisSubscriber::disconnect_callback); // 启动事件线程 sem_post(&_event_sem); return true; } bool CRedisSubscriber::disconnect() { if (_redis_context) { redisAsyncDisconnect(_redis_context); redisAsyncFree(_redis_context); _redis_context = NULL; } return true; } bool CRedisSubscriber::subscribe(const std::string &channel_name) { int ret = redisAsyncCommand(_redis_context, &CRedisSubscriber::command_callback, this, "SUBSCRIBE %s", channel_name.c_str()); if(REDIS_ERR == ret) { cout << "Subscribe command failed: " <<ret << "\n"; return false; } cout << "Subscribe success: " << channel_name.c_str() << "\n"; return true; } void CRedisSubscriber::connect_callback(const redisAsyncContext *redis_context, int status) { if (status != REDIS_OK) { printf(": Error: %s\n", redis_context->errstr); } else { printf(": Redis connected!"); } } void CRedisSubscriber::disconnect_callback(const redisAsyncContext *redis_context, int status) { if (status != REDIS_OK) { // 这里异常退出,可以尝试重连 printf(": Error: %s\n", redis_context->errstr); } } // 消息接收回调函数 void CRedisSubscriber::command_callback(redisAsyncContext *redis_context, void *reply, void *privdata) { if (NULL == reply || NULL == privdata) { return ; } // 静态函数中,要使用类的成员变量,把当前的this指针传进来,用this指针间接访问 CRedisSubscriber *self_this = reinterpret_cast<CRedisSubscriber *>(privdata); ///* This is the reply object returned by redisCommand() */ redisReply *redis_reply = reinterpret_cast<redisReply *>(reply); // 订阅接收到的消息是一个带三元素的数组 if (redis_reply->type == REDIS_REPLY_ARRAY && redis_reply->elements == 3)//这里的3是固定不变的 { printf(": Recieve message:%s:%d:%s:%d:%s:%d\n", redis_reply->element[0]->str, redis_reply->element[0]->len, redis_reply->element[1]->str, redis_reply->element[1]->len, redis_reply->element[2]->str, redis_reply->element[2]->len); // 调用函数对象把消息通知给外层 self_this->_notify_message_fn(redis_reply->element[1]->str, redis_reply->element[2]->str, redis_reply->element[2]->len); } } void *CRedisSubscriber::event_thread(void *data) { if (NULL == data) { printf(": Error!\n"); assert(false); return NULL; } CRedisSubscriber *self_this = reinterpret_cast<CRedisSubscriber *>(data); return self_this->event_proc(); } void *CRedisSubscriber::event_proc() { sem_wait(&_event_sem); // 开启事件分发,event_base_dispatch会阻塞 event_base_dispatch(_event_base); return NULL; }
sub_main.cpp
// // Created by gary on 2021/4/28. // #include "redis_subscriber.h" void recieve_message(const char *channel_name, const char *message, int len) { printf("Recieve message:\n channel name: %s\n message: %s\n", channel_name, message); } int main(int argc, char *argv[]) { CRedisSubscriber subscriber; CRedisSubscriber::NotifyMessageFn fn = bind(recieve_message, std::tr1::placeholders::_1, std::tr1::placeholders::_2, std::tr1::placeholders::_3); bool ret = subscriber.init(fn); if (!ret) { printf("Init failed.\n"); return 0; } ret = subscriber.connect(); if (!ret) { printf("Connect failed.\n"); return 0; } subscriber.subscribe("test-channel"); while (true) { sleep(1); } subscriber.disconnect(); subscriber.uninit(); return 0; }
cmakelists.txt
cmake_minimum_required(VERSION 3.10.2) project(redis) set(CMAKE_CXX_STANDARD 14) set(CMAKE_CXX_FLAGS "${CAMKE_CXX_FLAGS} -std=c++11 -pthread") #find_package(hiredis REQUIRED) find_package(Boost REQUIRED) include_directories(include /usr/include/hiredis) include_directories(${Boost_INCLUDE_DIRS}) include_directories(/usr/include/event) link_directories(/usr/lib/event) add_library(redis_lib src/redis_config.cpp src/redis_tools.cpp) add_executable(redis_demo src/main.cpp ) target_link_libraries(redis_demo redis_lib /usr/lib/x86_64-linux-gnu/libhiredis.a) add_library(redis_pub_lib src/redis_publisher.cpp /usr/lib/x86_64-linux-gnu/libevent.a /usr/lib/x86_64-linux-gnu/libevent_core.a /usr/lib/x86_64-linux-gnu/libevent_extra.a) add_executable(redis_pub_main src/pub_main.cpp) target_link_libraries(redis_pub_main redis_pub_lib /usr/lib/x86_64-linux-gnu/libhiredis.a /usr/lib/x86_64-linux-gnu/libevent.a /usr/lib/x86_64-linux-gnu/libevent_core.a /usr/lib/x86_64-linux-gnu/libevent_extra.a) add_library(redis_sub_lib src/redis_subscriber.cpp /usr/lib/x86_64-linux-gnu/libevent.a /usr/lib/x86_64-linux-gnu/libevent_core.a /usr/lib/x86_64-linux-gnu/libevent_extra.a) add_executable(redis_sub_main src/sub_main.cpp) target_link_libraries(redis_sub_main redis_sub_lib /usr/lib/x86_64-linux-gnu/libhiredis.a /usr/lib/x86_64-linux-gnu/libevent.a /usr/lib/x86_64-linux-gnu/libevent_core.a /usr/lib/x86_64-linux-gnu/libevent_extra.a)
原文:https://www.cnblogs.com/gary-guo/p/14718446.html