BlackFeather'S Blog 我的技术小博 -- C/C++,Python,Golang

mongoose魔改历程(C++封装http和ws服务、多线程、优化)-2021.07.23更新


2024.5.22更新

由于性能问题,ws里面还有一个小BUG未修改,此库放弃使用

推荐几个已经使用的http库:

A C++11 single-file header-only cross platform HTTP/HTTPS library.

https://github.com/yhirose/cpp-httplib


基于C++17/20的Http应用框架,使用Drogon可以方便的使用C++构建各种类型的Web应用服务端程序

https://github.com/drogonframework/drogon


Header only c++ network library, based on asio,support tcp,udp,http,websocket,rpc,ssl,icmp,serial_port.

https://github.com/zhllxt/asio2



背景需求

每个工作背景都不同,需要总结一下:

1.C++编写,跨平台(windows上用的vs2013,所以是C++11标准)

2.支持http和ws的server,支持多线程,支持uri映射(类似addhandler("/hello",onrequestcallback);),支持请求中相关字段的获取(get/post参数读取、header、cookie等读取)

3.不用性能超强,并发数也不大(几十并发)

4.依赖库少一点,不想为了一个小功能引入boost之类的大家伙

之前一直用的是libevent库提供的http服务作为一个接口服务器,后来有了新的需求要增加ws支持,官方的不支持ws,自己魔改libevent增加ws支持实力不够改的兼容性不好,框架网上有大把的框架,用现成的吧。


 


筛选

查找了很多库 https://github.com/fffaraz/awesome-cpp#networking

前后测试了两遍没有完全满足的,都要二次封装,那么就选一个库作为基石来鼓捣。

最后选择的是mongoose(具体过程不说了,完全是个人情况而定的)

mongoose一个很单纯的C编写的库,包含了client和server的功能,单线程业务处理,不过提供的方法倒是挺多的,非常方便使用。不过这个库不是性能优先,如果非常注重性能(几万并发起步)那还是不要用了,不过却满足我的当前需求。


 


建立http服务

mongoose简单示例(官方)

#include "mongoose.h"

//连接上所有事件的回调函数
static void ev_handler(struct mg_connection *nc, int ev, void *p) 
{
  
  //http请求
  if (ev == MG_EV_HTTP_REQUEST) 
  {
    //发送返回值,有多个函数可以用不同姿势发送返回值
    mg_printf(nc, "%s", "HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK");
  }

}

int main(void) 
{
  struct mg_mgr mgr;
  struct mg_connection *nc;

  mg_mgr_init(&mgr, NULL); //初始化连接管理器

  //绑定端口,设置回调函数
  nc = mg_bind(&mgr, "8000", ev_handler); 
  if (nc == NULL) {
    printf("Failed to create listener\n");
    return 1;
  }

  //允许http和websocket
  mg_set_protocol_http_websocket(nc);  

  //事件循环
  for (;;) {
    mg_mgr_poll(&mgr, 1000);
  }

  //释放连接管理器
  mg_mgr_free(&mgr);

  return 0;
}


先建立连接管理器(mg_mgr),然后所有的连接都在mg_mgr中以链表的形式管理。

然后在mg_mgr中bind一个本地端口,并设置该连接允许http和websocket(mg_set_protocol_http_websocket),然后处理回调消息就完事了,非常简单。

那么也简单了,按照这几个方法,很容易封装出来一个C++的版本,这里不多贴代码了。


 



多线程的使用

多线程的官方示例:

#include "mongoose.h"

static sig_atomic_t s_received_signal = 0;
static const char *s_http_port = "8000";
static const int s_num_worker_threads = 5;
static unsigned long s_next_id = 0;

static void signal_handler(int sig_num) {
  signal(sig_num, signal_handler);
  s_received_signal = sig_num;
}
static struct mg_serve_http_opts s_http_server_opts;
static sock_t sock[2];

// This info is passed to the worker thread
struct work_request {
  unsigned long conn_id;  // needed to identify the connection where to send the reply
  // optionally, more data that could be required by worker 
};

// This info is passed by the worker thread to mg_broadcast
struct work_result {
  unsigned long conn_id;
  int sleep_time;
};

static void on_work_complete(struct mg_connection *nc, int ev, void *ev_data) {
  (void) ev;
  char s[32];
  struct mg_connection *c;
  for (c = mg_next(nc->mgr, NULL); c != NULL; c = mg_next(nc->mgr, c)) {
    if (c->user_data != NULL) {
      struct work_result *res = (struct work_result *)ev_data;
      if ((unsigned long)c->user_data == res->conn_id) {
        sprintf(s, "conn_id:%lu sleep:%d", res->conn_id, res->sleep_time);
        mg_send_head(c, 200, strlen(s), "Content-Type: text/plain");
        mg_printf(c, "%s", s);
      }
    }
  }
}

void *worker_thread_proc(void *param) {
  struct mg_mgr *mgr = (struct mg_mgr *) param;
  struct work_request req = {0};
  
  while (s_received_signal == 0) {
    if (read(sock[1], &req, sizeof(req)) < 0)
      perror("Reading worker sock");
    int r = rand() % 10;
    sleep(r);
    struct work_result res = {req.conn_id, r};
    mg_broadcast(mgr, on_work_complete, (void *)&res, sizeof(res));
  }
  return NULL;
}

static void ev_handler(struct mg_connection *nc, int ev, void *ev_data) {
  (void) nc;
  (void) ev_data;
  
  switch (ev) {
    case MG_EV_ACCEPT:
      nc->user_data = (void *)++s_next_id;
      break;
    case MG_EV_HTTP_REQUEST: {
      struct work_request req = {(unsigned long)nc->user_data};

      if (write(sock[0], &req, sizeof(req)) < 0)
        perror("Writing worker sock");
      break;
    }
    case MG_EV_CLOSE: {
      if (nc->user_data) nc->user_data = NULL;
    }
  }
}

int main(void) {
  struct mg_mgr mgr;
  struct mg_connection *nc;
  int i;

  if (mg_socketpair(sock, SOCK_STREAM) == 0) {
    perror("Opening socket pair");
    exit(1);
  }

  signal(SIGTERM, signal_handler);
  signal(SIGINT, signal_handler);

  mg_mgr_init(&mgr, NULL);

  nc = mg_bind(&mgr, s_http_port, ev_handler);
  if (nc == NULL) {
    printf("Failed to create listener\n");
    return 1;
  }

  mg_set_protocol_http_websocket(nc);
  s_http_server_opts.document_root = ".";  // Serve current directory
  s_http_server_opts.enable_directory_listing = "no";

  for (i = 0; i < s_num_worker_threads; i++) {
    mg_start_thread(worker_thread_proc, &mgr);
  }

  printf("Started on port %s\n", s_http_port);
  while (s_received_signal == 0) {
    mg_mgr_poll(&mgr, 200);
  }

  mg_mgr_free(&mgr);

  closesocket(sock[0]);
  closesocket(sock[1]);

  return 0;
}

开始看的我有点懵逼,建立了一个mg_socketpair来完成业务线程与工作线程的通信,在多线程的工作线程中要用mg_broadcast回调在回调里面真正发送数据,开始没有很好地理解写错了,崩溃了整整一宿(20:00到3:00)。后来仔细阅读了代码结合一些其他人的描述,才弄明白原理。

首先mongoose是业务线程是单线程,就是mg_mgr_poll那个循环就是业务线程,也就是说ev_handler这个回调和发送返回值操作都要在业务线程中做,不在业务线程中调用发送函数(例如mg_send_head)是错误的直接蹦。工作线程是可以多个线程来处理业务,处理完毕后要通知回业务线程。这个通知的方法就是mg_broadcast。

至于这个mg_socketpair可以认为是一个队列,用于业务线程接收到事件后,封装数据的发送到工作线程。补个图。

微信图片_20201030171508.jpg



 


多线程之间通信


ev_handler回调的数据发送到工作线程

官方示例中使用mg_socketpair通信,其实就是建立了一对socket一个作为发送者一个作为接收者,就是socket通信而已。实际上可以用队列比较方便。建议使用无锁队列效率比较高(写的第一版用的是加锁队列,效率差了十多倍),无锁队列库:https://github.com/cameron314/concurrentqueue  (个人比较喜欢的header only方式)

数据传输这里,开始我直接传递的event_data指针,发现完全不行,因为ev_handler返回后该数据会被销毁,所以要在回调中把event_data中所需要的数据都缓存下来。比如http的回调事件MG_EV_HTTP_REQUEST,event_data是http_message*数据,里面包含uri、mehtod、body、message、header等等信息,都要自己另存下来,否则回调一返回数据就会被销毁了。保存好这份数据后,将其插入队列等待工作线程取数据处理。这时候回调返回,业务线程可以继续loop去干其他事情。

			http_message *msg = (http_message *)event_data;

			//保存回调里面的关键数据
			MG_EVENT_DATA_PTR cbdata = new MG_EVENT_DATA;
			cbdata->event_type = event_type;
			cbdata->body.assign(msg->body.p, msg->body.len);
			cbdata->method.assign(msg->method.p, msg->method.len);
			cbdata->uri.assign(msg->uri.p, msg->uri.len);
			cbdata->query.assign(msg->query_string.p, msg->query_string.len);
			cbdata->proto.assign(msg->proto.p, msg->proto.len);
			cbdata->contentlength = msg->content_length;
			for (int i = 0; i < MG_MAX_HTTP_HEADERS; i++)
			{
				if (msg->header_names[i].len == 0)
					break;
				cbdata->headers.emplace_back(std::string(msg->header_names[i].p, msg->header_names[i].len), std::string(msg->header_values[i].p, msg->header_values[i].len));
			}
			//将cbdata插入队列


工作线程处理与返回


工作线程是多线程,例如4线程就用std::thread开启4个work_handler()。里面的逻辑就是先从队列里面取出来数据,然后取出来数据后回调给用户接口,用户接口干活,干完之后封装返回值,这些都不用多说就是个简单的callback而已。这里再次强调不能直接在工作线程中调用发送方法(例如mg_send_head),要用回调将数据返回给业务线程去处理。重点就是mg_broadcast这个坑,写的真尼玛晦涩难懂,还都是坑,怪不得网上很多人都说官方的多线程例子是坨屎。

首先说说mg_broadcast这个方法

//工作线程中调用
void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data, size_t len) {
  struct ctl_msg ctl_msg;

  /*
   * Mongoose manager has a socketpair, `struct mg_mgr::ctl`,
   * where `mg_broadcast()` pushes the message.
   * `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls
   * specified callback for each connection. Thus the callback function executes
   * in event manager thread.
   */
  if (mgr->ctl[0] != INVALID_SOCKET && data != NULL &&
      len < sizeof(ctl_msg.message)) {
    size_t dummy;

    ctl_msg.callback = cb;
    memcpy(ctl_msg.message, data, len);
    dummy = MG_SEND_FUNC(mgr->ctl[0], (char *) &ctl_msg,
                         offsetof(struct ctl_msg, message) + len, 0);
    dummy = MG_RECV_FUNC(mgr->ctl[0], (char *) &len, 1, 0);
    (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
  }
}


//业务线程中接收
static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) {
  struct ctl_msg ctl_msg;
  int len = (int) MG_RECV_FUNC(mgr->ctl[1], (char *) &ctl_msg, sizeof(ctl_msg), 0);
  size_t dummy = MG_SEND_FUNC(mgr->ctl[1], ctl_msg.message, 1, 0);
  DBG(("read %d from ctl socket", len));
  (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
  if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) {
    struct mg_connection *nc;
    for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) {
      //回调方法
      ctl_msg.callback(nc, MG_EV_POLL,
                       ctl_msg.message MG_UD_ARG(nc->user_data));
    }
  }
}


void mg_broadcast(struct mg_mgr *mgr, mg_event_handler_t cb, void *data, size_t len);

第二个参数是广播的回调方法,data和len是数据。工作线程中调用mg_broadcast来广播,触发的回调方法里面就是业务线程了(开始没理解,后来通过打印调用和回调的threadid才明白)。

内部很简单其实就是一个在内部建立了一个mg_socketpair(上面描述过),将要传递的数据封装到结构体里面,工作线程中调用send把结构体发送出去,业务线程里面recv到这个结构体,然后触发回调,很简单的逻辑。这里官方代码很坑,connection在连接后 ( MG_EV_ACCEPT 事件),初始化了一个唯一connid( s_next_id )给每个connection(存储到了user_data里面),广播会将每个connection都回调一遍(上面代码里面nc那个链表的遍历),然后在回调里面判断当前的connection是不是我要发送数据的connid,不是的话返回,是的话继续发送数据。官方示例里面在on_work_complete又遍历了一遍所有connection这里其实是错误的,一个大坑,因为回调已经是遍历所有connection回调了。这个也符合mg_broadcast这个名字,广播给所有connection回调。


这里还有一个大坑,因为mg_broadcast其实也是异步的,利用socket发送用户数据,那个结构体的定义为:

struct ctl_msg { 

    mg_event_handler_t callback; 

    char message[MG_CTL_MSG_MESSAGE_SIZE];  //默认是8192 

}; 

所以这里传递的数据不能是原始返回内容的buffer,太小了不合适呀,所以要new一个buffer传递进去,然后在回调里面去delete掉。但这里会引入另外一个问题,broadcast里面是遍历所有正常的connection,如果某个用户接口处理时间比较长比如要2秒,返回的时候connection已经断开了(多线程下是可能存在的),那么这个广播后就不能触发回调,导致这份buffer无法被删除造成内存泄露。


 


问题总结:

1.广播时connection断开了无法删除buffer

2.异步的,不知道当前connection的状态

3.工作线程与业务线程传递数据使用广播,意味着每次都要通知一遍所有的connection,这完全没有必要呀,我有connid那只回调一个connection就可以了哇。当然原本的broadcast到所有nc还是有必要的。



开始魔改mg_broadcast吧。。。改后的代码:

struct ctl_msg {
  mg_event_handler_t callback;
  uint32_t connid;  //connID,可以遍历指定的connection
  int syncflag;     //异步和同步执行标记
  void *data;       //传递的数据
};

//增加connid参数,可以指定connid来触发回调,如果connid为0那就是广播(给connection初始化connid的时候不能设置为0)
//data: 用户数据指针
//ret:返回值指针,NULL为异步,如果设置了就意味着同步发送 发送完毕后才返回。同步方式可以节省内存,异步发送需要新开辟内存
void mg_broadcast(struct mg_mgr *mgr, uint32_t connid, mg_event_handler_t cb, void *data, int *ret) {
  struct ctl_msg ctl_msg;
  /*
   * Mongoose manager has a socketpair, `struct mg_mgr::ctl`,
   * where `mg_broadcast()` pushes the message.
   * `mg_mgr_poll()` wakes up, reads a message from the socket pair, and calls
   * specified callback for each connection. Thus the callback function executes
   * in event manager thread.
   */
  if (mgr->ctl[0] != INVALID_SOCKET && data != NULL) {
    size_t dummy;

    ctl_msg.callback = cb;
	ctl_msg.connid = connid;
	if (ret)
		ctl_msg.syncflag = 1;
	ctl_msg.data = data;
	dummy = MG_SEND_FUNC(mgr->ctl[0], (char *)&ctl_msg, sizeof(ctl_msg), 0);
	dummy = MG_RECV_FUNC(mgr->ctl[0], (char *)&ctl_msg.syncflag, sizeof(int), 0);
    (void) dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
	if (ret)
		*ret = ctl_msg.syncflag;
  }
}
static void mg_mgr_handle_ctl_sock(struct mg_mgr *mgr) {
	struct ctl_msg ctl_msg;
	size_t dummy;
	int ret = 1;
	int len = (int)MG_RECV_FUNC(mgr->ctl[1], (char *)&ctl_msg, sizeof(ctl_msg), 0);

	//不是同步执行 就立刻返回
	if (ctl_msg.syncflag == 0)
		dummy = MG_SEND_FUNC(mgr->ctl[1], (char *)&ret, sizeof(int), 0);
	
	DBG(("read %d from ctl socket", len));
	(void)dummy; /* https://gcc.gnu.org/bugzilla/show_bug.cgi?id=25509 */
	if (len >= (int) sizeof(ctl_msg.callback) && ctl_msg.callback != NULL) {
		struct mg_connection *nc;
		if (ctl_msg.connid == 0)
		{
			//先广播所有正常的连接
			for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) {
				ctl_msg.callback(nc, MG_EV_POLL,
					ctl_msg.data MG_UD_ARG(nc->user_data));
			}
			//最后发送一个nc为NULL的回调 表示广播结束了 用于清理内存等等
			ctl_msg.callback(NULL, MG_EV_POLL,
				ctl_msg.data MG_UD_ARG(nc->user_data));
		}
		else
		{
			for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc)) {
				if (nc->connid == ctl_msg.connid)
					break;
			}
			if (nc == NULL)
				ret = 0;
			ctl_msg.callback(nc, MG_EV_POLL,
				ctl_msg.data MG_UD_ARG(nc->user_data));
		}
	}

	//同步执行的情况下 执行完毕了 返回ret
	if (ctl_msg.syncflag)
		dummy = MG_SEND_FUNC(mgr->ctl[1], (char *)&ret, sizeof(int), 0);
}

增加connid参数,可以指定connid来触发回调,如果connid为0那就是广播到所有connection(给connection初始化connid的时候不能设置为0)。

增加异步、同步控制变量(int *ret),传入NULL还为异步执行,如果设置了就意味着同步发送,发送完毕后才返回。同步方式可以节省内存,异步发送需要新开辟内存。

回调变动:

    指定connid:如果connection还活着,那么正常回调(一次),如果connection关闭了没了,那么回调的mg_connection*是个NULL,用于清理缓冲区

    未指定connid:广播到所有connection上,并且在所有connection回调完毕后,附加一次mg_connection*为NULL的广播,表示广播结束,用于清理缓冲区


 


小魔改:

上述保存connection的connid用的是mg_connection的user_data变量,但是我们实现业务肯定也有一堆数据要存储,而且不同类型的请求例如http和ws要保存的数据类型也不一样,区分处理很麻烦,随手魔改了mg_connection结构体,直接给他加上了connid这个变量。开始做第一版的时候,用了mg_connection结构体里面的sock这个变量作为connid,也就是socket的句柄,不过后来想想大并发下socket关闭后再建立新连接可能句柄是会重复的,那就失去唯一的作用了,所以新增了connid变量。



好啦,完事!


用压力测试跑了下几十个并发,对比发现低并发下跟其他库比如libevent差不多,满足要求。



=========================================================

2021.06.25更新

虽然上述的修改满足了要求,但是却修改了mongoose官方的代码,而且改动很细节,所以后来直接优化了一版,将上述的方法封装到了自己的C++代码中,mongoose直接引用官方6.18版本的代码即可,无需任何改动。

=========================================================

 

=========================================================

2021.07.23更新

有朋友指出官方的6.18版本无法编译通过,发现还真的是有个结构体声明有问题,当时开发我拿的是假的6.18么?

根据官方最后的6.18版本,重新调整。。。

增加静态文件服务器模式StartFileServer方法,该方法不能与StartServer同时使用,二选一。静态文件本地路径要求是"/"分割,不能用windows的"\"。

例如StartFileServer("127.0.0.1:8686", "D:/");

=========================================================


完整代码(2021.07.23更新):

需要依赖无锁队列库:https://github.com/cameron314/concurrentqueue

如果不开启ssl则无其他第三方依赖库。

C++11标准,编译器为vs2013。



FHttpServer.h

#pragma once

#include <string>
#include <chrono>
#include <vector>
#include <map>
#include <thread>
#include <mutex>
#include <functional>
#include "queue/blockingconcurrentqueue.h"


//#define MG_ENABLE_SSL 1
#include "mongoose.h"


typedef	std::vector <std::pair<std::string, std::string>>  StringPairArray;


//每个连接的userdata
typedef struct
{
	uint32_t connID;
	std::string uri; //ws依赖 用于回调的map key
	std::string wsname;//ws依赖 用于广播 类似组名 可重复
}MG_CONNECT_USERDATA;


//多线程任务队列
typedef struct
{
	union socket_address saddr;
	//uint32_t connFlags;
	uint32_t connID;
	int event_type;

	std::string body; //如果是websocket 这里是data
	std::string method;
	std::string uri;
	std::string query;
	std::string proto;
	StringPairArray headers;
	int wsflags;  //ws frame事件中的flags
}MG_EVENT_DATA, *MG_EVENT_DATA_PTR;


class FHttpServer;


//http请求信息
class FHttpRequest
{
	//不可复制
	FHttpRequest(const FHttpRequest &) = delete;
	FHttpRequest &operator=(const FHttpRequest &) = delete;

	MG_EVENT_DATA_PTR m_pEventData;

public:
	FHttpRequest(MG_EVENT_DATA_PTR eventdata) : m_pEventData(eventdata){};
	~FHttpRequest(){};

	uint32_t GetConnID();
	std::string GetURI();
	std::string GetMethod();
	std::string GetRemoteIP();
	//从url上面得到参数
	std::string GetQueryParam(const char *name);
	//从post的数据中得到参数
	std::string GetPostParam(const char *name);
	std::string GetHeader(const char *name);
	std::string GetCookies();
	size_t GetDataLen();
	std::string GetData();

private:
	std::string GetParam(mg_str *query_string, const char *name);
};

//http返回值操作
class FHttpResponse
{
	//不可复制
	FHttpResponse(const FHttpResponse &) = delete;
	FHttpResponse &operator=(const FHttpResponse &) = delete;

	FHttpServer *m_pServer;
	uint32_t m_connID;
	int m_nStatusCode;
	std::string m_strStatus;
	StringPairArray *m_pDefaultHeaders;	//默认的返回headers
	StringPairArray m_arrHeaders;		//如果修改了,那么使用自己的副本
	
public:
	FHttpResponse(uint32_t connID, FHttpServer *pServer, StringPairArray *defaultHeaders);
	~FHttpResponse();
	//是否已经发送过返回值了
	bool m_bsenddata;

	//设置http返回code和reason,要在Send之前调用
	void SetHttpStatusCode(int nStatusCode, const char *reason);
	//设置header 唯一header名 要在Send之前调用
	void SetHttpHeader(const std::string &key, const std::string &value);
	//追加header,类似多个SetCookie这种头部 要在Send之前调用
	void AppendHttpHeader(const std::string &key, const std::string &value);

	//发送320转向
	void SetHttpRedirect(const std::string &rurl);

	//发送数据 直接发送
	void SendHttpContent(const std::string &data, const char *type = NULL);

	//以chunk方式发送数据,调用SendChunkContent发送数据,不要直接用SendContent
	void SendHttpChunkBegin();
	//发送chunk数据,结束chunk发送一个空buf
	void SendHttpChunkContent(const std::string &data);

	
private:
	void _SendData(const std::string *content);
	void _MakeResponseHeaders(std::string *data);
};

//websocket的消息信息
class fsWSMessage
{
	MG_EVENT_DATA_PTR m_pEventData;

public:
	fsWSMessage(MG_EVENT_DATA_PTR eventdata) : m_pEventData(eventdata){};
	~fsWSMessage(){};

	uint32_t GetConnID();
	std::string GetMsg();
	int GetFlag();
};

//websocket回话操作
class fsWSSession
{
	FHttpServer *m_pServer;
	uint32_t m_connID;
public:
	fsWSSession(uint32_t connID, FHttpServer *pServer) :
		m_connID(connID),
		m_pServer(pServer){};
	~fsWSSession(){};

	//设置该ws连接的name
	void SetName(const std::string &name);

	//发送websocket数据
	void SendFrameMsg(const std::string &data, int type = WEBSOCKET_OP_TEXT);

	//关闭连接
	void CloseConnect();
};

class FHttpServer
{
	//静态文件服务器
	struct mg_serve_http_opts m_mHttpServerOpts;
	//mg_serve_http_opts傻逼结构体用的是指针,还要保存下来字符串变量
	std::string m_strlocalpath;
	bool m_bisfileserver;

	struct mg_mgr m_mMgr;
	bool m_exitflag;
	
	//线程之间广播使用
	typedef void(*BROADCAST_CALLBACK)(struct mg_connection *nc, void *ev_data);
	typedef struct
	{
		BROADCAST_CALLBACK callback;
		uint32_t connid;  //connID,可以遍历指定的connection
		void *data;       //传递的数据
	}MG_BROADCAST_MSG;
	sock_t m_broadsender;

	uint32_t m_nSeqConnid;

	//默认的返回headers
	StringPairArray m_defaultHeaders;

	//工作线程和队列
	tsBlockQuque<MG_EVENT_DATA_PTR> m_TaskQueue;
	std::vector <std::thread> m_threadPool;
	
	//http回调方法和映射表
	typedef std::function<void(FHttpRequest &, FHttpResponse &)> funcHttpCallback;
	std::map <std::string, funcHttpCallback> m_mapHttpRouter;
	//websocket回调方法和映射表
	typedef std::function<void(FHttpRequest &, fsWSSession &)> funcWSOnReadyCallback;
	typedef std::function<void(fsWSMessage &, fsWSSession &)> funcWSOnMsgCallback;
	typedef std::function<void(uint32_t)> funcWSOnCloseCallback;
	struct stWebSocketCallback
	{
		funcWSOnReadyCallback onready;
		funcWSOnMsgCallback onmessage;
		funcWSOnCloseCallback onclose;
	};
	std::map <std::string, stWebSocketCallback> m_mapWSRouter;

public:
	FHttpServer();
	~FHttpServer();

	bool StartFileServer(const std::string &addr, const std::string &localpath);
	//启动服务 addr可以是"8888", ":8888", "127.0.0.1:8888"; nThread为工作线程数
	bool StartServer(const std::string &addr, int nThread);
	//停止服务
	void StopServer();
	//添加默认返回的http头,唯一值
	void AddDefaultHeader(const std::string &key, const std::string &value);
	//添加http回调接口
	void AddHttpHandler(const std::string &uri, funcHttpCallback callback);
	//添加websocket回调接口
	void AddWSHandler(const std::string &uri, 
		funcWSOnReadyCallback onready,
		funcWSOnMsgCallback onmessage,
		funcWSOnCloseCallback onclose);

	//////////////////////////////////////////////////////////////////////////
	//websocket使用的接口
	//////////////////////////////////////////////////////////////////////////
	//发送数据到connID这个ws连接上
	void SendWSFrame(uint32_t connID, const std::string &data, int type = WEBSOCKET_OP_TEXT);
	//关闭ws连接
	void CloseWSConn(uint32_t connID);
	//广播数据到name这个组
	void BroadcastWS(const std::string &name, const std::string &data, int type = WEBSOCKET_OP_TEXT);
	//////////////////////////////////////////////////////////////////////////


	//内部使用的接口,外部不要调用
	void _settaskquque(MG_EVENT_DATA_PTR eventdata);
	bool _ishttphandler(const std::string &uri);
	bool _iswshandler(const std::string &uri);
	bool _isstop();
	void _broadcast(uint32_t connID, void *data, BROADCAST_CALLBACK cb);

private:
	static void onEventCallback(mg_connection *nc, int event_type, void *event_data);
	void TaskQuqueRunner();
	void WorkEventHandler(MG_EVENT_DATA_PTR eventdata);
};


FHttpServer.cpp

#include "stdafx.h"
#include "FHttpServer.h"


#define		is_websocket(nc)		(nc->flags & MG_F_IS_WEBSOCKET)
#define		MG_REQUEST_MAXLENGTH	10 * 1024 * 1024   //10M

FHttpServer::FHttpServer()
{
	mg_mgr_init(&m_mMgr, this);
	
	m_bisfileserver = false;
	m_nSeqConnid = 1;
	m_exitflag = false;

	//默认返回的http头
	AddDefaultHeader("Content-Type", "text/plain; charset=GBK");
	AddDefaultHeader("Connection", "Keep-Alive");
}


FHttpServer::~FHttpServer()
{
	StopServer();
}

bool FHttpServer::StartFileServer(const std::string &addr, const std::string &localpath)
{
	m_strlocalpath = localpath;
	memset(&m_mHttpServerOpts, 0, sizeof(struct mg_serve_http_opts));
	m_mHttpServerOpts.document_root = m_strlocalpath.c_str();
	m_mHttpServerOpts.enable_directory_listing = "yes";

	m_bisfileserver = true;
	return StartServer(addr, 1);
}

bool FHttpServer::StartServer(const std::string &addr, int nThread)
{
	//mg_connection *connection = mg_bind(&m_mMgr, addr.c_str(), FHttpServer::OnEventHandler, this);
	mg_connection *connection = mg_bind(&m_mMgr, addr.c_str(), onEventCallback);
	if (connection == NULL)
	{
		printf("listen server %s err.\n", addr.c_str());
		return false;
	}

// 	mg_register_http_endpoint(connection, "/upload", [](struct mg_connection *nc, int ev, void *p){
// 		struct mg_http_multipart_part *mp = (struct mg_http_multipart_part *) p;
// 		printf("endpoint eventtype:%d \n", ev, mp->status);
// 	});

	//用于异步通知主线程干活 不修改mongoose源码模拟和扩展mg_broadcast方法
	mg_connection *udprecv = mg_bind(&m_mMgr, "udp://127.0.0.1:0", [](mg_connection *nc, int event_type, void *event_data)
	{
		if (event_type == MG_EV_RECV)
		{    
			struct mbuf *io = &nc->recv_mbuf;
			if (io->len == sizeof(MG_BROADCAST_MSG))
			{
				MG_BROADCAST_MSG *pctlmsg = (MG_BROADCAST_MSG *)io->buf;
				if (pctlmsg->callback != NULL)
				{

					struct mg_mgr *mgr = nc->mgr;
					struct mg_connection *nc;
					if (pctlmsg->connid == 0)
					{
						//先广播所有正常的连接
						for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc))
						{
							if (nc->user_data)
								pctlmsg->callback(nc, pctlmsg->data);
						}
						//最后发送一个nc为NULL的回调 表示广播结束了 用于清理内存等等
						pctlmsg->callback(NULL, pctlmsg->data);
					}
					else
					{
						for (nc = mg_next(mgr, NULL); nc != NULL; nc = mg_next(mgr, nc))
						{
							if (nc->user_data && ((MG_CONNECT_USERDATA *)nc->user_data)->connID == pctlmsg->connid)
								break;
						}
						pctlmsg->callback(nc, pctlmsg->data);
					}
				}
			}

			// In case of UDP, Mongoose creates new virtual connection for
			// incoming messages
			// We can keep it (and it will be reused for another messages from
			// the same address) or we can close it (this saves some memory, but
			// decreases perfomance, because it forces creation of connection
			// for every incoming dgram)
			nc->flags |= MG_F_SEND_AND_CLOSE;
			// Discard message from recv buffer
			mbuf_remove(io, io->len);

		}//eventhandler
	});
	if (udprecv == NULL)
	{
		printf("listen broadcase channel err.\n");
		return false;
	}
	//建立发送者
	union socket_address broadrecvsa;
	socklen_t len = sizeof(broadrecvsa.sin);
	getsockname(udprecv->sock, &broadrecvsa.sa, &len);
	//建立发送者的udp socket
	union socket_address udpsasender;
	memset(&udpsasender, 0, sizeof(udpsasender));
	len = sizeof(udpsasender.sin);
	udpsasender.sin.sin_family = AF_INET;
	udpsasender.sin.sin_addr.s_addr = htonl(0x7f000001); /* 127.0.0.1 */
	m_broadsender = socket(AF_INET, SOCK_DGRAM, 0);
	if (m_broadsender == INVALID_SOCKET)
	{
		printf("make broadcase channel err.\n");
		return false;
	}
	if (bind(m_broadsender, &udpsasender.sa, len) != 0)
	{
		printf("bind broadcase channel err.\n");
		return false;
	}
	if (connect(m_broadsender, &broadrecvsa.sa, len) != 0)
	{
		printf("connect broadcase channel err.\n");
		return false;
	}

	//both http and websocket
	mg_set_protocol_http_websocket(connection);

	//初始化工作队列
	for (int i = 0; i < nThread; i++)
		m_threadPool.emplace_back(&FHttpServer::TaskQuqueRunner, this);
	
	std::thread([this]{
		// loop
		while (!m_exitflag)
			mg_mgr_poll(&m_mMgr, 200); // ms
	}).detach();

	printf("started server on: %s(tid:%d)\n", addr.c_str(), GetCurrentThreadId());
	
	return true;
}

void FHttpServer::StopServer()
{
	if (m_exitflag)
		return;

	printf("set stop flag.\n");
	m_exitflag = true;

	printf("free mgr.\n");
	mg_mgr_free(&m_mMgr);

	printf("wait for work thread finish...\n");
	for (std::thread &th : m_threadPool)
	{
		if (th.joinable())
			th.join();
	}

	printf("server stopped.");
}

void FHttpServer::AddDefaultHeader(const std::string &key, const std::string &value)
{
	for (auto &header : m_defaultHeaders)
	{
		if (mg_ncasecmp(key.c_str(), header.first.c_str(), key.length()) == 0)
		{
			header.second = value;
			return;
		}
	}

	m_defaultHeaders.emplace_back(key, value);
}

void FHttpServer::AddHttpHandler(const std::string &uri, funcHttpCallback callback)
{
	m_mapHttpRouter.emplace(uri, callback);
}

void FHttpServer::AddWSHandler(const std::string &uri,
	funcWSOnReadyCallback onready,
	funcWSOnMsgCallback onmessage,
	funcWSOnCloseCallback onclose)
{
	stWebSocketCallback cb = {onready, onmessage, onclose};
	m_mapWSRouter.emplace(uri, cb);
}

void FHttpServer::SendWSFrame(uint32_t connID, const std::string &data, int type /*= WEBSOCKET_OP_TEXT*/)
{
	return fsWSSession(connID, this).SendFrameMsg(data, type);
}

void FHttpServer::CloseWSConn(uint32_t connID)
{
	fsWSSession(connID, this).CloseConnect();
}

void FHttpServer::BroadcastWS(const std::string &name, const std::string &data, int type /*= WEBSOCKET_OP_TEXT*/)
{
	struct wsData
	{
		int type;
		std::string name;
		std::string buf;
	};
	wsData *param = new wsData{ type, name, data };

	//传入connid为0,广播到所有nc上
	_broadcast(0, param, [](struct mg_connection *nc, void *data){
		if (nc)
		{
			if (is_websocket(nc))
			{
				wsData *p = (wsData *)data;
				MG_CONNECT_USERDATA *pUserdata = (MG_CONNECT_USERDATA *)nc->user_data;
				if (pUserdata->wsname == p->name)
					mg_send_websocket_frame(nc, p->type, p->buf.c_str(), p->buf.length());
			}
		}
		else
		{
			//nc为NULL就是广播完毕了 这类清理内存
			delete ((wsData *)data);
		}
	});
}

void FHttpServer::_settaskquque(MG_EVENT_DATA_PTR eventdata)
{
	m_TaskQueue.enqueue(eventdata);
}

bool FHttpServer::_ishttphandler(const std::string &uri)
{
	if (m_mapHttpRouter.find(uri) == m_mapHttpRouter.end())
		return false;
	return true;
}

bool FHttpServer::_iswshandler(const std::string &uri)
{
	if (m_mapWSRouter.find(uri) == m_mapWSRouter.end())
		return false;
	return true;
}

void FHttpServer::_broadcast(uint32_t connID, void *data, BROADCAST_CALLBACK cb)
{
	MG_BROADCAST_MSG ctlmsg;
	ctlmsg.connid = connID;
	ctlmsg.callback = cb;
	ctlmsg.data = data;

	send(m_broadsender, (const char *)&ctlmsg, sizeof(ctlmsg), 0);
};

bool FHttpServer::_isstop()
{
	return m_exitflag;
}

//单线程回调函数
void FHttpServer::onEventCallback(mg_connection *nc, int event_type, void *event_data)
{
	FHttpServer *pServer = (FHttpServer *)nc->mgr->user_data;
	MG_CONNECT_USERDATA *pUserData = (MG_CONNECT_USERDATA *)nc->user_data;

	//debug
 	//if (event_type != MG_EV_POLL)
	//	printf("EventCallback(%d): type:%d, sockid:%d\n", GetCurrentThreadId(), event_type, (pUserData ? pUserData->connID : 0));
// 	if (event_type == MG_EV_RECV)
// 	{
// 		struct mbuf *io = &nc->recv_mbuf;
// 		WriteBufferToFile("request.txt", io->buf, io->len, TRUE);
// 	}

	//连接上后 创建出来connid等信息
	if (event_type == MG_EV_ACCEPT)
	{
		pUserData = new MG_CONNECT_USERDATA;
		pUserData->connID = pServer->m_nSeqConnid;

		nc->user_data = pUserData;
		//数据包最大限制
		//nc->recv_mbuf_limit = pServer->m_HttpMaxLength;

		pServer->m_nSeqConnid++;
		//connid使用0来标记 所以要+1
		if (pServer->m_nSeqConnid == 0)
			pServer->m_nSeqConnid++;
		
		return;
	}

	//关闭事件 清理user_data
	if (event_type == MG_EV_CLOSE)
	{
		if (is_websocket(nc))
		{
			//websocket的话要回调
			MG_EVENT_DATA_PTR cbdata = new MG_EVENT_DATA;
			cbdata->event_type = event_type;
			cbdata->connID = pUserData->connID;
			cbdata->uri = pUserData->uri;

			pServer->_settaskquque(cbdata);
		}
		
		//删掉user_data
		delete pUserData;
		nc->user_data = NULL;

		return;
	}

	
	//http请求和websocket握手完毕 要读取http request信息
	if (event_type == MG_EV_HTTP_REQUEST || 
		event_type == MG_EV_WEBSOCKET_HANDSHAKE_DONE ||
		event_type == MG_EV_WEBSOCKET_FRAME)
	{
		if (event_type == MG_EV_HTTP_REQUEST)
		{
			if (pServer->m_bisfileserver)
			{
				mg_serve_http(nc, (http_message *)event_data, pServer->m_mHttpServerOpts);
				return;
			}

			//处理映射表 不在映射表的就不加入队列了
			std::string uri;
			uri.assign(((http_message *)event_data)->uri.p, ((http_message *)event_data)->uri.len);
			auto iter = pServer->m_mapHttpRouter.find(uri);
			if (iter == pServer->m_mapHttpRouter.end())
			{
				//看看websocket
				auto wsiter = pServer->m_mapWSRouter.find(uri);
				if (wsiter == pServer->m_mapWSRouter.end())
				{
					//不支持的uri映射 返回403吧
					mg_http_send_error(nc, 403, "Not Support.");
					return;
				}
			}
		}

		MG_EVENT_DATA_PTR cbdata = new MG_EVENT_DATA;
		cbdata->event_type = event_type;
		cbdata->connID = pUserData->connID;
		
		if (event_type == MG_EV_WEBSOCKET_FRAME)
		{
			websocket_message *wsmsg = (websocket_message *)event_data;
			cbdata->body.assign((char *)wsmsg->data, wsmsg->size);
			cbdata->wsflags = wsmsg->flags;
			//从user_data里面读取uri
			cbdata->uri = pUserData->uri;
		}
		else
		{
			http_message *httpmsg = (http_message *)event_data;

			//用于读取远程IP
			cbdata->saddr = nc->sa;

			//http request信息
			cbdata->body.assign(httpmsg->body.p, httpmsg->body.len);
			cbdata->method.assign(httpmsg->method.p, httpmsg->method.len);
			cbdata->uri.assign(httpmsg->uri.p, httpmsg->uri.len);
			cbdata->query.assign(httpmsg->query_string.p, httpmsg->query_string.len);
			cbdata->proto.assign(httpmsg->proto.p, httpmsg->proto.len);
			for (int i = 0; i < MG_MAX_HTTP_HEADERS; i++)
			{
				if (httpmsg->header_names[i].len == 0)
					break;
				cbdata->headers.emplace_back(std::string(httpmsg->header_names[i].p, httpmsg->header_names[i].len), std::string(httpmsg->header_values[i].p, httpmsg->header_values[i].len));
			}

			//ws默认name就是uri 用于广播
			if (event_type == MG_EV_WEBSOCKET_HANDSHAKE_DONE)
				pUserData->wsname = pUserData->uri = cbdata->uri;
		}
		
		//printf("AddEvent Data(%d): %d\n", GetCurrentThreadId(), cbdata->event_type);
		pServer->_settaskquque(cbdata);
	}
}

void FHttpServer::TaskQuqueRunner()
{
	while (true)
	{
		MG_EVENT_DATA_PTR eventdata;
		if (m_TaskQueue.wait_dequeue_timed(eventdata, std::chrono::seconds(1)))
		{
			//printf("TaskQuque Get Data(%d): %d\n", GetCurrentThreadId(), eventdata->event_type);
			WorkEventHandler(eventdata);

			//用完了要删掉
			delete eventdata;
		}
		else
		{
			//printf("Empty quque.\n");

			//结束标记并且队列没有数据了 返回
			if (m_exitflag)
				break;
		}
	}

	return;
}
 
void FHttpServer::WorkEventHandler(MG_EVENT_DATA_PTR eventdata)
 {
	//printf("WorkHandler Data(%d): %d\n", GetCurrentThreadId(), eventdata->event_type);

 	if (eventdata->event_type == MG_EV_HTTP_REQUEST)
 	{
 		//普通http请求
		//OnHttpHandler(eventdata);

		//准备变量
		FHttpRequest request(eventdata);
		FHttpResponse response(eventdata->connID, this, &m_defaultHeaders);

		//回调
		m_mapHttpRouter[eventdata->uri](request, response);
		return;
 	}

	//剩下的都是ws的事件了
	stWebSocketCallback wsCB = m_mapWSRouter[eventdata->uri];
	if (eventdata->event_type == MG_EV_WEBSOCKET_HANDSHAKE_DONE && wsCB.onready)
		wsCB.onready(FHttpRequest(eventdata), fsWSSession(eventdata->connID, this));
	else if (eventdata->event_type == MG_EV_WEBSOCKET_FRAME && wsCB.onmessage)
		wsCB.onmessage(fsWSMessage(eventdata), fsWSSession(eventdata->connID, this));
	else if (eventdata->event_type == MG_EV_CLOSE && wsCB.onclose)
		wsCB.onclose(eventdata->connID);
 }

uint32_t FHttpRequest::GetConnID()
{
	return m_pEventData->connID;
}

std::string FHttpRequest::GetURI()
{
	return m_pEventData->uri;
}

std::string FHttpRequest::GetMethod()
{
	return m_pEventData->method;
}

std::string FHttpRequest::GetRemoteIP()
{
	char addr[32];
	mg_sock_addr_to_str(&m_pEventData->saddr, addr, sizeof(addr), MG_SOCK_STRINGIFY_IP);
	return std::string(addr);
}

std::string FHttpRequest::GetParam(mg_str *query_string, const char *name)
{
	char value[128];
	int ret = mg_get_http_var(query_string, name, value, sizeof(value) - 1);
	if (ret <= 0)
		return std::string();
	if (ret < sizeof(value))
		return std::string(value, ret);

	//数据过长了
	std::unique_ptr<char> buf(new char[ret + 1]);
	ret = mg_get_http_var(query_string, name, buf.get(), ret + 1);
	return std::string(buf.get(), ret);
}

std::string FHttpRequest::GetQueryParam(const char *name)
{
	mg_str query_string;
	query_string.p = m_pEventData->query.c_str();
	query_string.len = m_pEventData->query.length();
	return GetParam(&query_string, name);
}

std::string FHttpRequest::GetPostParam(const char *name)
{
	mg_str query_string;
	query_string.p = m_pEventData->body.c_str();
	query_string.len = m_pEventData->body.length();
	return GetParam(&query_string, name);
}

std::string FHttpRequest::GetHeader(const char *name)
{
	for (auto &header : m_pEventData->headers)
	{
		if (mg_ncasecmp(name, header.first.c_str(), header.first.length()) == 0)
			return header.second;
	}

	return std::string();
}

std::string FHttpRequest::GetCookies()
{
	return GetHeader("Cookie");
}

size_t FHttpRequest::GetDataLen()
{
	return m_pEventData->body.length();
}

std::string FHttpRequest::GetData()
{
	return m_pEventData->body;
}

FHttpResponse::FHttpResponse(uint32_t connID, FHttpServer *pServer, StringPairArray *defaultHeaders) :
	m_connID(connID),
	m_pServer(pServer),
	m_nStatusCode(200),
	m_strStatus("OK"),
	m_pDefaultHeaders(defaultHeaders),
	m_bsenddata(false)
{

}

FHttpResponse::~FHttpResponse()
{
	if (!m_bsenddata)
		SendHttpContent(std::string());
}

void FHttpResponse::SetHttpStatusCode(int nStatusCode, const char *reason)
{
	m_nStatusCode = nStatusCode;
	m_strStatus = reason;
}

void FHttpResponse::SetHttpHeader(const std::string &key, const std::string &value)
{
	//修改了header 就建立自己的副本
	if (m_pDefaultHeaders)
	{
		m_arrHeaders = *m_pDefaultHeaders;
		m_pDefaultHeaders = NULL;
	}

	for (auto &header : m_arrHeaders)
	{
		if (mg_ncasecmp(key.c_str(), header.first.c_str(), key.length()) == 0)
		{
			header.second = value;
			return;
		}
	}

	m_arrHeaders.emplace_back(key, value);
}

void FHttpResponse::AppendHttpHeader(const std::string &key, const std::string &value)
{
	//修改了header 就建立自己的副本
	if (m_pDefaultHeaders)
	{
		m_arrHeaders = *m_pDefaultHeaders;
		m_pDefaultHeaders = NULL;
	}

	m_arrHeaders.emplace_back(key, value);
}

void FHttpResponse::SetHttpRedirect(const std::string &rurl)
{
	m_bsenddata = true;

	m_pServer->_broadcast(m_connID, new std::string(rurl), [](struct mg_connection *nc, void *data){
		std::string *rurl = (std::string *)data;
		if (nc)
			mg_http_send_redirect(nc, 302, mg_mk_str(rurl->c_str()), mg_mk_str(NULL));

		delete rurl;
	});
}

void FHttpResponse::SendHttpContent(const std::string &data, const char *type /*= NULL*/)
{
	if (type)
		SetHttpHeader("Content-Type", type);

	_SendData(&data);
}

void FHttpResponse::SendHttpChunkBegin()
{
	_SendData(NULL);
}

void FHttpResponse::SendHttpChunkContent(const std::string &data)
{
	m_pServer->_broadcast(m_connID, new std::string(data), [](struct mg_connection *nc, void *data){
		std::string *buf = (std::string *)data;
		if (nc)
			mg_send_http_chunk(nc, buf->c_str(), buf->length());

		delete buf;
	});
}

void FHttpResponse::_MakeResponseHeaders(std::string *data)
{
	//拼接header
	StringPairArray *headers = m_pDefaultHeaders;
	if (headers == NULL)
		headers = &m_arrHeaders;
	size_t nHeanderSize = headers->size();
	for (size_t i = 0; i < nHeanderSize; i++)
	{
		std::string str = headers->at(i).first + ": " + headers->at(i).second + "\r\n";
		data->append(str);
	}
}

void FHttpResponse::_SendData(const std::string *content)
{
	//不要重复发送
	if (m_bsenddata)
		return;

	std::string *pSendData = new std::string("HTTP/1.1 " + std::to_string(m_nStatusCode) + " " + m_strStatus + "\r\n");
	_MakeResponseHeaders(pSendData);

	if (content == NULL)
	{
		//开启了chunk模式
		pSendData->append("Transfer-Encoding: chunked\r\n\r\n");
	}
	else
	{
		//有Content
		pSendData->append("Content-Length: " + std::to_string(content->length()) + "\r\n\r\n");

		//拼接数据
		pSendData->append(*content);
	}

	//发送 要用mg_broadcast来完成回调 异步发送
	m_pServer->_broadcast(m_connID, pSendData, [](struct mg_connection *nc, void *data){
		std::string *buf = (std::string *)data;
		if (nc)
			mg_send(nc, buf->c_str(), buf->length());

		delete buf;
	});

	m_bsenddata = true;
}

uint32_t fsWSMessage::GetConnID()
{
	return m_pEventData->connID;
}

std::string fsWSMessage::GetMsg()
{
	return m_pEventData->body;
}

int fsWSMessage::GetFlag()
{
	return m_pEventData->wsflags;
}

void fsWSSession::SetName(const std::string &name)
{
	m_pServer->_broadcast(m_connID, new std::string(name), [](struct mg_connection *nc, void *data){
		std::string *name = (std::string *)data;
		if (nc)
			((MG_CONNECT_USERDATA *)nc->user_data)->wsname = *name;
		
		delete name;
	});
}

void fsWSSession::SendFrameMsg(const std::string &data, int type /*= WEBSOCKET_OP_TEXT*/)
{
	int ret = 0;
	struct wsData
	{
		int type;
		std::string buf;
	};
	wsData *param = new wsData{ type, data };

	//需要返回值 所以使用同步的方式发送
	m_pServer->_broadcast(m_connID, param, [](struct mg_connection *nc, void *data){
		wsData *p = (wsData *)data;
		if (nc)
			mg_send_websocket_frame(nc, p->type, p->buf.c_str(), p->buf.length());
		
		delete p;
	});
}

void fsWSSession::CloseConnect()
{
	m_pServer->_broadcast(m_connID, NULL, [](struct mg_connection *nc, void *data){
		if (nc)
			mg_send_websocket_frame(nc, WEBSOCKET_OP_CLOSE, "bye", 3);
	});
}


 


最后使用方法:

	FHttpServer server;
	server.AddHttpHandler("/hello", [](fsRequest &req, fsResponse &response){
 		printf("uri:%s\n", req.GetURI().c_str());
 		printf("remote:%s\n", req.GetRemoteIP().c_str());
 		printf("method:%s\n", req.GetMethod().c_str());
 		printf("p:%s\n", req.GetQueryParam("p").c_str());
 		printf("body(%d):%s\n", req.GetDataLen(), req.GetData().c_str());

		response.SendHttpContent("hello word!");
	});

	uint32_t connID = -1;
	server.AddWSHandler("/ws", 
		[&](fsRequest &req, fsWSSession &session)
		{
			printf("ws on ready %s.\n", req.GetQueryParam("id").c_str());
			session.SendFrame("ready.");
			session.SetName(req.GetQueryParam("id"));
			connID = req.GetConnID();
		},  //onready
		[](fsWSMessage &req, fsWSSession &session)
		{
			printf("body:%s\n", req.GetMsg().c_str());
			session.SendFrame(req.GetMsg());
		},
		[](uint32_t connID)
		{
			printf("ws on close.\n");
		} //onclose
	);

	server.StartServer("8881", 32);

	while (true)
	{
		int x = getchar();
		if (x == 'q')
			break;

		server.SendWSFrame(connID, "hello");
		server.BroadcastWS("123", "broadcase msg.");
	}

	server.StopServer();





2020年10月30日 | 发布:blackfeather | 分类:C/C++代码 | 评论:8

留言列表:

  • 梦醉天宇 发布于 2023/4/16 21:45:28  回复
  • 使用这个URL【http://192.168.0.139:9607/ws】在线程中调用 WorkEventHandler(eventdata);的地方会崩溃是怎么回事
    想知道ws这个怎么使用
    • blackfeather 发布于 2023/12/27 15:24:08  回复
    • 貌似这里确实有问题,楼下有提到问题根源。这个自己改改吧,由于性能问题不再使用mongoose了。。。
      • Gavin 发布于 2024/2/2 23:57:29  回复
      • 楼主,现在不用mongoose了么?还有其他什么推荐吗?
        • 博主 发布于 2024/5/22 15:37:39  回复
        • https://github.com/yhirose/cpp-httplib
          https://github.com/drogonframework/drogon
          https://github.com/zhllxt/asio2
  • 老百姓 发布于 2022/2/16 1:03:37  回复
  • 博主,请问tsBlockQuque这个报错。报错提示:tsBlockQuque不是模板,这怎么弄呀
    • 博主 发布于 2022/2/28 12:53:04  回复
    • tsBlockQuque就是上面说的concurrentqueue那个库。。。重新定义了个别名写起来比较短而已。。。
  • Loo 发布于 2021/8/31 10:07:17  回复
  • //处理映射表 不在映射表的就不加入队列了
    std::string uri;
    uri.assign(((http_message *)event_data)-&gt;uri.p, ((http_message *)event_data)-&gt;uri.len);
    auto iter = pServer-&gt;m_mapHttpRouter.find(uri);
    if (iter == pServer-&gt;m_mapHttpRouter.end())
    {
    //看看websocket
    auto wsiter = pServer-&gt;m_mapWSRouter.find(uri);
    if (wsiter == pServer-&gt;m_mapWSRouter.end())
    {
    //不支持的uri映射 返回403吧
    mg_http_send_error(nc, 403, &quot;Not Support.&quot;);
    return;
    }
    }

    此处http的请求不建议再去ws里面查找映射,否则在WorkEventHandler中使用m_mapHttpRouter[eventdata-&gt;uri](request, response);会直接崩溃掉
    • blackfeather 发布于 2023/12/27 15:22:48  回复
    • 收到
  • 二十四 发布于 2021/7/13 16:27:51  回复
  • 博主可以分享一下么
  • 柴鸡菜鸡 发布于 2021/6/24 14:57:17  回复
  • 新版本没有多线程了吗……函数名都找不到
    • 博主 发布于 2021/6/25 13:10:04  回复
    • 这个修改基于6.18的,7.x的函数和流程变动较大,没有再去做兼容
  • 菜鸡柴鸡 发布于 2021/6/24 11:50:40  回复
  • 好厉害,我看了几天了
  • alont 发布于 2021/3/11 10:57:46  回复
  • 你是用的mongoose的哪个版本呀?
    • 博主 发布于 2021/3/11 13:19:55  回复
    • 6.18。后来又有一些调整,有兴趣使用可以加QQ发给你
      • 二十四 发布于 2021/7/13 16:27:20  回复
      • 太棒了,我也是,最近项目需要 支持 ws 但是现阶段使用的 mongoose 所以需要修改,大佬求分享
  • alont 发布于 2021/3/11 10:57:07  回复
  • 博主牛逼啊。

发表留言: