240717班级,工业化控制系统,煤矿相关行业,昆仑系统
gong
2024-11-06 828ba1dcdd1fa803dddf94f4f1643452d5efa703
Server/ÍõçûÔª/code/ConnectionPool.cpp
@@ -1,95 +1,102 @@
#include "stdafx.h"
#include "ConnectionPool.h"
ConnectionPool::ConnectionPool()
{
   if (!parseXmlFile())
#include <fstream>
#include <iostream>
#include <thread>
ConnectionPool* ConnectionPool::getInstance() {
   static ConnectionPool connPool;
   return &connPool;
}
ConnectionPool::ConnectionPool() {
   // åŠ è½½é…ç½®æ–‡ä»¶
   if (!parseJsonFile()) {
      std::cout << "parseJsonFile is failed!" << std::endl;
      return;
   for (m_num = 0; m_num < m_minSize;) {
      bool flag = addConnection();
      if (!flag) {
         return;
      }
   }
   // å¦‚果子线程的任务函数是类的非静态函数,我们需要指定任务函数的地址和任务函数的所有者
   thread producer(&ConnectionPool::productConnection, this); // åˆ›å»ºè¿žæŽ¥
   thread recycler(&ConnectionPool::recycleConnection, this); // æ£€æµ‹å¹¶é”€æ¯è¿žæŽ¥
                                                // çº¿ç¨‹åˆ†ç¦»ï¼Œé˜²æ­¢é˜»å¡žä¸»çº¿ç¨‹
   for ( int i = 0 ; i < m_min_conn ; i++ ) {
      addConnection( );
   }
   // åˆ›å»ºä¸€ä¸ªçº¿ç¨‹ï¼Œå¯¹è¿žæŽ¥æ•°è¿›è¡Œç›‘控 ï¼Œè¿žæŽ¥æ•°ä¸è¶³å°±å†ç»§ç»­åˆ›å»º
   std::thread producer(&ConnectionPool::produce, this);
   // å¯¹è¿žæŽ¥æ•°è¿›è¡Œç›‘控 ï¼Œå¦‚果有太多空闲得线程 ï¼Œé‚£ä¹ˆå°±å¯¹å…¶è¿›è¡Œé”€æ¯
   std::thread recycler( &ConnectionPool::recycle,  this);
   // çº¿ç¨‹åˆ†ç¦»
   producer.detach();
   recycler.detach();
}
ConnectionPool::~ConnectionPool()
{
   while (!m_connections.empty()) {
      MysqlConn* conn = m_connections.front();
      m_connections.pop();
      delete conn;
   }
}
bool ConnectionPool::parseXmlFile()
{
   TiXmlDocument xml("mysql.xml");
   // åŠ è½½æ–‡ä»¶
   bool res = xml.LoadFile();
   if (!res) {
      return false; // æç¤º
   }
   // æ ¹
   TiXmlElement* rootElement = xml.RootElement();
   TiXmlElement* childElement = rootElement->FirstChildElement("mysql");
   // è¯»å–信息
   m_ip = childElement->FirstChildElement("ip")->GetText();
   m_port = static_cast<unsigned short>(stoi(string(childElement->FirstChildElement("port")->GetText())));
   m_user = childElement->FirstChildElement("username")->GetText();
   m_passwd = childElement->FirstChildElement("password")->GetText();
   m_dbName = childElement->FirstChildElement("dbName")->GetText();
   m_minSize = static_cast<int>(stoi(string(childElement->FirstChildElement("minSize")->GetText())));
   m_maxSize = static_cast<int>(stoi(string(childElement->FirstChildElement("maxSize")->GetText())));
   m_maxIdleTime = static_cast<int>(stoi(string(childElement->FirstChildElement("maxIdleTime")->GetText())));
   m_timeout = static_cast<int>(stoi(string(childElement->FirstChildElement("timeout")->GetText())));
   return true;
}
bool ConnectionPool::addConnection()
{
   MysqlConn* conn = new MysqlConn;
   bool res = conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);
   if (res) {
      // åˆ·æ–°ç©ºé—²æ—¶é—´
      conn->refreashAliveTime();
      m_connections.push(conn);
      ++m_num;
bool ConnectionPool::parseJsonFile() {
   /*std::ifstream ifs("dbconf.json");
   Reader rd;
   Value root;
   rd.parse(ifs, root);
   if ( root.isObject( ) ) {
      m_ip = root["ip"].asString();
      m_port = root["port"].asInt();
      m_userName = root["userName"].asString();
      m_passwd = root["password"].asString();
      m_db = root["dbName"].asString();
      m_min_conn = root["minSize"].asInt();
      m_max_conn = root["maxSize"].asInt();
      max_del_time = root["maxDleTime"].asInt();
      m_timeout = root["timeout"].asInt();
      return true;
   }*/
   return false;
}
void ConnectionPool::description() {
   std::cout << m_ip << ". " << m_userName << ". " << m_passwd << ". " << m_db << ". "
             << m_port << ". " << m_max_conn << ". " << m_min_conn << ". " << m_timeout << ". "
             << max_del_time << std::endl;
}
ConnectionPool::~ConnectionPool() {
   while (!m_connkQueue.empty()) {
      MysqlConn* conn = m_connkQueue.front();
      m_connkQueue.pop();
      delete  conn;
   }
   else {
}
void ConnectionPool::addConnection() {
   MysqlConn* conn = new MysqlConn();
   if ( !conn->connect( m_ip, m_userName, m_passwd, m_db, m_port  ) ) {
      std::cout << "ConnectionPool connect to mysql is failed!" << std::endl;
      delete conn;
      return false; // æç¤º
      return;
   }
   conn->refreshActiveTime( );
   m_connkQueue.push(conn);
   m_cond.notify_one();   // å”¤é†’一个线程
}
void ConnectionPool::productConnection()
{
void ConnectionPool::produce(){
   while (true) {
      unique_lock<mutex> lc(m_mutex);
      m_cond.wait(lc, [this]() {return m_connections.empty(); });
      if (m_num < m_maxSize) {
         bool flag = addConnection();
         if (!flag) {
            return;
         }
      std::unique_lock<std::mutex> lock(m_mutex);
      while (m_connkQueue.size() >= m_min_conn) { // è¿žæŽ¥é˜Ÿåˆ—的数量大于最小的连接数
         m_cond.wait(lock);
      }
      // å”¤é†’
      m_cond1.notify_all();
      addConnection();
   }
}
void ConnectionPool::recycleConnection()
{
   while (true) {
      // ä¼‘眠一段时间 0.5s
      this_thread::sleep_for(milliseconds(500));
      lock_guard<mutex> lc(m_mutex);
      while (!m_connections.empty() && m_num > m_minSize) {
         MysqlConn* conn = m_connections.front();
         if (conn->getAliveTime() >= m_maxIdleTime) {
            m_connections.pop();
// åˆ é™¤ç©ºé—²è¿žæŽ¥
void ConnectionPool::recycle() {
   while ( true ) {
      std::this_thread::sleep_for( std::chrono::milliseconds(500));   // ä¼‘眠500 ms
      std::unique_lock<std::mutex>lock(m_mutex);
      while ( m_connkQueue.size() > m_min_conn ) {
         MysqlConn* conn = m_connkQueue.front( );
         if (conn->getActiveTime() >= max_del_time) {
            m_connkQueue.pop();
            delete conn;
            --m_num;
         }
         else {
            break;
@@ -97,35 +104,22 @@
      }
   }
}
ConnectionPool* ConnectionPool::getConnectPool()
{
   // ä¸ä½¿ç”¨äº’斥锁的线程安全的懒汉模式
   static ConnectionPool pool; // åªåœ¨ç¬¬ä¸€æ¬¡è°ƒç”¨å‡½æ•°æ—¶åˆå§‹åŒ–
   return &pool;
}
shared_ptr<MysqlConn> ConnectionPool::getConnection()
{
   unique_lock<mutex> lc(m_mutex);
   while (m_connections.empty()) {
      if (cv_status::timeout == m_cond1.wait_for(lc, chrono::milliseconds(m_timeout))) {
         if (m_connections.empty()) {
            // cout << "out of time" << endl;
            return nullptr; // ç»“束  // æç¤º
                        // continue; // åˆ©ç”¨while配合continue ç»§ç»­é˜»å¡ž
         }
std::shared_ptr<MysqlConn>  ConnectionPool::getMysqlConn() {
   std::unique_lock<std::mutex>lock(m_mutex);
   while ( m_connkQueue.empty()) {
      // å¦‚果等待一段时间后,队列还是为空,返回一个 null
      if ( std::cv_status::timeout == m_cond.wait_for(lock, std::chrono::milliseconds(m_timeout)) ) {
         if( m_connkQueue.empty( ) )    return nullptr;
      }
   }
   // è¦æŒ‡å®šåˆ é™¤å™¨destructor,来保证连接的归还
   shared_ptr<MysqlConn> conn(m_connections.front(), [this](MysqlConn* conn) {
      // åŠ é”ä¿è¯é˜Ÿåˆ—çº¿ç¨‹å®‰å…¨
      // m_mutex.lock(); // 1
      unique_lock<mutex> lc(m_mutex); // 2
      // lock_guard<mutex> lc(m_mutex); // 3
      conn->refreashAliveTime();
      m_connections.push(conn);
      // m_mutex.unlock(); // 1
   std::shared_ptr<MysqlConn> connPtr(std::move(m_connkQueue.front()), [this](MysqlConn* conn) {
      std::unique_lock <std::mutex>lock(m_mutex) ;
      conn->refreshActiveTime();
      m_connkQueue.push(conn);
      });
   m_connections.pop();
   m_cond.notify_all();
   return conn;
}
   m_connkQueue.pop();
   m_cond.notify_one();   // å”¤é†’阻塞的生产者线程,开始生产
   return connPtr;
}