1 /************************************************************************
4 ** This file is property of and copyright by the Technical Computer
5 ** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6 ** University, Heidelberg, Germany, 2001
7 ** This file has been written by Timm Morten Steinbeck,
8 ** timm@kip.uni-heidelberg.de
11 ** See the file license.txt for details regarding usage, modification,
12 ** distribution and warranty.
13 ** Important: This file is provided without any warranty, including
14 ** fitness for any particular purpose.
17 ** Newer versions of this file's package will be made available from
18 ** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19 ** or the corresponding page of the Heidelberg Alice Level 3 group.
21 *************************************************************************/
24 ***************************************************************************
26 ** $Author$ - Initial Version by Timm Morten Steinbeck
30 ***************************************************************************
33 /** @file AliHLTHOMERReader.cxx
34 @author Timm Steinbeck
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
39 // see header file for class documentation
41 // refer to README to build package
43 // visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
45 #include "AliHLTHOMERReader.h"
51 #include <sys/types.h>
52 #include <sys/socket.h>
53 #include <netinet/in.h>
54 #include <netinet/tcp.h>
56 #include <rpc/types.h>
59 #include <netinet/in.h>
60 #include <arpa/inet.h>
// Text commands exchanged with a HOMER data source over TCP.
// MOD_BIN is written to a freshly connected socket in AddDataSource().
66 #define MOD_BIN "MOD BIN\n"
// MOD_ASC is not referenced in the code visible in this file.
67 #define MOD_ASC "MOD ASC\n"
// GET_ONE is written by TriggerTCPSource() to request one event.
68 #define GET_ONE "GET ONE\n"
// GET_ALL is not referenced in the code visible in this file.
69 #define GET_ALL "GET ALL\n"
// ClassImp macro invocations for the reader base class and the HOMER reader
// (presumably the ROOT class-implementation macro -- confirm against the header).
72 ClassImp(AliHLTMonitoringReader);
73 ClassImp(AliHLTHOMERReader);
// Default constructor.
// The initializers below leave the reader without a current event (event
// type and ID are all-ones), without a recorded failing connection
// (fErrorConnection is all-ones) and with a zero event request advance time.
81 AliHLTHOMERReader::AliHLTHOMERReader()
83 fCurrentEventType(~(homer_uint64)0),
84 fCurrentEventID(~(homer_uint64)0),
94 fErrorConnection(~(unsigned int)0),
95 fEventRequestAdvanceTime(0)
97 // Reader implementation of the HOMER interface.
98 // The HLT Monitoring Environment including ROOT is
99 // a native interface to ship out data from the HLT chain.
100 // See pdf document shipped with the package
101 // for class documentation and tutorial.
// Construct a reader attached to a single TCP data source.
// @param hostname  remote host, handed on to AddDataSource()
// @param port      remote TCP port, handed on to AddDataSource()
// The outcome is reported through fConnectionStatus: ENOMEM when the source
// table cannot be allocated, otherwise the result of AddDataSource().
// On failure fErrorConnection is set to 0, the index of the only source.
107 AliHLTHOMERReader::AliHLTHOMERReader( const char* hostname, unsigned short port )
109 AliHLTMonitoringReader(),
110 fCurrentEventType(~(homer_uint64)0),
111 fCurrentEventID(~(homer_uint64)0),
116 fTCPDataSourceCnt(0),
117 fShmDataSourceCnt(0),
118 fDataSourceMaxCnt(0),
120 fConnectionStatus(0),
121 fErrorConnection(~(unsigned int)0),
122 fEventRequestAdvanceTime(0)
124 // see header file for class documentation
125 // For reading from a TCP port
// Room for exactly one source is requested.
127 if ( !AllocDataSources(1) )
129 fErrorConnection = 0;
130 fConnectionStatus = ENOMEM;
// Resolve, connect and switch the remote end to binary mode.
133 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
134 if ( fConnectionStatus )
135 fErrorConnection = 0;
// The source remembers its own position in fDataSources.
140 fDataSources[0].fNdx = 0;
// Construct a reader attached to several TCP data sources.
// @param tcpCnt     number of entries read from hostnames and ports
// @param hostnames  remote hosts, one per source
// @param ports      remote TCP ports, one per source
// fConnectionStatus holds ENOMEM when the source table cannot be allocated,
// otherwise the result of the most recent AddDataSource() call;
// fErrorConnection receives the index of a source whose setup failed.
144 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
146 AliHLTMonitoringReader(),
147 fCurrentEventType(~(homer_uint64)0),
148 fCurrentEventID(~(homer_uint64)0),
153 fTCPDataSourceCnt(0),
154 fShmDataSourceCnt(0),
155 fDataSourceMaxCnt(0),
157 fConnectionStatus(0),
158 fErrorConnection(~(unsigned int)0),
159 fEventRequestAdvanceTime(0)
161 // see header file for class documentation
162 // For reading from multiple TCP ports
164 if ( !AllocDataSources(tcpCnt) )
166 fErrorConnection = 0;
167 fConnectionStatus = ENOMEM;
// Both source counters advance once per completed loop iteration.
170 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
172 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
173 if ( fConnectionStatus )
175 fErrorConnection = n;
// The source remembers its own position in fDataSources.
178 fDataSources[n].fNdx = n;
// Construct a reader attached to a single System V shared memory segment.
// @param shmKey   key of the segment, handed on to AddDataSource()
// @param shmSize  size of the segment, handed on to AddDataSource()
// The outcome is reported through fConnectionStatus: ENOMEM when the source
// table cannot be allocated, otherwise the result of AddDataSource().
// On failure fErrorConnection is set to 0, the index of the only source.
182 AliHLTHOMERReader::AliHLTHOMERReader( key_t shmKey, int shmSize )
184 AliHLTMonitoringReader(),
185 fCurrentEventType(~(homer_uint64)0),
186 fCurrentEventID(~(homer_uint64)0),
191 fTCPDataSourceCnt(0),
192 fShmDataSourceCnt(0),
193 fDataSourceMaxCnt(0),
195 fConnectionStatus(0),
196 fErrorConnection(~(unsigned int)0),
197 fEventRequestAdvanceTime(0)
199 // see header file for class documentation
200 // For reading from a System V shared memory segment
// Room for exactly one source is requested.
202 if ( !AllocDataSources(1) )
204 fErrorConnection = 0;
205 fConnectionStatus = ENOMEM;
208 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
209 if ( fConnectionStatus )
210 fErrorConnection = 0;
// The source remembers its own position in fDataSources.
215 fDataSources[0].fNdx = 0;
// Construct a reader attached to several System V shared memory segments.
// @param shmCnt    number of entries read from shmKeys and shmSizes
// @param shmKeys   segment keys, one per source
// @param shmSizes  segment sizes, one per source
// fConnectionStatus holds ENOMEM when the source table cannot be allocated,
// otherwise the result of the most recent AddDataSource() call;
// fErrorConnection receives the index of a source whose setup failed.
219 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
221 AliHLTMonitoringReader(),
222 fCurrentEventType(~(homer_uint64)0),
223 fCurrentEventID(~(homer_uint64)0),
228 fTCPDataSourceCnt(0),
229 fShmDataSourceCnt(0),
230 fDataSourceMaxCnt(0),
232 fConnectionStatus(0),
233 fErrorConnection(~(unsigned int)0),
234 fEventRequestAdvanceTime(0)
236 // see header file for class documentation
237 // For reading from multiple System V shared memory segments
239 if ( !AllocDataSources(shmCnt) )
241 fErrorConnection = 0;
242 fConnectionStatus = ENOMEM;
// Both source counters advance once per completed loop iteration.
245 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
247 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
248 if ( fConnectionStatus )
250 fErrorConnection = n;
// The source remembers its own position in fDataSources.
253 fDataSources[n].fNdx = n;
257 AliHLTHOMERReader::AliHLTHOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
258 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
260 AliHLTMonitoringReader(),
261 fCurrentEventType(~(homer_uint64)0),
262 fCurrentEventID(~(homer_uint64)0),
267 fTCPDataSourceCnt(0),
268 fShmDataSourceCnt(0),
269 fDataSourceMaxCnt(0),
271 fConnectionStatus(0),
272 fErrorConnection(~(unsigned int)0),
273 fEventRequestAdvanceTime(0)
275 // see header file for class documentation
276 // For reading from multiple TCP ports and multiple System V shared memory segments
278 if ( !AllocDataSources(tcpCnt+shmCnt) )
280 fErrorConnection = 0;
281 fConnectionStatus = ENOMEM;
284 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
286 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
287 if ( fConnectionStatus )
289 fErrorConnection = n;
292 fDataSources[n].fNdx = n;
294 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
296 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
297 if ( fConnectionStatus )
299 fErrorConnection = tcpCnt+n;
302 fDataSources[n].fNdx = n;
// Destructor: drops the data and block bookkeeping of the event currently
// held by the reader via ReleaseCurrentEvent().
305 AliHLTHOMERReader::~AliHLTHOMERReader()
307 // see header file for class documentation
308 ReleaseCurrentEvent();
312 int AliHLTHOMERReader::ReadNextEvent()
314 // see header file for class documentation
315 // Read in the next available event
316 return ReadNextEvent( false, 0 );
319 int AliHLTHOMERReader::ReadNextEvent( unsigned long timeout )
321 // see header file for class documentation
322 // Read in the next available event
323 return ReadNextEvent( true, timeout );
// @param ndx  zero-based index into the current event's block table
// @return the fLength recorded for that block by ParseSourceData()
326 unsigned long AliHLTHOMERReader::GetBlockDataLength( unsigned long ndx ) const
328 // see header file for class documentation
329 // Return the size (in bytes) of the current event's data
330 // block with the given block index (starting at 0).
// ndx is range-checked against fBlockCnt before fBlocks is touched.
331 if ( ndx >= fBlockCnt )
333 return fBlocks[ndx].fLength;
// @param ndx  zero-based index into the current event's block table
// @return the fData pointer recorded for that block by ParseSourceData();
//         it points into the buffer of the data source that delivered the block
336 const void* AliHLTHOMERReader::GetBlockData( unsigned long ndx ) const
338 // see header file for class documentation
339 // Return a pointer to the start of the current event's data
340 // block with the given block index (starting at 0).
// ndx is range-checked against fBlockCnt before fBlocks is touched.
341 if ( ndx >= fBlockCnt )
343 return fBlocks[ndx].fData;
// @param ndx  zero-based index into the current event's block table
// @return the fHostname string stored in the data source that delivered
//         the block; the string is owned by that data source
346 const char* AliHLTHOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
348 // see header file for class documentation
349 // Return IP address or hostname of node which sent the
350 // current event's data block with the given block index
352 // For HOMER this is the ID of the node on which the subscriber
353 // that provided this data runs/ran.
354 if ( ndx >= fBlockCnt )
// fSource is the data-source index copied from DataSource::fNdx in
// ParseSourceData(); a value outside the source table is reported on stderr.
357 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
359 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
360 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
364 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
365 //return fBlocks[ndx].fOriginatingNodeID;
// @param ndx  zero-based index into the current event's block table
// @return the byte found at kByteOrderAttribute_8b_Offset in the block's
//         descriptor (fMetaData)
368 homer_uint8 AliHLTHOMERReader::GetBlockByteOrder( unsigned long ndx ) const
370 // see header file for class documentation
371 // Return byte order of the data stored in the
372 // current event's data block with the given block index (starting at 0).
373 // 0 is unknown alignment,
374 // 1 is little endian,
375 // 2 is big endian.
376 if ( ndx >= fBlockCnt )
378 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
// The descriptor is addressed byte-wise here.
379 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
// @param ndx       zero-based index into the current event's block table
// @param dataType  selector of the alignment entry; accepted values are
//                  0 .. (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset)
// @return the descriptor byte at kAlignment_8b_StartOffset+dataType
382 homer_uint8 AliHLTHOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
384 // see header file for class documentation
385 // Return the alignment (in bytes) of the given datatype
386 // in the data stored in the current event's data block
387 // with the given block index (starting at 0).
388 // Possible values for the data type are
395 if ( ndx >= fBlockCnt )
// Selectors beyond the last alignment entry are checked separately.
397 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
399 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
// The descriptor is addressed byte-wise here.
400 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
// @param ndx  zero-based index into the current event's block table
// @return the 64-bit descriptor word at kStatusFlags_64b_Offset, returned
//         as stored (no byte swapping is applied here)
403 homer_uint64 AliHLTHOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
405 // see header file for class documentation
406 if ( ndx >= fBlockCnt )
408 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
412 /* Return the type of the data in the current event's data
413 block with the given block index (starting at 0). */
414 homer_uint64 AliHLTHOMERReader::GetBlockDataType( unsigned long ndx ) const
416 // see header file for class documentation
417 if ( ndx >= fBlockCnt )
418 return ~(homer_uint64)0;
419 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
420 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
423 /* Return the origin of the data in the current event's data
424 block with the given block index (starting at 0). */
425 homer_uint32 AliHLTHOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
427 // see header file for class documentation
428 if ( ndx >= fBlockCnt )
429 return ~(homer_uint32)0;
430 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
431 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
434 /* Return a specification of the data in the current event's data
435 block with the given block index (starting at 0). */
436 homer_uint32 AliHLTHOMERReader::GetBlockDataSpec( unsigned long ndx ) const
438 // see header file for class documentation
439 if ( ndx >= fBlockCnt )
440 return ~(homer_uint32)0;
441 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
442 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
445 /* Find the next data block in the current event with the given
446 data type, origin, and specification. Returns the block's
index, or ~(unsigned long)0 if no block matches. */
// Search the current event's blocks, starting at startNdx, for a block whose
// descriptor matches the requested type, origin and specification.
// @param type      64-bit type word to match; all bits set matches any type
// @param origin    32-bit origin to match; 0xFFFFFFFF matches any origin
// @param spec      32-bit specification to match; 0xFFFFFFFF matches any
// @param startNdx  first block index to examine
// @return ~(unsigned long)0 when the loop finishes without a match
448 unsigned long AliHLTHOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
449 homer_uint32 spec, unsigned long startNdx ) const
451 // see header file for class documentation
452 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
// Descriptor words are compared as stored; origin and spec use only the low
// 32 bits of their 64-bit descriptor words.
454 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
455 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
456 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
459 return ~(unsigned long)0;
462 /* Find the next data block in the current event with the given
463 data type, origin, and specification. Returns the block's
index, or ~(unsigned long)0 if no block matches. */
// Character-array variant of the block search: type and origin are compared
// byte by byte against the descriptor words of each block from startNdx on.
// @param type      8 characters to compare with the descriptor's type word
// @param origin    4 characters to compare with the first 4 bytes of the
//                  descriptor's origin word
// @param spec      32-bit specification to match; 0xFFFFFFFF matches any
// @param startNdx  first block index to examine
// @return ~(unsigned long)0 when the loop finishes without a match
465 unsigned long AliHLTHOMERReader::FindBlockNdx( char type[8], char origin[4],
466 homer_uint32 spec, unsigned long startNdx ) const
468 // see header file for class documentation
469 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
// found1 tracks the type comparison, found2 the origin comparison
// (presumably -- the assignments are not on the lines shown here).
471 bool found1=true, found2=true;
// Scan for type bytes different from 0xFF; presumably an all-0xFF array acts
// as a wildcard, mirroring the integer overload -- TODO confirm.
472 for ( unsigned i = 0; i < 8; i++ )
474 if ( type[i] != (char)0xFF )
483 for ( unsigned i = 0; i < 8; i++ )
485 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
486 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
// Same two-step scheme for the 4 origin bytes.
493 for ( unsigned i = 0; i < 4; i++ )
495 if ( origin[i] != (char)0xFF )
504 for ( unsigned i = 0; i < 4; i++ )
506 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
507 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
514 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
// A block qualifies only if type, origin and specification all agree.
515 if ( found1 && found2 &&
516 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
519 return ~(unsigned long)0;
522 /* Return the ID of the node that actually produced this data block.
523 This may be different from the node which sent the data to this
524 monitoring object as returned by GetBlockSendNodeID. */
// @param ndx  zero-based index into the current event's block table
// @return the fOriginatingNodeID string of the block; ParseSourceData() fills
//         it with the inet_ntoa() text of the descriptor's producer node word
525 const char* AliHLTHOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
527 // see header file for class documentation
528 if ( ndx >= fBlockCnt )
530 return fBlocks[ndx].fOriginatingNodeID;
// Reset the reader's bookkeeping to the same defaults the constructors'
// initializer lists use. Nothing is released here; the members are only
// overwritten.
534 void AliHLTHOMERReader::Init()
536 // see header file for class documentation
// No current event: type and ID are all-ones.
537 fCurrentEventType = ~(homer_uint64)0;
538 fCurrentEventID = ~(homer_uint64)0;
539 fMaxBlockCnt = fBlockCnt = 0;
542 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
// No error recorded: status 0 and an all-ones connection index.
546 fConnectionStatus = 0;
547 fErrorConnection = ~(unsigned int)0;
549 fEventRequestAdvanceTime = 0;
// Allocate the table of data sources.
// @param sourceCnt  number of DataSource entries to allocate
// @return callers in this file treat a false result as out-of-memory (ENOMEM)
552 bool AliHLTHOMERReader::AllocDataSources( unsigned int sourceCnt )
554 // see header file for class documentation
555 fDataSources = new DataSource[ sourceCnt ];
// Remember the capacity of the table.
559 fDataSourceMaxCnt = sourceCnt;
// Set up one TCP data source: resolve the host, connect, send the MOD_BIN
// command and fill in the DataSource record.
// @param hostname  remote host name; a private copy is stored in source.fHostname
// @param port      remote TCP port (host byte order)
// @param source    record to fill in
// @return EADDRNOTAVAIL when the host name cannot be resolved; the socket is
//         closed again on the later failure paths shown below
563 int AliHLTHOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
565 // see header file for class documentation
// NOTE(review): gethostbyname() is marked obsolescent by POSIX and returns a
// pointer to static storage; getaddrinfo() is the documented replacement.
567 he = gethostbyname( hostname );
570 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
571 return EADDRNOTAVAIL;
574 struct sockaddr_in remoteAddr;
575 remoteAddr.sin_family = AF_INET; // host byte order
576 remoteAddr.sin_port = htons(port); // short, network byte order
// Only the first address of the resolved host is used.
577 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
578 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
580 // Create socket and connect to target program on remote node
581 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
582 if ( source.fTCPConnection == -1 )
589 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
593 close( source.fTCPConnection );
// Ask the remote end for binary mode; a short write counts as a failure.
597 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
598 if ( ret != (int)strlen(MOD_BIN) )
601 close( source.fTCPConnection );
// Private copy of the host name; released with delete [] in FreeTCPDataSource().
605 char* tmpchar = new char[ strlen( hostname )+1 ];
608 close( source.fTCPConnection );
611 strcpy( tmpchar, hostname );
612 source.fHostname = tmpchar;
615 source.fTCPPort = port;
// No event data has been received yet.
617 source.fDataSize = 0;
618 source.fDataRead = 0;
// Set up one System V shared memory data source: look up the segment, attach
// it and fill in the DataSource record.
// @param shmKey   key of the segment, passed to shmget()
// @param shmSize  size of the segment, passed to shmget()
// @param source   record to fill in
622 int AliHLTHOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
624 // see header file for class documentation
// The local host name is stored as the sender ID of this source; the buffer
// is released with delete [] on the failure paths below and in
// FreeShmDataSource().
626 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
631 gethostname( tmpchar, MAXHOSTNAMELEN );
// gethostname() may leave the buffer unterminated on truncation.
632 tmpchar[MAXHOSTNAMELEN]=(char)0;
633 source.fHostname = tmpchar;
// No IPC_CREAT flag: only an existing segment is looked up.
635 source.fShmID = shmget( shmKey, shmSize, 0660 );
636 if ( source.fShmID == -1 )
639 delete [] source.fHostname;
643 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
// NOTE(review): POSIX specifies that shmat() reports failure as (void*)-1,
// not NULL, so this test does not look like it can catch a failed attach.
645 if ( !source.fShmPtr )
// NOTE(review): IPC_RMID marks the segment for destruction; confirm this is
// intended for a segment this reader did not create.
648 shmctl( source.fShmID, IPC_RMID, NULL );
649 delete [] source.fHostname;
654 source.fShmKey = shmKey;
655 source.fShmSize = shmSize;
// No event data has been picked up yet.
656 source.fDataSize = 0;
657 source.fDataRead = 0;
661 void AliHLTHOMERReader::FreeDataSources()
663 // see header file for class documentation
664 for ( unsigned n=0; n < fDataSourceCnt; n++ )
666 if ( fDataSources[n].fType == kTCP )
667 FreeTCPDataSource( fDataSources[n] );
669 FreeShmDataSource( fDataSources[n] );
// Release what a shared memory data source holds: detach the mapped segment
// and free the host name string.
// @param source  the data source to release
673 int AliHLTHOMERReader::FreeShmDataSource( DataSource& source )
675 // see header file for class documentation
676 if ( source.fShmPtr )
677 shmdt( source.fShmPtr );
// Removing the segment itself is disabled; it is only detached.
678 // if ( source.fShmID != -1 )
679 // shmctl( source.fShmID, IPC_RMID, NULL );
// The string was allocated with new char[] in AddDataSource().
680 if ( source.fHostname )
681 delete [] source.fHostname;
// Release what a TCP data source holds: close the socket and free the host
// name string.
// @param source  the data source to release
685 int AliHLTHOMERReader::FreeTCPDataSource( DataSource& source )
687 // see header file for class documentation
// NOTE(review): this tests for a non-zero descriptor, while socket() signals
// failure with -1 and 0 is a valid descriptor; confirm the intended check.
688 if ( source.fTCPConnection )
689 close( source.fTCPConnection );
// The string was allocated with new char[] in AddDataSource().
690 if ( source.fHostname )
691 delete [] source.fHostname;
// Worker behind the public ReadNextEvent() overloads. The visible steps are:
// release the previous event, trigger every source, read the data of the TCP
// and shared memory sources, verify that all sources delivered the same
// event, and parse each source's data into the block table.
// @param useTimeout  forwarded to the trigger and read helpers
// @param timeout     forwarded to the trigger and read helpers
// @return on the failure paths shown below, fConnectionStatus; fErrorConnection
//         then holds the index of the offending source
695 int AliHLTHOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
697 // see header file for class documentation
698 if ( fDataSourceCnt<=0 )
700 // Clean up currently active event.
701 ReleaseCurrentEvent();
703 // Trigger all configured data sources
704 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
706 if ( fDataSources[n].fType == kTCP )
707 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
709 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
712 fErrorConnection = n;
713 fConnectionStatus=ret;
714 return fConnectionStatus;
717 // Now read in data from the configured data source
// The TCP sources are expected at the front of fDataSources and the shared
// memory sources right behind them.
718 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
723 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
728 // for ( unsigned n = 0; n<fDataSourceCnt; n++ )
730 // if ( fDataSources[n].fType == kTCP )
731 // ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
733 // ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
736 // fErrorConnection = n;
737 // fConnectionStatus=ret;
738 // return fConnectionStatus;
741 //Check to see that all sources contributed data for the same event
742 homer_uint64 eventID;
743 homer_uint64 eventType;
// Source 0 is the reference for the comparison.
744 eventID = GetSourceEventID( fDataSources[0] );
745 eventType = GetSourceEventType( fDataSources[0] );
746 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
748 if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
750 fErrorConnection = n;
751 fConnectionStatus=56;//EBADRQC;
752 return fConnectionStatus;
755 // Find all the different data blocks contained in the data from all
757 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
759 ret = ParseSourceData( fDataSources[n] );
762 fErrorConnection = n;
763 fConnectionStatus=57;//EBADSLT;
764 return fConnectionStatus;
// All sources agree and were parsed: publish the event identity.
767 fCurrentEventID = eventID;
768 fCurrentEventType = eventType;
// Forget the event currently held by the reader: reset the event identity,
// drop the per-source data buffers and free the per-block node ID strings.
772 void AliHLTHOMERReader::ReleaseCurrentEvent()
774 // see header file for class documentation
775 // sources.fDataRead = 0;
777 fCurrentEventID = ~(homer_uint64)0;
778 fCurrentEventType = ~(homer_uint64)0;
779 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
781 if ( fDataSources[n].fData )
// Only TCP buffers are heap arrays (allocated in ReadDataFromTCPSources());
// shared memory sources point into the attached segment.
783 if ( fDataSources[n].fType == kTCP )
784 delete [] (homer_uint8*)fDataSources[n].fData;
785 fDataSources[n].fData = NULL;
787 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
// The node ID strings are allocated with new char[] in ParseSourceData().
791 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
793 if ( fBlocks[n].fOriginatingNodeID )
794 delete [] fBlocks[n].fOriginatingNodeID;
// Send an event request to one TCP data source.
// @param source       the TCP data source to trigger
// @param useTimeout   presumably selects whether the send timeout below is
//                     installed -- the guarding line is not shown here
// @param timeoutUsec  send timeout in microseconds
803 int AliHLTHOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
805 // see header file for class documentation
// The previous send timeout is saved so it can be restored afterwards.
807 struct timeval oldSndTO, newSndTO;
810 socklen_t optlen=sizeof(oldSndTO);
811 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
816 if ( optlen!=sizeof(oldSndTO) )
// Split the microsecond timeout into whole seconds and the remainder.
820 newSndTO.tv_sec = timeoutUsec / 1000000;
821 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
822 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
828 // Send one event request
// Without an advance time the plain GET_ONE command is sent.
829 if ( !fEventRequestAdvanceTime )
831 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
// A short write counts as a failure; the old timeout is restored.
833 if ( ret != (int)strlen(GET_ONE) )
836 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
// Otherwise the advance time is embedded in a FIRST ORBIT EVENT command.
// NOTE(review): "0x%Lu" prints a decimal number behind a hex prefix, and the
// L length modifier is only defined for floating-point conversions in ISO C;
// confirm the format the remote end expects.
// NOTE(review): snprintf() truncates when its result is >= the buffer size,
// so the len>128 test below misses the len==128 case.
844 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
845 if ( len>128 || len<0 )
848 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
852 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
854 if ( ret != (int)strlen(tmpCmd) )
857 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
// Request new data from a shared memory data source.
// @param source  the shared memory data source; the two unnamed parameters
//                (timeout flag and value) are not used
865 int AliHLTHOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long )
867 // see header file for class documentation
868 // clear the size indicator in the first 4 bytes of the buffer to request data
869 // from the HOMER writer.
870 if ( source.fShmPtr )
// ReadDataFromShmSources() polls this word until it becomes non-zero again.
872 *(homer_uint32*)( source.fShmPtr ) = 0;
// Receive the event data of a set of TCP data sources using select().
// For every source a 32-bit size word in network byte order is read first,
// then a buffer of that size is allocated and filled.
// @param sourceCnt   number of entries in sources
// @param sources     the TCP data sources to read from
// @param useTimeout  presumably selects whether select() gets a timeout --
//                    the guarding line is not shown here
// @param timeout     timeout in microseconds
// @return on the failure paths shown below, fConnectionStatus; fErrorConnection
//         then holds the index of the offending source
879 int AliHLTHOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
881 // see header file for class documentation
// Index of the first source that is still waited for; all-ones means none.
889 unsigned firstConnection=~(unsigned)0;
890 for ( unsigned long n = 0; n < sourceCnt; n++ )
892 if ( sources[n].fDataSize == 0 // size specifier not yet read
893 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
// Sources with outstanding data are watched in non-blocking mode.
896 FD_SET( sources[n].fTCPConnection, &conns );
897 if ( sources[n].fTCPConnection > highestConn )
898 highestConn = sources[n].fTCPConnection;
899 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
900 if ( firstConnection == ~(unsigned)0 )
// The other sources have their file status flags cleared (blocking mode).
905 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
910 struct timeval tv, *ptv;
// Split the microsecond timeout into whole seconds and the remainder.
913 tv.tv_sec = timeout / 1000000;
914 tv.tv_usec = timeout - (tv.tv_sec*1000000);
919 // wait until something is ready to be read
920 // either for timeout usecs or until eternity
922 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
// A failed or timed-out select() is attributed to the first pending source.
925 fErrorConnection = firstConnection;
927 fConnectionStatus = errno;
929 fConnectionStatus = ETIMEDOUT;
930 return fConnectionStatus;
932 for ( unsigned n = 0; n < sourceCnt; n++ )
934 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
// Step 1: the size word has not been received yet.
936 if ( sources[n].fDataSize == 0 )
938 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
// A partial size word is treated as an error rather than retried.
939 if ( ret != sizeof(homer_uint32) )
941 fErrorConnection = n;
943 fConnectionStatus = errno;
945 fConnectionStatus = ENOMSG;
946 return fConnectionStatus;
948 sources[n].fDataSize = ntohl( sources[n].fDataSize );
949 sources[n].fDataRead = 0;
// The buffer is released with delete [] in ReleaseCurrentEvent().
950 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
951 if ( !sources[n].fData )
953 fErrorConnection = n;
954 fConnectionStatus = ENOMEM;
955 return fConnectionStatus;
// Step 2: append whatever is available to the partially filled buffer.
958 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
960 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
962 sources[n].fDataRead += ret;
965 fErrorConnection = n;
966 fConnectionStatus = ECONNRESET;
967 return fConnectionStatus;
971 fErrorConnection = n;
972 fConnectionStatus = errno;
973 return fConnectionStatus;
978 fErrorConnection = n;
979 fConnectionStatus = ENXIO;
980 return fConnectionStatus;
// Single-source variant of the TCP read: sends GET_ONE, reads the 32-bit
// size word and then the event data with blocking read() calls.
// NOTE(review): the select() call below passes four arguments and the
// function carries a TODO #warning, so this looks like code that is not
// compiled in the regular build -- confirm against the complete file.
991 int AliHLTHOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
993 #warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
994 // Send one event request
995 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
996 if ( ret != strlen(GET_ONE) )
1000 // wait for and read back size specifier
1002 // The value transmitted is binary, in network byte order
1003 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1004 if ( ret != sizeof(sizeNBO) )
1008 // Convert back to host byte order
1009 source.fDataSize = ntohl( sizeNBO );
1010 source.fData = new homer_uint8[ source.fDataSize ];
1011 unsigned long dataRead=0, toRead;
1012 if ( !source.fData )
1015 // Read in data into buffer in order not to block connection
// The data is consumed through a scratch buffer on this path; the 1024
// below presumably is its size -- TODO confirm.
1016 while ( dataRead < source.fDataSize )
1018 if ( source.fDataSize-dataRead > 1024 )
1021 toRead = source.fDataSize-dataRead;
1022 ret = read( source.fTCPConnection, buffer, toRead );
// Regular path: read straight into the event buffer until it is full.
1030 while ( dataRead < source.fDataSize )
1032 toRead = source.fDataSize-dataRead;
1033 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1036 else if ( ret == 0 && useTimeout )
// Split the microsecond timeout into whole seconds and the remainder.
1039 tv.tv_sec = timeout / 1000000;
1040 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1043 FD_SET( source.fTCPConnection, &conns );
1044 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1048 else if ( ret == 0 )
// Single-source counterpart of ReadDataFromShmSources(); only commented-out
// code in ReadNextEvent() refers to a function of this name in this file.
1065 int AliHLTHOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
// Pick up the event data of a set of shared memory data sources by polling
// the size word at the start of each segment.
// @param sourceCnt   number of entries in sources
// @param sources     the shared memory data sources to read from
// @param useTimeout  enables the elapsed-time check below
// @param timeout     limit compared against the elapsed time tdiff
1071 int AliHLTHOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1073 // see header file for class documentation
// tv1 is the reference time stamp, tv2 the current time.
1074 struct timeval tv1, tv2;
1078 gettimeofday( &tv1, NULL );
1083 for ( unsigned n = 0; n < sourceCnt; n++ )
1085 if ( !sources[n].fDataSize )
// A non-zero first word means data is available; TriggerShmSource() set the
// word to 0 when requesting it.
1087 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
// The payload follows the 32-bit size word and is used in place (no copy).
1090 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1091 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
// The reference time stamp is refreshed when 'found' is set, so the timeout
// is measured from that point.
1094 if ( found && useTimeout )
1095 gettimeofday( &tv1, NULL );
1096 if ( !all && useTimeout )
1098 gettimeofday( &tv2, NULL );
1099 unsigned long long tdiff;
1100 tdiff = tv2.tv_sec-tv1.tv_sec;
1102 tdiff += tv2.tv_usec-tv1.tv_usec;
1103 if ( tdiff > timeout )
// Walk the block descriptors contained in one source's event data and append
// one entry per block to the reader's block table.
// @param source  data source whose fData holds a complete event
// NOTE(review): no check of the offsets and lengths read from the data
// against source.fDataSize is visible here.
1113 int AliHLTHOMERReader::ParseSourceData( DataSource& source )
1115 // see header file for class documentation
// The event header states the byte order of the data that follows.
1118 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
// Number of blocks in this event, converted to native byte order.
1119 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
// Grow the block table by the number of new blocks.
1120 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
// Byte offset of the first block descriptor inside the event data.
1123 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1124 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1126 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1127 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
// fSource lets GetBlockSendNodeID() find the delivering source again.
1128 fBlocks[fBlockCnt].fSource = source.fNdx;
// Block payload location is given as an offset from the start of the event data.
1129 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1130 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
// The descriptor itself stays in the source buffer and is referenced in place.
1131 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1132 struct in_addr tmpA;
// The low 32 bits of the producer node word are used as an IPv4 address
// (this word is not passed through Swap()).
1133 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
// inet_ntoa() returns a static buffer, hence the private copy below.
1134 char* addr = inet_ntoa( tmpA );
1135 char* tmpchar = new char[ strlen(addr)+1 ];
1138 strcpy( tmpchar, addr );
// Released with delete [] in ReleaseCurrentEvent().
1139 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
// Advance to the next descriptor.
1140 descrOffset += descrLen;
// Replace the block table by one with newCnt entries, keeping as many of
// the existing entries as fit.
// @param newCnt  requested number of DataBlock entries
1147 int AliHLTHOMERReader::ReAllocBlocks( unsigned long newCnt )
1149 // see header file for class documentation
1150 DataBlock* newBlocks;
1151 newBlocks = new DataBlock[ newCnt ];
// Entries are copied bitwise, so pointer members such as fOriginatingNodeID
// are shared with the old table rather than duplicated.
1154 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1155 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
// Entries beyond the old capacity start out zeroed.
1156 if ( newCnt > fMaxBlockCnt )
1157 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1160 fBlocks = newBlocks;
1161 fMaxBlockCnt = newCnt;
1165 homer_uint64 AliHLTHOMERReader::GetSourceEventID( DataSource& source )
1167 // see header file for class documentation
1168 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1169 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1172 homer_uint64 AliHLTHOMERReader::GetSourceEventType( DataSource& source )
1174 // see header file for class documentation
1175 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1176 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1179 homer_uint64 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1181 // see header file for class documentation
1182 if ( destFormat == sourceFormat )
1185 return ((source & 0xFFULL) << 56) |
1186 ((source & 0xFF00ULL) << 40) |
1187 ((source & 0xFF0000ULL) << 24) |
1188 ((source & 0xFF000000ULL) << 8) |
1189 ((source & 0xFF00000000ULL) >> 8) |
1190 ((source & 0xFF0000000000ULL) >> 24) |
1191 ((source & 0xFF000000000000ULL) >> 40) |
1192 ((source & 0xFF00000000000000ULL) >> 56);
1195 homer_uint32 AliHLTHOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1197 // see header file for class documentation
1198 if ( destFormat == sourceFormat )
1201 return ((source & 0xFFUL) << 24) |
1202 ((source & 0xFF00UL) << 8) |
1203 ((source & 0xFF0000UL) >> 8) |
1204 ((source & 0xFF000000UL) >> 24);
// Factory function for a reader working on a memory buffer.
// @param pBuffer  start of the buffer
// @param size     size of the buffer
// The construction from (pBuffer, size) is commented out below, so neither
// parameter is used by the code shown here.
1207 AliHLTHOMERReader* AliHLTHOMERReaderCreate(const void* pBuffer, int size)
1209 // see header file for function documentation
1210 //return new AliHLTHOMERReader(pBuffer, size);
1214 void AliHLTHOMERReaderDelete(AliHLTHOMERReader* pInstance)
1216 // see header file for function documentation
1217 if (pInstance) delete pInstance;
1221 ***************************************************************************
1223 ** $Author$ - Initial Version by Timm Morten Steinbeck
1227 ***************************************************************************