]> git.uio.no Git - u/mrichter/AliRoot.git/blame - HLT/BASE/HOMER/AliHLTHOMERReader.cxx
added HOMER library from PubSub package HLT-stable-20070905.141318 (rev. 2375)
[u/mrichter/AliRoot.git] / HLT / BASE / HOMER / AliHLTHOMERReader.cxx
CommitLineData
2be16a33 1/************************************************************************
2**
3**
4** This file is property of and copyright by the Technical Computer
5** Science Group, Kirchhoff Institute for Physics, Ruprecht-Karls-
6** University, Heidelberg, Germany, 2001
7** This file has been written by Timm Morten Steinbeck,
8** timm@kip.uni-heidelberg.de
9**
10**
11** See the file license.txt for details regarding usage, modification,
12** distribution and warranty.
13** Important: This file is provided without any warranty, including
14** fitness for any particular purpose.
15**
16**
17** Newer versions of this file's package will be made available from
18** http://web.kip.uni-heidelberg.de/Hardwinf/L3/
19** or the corresponding page of the Heidelberg Alice Level 3 group.
20**
21*************************************************************************/
22
23/*
24***************************************************************************
25**
26** $Author$ - Initial Version by Timm Morten Steinbeck
27**
28** $Id$
29**
30***************************************************************************
31*/
32
/** @file AliHLTHOMERReader.cxx
34 @author Timm Steinbeck
35 @date Sep 14 2007
36 @brief HLT Online Monitoring Environment including ROOT - Reader
37 @note migrated from PubSub HLT-stable-20070905.141318 (rev 2375) */
38
39// see below for class documentation
40// or
41// refer to README to build package
42// or
43// visit http://web.ift.uib.no/~kjeks/doc/alice-hlt
44
#include "AliHLTHOMERReader.h"
#include <stdio.h>
#include <string.h>
#include <errno.h>
#include <netdb.h>
extern int h_errno;
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <unistd.h>
#include <rpc/types.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <new>
#ifdef USE_ROOT
#include <Rtypes.h>
#endif
64
65
// Text commands written verbatim to a TCP data source.
// MOD_BIN is sent once right after the connection is established (see AddDataSource).
#define MOD_BIN "MOD BIN\n"
// NOTE(review): presumably the ASCII-mode counterpart of MOD_BIN; not used in the code visible here.
#define MOD_ASC "MOD ASC\n"
// GET_ONE is sent to request a single event (see TriggerTCPSource).
#define GET_ONE "GET ONE\n"
// NOTE(review): presumably requests all events; not used in the code visible here.
#define GET_ALL "GET ALL\n"
70
71#ifdef USE_ROOT
72ClassImp(MonitoringReader);
73ClassImp(HOMERReader);
74#endif
75
76
77
78
79
80#ifdef USE_ROOT
81HOMERReader::HOMERReader()
82 {
83 Init();
84 }
85#endif
86
87
88/* For reading from a TCP port */
89HOMERReader::HOMERReader( const char* hostname, unsigned short port )
90 {
91 Init();
92 if ( !AllocDataSources(1) )
93 {
94 fErrorConnection = 0;
95 fConnectionStatus = ENOMEM;
96 return;
97 }
98 fConnectionStatus = AddDataSource( hostname, port, fDataSources[0] );
99 if ( fConnectionStatus )
100 fErrorConnection = 0;
101 else
102 {
103 fDataSourceCnt++;
104 fTCPDataSourceCnt++;
105 fDataSources[0].fNdx = 0;
106 }
107 }
108
109/* For reading from multiple TCP ports */
110HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports )
111 {
112 Init();
113 if ( !AllocDataSources(tcpCnt) )
114 {
115 fErrorConnection = 0;
116 fConnectionStatus = ENOMEM;
117 return;
118 }
119 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
120 {
121 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
122 if ( fConnectionStatus )
123 {
124 fErrorConnection = n;
125 return;
126 }
127 fDataSources[n].fNdx = n;
128 }
129 }
130
131/* For reading from a System V shared memory segment */
132HOMERReader::HOMERReader( key_t shmKey, int shmSize )
133 {
134 Init();
135 if ( !AllocDataSources(1) )
136 {
137 fErrorConnection = 0;
138 fConnectionStatus = ENOMEM;
139 return;
140 }
141 fConnectionStatus = AddDataSource( shmKey, shmSize, fDataSources[0] );
142 if ( fConnectionStatus )
143 fErrorConnection = 0;
144 else
145 {
146 fDataSourceCnt++;
147 fShmDataSourceCnt++;
148 fDataSources[0].fNdx = 0;
149 }
150 }
151
152/* For reading from multiple System V shared memory segments */
153HOMERReader::HOMERReader( unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
154 {
155 Init();
156 if ( !AllocDataSources(shmCnt) )
157 {
158 fErrorConnection = 0;
159 fConnectionStatus = ENOMEM;
160 return;
161 }
162 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
163 {
164 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[n] );
165 if ( fConnectionStatus )
166 {
167 fErrorConnection = n;
168 return;
169 }
170 fDataSources[n].fNdx = n;
171 }
172 }
173
174/* For reading from multiple TCP ports and multiple System V shared memory segments */
175HOMERReader::HOMERReader( unsigned int tcpCnt, const char** hostnames, unsigned short* ports,
176 unsigned int shmCnt, key_t* shmKeys, int* shmSizes )
177 {
178 Init();
179 if ( !AllocDataSources(tcpCnt+shmCnt) )
180 {
181 fErrorConnection = 0;
182 fConnectionStatus = ENOMEM;
183 return;
184 }
185 for ( unsigned int n = 0; n < tcpCnt; n++, fDataSourceCnt++, fTCPDataSourceCnt++ )
186 {
187 fConnectionStatus = AddDataSource( hostnames[n], ports[n], fDataSources[n] );
188 if ( fConnectionStatus )
189 {
190 fErrorConnection = n;
191 return;
192 }
193 fDataSources[n].fNdx = n;
194 }
195 for ( unsigned int n = 0; n < shmCnt; n++, fDataSourceCnt++, fShmDataSourceCnt++ )
196 {
197 fConnectionStatus = AddDataSource( shmKeys[n], shmSizes[n], fDataSources[tcpCnt+n] );
198 if ( fConnectionStatus )
199 {
200 fErrorConnection = tcpCnt+n;
201 return;
202 }
203 fDataSources[n].fNdx = n;
204 }
205 }
206HOMERReader::~HOMERReader()
207 {
208 ReleaseCurrentEvent();
209 FreeDataSources();
210 }
211
212/* Read in the next available event */
213int HOMERReader::ReadNextEvent()
214 {
215 return ReadNextEvent( false, 0 );
216 }
217
218/* Read in the next available event */
219int HOMERReader::ReadNextEvent( unsigned long timeout )
220 {
221 return ReadNextEvent( true, timeout );
222 }
223
224/* Return the size (in bytes) of the current event's data
225 block with the given block index (starting at 0). */
226unsigned long HOMERReader::GetBlockDataLength( unsigned long ndx ) const
227 {
228 if ( ndx >= fBlockCnt )
229 return 0;
230 return fBlocks[ndx].fLength;
231 }
232
233 /* Return a pointer to the start of the current event's data
234 block with the given block index (starting at 0). */
235const void* HOMERReader::GetBlockData( unsigned long ndx ) const
236 {
237 if ( ndx >= fBlockCnt )
238 return NULL;
239 return fBlocks[ndx].fData;
240 }
241
242/* Return IP address or hostname of node which sent the
243 current event's data block with the given block index
244 (starting at 0).
245 For HOMER this is the ID of the node on which the subscriber
246 that provided this data runs/ran. */
247const char* HOMERReader::GetBlockSendNodeID( unsigned long ndx ) const
248 {
249 if ( ndx >= fBlockCnt )
250 return NULL;
251#ifdef DEBUG
252 if ( fBlocks[ndx].fSource >= fDataSourceCnt )
253 {
254 fprintf( stderr, "%s:%d: Internal Error: fBlocks[ndx].fSource (%lu) >= fDataSourceCnt (%lu)\n",
255 __FILE__, __LINE__, fBlocks[ndx].fSource, fDataSourceCnt );
256 return NULL;
257 }
258#endif
259 return fDataSources[ fBlocks[ndx].fSource ].fHostname;
260 //return fBlocks[ndx].fOriginatingNodeID;
261 }
262
263 /* Return byte order of the data stored in the
264 current event's data block with the given block
265 index (starting at 0).
266 0 is unknown alignment,
267 1 ist little endian,
268 2 is big endian. */
269homer_uint8 HOMERReader::GetBlockByteOrder( unsigned long ndx ) const
270 {
271 if ( ndx >= fBlockCnt )
272 return 0;
273 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
274 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kByteOrderAttribute_8b_Offset);
275 }
276 /* Return the alignment (in bytes) of the given datatype
277 in the data stored in the current event's data block
278 with the given block index (starting at 0).
279 Possible values for the data type are
280 0: homer_uint64
281 1: homer_uint32
282 2: uin16
283 3: homer_uint8
284 4: double
285 5: float
286 */
287homer_uint8 HOMERReader::GetBlockTypeAlignment( unsigned long ndx, homer_uint8 dataType ) const
288 {
289 if ( ndx >= fBlockCnt )
290 return 0;
291 if ( dataType > (kFloatAlignment_8b_Offset-kAlignment_8b_StartOffset) )
292 return 0;
293 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
294 return *(((homer_uint8*)fBlocks[ndx].fMetaData)+kAlignment_8b_StartOffset+dataType);
295 }
296
297homer_uint64 HOMERReader::GetBlockStatusFlags( unsigned long ndx ) const
298 {
299 if ( ndx >= fBlockCnt )
300 return 0;
301 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kStatusFlags_64b_Offset);
302 }
303
304/* HOMER specific */
305/* Return the type of the data in the current event's data
306 block with the given block index (starting at 0). */
307homer_uint64 HOMERReader::GetBlockDataType( unsigned long ndx ) const
308 {
309 if ( ndx >= fBlockCnt )
310 return ~(homer_uint64)0;
311 //return ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fType.fID;
312 return *(((homer_uint64*)fBlocks[ndx].fMetaData)+kType_64b_Offset);
313 }
314
315/* Return the origin of the data in the current event's data
316 block with the given block index (starting at 0). */
317homer_uint32 HOMERReader::GetBlockDataOrigin( unsigned long ndx ) const
318 {
319 if ( ndx >= fBlockCnt )
320 return ~(homer_uint32)0;
321 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType1.fID );
322 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType1_64b_Offset);
323 }
324
325/* Return a specification of the data in the current event's data
326 block with the given block index (starting at 0). */
327homer_uint32 HOMERReader::GetBlockDataSpec( unsigned long ndx ) const
328 {
329 if ( ndx >= fBlockCnt )
330 return ~(homer_uint32)0;
331 //return (homer_uint32)( ((AliHLTRIBlockDescriptorV1*)fBlocks[ndx].fMetaData)->fSubType2.fID );
332 return (homer_uint32)*(((homer_uint64*)fBlocks[ndx].fMetaData)+kSubType2_64b_Offset);
333 }
334
335/* Find the next data block in the current event with the given
336 data type, origin, and specification. Returns the block's
337 index. */
338unsigned long HOMERReader::FindBlockNdx( homer_uint64 type, homer_uint32 origin,
339 homer_uint32 spec, unsigned long startNdx ) const
340 {
341 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
342 {
343 if ( ( type == 0xFFFFFFFFFFFFFFFFULL || *(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset)==type ) &&
344 ( origin == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset) )==origin ) &&
345 ( spec == 0xFFFFFFFF || (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) )==spec ) )
346 return n;
347 }
348 return ~(unsigned long)0;
349 }
350
351/* Find the next data block in the current event with the given
352 data type, origin, and specification. Returns the block's
353 index. */
354unsigned long HOMERReader::FindBlockNdx( char type[8], char origin[4],
355 homer_uint32 spec, unsigned long startNdx ) const
356 {
357 for ( unsigned long n=startNdx; n < fBlockCnt; n++ )
358 {
359 bool found1=true, found2=true;
360 for ( unsigned i = 0; i < 8; i++ )
361 {
362 if ( type[i] != (char)0xFF )
363 {
364 found1=false;
365 break;
366 }
367 }
368 if ( !found1 )
369 {
370 found1 = true;
371 for ( unsigned i = 0; i < 8; i++ )
372 {
373 //printf( "%u: Comparing type '%c' and '%c'\n", i, ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i], type[i] );
374 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kType_64b_Offset))[i] != type[i] )
375 {
376 found1=false;
377 break;
378 }
379 }
380 }
381 for ( unsigned i = 0; i < 4; i++ )
382 {
383 if ( origin[i] != (char)0xFF )
384 {
385 found2 = false;
386 break;
387 }
388 }
389 if ( !found2 )
390 {
391 found2 = true;
392 for ( unsigned i = 0; i < 4; i++ )
393 {
394 //printf( "Comparing origin '%c' and '%c'\n", ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i], origin[i] );
395 if ( ((char*)(((homer_uint64*)fBlocks[n].fMetaData)+kSubType1_64b_Offset))[i] != origin[i] )
396 {
397 found2=false;
398 break;
399 }
400 }
401 }
402 //printf( "Comparing spec '0x%08lX' and '0x%08lX'\n", (homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset) ), spec );
403 if ( found1 && found2 &&
404 ( spec == 0xFFFFFFFF || ((homer_uint32)( *(((homer_uint64*)fBlocks[n].fMetaData)+kSubType2_64b_Offset)) )==spec ) )
405 return n;
406 }
407 return ~(unsigned long)0;
408 }
409
410/* Return the ID of the node that actually produced this data block.
411 This may be different from the node which sent the data to this
412 monitoring object as returned by GetBlockSendNodeID. */
413const char* HOMERReader::GetBlockCreateNodeID( unsigned long ndx ) const
414 {
415 if ( ndx >= fBlockCnt )
416 return NULL;
417 return fBlocks[ndx].fOriginatingNodeID;
418 }
419
420
421void HOMERReader::Init()
422 {
423 fCurrentEventType = ~(homer_uint64)0;
424 fCurrentEventID = ~(homer_uint64)0;
425 fMaxBlockCnt = fBlockCnt = 0;
426 fBlocks = NULL;
427
428 fDataSourceMaxCnt = fDataSourceCnt = fTCPDataSourceCnt = fShmDataSourceCnt = 0;
429 fDataSources = NULL;
430
431
432 fConnectionStatus = 0;
433 fErrorConnection = ~(unsigned int)0;
434
435 fEventRequestAdvanceTime_us = 0;
436 }
437
438bool HOMERReader::AllocDataSources( unsigned int sourceCnt )
439 {
440 fDataSources = new DataSource[ sourceCnt ];
441 if ( !fDataSources )
442 return false;
443 fDataSourceCnt = 0;
444 fDataSourceMaxCnt = sourceCnt;
445 return true;
446 }
447
448int HOMERReader::AddDataSource( const char* hostname, unsigned short port, DataSource& source )
449 {
450 struct hostent* he;
451 he = gethostbyname( hostname );
452 if ( he == NULL )
453 {
454 //fprintf( stderr, "Unable to determine remote host address from '%s'.\n", hostname );
455 return EADDRNOTAVAIL;
456 }
457
458 struct sockaddr_in remoteAddr;
459 remoteAddr.sin_family = AF_INET; // host byte order
460 remoteAddr.sin_port = htons(port); // short, network byte order
461 remoteAddr.sin_addr = *((struct in_addr *)he->h_addr);
462 memset(&(remoteAddr.sin_zero), '\0', 8); // zero the rest of the struct
463
464 // Create socket and connect to target program on remote node
465 source.fTCPConnection = socket( AF_INET, SOCK_STREAM, 0 );
466 if ( source.fTCPConnection == -1 )
467 {
468 return errno;
469 }
470
471 int ret;
472
473 ret = connect( source.fTCPConnection, (struct sockaddr *)&remoteAddr, sizeof(struct sockaddr) );
474 if ( ret == -1 )
475 {
476 ret=errno;
477 close( source.fTCPConnection );
478 return ret;
479 }
480
481 ret = write( source.fTCPConnection, MOD_BIN, strlen(MOD_BIN) );
482 if ( ret != (int)strlen(MOD_BIN) )
483 {
484 ret=errno;
485 close( source.fTCPConnection );
486 return ret;
487 }
488
489 char* tmpchar = new char[ strlen( hostname )+1 ];
490 if ( !tmpchar )
491 {
492 close( source.fTCPConnection );
493 return ENOMEM;
494 }
495 strcpy( tmpchar, hostname );
496 source.fHostname = tmpchar;
497
498 source.fType = kTCP;
499 source.fTCPPort = port;
500 source.fData = NULL;
501 source.fDataSize = 0;
502 source.fDataRead = 0;
503 return 0;
504 }
505
506int HOMERReader::AddDataSource( key_t shmKey, int shmSize, DataSource& source )
507 {
508 int ret;
509 char* tmpchar = new char[ MAXHOSTNAMELEN+1 ];
510 if ( !tmpchar )
511 {
512 return ENOMEM;
513 }
514 gethostname( tmpchar, MAXHOSTNAMELEN );
515 tmpchar[MAXHOSTNAMELEN]=(char)0;
516 source.fHostname = tmpchar;
517
518 source.fShmID = shmget( shmKey, shmSize, 0660 );
519 if ( source.fShmID == -1 )
520 {
521 ret = errno;
522 delete [] source.fHostname;
523 return ret;
524 }
525
526 source.fShmPtr = (void*)shmat( source.fShmID, NULL, 0 );
527
528 if ( !source.fShmPtr )
529 {
530 ret = errno;
531 shmctl( source.fShmID, IPC_RMID, NULL );
532 delete [] source.fHostname;
533 return ret;
534 }
535
536 source.fType = kShm;
537 source.fShmKey = shmKey;
538 source.fShmSize = shmSize;
539 source.fDataSize = 0;
540 source.fDataRead = 0;
541 return 0;
542 }
543
544void HOMERReader::FreeDataSources()
545 {
546 for ( unsigned n=0; n < fDataSourceCnt; n++ )
547 {
548 if ( fDataSources[n].fType == kTCP )
549 FreeTCPDataSource( fDataSources[n] );
550 else
551 FreeShmDataSource( fDataSources[n] );
552 }
553 }
554
555int HOMERReader::FreeShmDataSource( DataSource& source )
556 {
557 if ( source.fShmPtr )
558 shmdt( source.fShmPtr );
559// if ( source.fShmID != -1 )
560// shmctl( source.fShmID, IPC_RMID, NULL );
561 if ( source.fHostname )
562 delete [] source.fHostname;
563 return 0;
564 }
565
566int HOMERReader::FreeTCPDataSource( DataSource& source )
567 {
568 if ( source.fTCPConnection )
569 close( source.fTCPConnection );
570 if ( source.fHostname )
571 delete [] source.fHostname;
572 return 0;
573 }
574
575int HOMERReader::ReadNextEvent( bool useTimeout, unsigned long timeout )
576 {
577 if ( fDataSourceCnt<=0 )
578 return ENXIO;
579 // Clean up currently active event.
580 ReleaseCurrentEvent();
581 int ret;
582 // Trigger all configured data sources
583 for ( unsigned n = 0; n<fDataSourceCnt; n++ )
584 {
585 if ( fDataSources[n].fType == kTCP )
586 ret = TriggerTCPSource( fDataSources[n], useTimeout, timeout );
587 else
588 ret = TriggerShmSource( fDataSources[n], useTimeout, timeout );
589 if ( ret )
590 {
591 fErrorConnection = n;
592 fConnectionStatus=ret;
593 return fConnectionStatus;
594 }
595 }
596 // Now read in data from the configured data source
597 ret = ReadDataFromTCPSources( fTCPDataSourceCnt, fDataSources, useTimeout, timeout );
598 if ( ret )
599 {
600 return ret;
601 }
602 ret = ReadDataFromShmSources( fShmDataSourceCnt, fDataSources+fTCPDataSourceCnt, useTimeout, timeout );
603 if ( ret )
604 {
605 return ret;
606 }
607// for ( unsigned n = 0; n<fDataSourceCnt; n++ )
608// {
609// if ( fDataSources[n].fType == kTCP )
610// ret = ReadDataFromTCPSource( fDataSources[n], useTimeout, timeout );
611// else
612// ret = ReadDataFromShmSource( fDataSources[n], useTimeout, timeout );
613// if ( ret )
614// {
615// fErrorConnection = n;
616// fConnectionStatus=ret;
617// return fConnectionStatus;
618// }
619// }
620 //Check to see that all sources contributed data for the same event
621 homer_uint64 eventID;
622 homer_uint64 eventType;
623 eventID = GetSourceEventID( fDataSources[0] );
624 eventType = GetSourceEventType( fDataSources[0] );
625 for ( unsigned n = 1; n < fDataSourceCnt; n++ )
626 {
627 if ( GetSourceEventID( fDataSources[n] ) != eventID || GetSourceEventType( fDataSources[n] ) != eventType )
628 {
629 fErrorConnection = n;
630 fConnectionStatus=EBADRQC;
631 return fConnectionStatus;
632 }
633 }
634 // Find all the different data blocks contained in the data from all
635 // the sources.
636 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
637 {
638 ret = ParseSourceData( fDataSources[n] );
639 if ( ret )
640 {
641 fErrorConnection = n;
642 fConnectionStatus=EBADSLT;
643 return fConnectionStatus;
644 }
645 }
646 fCurrentEventID = eventID;
647 fCurrentEventType = eventType;
648 return 0;
649 }
650
651void HOMERReader::ReleaseCurrentEvent()
652 {
653 // sources.fDataRead = 0;
654 // fMaxBlockCnt
655 fCurrentEventID = ~(homer_uint64)0;
656 fCurrentEventType = ~(homer_uint64)0;
657 for ( unsigned n = 0; n < fDataSourceCnt; n++ )
658 {
659 if ( fDataSources[n].fData )
660 {
661 if ( fDataSources[n].fType == kTCP )
662 delete [] (homer_uint8*)fDataSources[n].fData;
663 fDataSources[n].fData = NULL;
664 }
665 fDataSources[n].fDataSize = fDataSources[n].fDataRead = 0;
666 }
667 if ( fBlocks )
668 {
669 for ( unsigned n = 0; n < fMaxBlockCnt; n++ )
670 {
671 if ( fBlocks[n].fOriginatingNodeID )
672 delete [] fBlocks[n].fOriginatingNodeID;
673 }
674 delete [] fBlocks;
675 fBlocks=0;
676 fMaxBlockCnt = 0;
677 fBlockCnt=0;
678 }
679 }
680
681int HOMERReader::TriggerTCPSource( DataSource& source, bool useTimeout, unsigned long timeout_us )
682 {
683 int ret;
684 struct timeval oldSndTO, newSndTO;
685 if ( useTimeout )
686 {
687 socklen_t optlen=sizeof(oldSndTO);
688 ret = getsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, &optlen );
689 if ( ret )
690 {
691 return errno;
692 }
693 if ( optlen!=sizeof(oldSndTO) )
694 {
695 return ENXIO;
696 }
697 newSndTO.tv_sec = timeout_us / 1000000;
698 newSndTO.tv_usec = timeout_us - (newSndTO.tv_sec*1000000);
699 ret = setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &newSndTO, sizeof(newSndTO) );
700 if ( ret )
701 {
702 return errno;
703 }
704 }
705 // Send one event request
706 if ( !fEventRequestAdvanceTime_us )
707 {
708 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
709
710 if ( ret != (int)strlen(GET_ONE) )
711 {
712 ret=errno;
713 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
714 return ret;
715 }
716 }
717 else
718 {
719 char tmpCmd[ 128 ];
720
721 int len = snprintf( tmpCmd, 128, "FIRST ORBIT EVENT 0x%Lu\n", (unsigned long long)fEventRequestAdvanceTime_us );
722 if ( len>128 || len<0 )
723 {
724 ret=EMSGSIZE;
725 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
726 return ret;
727 }
728
729 ret = write( source.fTCPConnection, tmpCmd, strlen(tmpCmd) );
730
731 if ( ret != (int)strlen(tmpCmd) )
732 {
733 ret=errno;
734 setsockopt( source.fTCPConnection, SOL_SOCKET, SO_SNDTIMEO, &oldSndTO, sizeof(oldSndTO) );
735 return ret;
736 }
737
738 }
739 return 0;
740 }
741
742int HOMERReader::TriggerShmSource( DataSource& source, bool, unsigned long )
743 {
744 if ( source.fShmPtr )
745 {
746 *(homer_uint32*)( source.fShmPtr ) = 0;
747 return 0;
748 }
749 else
750 return EFAULT;
751 }
752
753int HOMERReader::ReadDataFromTCPSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
754 {
755 bool toRead = false;
756 do
757 {
758 fd_set conns;
759 FD_ZERO( &conns );
760 int highestConn=0;
761 toRead = false;
762 unsigned firstConnection=~(unsigned)0;
763 for ( unsigned long n = 0; n < sourceCnt; n++ )
764 {
765 if ( sources[n].fDataSize == 0 // size specifier not yet read
766 || sources[n].fDataRead < sources[n].fDataSize ) // Data not yet read fully
767 {
768 toRead = true;
769 FD_SET( sources[n].fTCPConnection, &conns );
770 if ( sources[n].fTCPConnection > highestConn )
771 highestConn = sources[n].fTCPConnection;
772 fcntl( sources[n].fTCPConnection, F_SETFL, O_NONBLOCK );
773 if ( firstConnection == ~(unsigned)0 )
774 firstConnection = n;
775 }
776 else
777 {
778 fcntl( sources[n].fTCPConnection, F_SETFL, 0 );
779 }
780 }
781 if ( toRead )
782 {
783 struct timeval tv, *ptv;
784 if ( useTimeout )
785 {
786 tv.tv_sec = timeout / 1000000;
787 tv.tv_usec = timeout - (tv.tv_sec*1000000);
788 ptv = &tv;
789 }
790 else
791 ptv = NULL;
792 // wait until something is ready to be read
793 // either for timeout usecs or until eternity
794 int ret;
795 ret = select( highestConn+1, &conns, NULL, NULL, ptv );
796 if ( ret <=0 )
797 {
798 fErrorConnection = firstConnection;
799 if ( errno )
800 fConnectionStatus = errno;
801 else
802 fConnectionStatus = ETIMEDOUT;
803 return fConnectionStatus;
804 }
805 for ( unsigned n = 0; n < sourceCnt; n++ )
806 {
807 if ( FD_ISSET( sources[n].fTCPConnection, &conns ) )
808 {
809 if ( sources[n].fDataSize == 0 )
810 {
811 ret=read( sources[n].fTCPConnection, &(sources[n].fDataSize), sizeof(homer_uint32) );
812 if ( ret != sizeof(homer_uint32) )
813 {
814 fErrorConnection = n;
815 if ( errno )
816 fConnectionStatus = errno;
817 else
818 fConnectionStatus = ENOMSG;
819 return fConnectionStatus;
820 }
821 sources[n].fDataSize = ntohl( sources[n].fDataSize );
822 sources[n].fDataRead = 0;
823 sources[n].fData = new homer_uint8[ sources[n].fDataSize ];
824 if ( !sources[n].fData )
825 {
826 fErrorConnection = n;
827 fConnectionStatus = ENOMEM;
828 return fConnectionStatus;
829 }
830 }
831 else if ( sources[n].fData && sources[n].fDataRead < sources[n].fDataSize)
832 {
833 ret=read( sources[n].fTCPConnection, ((homer_uint8*)sources[n].fData)+sources[n].fDataRead, sources[n].fDataSize-sources[n].fDataRead );
834 if ( ret>0 )
835 sources[n].fDataRead += ret;
836 else if ( ret == 0 )
837 {
838 fErrorConnection = n;
839 fConnectionStatus = ECONNRESET;
840 return fConnectionStatus;
841 }
842 else
843 {
844 fErrorConnection = n;
845 fConnectionStatus = errno;
846 return fConnectionStatus;
847 }
848 }
849 else
850 {
851 fErrorConnection = n;
852 fConnectionStatus = ENXIO;
853 return fConnectionStatus;
854 }
855 }
856 }
857 }
858 }
859 while ( toRead );
860 return 0;
861 }
862
863/*
864int HOMERReader::ReadDataFromTCPSources( DataSource& source, bool useTimeout, unsigned long timeout )
865 {
866#warning TODO If useTimeout: Set sockets to nonblocking, select + loop around GET_ONE write
867 // Send one event request
868 ret = write( source.fTCPConnection, GET_ONE, strlen(GET_ONE) );
869 if ( ret != strlen(GET_ONE) )
870 {
871 return errno;
872 }
873 // wait for and read back size specifier
874 unsigned sizeNBO;
875 // The value transmitted is binary, in network byte order
876 ret = read( source.fTCPConnection, &sizeNBO, sizeof(sizeNBO) );
877 if ( ret != sizeof(sizeNBO) )
878 {
879 return errno;
880 }
881 // Convert back to host byte order
882 source.fDataSize = ntohl( sizeNBO );
883 source.fData = new homer_uint8[ source.fDataSize ];
884 unsigned long dataRead=0, toRead;
885 if ( !source.fData )
886 {
887 char buffer[1024];
888 // Read in data into buffer in order not to block connection
889 while ( dataRead < source.fDataSize )
890 {
891 if ( source.fDataSize-dataRead > 1024 )
892 toRead = 1024;
893 else
894 toRead = source.fDataSize-dataRead;
895 ret = read( source.fTCPConnection, buffer, toRead );
896 if ( ret > 0 )
897 dataRead += ret;
898 else
899 return errno;
900 }
901 return ENOMEM;
902 }
903 while ( dataRead < source.fDataSize )
904 {
905 toRead = source.fDataSize-dataRead;
906 ret = read( source.fTCPConnection, source.fData+dataRead, toRead );
907 if ( ret > 0 )
908 dataRead += ret;
909 else if ( ret == 0 && useTimeout )
910 {
911 struct timeval tv;
912 tv.tv_sec = timeout / 1000000;
913 tv.tv_usec = timeout - (tv.tv_sec*1000000);
914 fd_set conns;
915 FD_ZERO( &conns );
916 FD_SET( source.fTCPConnection, &conns );
917 ret = select( source.fTCPConnection+1, &conns, NULL, NULL );
918 if ( ret <=0 )
919 return errno;
920 }
921 else if ( ret == 0 )
922 {
923 if ( errno == EOK )
924 return ECONNRESET;
925 else
926 return errno;
927 }
928 else
929 {
930 return errno;
931 }
932 }
933 return 0;
934 }
935*/
936
937/*
938int HOMERReader::ReadDataFromShmSource( DataSource& source, bool useTimeout, unsigned long timeout )
939 {
940
941 }
942*/
943
944int HOMERReader::ReadDataFromShmSources( unsigned sourceCnt, DataSource* sources, bool useTimeout, unsigned long timeout )
945 {
946 struct timeval tv1, tv2;
947 bool found=false;
948 bool all=true;
949 if ( useTimeout )
950 gettimeofday( &tv1, NULL );
951 do
952 {
953 found = false;
954 all = true;
955 for ( unsigned n = 0; n < sourceCnt; n++ )
956 {
957 if ( !sources[n].fDataSize )
958 all = false;
959 if ( sources[n].fShmPtr && *(homer_uint32*)sources[n].fShmPtr>0 && !sources[n].fDataSize )
960 {
961 found = true;
962 sources[n].fDataSize = *(homer_uint32*)sources[n].fShmPtr;
963 sources[n].fData = ((homer_uint8*)sources[n].fShmPtr)+sizeof(homer_uint32);
964 }
965 }
966 if ( found && useTimeout )
967 gettimeofday( &tv1, NULL );
968 if ( !all && useTimeout )
969 {
970 gettimeofday( &tv2, NULL );
971 unsigned long long tdiff;
972 tdiff = tv2.tv_sec-tv1.tv_sec;
973 tdiff *= 1000000;
974 tdiff += tv2.tv_usec-tv1.tv_usec;
975 if ( tdiff > timeout )
976 return ETIMEDOUT;
977 }
978 if ( !all )
979 usleep( 0 );
980 }
981 while ( !all );
982 return 0;
983 }
984
985int HOMERReader::ParseSourceData( DataSource& source )
986 {
987 if ( source.fData )
988 {
989 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
990 homer_uint64 blockCnt = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType2_64b_Offset ] );
991 int ret=ReAllocBlocks( fMaxBlockCnt+blockCnt );
992 if ( ret )
993 return ret;
994 homer_uint64 descrOffset = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kOffset_64b_Offset ] );
995 for ( homer_uint64 n = 0; n < blockCnt && fBlockCnt < fMaxBlockCnt; n++, fBlockCnt++ )
996 {
997 homer_uint8* descr = ((homer_uint8*)source.fData)+descrOffset;
998 unsigned descrLen = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kLength_64b_Offset ] );
999 fBlocks[fBlockCnt].fSource = source.fNdx;
1000 fBlocks[fBlockCnt].fData = ((homer_uint8*)source.fData) + Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kOffset_64b_Offset ] );
1001 fBlocks[fBlockCnt].fLength = Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)descr)[ kSize_64b_Offset ] );
1002 fBlocks[fBlockCnt].fMetaData = (homer_uint64*)descr;
1003 struct in_addr tmpA;
1004 tmpA.s_addr = (homer_uint32)( ((homer_uint64*)descr)[ kProducerNode_64b_Offset ] );
1005 char* addr = inet_ntoa( tmpA );
1006 char* tmpchar = new char[ strlen(addr)+1 ];
1007 if ( !tmpchar )
1008 return ENOMEM;
1009 strcpy( tmpchar, addr );
1010 fBlocks[fBlockCnt].fOriginatingNodeID = tmpchar;
1011 descrOffset += descrLen;
1012 }
1013 return 0;
1014 }
1015 return EFAULT;
1016 }
1017
1018int HOMERReader::ReAllocBlocks( unsigned long newCnt )
1019 {
1020 DataBlock* newBlocks;
1021 newBlocks = new DataBlock[ newCnt ];
1022 if ( !newBlocks )
1023 return ENOMEM;
1024 unsigned long cpCnt = (newCnt > fMaxBlockCnt) ? fMaxBlockCnt : newCnt;
1025 memcpy( newBlocks, fBlocks, cpCnt*sizeof(DataBlock) );
1026 if ( newCnt > fMaxBlockCnt )
1027 memset( newBlocks+fMaxBlockCnt, 0, (newCnt-fMaxBlockCnt)*sizeof(DataBlock) );
1028 if ( fBlocks )
1029 delete [] fBlocks;
1030 fBlocks = newBlocks;
1031 fMaxBlockCnt = newCnt;
1032 return 0;
1033 }
1034
1035homer_uint64 HOMERReader::GetSourceEventID( DataSource& source )
1036 {
1037 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1038 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kSubType1_64b_Offset ] );
1039 }
1040
1041homer_uint64 HOMERReader::GetSourceEventType( DataSource& source )
1042 {
1043 homer_uint8 sourceByteOrder = ((homer_uint8*)source.fData)[ kByteOrderAttribute_8b_Offset ];
1044 return Swap( kHOMERNativeByteOrder, sourceByteOrder, ((homer_uint64*)source.fData)[ kType_64b_Offset ] );
1045 }
1046
1047
1048
1049/*
1050***************************************************************************
1051**
1052** $Author$ - Initial Version by Timm Morten Steinbeck
1053**
1054** $Id$
1055**
1056***************************************************************************
1057*/