구현한 채팅서버에서 패킷을 처리할때 패킷을 받으면서 처리했는데,

패킷을 처리할때 job을 생성하고, 해당 job을 UpdateThread에 넘겨서 그곳에서 처리하는 방식으로 변경했다.

iocp 환경상 많은 메세지를 처리해야하는데, packet을 바로 처리하기보다 updateThread에 넘겨서 처리하는 방식이 나을것이라고 판단했다.


✅ 수정 전

void CoreNetwork::RecvComplete(Session* recvCompleteSesion, const DWORD& transferred)
{
	int loopCount = 0;
	const int MAX_PACKET_LOOP = 64;
	recvCompleteSesion->recvRingBuffer.MoveRear(transferred);

	Packet::EncodeHeader encodeHeader;
	Packet* packet = Packet::Alloc();

	while (loopCount++ < MAX_PACKET_LOOP)
	{
		packet->Clear();

		// 최소한 헤더 크기만큼은 데이터가 왔는지 확인한다.
		if (recvCompleteSesion->recvRingBuffer.GetUseSize() < sizeof(Packet::EncodeHeader))
		{
			break;
		}
		
		// 헤더를 뽑아본다.
		recvCompleteSesion->recvRingBuffer.Peek((char*)&encodeHeader, sizeof(Packet::EncodeHeader));
		if (recvCompleteSesion->recvRingBuffer.GetUseSize() < encodeHeader.packetLen + sizeof(Packet::EncodeHeader))
		{
			break;
		}		

		// 헤더 크기만큼 front를 움직이기
		recvCompleteSesion->recvRingBuffer.MoveFront(sizeof(Packet::EncodeHeader));
		// 패킷 길이만큼 뽑아서 packet에 넣기
		recvCompleteSesion->recvRingBuffer.Dequeue(packet->GetRearBufferPtr(), encodeHeader.packetLen);
		// 헤더 설정
		packet->SetHeader((char*)&encodeHeader, sizeof(Packet::EncodeHeader));
		// 패킷 길이 만큼 rear 움직이기
		packet->MoveRearPosition(encodeHeader.packetLen);

		// 디코딩에 실패하면 연결을 끊음
		if (!packet->Decode())
		{
			Disconnect(recvCompleteSesion->sessionId);
			break;
		}	

		// 패킷 처리
		OnRecv(recvCompleteSesion->sessionId, packet);
	}
	
	packet->Free();

	RecvPost(recvCompleteSesion);
}

 

위 코드를 확인하면 알수 있듯이 packet을 받고 조립한 후 OnRecv()로 전달한다.

void ChattingServer::OnRecv(__int64 sessionId, Packet* packet)
{
	short protocol;
	*packet >> protocol;

	(this->*packetProc[protocol])(sessionId, packet);
}

 

채팅 서버에서는 OnRecv()로 받은 packet으로 데이터를 받아 함수를 호출해 처리한다.

이처럼 WorkerThread에 작업이 물려있기 때문에 packet 처리를 보다 빨리 할 수 있도록 구조를 변경했다.

 

✅ 수정 후

void ChattingServer::OnRecv(__int64 sessionId, Packet* packet)
{	
	Job* job = _jobPool->Alloc();	
	job->sessionId = sessionId;		

	Packet* jobPacket = Packet::Alloc();
	if (jobPacket == nullptr)
	{
		_jobPool->Free(job);
		CRASH("JobPacket nullptr");		
	}

	jobPacket->Clear();
	jobPacket->SetHeader(packet->GetBufferPtr(), sizeof(Packet::EncodeHeader));
	jobPacket->InsertData(packet->GetFrontBufferPtr(), packet->GetUseBufferSize() - sizeof(Packet::EncodeHeader));

	job->packet = jobPacket;

	EnterCriticalSection(&_jobQueueLock);
	_jobQueue.push(job);
	LeaveCriticalSection(&_jobQueueLock);

	SetEvent(_hUpdateWakeEvent);
}

 

위처럼 OnRecv()에 진입하면 job을 생성해 패킷을 다시 담고, jobQueue에 넣은 후, UpdateThread를 깨운다.

 

unsigned __stdcall ChattingServer::UpdateThreadProc(void* argument)
{
	ChattingServer* instance = (ChattingServer*)argument;
	while (1)
	{
		WaitForSingleObject(instance->_hUpdateWakeEvent, INFINITE);

		EnterCriticalSection(&instance->_jobQueueLock);
		while (!instance->_jobQueue.empty())
		{
			Job* job = instance->_jobQueue.front();
			instance->_jobQueue.pop();			

			LeaveCriticalSection(&instance->_jobQueueLock);

			short protocol;
			*job->packet >> protocol;

			(instance->*(instance->packetProc[protocol]))(job->sessionId, job->packet);

			job->packet->Free();			

			EnterCriticalSection(&instance->_jobQueueLock);
			instance->_jobPool->Free(job);
		}
		LeaveCriticalSection(&instance->_jobQueueLock);
	}
}

 

UpdateThread에서는 UpdateWakeEvent가 활성화될때까지 대기하다가, 일어나서 job을 처리한다.

 

https://github.com/YamSaeng/ChattingServer/blob/master/ChattingServer/ChattingServer.CPP

 

ChattingServer/ChattingServer/ChattingServer.CPP at master · YamSaeng/ChattingServer

IOCP로 만든 채팅서버. Contribute to YamSaeng/ChattingServer development by creating an account on GitHub.

github.com

 

 

+ Recent posts