본문 바로가기
-----ETC-----/C++ 성능 최적화 및 고급 테크닉 시리즈

[C++ 성능 최적화 및 고급 테크닉] Day 28: 프로젝트: 실시간 데이터 처리 시스템 개발 (2)

by cogito21_cpp 2024. 8. 1.
반응형

프로젝트 목표

이번 단계에서는 실시간 데이터 처리 시스템의 성능을 향상시키기 위한 최적화 기법을 적용하고, 시스템의 신뢰성과 확장성을 높이기 위한 추가적인 기능을 구현합니다. 특히, 다음과 같은 부분을 다룹니다:

  1. 성능 최적화: 데이터 처리의 병렬화를 더욱 개선하고, 효율적인 데이터 구조를 사용합니다.
  2. 신뢰성 향상: 오류 처리와 로그 기능을 추가하여 시스템의 신뢰성을 높입니다.
  3. 확장성 향상: 시스템이 대규모 데이터를 효율적으로 처리할 수 있도록 합니다.

 

Step 1: 성능 최적화

데이터 처리의 병렬화를 개선하고, 효율적인 데이터 구조를 사용하여 성능을 최적화합니다.

 

DataCollector 클래스 개선

DataCollector.h

#ifndef DATACOLLECTOR_H
#define DATACOLLECTOR_H

#include <string>
#include <queue>
#include <mutex>
#include <condition_variable>
#include <thread>
#include <atomic>

class DataCollector {
private:
    std::queue<std::string> dataQueue;
    std::mutex mtx;
    std::condition_variable cv;
    std::atomic<bool> stopFlag;
    std::thread collectorThread;

    void collectDataFromFile(const std::string& fileName);

public:
    DataCollector();
    ~DataCollector();
    void startCollecting(const std::string& fileName);
    void stopCollecting();
    std::string getNextData();
};

#endif // DATACOLLECTOR_H

 

DataCollector.cpp

#include "DataCollector.h"
#include <fstream>
#include <iostream>

DataCollector::DataCollector() : stopFlag(false) {}

DataCollector::~DataCollector() {
    stopCollecting();
}

void DataCollector::startCollecting(const std::string& fileName) {
    collectorThread = std::thread(&DataCollector::collectDataFromFile, this, fileName);
}

void DataCollector::stopCollecting() {
    stopFlag = true;
    cv.notify_all();
    if (collectorThread.joinable()) {
        collectorThread.join();
    }
}

void DataCollector::collectDataFromFile(const std::string& fileName) {
    std::ifstream file(fileName);
    if (!file) {
        std::cerr << "Failed to open file: " << fileName << std::endl;
        return;
    }

    std::string line;
    while (std::getline(file, line) && !stopFlag) {
        {
            std::unique_lock<std::mutex> lock(mtx);
            dataQueue.push(line);
        }
        cv.notify_one();
    }
}

std::string DataCollector::getNextData() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [this] { return !dataQueue.empty() || stopFlag; });
    if (dataQueue.empty()) {
        return "";
    }
    std::string data = dataQueue.front();
    dataQueue.pop();
    return data;
}

 

Step 2: 신뢰성 향상

오류 처리와 로그 기능을 추가하여 시스템의 신뢰성을 높입니다.

 

Logger 클래스 설계

Logger.h

#ifndef LOGGER_H
#define LOGGER_H

#include <string>
#include <fstream>
#include <mutex>

class Logger {
private:
    std::ofstream logFile;
    std::mutex mtx;

public:
    Logger(const std::string& fileName);
    ~Logger();
    void log(const std::string& message);
};

#endif // LOGGER_H

 

Logger.cpp

#include "Logger.h"
#include <iostream>
#include <chrono>
#include <ctime>

Logger::Logger(const std::string& fileName) {
    logFile.open(fileName, std::ios::app);
    if (!logFile) {
        std::cerr << "Failed to open log file: " << fileName << std::endl;
    }
}

Logger::~Logger() {
    if (logFile.is_open()) {
        logFile.close();
    }
}

void Logger::log(const std::string& message) {
    std::lock_guard<std::mutex> lock(mtx);
    auto now = std::chrono::system_clock::now();
    auto now_c = std::chrono::system_clock::to_time_t(now);
    logFile << std::put_time(std::localtime(&now_c), "%F %T") << " - " << message << std::endl;
}

 

Step 3: 확장성 향상

시스템이 대규모 데이터를 효율적으로 처리할 수 있도록, 데이터 처리 파이프라인을 개선합니다.

 

DataProcessor 클래스 개선

DataProcessor.h

#ifndef DATAPROCESSOR_H
#define DATAPROCESSOR_H

#include <string>
#include <thread>
#include <vector>
#include <atomic>

class DataProcessor {
private:
    std::vector<std::thread> threads;
    std::atomic<bool> stopFlag;

    void processThread(const std::function<std::string()>& getData, const std::function<void(const std::string&)>& processData);

public:
    DataProcessor();
    ~DataProcessor();
    void startProcessing(int numThreads, const std::function<std::string()>& getData, const std::function<void(const std::string&)>& processData);
    void stopProcessing();
};

#endif // DATAPROCESSOR_H

 

DataProcessor.cpp

#include "DataProcessor.h"
#include <iostream>

DataProcessor::DataProcessor() : stopFlag(false) {}

DataProcessor::~DataProcessor() {
    stopProcessing();
}

void DataProcessor::startProcessing(int numThreads, const std::function<std::string()>& getData, const std::function<void(const std::string&)>& processData) {
    for (int i = 0; i < numThreads; ++i) {
        threads.emplace_back(&DataProcessor::processThread, this, getData, processData);
    }
}

void DataProcessor::stopProcessing() {
    stopFlag = true;
    for (auto& t : threads) {
        if (t.joinable()) {
            t.join();
        }
    }
}

void DataProcessor::processThread(const std::function<std::string()>& getData, const std::function<void(const std::string&)>& processData) {
    while (!stopFlag) {
        std::string data = getData();
        if (!data.empty()) {
            processData(data);
        }
    }
}

 

통합 테스트

데이터 수집, 처리, 저장 과정을 통합하여 테스트합니다.

 

main.cpp

#include "DataCollector.h"
#include "DataProcessor.h"
#include "DataStorage.h"
#include "Logger.h"
#include <thread>
#include <vector>

int main() {
    DataCollector collector;
    DataProcessor processor;
    DataStorage storage;
    Logger logger("system.log");

    // 데이터 수집 시작
    collector.startCollecting("data.txt");

    // 데이터 처리 및 저장 시작
    processor.startProcessing(4, [&collector]() {
        std::string data = collector.getNextData();
        return data;
    }, [&storage, &logger](const std::string& data) {
        // 데이터 처리 예제
        logger.log("Processing data: " + data);
        storage.storeData(data, "processed_data.txt");
    });

    std::this_thread::sleep_for(std::chrono::seconds(5));

    // 시스템 종료
    collector.stopCollecting();
    processor.stopProcessing();

    return 0;
}

 

데이터 파일 준비

데이터 수집을 테스트하기 위해 data.txt 파일을 준비합니다. 이 파일에는 테스트할 데이터가 포함되어야 합니다.

data.txt

Sample data line 1
Sample data line 2
Sample data line 3
...

 

이제 스물여덟 번째 날의 학습을 마쳤습니다. 실시간 데이터 처리 시스템 개발 프로젝트의 두 번째 단계를 통해 성능 최적화, 신뢰성 향상, 확장성 향상을 위한 기능을 추가했습니다.

질문이나 피드백이 있으면 언제든지 댓글로 남겨 주세요. 내일은 "프로젝트: 실시간 데이터 처리 시스템 개발 (3)"에 대해 학습하겠습니다.

반응형