-----ETC-----/C++ 성능 최적화 및 고급 테크닉 시리즈

[C++ 성능 최적화 및 고급 테크닉] Day 28: 프로젝트: 실시간 데이터 처리 시스템 개발 (2)

cogito21_cpp 2024. 8. 1. 22:44
반응형

프로젝트 목표

이번 단계에서는 실시간 데이터 처리 시스템의 성능을 향상시키기 위한 최적화 기법을 적용하고, 시스템의 신뢰성과 확장성을 높이기 위한 추가적인 기능을 구현합니다. 특히, 다음과 같은 부분을 다룹니다:

  1. 성능 최적화: 데이터 처리의 병렬화를 더욱 개선하고, 효율적인 데이터 구조를 사용합니다.
  2. 신뢰성 향상: 오류 처리와 로그 기능을 추가하여 시스템의 신뢰성을 높입니다.
  3. 확장성 향상: 시스템이 대규모 데이터를 효율적으로 처리할 수 있도록 합니다.

 

Step 1: 성능 최적화

데이터 처리의 병렬화를 개선하고, 효율적인 데이터 구조를 사용하여 성능을 최적화합니다.

 

DataCollector 클래스 개선

DataCollector.h

#ifndef DATACOLLECTOR_H
#define DATACOLLECTOR_H

#include <string>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>

class DataCollector {
private:
    std::queue<std::string> dataQueue;
    std::mutex mtx;
    std::condition_variable cv;
    std::atomic<bool> stopFlag;
    std::thread collectorThread;

    void collectDataFromFile(const std::string& fileName);

public:
    DataCollector();
    ~DataCollector();
    void startCollecting(const std::string& fileName);
    void stopCollecting();
    std::string getNextData();
};

#endif // DATACOLLECTOR_H

 

DataCollector.cpp

#include "DataCollector.h"
#include <fstream>
#include <iostream>

DataCollector::DataCollector() : stopFlag(false) {}

DataCollector::~DataCollector() {
    stopCollecting();
}

void DataCollector::startCollecting(const std::string& fileName) {
    collectorThread = std::thread(&DataCollector::collectDataFromFile, this, fileName);
}

void DataCollector::stopCollecting() {
    stopFlag = true;
    cv.notify_all();
    if (collectorThread.joinable()) {
        collectorThread.join();
    }
}

void DataCollector::collectDataFromFile(const std::string& fileName) {
    std::ifstream file(fileName);
    if (!file) {
        std::cerr << "Failed to open file: " << fileName << std::endl;
        return;
    }

    std::string line;
    while (std::getline(file, line) && !stopFlag) {
        {
            std::unique_lock<std::mutex> lock(mtx);
            dataQueue.push(line);
        }
        cv.notify_one();
    }
}

std::string DataCollector::getNextData() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [this] { return !dataQueue.empty() || stopFlag; });
    if (dataQueue.empty()) {
        return "";
    }
    std::string data = dataQueue.front();
    dataQueue.pop();
    return data;
}

 

Step 2: 신뢰성 향상

오류 처리와 로그 기능을 추가하여 시스템의 신뢰성을 높입니다.

 

Logger 클래스 설계

Logger.h

#ifndef LOGGER_H
#define LOGGER_H

#include <string>
#include <fstream>
#include <mutex>

class Logger {
private:
    std::ofstream logFile;
    std::mutex mtx;

public:
    Logger(const std::string& fileName);
    ~Logger();
    void log(const std::string& message);
};

#endif // LOGGER_H

 

Logger.cpp

#include "Logger.h"
#include <iostream>
#include <chrono>
#include <ctime>

Logger::Logger(const std::string& fileName) {
    logFile.open(fileName, std::ios::app);
    if (!logFile) {
        std::cerr << "Failed to open log file: " << fileName << std::endl;
    }
}

Logger::~Logger() {
    if (logFile.is_open()) {
        logFile.close();
    }
}

void Logger::log(const std::string& message) {
    std::lock_guard<std::mutex> lock(mtx);
    auto now = std::chrono::system_clock::now();
    auto now_c = std::chrono::system_clock::to_time_t(now);
    logFile << std::put_time(std::localtime(&now_c), "%F %T") << " - " << message << std::endl;
}

 

Step 3: 확장성 향상

시스템이 대규모 데이터를 효율적으로 처리할 수 있도록, 데이터 처리 파이프라인을 개선합니다.

 

DataProcessor 클래스 개선

DataProcessor.h

#ifndef DATAPROCESSOR_H
#define DATAPROCESSOR_H

#include <string>
#include <thread>
#include <vector>
#include <atomic>

class DataProcessor {
private:
    std::vector<std::thread> threads;
    std::atomic<bool> stopFlag;

    void processThread(const std::function<std::string()>& getData, const std::function<void(const std::string&)>& processData);

public:
    DataProcessor();
    ~DataProcessor();
    void startProcessing(int numThreads, const std::function<std::string()>& getData, const std::function<void(const std::string&)>& processData);
    void stopProcessing();
};

#endif // DATAPROCESSOR_H

 

DataProcessor.cpp

#include "DataProcessor.h"
#include <iostream>

DataProcessor::DataProcessor() : stopFlag(false) {}

DataProcessor::~DataProcessor() {
    stopProcessing();
}

void DataProcessor::startProcessing(int numThreads, const std::function<std::string()>& getData, const std::function<void(const std::string&)>& processData) {
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(&DataProcessor::processThread, this, getData, processData);
    }
}

void DataProcessor::stopProcessing() {
    stopFlag = true;
    for (auto& t : threads) {
        if (t.joinable()) {
            t.join();
        }
    }
}

void DataProcessor::processThread(const std::function<std::string()>& getData, const std::function<void(const std::string&)>& processData) {
    while (!stopFlag) {
        std::string data = getData();
        if (!data.empty()) {
            processData(data);
        }
    }
}

 

통합 테스트

데이터 수집, 처리, 저장 과정을 통합하여 테스트합니다.

 

main.cpp

#include "DataCollector.h"
#include "DataProcessor.h"
#include "DataStorage.h"
#include "Logger.h"
#include <thread>
#include <vector>

int main() {
    DataCollector collector;
    DataProcessor processor;
    DataStorage storage;
    Logger logger("system.log");

    // 데이터 수집 시작
    collector.startCollecting("data.txt");

    // 데이터 처리 및 저장 시작
    processor.startProcessing(4, [&collector]() {
        std::string data = collector.getNextData();
        return data;
    }, [&storage, &logger](const std::string& data) {
        // 데이터 처리 예제
        logger.log("Processing data: " + data);
        storage.storeData(data, "processed_data.txt");
    });

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // 시스템 종료
    collector.stopCollecting();
    processor.stopProcessing();

    return 0;
}

 

데이터 파일 준비

데이터 수집을 테스트하기 위해 data.txt 파일을 준비합니다. 이 파일에는 테스트할 데이터가 포함되어야 합니다.

data.txt

Sample data line 1
Sample data line 2
Sample data line 3
...

 

이제 스물여덟 번째 날의 학습을 마쳤습니다. 실시간 데이터 처리 시스템 개발 프로젝트의 두 번째 단계를 통해 성능 최적화, 신뢰성 향상, 확장성 향상을 위한 기능을 추가했습니다.

질문이나 피드백이 있으면 언제든지 댓글로 남겨 주세요. 내일은 "프로젝트: 실시간 데이터 처리 시스템 개발 (3)"에 대해 학습하겠습니다.

반응형