src/parasol/broadNode/broadNode.c 1.17
1.17 2010/01/12 09:09:14 markd
add option to set minimum log level, move some logging to debug
Index: src/parasol/broadNode/broadNode.c
===================================================================
RCS file: /projects/compbio/cvsroot/kent/src/parasol/broadNode/broadNode.c,v
retrieving revision 1.16
retrieving revision 1.17
diff -b -B -U 1000000 -r1.16 -r1.17
--- src/parasol/broadNode/broadNode.c 29 Nov 2006 17:44:51 -0000 1.16
+++ src/parasol/broadNode/broadNode.c 12 Jan 2010 09:09:14 -0000 1.17
@@ -1,471 +1,473 @@
/* broadNode - Daemon that runs on cluster nodes in broadcast data system. */
#include "paraCommon.h"
#include "linefile.h"
#include "hash.h"
#include "options.h"
#include "log.h"
#include "paraLib.h"
#include "broadData.h"
#include "md5.h"
/* Command line option specifications: each entry pairs an option name with
 * the type of value it takes.  The table is handed to optionInit() in main()
 * and is terminated by the {NULL, 0} entry.
 * Only nodeInPort, hubInPort, broadIp, drop and ip are read in this file;
 * the log* options are presumably consumed by paraDaemonize() - confirm.
 * NOTE(review): usage() advertises -debug, but no "debug" entry appears
 * here - confirm whether optionInit() accepts it. */
static struct optionSpec optionSpecs[] = {
{"hubInPort", OPTION_INT},
{"nodeInPort", OPTION_INT},
{"logFacility", OPTION_STRING},
+ {"logMinPriority", OPTION_STRING},
{"log", OPTION_STRING},
{"drop", OPTION_INT},
{"ip", OPTION_STRING},
{"broadIp", OPTION_STRING},
{NULL, 0}
};
/* Network broadcast address.  Starts out as the built-in default shown here;
 * main() reassigns it from the -broadIp option, and usage() prints it as the
 * default value. */
char *broadIp = "10.1.255.255";
void usage()
/* Explain usage and exit.  The help text is passed to errAbort() as a
 * format string; its three conversions are filled in with the default hub
 * port (bdHubInPort), the default node port (bdNodeInPort) and the current
 * broadcast address (broadIp).  main() calls this whenever the program is
 * not given exactly one positional argument. */
{
errAbort(
"broadNode - Daemon that runs on cluster nodes in broadcast data system\n"
"usage:\n"
" broadNode now\n"
"options:\n"
" -hubInPort=N (default %d)\n"
" -nodeInPort=N (default %d)\n"
" -logFacility=facility log to the specified syslog facility.\n"
+ " -logMinPriority=pri minimum syslog priority to log, also filters file logging.\n"
" -log=file log to file instead of syslog.\n"
" -debug Don't daemonize\n"
" -drop=N - Drop every Nth packet\n"
" -ip=NNN.NNN.NNN.NNN ip address of current machine, usually needed.\n"
" -broadIp - network broadcast address, %s by default\n"
, bdHubInPort, bdNodeInPort, broadIp
);
}
/* UDP port numbers, assigned in main() from -hubInPort and -nodeInPort
 * (falling back to bdHubInPort and bdNodeInPort).  broadNode() listens on
 * nodeInPort and sends its replies to hubInPort. */
int hubInPort, nodeInPort;
/* Value of -drop, assigned in main().  When non-zero, broadNode() uses it as
 * the reload value of a countdown and discards the received packet each
 * time that countdown runs out.  Zero disables dropping. */
int dropTest = 0;
int openInputSocket(int port)
/* Open up a datagram socket that can accept messages from anywhere.
 * Creates a UDP socket, binds it to the given port on INADDR_ANY and
 * returns the socket descriptor.  Aborts via errAbort() if the socket
 * can't be created, can't be bound, or can't be queried for its receive
 * buffer size. */
{
struct sockaddr_in sai;
int sd;
int size;
/* getsockopt() takes a socklen_t * for the length; passing an int * is an
 * incompatible pointer type. */
socklen_t sizeSize = sizeof(size);

ZeroVar(&sai);
sai.sin_family = AF_INET;
sai.sin_port = htons(port);
sai.sin_addr.s_addr = INADDR_ANY;
sd = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
if (sd < 0)
    errAbort("Couldn't open datagram socket");
if (bind(sd, (struct sockaddr *)&sai, sizeof(sai)) < 0)
    errAbort("Couldn't bind socket");
/* The buffer size itself isn't used; the call only checks that the socket
 * can be queried. */
if (getsockopt(sd, SOL_SOCKET, SO_RCVBUF, &size, &sizeSize) != 0)
    errAbort("Couldn't get socket size");
return sd;
}
int openOutputSocket(int port)
/* Create an unbound UDP socket for sending and return its descriptor.
 * Aborts via errAbort() if the socket can't be created.  The port
 * argument is not used here; the destination port is supplied when a
 * message is sent. */
{
int sock;

if ((sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0)
    errAbort("Couldn't open datagram socket");
return sock;
}
bits32 ipForName(char *name)
/* Return named IP address.  Looks the name up with gethostbyname(), prints
 * every address found to stdout in dotted form, and returns the first one
 * converted to host byte order.  Aborts via errAbort() if the lookup fails
 * or yields no address of the size of a bits32. */
{
struct hostent *hostent;
bits32 ret;
int i, j;

hostent = gethostbyname(name);
if (hostent == NULL)
    errAbort("Couldn't find host %s. h_errno %d", name, h_errno);
/* The copy below reads sizeof(ret) bytes from the first address, so make
 * sure there is a first address and that it is that long. */
if (hostent->h_addr_list[0] == NULL || (size_t)hostent->h_length != sizeof(ret))
    errAbort("No usable address for host %s", name);
for (i=0; hostent->h_addr_list[i] != NULL; ++i)
    {
    char *addr = hostent->h_addr_list[i];
    for (j=0; j<4; ++j)
        {
        /* Go through unsigned char: where plain char is signed, octets
         * of 128 and above would otherwise print as negative numbers. */
        printf("%d", (unsigned char)addr[j]);
        putchar( (j == 3 ? '\n' : '.'));
        }
    }
memcpy(&ret, hostent->h_addr_list[0], sizeof(ret));
ret = ntohl(ret);
return ret;
}
bits32 getOwnIpAddress()
/* Return IP address of ourselves.  The address comes from the -ip option,
 * resolved through ipForName(), and is cached in a static so the lookup
 * happens only while the cached value is still zero.  Aborts via
 * errAbort() when -ip was not given. */
{
static bits32 cachedIp = 0;
char *hostSpec;

if (cachedIp != 0)
    return cachedIp;
hostSpec = optionVal("ip", NULL);
if (hostSpec == NULL)
    errAbort("Need to specify ip address with -ip");
cachedIp = ipForName(hostSpec);
return cachedIp;
}
void personallyAck(int sd, struct bdMessage *m, bits32 ownIp, bits32 destIp,
int port, int err)
/* Send acknowledgement message if message targets this node personally.
 * When m->machine equals ownIp, m is overwritten in place with an ack
 * carrying the original message id and err, then sent through sd to
 * destIp/port.  Messages addressed to anyone else are ignored. */
{
if (m->machine != ownIp)
    return;
bdMakeAckMessage(m, ownIp, m->id, err);
bdSendTo(sd, m, destIp, port);
}
struct fileSection
/* Keep track of a section of a file: which of its blocks have arrived. */
{
bool blockTracker[bdSectionBlocks]; /* One flag per block index.  pBlock() sets an entry to TRUE once that block has been written, and clears the whole array when a new section starts. */
};
struct fileTracker
/* Information on a file being received.  Created by pFileOpen(), looked up
 * by file id with findTracker(), and released by pFileClose(). */
{
struct fileTracker *next; /* Next in list. */
bits32 fileId; /* File id number taken from the file-open message. */
char *fileName; /* File name; a private copy freed by fileTrackerFree(). */
int fh; /* Open file handle returned by creat(). */
time_t openTime; /* Time file opened. */
off_t pos; /* Position in file; pBlock() only seeks when this differs from the block's offset. */
int curSectionIx; /* Which section are we working on?  -1 until the first block arrives. */
int curSectionSize; /* Size of current section: highest end offset of any block stored so far. */
struct fileSection section; /* Keep track of blocks in this section. */
char sectionData[bdSectionBytes]; /* Data for this section, copied in block by block. */
bool sectionClosed; /* True if this section is closed.  Starts TRUE, cleared when a new section begins, set again by pSectionQuery() once nothing is missing. */
unsigned char md5[16]; /* Keep the md5 checksum here.  Only written by code under #ifdef SOON. */
bool doneMd5; /* Calculated md5.  Reset whenever a new section starts. */
};
void fileTrackerFree(struct fileTracker **pFt)
/* Release the tracker that *pFt points at: free its file name, then hand
 * pFt to freez().  Does nothing when *pFt is NULL.  The file handle is
 * not closed here. */
{
struct fileTracker *tracker = *pFt;

if (tracker == NULL)
    return;
freeMem(tracker->fileName);
freez(pFt);
}
struct fileTracker *findTracker(struct fileTracker *list, bits32 fileId)
/* Walk the list and return the first tracker whose fileId matches, or
 * NULL when there is no such tracker. */
{
struct fileTracker *cur = list;

while (cur != NULL && cur->fileId != fileId)
    cur = cur->next;
return cur;
}
int pFileOpen(struct bdMessage *m, struct fileTracker **pList)
/* Process file open message.  Create a new file tracker if we
 * don't have one already.  Return 0 on success, error number on
 * failure.
 * A tracker that already exists for the file id is left untouched and
 * counts as success.  A new tracker starts with no current section
 * (curSectionIx of -1) and is marked closed, so the first block message
 * can begin a section. */
{
struct fileTracker *ft;
bits32 fileId;
char *fileName;

bdParseFileOpenMessage(m, &fileId, &fileName);
ft = findTracker(*pList, fileId);
if (ft == NULL)
    {
    int fh = creat(fileName, 0777);
    if (fh < 0)
        /* Report the actual error number, as the contract above promises
         * and as the other message handlers do, rather than creat()'s
         * bare -1. */
        return errno;
    AllocVar(ft);
    ft->fileId = fileId;
    ft->fileName = cloneString(fileName);
    ft->fh = fh;
    ft->openTime = time(NULL);
    ft->sectionClosed = TRUE;
    ft->curSectionIx = -1;
    slAddHead(pList, ft);
    }
return 0;
}
int pBlock(struct bdMessage *m, struct fileTracker *list)
/* Process block message.  Write out data.  Keep track of block.
 * Returns 0 on success, and also when the block is empty, already
 * recorded, or belongs to a file we have no tracker for.  Returns EINVAL
 * when the block index or size does not fit in a section, -222 when the
 * message starts a new section while the current one is still open, EIO
 * on a short write, and otherwise the errno from lseek() or write(). */
{
bits32 fileId, sectionIx, blockIx;
int size;
void *data;
struct fileTracker *ft;
struct fileSection *section;
int dataOffset, dataEnd;
off_t startOffset;
ssize_t written;

bdParseBlockMessage(m, &fileId, &sectionIx, &blockIx, &size, &data);
/* blockTracker has bdSectionBlocks entries, so the largest valid index is
 * bdSectionBlocks-1.  These values arrive in a network message, so reject
 * bad ones instead of asserting. */
if (blockIx >= bdSectionBlocks)
    return EINVAL;
if (size == 0)
    return 0;
ft = findTracker(list, fileId);
if (ft == NULL)
    return 0;

/* The block is copied into sectionData at dataOffset; make sure it fits. */
dataOffset = blockIx * bdBlockSize;
if (size < 0 || dataOffset < 0
    || (size_t)dataOffset > sizeof(ft->sectionData)
    || (size_t)size > sizeof(ft->sectionData) - (size_t)dataOffset)
    return EINVAL;
dataEnd = dataOffset + size;

section = &ft->section;
if (ft->curSectionIx != sectionIx)
    {
    if (!ft->sectionClosed)
        {
        logDebug("Moving on with an unclosed section %d in %s",
            ft->curSectionIx, ft->fileName);
        return -222;
        }
    /* Start tracking a fresh section. */
    ft->curSectionIx = sectionIx;
    ft->curSectionSize = 0;
    ft->sectionClosed = FALSE;
    ft->doneMd5 = FALSE;
    memset(section->blockTracker, 0, sizeof(section->blockTracker));
    }
if (section->blockTracker[blockIx] == FALSE)
    {
    startOffset = bdBlockOffset(sectionIx, blockIx);
    if (ft->pos != startOffset)
        {
        if (lseek(ft->fh, startOffset, SEEK_SET) == -1)
            return errno;
        ft->pos = startOffset;
        }
    if (dataEnd > ft->curSectionSize)
        ft->curSectionSize = dataEnd;
    memcpy(ft->sectionData + dataOffset, data, size);
    written = write(ft->fh, data, size);
    if (written == -1)
        return errno;
    ft->pos += written;
    /* On a short write leave the block unmarked, so it is still reported
     * as missing and gets written again at its proper offset. */
    if (written != size)
        return EIO;
    section->blockTracker[blockIx] = TRUE;
    }
return 0;
}
int pFileClose(struct bdMessage *m, struct fileTracker **pList)
/* Close file and free associated resources. */
{
bits32 fileId;
char *fileName;
struct fileTracker *ft;
bdParseFileCloseMessage(m, &fileId, &fileName);
ft = findTracker(*pList, fileId);
if (ft != NULL)
{
boolean sameName = sameString(ft->fileName, fileName);
int err = close(ft->fh);
slRemoveEl(pList, ft);
fileTrackerFree(&ft);
if (err < 0)
return errno;
if (!sameName)
return -111;
}
return 0;
}
void calcMd5OnSection(struct fileTracker *ft)
/* Compute the md5 of the current section's data once, then flag it as
 * done.  The checksum code is compiled only when SOON is defined; without
 * it this just sets ft->doneMd5.
 * NOTE(review): the SOON code uses a variable ctx that is not declared in
 * this function. */
{
if (ft->doneMd5)
    return;
#ifdef SOON
md5_starts(&ctx);
md5_update(&ctx, (uint8 *)ft->sectionData, ft->curSectionSize);
md5_finish(&ctx, ft->md5);
#endif
ft->doneMd5 = TRUE;
}
void pSectionDone(struct bdMessage *m, struct fileTracker *ftList, bits32 ownIp)
/* Try to get ahead on md5 count.  When the message refers to the section
 * currently being received and every one of its blockCount blocks has
 * arrived, compute the section's md5 now.  Does nothing otherwise.  ownIp
 * is not used. */
{
struct fileTracker *ft;
bits32 fileId, sectionIx, blockCount, i;

bdParseSectionDoneMessage(m, &fileId, &sectionIx, &blockCount);
ft = findTracker(ftList, fileId);
if (ft == NULL || sectionIx != ft->curSectionIx)
    return;
/* blockTracker only has bdSectionBlocks entries; a larger count can't
 * describe a section we are able to hold. */
if (blockCount > bdSectionBlocks)
    return;
/* Look at the per-block flags.  Counting every block as missing would
 * mean the md5 is only ever computed for an empty section. */
for (i=0; i<blockCount; ++i)
    {
    if (!ft->section.blockTracker[i])
        return;
    }
calcMd5OnSection(ft);
}
void pSectionQuery(struct bdMessage *m, struct fileTracker *ftList, bits32 ownIp)
/* Process section query.  Set up message so that it contains list
 * of missing blocks.
 * m is rewritten in place into a missing-blocks reply; the caller sends it.
 * - Unknown file, or a section before the current one: report nothing
 *   missing.
 * - A section after the current one, or a tracker that has not received
 *   any block yet: report every block missing.
 * - The current section: report the blocks not yet recorded, and mark the
 *   section closed when there are none. */
{
struct fileTracker *ft;
bits32 fileId, sectionIx, blockCount;
unsigned char *md5;
boolean nothingSeen;

bdParseSectionQueryMessage(m, &fileId, &sectionIx, &blockCount, &md5);
ft = findTracker(ftList, fileId);
/* curSectionIx is -1 until the first block arrives.  Handle that state
 * explicitly: if bits32 is an unsigned type, comparing sectionIx against
 * the -1 sentinel directly would treat it as the largest possible section
 * index, and a node that has received nothing would answer "nothing
 * missing". */
nothingSeen = (ft != NULL && ft->curSectionIx < 0);
if (ft == NULL || (!nothingSeen && sectionIx < (bits32)ft->curSectionIx))
    {
    /* Don't complain about missing stuff.  Supposedly at least
     * we've already dealt with this, and this is just a late message
     * coming into the socket. */
    bdMakeMissingBlocksMessage(m, ownIp, m->id, fileId, 0, NULL);
    }
else
    {
    /* Go through section and store missing blockIx's.  The list is built
     * inside the message's own data area, two words in. */
    struct fileSection *section = &ft->section;
    bits32 *missingList = ((bits32 *)m->data) + 2;
    bits32 *ml = missingList;
    bits32 i;
    int missingCount = 0;

    /* blockCount comes off the network; never index past the end of
     * blockTracker or list more blocks than a section can hold. */
    if (blockCount > bdSectionBlocks)
        blockCount = bdSectionBlocks;
    if (nothingSeen || sectionIx > (bits32)ft->curSectionIx)
        {
        /* We haven't even seen anything from this section.  Return it all
         * as missing. */
        for (i=0; i<blockCount; ++i)
            {
            *ml++ = i;
            ++missingCount;
            }
        }
    else
        {
        for (i=0; i<blockCount; ++i)
            {
            if (!section->blockTracker[i])
                {
                *ml++ = i;
                ++missingCount;
                }
            }
        }
    logDebug("%d missing %d of %d", sectionIx, missingCount, blockCount);
    if (missingCount == 0)
        {
        /* Check md5 signature.  If it matches we're good to go, otherwise
         * set up things to tell server to resend the whole section. */
#ifdef SOON
        calcMd5OnSection(ft);
        if (memcmp(ft->md5, md5, sizeof(ft->md5)) == 0)
            ft->sectionClosed = TRUE;
        else
            {
            logDebug("Section %d of %s failed md5 check", sectionIx, ft->fileName);
            for (i=0; i<blockCount; ++i)
                {
                section->blockTracker[i] = FALSE;
                *ml++ = i;
                ++missingCount;
                }
            ft->doneMd5 = FALSE;
            }
#else /* SOON */
        ft->sectionClosed = TRUE;
#endif /* SOON */
        }
    bdMakeMissingBlocksMessage(m, ownIp, m->id, fileId, missingCount, missingList);
    }
}
void broadNode()
/* broadNode - Daemon that runs on cluster nodes in broadcast data system.
 * Opens the input socket on nodeInPort and an output socket, then loops
 * receiving messages and dispatching on their type until a quit message
 * arrives.  Replies go back to the sender's address on hubInPort. */
{
struct bdMessage *msg = NULL;
struct fileTracker *trackers = NULL;
bits32 ownIp = getOwnIpAddress();
bits32 sourceIp;
boolean alive = TRUE;
int err = 0;
int drop = 0;       /* Countdown used for -drop packet-loss testing. */
int inSd, outSd;

inSd = openInputSocket(nodeInPort);
outSd = openOutputSocket(hubInPort);
AllocVar(msg);
while (alive)
    {
    findNow();
    err = bdReceive(inSd, msg, &sourceIp);
    if (err < 0)
        {
        /* NOTE(review): err is negative here, yet it is handed to
         * strerror() as is - confirm what bdReceive() returns. */
        warn("bdReceive error %s", strerror(err));
        continue;
        }
    // logDebug("got message type %d size %d sourceIp %x", m->type, m->size, sourceIp);

    /* Test mode: throw the packet away each time the countdown runs out. */
    if (dropTest != 0 && --drop <= 0)
        {
        drop = dropTest;
        continue;
        }

    switch (msg->type)
        {
        case bdmQuit:
            alive = FALSE;
            personallyAck(outSd, msg, ownIp, sourceIp, hubInPort, 0);
            break;
        case bdmFileOpen:
            err = pFileOpen(msg, &trackers);
            personallyAck(outSd, msg, ownIp, sourceIp, hubInPort, err);
            break;
        case bdmBlock:
            err = pBlock(msg, trackers);
            personallyAck(outSd, msg, ownIp, sourceIp, hubInPort, err);
            break;
        case bdmFileClose:
            /* Only close when the message is addressed to this node; the
             * ack is likewise only sent in that case. */
            if (msg->machine == ownIp)
                err = pFileClose(msg, &trackers);
            personallyAck(outSd, msg, ownIp, sourceIp, hubInPort, err);
            break;
        case bdmPing:
            // logDebug("<PING>%s</PING>", m->data);
            personallyAck(outSd, msg, ownIp, sourceIp, hubInPort, 0);
            break;
        case bdmSectionQuery:
            if (msg->machine == ownIp)
                {
                /* pSectionQuery() turns msg into the reply. */
                pSectionQuery(msg, trackers, ownIp);
                bdSendTo(outSd, msg, sourceIp, hubInPort);
                }
            break;
        case bdmMissingBlocks:
        case bdmSectionDone:
        default:
            /* Nothing to do for these message types. */
            break;
        }
    }
close(inSd);
close(outSd);
}
int main(int argc, char *argv[])
/* Process command line.  Parses the options, insists on exactly one
 * positional argument, copies the option values into the file-level
 * settings, hands off to paraDaemonize() and then runs the node loop. */
{
optionInit(&argc, argv, optionSpecs);
if (argc != 2)
    {
    usage();
    }

/* Settings read by broadNode(). */
nodeInPort = optionInt("nodeInPort", bdNodeInPort);
hubInPort = optionInt("hubInPort", bdHubInPort);
broadIp = optionVal("broadIp", broadIp);
dropTest = optionInt("drop", 0);

paraDaemonize("broadNode");
broadNode();
return 0;
}