wait.cpp 10.5 KB
// wait.cpp - written and placed in the public domain by Wei Dai

#include "pch.h"
#include "wait.h"
#include "misc.h"

#ifdef SOCKETS_AVAILABLE

#ifdef USE_BERKELEY_STYLE_SOCKETS
#include <errno.h>
#include <sys/types.h>
#include <sys/time.h>
#include <unistd.h>
#endif

NAMESPACE_BEGIN(CryptoPP)

unsigned int WaitObjectContainer::MaxWaitObjects()
{
#ifdef USE_WINDOWS_STYLE_SOCKETS
	return MAXIMUM_WAIT_OBJECTS * (MAXIMUM_WAIT_OBJECTS-1);
#else
	return FD_SETSIZE;
#endif
}

WaitObjectContainer::WaitObjectContainer(WaitObjectsTracer* tracer)
	: m_tracer(tracer), m_eventTimer(Timer::MILLISECONDS)
	, m_sameResultCount(0), m_noWaitTimer(Timer::MILLISECONDS)
{
	Clear();
	m_eventTimer.StartTimer();
}

void WaitObjectContainer::Clear()
{
#ifdef USE_WINDOWS_STYLE_SOCKETS
	m_handles.clear();
#else
	m_maxFd = 0;
	FD_ZERO(&m_readfds);
	FD_ZERO(&m_writefds);
#endif
	m_noWait = false;
	m_firstEventTime = 0;
}

inline void WaitObjectContainer::SetLastResult(LastResultType result)
{
	if (result == m_lastResult)
		m_sameResultCount++;
	else
	{
		m_lastResult = result;
		m_sameResultCount = 0;
	}
}

void WaitObjectContainer::DetectNoWait(LastResultType result, CallStack const& callStack)
{
	if (result == m_lastResult && m_noWaitTimer.ElapsedTime() > 1000)
	{
		if (m_sameResultCount > m_noWaitTimer.ElapsedTime())
		{
			if (m_tracer)
			{
				std::string desc = "No wait loop detected - m_lastResult: ";
				desc.append(IntToString(m_lastResult)).append(", call stack:");
				for (CallStack const* cs = &callStack; cs; cs = cs->Prev())
					desc.append("\n- ").append(cs->Format());
				m_tracer->TraceNoWaitLoop(desc);
			}
			try { throw 0; } catch (...) {}		// help debugger break
		}

		m_noWaitTimer.StartTimer();
		m_sameResultCount = 0;
	}
}

void WaitObjectContainer::SetNoWait(CallStack const& callStack)
{
	DetectNoWait(LASTRESULT_NOWAIT, CallStack("WaitObjectContainer::SetNoWait()", &callStack));
	m_noWait = true;
}

void WaitObjectContainer::ScheduleEvent(double milliseconds, CallStack const& callStack)
{
	if (milliseconds <= 3)
		DetectNoWait(LASTRESULT_SCHEDULED, CallStack("WaitObjectContainer::ScheduleEvent()", &callStack));
	double thisEventTime = m_eventTimer.ElapsedTimeAsDouble() + milliseconds;
	if (!m_firstEventTime || thisEventTime < m_firstEventTime)
		m_firstEventTime = thisEventTime;
}

#ifdef USE_WINDOWS_STYLE_SOCKETS

struct WaitingThreadData
{
	bool waitingToWait, terminate;
	HANDLE startWaiting, stopWaiting;
	const HANDLE *waitHandles;
	unsigned int count;
	HANDLE threadHandle;
	DWORD threadId;
	DWORD* error;
};

WaitObjectContainer::~WaitObjectContainer()
{
	try		// don't let exceptions escape destructor
	{
		if (!m_threads.empty())
		{
			HANDLE threadHandles[MAXIMUM_WAIT_OBJECTS];
			unsigned int i;
			for (i=0; i<m_threads.size(); i++)
			{
				WaitingThreadData &thread = *m_threads[i];
				while (!thread.waitingToWait)	// spin until thread is in the initial "waiting to wait" state
					Sleep(0);
				thread.terminate = true;
				threadHandles[i] = thread.threadHandle;
			}
			PulseEvent(m_startWaiting);
			::WaitForMultipleObjects((DWORD)m_threads.size(), threadHandles, TRUE, INFINITE);
			for (i=0; i<m_threads.size(); i++)
				CloseHandle(threadHandles[i]);
			CloseHandle(m_startWaiting);
			CloseHandle(m_stopWaiting);
		}
	}
	catch (...)
	{
	}
}


void WaitObjectContainer::AddHandle(HANDLE handle, CallStack const& callStack)
{
	DetectNoWait(m_handles.size(), CallStack("WaitObjectContainer::AddHandle()", &callStack));
	m_handles.push_back(handle);
}

DWORD WINAPI WaitingThread(LPVOID lParam)
{
	std::auto_ptr<WaitingThreadData> pThread((WaitingThreadData *)lParam);
	WaitingThreadData &thread = *pThread;
	std::vector<HANDLE> handles;

	while (true)
	{
		thread.waitingToWait = true;
		::WaitForSingleObject(thread.startWaiting, INFINITE);
		thread.waitingToWait = false;

		if (thread.terminate)
			break;
		if (!thread.count)
			continue;

		handles.resize(thread.count + 1);
		handles[0] = thread.stopWaiting;
		std::copy(thread.waitHandles, thread.waitHandles+thread.count, handles.begin()+1);

		DWORD result = ::WaitForMultipleObjects((DWORD)handles.size(), &handles[0], FALSE, INFINITE);

		if (result == WAIT_OBJECT_0)
			continue;	// another thread finished waiting first, so do nothing
		SetEvent(thread.stopWaiting);
		if (!(result > WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + handles.size()))
		{
			assert(!"error in WaitingThread");	// break here so we can see which thread has an error
			*thread.error = ::GetLastError();
		}
	}

	return S_OK;	// return a value here to avoid compiler warning
}

void WaitObjectContainer::CreateThreads(unsigned int count)
{
	size_t currentCount = m_threads.size();
	if (currentCount == 0)
	{
		m_startWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL);
		m_stopWaiting = ::CreateEvent(NULL, TRUE, FALSE, NULL);
	}

	if (currentCount < count)
	{
		m_threads.resize(count);
		for (size_t i=currentCount; i<count; i++)
		{
			m_threads[i] = new WaitingThreadData;
			WaitingThreadData &thread = *m_threads[i];
			thread.terminate = false;
			thread.startWaiting = m_startWaiting;
			thread.stopWaiting = m_stopWaiting;
			thread.waitingToWait = false;
			thread.threadHandle = CreateThread(NULL, 0, &WaitingThread, &thread, 0, &thread.threadId);
		}
	}
}

bool WaitObjectContainer::Wait(unsigned long milliseconds)
{
	if (m_noWait || (m_handles.empty() && !m_firstEventTime))
	{
		SetLastResult(LASTRESULT_NOWAIT);
		return true;
	}

	bool timeoutIsScheduledEvent = false;

	if (m_firstEventTime)
	{
		double timeToFirstEvent = SaturatingSubtract(m_firstEventTime, m_eventTimer.ElapsedTimeAsDouble());

		if (timeToFirstEvent <= milliseconds)
		{
			milliseconds = (unsigned long)timeToFirstEvent;
			timeoutIsScheduledEvent = true;
		}

		if (m_handles.empty() || !milliseconds)
		{
			if (milliseconds)
				Sleep(milliseconds);
			SetLastResult(timeoutIsScheduledEvent ? LASTRESULT_SCHEDULED : LASTRESULT_TIMEOUT);
			return timeoutIsScheduledEvent;
		}
	}

	if (m_handles.size() > MAXIMUM_WAIT_OBJECTS)
	{
		// too many wait objects for a single WaitForMultipleObjects call, so use multiple threads
		static const unsigned int WAIT_OBJECTS_PER_THREAD = MAXIMUM_WAIT_OBJECTS-1;
		unsigned int nThreads = (unsigned int)((m_handles.size() + WAIT_OBJECTS_PER_THREAD - 1) / WAIT_OBJECTS_PER_THREAD);
		if (nThreads > MAXIMUM_WAIT_OBJECTS)	// still too many wait objects, maybe implement recursive threading later?
			throw Err("WaitObjectContainer: number of wait objects exceeds limit");
		CreateThreads(nThreads);
		DWORD error = S_OK;
		
		for (unsigned int i=0; i<m_threads.size(); i++)
		{
			WaitingThreadData &thread = *m_threads[i];
			while (!thread.waitingToWait)	// spin until thread is in the initial "waiting to wait" state
				Sleep(0);
			if (i<nThreads)
			{
				thread.waitHandles = &m_handles[i*WAIT_OBJECTS_PER_THREAD];
				thread.count = UnsignedMin(WAIT_OBJECTS_PER_THREAD, m_handles.size() - i*WAIT_OBJECTS_PER_THREAD);
				thread.error = &error;
			}
			else
				thread.count = 0;
		}

		ResetEvent(m_stopWaiting);
		PulseEvent(m_startWaiting);

		DWORD result = ::WaitForSingleObject(m_stopWaiting, milliseconds);
		if (result == WAIT_OBJECT_0)
		{
			if (error == S_OK)
				return true;
			else
				throw Err("WaitObjectContainer: WaitForMultipleObjects in thread failed with error " + IntToString(error));
		}
		SetEvent(m_stopWaiting);
		if (result == WAIT_TIMEOUT)
		{
			SetLastResult(timeoutIsScheduledEvent ? LASTRESULT_SCHEDULED : LASTRESULT_TIMEOUT);
			return timeoutIsScheduledEvent;
		}
		else
			throw Err("WaitObjectContainer: WaitForSingleObject failed with error " + IntToString(::GetLastError()));
	}
	else
	{
#if TRACE_WAIT
		static Timer t(Timer::MICROSECONDS);
		static unsigned long lastTime = 0;
		unsigned long timeBeforeWait = t.ElapsedTime();
#endif
		DWORD result = ::WaitForMultipleObjects((DWORD)m_handles.size(), &m_handles[0], FALSE, milliseconds);
#if TRACE_WAIT
		if (milliseconds > 0)
		{
			unsigned long timeAfterWait = t.ElapsedTime();
			OutputDebugString(("Handles " + IntToString(m_handles.size()) + ", Woke up by " + IntToString(result-WAIT_OBJECT_0) + ", Busied for " + IntToString(timeBeforeWait-lastTime) + " us, Waited for " + IntToString(timeAfterWait-timeBeforeWait) + " us, max " + IntToString(milliseconds) + "ms\n").c_str());
			lastTime = timeAfterWait;
		}
#endif
		if (result >= WAIT_OBJECT_0 && result < WAIT_OBJECT_0 + m_handles.size())
		{
			if (result == m_lastResult)
				m_sameResultCount++;
			else
			{
				m_lastResult = result;
				m_sameResultCount = 0;
			}
			return true;
		}
		else if (result == WAIT_TIMEOUT)
		{
			SetLastResult(timeoutIsScheduledEvent ? LASTRESULT_SCHEDULED : LASTRESULT_TIMEOUT);
			return timeoutIsScheduledEvent;
		}
		else
			throw Err("WaitObjectContainer: WaitForMultipleObjects failed with error " + IntToString(::GetLastError()));
	}
}

#else // #ifdef USE_WINDOWS_STYLE_SOCKETS

void WaitObjectContainer::AddReadFd(int fd, CallStack const& callStack)	// TODO: do something with callStack
{
	FD_SET(fd, &m_readfds);
	m_maxFd = STDMAX(m_maxFd, fd);
}

void WaitObjectContainer::AddWriteFd(int fd, CallStack const& callStack) // TODO: do something with callStack
{
	FD_SET(fd, &m_writefds);
	m_maxFd = STDMAX(m_maxFd, fd);
}

bool WaitObjectContainer::Wait(unsigned long milliseconds)
{
	if (m_noWait || (!m_maxFd && !m_firstEventTime))
		return true;

	bool timeoutIsScheduledEvent = false;

	if (m_firstEventTime)
	{
		double timeToFirstEvent = SaturatingSubtract(m_firstEventTime, m_eventTimer.ElapsedTimeAsDouble());
		if (timeToFirstEvent <= milliseconds)
		{
			milliseconds = (unsigned long)timeToFirstEvent;
			timeoutIsScheduledEvent = true;
		}
	}

	timeval tv, *timeout;

	if (milliseconds == INFINITE_TIME)
		timeout = NULL;
	else
	{
		tv.tv_sec = milliseconds / 1000;
		tv.tv_usec = (milliseconds % 1000) * 1000;
		timeout = &tv;
	}

	int result = select(m_maxFd+1, &m_readfds, &m_writefds, NULL, timeout);

	if (result > 0)
		return true;
	else if (result == 0)
		return timeoutIsScheduledEvent;
	else
		throw Err("WaitObjectContainer: select failed with error " + errno);
}

#endif

// ********************************************************

std::string CallStack::Format() const
{
	return m_info;
}

std::string CallStackWithNr::Format() const
{
	return std::string(m_info) + " / nr: " + IntToString(m_nr);
}

std::string CallStackWithStr::Format() const
{
	return std::string(m_info) + " / " + std::string(m_z);
}

bool Waitable::Wait(unsigned long milliseconds, CallStack const& callStack)
{
	WaitObjectContainer container;
	GetWaitObjects(container, callStack);	// reduce clutter by not adding this func to stack
	return container.Wait(milliseconds);
}

NAMESPACE_END

#endif