구현한 채팅서버에서 패킷을 처리할때 패킷을 받으면서 처리했는데,
패킷을 처리할때 job을 생성하고, 해당 job을 UpdateThread에 넘겨서 그곳에서 처리하는 방식으로 변경했다.
iocp 환경상 많은 메세지를 처리해야하는데, packet을 바로 처리하기보다 updateThread에 넘겨서 처리하는 방식이 나을것이라고 판단했다.
✅ 수정 전
void CoreNetwork::RecvComplete(Session* recvCompleteSesion, const DWORD& transferred)
{
int loopCount = 0;
const int MAX_PACKET_LOOP = 64;
recvCompleteSesion->recvRingBuffer.MoveRear(transferred);
Packet::EncodeHeader encodeHeader;
Packet* packet = Packet::Alloc();
while (loopCount++ < MAX_PACKET_LOOP)
{
packet->Clear();
// 최소한 헤더 크기만큼은 데이터가 왔는지 확인한다.
if (recvCompleteSesion->recvRingBuffer.GetUseSize() < sizeof(Packet::EncodeHeader))
{
break;
}
// 헤더를 뽑아본다.
recvCompleteSesion->recvRingBuffer.Peek((char*)&encodeHeader, sizeof(Packet::EncodeHeader));
if (recvCompleteSesion->recvRingBuffer.GetUseSize() < encodeHeader.packetLen + sizeof(Packet::EncodeHeader))
{
break;
}
// 헤더 크기만큼 front를 움직이기
recvCompleteSesion->recvRingBuffer.MoveFront(sizeof(Packet::EncodeHeader));
// 패킷 길이만큼 뽑아서 packet에 넣기
recvCompleteSesion->recvRingBuffer.Dequeue(packet->GetRearBufferPtr(), encodeHeader.packetLen);
// 헤더 설정
packet->SetHeader((char*)&encodeHeader, sizeof(Packet::EncodeHeader));
// 패킷 길이 만큼 rear 움직이기
packet->MoveRearPosition(encodeHeader.packetLen);
// 디코딩에 실패하면 연결을 끊음
if (!packet->Decode())
{
Disconnect(recvCompleteSesion->sessionId);
break;
}
// 패킷 처리
OnRecv(recvCompleteSesion->sessionId, packet);
}
packet->Free();
RecvPost(recvCompleteSesion);
}
서버에서는 위처럼 클라로부터 packet을 받고 조립한 후 OnRecv()로 전달한다.
void ChattingServer::OnRecv(__int64 sessionId, Packet* packet)
{
short protocol;
*packet >> protocol;
(this->*packetProc[protocol])(sessionId, packet);
}
채팅 서버에서는 OnRecv()로 받은 packet으로 데이터를 받아 함수를 호출해 처리한다.
이처럼 WorkerThread에서 작업이 물려잇기 때문에 OnRecv()로 진입하면 Job을 생성해서 UpdateThread에 전달하고,
바로 다른 패킷을 처리해주도록 수정했다.
✅ 수정 후
void ChattingServer::OnRecv(__int64 sessionId, Packet* packet)
{
Job* job = _jobPool->Alloc();
job->sessionId = sessionId;
Packet* jobPacket = Packet::Alloc();
if (jobPacket == nullptr)
{
_jobPool->Free(job);
CRASH("JobPacket nullptr");
}
jobPacket->Clear();
jobPacket->SetHeader(packet->GetBufferPtr(), sizeof(Packet::EncodeHeader));
jobPacket->InsertData(packet->GetFrontBufferPtr(), packet->GetUseBufferSize() - sizeof(Packet::EncodeHeader));
job->packet = jobPacket;
EnterCriticalSection(&_jobQueueLock);
_jobQueue.push(job);
LeaveCriticalSection(&_jobQueueLock);
SetEvent(_hUpdateWakeEvent);
}
위처럼 OnRecv()에 진입하면 job을 생성해 패킷을 다시 담고, jobQueue에 넣은 후, UpdateThread를 깨운다.
unsigned __stdcall ChattingServer::UpdateThreadProc(void* argument)
{
ChattingServer* instance = (ChattingServer*)argument;
while (1)
{
WaitForSingleObject(instance->_hUpdateWakeEvent, INFINITE);
EnterCriticalSection(&instance->_jobQueueLock);
while (!instance->_jobQueue.empty())
{
Job* job = instance->_jobQueue.front();
instance->_jobQueue.pop();
LeaveCriticalSection(&instance->_jobQueueLock);
short protocol;
*job->packet >> protocol;
(instance->*(instance->packetProc[protocol]))(job->sessionId, job->packet);
job->packet->Free();
EnterCriticalSection(&instance->_jobQueueLock);
instance->_jobPool->Free(job);
}
LeaveCriticalSection(&instance->_jobQueueLock);
}
}
UpdateThread에서는 UpdateWakeEvent가 활성화될때까지 대기하다가, 일어나서 job을 처리한다.