240717班级,工业化控制系统,煤矿相关行业,昆仑系统
Administrator
2024-11-07 96e93f6a525896ece8dc35ff4a3c3f02fffd577a
Server/ÍõçûÔª/code/ConnectionPool.cpp
@@ -1,125 +1,101 @@
#include "stdafx.h"
#include "ConnectionPool.h"
#include <fstream>
#include <iostream>
#include <thread>
ConnectionPool* ConnectionPool::getInstance() {
   static ConnectionPool connPool;
   return &connPool;
}
ConnectionPool::ConnectionPool() {
   // åŠ è½½é…ç½®æ–‡ä»¶
   if (!parseJsonFile()) {
      std::cout << "parseJsonFile is failed!" << std::endl;
      return;
   }
   for ( int i = 0 ; i < m_min_conn ; i++ ) {
      addConnection( );
   }
   // åˆ›å»ºä¸€ä¸ªçº¿ç¨‹ï¼Œå¯¹è¿žæŽ¥æ•°è¿›è¡Œç›‘控 ï¼Œè¿žæŽ¥æ•°ä¸è¶³å°±å†ç»§ç»­åˆ›å»º
   std::thread producer(&ConnectionPool::produce, this);
   // å¯¹è¿žæŽ¥æ•°è¿›è¡Œç›‘控 ï¼Œå¦‚果有太多空闲得线程 ï¼Œé‚£ä¹ˆå°±å¯¹å…¶è¿›è¡Œé”€æ¯
   std::thread recycler( &ConnectionPool::recycle,  this);
   // çº¿ç¨‹åˆ†ç¦»
   producer.detach();
   recycler.detach();
    m_min_conn = 5; // å‡è®¾æœ€å°è¿žæŽ¥æ•°ä¸º 5
    m_max_conn = 10; // å‡è®¾æœ€å¤§è¿žæŽ¥æ•°ä¸º 10
    m_timeout = 1000; // å‡è®¾è¿žæŽ¥è¶…时时间为 1000 æ¯«ç§’
    max_del_time = 60000; // å‡è®¾æœ€å¤§ç©ºé—²æ—¶é—´ä¸º 60000 æ¯«ç§’
    for (int i = 0; i < m_min_conn; i++) {
        addConnection();
    }
    // åˆ›å»ºä¸€ä¸ªçº¿ç¨‹ï¼Œå¯¹è¿žæŽ¥æ•°è¿›è¡Œç›‘控 ï¼Œè¿žæŽ¥æ•°ä¸è¶³å°±å†ç»§ç»­åˆ›å»º
    thread producer(&ConnectionPool::produce, this);
    // å¯¹è¿žæŽ¥æ•°è¿›è¡Œç›‘控 ï¼Œå¦‚果有太多空闲得线程 ï¼Œé‚£ä¹ˆå°±å¯¹å…¶è¿›è¡Œé”€æ¯
    thread recycler(&ConnectionPool::recycle, this);
    // çº¿ç¨‹åˆ†ç¦»
    producer.detach();
    recycler.detach();
}
bool ConnectionPool::parseJsonFile() {
   /*std::ifstream ifs("dbconf.json");
   Reader rd;
   Value root;
   rd.parse(ifs, root);
   if ( root.isObject( ) ) {
      m_ip = root["ip"].asString();
      m_port = root["port"].asInt();
      m_userName = root["userName"].asString();
      m_passwd = root["password"].asString();
      m_db = root["dbName"].asString();
      m_min_conn = root["minSize"].asInt();
      m_max_conn = root["maxSize"].asInt();
      max_del_time = root["maxDleTime"].asInt();
      m_timeout = root["timeout"].asInt();
      return true;
   }*/
   return false;
}
void ConnectionPool::description() {
   std::cout << m_ip << ". " << m_userName << ". " << m_passwd << ". " << m_db << ". "
             << m_port << ". " << m_max_conn << ". " << m_min_conn << ". " << m_timeout << ". "
             << max_del_time << std::endl;
}
ConnectionPool::~ConnectionPool() {
   while (!m_connkQueue.empty()) {
      MysqlConn* conn = m_connkQueue.front();
      m_connkQueue.pop();
      delete  conn;
   }
    while (!m_connkQueue.empty()) {
        MysqlConn* conn = m_connkQueue.front();
        m_connkQueue.pop();
        delete conn;
    }
}
void ConnectionPool::addConnection() {
   MysqlConn* conn = new MysqlConn();
   if ( !conn->connect( m_ip, m_userName, m_passwd, m_db, m_port  ) ) {
      std::cout << "ConnectionPool connect to mysql is failed!" << std::endl;
      delete conn;
      return;
   }
   conn->refreshActiveTime( );
   m_connkQueue.push(conn);
   m_cond.notify_one();   // å”¤é†’一个线程
    MysqlConn* conn = new MysqlConn();
    if (!conn->isConnected()) {
        cout << "ConnectionPool connect to mysql is failed!" << endl;
        delete conn;
        return;
    }
    conn->refreshActiveTime();
    m_connkQueue.push(conn);
    m_cond.notify_one();   // å”¤é†’一个线程
}
void ConnectionPool::produce(){
   while (true) {
      std::unique_lock<std::mutex> lock(m_mutex);
      while (m_connkQueue.size() >= m_min_conn) { // è¿žæŽ¥é˜Ÿåˆ—的数量大于最小的连接数
         m_cond.wait(lock);
      }
      addConnection();
   }
void ConnectionPool::produce() {
    while (true) {
        unique_lock<mutex> lock(m_mutex);
        while (m_connkQueue.size() >= m_min_conn) { // è¿žæŽ¥é˜Ÿåˆ—的数量大于最小的连接数
            m_cond.wait(lock);
        }
        addConnection();
    }
}
// åˆ é™¤ç©ºé—²è¿žæŽ¥
void ConnectionPool::recycle() {
   while ( true ) {
      std::this_thread::sleep_for( std::chrono::milliseconds(500));   // ä¼‘眠500 ms
      std::unique_lock<std::mutex>lock(m_mutex);
      while ( m_connkQueue.size() > m_min_conn ) {
         MysqlConn* conn = m_connkQueue.front( );
         if (conn->getActiveTime() >= max_del_time) {
            m_connkQueue.pop();
            delete conn;
         }
         else {
            break;
         }
      }
   }
    while (true) {
        this_thread::sleep_for(chrono::milliseconds(500));   // ä¼‘眠 500 ms
        unique_lock<mutex> lock(m_mutex);
        while (m_connkQueue.size() > m_min_conn) {
            MysqlConn* conn = m_connkQueue.front();
            if (conn->getActiveTime() >= max_del_time) {
                m_connkQueue.pop();
                delete conn;
            }
            else {
                break;
            }
        }
    }
}
std::shared_ptr<MysqlConn>  ConnectionPool::getMysqlConn() {
   std::unique_lock<std::mutex>lock(m_mutex);
   while ( m_connkQueue.empty()) {
      // å¦‚果等待一段时间后,队列还是为空,返回一个 null
      if ( std::cv_status::timeout == m_cond.wait_for(lock, std::chrono::milliseconds(m_timeout)) ) {
         if( m_connkQueue.empty( ) )    return nullptr;
      }
   }
   std::shared_ptr<MysqlConn> connPtr(std::move(m_connkQueue.front()), [this](MysqlConn* conn) {
      std::unique_lock <std::mutex>lock(m_mutex) ;
      conn->refreshActiveTime();
      m_connkQueue.push(conn);
      });
   m_connkQueue.pop();
   m_cond.notify_one();   // å”¤é†’阻塞的生产者线程,开始生产
   return connPtr;
shared_ptr<MysqlConn> ConnectionPool::getMysqlConn() {
    unique_lock<mutex> lock(m_mutex);
    while (m_connkQueue.empty()) {
        // å¦‚果等待一段时间后,队列还是为空,返回一个 null
        if (cv_status::timeout == m_cond.wait_for(lock, chrono::milliseconds(m_timeout))) {
            if (m_connkQueue.empty()) return nullptr;
        }
    }
    shared_ptr<MysqlConn> connPtr(move(m_connkQueue.front()), [this](MysqlConn* conn) {
        unique_lock<mutex> lock(m_mutex);
        conn->refreshActiveTime();
        m_connkQueue.push(conn);
        });
    m_connkQueue.pop();
    m_cond.notify_one();   // å”¤é†’阻塞的生产者线程,开始生产
    return connPtr;
}
void ConnectionPool::description() {
    cout << m_ip << ". " << m_userName << ". " << m_passwd << ". " << m_db << ". "
        << m_port << ". " << m_max_conn << ". " << m_min_conn << ". " << m_timeout << ". "
        << max_del_time << endl;
}
ConnectionPool* ConnectionPool::getInstance() {
    static ConnectionPool connPool;
    return &connPool;
}