混合编程python与C++-当前关注

来源:博客园   2023-06-07 22:39:27
A+A-


(相关资料图)

上个版本只用到了 ctypes 进行传输,这次将 Python 服务端改为 C++ 服务端,方便后续维护。本文实现的功能:Python 客户端把图片传给 C++ 服务端,C++ 接收图片后对图片进行处理,并将结果返回给 Python 客户端(pass image from python to C++)。

C++ 服务端.h文件

注意文中的model

// .h#pragma once#include #include #include #include #include #include #include #include #include using namespace std;using namespace cv;class ModelManager;class ServerManager{private:int m_port;char *m_addr;cv::VideoCapture m_cap;int m_server;int m_accept; // client connpublic:bool initialization(const int &port, const cv::VideoCapture &cap, char *addr = nullptr);bool initialization(const int &port, char *addr = nullptr);bool build_connect();bool acceptClient();void error_print(const char *ptr);bool free_connect();bool send_data_frame(ModelManager& model);bool receive_data_frame(cv::Mat &frame, ModelManager& model);};
.cpp文件
#include "ServerManager.h"
#include "ModelManager.h"

#include <cerrno>   // errno, EINTR
#include <cstdlib>  // free, exit
#include <mutex>    // std::lock_guard

#define BUFFER_SIZE 65538

namespace
{
// Largest encoded frame accepted from the client. The size header comes off
// the network, so it is bounded before it is used as an allocation size.
constexpr int kMaxFrameBytes = 256 * 1024 * 1024;

// Reads exactly `length` bytes from `fd` into `buffer`.
// Returns false if the peer closed the connection or recv() failed.
bool recv_all(int fd, void *buffer, size_t length)
{
    char *cursor = static_cast<char *>(buffer);
    while (length > 0)
    {
        const ssize_t got = recv(fd, cursor, length, 0);
        if (got == 0)
            return false;  // orderly shutdown by the peer
        if (got < 0)
        {
            if (errno == EINTR)
                continue;  // interrupted by a signal, retry
            return false;
        }
        cursor += got;
        length -= static_cast<size_t>(got);
    }
    return true;
}

// Writes exactly `length` bytes from `data` to `fd`; send() may accept fewer
// bytes than requested, so keep going until everything is out.
bool send_all(int fd, const char *data, size_t length)
{
    while (length > 0)
    {
        const ssize_t put = send(fd, data, length, 0);
        if (put < 0)
        {
            if (errno == EINTR)
                continue;
            return false;
        }
        data += put;
        length -= static_cast<size_t>(put);
    }
    return true;
}
}  // namespace

// Prints `ptr` followed by the errno description, then terminates the process.
void ServerManager::error_print(const char *ptr)
{
    perror(ptr);
    exit(EXIT_FAILURE);
}

// Stores the port (network byte order), the capture handle and the bind
// address. `addr` is not copied; the caller keeps it alive.
bool ServerManager::initialization(const int &port, const cv::VideoCapture &cap, char *addr)
{
    m_port = htons(port);
    m_addr = addr;
    m_cap = cap;
    return true;
}

// Stores the port (network byte order) and the bind address.
bool ServerManager::initialization(const int &port, char *addr)
{
    m_port = htons(port);
    m_addr = addr;
    return true;
}

// Creates, configures, binds and starts the listening socket.
// Every failure goes through error_print(), so this only ever returns true.
bool ServerManager::build_connect()
{
    struct sockaddr_in server_addr{};
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = m_addr ? inet_addr(m_addr) : htonl(INADDR_ANY);
    server_addr.sin_port = static_cast<in_port_t>(m_port);  // already in network byte order

    // create socket
    m_server = socket(AF_INET, SOCK_STREAM, 0);
    if (m_server < 0)
        error_print("socket create error");  // was mislabelled as a bind error

    // allow the port to be reused right after a restart
    int on = 1;
    if (setsockopt(m_server, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on)) < 0)
        error_print("setsockopt error");

    // bind addr
    if (bind(m_server, reinterpret_cast<struct sockaddr *>(&server_addr), sizeof(server_addr)) < 0)
        error_print("bind error");

    // listen for only one client
    if (listen(m_server, 1) < 0)
        error_print("listen failed");

    cout << "ServerManager is listening, plesae wait..." << endl;
    return true;
}

// Blocks until a client connects; the new descriptor replaces m_accept.
bool ServerManager::acceptClient()
{
    struct sockaddr_in accept_addr{};
    socklen_t accept_len = sizeof(accept_addr);

    m_accept = accept(m_server, reinterpret_cast<struct sockaddr *>(&accept_addr), &accept_len);
    if (m_accept < 0)
        error_print("accept error");

    std::cout << "Connection established" << std::endl;
    return true;
}

// Sends the next JSON result from `model` to the client.
// Returns false when the model has no result yet or the send failed.
bool ServerManager::send_data_frame(ModelManager &model)
{
    char *json_output = model.createJson();
    if (json_output == nullptr)
        return false;

    // Send the buffer as-is. The original author notes that copying it into a
    // second char* with memcpy corrupted the JSON and cost two hours.
    const bool sent = send_all(m_accept, json_output, strlen(json_output));

    // createJson() hands back the string produced by cJSON_Print, which the
    // caller owns; it was previously leaked on every call.
    free(json_output);

    if (!sent)
    {
        cout << "send fail" << endl;
        return false;
    }
    return true;
}

// Receives one frame: a native-endian int byte count followed by that many
// bytes of encoded image data. On success the decoded image is stored in
// `frame` and pushed onto model.queueMat.
//
// Returns false when the client went away, the size header is not usable, the
// payload could not be read completely, or the payload does not decode.
bool ServerManager::receive_data_frame(Mat &frame, ModelManager &model)
{
    // Drops the current client and blocks until the next one connects.
    const auto wait_for_new_client = [this]() {
        close(m_accept);
        cout << "close connection to client" << endl;
        acceptClient();
    };

    // recv frame size
    int data_size = 0;
    if (!recv_all(m_accept, &data_size, sizeof(data_size)))
    {
        wait_for_new_client();
        return false;
    }
    cout << data_size << endl;

    // A non-positive or absurd size means the stream is out of sync; there is
    // no way to find the next header, so start over with a fresh connection.
    if (data_size <= 0 || data_size > kMaxFrameBytes)
    {
        wait_for_new_client();
        return false;
    }

    // recv frame data straight into the decode buffer (no temporary heap
    // buffer, so nothing to leak, and a failed recv never reaches memcpy).
    std::vector<uchar> decode(static_cast<size_t>(data_size));
    if (!recv_all(m_accept, decode.data(), decode.size()))
        return false;  // the next call sees the closed socket and re-accepts

    // decode message
    frame = imdecode(decode, cv::IMREAD_COLOR);
    if (frame.empty())
        return false;  // not a decodable image; keep it out of the model queue

    // push into the model's queueMat
    {
        std::lock_guard<std::mutex> lock(model.mtxQueueImg);
        model.queueMat.push(frame);
    }
    return true;
}

// Releases the capture handle and closes both sockets.
bool ServerManager::free_connect()
{
    m_cap.release();
    close(m_accept);
    close(m_server);
    return true;
}
C++ model部分代码.h文件
#pragma once#include "CV_Classify.h"#include "CV_Detect.h"#include "ServerManager.h"#include #include #include #include  // usleep#include #include "cJSON.h"#include using namespace std;using namespace cv;class ModelManager{public:    Detect objdetect;    Classify objclassify;    std::mutex mtxQueueDet;  // mutex for detect queue    std::mutex mtxQueueImg;  // mutex for image queue    std::mutex mtxQueueCls;  // mutex for classify queue    std::queue queueMat;    std::queue queueDetOut;// Detect queue    std::queue queueClsOut;// Classify queue        bool DetectFlag = true;    bool ClassifyFlag = true;    bool empty_flag = false;    friend class ServerManager;public:    void initDetectModel() ;    void initClassifyModel() ;    void DetectImg();    void ClassifyImg();    void getClsResult(ObjClassifyOutput &output);    // ObjClassifyOutput getClsResult();    char* createJson();};
.cpp文件

部分有删减,createJson可参考使用,利用json来传递值

#include "ModelManager.h"void ModelManager::initDetectModel() {    std::string config_path = "DetectConfig.yaml";    objdetect.Init(config_path, 1);}void ModelManager::initClassifyModel() {    std::string config_path = "ClassiflyConfig.yaml";    objclassify.Init(config_path, 1);}void ModelManager::DetectImg(){    DetectInput detect_input;    DetectOutput detect_output;    cv::Mat frame;    size_t mm = 0;    while(1)    {         if (queueMat.empty())         {            if(!DetectFlag)            {                break;            }usleep(2000);            continue;}        // get image from queueMat        mtxQueueImg.lock();        frame = queueMat.front();        queueMat.pop();        mtxQueueImg.unlock();        // run model        objdetect.Run(detect_input, detect_output);        // push detect result into queueDetOut        mtxQueueDet.lock();        queueDetOut.push(detect_output);        // cout << "detect run !!" << endl;        mtxQueueDet.unlock();    }    return;}void ModelManager::ClassifyImg(){    ObjClassifyInput input;    ObjClassifyOutput output;    cv::Mat frame;    Detoutput detect_result;    while(1)    {        if (queueDetOut.empty())         {            if(!ClassifyFlag)            {                break;            }usleep(2000);            continue;}        // get detect from queueDetOut        mtxQueueDet.lock();        detect_result = queueDetOut.front();        queueDetOut.pop();        mtxQueueDet.unlock();        // run model        objclassify.Run(input, output);        // push cls result into queueClsOut        mtxQueueCls.lock();        queueClsOut.push(output);        mtxQueueCls.unlock();      }    return;}void ModelManager::getClsResult(ObjClassifyOutput& output){    if (queueClsOut.empty()){        output.object_list.object_num = -1;  // -1 is now empty;        return; // must return in thread otherwise cant use &output    }    output = queueClsOut.front();    queueClsOut.pop();    return;}char* ModelManager::createJson()  
// dont know why cant use &value, need return value{    mtxQueueCls.lock();    ObjClassifyOutput output;    getClsResult(output);     mtxQueueCls.unlock();if (output.object_list.object_num == -1){return nullptr;}// prepare send data jsoncJSON* json_object_list = NULL;cJSON* json_ObjClassifyOutput = NULL;json_ObjClassifyOutput = cJSON_CreateObject();json_object_list = cJSON_CreateObject();cJSON_AddItemToObject(json_ObjClassifyOutput, "object_list", json_object_list);int obj_num = output.object_list.object_num;cJSON_AddNumberToObject(json_object_list, "object_num", obj_num);for (int i = 0; i < obj_num; ++i){cJSON* json_object = cJSON_CreateObject();cJSON* json_box = cJSON_CreateObject();cJSON_AddNumberToObject(json_box,"x", output.object_list.object[i].bbox.x);cJSON_AddNumberToObject(json_box,"y", output.object_list.object[i].bbox.y);cJSON_AddNumberToObject(json_box,"w", output.object_list.object[i].bbox.w);cJSON_AddNumberToObject(json_box,"h", output.object_list.object[i].bbox.h);cJSON_AddItemToObject(json_object,"bbox", json_box);cJSON_AddNumberToObject(json_object, "classes", output.object_list.object[i].classes);cJSON_AddNumberToObject(json_object, "objectness", output.object_list.object[i].objectness);        // double prob = output.object_list.object[i].prob;// cJSON_AddNumberToObject(json_object, "prob", prob); // pointer cant use?string str = "object" + to_string(i);        cJSON_AddItemToObject(json_object_list, str.c_str(), json_object);        // printf("prob: %f", output.object_list.object[i].prob);}char* json_output = cJSON_Print(json_ObjClassifyOutput);    cJSON_Delete(json_ObjClassifyOutput);    return json_output;}
C++ 服务端运行
#include <../include/ServerManager.h>
#include <../include/ModelManager.h>
#include <thread>  // NOTE(review): header name was lost in extraction; std::thread is what main() uses

#define PORT 8080

// Receive loop: keeps pulling frames from the client. receive_data_frame()
// itself pushes each decoded frame onto the model's queue; this loop only
// reports progress and backs off for 2 ms when nothing usable arrived.
void recvServer(ServerManager &s, ModelManager &model)
{
    int idx = 0;

    while (true)
    {
        // auto start = std::chrono::steady_clock::now();
        cv::Mat frame;
        const bool received = s.receive_data_frame(frame, model);
        // cal time cost
        // auto end = std::chrono::steady_clock::now();
        // std::chrono::duration<double, std::milli> elapsed = end - start;
        // std::cout << "recv execution time: " << elapsed.count() << " ms\n";

        if (!received || frame.empty())
        {
            usleep(2000);
            continue;
        }
        // cv::imwrite("image"+to_string(idx++)+".jpg", frame);
        std::cout << "Image " << idx++ << " received !!" << std::endl;
    }
}

// Send loop: forwards each available classification result to the client and
// backs off for 2 ms when there is nothing to send or sending failed.
void sendServer(ServerManager &s, ModelManager &model)
{
    while (true)
    {
        if (s.send_data_frame(model))
        {
            cout << "send success!!" << endl;
            cout << endl;
        }
        else
        {
            // cout << "send fail!!" << endl;
            usleep(2000);
        }
    }
}

// Loads both models, waits for one client on PORT, then runs the receive,
// detect, classify and send stages on their own threads.
int main()
{
    ServerManager s;
    ModelManager model;

    model.initDetectModel();
    model.initClassifyModel();
    cout << endl;

    // Do not start the worker threads on a server that failed to come up.
    if (!s.initialization(PORT) || !s.build_connect() || !s.acceptClient())
    {
        return 1;
    }

    thread recv_server(recvServer, std::ref(s), std::ref(model));
    thread send_server(sendServer, std::ref(s), std::ref(model));
    thread detect(&ModelManager::DetectImg, &model);
    thread classfy(&ModelManager::ClassifyImg, &model);

    detect.join();
    classfy.join();
    recv_server.join();
    send_server.join();
    return 0;
}
python客户端
import json
import os
import socket
import struct
import time
from multiprocessing import JoinableQueue
from threading import Thread

import cv2  # used by img_encode(); the import was missing from the listing
from natsort import ns, natsorted

host = "192.168.0.2"  # "192.168.0.2" "localhost"
port = 8080

# Marker the producer enqueues after the last image so the consumer can stop.
END_SIGNAL = "E"


def img_encode(img_path):
    """Read the image at ``img_path`` and return it JPEG-encoded as bytes.

    Raises:
        ValueError: if the file cannot be read as an image or cannot be encoded.
    """
    img = cv2.imread(img_path)
    if img is None:
        raise ValueError("cannot read image: %s" % img_path)
    # img = cv2.resize(img, (500, 500), interpolation=cv2.INTER_CUBIC)
    # imencode expects (flag, value) pairs; JPEG quality ranges over 0-100.
    img_param = [int(cv2.IMWRITE_JPEG_QUALITY), 95]
    ok, encoded = cv2.imencode(".jpg", img, img_param)
    if not ok:
        raise ValueError("cannot encode image: %s" % img_path)
    return encoded.tobytes()


def img_product(img_queue, path, path_mode="image"):
    """Producer: encode image(s) and put them on ``img_queue``.

    Args:
        img_queue: joinable queue shared with ``server_consumer``.
        path: an image file (``path_mode="image"``) or a directory
            (``path_mode="dir"``).
        path_mode: ``"image"`` sends one file, ``"dir"`` sends every file in
            the directory in natural filename order.

    The end signal is enqueued after the images in both modes, so the consumer
    always terminates. Blocks until the consumer has taken every item.
    """
    if path_mode == "image":
        img_queue.put(img_encode(path))
    elif path_mode == "dir":
        # Natural sort so that e.g. img2 comes before img10.
        for filename in natsorted(os.listdir(path), alg=ns.PATH):
            img_path = os.path.join(path, filename)
            try:
                img_queue.put(img_encode(img_path))
            except ValueError as err:
                # A stray non-image file should not abort the whole run.
                print("skip:", err)
    img_queue.put(END_SIGNAL)
    img_queue.join()


def server_consumer(img_queue):
    """Consumer: send each queued image to the server and print its reply.

    Uses the module-level ``client`` socket created under ``__main__``. Each
    image is sent as a native-order 4-byte int length followed by the JPEG
    bytes. Stops and closes the socket when the end signal arrives or the
    server closes the connection.
    """
    while True:
        start = int(round(time.time() * 1000))

        # 1. get img from queue
        img_obj = img_queue.get()
        img_queue.task_done()

        # get end signal (images are bytes, the signal is a str)
        if isinstance(img_obj, str) and img_obj == END_SIGNAL:
            client.close()
            break

        # 2. send package(img_bytes_size, img_bytes)
        pack_size = struct.pack("i", len(img_obj))
        # sendall: plain send() may transmit only part of a large image.
        client.sendall(pack_size + img_obj)

        # 3. receive the JSON result
        # NOTE(review): the server sends the JSON without a length prefix, so a
        # single recv() is assumed to return the whole message.
        data = client.recv(65536)
        if not data:
            print("server closed the connection")
            client.close()
            break
        json_str = data.decode("utf8", "ignore").strip("\x00")
        results = json.loads(json_str)

        end = int(round(time.time() * 1000))
        print("send and recv cost time: ", (end - start))
        print(results)


if __name__ == "__main__":
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect((host, port))

    img_dir = "data"
    one_img = "./data/image.jpg"
    mode = "dir"

    img_jq = JoinableQueue()
    producer = Thread(target=img_product, args=(img_jq, img_dir, mode,))
    consumer = Thread(target=server_consumer, args=(img_jq,))
    producer.daemon = True  # set daemon but not set join()
    producer.start()
    consumer.start()
    # producer.join()  # let the producer finish first to avoid a close() error
    consumer.join()
总结:其实这个项目真正做完后感觉还是挺简单的,主要是对 socket 通信不太熟悉,以及没做过图片传输。实际上传图片只需要读取图片后 imencode,再 tobytes,最后发送 size 和 data 即可;而接收端只需要拼接数组,然后 imdecode 即可。另外,利用 json 传输结果可以让结果可读性更高,传输也比较方便。当时 copy 了别人的发送代码,没有细看,结果因为使用 memcpy 让 json 格式乱码,导致无法解码 json。如果你感觉接收端没问题,一定要看看发送端。之后的新任务是利用 rtsp 流进行视频传输,敬请期待。
参考博客:
- use memcpy in receive_frame function if you want,对应 github 地址
- a pure client and server code to create a simple demo
- 如果用 ctrl+z 中断,可能导致 address in use;用户可以使用 fg/bg 操作继续前台或后台的任务:fg 命令重新启动前台被中断的任务,bg 命令把被中断的任务放在后台执行
- zmq send img
- 优化 imdecode 速度(本代码未使用)
- 视频传输跟图片传输差不多
- Linux c++ 获取本地毫秒级精确时间
- 如果不想用 json,可以试试 struct
- 利用 json 来传输分类结果

标签:


Copyright ©  2015-2022 西方办公网版权所有  备案号:沪ICP备2020036824号-7   联系邮箱:5 626 629 @qq.com