240717班级,工业化控制系统,煤矿相关行业,昆仑系统
congmu
2024-11-07 39a14bc7e206fbfa93bde1677b9fc26487bd8c3b
Server/ÍõçûÔª/code/ConnectionPool.cpp
@@ -1,131 +1,101 @@
#include "stdafx.h"
#include "ConnectionPool.h"
ConnectionPool::ConnectionPool()
{
   if (!parseXmlFile())
      return;
   for (m_num = 0; m_num < m_minSize;) {
      bool flag = addConnection();
      if (!flag) {
         return;
      }
   }
   // å¦‚果子线程的任务函数是类的非静态函数,我们需要指定任务函数的地址和任务函数的所有者
   thread producer(&ConnectionPool::productConnection, this); // åˆ›å»ºè¿žæŽ¥
   thread recycler(&ConnectionPool::recycleConnection, this); // æ£€æµ‹å¹¶é”€æ¯è¿žæŽ¥
                                                // çº¿ç¨‹åˆ†ç¦»ï¼Œé˜²æ­¢é˜»å¡žä¸»çº¿ç¨‹
   producer.detach();
   recycler.detach();
ConnectionPool::ConnectionPool() {
    m_min_conn = 5; // å‡è®¾æœ€å°è¿žæŽ¥æ•°ä¸º 5
    m_max_conn = 10; // å‡è®¾æœ€å¤§è¿žæŽ¥æ•°ä¸º 10
    m_timeout = 1000; // å‡è®¾è¿žæŽ¥è¶…时时间为 1000 æ¯«ç§’
    max_del_time = 60000; // å‡è®¾æœ€å¤§ç©ºé—²æ—¶é—´ä¸º 60000 æ¯«ç§’
    for (int i = 0; i < m_min_conn; i++) {
        addConnection();
    }
    // åˆ›å»ºä¸€ä¸ªçº¿ç¨‹ï¼Œå¯¹è¿žæŽ¥æ•°è¿›è¡Œç›‘控 ï¼Œè¿žæŽ¥æ•°ä¸è¶³å°±å†ç»§ç»­åˆ›å»º
    thread producer(&ConnectionPool::produce, this);
    // å¯¹è¿žæŽ¥æ•°è¿›è¡Œç›‘控 ï¼Œå¦‚果有太多空闲得线程 ï¼Œé‚£ä¹ˆå°±å¯¹å…¶è¿›è¡Œé”€æ¯
    thread recycler(&ConnectionPool::recycle, this);
    // çº¿ç¨‹åˆ†ç¦»
    producer.detach();
    recycler.detach();
}
ConnectionPool::~ConnectionPool()
{
   while (!m_connections.empty()) {
      MysqlConn* conn = m_connections.front();
      m_connections.pop();
      delete conn;
   }
ConnectionPool::~ConnectionPool() {
    while (!m_connkQueue.empty()) {
        MysqlConn* conn = m_connkQueue.front();
        m_connkQueue.pop();
        delete conn;
    }
}
bool ConnectionPool::parseXmlFile()
{
   TiXmlDocument xml("mysql.xml");
   // åŠ è½½æ–‡ä»¶
   bool res = xml.LoadFile();
   if (!res) {
      return false; // æç¤º
   }
   // æ ¹
   TiXmlElement* rootElement = xml.RootElement();
   TiXmlElement* childElement = rootElement->FirstChildElement("mysql");
   // è¯»å–信息
   m_ip = childElement->FirstChildElement("ip")->GetText();
   m_port = static_cast<unsigned short>(stoi(string(childElement->FirstChildElement("port")->GetText())));
   m_user = childElement->FirstChildElement("username")->GetText();
   m_passwd = childElement->FirstChildElement("password")->GetText();
   m_dbName = childElement->FirstChildElement("dbName")->GetText();
   m_minSize = static_cast<int>(stoi(string(childElement->FirstChildElement("minSize")->GetText())));
   m_maxSize = static_cast<int>(stoi(string(childElement->FirstChildElement("maxSize")->GetText())));
   m_maxIdleTime = static_cast<int>(stoi(string(childElement->FirstChildElement("maxIdleTime")->GetText())));
   m_timeout = static_cast<int>(stoi(string(childElement->FirstChildElement("timeout")->GetText())));
   return true;
void ConnectionPool::addConnection() {
    MysqlConn* conn = new MysqlConn();
    if (!conn->isConnected()) {
        cout << "ConnectionPool connect to mysql is failed!" << endl;
        delete conn;
        return;
    }
    conn->refreshActiveTime();
    m_connkQueue.push(conn);
    m_cond.notify_one();   // å”¤é†’一个线程
}
bool ConnectionPool::addConnection()
{
   MysqlConn* conn = new MysqlConn;
   bool res = conn->connect(m_user, m_passwd, m_dbName, m_ip, m_port);
   if (res) {
      // åˆ·æ–°ç©ºé—²æ—¶é—´
      conn->refreashAliveTime();
      m_connections.push(conn);
      ++m_num;
      return true;
   }
   else {
      delete conn;
      return false; // æç¤º
   }
void ConnectionPool::produce() {
    while (true) {
        unique_lock<mutex> lock(m_mutex);
        while (m_connkQueue.size() >= m_min_conn) { // è¿žæŽ¥é˜Ÿåˆ—的数量大于最小的连接数
            m_cond.wait(lock);
        }
        addConnection();
    }
}
void ConnectionPool::productConnection()
{
   while (true) {
      unique_lock<mutex> lc(m_mutex);
      m_cond.wait(lc, [this]() {return m_connections.empty(); });
      if (m_num < m_maxSize) {
         bool flag = addConnection();
         if (!flag) {
            return;
         }
      }
      // å”¤é†’
      m_cond1.notify_all();
   }
// åˆ é™¤ç©ºé—²è¿žæŽ¥
void ConnectionPool::recycle() {
    while (true) {
        this_thread::sleep_for(chrono::milliseconds(500));   // ä¼‘眠 500 ms
        unique_lock<mutex> lock(m_mutex);
        while (m_connkQueue.size() > m_min_conn) {
            MysqlConn* conn = m_connkQueue.front();
            if (conn->getActiveTime() >= max_del_time) {
                m_connkQueue.pop();
                delete conn;
            }
            else {
                break;
            }
        }
    }
}
void ConnectionPool::recycleConnection()
{
   while (true) {
      // ä¼‘眠一段时间 0.5s
      this_thread::sleep_for(milliseconds(500));
      lock_guard<mutex> lc(m_mutex);
      while (!m_connections.empty() && m_num > m_minSize) {
         MysqlConn* conn = m_connections.front();
         if (conn->getAliveTime() >= m_maxIdleTime) {
            m_connections.pop();
            delete conn;
            --m_num;
         }
         else {
            break;
         }
      }
   }
shared_ptr<MysqlConn> ConnectionPool::getMysqlConn() {
    unique_lock<mutex> lock(m_mutex);
    while (m_connkQueue.empty()) {
        // å¦‚果等待一段时间后,队列还是为空,返回一个 null
        if (cv_status::timeout == m_cond.wait_for(lock, chrono::milliseconds(m_timeout))) {
            if (m_connkQueue.empty()) return nullptr;
        }
    }
    shared_ptr<MysqlConn> connPtr(move(m_connkQueue.front()), [this](MysqlConn* conn) {
        unique_lock<mutex> lock(m_mutex);
        conn->refreshActiveTime();
        m_connkQueue.push(conn);
        });
    m_connkQueue.pop();
    m_cond.notify_one();   // å”¤é†’阻塞的生产者线程,开始生产
    return connPtr;
}
ConnectionPool* ConnectionPool::getConnectPool()
{
   // ä¸ä½¿ç”¨äº’斥锁的线程安全的懒汉模式
   static ConnectionPool pool; // åªåœ¨ç¬¬ä¸€æ¬¡è°ƒç”¨å‡½æ•°æ—¶åˆå§‹åŒ–
   return &pool;
void ConnectionPool::description() {
    cout << m_ip << ". " << m_userName << ". " << m_passwd << ". " << m_db << ". "
        << m_port << ". " << m_max_conn << ". " << m_min_conn << ". " << m_timeout << ". "
        << max_del_time << endl;
}
shared_ptr<MysqlConn> ConnectionPool::getConnection()
{
   unique_lock<mutex> lc(m_mutex);
   while (m_connections.empty()) {
      if (cv_status::timeout == m_cond1.wait_for(lc, chrono::milliseconds(m_timeout))) {
         if (m_connections.empty()) {
            // cout << "out of time" << endl;
            return nullptr; // ç»“束  // æç¤º
                        // continue; // åˆ©ç”¨while配合continue ç»§ç»­é˜»å¡ž
         }
      }
   }
   // è¦æŒ‡å®šåˆ é™¤å™¨destructor,来保证连接的归还
   shared_ptr<MysqlConn> conn(m_connections.front(), [this](MysqlConn* conn) {
      // åŠ é”ä¿è¯é˜Ÿåˆ—çº¿ç¨‹å®‰å…¨
      // m_mutex.lock(); // 1
      unique_lock<mutex> lc(m_mutex); // 2
      // lock_guard<mutex> lc(m_mutex); // 3
      conn->refreashAliveTime();
      m_connections.push(conn);
      // m_mutex.unlock(); // 1
      });
   m_connections.pop();
   m_cond.notify_all();
   return conn;
}
ConnectionPool* ConnectionPool::getInstance() {
    static ConnectionPool connPool;
    return &connPool;
}