IOCP Recv 완료 통지 함수인 RecvComplete를 구현했다.

 

64번을 반복하면서 클라이언트가 보낸 메세지를 확인한다.

 

packet을 처리할때 최소한 헤더 크기만큼은 데이터가 왔는지 먼저 확인한다.

이후 헤더를 뽑아보고 1차로 packetCode의 값을 확인한다.

데이터가 완벽하게 도착하지 않았으면 다음 RecvComplete를 기다리기 위해 반복문을 탈출한다.

이후 패킷을 꺼내서 Decode하고 통과하면 패킷을 처리한다.

 

void CoreNetwork::RecvComplete(Session* recvCompleteSesion, const DWORD& transferred)
{
	int loopCount = 0;
	const int MAX_PACKET_LOOP = 64;
	recvCompleteSesion->recvRingBuffer.MoveRear(transferred);

	Packet::EncodeHeader encodeHeader;
	Packet* packet = new Packet();

	while (loopCount++ < MAX_PACKET_LOOP)
	{
		packet->Clear();

		// 최소한 헤더 크기만큼은 데이터가 왔는지 확인한다.
		if (recvCompleteSesion->recvRingBuffer.GetUseSize() < sizeof(Packet::EncodeHeader))
		{
			break;
		}

		// 헤더를 뽑아본다.
		recvCompleteSesion->recvRingBuffer.Peek((char*)&encodeHeader, sizeof(Packet::EncodeHeader));
		if (encodeHeader.packetLen + sizeof(Packet::EncodeHeader) > recvCompleteSesion->recvRingBuffer.GetUseSize())
		{
			// 1차 패킷 코드인 52값이 아니라면 나감
			if (encodeHeader.packetCode != 52)
			{
				Disconnect(recvCompleteSesion->sessionId);
				break;
			}			
		}
		else
		{
			break;
		}

		InterlockedIncrement(&_recvPacketTPS);

		// 비정상적으로 너무 큰 패킷이 올경우 연결을 끊음
		if (encodeHeader.packetLen > PACKET_BUFFER_DEFAULT_SIZE)
		{
			Disconnect(recvCompleteSesion->sessionId);
			break;
		}

		// 헤더 크기만큼 front를 움직이기
		recvCompleteSesion->recvRingBuffer.MoveFront(sizeof(Packet::EncodeHeader));
		// 패킷 길이만큼 뽑아서 packet에 넣기
		recvCompleteSesion->recvRingBuffer.Dequeue(packet->GetRearBufferPtr(), encodeHeader.packetLen);
		// 헤더 설정
		packet->SetHeader((char*)&encodeHeader, sizeof(Packet::EncodeHeader));
		// 패킷 길이 만큼 rear 움직이기
		packet->MoveRearPosition(encodeHeader.packetLen);

		// 디코딩에 실패하면 연결을 끊음
		if (!packet->Decode())
		{
			Disconnect(recvCompleteSesion->sessionId);
			break;
		}	

		// 패킷 처리
		OnRecv(recvCompleteSesion->sessionId, packet);
	}
	
	delete packet;
}

 

IOCP에서 사용하는 WorkerThread를 구현했다.

기본적으로 GQCS( GetQueuedCompletionStatus )가 return 되기를 대기하고, return된 값에 따라 처리한다.

 

return 된 값이 0일 경우 fin 을 받은 것이므로 종료 처리하고, 0보다 클 경우 적절하게 recv 또는 send 처리한다.

 

unsigned __stdcall CoreNetwork::WorkerThreadProc(void* argument)
{
	CoreNetwork* instance = (CoreNetwork*)argument;

	if (instance != nullptr)
	{
		Session* completeSession = nullptr;
		while (1)
		{
			DWORD transferred = 0;
			OVERLAPPED* myOverlapped = nullptr;
			int completeRet;
			DWORD GQCSError;

			do
			{
				completeRet = GetQueuedCompletionStatus(instance->_HCP, &transferred,
					(PULONG_PTR)&completeSession, (LPOVERLAPPED*)&myOverlapped, INFINITE);
				if (myOverlapped == nullptr)
				{
					GQCSError = WSAGetLastError();
					wcout << L"MyOverlapped NULL " << GQCSError << endl;
					return -1;
				}

				// transferred가 0이라면 fin 패킷을 받은것이므로 종료처리한다.
				if (transferred == 0)
				{
					break;
				}

				// 전달받은 overlapped가 recvOverlapped라면 recv 완료 처리를 진행한다.
				if (myOverlapped == &completeSession->recvOverlapped)
				{
					instance->RecvComplete(completeSession, transferred);
				}
				// 전달받은 overlapped가 sendOverlapped라면 send 완료 처리를 진행한다.
				else if (myOverlapped == &completeSession->sendOverlapped)
				{
					instance->SendComplete(completeSession);
				}
			} while (0);

			// IO 처리가 하나 끝났으므로 IOCount를 감소시킨다.
			if (InterlockedDecrement64(&completeSession->IOBlock->IOCount) == 0)
			{
				instance->ReleaseSession(completeSession);
			}
		}
	}

	return 0;
}
#include"pch.h"
#include"ChattingServer.h"
#include"../Utils.h"
#include<rapidjson/document.h>

ChattingServer gChattingServer;

int main()
{
	std::string jsonStr = Utils::LoadFile(L"ServerInfo.json");	

	rapidjson::Document doc;
	if (doc.Parse(jsonStr.c_str()).HasParseError()) {
		std::cerr << "JSON 파싱 실패\n";
		return 1;
	}
		
	std::string ipAddress = doc["ipAddress"].GetString();
	int port = doc["port"].GetInt();	

	gChattingServer.Start(Utils::Convert(ipAddress).c_str(), port);

   	_setmode(_fileno(stdout), _O_U16TEXT);	

	while (true)
	{
		wcout << L"===================" << endl << endl;
		wcout << L"ChattingServer" << endl << endl;
		wcout << L"acceptTotal : [ " << gChattingServer._acceptTotal << " ]" << endl;
		wcout << L"acceptTPS : [ " << gChattingServer._acceptTPS << " ]" << endl;
		wcout << L"===================";

		gChattingServer._acceptTPS = 0;

		Sleep(1000);

		system("cls");
	}

	return 0;
}

 

main에서 ChattingServer를 선언하고 json으로 ip와 port를 읽어와 서버를 연다.

서버에서 수신 버퍼로 사용하기 위해 RingBuffer를 구현했다.

 

🔁 링 버퍼( Ring Buffer )

고정 크기의 원형 큐 형태의 자료구조로, FIFO( First-In-First-Out ) 방식으로 데이터를 저장한다.

 

📦 구조

구성 요소 설명
buffer[] 데이터를 담을 배열( 고정 크기 )
head 데이터를 읽는 위치
rear 데이터를 쓰는 위치

 

 

RingBuffer 클래스

#pragma once

#include <iostream>

using namespace std;

#define BUFFER_DEFAULT_SIZE	100001
#define BLANK				1

#ifdef RINGBUFFER
#define RINGBUFFER_DLL __declspec(dllexport)
#else
#define RINGBUFFER_DLL __declspec(dllimport)
#endif

// 원형 큐
class RINGBUFFER_DLL RingBuffer
{
private:
	char* _buffer;
	int _front;
	int _rear;
	int _bufferMaxSize;

	bool Init(int bufferSize);	
public:
	RingBuffer(void);
	RingBuffer(int bufferSize);
	~RingBuffer();

	int GetBufferSize(void);

	//현재 사용하고 있는 버퍼 크기 반환
	int GetUseSize(void);

	//남아 있는 공간 사이즈
	int GetFreeSize(void);

	// 한번에 enqueue 할 수 있는 사이즈
	int GetDirectEnqueueSize(void);

	// 한번에 Dequeue 할 수 있는 사이즈
	int GetDirectDequeueSize(void);

	// 데이터 넣기
	int Enqueue(char* data, int size);

	// 데이터 빼기
	int Dequeue(char* dest, int size);

	// 데이터가 있는지 확인
	int Peek(char* dest, int size);

	// rear 움직이기
	int MoveRear(int size);

	// front 움직이기
	int MoveFront(int size);

	// ringbuffer 초기화
	void ClearBuffer(void);

	// 비어잇는지 확인
	bool IsEmpty(void);

	// front 위치 반환
	char* GetFrontBufferPtr(void);

	// rear 위치 반환
	char* GetRearBufferPtr(void);

	// buffer 맨앞 반환
	char* GetBufferPtr(void);
};

 

'ChattingServer' 카테고리의 다른 글

[ChattingServer] WorkerThreadProc  (0) 2025.06.02
[ChattingServer] Start 함수  (0) 2025.06.01
[ChattingServer] AcceptThreadProc  (0) 2025.06.01
[ChattingServer] Session  (0) 2025.05.29
[ChattingServer] 직렬화 버퍼  (0) 2025.05.29

AcceptThreadProc를 구현했다.

클라가 접속하면 session을 할당하고 저장하며 동기로 처리한다.

 

unsigned __stdcall CoreNetwork::AcceptThreadProc(void* argument)
{
	CoreNetwork* instance = (CoreNetwork*)argument;

	if (instance != nullptr)
	{
		for (;;)
		{
			SOCKADDR_IN clientAddr;
			int addrLen = sizeof(clientAddr);

			// 클라 연결 대기 
			SOCKET clientSock = accept(instance->_listenSocket, (SOCKADDR*)&clientAddr, &addrLen);
			if (clientSock == INVALID_SOCKET)
			{
				DWORD error = WSAGetLastError();
				std::cout << "accept failed : " << error << std::endl;
				break;
			}

			// 연결 수락 총 개수 증가
			instance->_acceptTotal++;
			instance->_acceptTPS++;

			// 세션 할당
			Session* newSession = new Session();			

			newSession->sessionId = ++instance->_sessionId;
			newSession->clientAddr = clientAddr;
			newSession->clientSocket = clientSock;

			// IOCP에 등록
			CreateIoCompletionPort((HANDLE)newSession->clientSocket, instance->_HCP, (ULONG_PTR)newSession, 0);

			instance->OnClientJoin(newSession);

			// sessions에 저장
			instance->_sessions.push_back(newSession);
		}
	}

	return 0;
}

'ChattingServer' 카테고리의 다른 글

[ChattingServer] Start 함수  (0) 2025.06.01
[ChattingServer] RingBuffer ( 원형 큐 )  (0) 2025.06.01
[ChattingServer] Session  (0) 2025.05.29
[ChattingServer] 직렬화 버퍼  (0) 2025.05.29
[ChattingServer] 사용자 버퍼  (0) 2025.05.29

클라가 서버에 접속하면 Session을 할당하고 클라의 정보를 입력한 후 저장한다.

 

Session 구조체

struct Session
{
	LONG sessionId = 0;
	SOCKET clientSocket;
	
	SOCKADDR_IN clientAddr;

	RingBuffer recvRingBuffer; 
	queue<Packet*> sendQueue; 

	OVERLAPPED recvOverlapped = {};
	OVERLAPPED sendOverlapped = {};	

	IOBlock* IOBlock = nullptr;
};

 

sessionId: Session을 구분할 id ( 서버에서 부여 )

clientSocket: 서버에 접속한 클라와 소통할 Socket

clientAddr: 서버에 접속한 클라 주소

recvRingBuffer: 클라가 보낸 데이터를 담아둘 원형큐

sendQueue: 클라가 전송할 데이터를 담아둘 직렬화 버퍼 큐

recvOverlapped: WSARecv 통지용

sendOverlapped: WSASend 통지용

IOBlock: Session에 I/O 작업이 남아 있는지 확인

 

 

IOBlock 구조체

struct IOBlock
{
    LONG64 IOCount = 0;
    LONG64 IsRelease = false;
};

 

IOCount: Session의 I/O 작업을 기록해둘 변수

IsRelase: Relase를 하는지에 대한 여부( = 연결 해제 )

 

IOBlock을 따로 구성해서 Session이 사용중인지 아닌지를 판별해준다.

'ChattingServer' 카테고리의 다른 글

[ChattingServer] RingBuffer ( 원형 큐 )  (0) 2025.06.01
[ChattingServer] AcceptThreadProc  (0) 2025.06.01
[ChattingServer] 직렬화 버퍼  (0) 2025.05.29
[ChattingServer] 사용자 버퍼  (0) 2025.05.29
[ChattingServer] 패킷 구조  (0) 2025.05.29
직렬화 버퍼는 구조화된 데이터를 바이트 스트림으로 변환하거나, 반대로 바이트 스트림을 구조화된 데이터로 변환 하기 위해 사용하는 버퍼를 말한다.

 

📦 사용 목적

  1. 네트워크 전송: 구조체 -> 바이트 스트림으로 만들어서 전송
  2. 파일 저장: 바이너리 형태로 저장하기 위해 직렬화 필요
  3. 패킷 시스템: 커스텀 패킷 포맷 구성 시 필수

 

구조체 데이터나 일반 데이터를 바이트 스트림으로 패킷에 담아서 전송해야하기 때문에 직렬화 버퍼가 필요하다.

직렬화 버퍼를 담당하는 클래스를 만들어서 사용한다.

 

#pragma once

#ifdef PACKET
#define PACKET_DLL __declspec(dllexport)
#else
#define PACKET_DLL __declspec(dllimport)
#endif

#define PACKET_BUFFER_DEFAULT_SIZE	100000

// 패킷 클래스
class PACKET_DLL Packet
{
public:
#pragma pack(push,1)
	// 패킷 헤더
	struct EncodeHeader
	{
		unsigned char packetCode;

		unsigned short packetLen;

		unsigned char randXORCode;

		unsigned char checkSum;
	};
#pragma pack(pop)
protected:
	char _packetBuffer[PACKET_BUFFER_DEFAULT_SIZE];

	unsigned int _header;

	unsigned int _front;
	unsigned int _rear;

	unsigned int _bufferSize;
	unsigned int _useBufferSize;

	unsigned char _key;
public:
	Packet();
	~Packet();

	void Clear(void);

	unsigned int GetBufferSize(void);
	unsigned int GetUseBufferSize(void);
	void MoveRearPosition(int size);
	void MoveFrontPosition(int size);

	// 데이터 넣기
	Packet& operator = (Packet& packet);

	template<typename T>
	Packet& operator<<(T& value);

	template<typename T>
	Packet& operator<<(const std::pair<T, int>& value);

    // 데이터 빼기
	template<typename T>
	Packet& operator>>(T& value);

	template<typename T>
	Packet& operator>>(const std::pair<T, int>& value);

	int GetData(char* dest, int size);
	int GetData(wchar_t* dest, int size);

	// 헤더 설정
	void SetHeader(char* header, char size);

	// 패킷 인코딩
	bool Encode(void);
	// 패킷 디코딩
	bool Decode(void);
};

template<typename T>
Packet& Packet::operator<<(T& value)
{
	memcpy(&_packetBuffer[_rear], &value, sizeof(T));
	_rear += sizeof(T);
	_useBufferSize += sizeof(T);

	return *(this);
}

template<typename T>
Packet& Packet::operator<<(const std::pair<T, int>& value)
{
	*this << value.second;
	memcpy(&_packetBuffer[_rear], value.first, value.second);
	_rear += value.second;
	_useBufferSize += value.second;

	return *(this);
}

template<typename T>
Packet& Packet::operator>>(T& value)
{
	memcpy(&value, &_packetBuffer[_front], sizeof(T));
	_front += sizeof(T);
	_useBufferSize -= sizeof(T);

	return *(this);
}

template<typename T>
Packet& Packet::operator>>(const std::pair<T, int>& value)
{
	memcpy(value.first, &_packetBuffer[_front], value.second);
	_front += value.second;
	_useBufferSize -= value.second;

	return *(this);
}

 

연산자 <<를 오버로딩해서 데이터를 쉽게 직렬화버퍼에 담을 수 있도록 한다.

Encode를 통해 직렬화버퍼 안에 있는 데이터를 암호화하고 Decode를 통해 복호화한다.

 

Packet* newPacket = new Packet();

char* stringData = new char[10];
int data = 10;

*newPacket << data;
*newPacket << make_pair(stringData, 10);

 

위와 같이 사용하면 된다.

'ChattingServer' 카테고리의 다른 글

[ChattingServer] AcceptThreadProc  (0) 2025.06.01
[ChattingServer] Session  (0) 2025.05.29
[ChattingServer] 사용자 버퍼  (0) 2025.05.29
[ChattingServer] 패킷 구조  (0) 2025.05.29
[ChattingServer] 채팅 서버 시작  (0) 2025.05.29

네트워크 통신에서 사용자 버퍼가 필요한 이유를 살펴보자.

✅ 비동기 I/O와 데이터 누락 방지

  • 네트워크는 데이터를 순식간에 전송하거나 수신하지 못할 수 있다.
  • OS에서 recv()나 WSARecv() 호출 시 받은 데이터의 양이 프로그램이 기대한 양보다 적을 수 있다.
  • 이런 경우 사용자 버퍼에 누적하여 전체 패킷을 구성해야 한다.

예를 들어

  1. TCP 스트림으로 100바이트를 클라이언트에서 전송
  2. 서버에서 recv() 에서 60바이트만 수신
  3. 나머지 40바이트는 다음 호출 때 수신, 이후 사용자 버퍼가 누락된 데이터를 기억해줘야함

✅ 부분 송수신 대응 ( TCP의 특성 )

  • TCP는 패킷 단위가 아니라 바이트 스트림
  • 한 번의 recv()로 하나의 패킷이 온전히 들어온다는 보장이 없음

✅ 수신 / 송신 처리 분리

  • 사용자 버퍼를 통해 앱 로직과 OS I/O 호출을 분리할 수 있다
  • 수신 데이터를 사용자 버퍼에 모아두고, 패킷 단위로 처리하거나 송신 데이터를 사용자 버퍼에 모아두고, OS 전송이 가능할 때 전송하게 설계 가능

✅ 패킷 조립 / 파싱 편의

  • 수신된 데이터를 사용자 버퍼에 저장하고, 패킷 헤더 확인, 패킷 길이 검사, 완전한 패킷인지 확인, 부족하면 더 받기 와 같은 과정이 가능하려면 임시 저장소가 필요

 

위와 같은 이유로 사용자 버퍼가 필요하다.

 

이번 채팅 서버에 사용할 사용자 버퍼는 원형큐버퍼로 일명 RingBuffer라 불린다.

 

✅ RingBuffer

메모리를 원형으로 사용하는 고정 크기 순환 버퍼다. 네트워크 통신, 스트림 처리 등에 사용되고 FIFO 구조를 가진다.

 

#pragma once

#include <iostream>

using namespace std;

#define BUFFER_DEFAULT_SIZE 100001
#define BLANK               1

#ifdef RINGBUFFER
#define RINGBUFFER_DLL __declspec(dllexport)
#else
#define RINGBUFFER_DLL __declspec(dllimport)
#endif

// 원형 큐
class RINGBUFFER_DLL RingBuffer
{
private:
	char* _buffer;
	int _front;
	int _rear;
	int _bufferMaxSize;

	bool Init(int bufferSize);	
public:
	RingBuffer(void);
	RingBuffer(int bufferSize);
	~RingBuffer();

	int GetBufferSize(void);

	//현재 사용하고 있는 버퍼 크기 반환
	int GetUseSize(void);

	//남아 있는 공간 사이즈
	int GetFreeSize(void);

	// 한번에 enqueue 할 수 있는 사이즈
	int GetDirectEnqueueSize(void);

	// 한번에 Dequeue 할 수 있는 사이즈
	int GetDirectDequeueSize(void);

	// 데이터 넣기
	int Enqueue(char* data, int size);

	// 데이터 빼기
	int Dequeue(char* dest, int size);

	// 데이터가 있는지 확인
	int Peek(char* dest, int size);

	// rear 움직이기
	int MoveRear(int size);

	// front 움직이기
	int MoveFront(int size);

	// ringbuffer 초기화
	void ClearBuffer(void);

	// 비어잇는지 확인
	bool IsEmpty(void);

	// front 위치 반환
	char* GetFrontBufferPtr(void);

	// rear 위치 반환
	char* GetRearBufferPtr(void);

	// buffer 맨앞 반환
	char* GetBufferPtr(void);
};

 

위처럼 RingBuffer를 구성했다. 내부적으로 buffer를 관리한다.

'ChattingServer' 카테고리의 다른 글

[ChattingServer] AcceptThreadProc  (0) 2025.06.01
[ChattingServer] Session  (0) 2025.05.29
[ChattingServer] 직렬화 버퍼  (0) 2025.05.29
[ChattingServer] 패킷 구조  (0) 2025.05.29
[ChattingServer] 채팅 서버 시작  (0) 2025.05.29
채팅 서버를 구현하기 전에 서버와 클라이언트가 주고 받을 패킷 구조를 설계하려고 한다.

 

✅ 구조

패킷의 형태는 헤더 파일 + 데이터 형태로 구성되고

헤더 파일은 총 5바이트의 크기를 가지며, 아래와 같은 변수로 구성된다.

 

struct EncodeHeader
{
	unsigned char packetCode;

	unsigned short packetLen;

	unsigned char randXORCode;

	unsigned char checkSum;
};

 

  • packetCode: 1차적으로 패킷의 무결성을 확인하는 값으로 상수값으로 정해진다.
  • packetLen: 데이터의 총 길이를 나타낸다. ( 기본적으로 바이너리 데이터이기 때문에 길이가 꼭 필요하다 )
  • randXORCode: 패킷 데이터를 XOR 값을 이용해 암호화 시켜줄때 필요한 값이다.
  • checkSum: XOR로 암호화 시킨 데이터를 기준으로 삼아 checkSum을 구해 데이터 변조 여부를 판단한다.

'ChattingServer' 카테고리의 다른 글

[ChattingServer] AcceptThreadProc  (0) 2025.06.01
[ChattingServer] Session  (0) 2025.05.29
[ChattingServer] 직렬화 버퍼  (0) 2025.05.29
[ChattingServer] 사용자 버퍼  (0) 2025.05.29
[ChattingServer] 채팅 서버 시작  (0) 2025.05.29

✅ 채팅 서버 

IOCP를 이용해 채팅 서버를 구현하려고 한다.

 

 채팅 서버의 구조

✅ AcceptThread

accept를 담당하여 서버에 접속한 클라들을 대상으로 Session 형식으로 저장한다.

✅ WorkerThread

IOCP의 WorkerThread로 클라가 보낸 모든 데이터들을 받고 해석해 처리한다.

 

 

'ChattingServer' 카테고리의 다른 글

[ChattingServer] AcceptThreadProc  (0) 2025.06.01
[ChattingServer] Session  (0) 2025.05.29
[ChattingServer] 직렬화 버퍼  (0) 2025.05.29
[ChattingServer] 사용자 버퍼  (0) 2025.05.29
[ChattingServer] 패킷 구조  (0) 2025.05.29

+ Recent posts