Adding first version of trigger LUT handling to Shuttle
[u/mrichter/AliRoot.git] / HLT / BASE / HOMER / AliHLTHOMERReader.cxx
CommitLineData
2be16a33 1/************************************************************************
2**
3**
4** This file is property of and copyright by the Technical Computer
5** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6** University, Heidelberg, Germany, 2001
7** This file has been written by Timm Morten Steinbeck,
8** timm@kip.uni-heidelberg.de
9**
10**
11** See the file license.txt for details regarding usage, modification,
12** distribution and warranty.
13** Important: This file is provided without any warranty, including
14** fitness for any particular purpose.
15**
16**
17** Newer versions of this file's package will be made available from
18** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19** or the corresponding page of the Heidelberg Alice Level 3 group.
20**
21*************************************************************************/
22
23/*
24***************************************************************************
25**
26** $Author$ - Initial Version by Timm Morten Steinbeck
27**
28** $Id$
29**
30***************************************************************************
31*/
32
9e91e956 33/** @file AliHLTHOMERReader.cxx
2be16a33 34 @author Timm Steinbeck
35 @date Sep 14 2007
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
38
04a939f7 39// see header file for class documentation
2be16a33 40// or
41// refer to README to build package
42// or
43// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
44
45#include "AliHLTHOMERReader.h"
46#include <stdio.h>
47#include <string.h>
48#include <errno.h>
49#include <netdb.h>
50extern int h_errno;
51#include <sys/types.h>
52#include <sys/socket.h>
53#include <netinet/in.h>
54#include <netinet/tcp.h>
55#include <unistd.h>
56#include <rpc/types.h>
57#include <fcntl.h>
58#include <sys/stat.h>
59#include <netinet/in.h>
60#include <arpa/inet.h>
61#ifdef USE_ROOT
62#include <Rtypes.h>
63#endif
64
65
66#define MOD_BIN "MOD BIN\n"
67#define MOD_ASC "MOD ASC\n"
68#define GET_ONE "GET ONE\n"
69#define GET_ALL "GET ALL\n"
70
71#ifdef USE_ROOT
72ClassImp(MonitoringReader);
73ClassImp(HOMERReader);
74#endif
75
76
77
78
79
80#ifdef USE_ROOT
81HOMERReader::HOMERReader()
746d2a94 82 :
83 fCurrentEventType(~(homer_uint64)0),
84 fCurrentEventID(~(homer_uint64)0),
85 fBlockCnt(0),
86 fMaxBlockCnt(0),
87 fBlocks(NULL),
88 fDataSourceCnt(0),
89 fTCPDataSourceCnt(0),
90 fShmDataSourceCnt(0),
91 fDataSourceMaxCnt(0),
04a939f7 92 fDataSources(NULL),
93 fConnectionStatus(0),
94 fErrorConnection(~(unsigned int)0),
95 fEventRequestAdvanceTime(0)
2be16a33 96 {
1d47dbac 97// Reader implementation of the HOMER interface.
98// The HLT Monitoring Environment including ROOT is
99// a native interface to ship out data from the HLT chain.
100// See pdf document shiped with the package
101// for class documentation and tutorial.
2be16a33 102 Init();
103 }
104#endif
105
106
2be16a33 107HOMERReader::HOMERReader( const char* hostname, unsigned short port )
746d2a94 108 :
109 MonitoringReader(),
110 fCurrentEventType(~(homer_uint64)0),
111 fCurrentEventID(~(homer_uint64)0),
112 fBlockCnt(0),
113 fMaxBlockCnt(0),
114 fBlocks(NULL),
115 fDataSourceCnt(0),
116 fTCPDataSourceCnt(0),
117 fShmDataSourceCnt(0),
118 fDataSourceMaxCnt(0),
04a939f7 119 fDataSources(NULL),
120 fConnectionStatus(0),
121 fErrorConnection(~(unsigned int)0),
122 fEventRequestAdvanceTime(0)
2be16a33 123 {
04a939f7 124// see header file for class documentation
125// For reading from a TCP port
2be16a33 126 Init();
127 if ( !AllocDataSources(1) )
128 {
129 fErrorConnection = 0;
130 fConnectionStatus = ENOMEM;
131 return;
132 }
133 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
134 if ( fConnectionStatus )
135 fErrorConnection = 0;
136 else
137 {
138 fDataSourceCnt++;
139 fTCPDataSourceCnt++;
140 fDataSources[0].fNdx = 0;
141 }
142 }
143
2be16a33 144HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
746d2a94 145 :
146 MonitoringReader(),
147 fCurrentEventType(~(homer_uint64)0),
148 fCurrentEventID(~(homer_uint64)0),
149 fBlockCnt(0),
150 fMaxBlockCnt(0),
151 fBlocks(NULL),
152 fDataSourceCnt(0),
153 fTCPDataSourceCnt(0),
154 fShmDataSourceCnt(0),
155 fDataSourceMaxCnt(0),
04a939f7 156 fDataSources(NULL),
157 fConnectionStatus(0),
158 fErrorConnection(~(unsigned int)0),
159 fEventRequestAdvanceTime(0)
2be16a33 160 {
04a939f7 161// see header file for class documentation
162// For reading from multiple TCP ports
2be16a33 163 Init();
164 if ( !AllocDataSources(tcpCnt) )
165 {
166 fErrorConnection = 0;
167 fConnectionStatus = ENOMEM;
168 return;
169 }
170 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
171 {
172 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
173 if ( fConnectionStatus )
174 {
175 fErrorConnection = n;
176 return;
177 }
178 fDataSources[n].fNdx = n;
179 }
180 }
181
2be16a33 182HOMERReader::HOMERReader( key_t shmKey, int shmSize )
746d2a94 183 :
184 MonitoringReader(),
185 fCurrentEventType(~(homer_uint64)0),
186 fCurrentEventID(~(homer_uint64)0),
187 fBlockCnt(0),
188 fMaxBlockCnt(0),
189 fBlocks(NULL),
190 fDataSourceCnt(0),
191 fTCPDataSourceCnt(0),
192 fShmDataSourceCnt(0),
193 fDataSourceMaxCnt(0),
04a939f7 194 fDataSources(NULL),
195 fConnectionStatus(0),
196 fErrorConnection(~(unsigned int)0),
197 fEventRequestAdvanceTime(0)
2be16a33 198 {
04a939f7 199// see header file for class documentation
200// For reading from a System V shared memory segment
2be16a33 201 Init();
202 if ( !AllocDataSources(1) )
203 {
204 fErrorConnection = 0;
205 fConnectionStatus = ENOMEM;
206 return;
207 }
208 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
209 if ( fConnectionStatus )
210 fErrorConnection = 0;
211 else
212 {
213 fDataSourceCnt++;
214 fShmDataSourceCnt++;
215 fDataSources[0].fNdx = 0;
216 }
217 }
218
2be16a33 219HOMERReader::HOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
746d2a94 220 :
221 MonitoringReader(),
222 fCurrentEventType(~(homer_uint64)0),
223 fCurrentEventID(~(homer_uint64)0),
224 fBlockCnt(0),
225 fMaxBlockCnt(0),
226 fBlocks(NULL),
227 fDataSourceCnt(0),
228 fTCPDataSourceCnt(0),
229 fShmDataSourceCnt(0),
230 fDataSourceMaxCnt(0),
04a939f7 231 fDataSources(NULL),
232 fConnectionStatus(0),
233 fErrorConnection(~(unsigned int)0),
234 fEventRequestAdvanceTime(0)
2be16a33 235 {
04a939f7 236// see header file for class documentation
237// For reading from multiple System V shared memory segments
2be16a33 238 Init();
239 if ( !AllocDataSources(shmCnt) )
240 {
241 fErrorConnection = 0;
242 fConnectionStatus = ENOMEM;
243 return;
244 }
245 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
246 {
247 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
248 if ( fConnectionStatus )
249 {
250 fErrorConnection = n;
251 return;
252 }
253 fDataSources[n].fNdx = n;
254 }
255 }
256
2be16a33 257HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
258 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
746d2a94 259 :
260 MonitoringReader(),
261 fCurrentEventType(~(homer_uint64)0),
262 fCurrentEventID(~(homer_uint64)0),
263 fBlockCnt(0),
264 fMaxBlockCnt(0),
265 fBlocks(NULL),
266 fDataSourceCnt(0),
267 fTCPDataSourceCnt(0),
268 fShmDataSourceCnt(0),
269 fDataSourceMaxCnt(0),
04a939f7 270 fDataSources(NULL),
271 fConnectionStatus(0),
272 fErrorConnection(~(unsigned int)0),
273 fEventRequestAdvanceTime(0)
2be16a33 274 {
04a939f7 275// see header file for class documentation
276// For reading from multiple TCP ports and multiple System V shared memory segments
2be16a33 277 Init();
278 if ( !AllocDataSources(tcpCnt+shmCnt) )
279 {
280 fErrorConnection = 0;
281 fConnectionStatus = ENOMEM;
282 return;
283 }
284 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
285 {
286 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
287 if ( fConnectionStatus )
288 {
289 fErrorConnection = n;
290 return;
291 }
292 fDataSources[n].fNdx = n;
293 }
294 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
295 {
296 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
297 if ( fConnectionStatus )
298 {
299 fErrorConnection = tcpCnt+n;
300 return;
301 }
302 fDataSources[n].fNdx = n;
303 }
304 }
305HOMERReader::~HOMERReader()
306 {
04a939f7 307// see header file for class documentation
2be16a33 308 ReleaseCurrentEvent();
309 FreeDataSources();
310 }
311
2be16a33 312int HOMERReader::ReadNextEvent()
313 {
04a939f7 314// see header file for class documentation
315// Read in the next available event
2be16a33 316 return ReadNextEvent( false, 0 );
317 }
318
2be16a33 319int HOMERReader::ReadNextEvent( unsigned long timeout )
320 {
04a939f7 321// see header file for class documentation
322// Read in the next available event
2be16a33 323 return ReadNextEvent( true, timeout );
324 }
325
2be16a33 326unsigned long HOMERReader::GetBlockDataLength( unsigned long ndx ) const
327 {
04a939f7 328// see header file for class documentation
329// Return the size (in bytes) of the current event's data
330// block with the given block index (starting at 0).
2be16a33 331 if ( ndx >= fBlockCnt )
332 return 0;
333 return fBlocks[ndx].fLength;
334 }
335
2be16a33 336const void* HOMERReader::GetBlockData( unsigned long ndx ) const
337 {
04a939f7 338// see header file for class documentation
339// Return a pointer to the start of the current event's data
340// block with the given block index (starting at 0).
2be16a33 341 if ( ndx >= fBlockCnt )
342 return NULL;
343 return fBlocks[ndx].fData;
344 }
345
2be16a33 346const char* HOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
347 {
04a939f7 348// see header file for class documentation
349// Return IP address or hostname of node which sent the
350// current event's data block with the given block index
351// (starting at 0).
352// For HOMER this is the ID of the node on which the subscriber
353// that provided this data runs/ran.
2be16a33 354 if ( ndx >= fBlockCnt )
355 return NULL;
356#ifdef DEBUG
357 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
358 {
359 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
360 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
361 return NULL;
362 }
363#endif
364 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
365 //return fBlocks[ndx].fOriginatingNodeID;
366 }
367
2be16a33 368homer_uint8 HOMERReader::GetBlockByteOrder( unsigned long ndx ) const
369 {
04a939f7 370// see header file for class documentation
371// Return byte order of the data stored in the
372// current event's data block with the given block index (starting at 0).
373// 0 is unknown alignment,
374// 1 ist little endian,
375// 2 is big endian. */
2be16a33 376 if ( ndx >= fBlockCnt )
377 return 0;
378 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
379 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
380 }
04a939f7 381
2be16a33 382homer_uint8 HOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
383 {
04a939f7 384// see header file for class documentation
385// Return the alignment (in bytes) of the given datatype
386// in the data stored in the current event's data block
387// with the given block index (starting at 0).
388// Possible values for the data type are
389// 0: homer_uint64
390// 1: homer_uint32
391// 2: uin16
392// 3: homer_uint8
393// 4: double
394// 5: float
2be16a33 395 if ( ndx >= fBlockCnt )
396 return 0;
397 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
398 return 0;
399 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
400 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
401 }
402
403homer_uint64 HOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
404 {
04a939f7 405// see header file for class documentation
2be16a33 406 if ( ndx >= fBlockCnt )
407 return 0;
408 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
409 }
410
411/* HOMER specific */
412/* Return the type of the data in the current event's data
413 block with the given block index (starting at 0). */
414homer_uint64 HOMERReader::GetBlockDataType( unsigned long ndx ) const
415 {
04a939f7 416// see header file for class documentation
2be16a33 417 if ( ndx >= fBlockCnt )
418 return ~(homer_uint64)0;
419 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
420 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
421 }
422
423/* Return the origin of the data in the current event's data
424 block with the given block index (starting at 0). */
425homer_uint32 HOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
426 {
04a939f7 427// see header file for class documentation
2be16a33 428 if ( ndx >= fBlockCnt )
429 return ~(homer_uint32)0;
430 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
431 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
432 }
433
434/* Return a specification of the data in the current event's data
435 block with the given block index (starting at 0). */
436homer_uint32 HOMERReader::GetBlockDataSpec( unsigned long ndx ) const
437 {
04a939f7 438// see header file for class documentation
2be16a33 439 if ( ndx >= fBlockCnt )
440 return ~(homer_uint32)0;
441 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
442 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
443 }
444
445/* Find the next data block in the current event with the given
446 data type, origin, and specification. Returns the block's
447 index. */
448unsigned long HOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
449 homer_uint32 spec, unsigned long startNdx ) const
450 {
04a939f7 451// see header file for class documentation
2be16a33 452 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
453 {
454 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
455 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
456 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
457 return n;
458 }
459 return ~(unsigned long)0;
460 }
461
462/* Find the next data block in the current event with the given
463 data type, origin, and specification. Returns the block's
464 index. */
465unsigned long HOMERReader::FindBlockNdx( char type[8], char origin[4],
466 homer_uint32 spec, unsigned long startNdx ) const
467 {
04a939f7 468// see header file for class documentation
2be16a33 469 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
470 {
471 bool found1=true, found2=true;
472 for ( unsigned i = 0; i < 8; i++ )
473 {
474 if ( type[i] != (char)0xFF )
475 {
476 found1=false;
477 break;
478 }
479 }
480 if ( !found1 )
481 {
482 found1 = true;
483 for ( unsigned i = 0; i < 8; i++ )
484 {
485 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
486 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
487 {
488 found1=false;
489 break;
490 }
491 }
492 }
493 for ( unsigned i = 0; i < 4; i++ )
494 {
495 if ( origin[i] != (char)0xFF )
496 {
497 found2 = false;
498 break;
499 }
500 }
501 if ( !found2 )
502 {
503 found2 = true;
504 for ( unsigned i = 0; i < 4; i++ )
505 {
506 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
507 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
508 {
509 found2=false;
510 break;
511 }
512 }
513 }
514 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
515 if ( found1 && found2 &&
516 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
517 return n;
518 }
519 return ~(unsigned long)0;
520 }
521
522/* Return the ID of the node that actually produced this data block.
523 This may be different from the node which sent the data to this
524 monitoring object as returned by GetBlockSendNodeID. */
525const char* HOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
526 {
04a939f7 527// see header file for class documentation
2be16a33 528 if ( ndx >= fBlockCnt )
529 return NULL;
530 return fBlocks[ndx].fOriginatingNodeID;
531 }
532
533
534void HOMERReader::Init()
535 {
04a939f7 536// see header file for class documentation
2be16a33 537 fCurrentEventType = ~(homer_uint64)0;
538 fCurrentEventID = ~(homer_uint64)0;
539 fMaxBlockCnt = fBlockCnt = 0;
540 fBlocks = NULL;
541
542 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
543 fDataSources = NULL;
544
545
546 fConnectionStatus = 0;
547 fErrorConnection = ~(unsigned int)0;
548
04a939f7 549 fEventRequestAdvanceTime = 0;
2be16a33 550 }
551
552bool HOMERReader::AllocDataSources( unsigned int sourceCnt )
553 {
04a939f7 554// see header file for class documentation
2be16a33 555 fDataSources = new DataSource[ sourceCnt ];
556 if ( !fDataSources )
557 return false;
558 fDataSourceCnt = 0;
559 fDataSourceMaxCnt = sourceCnt;
560 return true;
561 }
562
563int HOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
564 {
04a939f7 565// see header file for class documentation
2be16a33 566 struct hostent* he;
567 he = gethostbyname( hostname );
568 if ( he == NULL )
569 {
570 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
571 return EADDRNOTAVAIL;
572 }
573
574 struct sockaddr_in remoteAddr;
575 remoteAddr.sin_family = AF_INET; // host byte order
576 remoteAddr.sin_port = htons(port); // short, network byte order
577 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
578 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
579
580 // Create socket and connect to target program on remote node
581 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
582 if ( source.fTCPConnection == -1 )
583 {
584 return errno;
585 }
586
587 int ret;
588
589 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
590 if ( ret == -1 )
591 {
592 ret=errno;
593 close( source.fTCPConnection );
594 return ret;
595 }
596
597 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
598 if ( ret != (int)strlen(MOD_BIN) )
599 {
600 ret=errno;
601 close( source.fTCPConnection );
602 return ret;
603 }
604
605 char* tmpchar = new char[ strlen( hostname )+1 ];
606 if ( !tmpchar )
607 {
608 close( source.fTCPConnection );
609 return ENOMEM;
610 }
611 strcpy( tmpchar, hostname );
612 source.fHostname = tmpchar;
613
614 source.fType = kTCP;
615 source.fTCPPort = port;
616 source.fData = NULL;
617 source.fDataSize = 0;
618 source.fDataRead = 0;
619 return 0;
620 }
621
622int HOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
623 {
04a939f7 624// see header file for class documentation
2be16a33 625 int ret;
626 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
627 if ( !tmpchar )
628 {
629 return ENOMEM;
630 }
631 gethostname( tmpchar, MAXHOSTNAMELEN );
632 tmpchar[MAXHOSTNAMELEN]=(char)0;
633 source.fHostname = tmpchar;
634
635 source.fShmID = shmget( shmKey, shmSize, 0660 );
636 if ( source.fShmID == -1 )
637 {
638 ret = errno;
639 delete [] source.fHostname;
640 return ret;
641 }
642
643 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
644
645 if ( !source.fShmPtr )
646 {
647 ret = errno;
648 shmctl( source.fShmID, IPC_RMID, NULL );
649 delete [] source.fHostname;
650 return ret;
651 }
652
653 source.fType = kShm;
654 source.fShmKey = shmKey;
655 source.fShmSize = shmSize;
656 source.fDataSize = 0;
657 source.fDataRead = 0;
658 return 0;
659 }
660
661void HOMERReader::FreeDataSources()
662 {
04a939f7 663// see header file for class documentation
2be16a33 664 for ( unsigned n=0; n < fDataSourceCnt; n++ )
665 {
666 if ( fDataSources[n].fType == kTCP )
667 FreeTCPDataSource( fDataSources[n] );
668 else
669 FreeShmDataSource( fDataSources[n] );
670 }
671 }
672
673int HOMERReader::FreeShmDataSource( DataSource& source )
674 {
04a939f7 675// see header file for class documentation
2be16a33 676 if ( source.fShmPtr )
677 shmdt( source.fShmPtr );
678// if ( source.fShmID != -1 )
679// shmctl( source.fShmID, IPC_RMID, NULL );
680 if ( source.fHostname )
681 delete [] source.fHostname;
682 return 0;
683 }
684
685int HOMERReader::FreeTCPDataSource( DataSource& source )
686 {
04a939f7 687// see header file for class documentation
2be16a33 688 if ( source.fTCPConnection )
689 close( source.fTCPConnection );
690 if ( source.fHostname )
691 delete [] source.fHostname;
692 return 0;
693 }
694
695int HOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
696 {
04a939f7 697// see header file for class documentation
2be16a33 698 if ( fDataSourceCnt<=0 )
699 return ENXIO;
700 // Clean up currently active event.
701 ReleaseCurrentEvent();
702 int ret;
703 // Trigger all configured data sources
704 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
705 {
706 if ( fDataSources[n].fType == kTCP )
707 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
708 else
709 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
710 if ( ret )
711 {
712 fErrorConnection = n;
713 fConnectionStatus=ret;
714 return fConnectionStatus;
715 }
716 }
717 // Now read in data from the configured data source
718 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
719 if ( ret )
720 {
721 return ret;
722 }
723 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
724 if ( ret )
725 {
726 return ret;
727 }
728// for ( unsigned n = 0; n<fDataSourceCnt; n++ )
729// {
730// if ( fDataSources[n].fType == kTCP )
731// ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
732// else
733// ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
734// if ( ret )
735// {
736// fErrorConnection = n;
737// fConnectionStatus=ret;
738// return fConnectionStatus;
739// }
740// }
741 //Check to see that all sources contributed data for the same event
742 homer_uint64 eventID;
743 homer_uint64 eventType;
744 eventID = GetSourceEventID( fDataSources[0] );
745 eventType = GetSourceEventType( fDataSources[0] );
746 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
747 {
748 if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
749 {
750 fErrorConnection = n;
746d2a94 751 fConnectionStatus=56;//EBADRQC;
2be16a33 752 return fConnectionStatus;
753 }
754 }
755 // Find all the different data blocks contained in the data from all
756 // the sources.
757 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
758 {
759 ret = ParseSourceData( fDataSources[n] );
760 if ( ret )
761 {
762 fErrorConnection = n;
746d2a94 763 fConnectionStatus=57;//EBADSLT;
2be16a33 764 return fConnectionStatus;
765 }
766 }
767 fCurrentEventID = eventID;
768 fCurrentEventType = eventType;
769 return 0;
770 }
771
772void HOMERReader::ReleaseCurrentEvent()
773 {
04a939f7 774// see header file for class documentation
2be16a33 775 // sources.fDataRead = 0;
776 // fMaxBlockCnt
777 fCurrentEventID = ~(homer_uint64)0;
778 fCurrentEventType = ~(homer_uint64)0;
779 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
780 {
781 if ( fDataSources[n].fData )
782 {
783 if ( fDataSources[n].fType == kTCP )
784 delete [] (homer_uint8*)fDataSources[n].fData;
785 fDataSources[n].fData = NULL;
786 }
787 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
788 }
789 if ( fBlocks )
790 {
791 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
792 {
793 if ( fBlocks[n].fOriginatingNodeID )
794 delete [] fBlocks[n].fOriginatingNodeID;
795 }
796 delete [] fBlocks;
797 fBlocks=0;
798 fMaxBlockCnt = 0;
799 fBlockCnt=0;
800 }
801 }
802
3a7c0444 803int HOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeoutUsec )
2be16a33 804 {
04a939f7 805// see header file for class documentation
2be16a33 806 int ret;
807 struct timeval oldSndTO, newSndTO;
808 if ( useTimeout )
809 {
810 socklen_t optlen=sizeof(oldSndTO);
811 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
812 if ( ret )
813 {
814 return errno;
815 }
816 if ( optlen!=sizeof(oldSndTO) )
817 {
818 return ENXIO;
819 }
3a7c0444 820 newSndTO.tv_sec = timeoutUsec / 1000000;
821 newSndTO.tv_usec = timeoutUsec - (newSndTO.tv_sec*1000000);
2be16a33 822 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
823 if ( ret )
824 {
825 return errno;
826 }
827 }
828 // Send one event request
04a939f7 829 if ( !fEventRequestAdvanceTime )
2be16a33 830 {
831 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
832
833 if ( ret != (int)strlen(GET_ONE) )
834 {
835 ret=errno;
836 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
837 return ret;
838 }
839 }
840 else
841 {
842 char tmpCmd[ 128 ];
843
04a939f7 844 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime );
2be16a33 845 if ( len>128 || len<0 )
846 {
847 ret=EMSGSIZE;
848 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
849 return ret;
850 }
851
852 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
853
854 if ( ret != (int)strlen(tmpCmd) )
855 {
856 ret=errno;
857 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
858 return ret;
859 }
860
861 }
862 return 0;
863 }
864
865int HOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long )
866 {
04a939f7 867// see header file for class documentation
2be16a33 868 if ( source.fShmPtr )
869 {
870 *(homer_uint32*)( source.fShmPtr ) = 0;
871 return 0;
872 }
873 else
874 return EFAULT;
875 }
876
877int HOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
878 {
3a7c0444 879// see header file for class documentation
2be16a33 880 bool toRead = false;
881 do
882 {
883 fd_set conns;
884 FD_ZERO( &conns );
885 int highestConn=0;
886 toRead = false;
887 unsigned firstConnection=~(unsigned)0;
888 for ( unsigned long n = 0; n < sourceCnt; n++ )
889 {
890 if ( sources[n].fDataSize == 0 // size specifier not yet read
891 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
892 {
893 toRead = true;
894 FD_SET( sources[n].fTCPConnection, &conns );
895 if ( sources[n].fTCPConnection > highestConn )
896 highestConn = sources[n].fTCPConnection;
897 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
898 if ( firstConnection == ~(unsigned)0 )
899 firstConnection = n;
900 }
901 else
902 {
903 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
904 }
905 }
906 if ( toRead )
907 {
908 struct timeval tv, *ptv;
909 if ( useTimeout )
910 {
911 tv.tv_sec = timeout / 1000000;
912 tv.tv_usec = timeout - (tv.tv_sec*1000000);
913 ptv = &tv;
914 }
915 else
916 ptv = NULL;
917 // wait until something is ready to be read
918 // either for timeout usecs or until eternity
919 int ret;
920 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
921 if ( ret <=0 )
922 {
923 fErrorConnection = firstConnection;
924 if ( errno )
925 fConnectionStatus = errno;
926 else
927 fConnectionStatus = ETIMEDOUT;
928 return fConnectionStatus;
929 }
930 for ( unsigned n = 0; n < sourceCnt; n++ )
931 {
932 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
933 {
934 if ( sources[n].fDataSize == 0 )
935 {
936 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
937 if ( ret != sizeof(homer_uint32) )
938 {
939 fErrorConnection = n;
940 if ( errno )
941 fConnectionStatus = errno;
942 else
943 fConnectionStatus = ENOMSG;
944 return fConnectionStatus;
945 }
946 sources[n].fDataSize = ntohl( sources[n].fDataSize );
947 sources[n].fDataRead = 0;
948 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
949 if ( !sources[n].fData )
950 {
951 fErrorConnection = n;
952 fConnectionStatus = ENOMEM;
953 return fConnectionStatus;
954 }
955 }
956 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
957 {
958 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
959 if ( ret>0 )
960 sources[n].fDataRead += ret;
961 else if ( ret == 0 )
962 {
963 fErrorConnection = n;
964 fConnectionStatus = ECONNRESET;
965 return fConnectionStatus;
966 }
967 else
968 {
969 fErrorConnection = n;
970 fConnectionStatus = errno;
971 return fConnectionStatus;
972 }
973 }
974 else
975 {
976 fErrorConnection = n;
977 fConnectionStatus = ENXIO;
978 return fConnectionStatus;
979 }
980 }
981 }
982 }
983 }
984 while ( toRead );
985 return 0;
986 }
987
988/*
989int HOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
990 {
991#warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
992 // Send one event request
993 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
994 if ( ret != strlen(GET_ONE) )
995 {
996 return errno;
997 }
998 // wait for and read back size specifier
999 unsigned sizeNBO;
1000 // The value transmitted is binary, in network byte order
1001 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
1002 if ( ret != sizeof(sizeNBO) )
1003 {
1004 return errno;
1005 }
1006 // Convert back to host byte order
1007 source.fDataSize = ntohl( sizeNBO );
1008 source.fData = new homer_uint8[ source.fDataSize ];
1009 unsigned long dataRead=0, toRead;
1010 if ( !source.fData )
1011 {
1012 char buffer[1024];
1013 // Read in data into buffer in order not to block connection
1014 while ( dataRead < source.fDataSize )
1015 {
1016 if ( source.fDataSize-dataRead > 1024 )
1017 toRead = 1024;
1018 else
1019 toRead = source.fDataSize-dataRead;
1020 ret = read( source.fTCPConnection, buffer, toRead );
1021 if ( ret > 0 )
1022 dataRead += ret;
1023 else
1024 return errno;
1025 }
1026 return ENOMEM;
1027 }
1028 while ( dataRead < source.fDataSize )
1029 {
1030 toRead = source.fDataSize-dataRead;
1031 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
1032 if ( ret > 0 )
1033 dataRead += ret;
1034 else if ( ret == 0 && useTimeout )
1035 {
1036 struct timeval tv;
1037 tv.tv_sec = timeout / 1000000;
1038 tv.tv_usec = timeout - (tv.tv_sec*1000000);
1039 fd_set conns;
1040 FD_ZERO( &conns );
1041 FD_SET( source.fTCPConnection, &conns );
1042 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
1043 if ( ret <=0 )
1044 return errno;
1045 }
1046 else if ( ret == 0 )
1047 {
1048 if ( errno == EOK )
1049 return ECONNRESET;
1050 else
1051 return errno;
1052 }
1053 else
1054 {
1055 return errno;
1056 }
1057 }
1058 return 0;
1059 }
1060*/
1061
1062/*
1063int HOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
1064 {
1065
1066 }
1067*/
1068
1069int HOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
1070 {
04a939f7 1071// see header file for class documentation
2be16a33 1072 struct timeval tv1, tv2;
1073 bool found=false;
1074 bool all=true;
1075 if ( useTimeout )
1076 gettimeofday( &tv1, NULL );
1077 do
1078 {
1079 found = false;
1080 all = true;
1081 for ( unsigned n = 0; n < sourceCnt; n++ )
1082 {
1083 if ( !sources[n].fDataSize )
1084 all = false;
1085 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
1086 {
1087 found = true;
1088 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
1089 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
1090 }
1091 }
1092 if ( found && useTimeout )
1093 gettimeofday( &tv1, NULL );
1094 if ( !all && useTimeout )
1095 {
1096 gettimeofday( &tv2, NULL );
1097 unsigned long long tdiff;
1098 tdiff = tv2.tv_sec-tv1.tv_sec;
1099 tdiff *= 1000000;
1100 tdiff += tv2.tv_usec-tv1.tv_usec;
1101 if ( tdiff > timeout )
1102 return ETIMEDOUT;
1103 }
1104 if ( !all )
1105 usleep( 0 );
1106 }
1107 while ( !all );
1108 return 0;
1109 }
1110
1111int HOMERReader::ParseSourceData( DataSource& source )
1112 {
04a939f7 1113// see header file for class documentation
2be16a33 1114 if ( source.fData )
1115 {
1116 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1117 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
1118 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
1119 if ( ret )
1120 return ret;
1121 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
1122 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
1123 {
1124 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
1125 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
1126 fBlocks[fBlockCnt].fSource = source.fNdx;
1127 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1128 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1129 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1130 struct in_addr tmpA;
1131 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1132 char* addr = inet_ntoa( tmpA );
1133 char* tmpchar = new char[ strlen(addr)+1 ];
1134 if ( !tmpchar )
1135 return ENOMEM;
1136 strcpy( tmpchar, addr );
1137 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1138 descrOffset += descrLen;
1139 }
1140 return 0;
1141 }
1142 return EFAULT;
1143 }
1144
1145int HOMERReader::ReAllocBlocks( unsigned long newCnt )
1146 {
04a939f7 1147// see header file for class documentation
2be16a33 1148 DataBlock* newBlocks;
1149 newBlocks = new DataBlock[ newCnt ];
1150 if ( !newBlocks )
1151 return ENOMEM;
1152 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1153 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1154 if ( newCnt > fMaxBlockCnt )
1155 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1156 if ( fBlocks )
1157 delete [] fBlocks;
1158 fBlocks = newBlocks;
1159 fMaxBlockCnt = newCnt;
1160 return 0;
1161 }
1162
1163homer_uint64 HOMERReader::GetSourceEventID( DataSource& source )
1164 {
04a939f7 1165// see header file for class documentation
2be16a33 1166 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1167 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1168 }
1169
1170homer_uint64 HOMERReader::GetSourceEventType( DataSource& source )
1171 {
04a939f7 1172// see header file for class documentation
2be16a33 1173 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1174 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1175 }
1176
3a7c0444 1177homer_uint64 HOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint64 source ) const
1178 {
1179// see header file for class documentation
1180 if ( destFormat == sourceFormat )
1181 return source;
1182 else
1183 return ((source & 0xFFULL) << 56) |
1184 ((source & 0xFF00ULL) << 40) |
1185 ((source & 0xFF0000ULL) << 24) |
1186 ((source & 0xFF000000ULL) << 8) |
1187 ((source & 0xFF00000000ULL) >> 8) |
1188 ((source & 0xFF0000000000ULL) >> 24) |
1189 ((source & 0xFF000000000000ULL) >> 40) |
1190 ((source & 0xFF00000000000000ULL) >> 56);
1191 }
2be16a33 1192
3a7c0444 1193homer_uint32 HOMERReader::Swap( homer_uint8 destFormat, homer_uint8 sourceFormat, homer_uint32 source ) const
1194 {
1195// see header file for class documentation
1196 if ( destFormat == sourceFormat )
1197 return source;
1198 else
1199 return ((source & 0xFFUL) << 24) |
1200 ((source & 0xFF00UL) << 8) |
1201 ((source & 0xFF0000UL) >> 8) |
1202 ((source & 0xFF000000UL) >> 24);
1203 }
2be16a33 1204/*
1205***************************************************************************
1206**
1207** $Author$ - Initial Version by Timm Morten Steinbeck
1208**
1209** $Id$
1210**
1211***************************************************************************
1212*/