这里实现了一个简单的多媒体数据流处理库。它以 Filter 的思想来实现,通过 Filter 可以实现多路数据的采集、处理和输出。
一. 如何实现一个filter
1. 定义一个 Filter descriptor 结构,它包含了 Filter 的主要属性和行为:
/*
 * Static description of a filter type: its identity, the number of pins it
 * exposes and the callbacks that implement it. Filter instances are created
 * from a descriptor by mc_filter_new_from_desc().
 */
typedef struct McFilterDesc {
    McFilterId          id;
    const char         *name;         /* printed in log output by mc_ticker_attach() */
    const char         *text;         /* presumably a human-readable description -- confirm */
    McFilterCategory    category;
    const char         *encFormat;    /* presumably the encoding format name -- confirm */

    int                 ninputs;      /* input pin count; sizes McFilter.inputs */
    int                 noutputs;     /* output pin count; sizes McFilter.outputs */

    /*
     * Callbacks. Only the call site of init is visible in this file section:
     * mc_filter_new_from_desc() invokes it on the new instance when non-NULL.
     * NOTE(review): the remaining hooks are presumably driven by the ticker
     * (mc_filter_preprocess() etc.) -- confirm against the ticker code.
     */
    McFilterFunc        init;
    McFilterFunc        preprocess;
    McFilterFunc        process;
    McFilterFunc        postProcess;
    McFilterFunc        uninit;

    McFilterMethod     *methods;
} UC_McFilterDesc;
2. 定义Filter结构体
/*
 * Runtime instance of a filter. Created by mc_filter_new_from_desc() and
 * connected to other filters through McQueue objects by mc_filter_link().
 */
struct McFilter {
    UC_McFilterDesc    *desc;       /* descriptor this instance was created from */
    pthread_mutex_t     lock;       /* initialised in mc_filter_new_from_desc() */

    struct McQueue    **inputs;     /* one slot per input pin (desc->ninputs entries) */
    struct McQueue    **outputs;    /* one slot per output pin (desc->noutputs entries) */

    McFilterNotifyFunc  notify;
    void               *notifyUd;   /* presumably user data passed to notify -- confirm */
    void               *data;       /* presumably filter-private state -- confirm */

    struct McTicker    *ticker;     /* non-NULL makes mc_ticker_attach() treat the
                                     * filter as already scheduled */
    uint32_t            lastTick;
    bool                seen;       /* NOTE(review): looks like a graph-traversal mark -- confirm */
};
3. 根据 Filter descriptor 创建 Filter 实例
/*
 * Create a filter instance from its descriptor.
 *
 * The instance is zero-initialised, so every field not set here (ticker,
 * notify, notifyUd, data, lastTick, seen) starts as NULL/0; in particular
 * mc_ticker_attach() relies on filter->ticker being NULL for a filter that
 * has not been scheduled yet. The pin arrays are sized from desc->ninputs and
 * desc->noutputs, with every slot NULL until mc_filter_link() fills it; a
 * side with zero pins keeps a NULL array pointer.
 *
 * desc->init, when present, is called on the fully set-up instance.
 *
 * Returns the new filter, or NULL if desc is NULL or a resource could not be
 * allocated. The filter keeps a pointer to desc, so desc must outlive it.
 */
UC_McFilter* mc_filter_new_from_desc(UC_McFilterDesc* desc){
    UC_McFilter *filter;

    if (desc == NULL) {
        return NULL;
    }

    filter = calloc(1, sizeof *filter);
    if (filter == NULL) {
        return NULL;
    }

    if (pthread_mutex_init(&filter->lock, NULL) != 0) {
        free(filter);
        return NULL;
    }
    filter->desc = desc;

    if (desc->ninputs > 0) {
        filter->inputs = calloc((size_t)desc->ninputs, sizeof *filter->inputs);
        if (filter->inputs == NULL) {
            goto fail;
        }
    }
    if (desc->noutputs > 0) {
        filter->outputs = calloc((size_t)desc->noutputs, sizeof *filter->outputs);
        if (filter->outputs == NULL) {
            goto fail;
        }
    }

    /* Run the type-specific initialiser last, once the instance is usable. */
    if (desc->init != NULL) {
        desc->init(filter);
    }
    return filter;

fail:
    /* free(NULL) is a no-op, so both arrays can be released unconditionally. */
    free(filter->outputs);
    free(filter->inputs);
    pthread_mutex_destroy(&filter->lock);
    free(filter);
    return NULL;
}
二. 将创建的Filter 链接起来
/*
 * Allocate a queue that connects output pin `pin1` of `filter1` to input pin
 * `pin2` of `filter2`. Only the queue's own fields are set; the filters are
 * not modified.
 *
 * Returns the new queue, or NULL if the allocation fails.
 */
UC_McQueue* mc_queue_new(UC_McFilter* filter1, int pin1, UC_McFilter* filter2, int pin2){
    UC_McQueue *queue = malloc(sizeof *queue);

    if (queue == NULL) {
        return NULL;
    }

    uc_queue_init(&queue->q);
    queue->prev.filter = filter1;
    queue->prev.pin = pin1;
    queue->next.filter = filter2;
    queue->next.pin = pin2;
    return queue;
}

/*
 * Link output pin `pin1` of `filter1` to input pin `pin2` of `filter2` by
 * creating a queue and storing it in both filters' pin arrays.
 *
 * Pin indices are validated against the descriptor's pin counts, which are
 * the sizes mc_filter_new_from_desc() uses for the arrays, so an out-of-range
 * pin is rejected instead of writing past the array.
 *
 * Returns 0 on success, -1 on a NULL filter, an out-of-range pin or an
 * allocation failure. On failure neither filter is modified.
 *
 * NOTE(review): a queue already stored in either slot is overwritten without
 * being released, as before -- confirm callers never relink a pin.
 */
int mc_filter_link(UC_McFilter* filter1, int pin1, UC_McFilter* filter2, int pin2){
    UC_McQueue *queue;

    if (filter1 == NULL || filter2 == NULL) {
        return -1;
    }
    if (pin1 < 0 || pin1 >= filter1->desc->noutputs) {
        return -1;
    }
    if (pin2 < 0 || pin2 >= filter2->desc->ninputs) {
        return -1;
    }

    queue = mc_queue_new(filter1, pin1, filter2, pin2);
    if (queue == NULL) {
        return -1;
    }

    filter1->outputs[pin1] = queue;
    filter2->inputs[pin2] = queue;
    return 0;
}
三. 将Filter链表加入运行表中
1. 创建Ticker结构,并启动运行线程
/*
 * Ticker: owns the worker thread and the list of source filters it runs.
 */
struct McTicker {
    pthread_mutex_t lock;       /* guards execList (taken in mc_ticker_attach()) */
    pthread_cond_t  cond;
    pthread_t       thread;     /* worker running ticker_run(); valid only if run is TRUE */
    UC_McList      *execList;   /* source filters appended by mc_ticker_attach() */
    int             interval;   /* set to 10 at init; presumably milliseconds -- confirm */
    int             execId;
    uint32_t        ticks;      /* starts at 1 */
    uint64_t        time;
    bool            run;        /* TRUE once the worker thread has been started */
};
typedef struct McTicker UC_McTicker;

/*
 * Start the ticker's worker thread.
 *
 * `run` is raised before the thread is created so the new thread cannot
 * observe a stale FALSE (presumably ticker_run() polls this flag -- confirm).
 * If pthread_create() fails, `run` is cleared again so the ticker does not
 * claim to have a thread when `thread` holds no valid handle, and the
 * failure is reported on stderr.
 */
static void ticker_start(UC_McTicker* ticker){
    int rc;

    ticker->run = TRUE;
    rc = pthread_create(&ticker->thread, NULL, ticker_run, ticker);
    if (rc != 0) {
        ticker->run = FALSE;
        fprintf(stderr, "%s: pthread_create failed (error %d)\n", __FUNCTION__, rc);
    }
}

/*
 * Initialise every field of `ticker` and start its worker thread.
 * The caller supplies the storage. After this call `ticker->run` tells
 * whether the worker thread was actually started.
 */
static void ticker_init(UC_McTicker* ticker){
    pthread_mutex_init(&ticker->lock, NULL);
    pthread_cond_init(&ticker->cond, NULL);

    ticker->execList = NULL;
    ticker->ticks = 1;
    ticker->time = 0;
    ticker->interval = 10;
    ticker->run = FALSE;
    ticker->execId = 0;

    /* All state must be in place before the worker thread can look at it. */
    ticker_start(ticker);
}
2. 将source filter 加入到Ticker的execList中
/*
 * Schedule the filter graph that `filter` belongs to on `ticker`.
 *
 * Steps, in order:
 *   1. If `filter` already has a ticker, log it and return 0.
 *   2. Build the filter list with ticker_find_filters() and extract its
 *      sources with ticker_get_sources(); with no sources, return -1.
 *   3. Call mc_filter_preprocess() on every filter in the list.
 *   4. Append the sources to ticker->execList under ticker->lock.
 *
 * Returns 0 on success or when already scheduled, -1 when no source filter
 * was found. The temporary filter list is freed on both exits that built it.
 */
int mc_ticker_attach(UC_McTicker* ticker, UC_McFilter* filter){
    UC_McList *graph = NULL;
    UC_McList *heads = NULL;
    UC_McList *node;

    /* Guard: nothing to do for a filter that is already attached. */
    if (filter->ticker != NULL) {
        printf("Filter %s had beeb scheded\n", filter->desc->name);
        return 0;
    }

    printf("%s is invoked\n", __FUNCTION__);

    ticker_find_filters(&graph, filter);
    heads = ticker_get_sources(graph);
    if (heads == NULL) {
        mc_list_free(graph);
        return -1;
    }

    /* Preprocess every collected filter before any of them can be run. */
    node = graph;
    while (node != NULL) {
        mc_filter_preprocess((UC_McFilter *)node->data, ticker);
        node = node->next;
    }

    /* execList is shared with the ticker, so update it under the lock. */
    pthread_mutex_lock(&ticker->lock);
    ticker->execList = mc_list_concat(ticker->execList, heads);
    pthread_mutex_unlock(&ticker->lock);

    mc_list_free(graph);
    printf("%s is invoked exit\n", __FUNCTION__);
    return 0;
}
3. 运行流程图
原文:http://www.cnblogs.com/xw022/p/3791063.html