1 /************************************************************************
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck,
8 ** timm@kip.uni-heidelberg.de
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
17 ** Newer versions of this file's package will be made available from
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
21 *************************************************************************/
24 ***************************************************************************
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
30 ***************************************************************************
33 /** @file AliHLTHOMERReader.cxx
34 @author Timm Steinbeck
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
39 // see header file for class documentation
41 // refer to README to build package
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
45 #include "AliHLTHOMERReader.h"
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <netinet/in.h>
54 #include <netinet/tcp.h>
56 #include <rpc/types.h>
59 #include <netinet/in.h>
60 #include <arpa/inet.h>
66 #define MOD_BIN "MOD BIN\n"
67 #define MOD_ASC "MOD ASC\n"
68 #define GET_ONE "GET ONE\n"
69 #define GET_ALL "GET ALL\n"
72 ClassImp(MonitoringReader);
73 ClassImp(HOMERReader);
81 HOMERReader::HOMERReader()
83 fCurrentEventType(~(homer_uint64)0),
84 fCurrentEventID(~(homer_uint64)0),
94 fErrorConnection(~(unsigned int)0),
95 fEventRequestAdvanceTime(0)
97 // Reader implementation of the HOMER interface.
98 // The HLT Monitoring Environment including ROOT is
99 // a native interface to ship out data from the HLT chain.
100 // See pdf document shiped with the package
101 // for class documentation and tutorial.
107 HOMERReader::HOMERReader( const char* hostname, unsigned short port )
110 fCurrentEventType(~(homer_uint64)0),
111 fCurrentEventID(~(homer_uint64)0),
116 fTCPDataSourceCnt(0),
117 fShmDataSourceCnt(0),
118 fDataSourceMaxCnt(0),
120 fConnectionStatus(0),
121 fErrorConnection(~(unsigned int)0),
122 fEventRequestAdvanceTime(0)
124 // see header file for class documentation
125 // For reading from a TCP port
127 if ( !AllocDataSources(1) )
129 fErrorConnection = 0;
130 fConnectionStatus = ENOMEM;
133 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
134 if ( fConnectionStatus )
135 fErrorConnection = 0;
140 fDataSources[0].fNdx = 0;
144 HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
147 fCurrentEventType(~(homer_uint64)0),
148 fCurrentEventID(~(homer_uint64)0),
153 fTCPDataSourceCnt(0),
154 fShmDataSourceCnt(0),
155 fDataSourceMaxCnt(0),
157 fConnectionStatus(0),
158 fErrorConnection(~(unsigned int)0),
159 fEventRequestAdvanceTime(0)
161 // see header file for class documentation
162 // For reading from multiple TCP ports
164 if ( !AllocDataSources(tcpCnt) )
166 fErrorConnection = 0;
167 fConnectionStatus = ENOMEM;
170 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
172 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
173 if ( fConnectionStatus )
175 fErrorConnection = n;
178 fDataSources[n].fNdx = n;
182 HOMERReader::HOMERReader( key_t shmKey, int shmSize )
185 fCurrentEventType(~(homer_uint64)0),
186 fCurrentEventID(~(homer_uint64)0),
191 fTCPDataSourceCnt(0),
192 fShmDataSourceCnt(0),
193 fDataSourceMaxCnt(0),
195 fConnectionStatus(0),
196 fErrorConnection(~(unsigned int)0),
197 fEventRequestAdvanceTime(0)
199 // see header file for class documentation
200 // For reading from a System V shared memory segment
202 if ( !AllocDataSources(1) )
204 fErrorConnection = 0;
205 fConnectionStatus = ENOMEM;
208 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
209 if ( fConnectionStatus )
210 fErrorConnection = 0;
215 fDataSources[0].fNdx = 0;
219 HOMERReader::HOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
222 fCurrentEventType(~(homer_uint64)0),
223 fCurrentEventID(~(homer_uint64)0),
228 fTCPDataSourceCnt(0),
229 fShmDataSourceCnt(0),
230 fDataSourceMaxCnt(0),
232 fConnectionStatus(0),
233 fErrorConnection(~(unsigned int)0),
234 fEventRequestAdvanceTime(0)
236 // see header file for class documentation
237 // For reading from multiple System V shared memory segments
239 if ( !AllocDataSources(shmCnt) )
241 fErrorConnection = 0;
242 fConnectionStatus = ENOMEM;
245 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
247 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
248 if ( fConnectionStatus )
250 fErrorConnection = n;
253 fDataSources[n].fNdx = n;
257 HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
258 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
261 fCurrentEventType(~(homer_uint64)0),
262 fCurrentEventID(~(homer_uint64)0),
267 fTCPDataSourceCnt(0),
268 fShmDataSourceCnt(0),
269 fDataSourceMaxCnt(0),
271 fConnectionStatus(0),
272 fErrorConnection(~(unsigned int)0),
273 fEventRequestAdvanceTime(0)
275 // see header file for class documentation
276 // For reading from multiple TCP ports and multiple System V shared memory segments
278 if ( !AllocDataSources(tcpCnt+shmCnt) )
280 fErrorConnection = 0;
281 fConnectionStatus = ENOMEM;
284 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
286 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
287 if ( fConnectionStatus )
289 fErrorConnection = n;
292 fDataSources[n].fNdx = n;
294 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
296 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
297 if ( fConnectionStatus )
299 fErrorConnection = tcpCnt+n;
302 fDataSources[n].fNdx = n;
305 HOMERReader::~HOMERReader()
307 // see header file for class documentation
308 ReleaseCurrentEvent();
312 int HOMERReader::ReadNextEvent()
314 // see header file for class documentation
315 // Read in the next available event
316 return ReadNextEvent( false, 0 );
319 int HOMERReader::ReadNextEvent( unsigned long timeout )
321 // see header file for class documentation
322 // Read in the next available event
323 return ReadNextEvent( true, timeout );
326 unsigned long HOMERReader::GetBlockDataLength( unsigned long ndx ) const
328 // see header file for class documentation
329 // Return the size (in bytes) of the current event's data
330 // block with the given block index (starting at 0).
331 if ( ndx >= fBlockCnt )
333 return fBlocks[ndx].fLength;
336 const void* HOMERReader::GetBlockData( unsigned long ndx ) const
338 // see header file for class documentation
339 // Return a pointer to the start of the current event's data
340 // block with the given block index (starting at 0).
341 if ( ndx >= fBlockCnt )
343 return fBlocks[ndx].fData;
346 const char* HOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
348 // see header file for class documentation
349 // Return IP address or hostname of node which sent the
350 // current event's data block with the given block index
352 // For HOMER this is the ID of the node on which the subscriber
353 // that provided this data runs/ran.
354 if ( ndx >= fBlockCnt )
357 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
359 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
360 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
364 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
365 //return fBlocks[ndx].fOriginatingNodeID;
368 homer_uint8 HOMERReader::GetBlockByteOrder( unsigned long ndx ) const
370 // see header file for class documentation
371 // Return byte order of the data stored in the
372 // current event's data block with the given block index (starting at 0).
373 // 0 is unknown alignment,
374 // 1 ist little endian,
375 // 2 is big endian. */
376 if ( ndx >= fBlockCnt )
378 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
379 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
382 homer_uint8 HOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
384 // see header file for class documentation
385 // Return the alignment (in bytes) of the given datatype
386 // in the data stored in the current event's data block
387 // with the given block index (starting at 0).
388 // Possible values for the data type are
395 if ( ndx >= fBlockCnt )
397 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
399 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
400 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
403 homer_uint64 HOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
405 // see header file for class documentation
406 if ( ndx >= fBlockCnt )
408 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
412 /* Return the type of the data in the current event's data
413 block with the given block index (starting at 0). */
414 homer_uint64 HOMERReader::GetBlockDataType( unsigned long ndx ) const
416 // see header file for class documentation
417 if ( ndx >= fBlockCnt )
418 return ~(homer_uint64)0;
419 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
420 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
423 /* Return the origin of the data in the current event's data
424 block with the given block index (starting at 0). */
425 homer_uint32 HOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
427 // see header file for class documentation
428 if ( ndx >= fBlockCnt )
429 return ~(homer_uint32)0;
430 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
431 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
434 /* Return a specification of the data in the current event's data
435 block with the given block index (starting at 0). */
436 homer_uint32 HOMERReader::GetBlockDataSpec( unsigned long ndx ) const
438 // see header file for class documentation
439 if ( ndx >= fBlockCnt )
440 return ~(homer_uint32)0;
441 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
442 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
445 /* Find the next data block in the current event with the given
446 data type, origin, and specification. Returns the block's
448 unsigned long HOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
449 homer_uint32 spec, unsigned long startNdx ) const
451 // see header file for class documentation
452 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
454 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
455 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
456 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
459 return ~(unsigned long)0;
462 /* Find the next data block in the current event with the given
463 data type, origin, and specification. Returns the block's
465 unsigned long HOMERReader::FindBlockNdx( char type[8], char origin[4],
466 homer_uint32 spec, unsigned long startNdx ) const
468 // see header file for class documentation
469 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
471 bool found1=true, found2=true;
472 for ( unsigned i = 0; i < 8; i++ )
474 if ( type[i] != (char)0xFF )
483 for ( unsigned i = 0; i < 8; i++ )
485 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
486 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
493 for ( unsigned i = 0; i < 4; i++ )
495 if ( origin[i] != (char)0xFF )
504 for ( unsigned i = 0; i < 4; i++ )
506 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
507 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
514 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
515 if ( found1 && found2 &&
516 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
519 return ~(unsigned long)0;
522 /* Return the ID of the node that actually produced this data block.
523 This may be different from the node which sent the data to this
524 monitoring object as returned by GetBlockSendNodeID. */
525 const char* HOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
527 // see header file for class documentation
528 if ( ndx >= fBlockCnt )
530 return fBlocks[ndx].fOriginatingNodeID;
534 void HOMERReader::Init()
536 // see header file for class documentation
537 fCurrentEventType = ~(homer_uint64)0;
538 fCurrentEventID = ~(homer_uint64)0;
539 fMaxBlockCnt = fBlockCnt = 0;
542 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
546 fConnectionStatus = 0;
547 fErrorConnection = ~(unsigned int)0;
549 fEventRequestAdvanceTime = 0;
552 bool HOMERReader::AllocDataSources( unsigned int sourceCnt )
554 // see header file for class documentation
555 fDataSources = new DataSource[ sourceCnt ];
559 fDataSourceMaxCnt = sourceCnt;
563 int HOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
565 // see header file for class documentation
567 he = gethostbyname( hostname );
570 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
571 return EADDRNOTAVAIL;
574 struct sockaddr_in remoteAddr;
575 remoteAddr.sin_family = AF_INET; // host byte order
576 remoteAddr.sin_port = htons(port); // short, network byte order
577 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
578 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
580 // Create socket and connect to target program on remote node
581 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
582 if ( source.fTCPConnection == -1 )
589 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
593 close( source.fTCPConnection );
597 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
598 if ( ret != (int)strlen(MOD_BIN) )
601 close( source.fTCPConnection );
605 char* tmpchar = new char[ strlen( hostname )+1 ];
608 close( source.fTCPConnection );
611 strcpy( tmpchar, hostname );
612 source.fHostname = tmpchar;
615 source.fTCPPort = port;
617 source.fDataSize = 0;
618 source.fDataRead = 0;
622 int HOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
624 // see header file for class documentation
626 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
631 gethostname( tmpchar, MAXHOSTNAMELEN );
632 tmpchar[MAXHOSTNAMELEN]=(char)0;
633 source.fHostname = tmpchar;
635 source.fShmID = shmget( shmKey, shmSize, 0660 );
636 if ( source.fShmID == -1 )
639 delete [] source.fHostname;
643 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
645 if ( !source.fShmPtr )
648 shmctl( source.fShmID, IPC_RMID, NULL );
649 delete [] source.fHostname;
654 source.fShmKey = shmKey;
655 source.fShmSize = shmSize;
656 source.fDataSize = 0;
657 source.fDataRead = 0;
661 void HOMERReader::FreeDataSources()
663 // see header file for class documentation
664 for ( unsigned n=0; n < fDataSourceCnt; n++ )
666 if ( fDataSources[n].fType == kTCP )
667 FreeTCPDataSource( fDataSources[n] );
669 FreeShmDataSource( fDataSources[n] );
673 int HOMERReader::FreeShmDataSource( DataSource& source )
675 // see header file for class documentation
676 if ( source.fShmPtr )
677 shmdt( source.fShmPtr );
678 // if ( source.fShmID != -1 )
679 // shmctl( source.fShmID, IPC_RMID, NULL );
680 if ( source.fHostname )
681 delete [] source.fHostname;
685 int HOMERReader::FreeTCPDataSource( DataSource& source )
687 // see header file for class documentation
688 if ( source.fTCPConnection )
689 close( source.fTCPConnection );
690 if ( source.fHostname )
691 delete [] source.fHostname;
695 int HOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
697 // see header file for class documentation
698 if ( fDataSourceCnt<=0 )
700 // Clean up currently active event.
701 ReleaseCurrentEvent();
703 // Trigger all configured data sources
704 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
706 if ( fDataSources[n].fType == kTCP )
707 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
709 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
712 fErrorConnection = n;
713 fConnectionStatus=ret;
714 return fConnectionStatus;
717 // Now read in data from the configured data source
718 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
723 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
728 // for ( unsigned n = 0; n<fDataSourceCnt; n++ )
730 // if ( fDataSources[n].fType == kTCP )
731 // ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
733 // ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
736 // fErrorConnection = n;
737 // fConnectionStatus=ret;
738 // return fConnectionStatus;
741 //Check to see that all sources contributed data for the same event
742 homer_uint64 eventID;
743 homer_uint64 eventType;
744 eventID = GetSourceEventID( fDataSources[0] );
745 eventType = GetSourceEventType( fDataSources[0] );
746 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
748 if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
750 fErrorConnection = n;
751 fConnectionStatus=56;//EBADRQC;
752 return fConnectionStatus;
755 // Find all the different data blocks contained in the data from all
757 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
759 ret = ParseSourceData( fDataSources[n] );
762 fErrorConnection = n;
763 fConnectionStatus=57;//EBADSLT;
764 return fConnectionStatus;
767 fCurrentEventID = eventID;
768 fCurrentEventType = eventType;
772 void HOMERReader::ReleaseCurrentEvent()
774 // see header file for class documentation
775 // sources.fDataRead = 0;
777 fCurrentEventID = ~(homer_uint64)0;
778 fCurrentEventType = ~(homer_uint64)0;
779 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
781 if ( fDataSources[n].fData )
783 if ( fDataSources[n].fType == kTCP )
784 delete [] (homer_uint8*)fDataSources[n].fData;
785 fDataSources[n].fData = NULL;
787 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
791 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
793 if ( fBlocks[n].fOriginatingNodeID )
794 delete [] fBlocks[n].fOriginatingNodeID;
803 int HOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
805 // see header file for class documentation
807 struct timeval oldSndTO, newSndTO;
810 socklen_t optlen=sizeof(oldSndTO);
811 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
816 if ( optlen!=sizeof(oldSndTO) )
820 newSndTO.tv_sec = timeoutUsec / 1000000;
821 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
822 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
828 // Send one event request
829 if ( !fEventRequestAdvanceTime )
831 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
833 if ( ret != (int)strlen(GET_ONE) )
836 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
844 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
845 if ( len>128 || len<0 )
848 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
852 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
854 if ( ret != (int)strlen(tmpCmd) )
857 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
865 int HOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long )
867 // see header file for class documentation
868 if ( source.fShmPtr )
870 *(homer_uint32*)( source.fShmPtr ) = 0;
877 int HOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
879 // see header file for class documentation
887 unsigned firstConnection=~(unsigned)0;
888 for ( unsigned long n = 0; n < sourceCnt; n++ )
890 if ( sources[n].fDataSize == 0 // size specifier not yet read
891 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
894 FD_SET( sources[n].fTCPConnection, &conns );
895 if ( sources[n].fTCPConnection > highestConn )
896 highestConn = sources[n].fTCPConnection;
897 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
898 if ( firstConnection == ~(unsigned)0 )
903 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
908 struct timeval tv, *ptv;
911 tv.tv_sec = timeout / 1000000;
912 tv.tv_usec = timeout - (tv.tv_sec*1000000);
917 // wait until something is ready to be read
918 // either for timeout usecs or until eternity
920 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
923 fErrorConnection = firstConnection;
925 fConnectionStatus = errno;
927 fConnectionStatus = ETIMEDOUT;
928 return fConnectionStatus;
930 for ( unsigned n = 0; n < sourceCnt; n++ )
932 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
934 if ( sources[n].fDataSize == 0 )
936 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
937 if ( ret != sizeof(homer_uint32) )
939 fErrorConnection = n;
941 fConnectionStatus = errno;
943 fConnectionStatus = ENOMSG;
944 return fConnectionStatus;
946 sources[n].fDataSize = ntohl( sources[n].fDataSize );
947 sources[n].fDataRead = 0;
948 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
949 if ( !sources[n].fData )
951 fErrorConnection = n;
952 fConnectionStatus = ENOMEM;
953 return fConnectionStatus;
956 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
958 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
960 sources[n].fDataRead += ret;
963 fErrorConnection = n;
964 fConnectionStatus = ECONNRESET;
965 return fConnectionStatus;
969 fErrorConnection = n;
970 fConnectionStatus = errno;
971 return fConnectionStatus;
976 fErrorConnection = n;
977 fConnectionStatus = ENXIO;
978 return fConnectionStatus;
989 int HOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
991 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
992 // Send one event request
993 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
994 if ( ret != strlen(GET_ONE) )
998 // wait for and read back size specifier
1000 // The value transmitted is binary, in network byte order
1001 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1002 if ( ret != sizeof(sizeNBO) )
1006 // Convert back to host byte order
1007 source.fDataSize = ntohl( sizeNBO );
1008 source.fData = new homer_uint8[ source.fDataSize ];
1009 unsigned long dataRead=0, toRead;
1010 if ( !source.fData )
1013 // Read in data into buffer in order not to block connection
1014 while ( dataRead < source.fDataSize )
1016 if ( source.fDataSize-dataRead > 1024 )
1019 toRead = source.fDataSize-dataRead;
1020 ret = read( source.fTCPConnection, buffer, toRead );
1028 while ( dataRead < source.fDataSize )
1030 toRead = source.fDataSize-dataRead;
1031 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1034 else if ( ret == 0 && useTimeout )
1037 tv.tv_sec = timeout / 1000000;
1038 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1041 FD_SET( source.fTCPConnection, &conns );
1042 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1046 else if ( ret == 0 )
1063 int HOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1069 int HOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1071 // see header file for class documentation
1072 struct timeval tv1, tv2;
1076 gettimeofday( &tv1, NULL );
1081 for ( unsigned n = 0; n < sourceCnt; n++ )
1083 if ( !sources[n].fDataSize )
1085 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1088 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1089 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1092 if ( found && useTimeout )
1093 gettimeofday( &tv1, NULL );
1094 if ( !all && useTimeout )
1096 gettimeofday( &tv2, NULL );
1097 unsigned long long tdiff;
1098 tdiff = tv2.tv_sec-tv1.tv_sec;
1100 tdiff += tv2.tv_usec-tv1.tv_usec;
1101 if ( tdiff > timeout )
1111 int HOMERReader::ParseSourceData( DataSource& source )
1113 // see header file for class documentation
1116 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1117 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1118 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1121 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1122 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1124 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1125 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1126 fBlocks[fBlockCnt].fSource = source.fNdx;
1127 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1128 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1129 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1130 struct in_addr tmpA;
1131 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1132 char* addr = inet_ntoa( tmpA );
1133 char* tmpchar = new char[ strlen(addr)+1 ];
1136 strcpy( tmpchar, addr );
1137 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1138 descrOffset += descrLen;
1145 int HOMERReader::ReAllocBlocks( unsigned long newCnt )
1147 // see header file for class documentation
1148 DataBlock* newBlocks;
1149 newBlocks = new DataBlock[ newCnt ];
1152 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1153 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1154 if ( newCnt > fMaxBlockCnt )
1155 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1158 fBlocks = newBlocks;
1159 fMaxBlockCnt = newCnt;
1163 homer_uint64 HOMERReader::GetSourceEventID( DataSource& source )
1165 // see header file for class documentation
1166 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1167 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1170 homer_uint64 HOMERReader::GetSourceEventType( DataSource& source )
1172 // see header file for class documentation
1173 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1174 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1177 homer_uint64 HOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1179 // see header file for class documentation
1180 if ( destFormat == sourceFormat )
1183 return ((source & 0xFFULL) << 56) |
1184 ((source & 0xFF00ULL) << 40) |
1185 ((source & 0xFF0000ULL) << 24) |
1186 ((source & 0xFF000000ULL) << 8) |
1187 ((source & 0xFF00000000ULL) >> 8) |
1188 ((source & 0xFF0000000000ULL) >> 24) |
1189 ((source & 0xFF000000000000ULL) >> 40) |
1190 ((source & 0xFF00000000000000ULL) >> 56);
1193 homer_uint32 HOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1195 // see header file for class documentation
1196 if ( destFormat == sourceFormat )
1199 return ((source & 0xFFUL) << 24) |
1200 ((source & 0xFF00UL) << 8) |
1201 ((source & 0xFF0000UL) >> 8) |
1202 ((source & 0xFF000000UL) >> 24);
1205 ***************************************************************************
1207 ** $Author$ - Initial Version by Timm Morten Steinbeck
1211 ***************************************************************************