但是,我们也得考虑一个问题,当前的设计是时间到了,则主动去执行定时任务,释放连接,那能不能在时间到了后,自动执行定时任务呢,这时候我们就想到一个操作–类的析构函数。
一个类的析构函数,在对象被释放时会自动被执行,那么我们如果将一个定时任务作为一个类的析构函数内的操作,则这个定时任务在对象被释放的时候就会执行。
但是仅仅为了这个目的,而设计一个额外的任务类,好像有些不划算,但是,这里我们又要考虑另一个问题,那就是假如有一个连接建立成功了,我们给这个连接设置了一个30s后的定时销毁任务,但是在第10s的时候,这个连接进行了一次通信,那么我们应该是在第30s的时候关闭,还是第40s的时候关闭呢?无疑应该是第40s的时候。也就是说,这时候,我们需要让这个第30s的任务失效,但是我们该如何实现这个操作呢?
这里,我们就用到了智能指针shared_ptr,shared_ptr有个计数器,当计数为0的时候,才会真正释放一个对象,那么如果连接在第10s进行了一次通信,则我们继续向定时任务中,添加一个30s后(也就是第40s)的任务类对象的shared_ptr,则这时候两个任务shared_ptr计数为2,则第30s的定时任务被释放的时候,计数-1,变为1,并不为0,则并不会执行实际的析构函数,那么就相当于这个第30s的任务失效了,只有在第40s的时候,这个任务才会被真正释放。
上述过程就是时间轮定时任务的思想了,当然这里为了更加简便的实现,进行了一些小小的调整实现。
总结:
1.同一时刻的定时任务只能添加一个,需要考虑如何在同一时刻支持添加多个定时任务
解决方案: 将时间轮的一维数组设计为二维数组(时间轮一位数组的每一个节点也是一个数组)
2.假设当前的定时任务是一个连接的非活跃销毁任务,这个任务什么时候添加到时间轮中比较合适
一个连接30s内都没有通信,则是一个非活跃连接,这时候就销毁。但是一个连接如果在建立的时候添加了一个30s后销毁的任务,但是这个连接30s内人家有数据通信,在第30s的时候不是一个非活跃连接。
思想:需要在一个连接有IO事件产生的时候,延迟定时任务的执行
作为一个时间轮定时器,本身并不关注任务类型,只要是时间到了就需要被执行 。
解决方案:类的析构函数+智能指针shared_ptr ,通过这两个技术可以实现定时任务的延时
1.使用一个类,对定时任务进行封装,类实例化的每一个对象,就是一个定时任务对象,当对象被销毁的时候,再去执行定时任务(将定时任务的执行,放到析构函数中)
2.shared ptr用于对new的对象进行空间管理,当shared_ptr对一个对象进行管理的时候,内部有一个计数器,计数器为0的时候,则释放所管理的对象
int * a = new int ;
std: shared ptr< int > pi ( a) ; -- - a对象只有在pi计数为0 的时候,才会被释放
std: shared ptr< int > pi1 ( pi) -- 当针对pi又构建了一个shared_ptr对象,则pi和pi1计数器为2
当pi和pi1中任意一个被释放的时候,只是计数器-1,因此他们管理的a对象并没有被释放,只有当pi和pi1都被释放了,计数器为0了,这时候才会释放管理的a对象
基于这个思想,我们可以使用shared_ptr来管理定时器任务对象
但是std::shared_ptr pi2(a);但是如果pi2是针对的原始对象构造的。并不会跟pi和pi1,共享计数
使用案例:
# include
# include
# include
# include
# include
# include
using TaskFunc = std:: function< void ( ) > ;
using ReleaseFunc = std:: function< void ( ) > ;
class TimerTask
{
private :
uint64_t _id;
uint32_t _timeout;
bool _canceled;
TaskFunc _task_cb;
ReleaseFunc _release;
public :
TimerTask ( const uint64_t & id, const uint32_t & delay, const TaskFunc & cb)
: _id ( id) , _timeout ( delay) , _task_cb ( cb) , _canceled ( false ) { }
~ TimerTask ( )
{
if ( _canceled == false )
_task_cb ( ) ;
_release ( ) ;
}
void Canceled ( ) { _canceled = true ; }
uint32_t DelayTime ( ) { return _timeout; }
void SetRelease ( const ReleaseFunc & cb) { _release = cb; }
} ;
using PtrTask = std:: shared_ptr< TimerTask> ;
using WeakTask = std:: weak_ptr< TimerTask> ;
class TimerWheel
{
private :
std:: vector< std:: vector< PtrTask>> _wheel;
int _tick;
int _capacity;
std:: unordered_map< uint64_t , WeakTask> _timers;
private :
void RemoveTimer ( const uint64_t & id)
{
auto it = _timers. find ( id) ;
if ( it != _timers. end ( ) )
{
_timers. erase ( it) ;
}
}
public :
TimerWheel ( ) : _tick ( 0 ) , _capacity ( 60 ) , _wheel ( _capacity) { }
void TimerAdd ( const uint64_t & id, const uint32_t & delay, const TaskFunc & cb)
{
PtrTask ptr ( new TimerTask ( id, delay, cb) ) ;
ptr-> SetRelease ( std:: bind ( & TimerWheel:: RemoveTimer, this , id) ) ;
int pos = ( _tick + delay) % _capacity;
_wheel[ pos] . push_back ( ptr) ;
_timers[ id] = WeakTask ( ptr) ;
}
void TimerRefresh ( const uint64_t & id)
{
auto it = _timers. find ( id) ;
if ( it == _timers. end ( ) )
{
return ;
}
PtrTask ptr = it-> second. lock ( ) ;
int delay = ptr-> DelayTime ( ) ;
int pos = ( _tick + delay) % _capacity;
_wheel[ pos] . push_back ( ptr) ;
}
void TimerCancel ( const uint64_t & id)
{
auto it = _timers. find ( id) ;
if ( it == _timers. end ( ) )
return ;
PtrTask ptr = it-> second. lock ( ) ;
if ( ptr)
ptr-> Canceled ( ) ;
}
void RunTimerTask ( )
{
_tick = ( _tick + 1 ) % _capacity;
_wheel[ _tick] . clear ( ) ;
}
} ;
class Test
{
public :
Test ( ) { std:: cout << "Test 构造" << std:: endl; }
~ Test ( ) { std:: cout << "Test 析构" << std:: endl; }
} ;
void Release ( const Test * t)
{
delete t;
}
int main ( )
{
TimerWheel tw;
Test * t = new Test ( ) ;
tw. TimerAdd ( 100 , 5 , std:: bind ( Release, t) ) ;
for ( int i = 0 ; i < 5 ; i++ )
{
sleep ( 1 ) ;
tw. TimerRefresh ( 100 ) ;
tw. RunTimerTask ( ) ;
std:: cout << "刷新了定时任务,需要在5秒钟之后进行销毁" << std:: endl;
}
for ( ; ; )
{
sleep ( 1 ) ;
std:: cout << "------------------" << std:: endl;
tw. RunTimerTask ( ) ;
}
return 0 ;
}
3.正则库的简单使用
正则表达式(regular expression)描述了一种字符串匹配的模式(pattern),可以用来检查一个串是否含有某种子串、将匹配的子串替换或者从某个串中取出符合某个条件的子串等。
正则表达式的使用,可以使得HTTP请求的解析更加简单(这里指的时程序员的工作变得的简单,这并不代表处理效率会变高,实际上效率上是低于直接的字符串处理的),使我们实现的HTTP组件库使用起来更加灵活。
我们可以在这里学习相关的语法:正则表达式教程–菜鸟教程
bool std: regex_match ( const stdrstring & src, std: smatch & matches, std: regex & e)
src: 原始字符串
matches: 正则表达式可以从原始字符串中匹配并提取符合某种规则的数据,提取的数据就放在matches中,是一个类似于数组的容器
e: 正则表达式的匹配规则
返回值: 用于确定匹配是否成功
正则表达式的简单案例:
void regex_test ( )
{
std:: string str = "/numbers/1234" ;
std:: regex e ( "/numbers/(d+)" ) ;
std:: smatch matches;
bool ret = std:: regex_match ( str, matches, e) ;
if ( ret == false )
{
std:: cout << "regex_match failed" << std:: endl;
}
for ( auto & str : matches)
{
std:: cout << str << std:: endl;
}
}
/ numbers/ 1234
1234
HTTP请求行的匹配
int main ( )
{
std:: string request = "GET /www.baidu.com/login?user=xiaoming&pass=123123 HTTP/1.1
" ;
std:: smatch matches;
std:: regex e ( "(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:?(.*))? (HTTP/1.[01])(?:
|
)?" ) ;
bool ret = std:: regex_match ( request, matches, e) ;
if ( ret == false )
{
std:: cout << "regex_match failed" << std:: endl;
}
for ( auto & str : matches)
{
std:: cout << str << std:: endl;
}
return 0 ;
}
4.通用类型any类型的实现
在本项目中,我们要实现一个高并发的服务器组件,能够的接收并处理客户端发送过来的请求,就必然涉及到与客户端的通信,而通信就必然涉及到对套接字的操作;同时,由于 TCP 是面向字节流的,因此服务器在接收客户端数据的时候就可能出现 socket 中的数据不足一条完整请求的情况,此时我们请求处理到一半时就需要停下来等待 socket 中下一次的数据到来。
因此我们需要为客户端连接设置一个请求处理的上下文,用来保存请求接收、解析以及处理的状态,它决定着对于下一次从缓冲区中取出的数据如何进行处理、从哪里开始处理等。同时,对于一条完整的请求,我们还需要对其进行解析,得到各种关键的要素,比如 HTTP 请求中的请求方法、请求URL、HTTP版本等,这些信息都会被保存在请求处理上下文中。
那么我们应该如何保存请求接收、解析以及处理的各种状态信息呢,定义一个 HTTP 请求信息的结构用于填充吗?如果我们的服务器组件仅支持 HTTP 协议这样做是可以的,但我们设计的服务器的目标是要能够支持各种不同的应用层协议,便于我们组件的使用者能够根据自己不同的业务场景定制对应的应用层协议进行使用,因此我们就需要让这个结构能够保存不同类型的数据,此时就需要 any 出场了。
每一个Connection对连接进行管理,最终都不可避免需要涉及到应用层协议的处理,因此在Connection中需要设置协议处理的上下文来控制处理节奏。但是应用层协议千千万,为了降低耦合度,这个协议接收解析上下文就不能有明显的协议倾向,它可以是任意协议的上下文信息,因此就需要一个通用的类型来保存各种不同的数据结构。
在C语言中,通用类型可以使用void*来管理,但是在C++中,boost库和C++17给我们提供了一个通用类型any来灵活使用,如果考虑增加代码的移植性,尽量减少第三方库的依赖,则可以使用C++17特性中的any,或者自己来实现。而这个any通用类型类的实现其实并不复杂,以下是简单的部分实现。
# include
# include
# include
# include
# include
class holder
{
public :
virtual ~ holder ( ) { }
virtual const std:: type_info & type ( ) = 0 ;
virtual holder * clone ( ) = 0 ;
} ;
template < class T >
class placeholder : public holder
{
public :
placeholder ( const T & val) : _val ( val) { }
virtual const std:: type_info & type ( ) { return typeid ( T) ; }
virtual holder * clone ( ) { return new placeholder< T> ( _val) ; }
virtual ~ placeholder ( ) { }
public :
T _val;
} ;
class Any
{
private :
holder * _content;
public :
Any ( ) : _content ( nullptr ) { }
template < class T >
Any ( const T & val) : _content ( new placeholder< T> ( val) ) { }
Any ( const Any & other) : _content ( other. _content ? other. _content-> clone ( ) : nullptr ) { }
~ Any ( )
{
if ( _content)
delete _content;
}
Any & swap ( Any & other)
{
std:: swap ( _content, other. _content) ;
return * this ;
}
template < class T >
T * get ( )
{
assert ( typeid ( T) == _content-> type ( ) ) ;
if ( _content == nullptr )
return nullptr ;
return & ( ( placeholder< T> * ) _content) -> _val;
}
template < class T >
Any & operator = ( const T & val)
{
Any ( val) . swap ( * this ) ;
return * this ;
}
Any & operator = ( const Any & other)
{
Any ( other) . swap ( * this ) ;
return * this ;
}
} ;
class Test
{
public :
Test ( ) { std:: cout << "Test() 构造" << std:: endl; }
Test ( const Test & t) { std:: cout << "Test(const Test& t) 拷贝构造" << std:: endl; }
~ Test ( ) { std:: cout << "~Test() 析构" << std:: endl; }
} ;
int main ( )
{
std:: any a;
a = 10 ;
int * pi = std:: any_cast< int > ( & a) ;
std:: cout << * pi << std:: endl;
a = std:: string ( "hello" ) ;
std:: string * ps = std:: any_cast< std:: string> ( & a) ;
std:: cout << * ps << std:: endl;
return 0 ;
}
下面是C++17中any的使用用例:
int main ( )
{
std:: any a;
a = 10 ;
int * pi = std:: any_cast< int > ( & a) ;
std:: cout << * pi << std:: endl;
a = std:: string ( "hello" ) ;
std:: string * ps = std:: any_cast< std:: string> ( & a) ;
std:: cout << * ps << std:: endl;
}
需要注意的是,C++17的特性需要高版本的g++编译器支持,建议g++ 7.3及以上版本。
sudo yum install centos-release-scl-rh centos-release-scl
sudo yum install devtoolset-7-gcc devtoolset-7-gcc-c++
source /opt/rh/devtoolset-7/enable
echo "source /opt/rh/devtoolset-7/enable" >> ~/.bashrc
// 查看g++版本
g++ -v
四、功能模块划分
基于以上的理解,我们要实现的是一个带有协议支持的Reactor模型高性能服务器,因此将整个项目的实现划分为两个大的模块:
SERVER模块:实现Reactor模型的TCP服务器
协议模块:对当前的Reactor模型服务器提供应用层协议支持
1.SERVER模块
SERVER模块就是对所有的连接以及线程进行管理,让它们各司其职,在合适的时候做合适的事,最终完成高性能服务器组件的实现。而具体的管理也分为三个方面:
监听连接管理:对监听连接进行管理。
通信连接管理:对通信连接进行管理。
超时连接管理:对超时连接进行管理。
基于以上的管理思想,将这个模块进行细致的划分又可以划分为以下多个子模块:
Buffer 模块:实现通信套接字的用户态缓冲区,防止接收到的数据不是一条完整的数据,同时确保客户端响应的数据在套接字可写的情况下进行发送。
Socket 模块:对 socket 套接字的操作进行封装,使得程序中对于套接字的各项操作更加简便。
Channel 模块:对于一个描述符进行监控事件管理,便于在用户态对描述符的监控事件进行维护。
Connection 模块:对通信连接进行整体管理,一个连接的所有操作都通过此模块来完成,增加连接操作的灵活以及便捷性。
Acceptor 模块:对监听套接字进行管理,为客户端的新建连接创建 Connection 对象,并设置各种回调。
TimerQueue 模块:定时任务模块,让一个任务可以在指定的时间之后被执行。
Poller模块:对任意的描述符进行IO事件监控,本质上就是对 epoll 的各种操作进行封装,从而让对描述符进行事件监控的操作更加简单,此模块是 Channel 模块的一个子模块。
EventLoop 模块:对事件监控进行管理,为了确保线程安全,此模块一个模块对应一个线程,服务器中的所有的事件都是由此模块来完成。
LoopThread 模块:将 EventLoop 与 thread 整合到一起,向外部返回所实例化的 EventLoop 对象,即将 EventLoop 对象与线程一一绑定。
LoopThreadPool 模块:LoopThread 线程池,用于对所有的 LoopThread 进行管理及分配。
TcpServer 模块:对前边所有子模块进行整合,从而提供给组件使用者的可以便捷的完成一个高性能服务器搭建的模块。
1.1Buffer模块
Buffer模块是一个缓冲区模块,用于实现通信中用户态的接收缓冲区和发送缓冲区功能
功能:用于实现通信套接字的用户态缓冲区
意义:
1.防止接收到的数据不是一条完整的数据,因此对接收的数据进行缓存
2.对于客户端响应的数据,应该是在套接字可写的情况下进行发送
功能设计:
1.向缓冲区添加数据
2.从缓冲区中取出数据
1.2.Socket模块
Socket模块是对套接字操作封装的一个模块,主要实现的socket的各项操作。
功能:对socket套接字的操作进行封装
意义:程序中对于套接字的各项操作更加简便
功能设计:
1.创建套接字
2.绑定地址信息
3.开始监听
4.向服务器发起连接
5.获取新连接
6.接收数据
7.发送数据
8.创建一个监听连接
9.创建一个客户端连接
10.设置套接字选项–开启地址端口复用
11.设置套接字阻塞属性–设置为非阻塞
1.3Channel模块
Channel模块是对一个描述符需要进行的IO事件管理的模块,实现对描述符可读,可写,错误事件的管理操作,以及Poller模块对描述符进行IO事件监控就绪后,根据不同的事件,回调不同的处理函数功能。
功能:对于一个描述符进行监控事件的管理
意义:对于描述符的监控事件在用户态更容易维护,以及触发事件后的操作流程更加的清晰
功能设计:
1.对监控事件的管理:
1.1描述符是否可读
1.2描述符是否可写
1.3对描述符监控可读
1.4对描述符监控可写
1.5解除对可读事件监控
1.6解除对可写事件监控
1.7解除所有事件监控
2.对监控事件触发后处理:设置对于不同事件的回调函数,明确触发了某个事件之后应该怎么处理
1.4Connection模块
功能:
1.这是一个对于通信连接进行整体管理的一个模块。对一个连接的操作都是通过这个模块进行的
2.Connection模块,一个连接有任何的事件该怎么处理都是由这个模块来进行处理的,因为组件的设计也不知道使用者要如何处理事件。因此只能是提供一些事件回调函数由使用者设置
意义:这个模块本身来说不是一个单独的功能模块,是一个对连接做管理的模块。增加连接操作的灵活以及使捷性
功能设计:
1.关闭连接
2.发送数据
3.协议切换
4.启动非活跃连接超时释放
5.取消非活跃连接超时释放
6.回调函数设置:
1.连接建立完成的回调
2.连接有新数据接收成功后的回调
3.连接关闭时的回调
4.产生任何事件进行的回调
Connection模块是对Buffer模块,Socket模块,Channel模块的一个整体封装,实现了对一个通信套接字的整体的管理,每一个进行数据通信的套接字(也就是accept获取到的新连接)都会使用Connection进行管理。
Connection模块内部包含有四个由组件使用者传入的回调函数:连接建立完成回调,事件回调,新数据回调,关闭回调。
Connection模块内部包含有两个组件使用者提供的接口:数据发送接口,连接关闭接口
Connection模块内部包含有两个用户态缓冲区:用户态接收缓冲区,用户态发送缓冲区
Connection模块内部包含有一个Socket对象:完成描述符面向系统的IO操作
Connection模块内部包含有一个Channel对象:完成描述符IO事件就绪的处理
具体处理流程如下:
1.实现向Channel提供可读,可写,错误等不同事件的IO事件回调函数,然后将Channel和对应的描述符添加到Poller事件监控中。
2.当描述符在Poller模块中就绪了IO可读事件,则调用描述符对应Channel中保存的读事件处理函数,进行数据读取,将socket接收缓冲区全部读取到Connection管理的用户态接收缓冲区中。然后调用由组件使用者传入的新数据到来回调函数进行处理。
3.组件使用者进行数据的业务处理完毕后,通过Connection向使用者提供的数据发送接口,将数据写入Connection的发送缓冲区中。
4.启动描述符在Poll模块中的IO写事件监控,就绪后,调用Channel中保存的写事件处理函数,将发送缓冲区中的数据通过Socket进行面向系统的实际数据发送。
1.5Acceptor模块
功能:对监听套接字进行管理
意义:
1.当获取了一个新建连接的描述符之后,需要为这个通信连接,封装一个Connection对象,设置各种不同回调
2.注意:因为Acceptor模块本身并不知道一个连接产生了某个事件该如何处理,因此获取一个通信连接后,Connection的封装,以及事件回调的设置都应该由服务器模块来进行
功能设计:回调函数设置,新建连接获取成功的回调设置,由服务器来指定
Acceptor模块是对Socket模块,Channel模块的一个整体封装,实现了对一个监听套接字的整体的管理。
Acceptor模块内部包含有一个Socket对象:实现监听套接字的操作
Acceptor模块内部包含有一个Channel对象:实现监听套接字IO事件就绪的处理
具体处理流程如下:
1.实现向Channel提供可读事件的IO事件处理回调函数,函数的功能其实也就是获取新连接
2.为新连接构建一个Connection对象出来。
1.6TimerQueue模块
功能:定时任务模块,让一个任务可以在指定的时间之后被执行
意义:组件内部,对于非活跃连接希望在N秒之后被释放
功能设计:
1.添加定时任务
2.刷新定时任务:希望一个定时任务重新开始计时
3.取消定时任务
TimerQueue模块是实现固定时间定时任务的模块,可以理解就是要给定时任务管理器,向定时任务管理器中添加一个任务,任务将在固定时间后被执行,同时也可以通过刷新定时任务来延迟任务的执行。这个模块主要是对Connection对象的生命周期管理,对非活跃连接进行超时后的释放功能。
TimerQueue模块内部包含有一个timerfd:Linux系统提供的定时器。
TimerQueue模块内部包含有一个Channel对象:实现对timerfd的IO时间就绪回调处理
1.7Poller模块
功能:对任意的描述符进行lO事件监控
意义:对epoll进行的封装,让对描述符进行事件监控的操作更加简单
功能接口:
1.添加事件监控:Channel模块
2.修改事件监控
3.移除事件监控
Poller模块是对epoll进行封装的一个模块,主要实现epoll的IO事件添加,修改,移除,获取活跃连接功能。
1.8EventLoop模块
功能:
1.进行事件监控管理的模块
2.这个模块其实就是我们所说的one thread one loop中的loop,也是我们所说的reactor
3.这个模块必然是一个模块对应一个线程
意义:
1.对于服务器中的所有的事件都是由EventLoop模块来完成
2.每一个Connection连接,都会绑定一个EventLoop模块和线程,因为外界对于连接的所有操作,都是要放到同一个线程中进行的
思想:
1.对所有的连接进行事件监控,连接触发事件后调用回调进行处理
2.对于连接的所有操作,都要放到EventLoop线程中执行
功能设计:
1.将连接的操作任务添加到任务队列
2.定时任务的添加
3.定时任务的刷新
4.定时任务的取消
EventLoop模块可以理解就是我们上边所说的Reactor模块,它是对Poller模块,TimerQueue模块,Socket模块的一个整体封装,进行所有描述符的事件监控。EventLoop模块必然是一个对象对应一个线程的模块,线程内部的目的就是运行EventLoop的启动函数。
EventLoop模块为了保证整个服务器的线程安全问题,因此要求使用者对于Connection的所有操作一定要在其对应的EventLoop线程内完成,不能在其他线程中进行(比如组件使用者使用Connection发送数据,以及关闭连接这种操作)。
EventLoop模块保证自己内部所监控的所有描述符,都要是活跃连接,非活跃连接就要及时释放避免资源浪费。
EventLoop模块内部包含有一个eventfd:eventfd其实就是Linux内核提供的一个事件fd,专门用于事件通知。
EventLoop模块内部包含有一个Poller对象:用于进行描述符的IO事件监控。
EventLoop模块内部包含有一个TimerQueue对象:用于进行定时任务的管理。
EventLoop模块内部包含有一个PendingTask队列:组件使用者将对Connection进行的所有操作,都加入到任务队列中,由EventLoop模块进行管理,并在EventLoop对应的线程中进行执行。
每一个Connection对象都会绑定到一个EventLoop上,这样能保证对这个连接的所有操作都是在一个线程中完成的。
具体操作流程:
1.通过Poller模块对当前模块管理内的所有描述符进行IO事件监控,有描述符事件就绪后,通过描述符对应的Channel进行事件处理。
2.所有就绪的描述符IO事件处理完毕后,对任务队列中的所有操作顺序进行执行。
3.由于epoll的事件监控,有可能会因为没有事件到来而持续阻塞,导致任务队列中的任务不能及时得到执行,因此创建了eventfd,添加到Poller的事件监控中,用于实现每次向任务队列添加任务的时候,通过向eventfd写入数据来唤醒epoll的阻塞。
1.9TcpServer模块
功能:对前边所有子模块的整合模块,是提供给用户用于搭建一个高性能服务器的模块
意义:让组件使用者可以更加轻使的完成—个服务器的搭建
功能设计:
1.对于监听连接的管理
2.对于通信连接的管理
3.对于超时连接的管理
4.对于事件监控的管理
5.事件回调函数的设置:一个连接产生了一个事件,对于这个事件如何处理,只有组件使用者知道,因此一个事件的处理回调,一定是组件使用者,设置给TcpServer,TcpServer设置给各个Connection连接
这个模块是一个整体TCP服务器模块的封装,内部封装了Acceptor模块,EventLoop ThreadPool模块。
TcpServer中包含有一个EventLoop对象:以备在超轻量使用场景中不需要EventLoop线程池,只需要在主线程中完成所有操作的情况。
TcpServer模块内部包含有一个EventLoop ThreadPool对象:其实就是EventLoop线程池,也就是子Reactor线程池
TcpServer模块内部包含有一个Acceptor对象:一个TcpServer服务器,必然对应有一个监听套接字,能够完成获取客户端新连接,并处理的任务。
TcpServer模块内部包含有一个std::shared_ptr的hash表:保存了所有的新建连接对应的Connection,注意,所有的Connection使用shared_ptr进行管理,这样能够保证在hash表中删除了Connection信息后,在shared_ptr计数器为0的情况下完成对Connection资源的释放操作。
具体操作流程如下:
1.在实例化TcpServer对象过程中,完成BaseLoop的设置,Acceptor对象的实例化,以及EventLoop线程池的实例化,以及std::shared_ptr的hash表的实例化。
2.为Acceptor对象设置回调函数:获取到新连接后,为新连接构建Connection对象,设置Connection的各项回调,并使用shared_ptr进行管理,并添加到hash表中进行管理,并为Connection选择一个EventLoop线程,为Connection添加一个定时销毁任务,为Connection添加事件监控,
3.启动BaseLoop。
2.HTTP协议模块
HTTP协议模块用于对高并发服务器模块进行协议支持,基于提供的协议支持能够更方便的完成指定协议服务器的搭建。而HTTP协议支持模块的实现,可以细分为以下几个模块。
2.1Util模块
这个模块是一个工具模块,主要提供HTTP协议模块所用到的一些工具函数,比如url编解码,文件读写等。
2.2HttpRequest模块
这个模块是HTTP请求数据模块,用于保存HTTP请求数据被解析后的各项请求元素信息。
2.3HttpResponse模块
这个模块是HTTP响应数据模块,用于业务处理后设置并保存HTTP响应数据的的各项元素信息,最终会被按照HTTP协议响应格式组织成为响应信息发送给客户端。
2.4HttpContext模块
这个模块是一个HTTP请求接收的上下文模块,主要是为了防止在一次接收的数据中,不是一个完整的HTTP请求,则解析过程并未完成,无法进行完整的请求处理,需要在下次接收到新数据后继续根据上下文进行解析,最终得到一个HttpRequest请求信息对象,因此在请求数据的接收以及解析部分需要一个上下文来进行控制接收和处理节奏。
2.5HttpServer模块
这个模块是最终给组件使用者提供的HTTP服务器模块了,用于以简单的接口实现HTTP服务器的搭建。
HttpServer模块内部包含有一个TcpServer对象:TcpServer对象实现服务器的搭建
HttpServer模块内部包含有两个提供给TcpServer对象的接口:连接建立成功设置上下文接口,数据处理接口。
HttpServer模块内部包含有一个hash-map表存储请求与处理函数的映射表:组件使用者向HttpServer设置哪些请求应该使用哪些函数进行处理,等TcpServer收到对应的请求就会使用对应的函数进行处理。
整体的模块示意图如下:
五、模块关系图
Connection 模块关系图,Acceptor 模块关系图,EventLoop 模块关系图
六、SERVER服务器模块实现
1.缓冲区Buffer类实现
Buffer模块:缓冲区模块
提供的功能:存储数据,取出数据
实现思想:
1.实现缓冲区得有一块存储空间,采用vector vector底层其实使用的就是一块线性的空间
2.要素:
1.默认空间的大小
2.当前的读取数据位置
3.当前的写入数据位置
3.操作:
1.写入数据:当前写入位置指向哪里,就从哪里开始写入,如果后续剩余的空间不够了,考虑整体缓冲区空间是否足够(因为读位置也会向后偏移,前边有可能会有空闲空间)如果空间足够,将数据移动到起始位置即可,如果空间不够,就进行扩容,从当前写位置开始扩容足够空间大小,数据一旦写入成功,当前写位置就要向后偏移
2.读取数据:当前的读取位置指向哪里,就从哪里开始读取,前提是有数据可读,可读数据大小:当前写入位置减去当前读取位置
Buffer 模块的设计思想如下:
框架设计:
class Buffer
{
private :
std:: vector< char > _buffer;
uint64_t _read_index;
uint64_t _write_index;
public :
} ;
具体实现:
# define BUFFER_DEFAULT_SIZE 1024
class Buffer
{
private :
std:: vector< char > _buffer;
uint64_t _reader_idx;
uint64_t _writer_idx;
public :
Buffer ( ) : _reader_idx ( 0 ) , _writer_idx ( 0 ) , _buffer ( BUFFER_DEFAULT_SIZE) { }
~ Buffer ( ) { }
public :
char * Begin ( ) { return & * _buffer. begin ( ) ; }
char * WritePosition ( ) { return Begin ( ) + _writer_idx; }
char * ReadPosition ( ) { return Begin ( ) + _reader_idx; }
uint64_t TailIdleSize ( ) { return _buffer. size ( ) - _writer_idx; }
uint64_t HeadIdleSize ( ) { return _reader_idx; }
uint64_t ReadAbleSize ( ) { return _writer_idx - _reader_idx; }
void MoveReadOffset ( const uint64_t & len)
{
if ( len == 0 )
return ;
assert ( len <= ReadAbleSize ( ) ) ;
_reader_idx += len;
}
void MoveWriteOffset ( const uint64_t & len)
{
if ( len == 0 )
return ;
assert ( len <= TailIdleSize ( ) ) ;
_writer_idx += len;
}
void EnsureWriteSpace ( const uint64_t & len)
{
if ( len <= TailIdleSize ( ) )
{
return ;
}
else if ( len <= TailIdleSize ( ) + HeadIdleSize ( ) )
{
uint64_t readablesize = ReadAbleSize ( ) ;
std:: copy ( ReadPosition ( ) , ReadPosition ( ) + readablesize, Begin ( ) ) ;
_reader_idx = 0 ;
_writer_idx = readablesize;
}
else
{
_buffer. resize ( _writer_idx + len) ;
}
}
void Write ( const void * buffer, const uint64_t & len)
{
if ( len == 0 )
return ;
EnsureWriteSpace ( len) ;
const char * d = ( const char * ) buffer;
std:: copy ( d, d + len, WritePosition ( ) ) ;
}
void WriteAndPush ( const void * buffer, const uint64_t & len)
{
Write ( buffer, len) ;
MoveWriteOffset ( len) ;
}
void WriteString ( std:: string & data)
{
return Write ( data. c_str ( ) , data. size ( ) ) ;
}
void WriteStringAndPush ( std:: string & data)
{
WriteString ( data) ;
MoveWriteOffset ( data. size ( ) ) ;
}
void WriteBuffer ( Buffer & data)
{
return Write ( data. ReadPosition ( ) , data. ReadAbleSize ( ) ) ;
}
void WriteBufferAndPush ( Buffer & data)
{
WriteBuffer ( data) ;
MoveWriteOffset ( data. ReadAbleSize ( ) ) ;
}
void Read ( void * buffer, const uint64_t & len)
{
assert ( len <= ReadAbleSize ( ) ) ;
std:: copy ( ReadPosition ( ) , ReadPosition ( ) + len, ( char * ) buffer) ;
}
void ReadAndPop ( void * buffer, const uint64_t & len)
{
Read ( buffer, len) ;
MoveReadOffset ( len) ;
}
std:: string ReadAsString ( const uint64_t & len)
{
assert ( len <= ReadAbleSize ( ) ) ;
std:: string str;
str. resize ( len) ;
Read ( & str[ 0 ] , len) ;
return str;
}
std:: string ReadAsStringAndPop ( const uint64_t & len)
{
assert ( len <= ReadAbleSize ( ) ) ;
std:: string str = ReadAsString ( len) ;
MoveReadOffset ( len) ;
return str;
}
char * FindCRLF ( )
{
char * res = ( char * ) memchr ( ReadPosition ( ) , '
' , ReadAbleSize ( ) ) ;
return res;
}
std:: string GetOneLine ( )
{
char * pos = FindCRLF ( ) ;
if ( pos == nullptr )
return "" ;
return ReadAsString ( pos - ReadPosition ( ) + 1 ) ;
}
std:: string GetOneLineAndPop ( )
{
std:: string str = GetOneLine ( ) ;
MoveReadOffset ( str. size ( ) ) ;
return str;
}
void Clear ( )
{
_reader_idx = 0 ;
_writer_idx = 0 ;
}
} ;
2.日志宏的实现
# include
# include
# include
# define NORMAL 0
# define DEBUG 1
# define ERROR 2
# define LOG_LEVEL DEBUG
# define LOG ( level, format, . . . ) do {
if ( level < NORMAL) break ;
time_t t = time ( nullptr ) ;
struct tm * ltm = localtime ( & t) ;
char buffer[ 32 ] = { 0 } ;
strftime ( buffer, 31 , "%H:%M:%S" , ltm) ;
fprintf ( stdout , "[%p %s %s:%d]" format "
" , ( void * ) pthread_self ( ) , buffer, __FILE__ , __LINE__ , ## __VA_ARGS__) ;
} while ( 0 )
# define NOR_LOG ( format, . . . ) LOG ( NORMAL, format, ## __VA_ARGS__)
# define DBG_LOG ( format, . . . ) LOG ( DEBUG, format, ## __VA_ARGS__)
# define ERR_LOG ( fromat, . . . ) LOG ( ERROR, format, ## __VA_ARGS__)
enum
{
NORMAL,
DEBUG,
WARNING,
ERROR,
FATAL
} ;
const char * level_to_string ( int level)
{
switch ( level)
{
case NORMAL:
return "NORMAL" ;
case DEBUG:
return "DEBUG" ;
case WARNING:
return "WARNING" ;
case ERROR:
return "ERROR" ;
case FATAL:
return "FATAL" ;
default :
return "" ;
}
}
# define LogMessage ( level, format, . . . )
do
{
const char * level_str = level_to_string ( level) ;
time_t ts = time ( nullptr ) ;
struct tm * lt = localtime ( & ts) ;
char buffer[ 32 ] = { 0 } ;
strftime ( buffer, sizeof ( buffer) - 1 , "%y-%m-%d %H:%M:%S" , lt) ;
fprintf ( stdout , "[%s][%s][%s:%d] " format "
" , level_str, buffer, __FILE__ , __LINE__ , ## __VA_ARGS__) ;
} while ( 0 )
3.Socket模块实现
功能设计:
1.创建套接字
2.绑定地址信息
3.开始监听
4.向服务器发起连接
5.获取新连接
6.接收数据
7.发送数据
8.创建一个监听连接
9.创建一个客户端连接
10.设置套接字选项–开启地址端口复用
11.设置套接字阻塞属性–设置为非阻塞
具体实现时的一些细节如下:
Socket 类的目的是对 socket 原生的各种操作进行封装,便于我们后面使用,但即使是这样,创建一个服务端/客户端连接的步骤也显得较为繁琐,所以我们在 Socket 类中提供了直接创建一个服务端连接以及直接创建一个客户端连接的接口。 在 TCP 中,一个连接 bind 了一个地址与端口后,一旦连接断开则会进入 time_wait 状态,此时连接不会立即释放,会继续占用地址和端口,这种策略是用来保护客户端的,但它也会造成我们服务器崩溃后不能立即重新启动,因此我们需要对服务端连接设置套接字选项,开启地址与端口复用。 我们通过 recv/send 系统调用来读取与发送 socket 中的数据时,一般会直接将 socket 缓冲区读空或者写满,而由于套接字默认是阻塞的,因此这会导致我们的程序阻塞在 recv/send 函数这里,因此我们还需要为套接字设置非阻塞属性。
具体实现:
# define MAX_LISTEN 1024
class Socket
{
private :
int _sockfd;
public :
Socket ( ) : _sockfd ( - 1 ) { }
Socket ( int fd) : _sockfd ( fd) { }
~ Socket ( ) { Close ( ) ; }
int Fd ( ) { return _sockfd; }
public :
bool Create ( )
{
_sockfd = socket ( AF_INET, SOCK_STREAM, IPPROTO_TCP) ;
if ( _sockfd < 0 )
{
LogMessage ( FATAL, "socket create failed" ) ;
return false ;
}
return true ;
}
bool Bind ( const std:: string & ip, const uint16_t & port)
{
struct sockaddr_in local;
memset ( & local, 0 , sizeof local) ;
local. sin_family = AF_INET;
local. sin_port = htons ( port) ;
local. sin_addr. s_addr = inet_addr ( ip. c_str ( ) ) ;
socklen_t len = sizeof ( local) ;
int ret = bind ( _sockfd, ( struct sockaddr * ) & local, len) ;
if ( ret < 0 )
{
LogMessage ( FATAL, "socket bind failed" ) ;
return false ;
}
return true ;
}
bool Listen ( int backlog = MAX_LISTEN)
{
int ret = listen ( _sockfd, backlog) ;
if ( ret < 0 )
{
LogMessage ( FATAL, "socket listen failed" ) ;
return false ;
}
return true ;
}
bool Connect ( const std:: string & ip, const uint16_t & port)
{
struct sockaddr_in server;
memset ( & server, 0 , sizeof server) ;
server. sin_family = AF_INET;
server. sin_port = htons ( port) ;
server. sin_addr. s_addr = inet_addr ( ip. c_str ( ) ) ;
socklen_t len = sizeof ( server) ;
int ret = connect ( _sockfd, ( struct sockaddr * ) & server, len) ;
if ( ret < 0 )
{
LogMessage ( FATAL, "socket connect failed" ) ;
return false ;
}
return true ;
}
int Accept ( )
{
int newfd = accept ( _sockfd, nullptr , nullptr ) ;
if ( newfd < 0 )
{
LogMessage ( FATAL, "connect newfd failed" ) ;
return - 1 ;
}
return newfd;
}
ssize_t Recv ( void * buf, size_t len, int flags = 0 )
{
ssize_t ret = recv ( _sockfd, buf, len, flags) ;
if ( ret <= 0 )
{
if ( errno == EAGAIN || errno == EINTR)
{
return 0 ;
}
LogMessage ( FATAL, "recv message failed" ) ;
return - 1 ;
}
return ret;
}
ssize_t NonBlockRecv ( void * buf, size_t len)
{
return Recv ( buf, len, MSG_DONTWAIT) ;
}
ssize_t Send ( const void * buf, size_t len, int flags = 0 )
{
ssize_t ret = send ( _sockfd, buf, len, flags) ;
if ( ret < 0 )
{
if ( errno == EINTR || errno == EAGAIN)
{
return 0 ;
}
LogMessage ( FATAL, "send message failed" ) ;
return - 1 ;
}
return ret;
}
ssize_t NonBlockSend ( const void * buf, size_t len)
{
if ( len == 0 )
return 0 ;
return Send ( buf, len, MSG_DONTWAIT) ;
}
void Close ( )
{
if ( _sockfd != - 1 )
{
close ( _sockfd) ;
_sockfd = - 1 ;
}
}
bool CreateServer ( const uint16_t & port, const std:: string & ip = "0.0.0.0" , bool block_flag = false )
{
if ( Create ( ) == false )
return false ;
if ( block_flag)
NonBlock ( ) ;
if ( Bind ( ip, port) == false )
return false ;
if ( Listen ( ) == false )
return false ;
ReuseAddress ( ) ;
return true ;
}
bool CreateClient ( const uint16_t & port, const std:: string & ip)
{
if ( Create ( ) == false )
return false ;
if ( Connect ( ip, port) == false )
return false ;
return true ;
}
void ReuseAddress ( )
{
int opt = 1 ;
setsockopt ( _sockfd, SOL_SOCKET, SO_REUSEADDR, ( void * ) & opt, sizeof opt) ;
opt = 1 ;
setsockopt ( _sockfd, SOL_SOCKET, SO_REUSEPORT, ( void * ) & opt, sizeof opt) ;
}
void NonBlock ( )
{
int flag = fcntl ( _sockfd, F_GETFL, 0 ) ;
fcntl ( _sockfd, F_SETFL, flag | O_NONBLOCK) ;
}
} ;
4.Channel模块实现
Channel类设计
目的:对描述符的监控事件管理
1.事件管理:
描述符是否可读
描述符是否可写
对描述符监控可写
对描述符监控可读
解除可写事件监控
解除可读事件监控
2.事件触发后的处理的管理
1.需要处理的事件:可读,可写,挂断,错误,任意
2.事件处理的回调函数
具体实现时的一些细节如下:
通信描述符事件触发后会调用回调函数进行处理,而这个回调函数是由 Connection 模块设置给 Channel 模块的,因为 Connection 是对通信连接进行整体管理的一个模块,Channel 模块只是 Connection 模块的一个子模块。 为了保证线程安全,添加/修改/移除事件监控的操作需要放到 Connection 对象关联的 EventLoop 对应的线程中去执行,同时,对描述监控事件的修改最后也必须通过 Poller 模块中的 epoll 相关函数来完成,而 Poller 模块也是 EventLoop 的一个子模块。
成员:因为后边使用epoll进行事件监控
EPOLLIN 可读
EPOLLOUT 可写
EPOLLRDHUP 连接断开
EPOLLPRI 优先数据
EPOLLERR 出错了
EPOLLHUP 挂断
以上的事件都是使用一个数值uint32_t 进行保存
要进行事件管理,就需要有一个uint32_t 类型的成员保存当前需要监控的事件
事件处理这里,因为有五种事件需要处理,就需要五个回调函数
主要框架:
class Channel
{
private :
uint32_t _events;
uint32_t _revents;
using EventCallback = std:: function< void ( ) > ;
EventCallback _read_callback;
EventCallback _write_callback;
EventCallback _error_callback;
EventCallback _close_callback;
EventCallback _event_callback;
public :
Channel ( ) ;
void SetReadCallback ( const EventCallback& cb) ;
void SetWriteCallback ( const EventCallback& cb) ;
void SetErrorCallback ( const EventCallback& cb) ;
void SetCloseCallback ( const EventCallback& cb) ;
void SetEventCallback ( const EventCallback& cb) ;
bool ReadAble ( ) ;
bool WriteAble ( ) ;
void EnableRead ( ) ;
void EnableWrite ( ) ;
void DisableRead ( ) ;
void DisableWrite ( ) ;
void DisableAll ( ) ;
void Remove ( ) ;
void Update ( ) ;
void HandleEvent ( ) ;
} ;
具体实现:
class Poller ;
class EventLoop ;
class Channel
{
private :
int _fd;
EventLoop * _loop;
uint32_t _events;
uint32_t _revents;
using EventCallback = std:: function< void ( ) > ;
EventCallback _read_callback;
EventCallback _write_callback;
EventCallback _error_callback;
EventCallback _close_callback;
EventCallback _event_callback;
public :
Channel ( EventLoop * loop, int fd) : _fd ( fd) , _events ( 0 ) , _revents ( 0 ) , _loop ( loop) { }
int Fd ( ) { return _fd; }
uint32_t Events ( ) { return _events; }
void SetRevents ( const uint32_t & revents) { _revents = revents; }
void SetReadCallback ( const EventCallback & cb) { _read_callback = cb; }
void SetWriteCallback ( const EventCallback & cb) { _write_callback = cb; }
void SetErrorCallback ( const EventCallback & cb) { _error_callback = cb; }
void SetCloseCallback ( const EventCallback & cb) { _close_callback = cb; }
void SetEventCallback ( const EventCallback & cb) { _event_callback = cb; }
bool ReadAble ( ) { return ( _events & EPOLLIN) ; }
bool WriteAble ( ) { return ( _events & EPOLLOUT) ; }
void EnableRead ( ) { _events |= EPOLLIN; Update ( ) ; }
void EnableWrite ( ) { _events |= EPOLLOUT; Update ( ) ; }
void DisableRead ( ) { _events &= ( ~ EPOLLIN) ; Update ( ) ; }
void DisableWrite ( ) { _events &= ( ~ EPOLLOUT) ; Update ( ) ; }
void DisableAll ( ) { _events = 0 ; Update ( ) ; }
void Remove ( ) ;
void Update ( ) ;
void HandleEvent ( )
{
if ( ( _revents & EPOLLIN) || ( _revents & EPOLLRDHUP) || ( _revents & EPOLLPRI) )
{
if ( _read_callback)
{
_read_callback ( ) ;
}
if ( _event_callback)
{
_event_callback ( ) ;
}
}
if ( _revents & EPOLLOUT)
{
if ( _write_callback)
_write_callback ( ) ;
if ( _event_callback)
_event_callback ( ) ;
}
else if ( _revents & EPOLLERR)
{
if ( _event_callback)
_event_callback ( ) ;
if ( _error_callback)
_error_callback ( ) ;
}
else if ( _revents & EPOLLHUP)
{
if ( _event_callback)
_event_callback ( ) ;
if ( _close_callback)
_close_callback ( ) ;
}
}
} ;
void Channel :: Remove ( ) { return _loop-> RemoveEvent ( this ) ; }
void Channel :: Update ( ) { return _loop-> UpdateEvent ( this ) ; }
3.描述符事件监控Poller类实现
Poller模块:描述符IO事件监控模块
意义:通过epoll实现对描述符的 IO 事件监控
功能:
1.添加/修改描述符的事件监控(不存在则添加,存在则修改)
2.移除描述符的事件监控
封装思想:
1.必须拥有一个epoll的操作句柄
2.拥有一个struct epoll_event 结构数组,监控时保存所有的活跃事件
3.使用hash表管理描述符与描述符对应的事件管理Channel事件
逻辑流程:
1.对描述符进行监控,通过Channel才知道描述符需要监控什么事件
2.当描述符就绪了,通过描述符在hash表中找到对应的Channel(得到了Channel才能知道什么事件如何处理)当描述符就绪了,返回描述符对应的Channel
具体实现时的一些细节如下:
由于描述符需要被监控的事件 _events 以及事件触发后的各种回调函数都保存在 Channel 中,并且就绪事件也需要保存到 Channel 的 _revents 中,因此在 Poller 中我们需要保存描述符与 Channel 的关联关系,这样才能知道要添加哪些事件监控,以及事件就绪后应该如何处理。
# define MAX_EPOLLEVENTS 1024
class Poller
{
private :
int _epfd;
struct epoll_event _evs[ MAX_EPOLLEVENTS] ;
std:: unordered_map< int , Channel* > ;
private :
public :
} ;
主要框架:
# define MAX_EPOLLEVENTS 1024
class Poller
{
private :
int _epfd;
struct epoll_event _evs[ MAX_EPOLLEVENTS] ;
std:: unordered_map< int , Channel* > _channels;
private :
void Update ( Channel* channel, int op) ;
bool HashChannel ( Channel* channel) ;
public :
Poller ( ) ;
void UpdateEvent ( Channel* channel) ;
void RemoveEvent ( Channel* channel) ;
void Poll ( std:: vector< Channel* > active) ;
} ;
具体实现:
# define MAX_EPOLLEVENT 1024
class Poller
{
private :
int _epfd;
struct epoll_event _evs[ MAX_EPOLLEVENT] ;
std:: unordered_map< int , Channel * > _channels;
private :
void Update ( Channel * channel, int op)
{
struct epoll_event ev;
ev. events = channel-> Events ( ) ;
ev. data. fd = channel-> Fd ( ) ;
int ret = epoll_ctl ( _epfd, op, channel-> Fd ( ) , & ev) ;
if ( ret < 0 )
{
LogMessage ( FATAL, "epollctl failed" ) ;
}
return ;
}
bool HasChannel ( Channel * Channel)
{
auto it = _channels. find ( Channel-> Fd ( ) ) ;
return it != _channels. end ( ) ;
}
public :
Poller ( )
{
_epfd = epoll_create ( MAX_EPOLLEVENT) ;
if ( _epfd < 0 )
{
LogMessage ( FATAL, "epoll create failed" ) ;
abort ( ) ;
}
}
void UpdateEvent ( Channel * channel)
{
bool ret = HasChannel ( channel) ;
if ( ret == false )
{
_channels. insert ( std:: make_pair ( channel-> Fd ( ) , channel) ) ;
return Update ( channel, EPOLL_CTL_ADD) ;
}
return Update ( channel, EPOLL_CTL_MOD) ;
}
void RemoveEvent ( Channel * channel)
{
auto it = _channels. find ( channel-> Fd ( ) ) ;
if ( it != _channels. end ( ) )
{
_channels. erase ( it) ;
return Update ( channel, EPOLL_CTL_DEL) ;
}
}
void Poll ( std:: vector< Channel * > * actives)
{
int nfds = epoll_wait ( _epfd, _evs, MAX_EPOLLEVENT, - 1 ) ;
if ( nfds < 0 )
{
if ( errno == EINTR)
{
return ;
}
LogMessage ( FATAL, "epoll wait failed" ) ;
abort ( ) ;
}
for ( int i = 0 ; i < nfds; i++ )
{
auto it = _channels. find ( _evs[ i] . data. fd) ;
assert ( it != _channels. end ( ) ) ;
it-> second-> SetRevents ( _evs[ i] . events) ;
actives-> push_back ( it-> second) ;
}
return ;
}
} ;
4.EventLoop类的实现
eventfd:一种事件通知机制
创建一个描述符用于实现事件通知,eventfd本质在内核中管理的就是一个计数器,创建eventfd就会在内核中创建一个计数器(结构),每当eventfd中写入一个数值–用于表示事件通知次数,可以使用read进行数据的读取,读取到的数据就是通知的次数,假设每次给eventfd中写入一个1,就表示通知了一次,连续写了三次之后,再去read读取出来的数字就是3,读取之后计数清0。
用处:在EventLoop模块中实现线程间事件通知功能
# include
int eventfd ( unsigned int initval, int flags) ;
功能: 创建一个eventfd对象, 实现事件通知
参数:
initval: 计数初值
flags:
EFD_CLOEXEC-- 禁止进程复制
EFD_NONBLCK-- 启动非阻塞属性
返回值: 返回一个文件描述符用于操作
eventdfd也是通过read/ write/ close进行操作
注意: read/ write进行IO的时候数据只能是一个8 字节数据
使用案例:
# include
# include
# include
# include
int main ( )
{
int efd = eventfd ( 0 , EFD_CLOEXEC | EFD_NONBLOCK) ;
if ( efd < 0 )
{
std:: cerr << "eventfd create error" << std:: endl;
}
uint64_t val = 1 ;
write ( efd, & val, sizeof ( val) ) ;
write ( efd, & val, sizeof ( val) ) ;
write ( efd, & val, sizeof ( val) ) ;
uint64_t res;
int ret = read ( efd, & res, sizeof ( res) ) ;
if ( ret < 0 )
{
std:: cerr << "read failed" << std:: endl;
}
std:: cout << res << std:: endl;
return 0 ;
}
EventLoop:进行事件监控以及事件处理的模块
关键点:这个模块与线程是一一对应关联的
监控了一个连接,而这个连接一旦就绪,就要进行事件处理,但是如果这个描述符,在多个线程中都触发了事件,进行处理,就会存在线程安全问题,因此我们需要将一个连接的事件监控,以及连接事件处理,以及其他操作都放在一个线程中进行
如何保证一个连接的所有操作都在EventLoop对应的线程中,解决方案:给EventLoop模块中,添加一个任务队列,对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当做任务添加到任务队列中
EventLoop处理流程:
1.在线程中对描述符进行事件监控
2.有描述符就绪则对描述符进行事件处理(如何保证处理回调函数中的操作都在线程中)
3.所有的就绪事件处理完了,这个时候再去将任务队列中的所有任务一一执行
EventLoop分为epoll和task任务队列。epoll中文件描述符就绪了,进行事件处理–调用回调函数,处理过程中调用了send,而这个send是封装后的send,实际上内部是将数据的发送操作,压入队列,等到所有的就绪事件都处理完了,然后从task任务队列中一 一取出实际要进行操作执行。这样能够保证对于连接的所有操作,都是在一个线程中进行的,不涉及线程安全问题,但是对于任务队列的操作有线程安全问题,只需要给task的操作加上一把锁即可。
数据:
1.事件监控:即Poller模块,有事件就绪则进行事件处理
2.执行任务队列中的任务:一个线程安全的任务队列
注意点:因为有可能因为等待描述符IO事件就绪,导致执行流阻塞,这时候任务队列中的任务将得不到执行,因此得有一个事件通知的东西,能够唤醒事件监控的阻塞
当事件就绪,需要处理的时候,处理过程中,如果对连接要进行某些操作:这些操作必须在EventLoop对应的线程中执行,保证对连接的各项操作都是线程安全的。
1.如果执行的操作本就在线程中,不需要将操作压入队列了,可以直接执行
2.如果执行的操作不在线程中,才需要加入任务池,等待事件处理完了然后执行任务
具体实现时的一些细节如下:
当我们监控了一个客户端连接后,一旦这个连接触发了事件,就需要调用对应的回调函数进行事件处理,而在我们处理事件的过程中如果此连接触发了新的事件,那么新事件的处理就有可能被分配到其他线程中去执行,这样就有可能会导致线程安全问题。 那么我们需要为每一个连接的操作都加一把锁来保证线程安全吗?这样做当然是可以的,但是没必要,因为当我们的连接很多时就需要创建很多的锁,这会造成不必要的资源开销;我们仅需将一个连接的事件监控,连接的事件处理以及连接的所有其他操作都放在同一个线程中去完成即可,即让连接与线程一一对应。 虽然连接无法直接与线程一一对应,但是 EventLoop 模块是与线程是一一对应的,因此我们只需将一个连接与一个 EventLoop 模块相绑定,从而间接完成连接与线程的一一绑定。 但是这样仍不保险,因为组件使用者可能自己设计了任务线程池,再一次对任务进行了分摊,在这种情况下我们并不能保证连接的所有操作都在同一个线程中完成,那么如何保证一个连接的所有操作都必定在 EventLoop 对应的线程中呢? 我们的解决方案是给 EventLoop 模块中添加一个任务队列,对连接的所有操作并不直接执行,而是将其进行一次封装,然后当作任务添加到任务队列中,最后等到连接所有的就绪事件处理完了 (都添加都任务队列中了),再去将任务队列中的所有任务一一执行;此时我们仅需要对这个任务队列加一把锁保证其线程安全即可。 我们举个例子,在一号线程中我们对连接1进行了事件监控,此时连接触发了事件A,事件A在一号线程中被执行,执行过程中触发了事件B,由于一号线程忙碌,因此事件B被分配到二号线程中执行 (假设外部设置了任务线程池),但事件A和事件B其实并没有被真正执行,而是仅仅压入任务队列后就返回了,最后得到所有就绪事件都被压入任务队列后,我们再在一号线程中逐个取出任务队列中的任务执行,从而保证线程安全。 最后,因为有可能因为等待描述符IO事件就绪,导致执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此需要使用 eventfd 来进行事件通知,唤醒事件监控的阻塞。
定时器模块的整合:
timerfd:实现内核每个一段时间,给进程一次超时时间(timerfd可读)
timewheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务,要实现一个完整的秒级定时器,就需要将这两个功能整合到一起。
timerfd设置每秒钟触发一次定时事件,当事件被触发,则运行一次timerwheel的runtimertask,执行一下所有的过期定时任务
具体实现时的一些细节如下:
在前面我们学习了 timerfd 的使用以及 timerwheel 的设计思想,而要实现一个完整的秒级定时器,就需要将这两个功能整合到一起: 一方面,我们将 timerfd 的超时时间设置为 1s,这样 timerfd 每秒钟就会触发一次可读事件 (timerfd 可读事件监控可以通过 EventLoop 来实现);另一方面,每当 timerfd 触发可读事件,我们就执行一次 TimerWheel 中的 RunTimerTask 函数,即执行秒针所在位置的所有超时事件。 这样,我们在 TimerWheel 定时器中记录所有的超时事件,然后使用 timerfd 模拟来模拟定时器秒针的移动,从而实现了非活跃连接在 N 秒后释放的功能。
timewheel模块代码
using TaskFunc = std:: function< void ( ) > ;
using ReleaseFunc = std:: function< void ( ) > ;
class TimerTask
{
private :
uint64_t _id;
uint32_t _timeout;
bool _canceled;
TaskFunc _task_cb;
ReleaseFunc _release;
public :
TimerTask ( const uint64_t & id, const uint32_t & delay, const TaskFunc & cb)
: _id ( id) , _timeout ( delay) , _task_cb ( cb) , _canceled ( false ) { }
~ TimerTask ( )
{
if ( _canceled == false )
_task_cb ( ) ;
_release ( ) ;
}
void Canceled ( ) { _canceled = true ; }
uint32_t DelayTime ( ) { return _timeout; }
void SetRelease ( const ReleaseFunc & cb) { _release = cb; }
} ;
using PtrTask = std:: shared_ptr< TimerTask> ;
using WeakTask = std:: weak_ptr< TimerTask> ;
class EventLoop ;
class TimerWheel
{
private :
int _tick;
int _capacity;
std:: vector< std:: vector< PtrTask>> _wheel;
std:: unordered_map< uint64_t , WeakTask> _timers;
EventLoop * _loop;
int _timerfd;
std:: unique_ptr< Channel> _timer_channel;
private :
void RemoveTimer ( const uint64_t & id)
{
auto it = _timers. find ( id) ;
if ( it != _timers. end ( ) )
{
_timers. erase ( it) ;
}
}
static int CreateTimerfd ( )
{
int timerfd = timerfd_create ( CLOCK_MONOTONIC, 0 ) ;
if ( timerfd < 0 )
{
LogMessage ( FATAL, "create timerfd failed" ) ;
abort ( ) ;
}
struct itimerspec ims;
ims. it_value. tv_sec = 1 ;
ims. it_value. tv_nsec = 0 ;
ims. it_interval. tv_sec = 1 ;
ims. it_interval. tv_nsec = 0 ;
int n = timerfd_settime ( timerfd, 0 , & ims, nullptr ) ;
if ( n < 0 )
{
LogMessage ( FATAL, "timerfd settime failed" ) ;
abort ( ) ;
}
return timerfd;
}
int ReadTimerfd ( )
{
uint64_t times;
ssize_t ret = read ( _timerfd, & times, 8 ) ;
if ( ret < 0 )
{
LogMessage ( FATAL, "read timerfd failed" ) ;
abort ( ) ;
}
return times;
}
void RunTimerTask ( )
{
_tick = ( _tick + 1 ) % _capacity;
_wheel[ _tick] . clear ( ) ;
}
void OnTime ( )
{
int times = ReadTimerfd ( ) ;
for ( int i = 0 ; i < times; i++ )
{
RunTimerTask ( ) ;
}
}
void TimerAddInLoop ( const uint64_t & id, const uint32_t & delay, const TaskFunc & cb)
{
PtrTask ptr ( new TimerTask ( id, delay, cb) ) ;
ptr-> SetRelease ( std:: bind ( & TimerWheel:: RemoveTimer, this , id) ) ;
int pos = ( _tick + delay) % _capacity;
_wheel[ pos] . push_back ( ptr) ;
_timers[ id] = WeakTask ( ptr) ;
}
void TimerRefreshInLoop ( const uint64_t & id)
{
auto it = _timers. find ( id) ;
if ( it == _timers. end ( ) )
{
return ;
}
PtrTask ptr = it-> second. lock ( ) ;
int delay = ptr-> DelayTime ( ) ;
int pos = ( _tick + delay) % _capacity;
_wheel[ pos] . push_back ( ptr) ;
}
void TimerCancelInLoop ( const uint64_t & id)
{
auto it = _timers. find ( id) ;
if ( it == _timers. end ( ) )
return ;
PtrTask ptr = it-> second. lock ( ) ;
if ( ptr)
ptr-> Canceled ( ) ;
}
public :
TimerWheel ( EventLoop * loop)
: _tick ( 0 ) , _capacity ( 60 ) , _wheel ( _capacity) ,
_loop ( loop) , _timerfd ( CreateTimerfd ( ) ) ,
_timer_channel ( new Channel ( _loop, _timerfd) )
{
_timer_channel-> SetReadCallback ( std:: bind ( & TimerWheel:: OnTime, this ) ) ;
_timer_channel-> EnableRead ( ) ;
}
void TimerAdd ( const uint64_t & id, const uint32_t & delay, const TaskFunc & cb) ;
void TimerRefresh ( const uint64_t & id) ;
void TimerCancel ( const uint64_t & id) ;
bool HasTimer ( const uint64_t & id)
{
auto it = _timers. find ( id) ;
if ( it == _timers. end ( ) )
return false ;
return true ;
}
} ;
void TimerWheel :: TimerAdd ( const uint64_t & id, const uint32_t & delay, const TaskFunc & cb)
{
return _loop-> RunInLoop ( std:: bind ( & TimerWheel:: TimerAddInLoop, this , id, delay, cb) ) ;
}
void TimerWheel :: TimerRefresh ( const uint64_t & id)
{
return _loop-> RunInLoop ( std:: bind ( & TimerWheel:: TimerRefreshInLoop, this , id) ) ;
}
void TimerWheel :: TimerCancel ( const uint64_t & id)
{
return _loop-> RunInLoop ( std:: bind ( & TimerWheel:: TimerCancelInLoop, this , id) ) ;
}
EventLoop类的实现:
using Functor = std:: function< void ( ) > ;
class EventLoop
{
private :
std:: thread:: id _thread_id;
int _event_fd;
std:: unique_ptr< Channel> _event_channel;
Poller _poller;
std:: vector< Functor> _tasks;
std:: mutex _mutex;
TimerWheel _timer_wheel;
private :
void RunAllTask ( )
{
std:: vector< Functor> functor;
{
std:: unique_lock< std:: mutex> lock ( _mutex) ;
_tasks. swap ( functor) ;
}
for ( auto & func : functor)
{
func ( ) ;
}
return ;
}
static int CreateEventFd ( )
{
int efd = eventfd ( 0 , EFD_CLOEXEC | EFD_NONBLOCK) ;
if ( efd < 0 )
{
LogMessage ( FATAL, "eventfd create failed" ) ;
abort ( ) ;
}
return efd;
}
void ReadEventFd ( )
{
uint64_t res = 0 ;
int ret = read ( _event_fd, & res, sizeof ( res) ) ;
if ( ret <= 0 )
{
if ( errno == EINTR || errno == EAGAIN)
{
return ;
}
LogMessage ( FATAL, "read eventfd failed" ) ;
abort ( ) ;
}
return ;
}
void WeakUpEventFd ( )
{
uint64_t val = 1 ;
int ret = write ( _event_fd, & val, sizeof ( val) ) ;
if ( ret <= 0 )
{
if ( errno == EINTR)
{
return ;
}
LogMessage ( FATAL, "write eventfd failed" ) ;
abort ( ) ;
}
return ;
}
public :
EventLoop ( ) : _thread_id ( std:: this_thread:: get_id ( ) ) ,
_event_fd ( CreateEventFd ( ) ) ,
_event_channel ( new Channel ( this , _event_fd) ) ,
_timer_wheel ( this )
{
_event_channel-> SetReadCallback ( std:: bind ( & EventLoop:: ReadEventFd, this ) ) ;
_event_channel-> EnableRead ( ) ;
}
void Start ( )
{
for ( ; ; )
{
std:: vector< Channel * > actives;
_poller. Poll ( & actives) ;
for ( auto & channel : actives)
{
channel-> HandleEvent ( ) ;
}
RunAllTask ( ) ;
}
}
bool IsInLoop ( )
{
return _thread_id == std:: this_thread:: get_id ( ) ;
}
void AssertInLoop ( )
{
assert ( _thread_id == std:: this_thread:: get_id ( ) ) ;
}
void RunInLoop ( const Functor & cb)
{
if ( IsInLoop ( ) )
{
return cb ( ) ;
}
return QueueInLoop ( cb) ;
}
void QueueInLoop ( const Functor & cb)
{
{
std:: unique_lock< std:: mutex> lock ( _mutex) ;
_tasks. push_back ( cb) ;
}
WeakUpEventFd ( ) ;
}
void UpdateEvent ( Channel * channel)
{
return _poller. UpdateEvent ( channel) ;
}
void RemoveEvent ( Channel * channel)
{
return _poller. RemoveEvent ( channel) ;
}
void TimerAdd ( const uint64_t & id, const uint32_t & delay, const TaskFunc & cb)
{
return _timer_wheel. TimerAdd ( id, delay, cb) ;
}
void TimerRefresh ( const uint64_t & id)
{
return _timer_wheel. TimerRefresh ( id) ;
}
void TimerCancel ( const uint64_t & id)
{
return _timer_wheel. TimerCancel ( id) ;
}
bool HasTimer ( const uint64_t & id)
{
return _timer_wheel. HasTimer ( id) ;
}
} ;
4.通信连接管理Connection类实现
Connection:
目的:对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成
管理: 1.套接字的管理,能够进行套接字的操作
2.连接事件的管理,可读,可写,错误,挂断,任意
3.缓冲区的管理,便于socket数据的接收和发送
4.协议上下文的管理,记录请求数据的处理过程
5.回调函数的管理
因为连接接收到数据之后该如何处理,需要由用户决定,因此必须有业务处理回调函数
一个连接建立成功后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数
一个连接关闭前,该如何处理,由用户决定,因此必须由关闭连接回调函数。
任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数
功能: 1.发送数据—给用户提供的发送数据接口,并不是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控
2关闭连接—给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
3.启动非活跃连接的超时销毁功能
4.取消非活跃连接的超时销毁功能
5.协议切换-一个连接接收数据后如何进行业务处理,取决于上下文,以及数据的业务处理回调函数
Connection模块是对连接的管理模块,对于连接的所有操作都是通过这个模块完成的
场景:对连接进行操作的时候,但是连接已经被释放,导致内存访问错误,最终程序崩溃
解决方案:使用智能指针shared_ptr对Connection对象进行管理,这样就能保证任意一个地方对Connection对象进行操作的时候,保存了一份shared_ptr,因此就算其他地方进行释放操作,也只是对shared_ptr的计数器-1.而不会导致Connection的实际释放
具体实现时的一些细节如下:
Connection 模块是对连接进行全方位管理的一个模块,而管理具体包括套接字的管理 – 使连接能够进行套接字的操作,连接事件的管理 – 包括可读,可写,错误,挂断以及任意事件,缓冲区的管理 – 便于 socket 数据的接收和发送,协议上下文的管理 – 用于记录请求数据的处理过程,以及回调函数的管理 – 提供连接建立完成、接收新数据、连接关闭、任意事件的回调函数设置接口,让组件使用者能够根据需要进行设置。 Connection 模块需要提供数据发送接口,但这并不是真正的发送接口,而只是把数据放到用户态发送缓冲区,然后描述符启动写事件监控,待到 socket 缓冲区可写后再真正发送数据;同样,关闭连接接口也并不是直接关闭连接,而应该在实际释放连接之前,看看输入输出缓冲区中是否有数据待处理,有则处理后再真正关闭连接;最后,一个连接接收到数据后应该如何进行业务处理,取决于上下文以及数据的业务处理回调函数,即上层协议,而切换协议接口的作用就是更改协议对应的上下文以及各种回调函数 (通用容器 Any)。 由于对连接的所有操作都是通过 Connection 模块来完成的,因此可能出现对连接进行某种操作的时候,Connection 对象已经被释放的场景,从而造成内存访问错误,导致程序崩溃 (虽然其他线程中对连接的所有操作都会被放入任务队列中,最后在连接对应的 EventLoop 关联的线程中去执行,但是任务队列中任务的执行也存在先后顺序);因此我们使用 shared_ptr 对 Connection 对象进行管理,然后在任意一个地方对 Connection 对象进行操作的时候都保存一份 shared_ptr,这样就算其他地方进行了释放操作,也只是将 shared_ptr 的计数器 -1,而不会导致 Connection 的实际释放。
class Connection ;
typedef enum
{
DISCONNECTED,
CONNECTING,
CONNECTED,
DISCONNECTING
} ConnStatu;
using PtrConnection = std:: shared_ptr< Connection> ;
class Connection
{
private :
uint64_t _conn_id;
int _sockfd;
bool _enable_inactive_release;
ConnStatu _statu;
Socket _socket;
Channel _channel;
Buffer _in_buffer;
Buffer _out_buffer;
Any _context;
using ConnectedCallback = std:: function< void ( const PtrConnection & ) > ;
using MessageCallback = std:: function< void ( const PtrConnection & , Buffer * ) > ;
using ClosedCallback = std:: function< void ( const PtrConnection & ) > ;
using AnyEventCallback = std:: function< void ( const PtrConnection & ) > ;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
ClosedCallback _server_closed_callback;
private :
void HandleRead ( ) ;
void HandleWrite ( ) ;
void HandleClose ( ) ;
void HandleError ( ) ;
void HandleEvent ( ) ;
void EstableishdInLoop ( ) ;
void ReleaseInLoop ( ) ;
void SendInLoop ( const char * data, int len) ;
void ShutDownInLoop ( ) ;
void EnableInactiveReleaseInLoop ( int sec) ;
void CancelInactiveReleaseInLoop ( ) ;
void UpgrateInLoop ( const Any & context,
const ConnectedCallback & conn,
const MessageCallback & msg,
const ClosedCallback & closed,
const AnyEventCallback & event) ;
public :
Connection ( EventLoop * loop, const uint64_t & conn_id, int sockfd) ;
~ Connection ( ) ;
int Fd ( ) ;
int Id ( ) ;
bool Connected ( ) ;
void SetContext ( const Any & context) ;
Any * GetContext ( ) ;
void SetConnextedCallback ( const ConnectedCallback & cb) ;
void SetMessageCallback ( const MessageCallback & cb) ;
void SetClosedCallback ( const ClosedCallback & cb) ;
void SetAnyEventedCallback ( const AnyEventCallback & cb) ;
void Established ( ) ;
void Send ( const char * data, size_t len) ;
void Shutdown ( ) ;
void EnableInactiveRelease ( int sec) ;
void CancleInactiveRelease ( int sec) ;
void Update ( const Any & context, const ConnectedCallback & conn, const MessageCallback & msg,
const ClosedCallback & closed, const AnyEventCallback & event) ;
} ;
完整代码:
class Connection ;
typedef enum
{
DISCONNECTED,
CONNECTING,
CONNECTED,
DISCONNECTING
} ConnStatu;
using PtrConnection = std:: shared_ptr< Connection> ;
class Connection : public std:: enable_shared_from_this < Connection >
{
private :
uint64_t _conn_id;
int _sockfd;
bool _enable_inactive_release;
EventLoop * _loop;
ConnStatu _statu;
Socket _socket;
Channel _channel;
Buffer _in_buffer;
Buffer _out_buffer;
Any _context;
using ConnectedCallback = std:: function< void ( const PtrConnection & ) > ;
using MessageCallback = std:: function< void ( const PtrConnection & , Buffer * ) > ;
using ClosedCallback = std:: function< void ( const PtrConnection & ) > ;
using AnyEventCallback = std:: function< void ( const PtrConnection & ) > ;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
ClosedCallback _server_closed_callback;
private :
void HandleRead ( )
{
char buffer[ 65536 ] ;
ssize_t ret = _socket. NonBlockRecv ( buffer, 65535 ) ;
if ( ret < 0 )
{
return ShutDownInLoop ( ) ;
}
else if ( ret == 0 )
{
return ;
}
_in_buffer. WriteAndPush ( buffer, ret) ;
if ( _in_buffer. ReadAbleSize ( ) > 0 )
{
return _message_callback ( shared_from_this ( ) , & _in_buffer) ;
}
}
void HandleWrite ( )
{
ssize_t ret = _socket. NonBlockSend ( _out_buffer. ReadPosition ( ) , _out_buffer. ReadAbleSize ( ) ) ;
if ( ret < 0 )
{
if ( _in_buffer. ReadAbleSize ( ) > 0 )
{
_message_callback ( shared_from_this ( ) , & _in_buffer) ;
}
return Release ( ) ;
}
_out_buffer. MoveReadOffset ( ret) ;
if ( _out_buffer. ReadAbleSize ( ) == 0 )
{
_channel. DisableWrite ( ) ;
if ( _statu == DISCONNECTING)
return Release ( ) ;
}
return ;
}
void HandleClose ( )
{
if ( _in_buffer. ReadAbleSize ( ) > 0 )
{
_message_callback ( shared_from_this ( ) , & _in_buffer) ;
}
return Release ( ) ;
}
void HandleError ( )
{
return HandleClose ( ) ;
}
void HandleEvent ( )
{
if ( _enable_inactive_release == true )
{
_loop-> TimerRefresh ( _conn_id) ;
}
if ( _event_callback)
{
_event_callback ( shared_from_this ( ) ) ;
}
}
void EstableishdInLoop ( )
{
assert ( _statu == CONNECTING) ;
_statu = CONNECTED;
_channel. EnableRead ( ) ;
if ( _connected_callback)
_connected_callback ( shared_from_this ( ) ) ;
}
void ReleaseInLoop ( )
{
_statu = DISCONNECTED;
_channel. Remove ( ) ;
_socket. Close ( ) ;
if ( _loop-> HasTimer ( _conn_id) )
CancelInactiveReleaseInLoop ( ) ;
if ( _closed_callback)
_closed_callback ( shared_from_this ( ) ) ;
if ( _server_closed_callback)
_server_closed_callback ( shared_from_this ( ) ) ;
}
void SendInLoop ( Buffer & buffer)
{
if ( _statu == DISCONNECTED)
return ;
_out_buffer. WriteBufferAndPush ( buffer) ;
if ( _channel. WriteAble ( ) == false )
{
_channel. EnableWrite ( ) ;
}
}
void ShutDownInLoop ( )
{
_statu == DISCONNECTING;
if ( _in_buffer. ReadAbleSize ( ) > 0 )
{
if ( _message_callback)
_message_callback ( shared_from_this ( ) , & _in_buffer) ;
}
if ( _out_buffer. ReadAbleSize ( ) > 0 )
{
if ( _channel. WriteAble ( ) == false )
{
_channel. EnableWrite ( ) ;
}
}
if ( _out_buffer. ReadAbleSize ( ) == 0 )
{
Release ( ) ;
}
}
void EnableInactiveReleaseInLoop ( int sec)
{
_enable_inactive_release = true ;
if ( _loop-> HasTimer ( _conn_id) )
{
return _loop-> TimerRefresh ( sec) ;
}
_loop-> TimerAdd ( _conn_id, sec, std:: bind ( & Connection:: Release, this ) ) ;
}
void CancelInactiveReleaseInLoop ( )
{
_enable_inactive_release = false ;
if ( _loop-> HasTimer ( _conn_id) )
{
_loop-> TimerCancel ( _conn_id) ;
}
}
void UpgrateInLoop ( const Any & context,
const ConnectedCallback & conn,
const MessageCallback & msg,
const ClosedCallback & closed,
const AnyEventCallback & event)
{
_context = context;
_connected_callback = conn;
_message_callback = msg;
_closed_callback = closed;
_event_callback = event;
}
public :
Connection ( EventLoop * loop, const uint64_t & conn_id, int sockfd)
: _conn_id ( conn_id) , _sockfd ( sockfd) , _enable_inactive_release ( false ) , _loop ( loop) ,
_statu ( CONNECTING) , _socket ( sockfd) , _channel ( loop, _sockfd)
{
_channel. SetCloseCallback ( std:: bind ( & Connection:: HandleClose, this ) ) ;
_channel. SetReadCallback ( std:: bind ( & Connection:: HandleRead, this ) ) ;
_channel. SetWriteCallback ( std:: bind ( & Connection:: HandleWrite, this ) ) ;
_channel. SetErrorCallback ( std:: bind ( & Connection:: HandleError, this ) ) ;
_channel. SetEventCallback ( std:: bind ( & Connection:: HandleEvent, this ) ) ;
}
~ Connection ( )
{
LogMessage ( NORMAL, "release connection: %p" , this ) ;
}
int Fd ( ) { return _sockfd; }
int Id ( ) { return _conn_id; }
bool Connected ( ) { return _statu == CONNECTED; }
void SetContext ( const Any & context) { _context = context; }
Any * GetContext ( ) { return & _context; }
void SetConnectedCallback ( const ConnectedCallback & cb) { _connected_callback = cb; }
void SetMessageCallback ( const MessageCallback & cb) { _message_callback = cb; }
void SetClosedCallback ( const ClosedCallback & cb) { _closed_callback = cb; }
void SetAnyEventedCallback ( const AnyEventCallback & cb) { _event_callback = cb; }
void SetSrvClosedCallback ( const ClosedCallback & cb) { _server_closed_callback = cb; }
void Established ( )
{
return _loop-> RunInLoop ( std:: bind ( & Connection:: EstableishdInLoop, this ) ) ;
}
void Send ( const char * data, size_t len)
{
Buffer buffer;
buffer. WriteAndPush ( data, len) ;
return _loop-> RunInLoop ( std:: bind ( & Connection:: SendInLoop, this , std:: move ( buffer) ) ) ;
}
void Shutdown ( )
{
return _loop-> RunInLoop ( std:: bind ( & Connection:: ShutDownInLoop, this ) ) ;
}
void Release ( )
{
_loop-> QueueInLoop ( std:: bind ( & Connection:: ReleaseInLoop, this ) ) ;
}
void EnableInactiveRelease ( int sec)
{
return _loop-> RunInLoop ( std:: bind ( & Connection:: EnableInactiveReleaseInLoop, this , sec) ) ;
}
void CancleInactiveRelease ( )
{
return _loop-> RunInLoop ( std:: bind ( & Connection:: CancelInactiveReleaseInLoop, this ) ) ;
}
void Update ( const Any & context, const ConnectedCallback & conn, const MessageCallback & msg,
const ClosedCallback & closed, const AnyEventCallback & event)
{
_loop-> AssertInLoop ( ) ;
_loop-> RunInLoop ( std:: bind ( & Connection:: UpgrateInLoop, this , context, conn, msg, closed, event) ) ;
}
} ;
5.监听描述符管理Acceptor类实现
Acceptor模块:对监听套接字进行管理
1.创建一个监听套接字
2.启动读事件监控
3.事件触发后,获取新连接
4.调用新连接获取成功之后的回调函数
为新连接创建Connection进行管理(这一步不是Acceptor模块的操作,应该是服务器模块)
因为Acceptor模块只进行监听连接的管理,因此获取新连接的描述符之后,对于新连接描述符如何处理其实并不关心
对于新连接如何处理,应该服务器模块来管理的
服务器模块,实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数
实现时的一些细节如下:
由于 Acceptor 仅对监听套接字进行管理,所以它的设计流程很简单:
创建一个监听套接字用于监听客户端连接。 启动监听套接字的可读事件监控。 当可读事件触发后获取客户端新连接。 调用新连接获取成功后的回调函数,为新连接创建 Connection 对象进行管理。 需要注意的是,服务器监听到一个新的客户端连接后,应该为新连接创建 Connection 对象,但由于 Acceptor 模块只对监听套接字进行管理,所以获取到新的客户端连接后需要由服务器模块对其进行处理,比如为其创建 Connection 对象,设置各种回调函数,因此 Acceptor 模块中仅有一个服务器模块设置的获取到新连接后的回调函数。
完整代码:
class Acceptor
{
private :
Socket _socket;
EventLoop * _loop;
Channel _channel;
using AcceptCallback = std:: function< void ( int ) > ;
AcceptCallback _accept_callback;
private :
int CreateServer ( const uint16_t & port)
{
bool ret = _socket. CreateServer ( port) ;
assert ( ret == true ) ;
return _socket. Fd ( ) ;
}
void HandleRead ( )
{
int newfd = _socket. Accept ( ) ;
if ( newfd < 0 )
{
LogMessage ( FATAL, "accept new fd failed" ) ;
return ;
}
if ( _accept_callback)
_accept_callback ( newfd) ;
}
public :
Acceptor ( EventLoop * loop, const uint16_t & port)
: _socket ( CreateServer ( port) ) , _loop ( loop) ,
_channel ( _loop, _socket. Fd ( ) )
{
_channel. SetReadCallback ( std:: bind ( & Acceptor:: HandleRead, this ) ) ;
}
void SetAcceptCallback ( const AcceptCallback & cb)
{
_accept_callback = cb;
}
void Listen ( )
{
_channel. EnableRead ( ) ;
}
} ;
6.LoopThread类的实现
目标:将EventLoop模块与线程整合起来
EventLoop模块与线程是一一对应的
EventLoop模块实例化的对象,在构造的时候就会初始化_thread_id
而后边当运行一个操作的时候判断当前是否运行EventLoop模块对应的线程中,就是将线程ID与EventLoop中的_thread_id进行一个比较,相同就表示同一个线程,不同就表示当前运行的并不是EventLoop线程
含义:EventLoop模块在实例化对象的时候,必须在线程的内部
EventLoop实例化对象时会设置自己的_thread_id
如果我们先创建了多个EventLoop对象,再设置新的_thread_id期间是不可控的
因此我们必须先创建线程,然后在线程的入口函数中,去实例化EventLoop对象
构造一个新的模块:LoopThread
这个模块的功能:将EventLoop与Thread整合到一起
思想:
1.创建线程
2.在线程中实例化EventLoop对象
功能:可以向外部返回实例化的EventLoop
我们上面在设计 EventLoop 模块时提到 EventLoop 模块与线程是一一对应的,并且由于 EventLoop 模块在构造时就会使用当前线程 id 来作为 EventLoop 对象所关联的线程的 id – _thread_id(std::this_thread::get_id());同时,我们后面在运行一个操作的时候判断当前是否运行在 EventLoop 模块对应的线程中,就是将线程 ID 与 EventLoop 模块中的 _thread id 进行比较,相同表示在同一个线程,不同则表示当前运行线程并不是 EventLoop 线程。
因此,EventLoop 模块必须在线程内部实例化,即先为 EventLoop 对象创建一个线程,然后在该线程的入口函数中去实例化 EventLoop 对象,这样该线程就会与 EventLoop 对象相关联 (实例化时该线程 id 被用于初始化 EventLoop 对象的 _thread_id)。
需要注意的是,我们不能事先创建多个 EventLoop 对象,然后创建多个线程,最后将各个线程的 id 重新赋值给 EventLoop 进行关联,因为这样在构造 EventLoop 对象到设置新的 _thread_id 期间,EventLoop 产生的操作将是不可控的。
基于以上思想,我们需要构建一个 LoopThread 模块,这个模块的功能是将 EventLoop 与 thread 整合到一起,向外部返回所实例化的 EventLoop 对象。
代码实现:
class LoopThread
{
private :
std:: mutex _mutex;
std:: condition_variable _cond;
EventLoop * _loop;
std:: thread _thread;
private :
void ThreadEntry ( )
{
EventLoop loop;
{
std:: unique_lock< std:: mutex> lock ( _mutex) ;
_loop = & loop;
_cond. notify_all ( ) ;
}
loop. Start ( ) ;
}
public :
LoopThread ( ) : _thread ( std:: thread ( & LoopThread:: ThreadEntry, this ) ) , _loop ( nullptr ) { }
EventLoop * GetEventLoop ( )
{
EventLoop * loop = nullptr ;
{
std:: unique_lock< std:: mutex> lock ( _mutex) ;
_cond. wait ( lock, [ & ] ( )
{ return _loop != nullptr ; } ) ;
loop = _loop;
}
return loop;
}
} ;
8.LoopThreadPool类的实现
LoopThreadPool针对LoopThread设计一个线程池
LoopThreadPool模块:对所有的LoopThread进行管理和分配
功能:
1.线程数量可配置(0个或多个)
注意事项:在服务器中,主从Reactor模型是主线程只负责新连接获取,从属Reactor负责线连接的事件监控及业务处理,因此当前的线程池,有可能从属线程数量为0,也就是实现单Reactor服务器,一个线程既负责获取新连接,也负责连接到处理
2.对所有的线程进行管理,其实就是管理0个或多个LoopThread对象
3.提供线程分配的功能:
当主线程获取了一个新连接,需要将新连接挂到从属线程进行事件监控及处理
假设有0个从属线程,则直接分配给主线程的EventLoop,进行处理。
假设有多个从属线程,则采用RR轮转思想,进行线程的分配(将对应的EventLoop获取到,设置对应的Connection)
我们上面针对 EventLoop 设计 LoopThread 模块,由于客户端连接有多个,而每一个客户端连接都对应一个 Connection 模块、EventLoop 模块以及 LoopThread 模块,因此我们需要针对 LoopThread 设计一个线程池 – LoopThreadPool,用于对所有的 LoopThread 进行管理及分配。 LoopThreadPool 模块所要完成的功能如下:
线程数量可配置 (0个或多个)。
需要注意的是,在服务器中,由于主从 Reactor 模型是主线程只负责新连接获取,从属线程负责新连接的事件监控及处理,因此当前的线程池中从属线程的数量有可能会为0,也就是实现单 Reactor 服务器,仅有一个线程,其即负责获取连接,也负责连接的处理。
对所有的线程进行管理 – 管理0个或多个 LoopThread 对象.
提供线程分配的功能 – 当主线程获取了一个新连接时,将新连接挂到从属线程上进行事件监控及处理。
假设有0个从属线程,则直接分配给主线程的 EventLoop 进行处理;假设有多个从属线程,则采用 RR 轮转思想,进行线程的分配 (将被选择线程的 EventLoop 对象获取到,然后设置给对应的 Connection 对象)
class LoopThreadPool
{
private :
int _thread_count;
int _next_loop_id;
EventLoop* _baseloop;
std:: vector< LoopThread* > _threads;
std:: vector< EventLoop* > _loops;
public :
LoopThreadPool ( ) ;
void SetThreadCount ( int count) ;
void Create ( ) ;
EventLoop* GetNextLoop ( ) ;
} ;
完整代码:
class LoopThreadPool
{
private :
int _thread_count;
int _next_idx;
EventLoop * _base_loop;
std:: vector< LoopThread * > _threads;
std:: vector< EventLoop * > _loops;
public :
LoopThreadPool ( EventLoop * baesloop)
: _thread_count ( 0 ) , _next_idx ( 0 ) , _base_loop ( baesloop) { }
void SetThreadCount ( const int & count) { _thread_count = count; }
void Create ( )
{
if ( _thread_count > 0 )
{
_threads. resize ( _thread_count) ;
_loops. resize ( _thread_count) ;
for ( int i = 0 ; i < _thread_count; i++ )
{
_threads[ i] = new LoopThread ( ) ;
_loops[ i] = _threads[ i] -> GetEventLoop ( ) ;
}
}
}
EventLoop * GetNextLoop ( )
{
if ( _thread_count == 0 )
{
return _base_loop;
}
_next_idx = ( _next_idx + 1 ) % _thread_count;
return _loops[ _next_idx] ;
}
} ;
9.服务器TcpServer类实现
TcpServer模块:对于所有模块的整合,通过TcpServer模块实例化的对象,可以非常简单的实现一个服务器的搭建
管理:
1.Acceptor对象,创建一个监听套接字
2.EventLoop对象,baseloop对象,实现对监听套接字的事件监控
3.std::unordered_map _conns;实现对所有新建连接的管理
4.LoopThreadLoop对象,创建loop线程池,对新建连接进行事件监控及处理
功能:
1.设置从属线程池数量
2.启动服务器
3.设置各种回调函数(连接建立完成,消息,关闭,任意),用户设置给TcpServer,TcpServer设置给获取的新连接
4.是否启动非活跃连接超时销毁功能
5.添加定时任务功能
流程:
1.在TcpServer中实例化一个Acceptot对象,以及一个EventLoop对象(baseloop)
2.将Acceptor挂到baseloop上进行事件监控
3.一旦Acceptor对象就绪了可读事件,则执行可读事件回调函数获取新建连接
4.对新连接,创建一个Connection进行管理
5.对连接对应的Connection设置功能回调(连接完成回调,消息回调,关闭回调,任意事件回调)
6.启动Connection的非活跃连接的超时销毁规则
7.将新连接对应的Connection挂到LoopThreadPool中的从属线程对应的EventLoop中进行事件监控
8.一旦Connection对应的连接就绪了可读事件,则这时候执行事件回调函数,读取数据,读取完毕后调用TcpServer设置的消息回调函数
完整代码:
class TcpServer
{
private :
uint64_t _next_id;
uint16_t _port;
int _timeout;
bool _enable_inactive_release;
EventLoop _baseloop;
Acceptor _acceptor;
LoopThreadPool _pool;
std:: unordered_map< uint64_t , PtrConnection> _conns;
using ConnectedCallback = std:: function< void ( const PtrConnection & ) > ;
using MessageCallback = std:: function< void ( const PtrConnection & , Buffer * ) > ;
using ClosedCallback = std:: function< void ( const PtrConnection & ) > ;
using AnyEventCallback = std:: function< void ( const PtrConnection & ) > ;
using Functor = std:: function< void ( ) > ;
ConnectedCallback _connected_callback;
MessageCallback _message_callback;
ClosedCallback _closed_callback;
AnyEventCallback _event_callback;
private :
void RunAfterInLoop ( const Functor & task, int delay)
{
_next_id++ ;
_baseloop. TimerAdd ( _next_id, delay, task) ;
}
void NewConnection ( int fd)
{
_next_id++ ;
PtrConnection conn ( new Connection ( _pool. GetNextLoop ( ) , _next_id, fd) ) ;
conn-> SetConnectedCallback ( _connected_callback) ;
conn-> SetClosedCallback ( _closed_callback) ;
conn-> SetMessageCallback ( _message_callback) ;
conn-> SetAnyEventedCallback ( _event_callback) ;
conn-> SetSrvClosedCallback ( std:: bind ( & TcpServer:: RemoveConnection, this , std:: placeholders:: _1) ) ;
if ( _enable_inactive_release)
conn-> EnableInactiveRelease ( _timeout) ;
conn-> Established ( ) ;
_conns. insert ( std:: make_pair ( _next_id, conn) ) ;
}
void RemoveConnectionInLoop ( const PtrConnection & conn)
{
int id = conn-> Id ( ) ;
auto it = _conns. find ( id) ;
if ( it != _conns. end ( ) )
{
_conns. erase ( id) ;
}
}
void RemoveConnection ( const PtrConnection & conn)
{
_baseloop. RunInLoop ( std:: bind ( & TcpServer:: RemoveConnectionInLoop, this , conn) ) ;
}
public :
TcpServer ( const uint16_t & port)
: _next_id ( 0 ) ,
_port ( port) ,
_enable_inactive_release ( false ) ,
_acceptor ( & _baseloop, port) ,
_pool ( & _baseloop)
{
_acceptor. SetAcceptCallback ( std:: bind ( & TcpServer:: NewConnection, this , std:: placeholders:: _1) ) ;
_acceptor. Listen ( ) ;
}
void SetThreadCount ( int count) { _pool. SetThreadCount ( count) ; }
void SetConnectedCallback ( const ConnectedCallback & cb) { _connected_callback = cb; }
void SetMessageCallback ( const MessageCallback & cb) { _message_callback = cb; }
void SetClosedCallback ( const ClosedCallback & cb) { _closed_callback = cb; }
void SetAnyEventCallback ( const AnyEventCallback & cb) { _event_callback = cb; }
void EnableInactiveRelease ( int timeout)
{
_enable_inactive_release = true ;
_timeout = timeout;
}
void RunAfter ( const Functor & task, int delay)
{
_baseloop. RunInLoop ( std:: bind ( & TcpServer:: RunAfterInLoop, this , task, delay) ) ;
}
void Start ( )
{
_pool. Create ( ) ;
_baseloop. Start ( ) ;
}
} ;
10.基于TcpServer实现回显服务器
EchoServer.hpp
# include "../server.hpp"
class EchoServer
{
private :
TcpServer _server;
private :
void OnConnected ( const PtrConnection & conn)
{
DLOG ( "NEW CONNECTION: %p" , conn. get ( ) ) ;
}
void OnClosed ( const PtrConnection & conn)
{
DLOG ( "CLOSE CONNECTION: %p" , conn. get ( ) ) ;
}
void OnMessage ( const PtrConnection & conn, Buffer * buf)
{
conn-> Send ( buf-> ReadPosition ( ) , buf-> ReadAbleSize ( ) ) ;
buf-> MoveReadOffset ( buf-> ReadAbleSize ( ) ) ;
conn-> Shutdown ( ) ;
}
public :
EchoServer ( const uint16_t & port) : _server ( port)
{
_server. SetThreadCount ( 2 ) ;
_server. EnableInactiveRelease ( 10 ) ;
_server. SetConnectedCallback ( std:: bind ( & EchoServer:: OnConnected, this , std:: placeholders:: _1) ) ;
_server. SetClosedCallback ( std:: bind ( & EchoServer:: OnClosed, this , std:: placeholders:: _1) ) ;
_server. SetMessageCallback ( std:: bind ( & EchoServer:: OnMessage, this , std:: placeholders:: _1, std:: placeholders:: _2) ) ;
}
void Start ( )
{
_server. Start ( ) ;
}
} ;
tcpClient.cc
# include "../source/server.hpp"
int main ( )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
for ( int i = 0 ; i < 5 ; i++ )
{
std:: string str = "hello server" ;
clientsock. Send ( str. c_str ( ) , str. size ( ) ) ;
char buffer[ 1024 ] ;
ssize_t n = clientsock. Recv ( buffer, sizeof ( buffer) - 1 ) ;
buffer[ n] = 0 ;
LogMessage ( NORMAL, "[echo]# %s" , buffer) ;
sleep ( 1 ) ;
}
while ( true )
sleep ( 1 ) ;
clientsock. Close ( ) ;
return 0 ;
}
main.cc
# include "echo.hpp"
int main ( )
{
EchoServer server ( 8080 ) ;
server. Start ( ) ;
return 0 ;
}
七、HTTP协议模块
1.Util工具类实现
在Util工具类中主要实现一下的功能:
1.读取文件内容
2.向文件写入数据
3.URL编码
在RFC3986文档中规定的URL绝对不编码字符:. - _ ~以及数字和字母
还有一个就是在不同的一些标准中的特殊处理
W3C标准中规定param中的空格必须被编码为+
REF 2396中规定URL中保留字符需要转换为%HH格式
4.URL解码
5.响应状态码描述的获取
6.根据文件后缀名获取文件mime
7.判断一个文件是否是目录
8.判断一个文件是否是一个普通的文件
9.http请求的资源路径有效性判断
代码实现:
std:: unordered_map< int , std:: string> _statu_msg = {
{ 100 , "Continue" } ,
{ 101 , "Switching Protocol" } ,
{ 102 , "Processing" } ,
{ 103 , "Early Hints" } ,
{ 200 , "OK" } ,
{ 201 , "Created" } ,
{ 202 , "Accepted" } ,
{ 203 , "Non-Authoritative Information" } ,
{ 204 , "No Content" } ,
{ 205 , "Reset Content" } ,
{ 206 , "Partial Content" } ,
{ 207 , "Multi-Status" } ,
{ 208 , "Already Reported" } ,
{ 226 , "IM Used" } ,
{ 300 , "Multiple Choice" } ,
{ 301 , "Moved Permanently" } ,
{ 302 , "Found" } ,
{ 303 , "See Other" } ,
{ 304 , "Not Modified" } ,
{ 305 , "Use Proxy" } ,
{ 306 , "unused" } ,
{ 307 , "Temporary Redirect" } ,
{ 308 , "Permanent Redirect" } ,
{ 400 , "Bad Request" } ,
{ 401 , "Unauthorized" } ,
{ 402 , "Payment Required" } ,
{ 403 , "Forbidden" } ,
{ 404 , "Not Found" } ,
{ 405 , "Method Not Allowed" } ,
{ 406 , "Not Acceptable" } ,
{ 407 , "Proxy Authentication Required" } ,
{ 408 , "Request Timeout" } ,
{ 409 , "Conflict" } ,
{ 410 , "Gone" } ,
{ 411 , "Length Required" } ,
{ 412 , "Precondition Failed" } ,
{ 413 , "Payload Too Large" } ,
{ 414 , "URI Too Long" } ,
{ 415 , "Unsupported Media Type" } ,
{ 416 , "Range Not Satisfiable" } ,
{ 417 , "Expectation Failed" } ,
{ 418 , "I'm a teapot" } ,
{ 421 , "Misdirected Request" } ,
{ 422 , "Unprocessable Entity" } ,
{ 423 , "Locked" } ,
{ 424 , "Failed Dependency" } ,
{ 425 , "Too Early" } ,
{ 426 , "Upgrade Required" } ,
{ 428 , "Precondition Required" } ,
{ 429 , "Too Many Requests" } ,
{ 431 , "Request Header Fields Too Large" } ,
{ 451 , "Unavailable For Legal Reasons" } ,
{ 501 , "Not Implemented" } ,
{ 502 , "Bad Gateway" } ,
{ 503 , "Service Unavailable" } ,
{ 504 , "Gateway Timeout" } ,
{ 505 , "HTTP Version Not Supported" } ,
{ 506 , "Variant Also Negotiates" } ,
{ 507 , "Insufficient Storage" } ,
{ 508 , "Loop Detected" } ,
{ 510 , "Not Extended" } ,
{ 511 , "Network Authentication Required" } } ;
std:: unordered_map< std:: string, std:: string> _mime_msg = {
{ ".aac" , "audio/aac" } ,
{ ".abw" , "application/x-abiword" } ,
{ ".arc" , "application/x-freearc" } ,
{ ".avi" , "video/x-msvideo" } ,
{ ".azw" , "application/vnd.amazon.ebook" } ,
{ ".bin" , "application/octet-stream" } ,
{ ".bmp" , "image/bmp" } ,
{ ".bz" , "application/x-bzip" } ,
{ ".bz2" , "application/x-bzip2" } ,
{ ".csh" , "application/x-csh" } ,
{ ".css" , "text/css" } ,
{ ".csv" , "text/csv" } ,
{ ".doc" , "application/msword" } ,
{ ".docx" , "application/vnd.openxmlformats-officedocument.wordprocessingml.document" } ,
{ ".eot" , "application/vnd.ms-fontobject" } ,
{ ".epub" , "application/epub+zip" } ,
{ ".gif" , "image/gif" } ,
{ ".htm" , "text/html" } ,
{ ".html" , "text/html" } ,
{ ".ico" , "image/vnd.microsoft.icon" } ,
{ ".ics" , "text/calendar" } ,
{ ".jar" , "application/java-archive" } ,
{ ".jpeg" , "image/jpeg" } ,
{ ".jpg" , "image/jpeg" } ,
{ ".js" , "text/javascript" } ,
{ ".json" , "application/json" } ,
{ ".jsonld" , "application/ld+json" } ,
{ ".mid" , "audio/midi" } ,
{ ".midi" , "audio/x-midi" } ,
{ ".mjs" , "text/javascript" } ,
{ ".mp3" , "audio/mpeg" } ,
{ ".mpeg" , "video/mpeg" } ,
{ ".mpkg" , "application/vnd.apple.installer+xml" } ,
{ ".odp" , "application/vnd.oasis.opendocument.presentation" } ,
{ ".ods" , "application/vnd.oasis.opendocument.spreadsheet" } ,
{ ".odt" , "application/vnd.oasis.opendocument.text" } ,
{ ".oga" , "audio/ogg" } ,
{ ".ogv" , "video/ogg" } ,
{ ".ogx" , "application/ogg" } ,
{ ".otf" , "font/otf" } ,
{ ".png" , "image/png" } ,
{ ".pdf" , "application/pdf" } ,
{ ".ppt" , "application/vnd.ms-powerpoint" } ,
{ ".pptx" , "application/vnd.openxmlformats-officedocument.presentationml.presentation" } ,
{ ".rar" , "application/x-rar-compressed" } ,
{ ".rtf" , "application/rtf" } ,
{ ".sh" , "application/x-sh" } ,
{ ".svg" , "image/svg+xml" } ,
{ ".swf" , "application/x-shockwave-flash" } ,
{ ".tar" , "application/x-tar" } ,
{ ".tif" , "image/tiff" } ,
{ ".tiff" , "image/tiff" } ,
{ ".ttf" , "font/ttf" } ,
{ ".txt" , "text/plain" } ,
{ ".vsd" , "application/vnd.visio" } ,
{ ".wav" , "audio/wav" } ,
{ ".weba" , "audio/webm" } ,
{ ".webm" , "video/webm" } ,
{ ".webp" , "image/webp" } ,
{ ".woff" , "font/woff" } ,
{ ".woff2" , "font/woff2" } ,
{ ".xhtml" , "application/xhtml+xml" } ,
{ ".xls" , "application/vnd.ms-excel" } ,
{ ".xlsx" , "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet" } ,
{ ".xml" , "application/xml" } ,
{ ".xul" , "application/vnd.mozilla.xul+xml" } ,
{ ".zip" , "application/zip" } ,
{ ".3gp" , "video/3gpp" } ,
{ ".3g2" , "video/3gpp2" } ,
{ ".7z" , "application/x-7z-compressed" } } ;
class Util
{
public :
static size_t Split ( const std:: string & src, const std:: string & sep, std:: vector< std:: string> * array)
{
size_t offset = 0 ;
while ( offset < src. size ( ) )
{
size_t pos = src. find ( sep, offset) ;
if ( pos == std:: string:: npos)
{
array-> push_back ( src. substr ( offset) ) ;
return array-> size ( ) ;
}
if ( offset == pos)
{
offset = pos + sep. size ( ) ;
continue ;
}
array-> push_back ( src. substr ( offset, pos - offset) ) ;
offset = pos + sep. size ( ) ;
}
return array-> size ( ) ;
}
static bool ReadFile ( const std:: string & filename, std:: string * buf)
{
std:: ifstream ifs ( filename, std:: ios:: binary) ;
if ( ifs. is_open ( ) == false )
{
LogMessage ( FATAL, "%s open failed" , filename. c_str ( ) ) ;
return false ;
}
size_t fsize = 0 ;
ifs. seekg ( 0 , ifs. end) ;
fsize = ifs. tellg ( ) ;
ifs. seekg ( 0 , ifs. beg) ;
buf-> resize ( fsize) ;
ifs. read ( & ( * buf) [ 0 ] , fsize) ;
if ( ifs. good ( ) == false )
{
LogMessage ( FATAL, "%s read failed" , filename. c_str ( ) ) ;
ifs. close ( ) ;
return false ;
}
ifs. close ( ) ;
return true ;
}
static bool WriteFile ( const std:: string & filename, const std:: string & buf)
{
std:: ofstream ofs ( filename, std:: ios:: binary | std:: ios:: trunc) ;
if ( ofs. is_open ( ) == false )
{
LogMessage ( FATAL, "%s open failed" , filename. c_str ( ) ) ;
return false ;
}
ofs. write ( buf. c_str ( ) , buf. size ( ) ) ;
if ( ofs. good ( ) == false )
{
LogMessage ( FATAL, "%s write failed" , filename. c_str ( ) ) ;
ofs. close ( ) ;
return false ;
}
ofs. close ( ) ;
return true ;
}
static std:: string UrlEncode ( const std:: string & url, bool convert_space_to_plus = false )
{
std:: string res;
for ( auto ch : url)
{
if ( ch == '.' || ch == '-' || ch == '_' || ch == '~' || isalnum ( ch) )
{
res += ch;
continue ;
}
if ( ch == ' ' && convert_space_to_plus)
{
res += '+' ;
continue ;
}
char tmp[ 4 ] = { 0 } ;
snprintf ( tmp, 4 , "%%%02X" , ch) ;
res += tmp;
}
return res;
}
static char HEXTOI ( const char ch)
{
if ( ch >= '0' && ch <= '9' )
{
return ch - '0' ;
}
else if ( ch >= 'a' && ch <= 'z' )
{
return ch - 'a' + 10 ;
}
else if ( ch >= 'A' && ch <= 'Z' )
{
return ch - 'A' + 10 ;
}
else
return - 1 ;
}
static std:: string UrlDecode ( const std:: string & url, bool convert_plus_to_space)
{
std:: string res;
for ( int i = 0 ; i < url. size ( ) ; i++ )
{
if ( url[ i] == '%' && ( i + 2 ) < url. size ( ) )
{
char val1 = HEXTOI ( url[ i + 1 ] ) ;
char val2 = HEXTOI ( url[ i + 2 ] ) ;
char val = val1 * 16 + val2;
i += 2 ;
res += val;
continue ;
}
if ( url[ i] == '+' && convert_plus_to_space)
{
res += ' ' ;
continue ;
}
res += url[ i] ;
}
return res;
}
static std:: string StatuDesc ( int statu)
{
auto it = _statu_msg. find ( statu) ;
if ( it != _statu_msg. end ( ) )
{
return it-> second;
}
return "UnKnown" ;
}
static std:: string ExtMime ( const std:: string & filename)
{
size_t pos = filename. rfind ( "." ) ;
if ( pos == std:: string:: npos)
{
return "application/octet-stream" ;
}
std:: string ext = filename. substr ( pos) ;
auto it = _mime_msg. find ( ext) ;
if ( it == _mime_msg. end ( ) )
{
return "application/octet-stream" ;
}
return it-> second;
}
static bool IsDirectory ( const std:: string & filename)
{
struct stat st;
int ret = stat ( filename. c_str ( ) , & st) ;
if ( ret < 0 )
{
return false ;
}
return S_ISDIR ( st. st_mode) ;
}
static bool IsRegular ( const std:: string & filename)
{
struct stat st;
int ret = stat ( filename. c_str ( ) , & st) ;
if ( ret < 0 )
{
return false ;
}
return S_ISREG ( st. st_mode) ;
}
static bool ValidPath ( const std:: string & path)
{
std:: vector< std:: string> subdir;
Split ( path, "/" , & subdir) ;
int level = 0 ;
for ( auto & dir : subdir)
{
if ( dir == ".." )
{
level-- ;
if ( level < 0 )
return false ;
}
else
level++ ;
}
return true ;
}
} ;
2.HttpRequest请求类实现
HttpRequest模块:
Http请求信息模块:存储http请求信息要素,提供简单的功能性接口
请求信息要素:
请求行:请求方法 URL 协议版本
URL:资源路径 查询字符串
GET /search/1234?word=C++&en=utf8 HTTP/1.1
请求头部:key:value
key:value
…
Content-Length:0
正文
要素:请求方法,资源路径,查询字符串,头部字段,正文,协议版本
std::smatch 保存首行使用regex正则进行解析后,所提取的数据,比如提取资源路径中的数字
代码实现:
class HttpRequest
{
public :
std:: string _method;
std:: string _path;
std:: string _version;
std:: string _body;
std:: smatch _matches;
std:: unordered_map< std:: string, std:: string> _headers;
std:: unordered_map< std:: string, std:: string> _params;
public :
HttpRequest ( ) : _version ( "HTTP/1.1" ) { }
void ReSet ( )
{
_method. clear ( ) ;
_path. clear ( ) ;
_version = "HTTP/1.1" ;
_body. clear ( ) ;
std:: smatch match;
_matches. swap ( match) ;
_headers. clear ( ) ;
_params. clear ( ) ;
}
void SetHeader ( const std:: string & key, const std:: string & value)
{
_headers. insert ( std:: make_pair ( key, value) ) ;
}
bool HasHeader ( const std:: string & key) const
{
auto it = _headers. find ( key) ;
if ( it == _headers. end ( ) )
{
return false ;
}
return true ;
}
std:: string GetHeader ( const std:: string & key) const
{
auto it = _headers. find ( key) ;
if ( it == _headers. end ( ) )
{
return "" ;
}
return it-> second;
}
void SetParam ( const std:: string & key, const std:: string & val)
{
_params. insert ( std:: make_pair ( key, val) ) ;
}
bool HasParam ( const std:: string & key) const
{
auto it = _params. find ( key) ;
if ( it == _params. end ( ) )
{
return false ;
}
return true ;
}
std:: string GetParam ( const std:: string & key) const
{
auto it = _params. find ( key) ;
if ( it == _params. end ( ) )
{
return "" ;
}
return it-> second;
}
size_t ContentLength ( ) const
{
bool ret = HasHeader ( "Content-Length" ) ;
if ( ret == false )
{
return 0 ;
}
std:: string clen = GetHeader ( "Content-Length" ) ;
return std:: stol ( clen) ;
}
bool Close ( ) const
{
if ( HasHeader ( "Connection" ) && GetHeader ( "Connection" ) == "keep-alive" )
{
return false ;
}
return true ;
}
} ;
3.HttpResponse响应类实现
HttpResponse模块:
功能:存储http响应信息要素,提供简单的功能性接口
响应信息要素:
1.响应状态码
2.头部字段
3.响应正文
4.重定向信息(是否进行了重定向的标志,重定向的路径)
功能性接口:
0.为了便于成员的访问,因此将成员设置为共有成员
1.头部字段的新增,查询,获取
2.正文的设置
3.重定向的设置
4.长短连接的判断
代码实现:
class HttpResponse
{
public :
int _statu;
bool _redirect_flag;
std:: string _body;
std:: string _redirect_url;
std:: unordered_map< std:: string, std:: string> _headers;
public :
HttpResponse ( ) : _statu ( 200 ) , _redirect_flag ( false ) { }
HttpResponse ( int statu) : _statu ( statu) , _redirect_flag ( false ) { }
void ReSet ( )
{
_statu = 200 ;
_redirect_flag = false ;
_body. clear ( ) ;
_redirect_url. clear ( ) ;
_headers. clear ( ) ;
}
void SetHeader ( const std:: string & key, const std:: string & value)
{
_headers. insert ( std:: make_pair ( key, value) ) ;
}
bool HasHeader ( const std:: string & key) const
{
auto it = _headers. find ( key) ;
if ( it == _headers. end ( ) )
{
return false ;
}
return true ;
}
std:: string GetHeader ( const std:: string & key) const
{
auto it = _headers. find ( key) ;
if ( it == _headers. end ( ) )
{
return "" ;
}
return it-> second;
}
void SetContent ( const std:: string & body, const std:: string & type = "text/html" )
{
_body = body;
SetHeader ( "Content-Length" , type) ;
}
void SetRedirect ( const std:: string & url, const int statu = 302 )
{
_statu = statu;
_redirect_flag = true ;
_redirect_url = url;
}
bool Close ( )
{
if ( HasHeader ( "Connection" ) == true && GetHeader ( "Connection" ) == "keep-alive" )
{
return false ;
}
return true ;
}
} ;
4.HttpContext上下文类实现
代码实现:
# define MAX_LINE 8192
typedef enum
{
RECV_HTTP_ERROR,
RECV_HTTP_LINE,
RECV_HTTP_HEAD,
RECV_HTTP_BODY,
RECV_HTTP_OVER
} HttpRecvStatu;
class HttpContext
{
private :
int _resp_statu;
HttpRecvStatu _recv_statu;
HttpRequest _request;
private :
bool PraseHttpLine ( std:: string & line)
{
std:: smatch matches;
std:: regex e ( "(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:?(.*))? (HTTP/1.[01])(?:
|
)?" , std:: regex:: icase) ;
bool ret = std:: regex_match ( line, matches, e) ;
if ( ret == false )
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400 ;
return false ;
}
_request. _method = matches[ 1 ] ;
std:: transform ( _request. _method. begin ( ) , _request. _method. end ( ) , _request. _method. begin ( ) , :: toupper) ;
_request. _path = Util :: UrlDecode ( matches[ 2 ] , false ) ;
_request. _version = matches[ 4 ] ;
std:: vector< std:: string> query_string_array;
std:: string query_string = matches[ 3 ] ;
Util :: Split ( query_string, "&" , & query_string_array) ;
for ( auto & str : query_string_array)
{
size_t pos = str. find ( "=" ) ;
if ( pos == std:: string:: npos)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400 ;
return false ;
}
std:: string key = Util :: UrlDecode ( str. substr ( 0 , pos) , true ) ;
std:: string value = Util :: UrlDecode ( str. substr ( pos + 1 ) , true ) ;
_request. SetParam ( key, value) ;
}
return true ;
}
bool RecvHttpLine ( Buffer * buf)
{
if ( _recv_statu != RECV_HTTP_LINE)
return false ;
std:: string line = buf-> GetOneLineAndPop ( ) ;
if ( line. size ( ) == 0 )
{
if ( buf-> ReadAbleSize ( ) > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414 ;
return false ;
}
return true ;
}
if ( line. size ( ) > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414 ;
return false ;
}
bool ret = PraseHttpLine ( line) ;
if ( ret == false )
{
return false ;
}
_recv_statu = RECV_HTTP_HEAD;
return true ;
}
bool RecvHttpHead ( Buffer * buf)
{
if ( _recv_statu != RECV_HTTP_HEAD)
return false ;
while ( true )
{
std:: string line = buf-> GetOneLineAndPop ( ) ;
if ( line. size ( ) == 0 )
{
if ( buf-> ReadAbleSize ( ) > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414 ;
return false ;
}
return true ;
}
if ( line. size ( ) > MAX_LINE)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 414 ;
return false ;
}
if ( line == "
" || line == "
" )
{
break ;
}
bool ret = PraseHttpHead ( line) ;
if ( ret == false )
return false ;
}
_recv_statu = RECV_HTTP_BODY;
return true ;
}
bool PraseHttpHead ( std:: string & line)
{
if ( line. back ( ) == '
' )
line. pop_back ( ) ;
if ( line. back ( ) == '
' )
line. pop_back ( ) ;
size_t pos = line. find ( ": " ) ;
if ( pos == std:: string:: npos)
{
_recv_statu = RECV_HTTP_ERROR;
_resp_statu = 400 ;
return false ;
}
std:: string key = line. substr ( 0 , pos) ;
std:: string value = line. substr ( pos + 2 ) ;
_request. SetHeader ( key, value) ;
return true ;
}
bool RecvHttpBody ( Buffer * buf)
{
if ( _recv_statu != RECV_HTTP_BODY)
return false ;
size_t content_length = _request. ContentLength ( ) ;
if ( content_length == 0 )
{
_recv_statu = RECV_HTTP_OVER;
return true ;
}
size_t real_length = content_length - _request. _body. size ( ) ;
if ( real_length <= buf-> ReadAbleSize ( ) )
{
_request. _body. append ( buf-> ReadPosition ( ) , real_length) ;
buf-> MoveReadOffset ( real_length) ;
_recv_statu = RECV_HTTP_OVER;
return true ;
}
_request. _body. append ( buf-> ReadPosition ( ) , buf-> ReadAbleSize ( ) ) ;
buf-> MoveReadOffset ( buf-> ReadAbleSize ( ) ) ;
return true ;
}
public :
HttpContext ( ) : _resp_statu ( 200 ) , _recv_statu ( RECV_HTTP_LINE) { }
void ReSet ( )
{
_resp_statu = 200 ;
_recv_statu = RECV_HTTP_LINE;
_request. ReSet ( ) ;
}
int RespStatu ( ) { return _resp_statu; }
HttpRecvStatu RecvStatu ( ) { return _recv_statu; }
HttpRequest & Request ( ) { return _request; }
void RecvHttpRequest ( Buffer * buf)
{
switch ( _recv_statu)
{
case RECV_HTTP_LINE:
RecvHttpLine ( buf) ;
case RECV_HTTP_HEAD:
RecvHttpHead ( buf) ;
case RECV_HTTP_BODY:
RecvHttpBody ( buf) ;
}
}
} ;
5.HttpServer类实现
HttpServer模块:用于实现HTTP服务器的搭建
设计一张请求路由表:
表中记录了针对哪个请求,应该使用哪个函数来进行业务处理的映射关系
当服务器收到了一个请求,就在请求路由表中,查找有没有对应请求的处理函数,如果有,则执行对应的处理函数即可说白了,什么请求,怎么处理,由用户来设定,服务器收到了请求只需要执行函数即可
这样做的好处:用户只需要实现业务处理函数,然后将请求与处理函数的映射关系,添加到服务器中
而服务器只需要接收数据,解析数据,查找路由表映射关系,执行业务处理函数。
要实现简便的搭建HTTP服务器,所需要的要素和提供的功能要素:
1.GET请求的路由映射表
2.POST请求的路由映射表
3.PUT请求的路由映射表
4.DELETE请求的路由映射表﹐—路由映射表记录对应请求方法的请求的处理函数映射关系—更多是功能性请求的处理
5.静态资源相对根目录—实现静态资源请求的处理
6.高性能TCP服务器—进行连接的IO操作
接口:
服务器处理流程:
1.从socket接收数据,放到接收缓冲区
2.调用OnMessage回调函数进行业务处理
3.对请求进行解析,得到了一个HttpRequest结构,包含了所有的请求要素
4.进行请求的路由查找–找到对应请求的处理方法
1.静态资源请求—一些实体文件资源的请求,html,image…
将静态资源文件的数据读取出来,填充到HttpResponse结构中
2.功能性请求—在请求路由映射表中查找处理函数,找到了则执行函数
具体的业务处理,并进行HttpResponse结构的数据填充
5.对静态资源请求/功能性请求进行处理完毕后,得到了一个填充了响应信息的HttpResponse对象,组织http格式响应,进行发送
接口:
添加请求-处理函数映射信息(GET/POST/PUT/DELETE)设置静态资源根目录
设置是否启动超时连接关闭
设置线程池中线程数量启动服务器
OnConnected —用于给TcpServer设置协议上下文
OnMessage -----用于进行缓冲区数据解析处理
获取上下文,进行缓冲区数据解析
请求的路由查找
静态资源请求查找和处理功能性请求的查找和处理组织响应进行回复
HTTP 服务器的运行流程如下:
从 socket 中接收数据,放到接收缓冲区。 调用 OnMessage 回调函数进行业务处理。 对请求进行解析,得到了一个 HttpRequest 结构对象,其中包含了所有的请求要素信息。 进行请求的路由查找 – 找到请求对应的处理方法。
如果是静态资源请求,比如 html 页面,image 文件等,则将静态资源文件的数据读取出来,填充到 HttpResponse 结构中。 如果是功能性请求,则在请求路由映射表中查找处理函数,找到了则执行函数进行具体的业务处理,并进行 HttpResponse 结构的数据填充。 对静态资源请求/功能性请求进行处理完毕后,已经得到了一个填充了响应信息的 HttpResponse 对象,将其组织成为 HTTP 格式响应,发送给客户端即可。
代码实现:
# define DEFAULT_TIME_OUT 30
class HttpServer
{
private :
using Handler = std:: function< void ( const HttpRequest & , HttpResponse * ) > ;
using Handlers = std:: vector< std:: pair< std:: regex, Handler>> ;
Handlers _get_route;
Handlers _post_route;
Handlers _put_route;
Handlers _delete_route;
std:: string _basedir;
TcpServer _server;
private :
void ErrorHandler ( const HttpRequest & req, HttpResponse * rsp)
{
std:: string body;
body += "" ;
body += "" ;
body += " " ;
body += "" ;
body += "" ;
body += "" ;
body += std:: to_string ( rsp-> _statu) ;
body += " " ;
body += Util :: StatuDesc ( rsp-> _statu) ;
body += "" ;
body += "" ;
body += "" ;
rsp-> SetContent ( body, "text/html" ) ;
}
void WriteResponse ( const PtrConnection & conn, const HttpRequest & req, HttpResponse & rsp)
{
if ( req. Close ( ) == true )
rsp. SetHeader ( "Connection" , "close" ) ;
else
rsp. SetHeader ( "Connection" , "keep-alive" ) ;
if ( rsp. _body. empty ( ) == false && rsp. HasHeader ( "Content-Length" ) == false )
{
rsp. SetHeader ( "Content-Length" , std:: to_string ( rsp. _body. size ( ) ) ) ;
}
if ( rsp. _body. empty ( ) == false && rsp. HasHeader ( "Content-Type" ) == false )
{
rsp. SetHeader ( "Content-Type" , "application/octet-stream" ) ;
}
if ( rsp. _redirect_flag == true )
{
rsp. SetHeader ( "Location" , rsp. _redirect_url) ;
}
std:: stringstream rsp_str;
rsp_str << req. _version << " " << std:: to_string ( rsp. _statu) << " "
<< Util :: StatuDesc ( rsp. _statu) << "
" ;
for ( auto & [ key, val] : rsp. _headers)
{
rsp_str << key << ": " << val << "
" ;
}
rsp_str << "
" ;
rsp_str << rsp. _body;
conn-> Send ( rsp_str. str ( ) . c_str ( ) , rsp_str. str ( ) . size ( ) ) ;
}
void Dispatcher ( HttpRequest & req, HttpResponse * rsp, Handlers & handlers)
{
for ( auto & handler : handlers)
{
const std:: regex & re = handler. first;
const Handler & functor = handler. second;
bool ret = std:: regex_match ( req. _path, req. _matches, re) ;
if ( ret == false )
continue ;
return functor ( req, rsp) ;
}
rsp-> _statu = 404 ;
}
bool IsFileHandler ( const HttpRequest & req)
{
if ( _basedir. empty ( ) )
return false ;
if ( req. _method != "GET" && req. _method != "HEAD" )
return false ;
if ( Util :: ValidPath ( req. _path) == false )
return false ;
std:: string req_path = _basedir + req. _path;
if ( req_path. back ( ) == '/' )
{
req_path += "index.html" ;
}
if ( Util :: IsRegular ( req_path) == false )
return false ;
return true ;
}
void FileHandler ( const HttpRequest & req, HttpResponse * rsp)
{
std:: string req_path = _basedir + req. _path;
if ( req_path. back ( ) == '/' )
req_path += "index.html" ;
bool ret = Util :: ReadFile ( req_path, & rsp-> _body) ;
if ( ret == false )
return ;
std:: string mime = Util :: ExtMime ( req_path) ;
rsp-> SetHeader ( "Content-Type" , mime) ;
return ;
}
void Route ( HttpRequest & req, HttpResponse * rsp)
{
if ( IsFileHandler ( req) == true )
{
return FileHandler ( req, rsp) ;
}
if ( req. _method == "GET" || req. _method == "HEAD" )
return Dispatcher ( req, rsp, _get_route) ;
else if ( req. _method == "POST" )
return Dispatcher ( req, rsp, _post_route) ;
else if ( req. _method == "PUT" )
return Dispatcher ( req, rsp, _put_route) ;
else if ( req. _method == "DELETE" )
return Dispatcher ( req, rsp, _delete_route) ;
rsp-> _statu = 405 ;
return ;
}
void OnConnected ( const PtrConnection & conn)
{
conn-> SetContext ( HttpContext ( ) ) ;
LogMessage ( NORMAL, "NEW CONNECTION %p" , conn. get ( ) ) ;
}
void OnMessage ( const PtrConnection & conn, Buffer * buf)
{
while ( buf-> ReadAbleSize ( ) > 0 )
{
HttpContext * context = conn-> GetContext ( ) -> get< HttpContext> ( ) ;
context-> RecvHttpRequest ( buf) ;
HttpRequest & req = context-> Request ( ) ;
HttpResponse rsp ( context-> RespStatu ( ) ) ;
if ( context-> RespStatu ( ) >= 400 )
{
ErrorHandler ( req, & rsp) ;
WriteResponse ( conn, req, rsp) ;
context-> ReSet ( ) ;
buf-> MoveReadOffset ( buf-> ReadAbleSize ( ) ) ;
conn-> Shutdown ( ) ;
return ;
}
if ( context-> RecvStatu ( ) != RECV_HTTP_OVER)
return ;
Route ( req, & rsp) ;
WriteResponse ( conn, req, rsp) ;
context-> ReSet ( ) ;
if ( rsp. Close ( ) == true )
{
conn-> Shutdown ( ) ;
}
}
}
public :
HttpServer ( int port, int timeout = DEFAULT_TIME_OUT) : _server ( port)
{
_server. EnableInactiveRelease ( timeout) ;
_server. SetConnectedCallback ( std:: bind ( & HttpServer:: OnConnected, this , std:: placeholders:: _1) ) ;
_server. SetMessageCallback ( std:: bind ( & HttpServer:: OnMessage, this ,
std:: placeholders:: _1, std:: placeholders:: _2) ) ;
}
void SetBaseDir ( const std:: string & path) { _basedir = path; }
void Get ( const std:: string & pattern, const Handler & handler)
{
_get_route. push_back ( std:: make_pair ( std:: regex ( pattern) , handler) ) ;
}
void Post ( const std:: string & pattern, const Handler & handler)
{
_post_route. push_back ( std:: make_pair ( std:: regex ( pattern) , handler) ) ;
}
void Put ( const std:: string & pattern, const Handler & handler)
{
_put_route. push_back ( std:: make_pair ( std:: regex ( pattern) , handler) ) ;
}
void Delete ( const std:: string & pattern, const Handler & handler)
{
_delete_route. push_back ( std:: make_pair ( std:: regex ( pattern) , handler) ) ;
}
void SetThreadCount ( int count) { _server. SetThreadCount ( count) ; }
void Listen ( ) { _server. Start ( ) ; }
} ;
6.HttpServer 简单测试
下面我们分别对 HttpServer 的 GPT、POST、PUT 以及 DELETE 请求进行简单测试。
测试代码如下:
# include "./http.hpp"
# define WWWROOT "./wwwroot/"
std:: string RequestStr ( const HttpRequest & req)
{
std:: stringstream ss;
ss << req. _method << " " << req. _path << " " << req. _version << "
" ;
for ( auto & [ key, val] : req. _params)
{
ss << key << ": " << val << "
" ;
}
for ( auto & [ key, val] : req. _headers)
{
ss << key << ": " << val << "
" ;
}
ss << "
" ;
ss << req. _body;
return ss. str ( ) ;
}
void Hello ( const HttpRequest & req, HttpResponse * rsp)
{
rsp-> SetContent ( RequestStr ( req) , "text/plain" ) ;
}
void Login ( const HttpRequest & req, HttpResponse * rsp)
{
rsp-> SetContent ( RequestStr ( req) , "text/plain" ) ;
}
void PutFile ( const HttpRequest & req, HttpResponse * rsp)
{
std:: string pathname = WWWROOT + req. _path;
Util :: WriteFile ( pathname, req. _body) ;
}
void DelFile ( const HttpRequest & req, HttpResponse * rsp)
{
rsp-> SetContent ( RequestStr ( req) , "text/plain" ) ;
}
int main ( )
{
HttpServer server ( 8080 ) ;
server. SetThreadCount ( 3 ) ;
server. SetBaseDir ( WWWROOT) ;
server. Get ( "/hello" , Hello) ;
server. Post ( "/login" , Login) ;
server. Put ( "/1234.txt" , PutFile) ;
server. Delete ( "/1234.txt" , DelFile) ;
server. Listen ( ) ;
return 0 ;
}
项目目录结构如下:
| -- echo
| | -- echo. hpp
| | -- main. cc
| `-- makefile
| -- http
| | -- http. hpp
| | -- main
| | -- main. cc
| | -- makefile
| `-- wwwroot
| | -- 1234. txt
| `-- index. html
| -- log. hpp
| -- main. cc
| -- makefile
`-- server. hpp
静态资源根目录中的文件信息如下:
< html>
< head>
< meta charset = " utf8" >
head>
< body>
< form action = " /login" method = " post" >
< input type = " text" name = " username" > < br/>
< input type = " password" name = " password" > < br/>
< input type = " submit" value = " 提交" name = " submit" >
form>
body>
html>
回显服务器测试结果如下:
登录请求测试结果如下:
八、性能测试
面我们已经完成了 SERVER 模块和协议模块的开发,并进行了简单的功能测试,下面我们来进行一些边界性的功能测试,观察服务器在边界情况下能够正常运行。
1. 服务器长连接测试
创建一个客户端,设置 Connection 头部字段为 keep-alive,观察客户端是否能够持续与服务器进行通信。
# include "../source/server.hpp"
int main ( )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
std:: string req = "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
for ( ; ; )
{
int ret = clientsock. Send ( req. c_str ( ) , req. size ( ) ) ;
assert ( ret > 0 ) ;
char buffer[ 1024 ] = { 0 } ;
ret = clientsock. Recv ( buffer, 1023 ) ;
LogMessage ( DEBUG, "[%s]" , buffer) ;
sleep ( 3 ) ;
}
clientsock. Close ( ) ;
return 0 ;
}
从测试结果可以看到,客户端能够持续与服务器进行通信,并且服务器也不会在 10s 后将客户端连接释放,而是等待客户端主动退出后才会释放:
过几分钟之后再关闭客户端
客户端收到的响应
2. 服务器超时连接测试
客户端连接上服务器后,长时间不给服务器发送数据,观察超时时间 (10s) 后服务器是否会将客户端连接进行释放。
# include "../source/server.hpp"
int main ( )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
std:: string req = "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
for ( ; ; )
{
int ret = clientsock. Send ( req. c_str ( ) , req. size ( ) ) ;
assert ( ret > 0 ) ;
char buffer[ 1024 ] = { 0 } ;
ret = clientsock. Recv ( buffer, 1023 ) ;
LogMessage ( DEBUG, "[%s]" , buffer) ;
sleep ( 15 ) ;
}
clientsock. Close ( ) ;
return 0 ;
}
从测试结果可以看到,服务器经过超时时间后自动将客户端连接释放:
3.服务器错误请求测试
给服务器发送一个请求,添加头部字段 Content-Length 为100,但实际发送的正文长度不足100,观察服务器的处理结果;我们的预期结果有两种:
如果客户端只发生一次请求,由于服务器未接收到完整请求(正文数据不足),所以会等待新数据到来,不会给与客户端响应,直到连接超时释放。 如果客户端发送多次请求,那么服务器会将后面的请求字段作为第一次请求的正文,完成业务处理后发送一次响应,但这样很有可能会导致后面的请求解析错误。
测试代码
# include "../source/server.hpp"
int main ( )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
std:: string req = "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 100
Hello world" ;
for ( ; ; )
{
assert ( clientsock. Send ( req. c_str ( ) , req. size ( ) ) != - 1 ) ;
assert ( clientsock. Send ( req. c_str ( ) , req. size ( ) ) != - 1 ) ;
assert ( clientsock. Send ( req. c_str ( ) , req. size ( ) ) != - 1 ) ;
assert ( clientsock. Send ( req. c_str ( ) , req. size ( ) ) != - 1 ) ;
char buffer[ 1024 ] = { 0 } ;
assert ( clientsock. Recv ( buffer, 1023 ) ) ;
LogMessage ( DEBUG, "[%s]" , buffer) ;
sleep ( 3 ) ;
}
clientsock. Close ( ) ;
return 0 ;
}
测试结果如下:
4.服务器业务处理超时测试
当服务器达到性能瓶颈,即处理一次业务花费的时间超过了服务器设置的非活跃连接超时时间时,查看服务器的处理情况。我们的预期结果如下:
由于服务器进行单次业务处理的时间超过了连接的超时时间,所以可能导致其他连接被拖累从而超时释放,具体来说,假设现在4 5 6 7描述符就绪,并且在处理4号描述符就绪事件时超时,那么会出现以下两种情况:
如果4后面的5 6 7号都是通信连接描述符,则并不影响,因为4号描述符就绪事件处理完毕后就会处理它们的就绪事件并刷新其活跃度。 如果5号描述符是定时器描述符,此时定时器触发超时,就会执行定时任务,由于6、7号描述符被4号描述符拖累,达到了超时时间,因此会被释放,从而导致在进行6 7业务处理时发生内存访问错误 (6号同理)。 因此,在本次事件处理过程中,并不能直接释放通信连接,而应该将释放操作压入任务队列中,待就绪事件全部处理完毕后再真正释放连接。
# include "../source/server.hpp"
int main ( )
{
for ( int i = 0 ; i < 10 ; i++ )
{
pid_t pid = fork ( ) ;
if ( pid < 0 )
{
LogMessage ( ERROR, "fork error" ) ;
return - 1 ;
}
else if ( pid == 0 )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
std:: string req = "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
for ( ; ; )
{
int ret = clientsock. Send ( req. c_str ( ) , req. size ( ) ) ;
assert ( ret > 0 ) ;
char buffer[ 1024 ] = { 0 } ;
ret = clientsock. Recv ( buffer, 1023 ) ;
LogMessage ( DEBUG, "[%s]" , buffer) ;
}
clientsock. Close ( ) ;
exit ( 0 ) ;
}
}
while ( 1 ) sleep ( 1 ) ;
return 0 ;
}
测试结果如下:
5.服务器同时多条请求测试
客户端一次性给服务器发送多条请求,观察服务器处理结果。
# include "../source/server.hpp"
int main ( )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
std:: string req = "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
req += "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
req += "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
for ( ; ; )
{
int ret = clientsock. Send ( req. c_str ( ) , req. size ( ) ) ;
assert ( ret > 0 ) ;
char buffer[ 1024 ] = { 0 } ;
ret = clientsock. Recv ( buffer, 1023 ) ;
LogMessage ( DEBUG, "[%s]" , buffer) ;
sleep ( 3 ) ;
}
clientsock. Close ( ) ;
return 0 ;
}
测试结果如下,服务器能够正常处理并响应:
服务器大文件传输测试
使用PUT方法向服务器传输大文件,观察服务器处理结果。
# include "../source/server.hpp"
# include "../source/http/http.hpp"
int main ( )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
std:: string req = "PUT /1234.txt HTTP/1.1
Connection: keep-alive
" ;
std:: string body;
Util :: ReadFile ( "./hello.txt" , & body) ;
req += "Content-Length: " + std:: to_string ( body. size ( ) ) + "
" ;
assert ( clientsock. Send ( req. c_str ( ) , req. size ( ) ) != - 1 ) ;
assert ( clientsock. Send ( body. c_str ( ) , req. size ( ) ) != - 1 ) ;
char buffer[ 1024 ] = { 0 } ;
assert ( clientsock. Recv ( buffer, 1023 ) != - 1 ) ;
LogMessage ( DEBUG, "[%s]" , buffer) ;
sleep ( 15 ) ;
clientsock. Close ( ) ;
return 0 ;
}
服务器内存情况以及 test.txt 文件情况如下:
测试结果如下,服务器能够正常处理并响应:
由于服务器资源有限,只能上传100M数据
服务器性能压力测试
使用服务器压力测试工具 WebBench 模拟多个客户端同时访问服务器,测试服务器的并发量 (可以同时处理多少个客户端的请求而不会出现连接失败) 以及 QPS (每秒钟处理的包的数量)。
测试环境如下:
服务器为2核2G带宽4M的云服务器。 在服务器上运行 WebBench 程序。 使用 WebBench 程序以 1000 的并发量,进行 1h 的测试。
./webbench -c 1000 -t 3600 http://127.0.0.1:8080/hello
测试结果如下 ( 2000 QPS ):
4.服务器业务处理超时测试
当服务器达到性能瓶颈,即处理一次业务花费的时间超过了服务器设置的非活跃连接超时时间时,查看服务器的处理情况。我们的预期结果如下:
由于服务器进行单次业务处理的时间超过了连接的超时时间,所以可能导致其他连接被拖累从而超时释放,具体来说,假设现在4 5 6 7描述符就绪,并且在处理4号描述符就绪事件时超时,那么会出现以下两种情况:
如果4后面的5 6 7号都是通信连接描述符,则并不影响,因为4号描述符就绪事件处理完毕后就会处理它们的就绪事件并刷新其活跃度。 如果5号描述符是定时器描述符,此时定时器触发超时,就会执行定时任务,由于6、7号描述符被4号描述符拖累,达到了超时时间,因此会被释放,从而导致在进行6 7业务处理时发生内存访问错误 (6号同理)。 因此,在本次事件处理过程中,并不能直接释放通信连接,而应该将释放操作压入任务队列中,待就绪事件全部处理完毕后再真正释放连接。
# include "../source/server.hpp"
int main ( )
{
for ( int i = 0 ; i < 10 ; i++ )
{
pid_t pid = fork ( ) ;
if ( pid < 0 )
{
LogMessage ( ERROR, "fork error" ) ;
return - 1 ;
}
else if ( pid == 0 )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
std:: string req = "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
for ( ; ; )
{
int ret = clientsock. Send ( req. c_str ( ) , req. size ( ) ) ;
assert ( ret > 0 ) ;
char buffer[ 1024 ] = { 0 } ;
ret = clientsock. Recv ( buffer, 1023 ) ;
LogMessage ( DEBUG, "[%s]" , buffer) ;
}
clientsock. Close ( ) ;
exit ( 0 ) ;
}
}
while ( 1 ) sleep ( 1 ) ;
return 0 ;
}
测试结果如下:
5.服务器同时多条请求测试
客户端一次性给服务器发送多条请求,观察服务器处理结果。
# include "../source/server.hpp"
int main ( )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
std:: string req = "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
req += "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
req += "GET /hello HTTP/1.1
Connection: keep-alive
Content-Length: 0
" ;
for ( ; ; )
{
int ret = clientsock. Send ( req. c_str ( ) , req. size ( ) ) ;
assert ( ret > 0 ) ;
char buffer[ 1024 ] = { 0 } ;
ret = clientsock. Recv ( buffer, 1023 ) ;
LogMessage ( DEBUG, "[%s]" , buffer) ;
sleep ( 3 ) ;
}
clientsock. Close ( ) ;
return 0 ;
}
测试结果如下,服务器能够正常处理并响应:
服务器大文件传输测试
使用PUT方法向服务器传输大文件,观察服务器处理结果。
# include "../source/server.hpp"
# include "../source/http/http.hpp"
int main ( )
{
Socket clientsock;
clientsock. CreateClient ( 8080 , "127.0.0.1" ) ;
std:: string req = "PUT /1234.txt HTTP/1.1
Connection: keep-alive
" ;
std:: string body;
Util :: ReadFile ( "./hello.txt" , & body) ;
req += "Content-Length: " + std:: to_string ( body. size ( ) ) + "
" ;
assert ( clientsock. Send ( req. c_str ( ) , req. size ( ) ) != - 1 ) ;
assert ( clientsock. Send ( body. c_str ( ) , req. size ( ) ) != - 1 ) ;
char buffer[ 1024 ] = { 0 } ;
assert ( clientsock. Recv ( buffer, 1023 ) != - 1 ) ;
LogMessage ( DEBUG, "[%s]" , buffer) ;
sleep ( 15 ) ;
clientsock. Close ( ) ;
return 0 ;
}
服务器内存情况以及 test.txt 文件情况如下:
测试结果如下,服务器能够正常处理并响应:
由于服务器资源有限,只能上传100M数据
服务器性能压力测试
使用服务器压力测试工具 WebBench 模拟多个客户端同时访问服务器,测试服务器的并发量 (可以同时处理多少个客户端的请求而不会出现连接失败) 以及 QPS (每秒钟处理的包的数量)。
测试环境如下:
服务器为2核2G带宽4M的云服务器。 在服务器上运行 WebBench 程序。 使用 WebBench 程序以 1000 的并发量,进行 1h 的测试。
./webbench -c 1000 -t 3600 http://127.0.0.1:8080/hello
测试结果如下 ( 2000 QPS ):