src/lib/rudp.c 1.24
1.24 2010/01/07 19:53:45 angie
Fixed comparison edge condition (not a huge problem, it just set tv_{sec,usec} to {0,1000000} not {1,0}). Thx Galt for pointing out that the net.c code was dupl'd from this.
Index: src/lib/rudp.c
===================================================================
RCS file: /projects/compbio/cvsroot/kent/src/lib/rudp.c,v
retrieving revision 1.23
retrieving revision 1.24
diff -b -B -U 1000000 -r1.23 -r1.24
--- src/lib/rudp.c 23 Jun 2008 21:47:55 -0000 1.23
+++ src/lib/rudp.c 7 Jan 2010 19:53:45 -0000 1.24
@@ -1,561 +1,561 @@
/* rudp - (semi) reliable UDP communication. This adds an
* acknowledgement and resend layer on top of UDP.
*
* UDP is a packet based rather than stream based internet communication
* protocol. Messages sent by UDP are checked for integrety by the UDP layer,
* and discarded if transmission errors are detected. However packets are
* not necessarily received in the same order that they are sent,
* and packets may be duplicated or lost.
* Using rudp packets are only very rarely lost, and the sender is
* notified if they are. After rudp there are still duplicate
* packets that may arrive out of order. Aside from the duplicates
* the packets are in order though.
*
* For many, perhaps most applications, TCP/IP is a saner choice
* than UDP or rudp. If the communication channel is between just
* two computers you can pretty much just treat TCP/IP as a fairly
* reliable pipe. However if the communication involves many
* computers sometimes UDP can be a better choice. It is possible to
* do broadcast and multicast with UDP but not with TCP/IP. Also
* for systems like parasol, where a server may be making and breaking
* connections rapidly to thousands of computers, TCP paradoxically
* can end up less reliable than UDP. Though TCP is relatively
* robust when a connection is made, it can relatively easily fail
* to make a connection in the first place, and spend quite a long
* time figuring out that the connection can't be made. Moreover at
* the end of each connection TCP goes into a 'TIMED_WAIT' state, which
* prevents another connection from coming onto the same port for a
* time that can be as long as 255 seconds. Since there are only
* about 15000 available ports, this limits TCP/IP to 60 connections
* per second in some cases. Generally the system does not handle
* running out of ports gracefully, and this did occur with the
* TCP/IP based version of Parasol.
*
* This module puts a thin layer around UDP to make it a little bit more
* reliable. Currently the interface is geared towards Parasol rather
* than broadcast type applications. This module will try to send
* a message a limited number of times before giving up. It puts
* a small header containing a message ID and timestamp on each message.
* This header is echoed back in acknowledgment by the reciever. This
* echo lets the sender know if it needs to resend the message, and
* lets it know how long a message takes to get to the destination and
* back. It uses this round trip time information to figure out how
* long to wait between resends.
*
* Much of this code is based on the 'Adding Reliability to a UDP Application
* section in volume I, chapter 20, section 5, of _UNIX Network Programming_
* by W. Richard Stevens. */
#include "common.h"
#include <sys/time.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <pthread.h>
#include "errabort.h"
#include "hash.h"
#include "dlist.h"
#include "obscure.h"
#include "rudp.h"
static char const rcsid[] = "$Id$";
#define MAX_TIME_OUT 999999
static int rudpCalcTimeOut(struct rudp *ru)
/* Return calculation of time out based on current data. */
{
int timeOut = ru->rttAve + (ru->rttVary<<2);
if (timeOut > MAX_TIME_OUT) timeOut = MAX_TIME_OUT; /* No more than a second. */
if (timeOut < 10000) timeOut = 10000; /* No less than 1/100th second */
return timeOut;
}
static void rudpAddRoundTripTime(struct rudp *ru, int time)
/* Add info about round trip time and based on this recalculate
* time outs. */
{
int delta;
ru->rttLast = time;
delta = time - ru->rttAve;
ru->rttAve += (delta>>3); /* g = 1/8 */
if (delta < 0) delta = -delta;
ru->rttVary += ((delta - ru->rttVary) >> 2); /* h = 1/4 */
ru->timeOut = rudpCalcTimeOut(ru);
}
static void rudpTimedOut(struct rudp *ru)
/* Tell system about a time-out. */
{
ru->timeOut <<= 1; /* Back off exponentially. */
if (ru->timeOut >= MAX_TIME_OUT)
ru->timeOut = MAX_TIME_OUT;
}
struct rudp *rudpNew(int socket)
/* Wrap a rudp around a socket. Call rudpFree when done, or
* rudpClose if you also want to close(socket). */
{
/* Note scope of variable and mutex are the same */
static pthread_mutex_t rudpConnMutex = PTHREAD_MUTEX_INITIALIZER;
static int nextRudpConnId=0;
struct rudp *ru;
assert(socket >= 0);
AllocVar(ru);
ru->socket = socket;
ru->rttVary = 250; /* Initial variance 250 microseconds. */
ru->timeOut = rudpCalcTimeOut(ru);
ru->maxRetries = 7;
ru->pid = getpid();
pthread_mutex_lock( &rudpConnMutex );
ru->connId = ++nextRudpConnId;
pthread_mutex_unlock( &rudpConnMutex );
return ru;
}
void freePacketSeen(struct packetSeen **pP)
/* Free packet seen */
{
freez(pP);
}
void rudpFree(struct rudp **pRu)
/* Free up rudp. Note this does *not* close the associated socket. */
{
struct rudp *ru = *pRu;
if (ru->recvHash)
{
struct dlNode *node;
while (!dlEmpty(ru->recvList))
{
node = dlPopHead(ru->recvList);
struct packetSeen *p = node->val;
freeMem(node);
hashRemove(ru->recvHash, p->recvHashKey);
freePacketSeen(&p);
}
freeDlList(&ru->recvList);
hashFree(&ru->recvHash);
}
freez(pRu);
}
struct rudp *rudpOpen()
/* Open up an unbound rudp. This is suitable for
* writing to and for reading responses. However
* you'll want to rudpBind if you want to listen for
* incoming messages. Call rudpClose() when done
* with this one. Warns and returns NULL if there is
* a problem. */
{
int sd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sd < 0)
{
warn("Couldn't open socket in rudpOpen %s", strerror(errno));
return NULL;
}
return rudpNew(sd);
}
struct rudp *rudpMustOpen()
/* Open up unbound rudp. Warn and die if there is a problem. */
{
struct rudp *ru = rudpOpen();
if (ru == NULL)
noWarnAbort();
return ru;
}
struct rudp *rudpOpenBound(struct sockaddr_in *sai)
/* Open up a rudp socket bound to a particular port and address.
* Use this rather than rudpOpen if you want to wait for
* messages at a specific address in a server or the like. */
{
struct rudp *ru = rudpOpen();
if (ru != NULL)
{
if (bind(ru->socket, (struct sockaddr *)sai, sizeof(*sai)) < 0)
{
warn("Couldn't bind rudp socket: %s", strerror(errno));
rudpClose(&ru);
}
}
return ru;
}
struct rudp *rudpMustOpenBound(struct sockaddr_in *sai)
/* Open up a rudp socket bound to a particular port and address
* or die trying. */
{
struct rudp *ru = rudpOpenBound(sai);
if (ru == NULL)
noWarnAbort();
return ru;
}
void rudpClose(struct rudp **pRu)
/* Close socket and free memory. */
{
struct rudp *ru = *pRu;
if (ru != NULL)
{
close(ru->socket);
rudpFree(pRu);
}
}
static int timeDiff(struct timeval *t1, struct timeval *t2)
/* Return difference in microseconds between t1 and t2. t2 must be
* later than t1 (but less than 50 minutes later). */
{
int secDiff = t2->tv_sec - t1->tv_sec;
int microDiff = 0;
if (secDiff != 0)
microDiff = secDiff * 1000000;
microDiff += (t2->tv_usec - t1->tv_usec);
if (microDiff < 0)
{
/* Note, this case actually happens, currently particularly on
* kkr2u62 and kkr8u19. I think this is just a bug in their clock
* hardware/software. However in general it _could_ happen very
* rarely on normal machines when the clock is reset by the
* network time protocol thingie. */
warn("t1 %u.%u, t2 %u.%u. t1 > t2 but later?!", (unsigned)t1->tv_sec,
(unsigned)t1->tv_usec, (unsigned)t2->tv_sec, (unsigned)t2->tv_usec);
microDiff = 0;
}
return microDiff;
}
static boolean readReadyWait(int sd, int microseconds)
/* Wait for descriptor to have some data to read, up to
* given number of microseconds. */
{
struct timeval tv;
fd_set set;
int readyCount;
for (;;)
{
- if (microseconds > 1000000)
+ if (microseconds >= 1000000)
{
tv.tv_sec = microseconds/1000000;
tv.tv_usec = microseconds%1000000;
}
else
{
tv.tv_sec = 0;
tv.tv_usec = microseconds;
}
FD_ZERO(&set);
FD_SET(sd, &set);
readyCount = select(sd+1, &set, NULL, NULL, &tv);
if (readyCount < 0)
{
if (errno == EINTR) /* Select interrupted, not timed out. */
continue;
else
warn("select failure in rudp: %s", strerror(errno));
}
else
{
return readyCount > 0; /* Zero readyCount indicates time out */
}
}
}
static boolean getOurAck(struct rudp *ru, struct timeval *startTv, struct sockaddr_in *sai)
/* Wait for acknowledgement to the message we just sent.
* The set should be zeroed out. Only wait for up to
* ru->timeOut microseconds. Prints a message and returns FALSE
* if there's a problem. */
{
struct rudpHeader head;
int readSize;
int timeOut = ru->timeOut;
struct sockaddr_in retFrom;
unsigned int retFromSize = sizeof(retFrom);
for (;;)
{
/* Set up select with our time out. */
int dt;
struct timeval tv;
if (readReadyWait(ru->socket, timeOut))
{
/* Read message and if it's our ack return true. */
readSize = recvfrom(ru->socket, &head, sizeof(head), 0,
(struct sockaddr*)&retFrom, &retFromSize);
if (readSize >= sizeof(head) && head.type == rudpAck && head.id == ru->lastId)
{
if ((sai->sin_addr.s_addr==retFrom.sin_addr.s_addr) &&
(sai->sin_port==retFrom.sin_port))
{
gettimeofday(&tv, NULL);
dt = timeDiff(startTv, &tv);
rudpAddRoundTripTime(ru, dt);
return TRUE;
}
else
{
char retFromDottedQuad[17];
char saiDottedQuad[17];
internetIpToDottedQuad(ntohl(retFrom.sin_addr.s_addr), retFromDottedQuad);
internetIpToDottedQuad(ntohl(sai->sin_addr.s_addr), saiDottedQuad);
warn("rudp: discarding mistaken ack from %s:%d by confirming recipient ip:port %s:%d"
, retFromDottedQuad, retFrom.sin_port
, saiDottedQuad, sai->sin_port
);
}
}
}
/* If we got to here then we did get a message, but it's not our
* ack. We ignore the message and loop around again, but update
* our timeout so that we won't keep getting other people's messages
* forever. */
gettimeofday(&tv, NULL);
timeOut = ru->timeOut - timeDiff(startTv, &tv);
if (timeOut <= 0)
return FALSE;
}
}
int rudpSend(struct rudp *ru, struct sockaddr_in *sai, void *message, int size)
/* Send message of given size to port at host via rudp. Prints a warning and
* sets errno and returns -1 if there's a problem. */
{
struct timeval sendTv; /* Current time. */
char outBuf[udpEthMaxSize];
struct rudpHeader *head;
int fullSize = size + sizeof(*head);
int i, err = 0, maxRetry = ru->maxRetries;
/* Make buffer with header in front of message.
* At some point we might replace this with a scatter/gather
* iovector. */
ru->sendCount += 1;
ru->resend = FALSE;
assert(size <= rudpMaxSize);
head = (struct rudpHeader *)outBuf;
memcpy(head+1, message, size);
head->pid = ru->pid;
head->connId = ru->connId;
head->id = ++ru->lastId;
head->type = rudpData;
/* Go into send/wait for ack/retry loop. */
for (i=0; i<maxRetry; ++i)
{
gettimeofday(&sendTv, NULL);
head->sendSec = sendTv.tv_sec;
head->sendMicro = sendTv.tv_usec;
err = sendto(ru->socket, outBuf, fullSize, 0,
(struct sockaddr *)sai, sizeof(*sai));
if (err < 0)
{
/* Warn, wait, and retry. */
struct timeval tv;
warn("sendto problem %s", strerror(errno));
tv.tv_sec = 0;
tv.tv_usec = ru->timeOut;
select(0, NULL, NULL, NULL, &tv);
rudpTimedOut(ru);
ru->resendCount += 1;
ru->resend = TRUE;
continue;
}
if (getOurAck(ru, &sendTv, sai))
{
return 0;
}
rudpTimedOut(ru);
ru->resendCount += 1;
ru->resend = TRUE;
}
if (err >= 0)
{
err = ETIMEDOUT;
warn("rudpSend timed out");
}
ru->failCount += 1;
return err;
}
void sweepOutOldPacketsSeen(struct rudp *ru, long now)
/* Sweep out old packets seen, we can only remember so many. */
{
int period = 8; /* seconds */
struct dlNode *node;
struct packetSeen *p;
while (TRUE)
{
if (dlEmpty(ru->recvList))
break;
p = ru->recvList->head->val;
if (now - p->lastChecked < period)
break;
--ru->recvCount;
node = dlPopHead(ru->recvList);
freeMem(node);
hashRemove(ru->recvHash, p->recvHashKey);
freePacketSeen(&p);
}
}
int rudpReceiveTimeOut(struct rudp *ru, void *messageBuf, int bufSize,
struct sockaddr_in *retFrom, int timeOut)
/* Read message into buffer of given size. Returns actual size read on
* success. On failure prints a warning, sets errno, and returns -1.
* Also returns ip address of message source. If timeOut is nonzero,
* it represents the timeout in milliseconds. It will set errno to
* ETIMEDOUT in this case.*/
{
char inBuf[udpEthMaxSize];
struct rudpHeader *head = (struct rudpHeader *)inBuf;
struct rudpHeader ackHead;
struct sockaddr_in sai;
socklen_t saiSize = sizeof(sai);
int readSize, err;
assert(bufSize <= rudpMaxSize);
ru->receiveCount += 1;
for (;;)
{
if (timeOut != 0)
{
if (!readReadyWait(ru->socket, timeOut))
{
warn("rudpReceive timed out\n");
errno = ETIMEDOUT;
return -1;
}
}
readSize = recvfrom(ru->socket, inBuf, sizeof(inBuf), 0,
(struct sockaddr*)&sai, &saiSize);
if (retFrom != NULL)
*retFrom = sai;
if (readSize < 0)
{
if (errno == EINTR)
continue;
warn("recvfrom error: %s", strerror(errno));
ru->failCount += 1;
return readSize;
}
if (readSize < sizeof(*head))
{
warn("rudpRecieve truncated message");
continue;
}
if (head->type != rudpData)
{
if (head->type != rudpAck)
warn("skipping non-data message %d in rudpReceive", head->type);
continue;
}
ackHead = *head;
ackHead.type = rudpAck;
err = sendto(ru->socket, &ackHead, sizeof(ackHead), 0,
(struct sockaddr *)&sai, sizeof(sai));
if (err < 0)
{
warn("problem sending ack in rudpRecieve: %s", strerror(errno));
}
readSize -= sizeof(*head);
if (readSize > bufSize)
{
warn("read more bytes than have room for in rudpReceive");
readSize = bufSize;
}
memcpy(messageBuf, head+1, readSize);
/* check for duplicate packet */
if (!ru->recvHash)
{ /* take advantage of new auto-expanding hashes */
ru->recvHash = newHashExt(4, FALSE); /* do not use local mem in hash */
ru->recvList = newDlList();
ru->recvCount = 0;
}
char hashKey[64];
char saiDottedQuad[17];
internetIpToDottedQuad(ntohl(sai.sin_addr.s_addr), saiDottedQuad);
safef(hashKey, sizeof(hashKey), "%s-%d-%d-%d"
, saiDottedQuad
, head->pid
, head->connId
, head->id
);
if (hashLookup(ru->recvHash, hashKey))
{
warn("duplicate packet filtered out: %s", hashKey);
continue;
}
long now = time(NULL);
struct packetSeen *p;
AllocVar(p);
AllocVar(p->node);
p->node->val = p;
p->recvHashKey = hashStoreName(ru->recvHash, hashKey);
p->lastChecked = now;
dlAddTail(ru->recvList, p->node);
++ru->recvCount;
sweepOutOldPacketsSeen(ru, now);
ru->lastIdReceived = head->id;
break;
}
return readSize;
}
int rudpReceiveFrom(struct rudp *ru, void *messageBuf, int bufSize,
struct sockaddr_in *retFrom)
/* Read message into buffer of given size. Returns actual size read on
* success. On failure prints a warning, sets errno, and returns -1.
* Also returns ip address of message source. */
{
return rudpReceiveTimeOut(ru, messageBuf, bufSize, retFrom, 0);
}
int rudpReceive(struct rudp *ru, void *messageBuf, int bufSize)
/* Read message into buffer of given size. Returns actual size read on
* success. On failure prints a warning, sets errno, and returns -1. */
{
return rudpReceiveFrom(ru, messageBuf, bufSize, NULL);
}
void rudpPrintStatus(struct rudp *ru)
/* Print out rudpStatus */
{
printf("rudp status:\n");
printf(" receiveCount %d\n", ru->receiveCount);
printf(" sendCount %d\n", ru->sendCount);
printf(" resendCount %d\n", ru->resendCount);
printf(" failCount %d\n", ru->failCount);
printf(" timeOut %d\n", ru->timeOut);
printf(" rttVary %d\n", ru->rttVary);
printf(" rttAve %d\n", ru->rttAve);
printf(" rttLast %d\n", ru->rttLast);
}
void rudpTest()
/* Test out rudp stuff. */
{
static int times[] = {1000, 200, 200, 100, 200, 200, 200, 400, 200, 200, 200, 200, 1000,
200, 200, 200, 200};
struct rudp *ru = rudpNew(0);
int i;
for (i=0; i<ArraySize(times); ++i)
{
int oldTimeOut = ru->timeOut;
rudpAddRoundTripTime(ru, times[i]);
printf("%d\t%d\t%d\t%d\n", i, oldTimeOut, times[i], ru->timeOut);
}
}