Made all arch sockets non-blocking.

This commit is contained in:
crs 2004-02-29 16:11:17 +00:00
parent 75cafe65a6
commit d6ec331b09
10 changed files with 179 additions and 286 deletions

View File

@ -435,10 +435,10 @@ CArch::acceptSocket(CArchSocket s, CArchNetAddress* addr)
return m_net->acceptSocket(s, addr);
}
void
bool
CArch::connectSocket(CArchSocket s, CArchNetAddress name)
{
m_net->connectSocket(s, name);
return m_net->connectSocket(s, name);
}
int
@ -471,12 +471,6 @@ CArch::throwErrorOnSocket(CArchSocket s)
m_net->throwErrorOnSocket(s);
}
bool
CArch::setBlockingOnSocket(CArchSocket s, bool blocking)
{
return m_net->setBlockingOnSocket(s, blocking);
}
bool
CArch::setNoDelayOnSocket(CArchSocket s, bool noDelay)
{

View File

@ -132,14 +132,13 @@ public:
virtual void bindSocket(CArchSocket s, CArchNetAddress addr);
virtual void listenOnSocket(CArchSocket s);
virtual CArchSocket acceptSocket(CArchSocket s, CArchNetAddress* addr);
virtual void connectSocket(CArchSocket s, CArchNetAddress name);
virtual bool connectSocket(CArchSocket s, CArchNetAddress name);
virtual int pollSocket(CPollEntry[], int num, double timeout);
virtual void unblockPollSocket(CArchThread thread);
virtual size_t readSocket(CArchSocket s, void* buf, size_t len);
virtual size_t writeSocket(CArchSocket s,
const void* buf, size_t len);
virtual void throwErrorOnSocket(CArchSocket);
virtual bool setBlockingOnSocket(CArchSocket, bool blocking);
virtual bool setNoDelayOnSocket(CArchSocket, bool noDelay);
virtual std::string getHostName();
virtual CArchNetAddress newAnyAddr(EAddressFamily);

View File

@ -69,18 +69,23 @@ CArchNetworkBSD::~CArchNetworkBSD()
CArchSocket
CArchNetworkBSD::newSocket(EAddressFamily family, ESocketType type)
{
// allocate socket object
CArchSocketImpl* newSocket = new CArchSocketImpl;
// create socket
int fd = socket(s_family[family], s_type[type], 0);
if (fd == -1) {
throwError(errno);
}
try {
setBlockingOnSocket(fd, false);
}
catch (...) {
close(fd);
throw;
}
newSocket->m_fd = fd;
newSocket->m_connected = false;
newSocket->m_refCount = 1;
// allocate socket object
CArchSocketImpl* newSocket = new CArchSocketImpl;
newSocket->m_fd = fd;
newSocket->m_refCount = 1;
return newSocket;
}
@ -108,23 +113,14 @@ CArchNetworkBSD::closeSocket(CArchSocket s)
// close the socket if necessary
if (doClose) {
do {
if (close(s->m_fd) == -1) {
// close failed
int err = errno;
if (err == EINTR) {
// interrupted system call
ARCH->testCancelThread();
continue;
}
// restore the last ref and throw
ARCH->lockMutex(m_mutex);
++s->m_refCount;
ARCH->unlockMutex(m_mutex);
throwError(err);
}
} while (false);
if (close(s->m_fd) == -1) {
// close failed. restore the last ref and throw.
int err = errno;
ARCH->lockMutex(m_mutex);
++s->m_refCount;
ARCH->unlockMutex(m_mutex);
throwError(err);
}
delete s;
}
}
@ -191,27 +187,32 @@ CArchNetworkBSD::acceptSocket(CArchSocket s, CArchNetAddress* addr)
*addr = new CArchNetAddressImpl;
// accept on socket
int fd;
do {
fd = accept(s->m_fd, &(*addr)->m_addr, &(*addr)->m_len);
if (fd == -1) {
int err = errno;
if (err == EINTR) {
// interrupted system call
ARCH->testCancelThread();
continue;
}
delete newSocket;
delete *addr;
*addr = NULL;
throwError(err);
int fd = accept(s->m_fd, &(*addr)->m_addr, &(*addr)->m_len);
if (fd == -1) {
int err = errno;
delete newSocket;
delete *addr;
*addr = NULL;
if (err == EAGAIN) {
return NULL;
}
} while (false);
throwError(err);
}
try {
setBlockingOnSocket(fd, false);
}
catch (...) {
close(fd);
delete newSocket;
delete *addr;
*addr = NULL;
throw;
}
// initialize socket
newSocket->m_fd = fd;
newSocket->m_connected = true;
newSocket->m_refCount = 1;
newSocket->m_fd = fd;
newSocket->m_refCount = 1;
// discard address if not requested
if (addr == &dummy) {
@ -221,32 +222,22 @@ CArchNetworkBSD::acceptSocket(CArchSocket s, CArchNetAddress* addr)
return newSocket;
}
void
bool
CArchNetworkBSD::connectSocket(CArchSocket s, CArchNetAddress addr)
{
assert(s != NULL);
assert(addr != NULL);
do {
if (connect(s->m_fd, &addr->m_addr, addr->m_len) == -1) {
if (errno == EINTR) {
// interrupted system call
ARCH->testCancelThread();
continue;
}
if (errno == EISCONN) {
// already connected
break;
}
throwError(errno);
if (connect(s->m_fd, &addr->m_addr, addr->m_len) == -1) {
if (errno == EISCONN) {
return true;
}
} while (false);
ARCH->lockMutex(m_mutex);
s->m_connected = true;
ARCH->unlockMutex(m_mutex);
if (errno == EINPROGRESS) {
return false;
}
throwError(errno);
}
return true;
}
#if HAVE_POLL
@ -435,23 +426,13 @@ CArchNetworkBSD::readSocket(CArchSocket s, void* buf, size_t len)
{
assert(s != NULL);
ssize_t n;
do {
n = read(s->m_fd, buf, len);
if (n == -1) {
if (errno == EINTR) {
// interrupted system call
n = 0;
break;
}
else if (errno == EAGAIN) {
n = 0;
break;
}
throwError(errno);
ssize_t n = read(s->m_fd, buf, len);
if (n == -1) {
if (errno == EINTR || errno == EAGAIN) {
return 0;
}
} while (false);
ARCH->testCancelThread();
throwError(errno);
}
return n;
}
@ -460,24 +441,13 @@ CArchNetworkBSD::writeSocket(CArchSocket s, const void* buf, size_t len)
{
assert(s != NULL);
ssize_t n;
do {
n = write(s->m_fd, buf, len);
if (n == -1) {
if (errno == EINTR) {
// interrupted system call
n = 0;
break;
}
else if (errno == EAGAIN) {
// no buffer space
n = 0;
break;
}
throwError(errno);
ssize_t n = write(s->m_fd, buf, len);
if (n == -1) {
if (errno == EINTR || errno == EAGAIN) {
return 0;
}
} while (false);
ARCH->testCancelThread();
throwError(errno);
}
return n;
}
@ -499,26 +469,24 @@ CArchNetworkBSD::throwErrorOnSocket(CArchSocket s)
}
}
bool
CArchNetworkBSD::setBlockingOnSocket(CArchSocket s, bool blocking)
void
CArchNetworkBSD::setBlockingOnSocket(int fd, bool blocking)
{
assert(s != NULL);
assert(fd != -1);
int mode = fcntl(s->m_fd, F_GETFL, 0);
int mode = fcntl(fd, F_GETFL, 0);
if (mode == -1) {
throwError(errno);
}
bool old = ((mode & O_NDELAY) == 0);
if (blocking) {
mode &= ~O_NDELAY;
}
else {
mode |= O_NDELAY;
}
if (fcntl(s->m_fd, F_SETFL, mode) == -1) {
if (fcntl(fd, F_SETFL, mode) == -1) {
throwError(errno);
}
return old;
}
bool
@ -775,8 +743,9 @@ void
CArchNetworkBSD::throwError(int err)
{
switch (err) {
case EAGAIN:
throw XArchNetworkWouldBlock(new XArchEvalUnix(err));
case EINTR:
ARCH->testCancelThread();
throw XArchNetworkInterrupted(new XArchEvalUnix(err));
case EACCES:
case EPERM:
@ -833,10 +802,6 @@ CArchNetworkBSD::throwError(int err)
case ECONNREFUSED:
throw XArchNetworkConnectionRefused(new XArchEvalUnix(err));
case EINPROGRESS:
case EALREADY:
throw XArchNetworkConnecting(new XArchEvalUnix(err));
case EHOSTDOWN:
case ETIMEDOUT:
throw XArchNetworkTimedOut(new XArchEvalUnix(err));

View File

@ -30,7 +30,6 @@ typedef int socklen_t;
class CArchSocketImpl {
public:
int m_fd;
bool m_connected;
int m_refCount;
};
@ -58,14 +57,13 @@ public:
virtual void bindSocket(CArchSocket s, CArchNetAddress addr);
virtual void listenOnSocket(CArchSocket s);
virtual CArchSocket acceptSocket(CArchSocket s, CArchNetAddress* addr);
virtual void connectSocket(CArchSocket s, CArchNetAddress name);
virtual bool connectSocket(CArchSocket s, CArchNetAddress name);
virtual int pollSocket(CPollEntry[], int num, double timeout);
virtual void unblockPollSocket(CArchThread thread);
virtual size_t readSocket(CArchSocket s, void* buf, size_t len);
virtual size_t writeSocket(CArchSocket s,
const void* buf, size_t len);
virtual void throwErrorOnSocket(CArchSocket);
virtual bool setBlockingOnSocket(CArchSocket, bool blocking);
virtual bool setNoDelayOnSocket(CArchSocket, bool noDelay);
virtual std::string getHostName();
virtual CArchNetAddress newAnyAddr(EAddressFamily);
@ -81,6 +79,7 @@ public:
virtual bool isEqualAddr(CArchNetAddress, CArchNetAddress);
private:
void setBlockingOnSocket(int fd, bool blocking);
void throwError(int);
void throwNameError(int);

View File

@ -187,20 +187,25 @@ CArchNetworkWinsock::init(HMODULE module)
CArchSocket
CArchNetworkWinsock::newSocket(EAddressFamily family, ESocketType type)
{
// allocate socket object
CArchSocketImpl* socket = new CArchSocketImpl;
// create socket
SOCKET fd = socket_winsock(s_family[family], s_type[type], 0);
if (fd == INVALID_SOCKET) {
throwError(getsockerror_winsock());
}
try {
setBlockingOnSocket(fd, false);
}
catch (...) {
close(fd);
throw;
}
socket->m_socket = fd;
socket->m_connected = false;
socket->m_refCount = 1;
socket->m_event = WSACreateEvent_winsock();
socket->m_pollWrite = false;
// allocate socket object
CArchSocketImpl* socket = new CArchSocketImpl;
socket->m_socket = fd;
socket->m_refCount = 1;
socket->m_event = WSACreateEvent_winsock();
socket->m_pollWrite = false;
return socket;
}
@ -228,23 +233,14 @@ CArchNetworkWinsock::closeSocket(CArchSocket s)
// close the socket if necessary
if (doClose) {
do {
if (close_winsock(s->m_socket) == SOCKET_ERROR) {
// close failed
int err = getsockerror_winsock();
if (err == WSAEINTR) {
// interrupted system call
ARCH->testCancelThread();
continue;
}
// restore the last ref and throw
ARCH->lockMutex(m_mutex);
++s->m_refCount;
ARCH->unlockMutex(m_mutex);
throwError(err);
}
} while (false);
if (close_winsock(s->m_socket) == SOCKET_ERROR) {
// close failed. restore the last ref and throw.
int err = getsockerror_winsock();
ARCH->lockMutex(m_mutex);
++s->m_refCount;
ARCH->unlockMutex(m_mutex);
throwError(err);
}
WSACloseEvent_winsock(s->m_event);
delete s;
}
@ -306,26 +302,31 @@ CArchNetworkWinsock::acceptSocket(CArchSocket s, CArchNetAddress* addr)
CArchNetAddress tmp = CArchNetAddressImpl::alloc(sizeof(struct sockaddr));
// accept on socket
SOCKET fd;
do {
fd = accept_winsock(s->m_socket, &tmp->m_addr, &tmp->m_len);
if (fd == INVALID_SOCKET) {
int err = getsockerror_winsock();
delete socket;
free(tmp);
*addr = NULL;
if (err == WSAEINTR) {
// interrupted system call
ARCH->testCancelThread();
return NULL;
}
throwError(err);
SOCKET fd = accept_winsock(s->m_socket, &tmp->m_addr, &tmp->m_len);
if (fd == INVALID_SOCKET) {
int err = getsockerror_winsock();
delete socket;
free(tmp);
*addr = NULL;
if (err == WSAEWOULDBLOCK) {
return NULL;
}
} while (false);
throwError(err);
}
try {
setBlockingOnSocket(fd, false);
}
catch (...) {
close(fd);
delete socket;
free(tmp);
*addr = NULL;
throw;
}
// initialize socket
socket->m_socket = fd;
socket->m_connected = true;
socket->m_refCount = 1;
socket->m_event = WSACreateEvent_winsock();
socket->m_pollWrite = true;
@ -339,39 +340,23 @@ CArchNetworkWinsock::acceptSocket(CArchSocket s, CArchNetAddress* addr)
return socket;
}
void
bool
CArchNetworkWinsock::connectSocket(CArchSocket s, CArchNetAddress addr)
{
assert(s != NULL);
assert(addr != NULL);
do {
if (connect_winsock(s->m_socket, &addr->m_addr,
addr->m_len) == SOCKET_ERROR) {
if (getsockerror_winsock() == WSAEINTR) {
// interrupted system call
ARCH->testCancelThread();
continue;
}
if (getsockerror_winsock() == WSAEISCONN) {
// already connected
break;
}
if (getsockerror_winsock() == WSAEWOULDBLOCK) {
// connecting
throw XArchNetworkConnecting(new XArchEvalWinsock(
getsockerror_winsock()));
}
throwError(getsockerror_winsock());
if (connect_winsock(s->m_socket, &addr->m_addr,
addr->m_len) == SOCKET_ERROR) {
if (getsockerror_winsock() == WSAEISCONN) {
return true;
}
} while (false);
ARCH->lockMutex(m_mutex);
s->m_connected = true;
ARCH->unlockMutex(m_mutex);
if (getsockerror_winsock() == WSAEWOULDBLOCK) {
return false;
}
throwError(getsockerror_winsock());
}
return true;
}
int
@ -540,23 +525,14 @@ CArchNetworkWinsock::readSocket(CArchSocket s, void* buf, size_t len)
{
assert(s != NULL);
int n;
do {
n = recv_winsock(s->m_socket, buf, len, 0);
if (n == SOCKET_ERROR) {
if (getsockerror_winsock() == WSAEINTR) {
// interrupted system call
n = 0;
break;
}
else if (getsockerror_winsock() == WSAEWOULDBLOCK) {
n = 0;
break;
}
throwError(getsockerror_winsock());
int n = recv_winsock(s->m_socket, buf, len, 0);
if (n == SOCKET_ERROR) {
int err = getsockerror_winsock();
if (err == WSAEINTR || err == WSAEWOULDBLOCK) {
return 0;
}
} while (false);
ARCH->testCancelThread();
throwError(err);
}
return static_cast<size_t>(n);
}
@ -565,24 +541,18 @@ CArchNetworkWinsock::writeSocket(CArchSocket s, const void* buf, size_t len)
{
assert(s != NULL);
int n;
do {
n = send_winsock(s->m_socket, buf, len, 0);
if (n == SOCKET_ERROR) {
if (getsockerror_winsock() == WSAEINTR) {
// interrupted system call
n = 0;
break;
}
else if (getsockerror_winsock() == WSAEWOULDBLOCK) {
s->m_pollWrite = true;
n = 0;
break;
}
throwError(getsockerror_winsock());
int n = send_winsock(s->m_socket, buf, len, 0);
if (n == SOCKET_ERROR) {
int err = getsockerror_winsock();
if (err == WSAEINTR) {
return 0;
}
} while (false);
ARCH->testCancelThread();
if (err == WSAEWOULDBLOCK) {
s->m_pollWrite = true;
return 0;
}
throwError(err);
}
return static_cast<size_t>(n);
}
@ -605,17 +575,15 @@ CArchNetworkWinsock::throwErrorOnSocket(CArchSocket s)
}
}
bool
CArchNetworkWinsock::setBlockingOnSocket(CArchSocket s, bool blocking)
void
CArchNetworkWinsock::setBlockingOnSocket(SOCKET s, bool blocking)
{
assert(s != NULL);
int flag = blocking ? 0 : 1;
if (ioctl_winsock(s->m_socket, FIONBIO, &flag) == SOCKET_ERROR) {
if (ioctl_winsock(s, FIONBIO, &flag) == SOCKET_ERROR) {
throwError(getsockerror_winsock());
}
// FIXME -- can't get the current blocking state of socket?
return true;
}
bool
@ -841,9 +809,6 @@ void
CArchNetworkWinsock::throwError(int err)
{
switch (err) {
case WSAEWOULDBLOCK:
throw XArchNetworkWouldBlock(new XArchEvalWinsock(err));
case WSAEACCES:
throw XArchNetworkAccess(new XArchEvalWinsock(err));
@ -890,10 +855,6 @@ CArchNetworkWinsock::throwError(int err)
case WSAECONNREFUSED:
throw XArchNetworkConnectionRefused(new XArchEvalWinsock(err));
case WSAEINPROGRESS:
case WSAEALREADY:
throw XArchNetworkConnecting(new XArchEvalWinsock(err));
case WSAEHOSTDOWN:
case WSAETIMEDOUT:
throw XArchNetworkTimedOut(new XArchEvalWinsock(err));

View File

@ -31,7 +31,6 @@
class CArchSocketImpl {
public:
SOCKET m_socket;
bool m_connected;
int m_refCount;
WSAEVENT m_event;
bool m_pollWrite;
@ -63,14 +62,13 @@ public:
virtual void bindSocket(CArchSocket s, CArchNetAddress addr);
virtual void listenOnSocket(CArchSocket s);
virtual CArchSocket acceptSocket(CArchSocket s, CArchNetAddress* addr);
virtual void connectSocket(CArchSocket s, CArchNetAddress name);
virtual bool connectSocket(CArchSocket s, CArchNetAddress name);
virtual int pollSocket(CPollEntry[], int num, double timeout);
virtual void unblockPollSocket(CArchThread thread);
virtual size_t readSocket(CArchSocket s, void* buf, size_t len);
virtual size_t writeSocket(CArchSocket s,
const void* buf, size_t len);
virtual void throwErrorOnSocket(CArchSocket);
virtual bool setBlockingOnSocket(CArchSocket, bool blocking);
virtual bool setNoDelayOnSocket(CArchSocket, bool noDelay);
virtual std::string getHostName();
virtual CArchNetAddress newAnyAddr(EAddressFamily);
@ -88,6 +86,8 @@ public:
private:
void init(HMODULE);
void setBlockingOnSocket(SOCKET, bool blocking);
void throwError(int);
void throwNameError(int);

View File

@ -152,25 +152,21 @@ public:
end. \c addr may be NULL if the remote address isn't required.
The original socket \c s is unaffected and remains in the listening
state. The new socket shares most of the properties of \c s except
it's not in the listening state, it's connected, and is not
non-blocking even is \c s is.
This call blocks if \c s is not non-blocking and there are no
pending connection requests.
(Cancellation point)
it's not in the listening state and it's connected. Returns NULL
if there are no pending connection requests.
*/
virtual CArchSocket acceptSocket(CArchSocket s, CArchNetAddress* addr) = 0;
//! Connect socket
/*!
Connects the socket \c s to the remote address \c addr. This call
blocks if \c s is not non-blocking. If \c s is non-blocking then
the client can \c poll() for writability to detect a connection.
(Cancellation point)
Connects the socket \c s to the remote address \c addr. Returns
true if the connection succeed immediately, false if the connection
is in progress, and throws if the connection failed immediately.
If it returns false, \c pollSocket() can be used to wait on the
socket for writing to detect when the connection finally succeeds
or fails.
*/
virtual void connectSocket(CArchSocket s, CArchNetAddress addr) = 0;
virtual bool connectSocket(CArchSocket s, CArchNetAddress addr) = 0;
//! Check socket state
/*!
@ -202,12 +198,7 @@ public:
Read up to \c len bytes from socket \c s in \c buf and return the
number of bytes read. The number of bytes can be less than \c len
if not enough data is available. Returns 0 if the remote end has
disconnected and there is no more queued received data. Blocks if
the socket is not non-blocking and there is no queued received data.
If non-blocking and there is no queued received data then throws
XArchNetworkWouldBlock.
(Cancellation point)
disconnected and/or there is no more queued received data.
*/
virtual size_t readSocket(CArchSocket s, void* buf, size_t len) = 0;
@ -215,12 +206,8 @@ public:
/*!
Write up to \c len bytes to socket \c s from \c buf and return the
number of bytes written. The number of bytes can be less than
\c len if the remote end disconnected or the socket is non-blocking
and the internal buffers are full. If non-blocking and the internal
buffers are full before any data is written then throws
XArchNetworkWouldBlock.
(Cancellation point)
\c len if the remote end disconnected or the internal buffers fill
up.
*/
virtual size_t writeSocket(CArchSocket s,
const void* buf, size_t len) = 0;
@ -232,14 +219,6 @@ public:
*/
virtual void throwErrorOnSocket(CArchSocket s) = 0;
//! Set socket to (non-)blocking operation
/*!
Set socket to block or not block on accept, connect, poll, read and
write (i.e. calls that may take an arbitrary amount of time).
Returns the previous state.
*/
virtual bool setBlockingOnSocket(CArchSocket, bool blocking) = 0;
//! Turn Nagle algorithm on or off on socket
/*!
Set socket to send messages immediately (true) or to collect small

View File

@ -90,8 +90,8 @@ library to indicate various errors.
*/
XARCH_SUBCLASS(XArchNetwork, XArch);
//! Operation would block
XARCH_SUBCLASS(XArchNetworkWouldBlock, XArchNetwork);
//! Operation was interrupted
XARCH_SUBCLASS(XArchNetworkInterrupted, XArchNetwork);
//! Network insufficient permission
XARCH_SUBCLASS(XArchNetworkAccess, XArchNetwork);
@ -126,9 +126,6 @@ XARCH_SUBCLASS(XArchNetworkDisconnected, XArchNetwork);
//! Remote end of socket refused connection
XARCH_SUBCLASS(XArchNetworkConnectionRefused, XArchNetwork);
//! Connection is in progress
XARCH_SUBCLASS(XArchNetworkConnecting, XArchNetwork);
//! Remote end of socket is not responding
XARCH_SUBCLASS(XArchNetworkTimedOut, XArchNetwork);

View File

@ -61,7 +61,6 @@ CTCPListenSocket::bind(const CNetworkAddress& addr)
CLock lock(m_mutex);
ARCH->bindSocket(m_socket, addr.getAddress());
ARCH->listenOnSocket(m_socket);
ARCH->setBlockingOnSocket(m_socket, false);
CSocketMultiplexer::getInstance()->addSocket(this,
new TSocketMultiplexerMethodJob<CTCPListenSocket>(
this, &CTCPListenSocket::serviceListening,
@ -104,10 +103,12 @@ CTCPListenSocket::accept()
try {
IDataSocket* socket =
new CTCPSocket(ARCH->acceptSocket(m_socket, NULL));
CSocketMultiplexer::getInstance()->addSocket(this,
if (socket != NULL) {
CSocketMultiplexer::getInstance()->addSocket(this,
new TSocketMultiplexerMethodJob<CTCPListenSocket>(
this, &CTCPListenSocket::serviceListening,
m_socket, true, false));
}
return socket;
}
catch (XArchNetwork&) {

View File

@ -275,13 +275,14 @@ CTCPSocket::connect(const CNetworkAddress& addr)
}
try {
ARCH->connectSocket(m_socket, addr.getAddress());
sendSocketEvent(getConnectedEvent());
onConnected();
}
catch (XArchNetworkConnecting&) {
// connection is in progress
m_writable = true;
if (ARCH->connectSocket(m_socket, addr.getAddress())) {
sendSocketEvent(getConnectedEvent());
onConnected();
}
else {
// connection is in progress
m_writable = true;
}
}
catch (XArchNetwork& e) {
throw XSocketConnect(e.what());
@ -299,9 +300,6 @@ CTCPSocket::init()
m_writable = false;
try {
// make socket non-blocking
ARCH->setBlockingOnSocket(m_socket, false);
// turn off Nagle algorithm. we send lots of very short messages
// that should be sent without (much) delay. for example, the
// mouse motion messages are much less useful if they're delayed.