2024.5.22更新
由于性能问题,ws里面还有一个小BUG未修改,此库放弃使用。
推荐几个已经使用的http库:
A C++11 single-file header-only cross platform HTTP/HTTPS library.
https://github.com/yhirose/cpp-httplib
基于C++17/20的Http应用框架,使用Drogon可以方便的使用C++构建各种类型的Web应用服务端程序
https://github.com/drogonframework/drogon
Header only c++ network library, based on asio,support tcp,udp,http,websocket,rpc,ssl,icmp,serial_port.
https://github.com/zhllxt/asio2
背景需求
每个工作背景都不同,需要总结一下:
1.C++编写,跨平台(windows上用的vs2013,所以是C++11标准)
2.支持http和ws的server,支持多线程,支持uri映射(类似addhandler("/hello",onrequestcallback);),支持请求中相关字段的获取(get/post参数读取、header、cookie等读取)
3.不用性能超强,并发数也不大(几十并发)
4.依赖库少一点,不想为了一个小功能引入boost之类的大家伙
之前一直用的是libevent库提供的http服务作为一个接口服务器,后来有了新的需求要增加ws支持,官方的不支持ws,自己魔改libevent增加ws支持实力不够改的兼容性不好,框架网上有大把的框架,用现成的吧。
筛选
查找了很多库 https://github.com/fffaraz/awesome-cpp#networking
前后测试了两遍没有完全满足的,都要二次封装,那么就选一个库作为基石来鼓捣。
最后选择的是mongoose(具体过程不说了,完全是个人情况而定的)
mongoose一个很单纯的C编写的库,包含了client和server的功能,单线程业务处理,不过提供的方法倒是挺多的,非常方便使用。不过这个库不是性能优先,如果非常注重性能(几万并发起步)那还是不要用了,不过却满足我的当前需求。
建立http服务
mongoose简单示例(官方)
#include "mongoose.h" //连接上所有事件的回调函数 static void ev_handler(struct mg_connection *nc, int ev, void *p) { //http请求 if (ev == MG_EV_HTTP_REQUEST) { //发送返回值,有多个函数可以用不同姿势发送返回值 mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK"); } } int main(void) { struct mg_mgr mgr; struct mg_connection *nc; mg_mgr_init(&mgr, NULL); //初始化连接管理器 //绑定端口,设置回调函数 nc = mg_bind(&mgr, "8000", ev_handler); if (nc == NULL) { printf("Failed to create listener\n"); return 1; } //允许http和websocket mg_set_protocol_http_websocket(nc); //事件循环 for (;;) { mg_mgr_poll(&mgr, 1000); } //释放连接管理器 mg_mgr_free(&mgr); return 0; }
先建立连接管理器(mg_mgr),然后所有的连接都在mg_mgr中以链表的形式管理。
然后在mg_mgr中bind一个本地端口,并设置该连接允许http和websocket(mg_set_protocol_http_websocket),然后处理回调消息就完事了,非常简单。
那么也简单了,按照这几个方法,很容易封装出来一个C++的版本,这里不多贴代码了。
多线程的使用
多线程的官方示例:
#include "mongoose.h" static sig_atomic_t s_received_signal = 0; static const char *s_http_port = "8000"; static const int s_num_worker_threads = 5; static unsigned long s_next_id = 0; static void signal_handler(int sig_num) { signal(sig_num, signal_handler); s_received_signal = sig_num; } static struct mg_serve_http_opts s_http_server_opts; static sock_t sock[2]; // This info is passed to the worker thread struct work_request { unsigned long conn_id; // needed to identify the connection where to send the reply // optionally, more data that could be required by worker }; // This info is passed by the worker thread to mg_broadcast struct work_result { unsigned long conn_id; int sleep_time; }; static void on_work_complete(struct mg_connection *nc, int ev, void *ev_data) { (void) ev; char s[32]; struct mg_connection *c; for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c)) { if (c->user_data != NULL) { struct work_result *res = (struct work_result *)ev_data; if ((unsigned long)c->user_data == res->conn_id) { sprintf(s, "conn_id:%lu sleep:%d", res->conn_id, res->sleep_time); mg_send_head(c, 200, strlen(s), "Content-Type: text/plain"); mg_printf(c, "%s", s); } } } } void *worker_thread_proc(void *param) { struct mg_mgr *mgr = (struct mg_mgr *) param; struct work_request req = {0}; while (s_received_signal == 0) { if (read(sock[1], &req, sizeof(req)) < 0) perror("Reading worker sock"); int r = rand() % 10; sleep(r); struct work_result res = {req.conn_id, r}; mg_broadcast(mgr, on_work_complete, (void *)&res, sizeof(res)); } return NULL; } static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) { (void) nc; (void) ev_data; switch (ev) { case MG_EV_ACCEPT: nc->user_data = (void *)++s_next_id; break; case MG_EV_HTTP_REQUEST: { struct work_request req = {(unsigned long)nc->user_data}; if (write(sock[0], &req, sizeof(req)) < 0) perror("Writing worker sock"); break; } case MG_EV_CLOSE: { if (nc->user_data) nc->user_data = NULL; } } } int main(void) { struct mg_mgr mgr; struct mg_connection *nc; int i; if (mg_socketpair(sock, SOCK_STREAM) == 0) { perror("Opening socket pair"); exit(1); } signal(SIGTERM, signal_handler); signal(SIGINT, signal_handler); mg_mgr_init(&mgr, NULL); nc = mg_bind(&mgr, s_http_port, ev_handler); if (nc == NULL) { printf("Failed to create listener\n"); return 1; } mg_set_protocol_http_websocket(nc); s_http_server_opts.document_root = "."; // Serve current directory s_http_server_opts.enable_directory_listing = "no"; for (i = 0; i < s_num_worker_threads; i++) { mg_start_thread(worker_thread_proc, &mgr); } printf("Started on port %s\n", s_http_port); while (s_received_signal == 0) { mg_mgr_poll(&mgr, 200); } mg_mgr_free(&mgr); closesocket(sock[0]); closesocket(sock[1]); return 0; }
开始看的我有点懵逼,建立了一个mg_socketpair来完成业务线程与工作线程的通信,在多线程的工作线程中要用mg_broadcast回调在回调里面真正发送数据,开始没有很好地理解写错了,崩溃了整整一宿(20:00到3:00)。后来仔细阅读了代码结合一些其他人的描述,才弄明白原理。
首先mongoose是业务线程是单线程,就是mg_mgr_poll那个循环就是业务线程,也就是说ev_handler这个回调和发送返回值操作都要在业务线程中做,不在业务线程中调用发送函数(例如mg_send_head)是错误的直接蹦。工作线程是可以多个线程来处理业务,处理完毕后要通知回业务线程。这个通知的方法就是mg_broadcast。
至于这个mg_socketpair可以认为是一个队列,用于业务线程接收到事件后,封装数据的发送到工作线程。补个图。
多线程之间通信
ev_handler回调的数据发送到工作线程
官方示例中使用mg_socketpair通信,其实就是建立了一对socket一个作为发送者一个作为接收者,就是socket通信而已。实际上可以用队列比较方便。建议使用无锁队列效率比较高(写的第一版用的是加锁队列,效率差了十多倍),无锁队列库:https://github.com/cameron314/concurrentqueue (个人比较喜欢的header only方式)
数据传输这里,开始我直接传递的event_data指针,发现完全不行,因为ev_handler返回后该数据会被销毁,所以要在回调中把event_data中所需要的数据都缓存下来。比如http的回调事件MG_EV_HTTP_REQUEST,event_data是http_message*数据,里面包含uri、mehtod、body、message、header等等信息,都要自己另存下来,否则回调一返回数据就会被销毁了。保存好这份数据后,将其插入队列等待工作线程取数据处理。这时候回调返回,业务线程可以继续loop去干其他事情。
http_message *msg = (http_message *)event_data; //保存回调里面的关键数据 MG_EVENT_DATA_PTR cbdata = new MG_EVENT_DATA; cbdata->event_type = event_type; cbdata->body.assign(msg->body.p, msg->body.len); cbdata->method.assign(msg->method.p, msg->method.len); cbdata->uri.assign(msg->uri.p, msg->uri.len); cbdata->query.assign(msg->query_string.p, msg->query_string.len); cbdata->proto.assign(msg->proto.p, msg->proto.len); cbdata->contentlength = msg->content_length; for (int i = 0; i < MG_MAX_HTTP_HEADERS; i++) { if (msg->header_names[i].len == 0) break; cbdata->headers.emplace_back(std::string(msg->header_names[i].p, msg->header_names[i].len), std::string(msg->header_values[i].p, msg->header_values[i].len)); } //将cbdata插入队列
工作线程处理与返回
工作线程是多线程,例如4线程就用std::thread开启4个work_handler()。里面的逻辑就是先从队列里面取出来数据,然后取出来数据后回调给用户接口,用户接口干活,干完之后封装返回值,这些都不用多说就是个简单的callback而已。这里再次强调不能直接在工作线程中调用发送方法(例如mg_send_head),要用回调将数据返回给业务线程去处理。重点就是mg_broadcast这个坑,写的真尼玛晦涩难懂,还都是坑,怪不得网上很多人都说官方的多线程例子是坨屎。
首先说说mg_broadcast这个方法
//工作线程中调用 void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data, size_t len) { struct ctl_msg ctl_msg; /* * Mongoose manager has a socketpair, `struct mg_mgr::ctl`, * where `mg_broadcast()` pushes the message. * `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls * specified callback for each connection. Thus the callback function executes * in event manager thread. */ if (mgr->ctl[0] != INVALID_SOCKET && data != NULL && len < sizeof(ctl_msg.message)) { size_t dummy; ctl_msg.callback = cb; memcpy(ctl_msg.message, data, len); dummy = MG_SEND_FUNC(mgr->ctl[0], (char *) &ctl_msg, offsetof(struct ctl_msg, message) + len, 0); dummy = MG_RECV_FUNC(mgr->ctl[0], (char *) &len, 1, 0); (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */ } } //业务线程中接收 static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) { struct ctl_msg ctl_msg; int len = (int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0); size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0); DBG(("read %d from ctl socket", len)); (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */ if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) { struct mg_connection *nc; for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) { //回调方法 ctl_msg.callback(nc, MG_EV_POLL, ctl_msg.message MG_UD_ARG(nc->user_data)); } } }
void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data, size_t len);
第二个参数是广播的回调方法,data和len是数据。工作线程中调用mg_broadcast来广播,触发的回调方法里面就是业务线程了(开始没理解,后来通过打印调用和回调的threadid才明白)。
内部很简单其实就是一个在内部建立了一个mg_socketpair(上面描述过),将要传递的数据封装到结构体里面,工作线程中调用send把结构体发送出去,业务线程里面recv到这个结构体,然后触发回调,很简单的逻辑。这里官方代码很坑,connection在连接后 ( MG_EV_ACCEPT 事件),初始化了一个唯一connid( s_next_id )给每个connection(存储到了user_data里面),广播会将每个connection都回调一遍(上面代码里面nc那个链表的遍历),然后在回调里面判断当前的connection是不是我要发送数据的connid,不是的话返回,是的话继续发送数据。官方示例里面在on_work_complete又遍历了一遍所有connection这里其实是错误的,一个大坑,因为回调已经是遍历所有connection回调了。这个也符合mg_broadcast这个名字,广播给所有connection回调。
这里还有一个大坑,因为mg_broadcast其实也是异步的,利用socket发送用户数据,那个结构体的定义为:
struct ctl_msg {
mg_event_handler_t callback;
char message[MG_CTL_MSG_MESSAGE_SIZE]; //默认是8192
};
所以这里传递的数据不能是原始返回内容的buffer,太小了不合适呀,所以要new一个buffer传递进去,然后在回调里面去delete掉。但这里会引入另外一个问题,broadcast里面是遍历所有正常的connection,如果某个用户接口处理时间比较长比如要2秒,返回的时候connection已经断开了(多线程下是可能存在的),那么这个广播后就不能触发回调,导致这份buffer无法被删除造成内存泄露。
问题总结:
1.广播时connection断开了无法删除buffer
2.异步的,不知道当前connection的状态
3.工作线程与业务线程传递数据使用广播,意味着每次都要通知一遍所有的connection,这完全没有必要呀,我有connid那只回调一个connection就可以了哇。当然原本的broadcast到所有nc还是有必要的。
开始魔改mg_broadcast吧。。。改后的代码:
struct ctl_msg { mg_event_handler_t callback; uint32_t connid; //connID,可以遍历指定的connection int syncflag; //异步和同步执行标记 void *data; //传递的数据 }; //增加connid参数,可以指定connid来触发回调,如果connid为0那就是广播(给connection初始化connid的时候不能设置为0) //data: 用户数据指针 //ret:返回值指针,NULL为异步,如果设置了就意味着同步发送 发送完毕后才返回。同步方式可以节省内存,异步发送需要新开辟内存 void mg_broadcast(struct mg_mgr *mgr, uint32_t connid, mg_event_handler_t cb, void *data, int *ret) { struct ctl_msg ctl_msg; /* * Mongoose manager has a socketpair, `struct mg_mgr::ctl`, * where `mg_broadcast()` pushes the message. * `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls * specified callback for each connection. Thus the callback function executes * in event manager thread. */ if (mgr->ctl[0] != INVALID_SOCKET && data != NULL) { size_t dummy; ctl_msg.callback = cb; ctl_msg.connid = connid; if (ret) ctl_msg.syncflag = 1; ctl_msg.data = data; dummy = MG_SEND_FUNC(mgr->ctl[0], (char *)&ctl_msg, sizeof(ctl_msg), 0); dummy = MG_RECV_FUNC(mgr->ctl[0], (char *)&ctl_msg.syncflag, sizeof(int), 0); (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */ if (ret) *ret = ctl_msg.syncflag; } } static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) { struct ctl_msg ctl_msg; size_t dummy; int ret = 1; int len = (int)MG_RECV_FUNC(mgr->ctl[1], (char *)&ctl_msg, sizeof(ctl_msg), 0); //不是同步执行 就立刻返回 if (ctl_msg.syncflag == 0) dummy = MG_SEND_FUNC(mgr->ctl[1], (char *)&ret, sizeof(int), 0); DBG(("read %d from ctl socket", len)); (void)dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */ if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) { struct mg_connection *nc; if (ctl_msg.connid == 0) { //先广播所有正常的连接 for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) { ctl_msg.callback(nc, MG_EV_POLL, ctl_msg.data MG_UD_ARG(nc->user_data)); } //最后发送一个nc为NULL的回调 表示广播结束了 用于清理内存等等 ctl_msg.callback(NULL, MG_EV_POLL, ctl_msg.data MG_UD_ARG(nc->user_data)); } else { for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) { if (nc->connid == ctl_msg.connid) break; } if (nc == NULL) ret = 0; ctl_msg.callback(nc, MG_EV_POLL, ctl_msg.data MG_UD_ARG(nc->user_data)); } } //同步执行的情况下 执行完毕了 返回ret if (ctl_msg.syncflag) dummy = MG_SEND_FUNC(mgr->ctl[1], (char *)&ret, sizeof(int), 0); }
增加connid参数,可以指定connid来触发回调,如果connid为0那就是广播到所有connection(给connection初始化connid的时候不能设置为0)。
增加异步、同步控制变量(int *ret),传入NULL还为异步执行,如果设置了就意味着同步发送,发送完毕后才返回。同步方式可以节省内存,异步发送需要新开辟内存。
回调变动:
指定connid:如果connection还活着,那么正常回调(一次),如果connection关闭了没了,那么回调的mg_connection*是个NULL,用于清理缓冲区
未指定connid:广播到所有connection上,并且在所有connection回调完毕后,附加一次mg_connection*为NULL的广播,表示广播结束,用于清理缓冲区
小魔改:
上述保存connection的connid用的是mg_connection的user_data变量,但是我们实现业务肯定也有一堆数据要存储,而且不同类型的请求例如http和ws要保存的数据类型也不一样,区分处理很麻烦,随手魔改了mg_connection结构体,直接给他加上了connid这个变量。开始做第一版的时候,用了mg_connection结构体里面的sock这个变量作为connid,也就是socket的句柄,不过后来想想大并发下socket关闭后再建立新连接可能句柄是会重复的,那就失去唯一的作用了,所以新增了connid变量。
好啦,完事!
用压力测试跑了下几十个并发,对比发现低并发下跟其他库比如libevent差不多,满足要求。
=========================================================
2021.06.25更新
虽然上述的修改满足了要求,但是却修改了mongoose官方的代码,而且改动很细节,所以后来直接优化了一版,将上述的方法封装到了自己的C++代码中,mongoose直接引用官方6.18版本的代码即可,无需任何改动。
=========================================================
=========================================================
2021.07.23更新
有朋友指出官方的6.18版本无法编译通过,发现还真的是有个结构体声明有问题,当时开发我拿的是假的6.18么?
根据官方最后的6.18版本,重新调整。。。
增加静态文件服务器模式StartFileServer方法,该方法不能与StartServer同时使用,二选一。静态文件本地路径要求是"/"分割,不能用windows的"\"。
例如StartFileServer("127.0.0.1:8686", "D:/");
=========================================================
完整代码(2021.07.23更新):
需要依赖无锁队列库:https://github.com/cameron314/concurrentqueue
如果不开启ssl则无其他第三方依赖库。
C++11标准,编译器为vs2013。
FHttpServer.h
#pragma once #include <string> #include <chrono> #include <vector> #include <map> #include <thread> #include <mutex> #include <functional> #include "queue/blockingconcurrentqueue.h" //#define MG_ENABLE_SSL 1 #include "mongoose.h" typedef std::vector <std::pair<std::string, std::string>> StringPairArray; //每个连接的userdata typedef struct { uint32_t connID; std::string uri; //ws依赖 用于回调的map key std::string wsname;//ws依赖 用于广播 类似组名 可重复 }MG_CONNECT_USERDATA; //多线程任务队列 typedef struct { union socket_address saddr; //uint32_t connFlags; uint32_t connID; int event_type; std::string body; //如果是websocket 这里是data std::string method; std::string uri; std::string query; std::string proto; StringPairArray headers; int wsflags; //ws frame事件中的flags }MG_EVENT_DATA, *MG_EVENT_DATA_PTR; class FHttpServer; //http请求信息 class FHttpRequest { //不可复制 FHttpRequest(const FHttpRequest &) = delete; FHttpRequest &operator=(const FHttpRequest &) = delete; MG_EVENT_DATA_PTR m_pEventData; public: FHttpRequest(MG_EVENT_DATA_PTR eventdata) : m_pEventData(eventdata){}; ~FHttpRequest(){}; uint32_t GetConnID(); std::string GetURI(); std::string GetMethod(); std::string GetRemoteIP(); //从url上面得到参数 std::string GetQueryParam(const char *name); //从post的数据中得到参数 std::string GetPostParam(const char *name); std::string GetHeader(const char *name); std::string GetCookies(); size_t GetDataLen(); std::string GetData(); private: std::string GetParam(mg_str *query_string, const char *name); }; //http返回值操作 class FHttpResponse { //不可复制 FHttpResponse(const FHttpResponse &) = delete; FHttpResponse &operator=(const FHttpResponse &) = delete; FHttpServer *m_pServer; uint32_t m_connID; int m_nStatusCode; std::string m_strStatus; StringPairArray *m_pDefaultHeaders; //默认的返回headers StringPairArray m_arrHeaders; //如果修改了,那么使用自己的副本 public: FHttpResponse(uint32_t connID, FHttpServer *pServer, StringPairArray *defaultHeaders); ~FHttpResponse(); //是否已经发送过返回值了 bool m_bsenddata; //设置http返回code和reason,要在Send之前调用 void SetHttpStatusCode(int nStatusCode, const char *reason); //设置header 唯一header名 要在Send之前调用 void SetHttpHeader(const std::string &key, const std::string &value); //追加header,类似多个SetCookie这种头部 要在Send之前调用 void AppendHttpHeader(const std::string &key, const std::string &value); //发送320转向 void SetHttpRedirect(const std::string &rurl); //发送数据 直接发送 void SendHttpContent(const std::string &data, const char *type = NULL); //以chunk方式发送数据,调用SendChunkContent发送数据,不要直接用SendContent void SendHttpChunkBegin(); //发送chunk数据,结束chunk发送一个空buf void SendHttpChunkContent(const std::string &data); private: void _SendData(const std::string *content); void _MakeResponseHeaders(std::string *data); }; //websocket的消息信息 class fsWSMessage { MG_EVENT_DATA_PTR m_pEventData; public: fsWSMessage(MG_EVENT_DATA_PTR eventdata) : m_pEventData(eventdata){}; ~fsWSMessage(){}; uint32_t GetConnID(); std::string GetMsg(); int GetFlag(); }; //websocket回话操作 class fsWSSession { FHttpServer *m_pServer; uint32_t m_connID; public: fsWSSession(uint32_t connID, FHttpServer *pServer) : m_connID(connID), m_pServer(pServer){}; ~fsWSSession(){}; //设置该ws连接的name void SetName(const std::string &name); //发送websocket数据 void SendFrameMsg(const std::string &data, int type = WEBSOCKET_OP_TEXT); //关闭连接 void CloseConnect(); }; class FHttpServer { //静态文件服务器 struct mg_serve_http_opts m_mHttpServerOpts; //mg_serve_http_opts傻逼结构体用的是指针,还要保存下来字符串变量 std::string m_strlocalpath; bool m_bisfileserver; struct mg_mgr m_mMgr; bool m_exitflag; //线程之间广播使用 typedef void(*BROADCAST_CALLBACK)(struct mg_connection *nc, void *ev_data); typedef struct { BROADCAST_CALLBACK callback; uint32_t connid; //connID,可以遍历指定的connection void *data; //传递的数据 }MG_BROADCAST_MSG; sock_t m_broadsender; uint32_t m_nSeqConnid; //默认的返回headers StringPairArray m_defaultHeaders; //工作线程和队列 tsBlockQuque<MG_EVENT_DATA_PTR> m_TaskQueue; std::vector <std::thread> m_threadPool; //http回调方法和映射表 typedef std::function<void(FHttpRequest &, FHttpResponse &)> funcHttpCallback; std::map <std::string, funcHttpCallback> m_mapHttpRouter; //websocket回调方法和映射表 typedef std::function<void(FHttpRequest &, fsWSSession &)> funcWSOnReadyCallback; typedef std::function<void(fsWSMessage &, fsWSSession &)> funcWSOnMsgCallback; typedef std::function<void(uint32_t)> funcWSOnCloseCallback; struct stWebSocketCallback { funcWSOnReadyCallback onready; funcWSOnMsgCallback onmessage; funcWSOnCloseCallback onclose; }; std::map <std::string, stWebSocketCallback> m_mapWSRouter; public: FHttpServer(); ~FHttpServer(); bool StartFileServer(const std::string &addr, const std::string &localpath); //启动服务 addr可以是"8888", ":8888", "127.0.0.1:8888"; nThread为工作线程数 bool StartServer(const std::string &addr, int nThread); //停止服务 void StopServer(); //添加默认返回的http头,唯一值 void AddDefaultHeader(const std::string &key, const std::string &value); //添加http回调接口 void AddHttpHandler(const std::string &uri, funcHttpCallback callback); //添加websocket回调接口 void AddWSHandler(const std::string &uri, funcWSOnReadyCallback onready, funcWSOnMsgCallback onmessage, funcWSOnCloseCallback onclose); ////////////////////////////////////////////////////////////////////////// //websocket使用的接口 ////////////////////////////////////////////////////////////////////////// //发送数据到connID这个ws连接上 void SendWSFrame(uint32_t connID, const std::string &data, int type = WEBSOCKET_OP_TEXT); //关闭ws连接 void CloseWSConn(uint32_t connID); //广播数据到name这个组 void BroadcastWS(const std::string &name, const std::string &data, int type = WEBSOCKET_OP_TEXT); ////////////////////////////////////////////////////////////////////////// //内部使用的接口,外部不要调用 void _settaskquque(MG_EVENT_DATA_PTR eventdata); bool _ishttphandler(const std::string &uri); bool _iswshandler(const std::string &uri); bool _isstop(); void _broadcast(uint32_t connID, void *data, BROADCAST_CALLBACK cb); private: static void onEventCallback(mg_connection *nc, int event_type, void *event_data); void TaskQuqueRunner(); void WorkEventHandler(MG_EVENT_DATA_PTR eventdata); };
FHttpServer.cpp
#include "stdafx.h" #include "FHttpServer.h" #define is_websocket(nc) (nc->flags & MG_F_IS_WEBSOCKET) #define MG_REQUEST_MAXLENGTH 10 * 1024 * 1024 //10M FHttpServer::FHttpServer() { mg_mgr_init(&m_mMgr, this); m_bisfileserver = false; m_nSeqConnid = 1; m_exitflag = false; //默认返回的http头 AddDefaultHeader("Content-Type", "text/plain; charset=GBK"); AddDefaultHeader("Connection", "Keep-Alive"); } FHttpServer::~FHttpServer() { StopServer(); } bool FHttpServer::StartFileServer(const std::string &addr, const std::string &localpath) { m_strlocalpath = localpath; memset(&m_mHttpServerOpts, 0, sizeof(struct mg_serve_http_opts)); m_mHttpServerOpts.document_root = m_strlocalpath.c_str(); m_mHttpServerOpts.enable_directory_listing = "yes"; m_bisfileserver = true; return StartServer(addr, 1); } bool FHttpServer::StartServer(const std::string &addr, int nThread) { //mg_connection *connection = mg_bind(&m_mMgr, addr.c_str(), FHttpServer::OnEventHandler, this); mg_connection *connection = mg_bind(&m_mMgr, addr.c_str(), onEventCallback); if (connection == NULL) { printf("listen server %s err.\n", addr.c_str()); return false; } // mg_register_http_endpoint(connection, "/upload", [](struct mg_connection *nc, int ev, void *p){ // struct mg_http_multipart_part *mp = (struct mg_http_multipart_part *) p; // printf("endpoint eventtype:%d \n", ev, mp->status); // }); //用于异步通知主线程干活 不修改mongoose源码模拟和扩展mg_broadcast方法 mg_connection *udprecv = mg_bind(&m_mMgr, "udp://127.0.0.1:0", [](mg_connection *nc, int event_type, void *event_data) { if (event_type == MG_EV_RECV) { struct mbuf *io = &nc->recv_mbuf; if (io->len == sizeof(MG_BROADCAST_MSG)) { MG_BROADCAST_MSG *pctlmsg = (MG_BROADCAST_MSG *)io->buf; if (pctlmsg->callback != NULL) { struct mg_mgr *mgr = nc->mgr; struct mg_connection *nc; if (pctlmsg->connid == 0) { //先广播所有正常的连接 for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) { if (nc->user_data) pctlmsg->callback(nc, pctlmsg->data); } //最后发送一个nc为NULL的回调 表示广播结束了 用于清理内存等等 pctlmsg->callback(NULL, pctlmsg->data); } else { for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) { if (nc->user_data && ((MG_CONNECT_USERDATA *)nc->user_data)->connID == pctlmsg->connid) break; } pctlmsg->callback(nc, pctlmsg->data); } } } // In case of UDP, Mongoose creates new virtual connection for // incoming messages // We can keep it (and it will be reused for another messages from // the same address) or we can close it (this saves some memory, but // decreases perfomance, because it forces creation of connection // for every incoming dgram) nc->flags |= MG_F_SEND_AND_CLOSE; // Discard message from recv buffer mbuf_remove(io, io->len); }//eventhandler }); if (udprecv == NULL) { printf("listen broadcase channel err.\n"); return false; } //建立发送者 union socket_address broadrecvsa; socklen_t len = sizeof(broadrecvsa.sin); getsockname(udprecv->sock, &broadrecvsa.sa, &len); //建立发送者的udp socket union socket_address udpsasender; memset(&udpsasender, 0, sizeof(udpsasender)); len = sizeof(udpsasender.sin); udpsasender.sin.sin_family = AF_INET; udpsasender.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */ m_broadsender = socket(AF_INET, SOCK_DGRAM, 0); if (m_broadsender == INVALID_SOCKET) { printf("make broadcase channel err.\n"); return false; } if (bind(m_broadsender, &udpsasender.sa, len) != 0) { printf("bind broadcase channel err.\n"); return false; } if (connect(m_broadsender, &broadrecvsa.sa, len) != 0) { printf("connect broadcase channel err.\n"); return false; } //both http and websocket mg_set_protocol_http_websocket(connection); //初始化工作队列 for (int i = 0; i < nThread; i++) m_threadPool.emplace_back(&FHttpServer::TaskQuqueRunner, this); std::thread([this]{ // loop while (!m_exitflag) mg_mgr_poll(&m_mMgr, 200); // ms }).detach(); printf("started server on: %s(tid:%d)\n", addr.c_str(), GetCurrentThreadId()); return true; } void FHttpServer::StopServer() { if (m_exitflag) return; printf("set stop flag.\n"); m_exitflag = true; printf("free mgr.\n"); mg_mgr_free(&m_mMgr); printf("wait for work thread finish...\n"); for (std::thread &th : m_threadPool) { if (th.joinable()) th.join(); } printf("server stopped."); } void FHttpServer::AddDefaultHeader(const std::string &key, const std::string &value) { for (auto &header : m_defaultHeaders) { if (mg_ncasecmp(key.c_str(), header.first.c_str(), key.length()) == 0) { header.second = value; return; } } m_defaultHeaders.emplace_back(key, value); } void FHttpServer::AddHttpHandler(const std::string &uri, funcHttpCallback callback) { m_mapHttpRouter.emplace(uri, callback); } void FHttpServer::AddWSHandler(const std::string &uri, funcWSOnReadyCallback onready, funcWSOnMsgCallback onmessage, funcWSOnCloseCallback onclose) { stWebSocketCallback cb = {onready, onmessage, onclose}; m_mapWSRouter.emplace(uri, cb); } void FHttpServer::SendWSFrame(uint32_t connID, const std::string &data, int type /*= WEBSOCKET_OP_TEXT*/) { return fsWSSession(connID, this).SendFrameMsg(data, type); } void FHttpServer::CloseWSConn(uint32_t connID) { fsWSSession(connID, this).CloseConnect(); } void FHttpServer::BroadcastWS(const std::string &name, const std::string &data, int type /*= WEBSOCKET_OP_TEXT*/) { struct wsData { int type; std::string name; std::string buf; }; wsData *param = new wsData{ type, name, data }; //传入connid为0,广播到所有nc上 _broadcast(0, param, [](struct mg_connection *nc, void *data){ if (nc) { if (is_websocket(nc)) { wsData *p = (wsData *)data; MG_CONNECT_USERDATA *pUserdata = (MG_CONNECT_USERDATA *)nc->user_data; if (pUserdata->wsname == p->name) mg_send_websocket_frame(nc, p->type, p->buf.c_str(), p->buf.length()); } } else { //nc为NULL就是广播完毕了 这类清理内存 delete ((wsData *)data); } }); } void FHttpServer::_settaskquque(MG_EVENT_DATA_PTR eventdata) { m_TaskQueue.enqueue(eventdata); } bool FHttpServer::_ishttphandler(const std::string &uri) { if (m_mapHttpRouter.find(uri) == m_mapHttpRouter.end()) return false; return true; } bool FHttpServer::_iswshandler(const std::string &uri) { if (m_mapWSRouter.find(uri) == m_mapWSRouter.end()) return false; return true; } void FHttpServer::_broadcast(uint32_t connID, void *data, BROADCAST_CALLBACK cb) { MG_BROADCAST_MSG ctlmsg; ctlmsg.connid = connID; ctlmsg.callback = cb; ctlmsg.data = data; send(m_broadsender, (const char *)&ctlmsg, sizeof(ctlmsg), 0); }; bool FHttpServer::_isstop() { return m_exitflag; } //单线程回调函数 void FHttpServer::onEventCallback(mg_connection *nc, int event_type, void *event_data) { FHttpServer *pServer = (FHttpServer *)nc->mgr->user_data; MG_CONNECT_USERDATA *pUserData = (MG_CONNECT_USERDATA *)nc->user_data; //debug //if (event_type != MG_EV_POLL) // printf("EventCallback(%d): type:%d, sockid:%d\n", GetCurrentThreadId(), event_type, (pUserData ? pUserData->connID : 0)); // if (event_type == MG_EV_RECV) // { // struct mbuf *io = &nc->recv_mbuf; // WriteBufferToFile("request.txt", io->buf, io->len, TRUE); // } //连接上后 创建出来connid等信息 if (event_type == MG_EV_ACCEPT) { pUserData = new MG_CONNECT_USERDATA; pUserData->connID = pServer->m_nSeqConnid; nc->user_data = pUserData; //数据包最大限制 //nc->recv_mbuf_limit = pServer->m_HttpMaxLength; pServer->m_nSeqConnid++; //connid使用0来标记 所以要+1 if (pServer->m_nSeqConnid == 0) pServer->m_nSeqConnid++; return; } //关闭事件 清理user_data if (event_type == MG_EV_CLOSE) { if (is_websocket(nc)) { //websocket的话要回调 MG_EVENT_DATA_PTR cbdata = new MG_EVENT_DATA; cbdata->event_type = event_type; cbdata->connID = pUserData->connID; cbdata->uri = pUserData->uri; pServer->_settaskquque(cbdata); } //删掉user_data delete pUserData; nc->user_data = NULL; return; } //http请求和websocket握手完毕 要读取http request信息 if (event_type == MG_EV_HTTP_REQUEST || event_type == MG_EV_WEBSOCKET_HANDSHAKE_DONE || event_type == MG_EV_WEBSOCKET_FRAME) { if (event_type == MG_EV_HTTP_REQUEST) { if (pServer->m_bisfileserver) { mg_serve_http(nc, (http_message *)event_data, pServer->m_mHttpServerOpts); return; } //处理映射表 不在映射表的就不加入队列了 std::string uri; uri.assign(((http_message *)event_data)->uri.p, ((http_message *)event_data)->uri.len); auto iter = pServer->m_mapHttpRouter.find(uri); if (iter == pServer->m_mapHttpRouter.end()) { //看看websocket auto wsiter = pServer->m_mapWSRouter.find(uri); if (wsiter == pServer->m_mapWSRouter.end()) { //不支持的uri映射 返回403吧 mg_http_send_error(nc, 403, "Not Support."); return; } } } MG_EVENT_DATA_PTR cbdata = new MG_EVENT_DATA; cbdata->event_type = event_type; cbdata->connID = pUserData->connID; if (event_type == MG_EV_WEBSOCKET_FRAME) { websocket_message *wsmsg = (websocket_message *)event_data; cbdata->body.assign((char *)wsmsg->data, wsmsg->size); cbdata->wsflags = wsmsg->flags; //从user_data里面读取uri cbdata->uri = pUserData->uri; } else { http_message *httpmsg = (http_message *)event_data; //用于读取远程IP cbdata->saddr = nc->sa; //http request信息 cbdata->body.assign(httpmsg->body.p, httpmsg->body.len); cbdata->method.assign(httpmsg->method.p, httpmsg->method.len); cbdata->uri.assign(httpmsg->uri.p, httpmsg->uri.len); cbdata->query.assign(httpmsg->query_string.p, httpmsg->query_string.len); cbdata->proto.assign(httpmsg->proto.p, httpmsg->proto.len); for (int i = 0; i < MG_MAX_HTTP_HEADERS; i++) { if (httpmsg->header_names[i].len == 0) break; cbdata->headers.emplace_back(std::string(httpmsg->header_names[i].p, httpmsg->header_names[i].len), std::string(httpmsg->header_values[i].p, httpmsg->header_values[i].len)); } //ws默认name就是uri 用于广播 if (event_type == MG_EV_WEBSOCKET_HANDSHAKE_DONE) pUserData->wsname = pUserData->uri = cbdata->uri; } //printf("AddEvent Data(%d): %d\n", GetCurrentThreadId(), cbdata->event_type); pServer->_settaskquque(cbdata); } } void FHttpServer::TaskQuqueRunner() { while (true) { MG_EVENT_DATA_PTR eventdata; if (m_TaskQueue.wait_dequeue_timed(eventdata, std::chrono::seconds(1))) { //printf("TaskQuque Get Data(%d): %d\n", GetCurrentThreadId(), eventdata->event_type); WorkEventHandler(eventdata); //用完了要删掉 delete eventdata; } else { //printf("Empty quque.\n"); //结束标记并且队列没有数据了 返回 if (m_exitflag) break; } } return; } void FHttpServer::WorkEventHandler(MG_EVENT_DATA_PTR eventdata) { //printf("WorkHandler Data(%d): %d\n", GetCurrentThreadId(), eventdata->event_type); if (eventdata->event_type == MG_EV_HTTP_REQUEST) { //普通http请求 //OnHttpHandler(eventdata); //准备变量 FHttpRequest request(eventdata); FHttpResponse response(eventdata->connID, this, &m_defaultHeaders); //回调 m_mapHttpRouter[eventdata->uri](request, response); return; } //剩下的都是ws的事件了 stWebSocketCallback wsCB = m_mapWSRouter[eventdata->uri]; if (eventdata->event_type == MG_EV_WEBSOCKET_HANDSHAKE_DONE && wsCB.onready) wsCB.onready(FHttpRequest(eventdata), fsWSSession(eventdata->connID, this)); else if (eventdata->event_type == MG_EV_WEBSOCKET_FRAME && wsCB.onmessage) wsCB.onmessage(fsWSMessage(eventdata), fsWSSession(eventdata->connID, this)); else if (eventdata->event_type == MG_EV_CLOSE && wsCB.onclose) wsCB.onclose(eventdata->connID); } uint32_t FHttpRequest::GetConnID() { return m_pEventData->connID; } std::string FHttpRequest::GetURI() { return m_pEventData->uri; } std::string FHttpRequest::GetMethod() { return m_pEventData->method; } std::string FHttpRequest::GetRemoteIP() { char addr[32]; mg_sock_addr_to_str(&m_pEventData->saddr, addr, sizeof(addr), MG_SOCK_STRINGIFY_IP); return std::string(addr); } std::string FHttpRequest::GetParam(mg_str *query_string, const char *name) { char value[128]; int ret = mg_get_http_var(query_string, name, value, sizeof(value) - 1); if (ret <= 0) return std::string(); if (ret < sizeof(value)) return std::string(value, ret); //数据过长了 std::unique_ptr<char> buf(new char[ret + 1]); ret = mg_get_http_var(query_string, name, buf.get(), ret + 1); return std::string(buf.get(), ret); } std::string FHttpRequest::GetQueryParam(const char *name) { mg_str query_string; query_string.p = m_pEventData->query.c_str(); query_string.len = m_pEventData->query.length(); return GetParam(&query_string, name); } std::string FHttpRequest::GetPostParam(const char *name) { mg_str query_string; query_string.p = m_pEventData->body.c_str(); query_string.len = m_pEventData->body.length(); return GetParam(&query_string, name); } std::string FHttpRequest::GetHeader(const char *name) { for (auto &header : m_pEventData->headers) { if (mg_ncasecmp(name, header.first.c_str(), header.first.length()) == 0) return header.second; } return std::string(); } std::string FHttpRequest::GetCookies() { return GetHeader("Cookie"); } size_t FHttpRequest::GetDataLen() { return m_pEventData->body.length(); } std::string FHttpRequest::GetData() { return m_pEventData->body; } FHttpResponse::FHttpResponse(uint32_t connID, FHttpServer *pServer, StringPairArray *defaultHeaders) : m_connID(connID), m_pServer(pServer), m_nStatusCode(200), m_strStatus("OK"), m_pDefaultHeaders(defaultHeaders), m_bsenddata(false) { } FHttpResponse::~FHttpResponse() { if (!m_bsenddata) SendHttpContent(std::string()); } void FHttpResponse::SetHttpStatusCode(int nStatusCode, const char *reason) { m_nStatusCode = nStatusCode; m_strStatus = reason; } void FHttpResponse::SetHttpHeader(const std::string &key, const std::string &value) { //修改了header 就建立自己的副本 if (m_pDefaultHeaders) { m_arrHeaders = *m_pDefaultHeaders; m_pDefaultHeaders = NULL; } for (auto &header : m_arrHeaders) { if (mg_ncasecmp(key.c_str(), header.first.c_str(), key.length()) == 0) { header.second = value; return; } } m_arrHeaders.emplace_back(key, value); } void FHttpResponse::AppendHttpHeader(const std::string &key, const std::string &value) { //修改了header 就建立自己的副本 if (m_pDefaultHeaders) { m_arrHeaders = *m_pDefaultHeaders; m_pDefaultHeaders = NULL; } m_arrHeaders.emplace_back(key, value); } void FHttpResponse::SetHttpRedirect(const std::string &rurl) { m_bsenddata = true; m_pServer->_broadcast(m_connID, new std::string(rurl), [](struct mg_connection *nc, void *data){ std::string *rurl = (std::string *)data; if (nc) mg_http_send_redirect(nc, 302, mg_mk_str(rurl->c_str()), mg_mk_str(NULL)); delete rurl; }); } void FHttpResponse::SendHttpContent(const std::string &data, const char *type /*= NULL*/) { if (type) SetHttpHeader("Content-Type", type); _SendData(&data); } void FHttpResponse::SendHttpChunkBegin() { _SendData(NULL); } void FHttpResponse::SendHttpChunkContent(const std::string &data) { m_pServer->_broadcast(m_connID, new std::string(data), [](struct mg_connection *nc, void *data){ std::string *buf = (std::string *)data; if (nc) mg_send_http_chunk(nc, buf->c_str(), buf->length()); delete buf; }); } void FHttpResponse::_MakeResponseHeaders(std::string *data) { //拼接header StringPairArray *headers = m_pDefaultHeaders; if (headers == NULL) headers = &m_arrHeaders; size_t nHeanderSize = headers->size(); for (size_t i = 0; i < nHeanderSize; i++) { std::string str = headers->at(i).first + ": " + headers->at(i).second + "\r\n"; data->append(str); } } void FHttpResponse::_SendData(const std::string *content) { //不要重复发送 if (m_bsenddata) return; std::string *pSendData = new std::string("HTTP/1.1 " + std::to_string(m_nStatusCode) + " " + m_strStatus + "\r\n"); _MakeResponseHeaders(pSendData); if (content == NULL) { //开启了chunk模式 pSendData->append("Transfer-Encoding: chunked\r\n\r\n"); } else { //有Content pSendData->append("Content-Length: " + std::to_string(content->length()) + "\r\n\r\n"); //拼接数据 pSendData->append(*content); } //发送 要用mg_broadcast来完成回调 异步发送 m_pServer->_broadcast(m_connID, pSendData, [](struct mg_connection *nc, void *data){ std::string *buf = (std::string *)data; if (nc) mg_send(nc, buf->c_str(), buf->length()); delete buf; }); m_bsenddata = true; } uint32_t fsWSMessage::GetConnID() { return m_pEventData->connID; } std::string fsWSMessage::GetMsg() { return m_pEventData->body; } int fsWSMessage::GetFlag() { return m_pEventData->wsflags; } void fsWSSession::SetName(const std::string &name) { m_pServer->_broadcast(m_connID, new std::string(name), [](struct mg_connection *nc, void *data){ std::string *name = (std::string *)data; if (nc) ((MG_CONNECT_USERDATA *)nc->user_data)->wsname = *name; delete name; }); } void fsWSSession::SendFrameMsg(const std::string &data, int type /*= WEBSOCKET_OP_TEXT*/) { int ret = 0; struct wsData { int type; std::string buf; }; wsData *param = new wsData{ type, data }; //需要返回值 所以使用同步的方式发送 m_pServer->_broadcast(m_connID, param, [](struct mg_connection *nc, void *data){ wsData *p = (wsData *)data; if (nc) mg_send_websocket_frame(nc, p->type, p->buf.c_str(), p->buf.length()); delete p; }); } void fsWSSession::CloseConnect() { m_pServer->_broadcast(m_connID, NULL, [](struct mg_connection *nc, void *data){ if (nc) mg_send_websocket_frame(nc, WEBSOCKET_OP_CLOSE, "bye", 3); }); }
最后使用方法:
FHttpServer server; server.AddHttpHandler("/hello", [](fsRequest &req, fsResponse &response){ printf("uri:%s\n", req.GetURI().c_str()); printf("remote:%s\n", req.GetRemoteIP().c_str()); printf("method:%s\n", req.GetMethod().c_str()); printf("p:%s\n", req.GetQueryParam("p").c_str()); printf("body(%d):%s\n", req.GetDataLen(), req.GetData().c_str()); response.SendHttpContent("hello word!"); }); uint32_t connID = -1; server.AddWSHandler("/ws", [&](fsRequest &req, fsWSSession &session) { printf("ws on ready %s.\n", req.GetQueryParam("id").c_str()); session.SendFrame("ready."); session.SetName(req.GetQueryParam("id")); connID = req.GetConnID(); }, //onready [](fsWSMessage &req, fsWSSession &session) { printf("body:%s\n", req.GetMsg().c_str()); session.SendFrame(req.GetMsg()); }, [](uint32_t connID) { printf("ws on close.\n"); } //onclose ); server.StartServer("8881", 32); while (true) { int x = getchar(); if (x == 'q') break; server.SendWSFrame(connID, "hello"); server.BroadcastWS("123", "broadcase msg."); } server.StopServer();
想知道ws这个怎么使用
https://github.com/drogonframework/drogon
https://github.com/zhllxt/asio2
std::string uri;
uri.assign(((http_message *)event_data)->uri.p, ((http_message *)event_data)->uri.len);
auto iter = pServer->m_mapHttpRouter.find(uri);
if (iter == pServer->m_mapHttpRouter.end())
{
//看看websocket
auto wsiter = pServer->m_mapWSRouter.find(uri);
if (wsiter == pServer->m_mapWSRouter.end())
{
//不支持的uri映射 返回403吧
mg_http_send_error(nc, 403, "Not Support.");
return;
}
}
此处http的请求不建议再去ws里面查找映射,否则在WorkEventHandler中使用m_mapHttpRouter[eventdata->uri](request, response);会直接崩溃掉