Cleanup ACE_HAS_PTHREAD_SIGMASK_PROTOTYPE, all platforms support it so far as I can...
[ACE_TAO.git] / ACE / protocols / ace / RMCast / Protocol.h
blob2f53fcea03735a4b09e30a6e0baffc636b8c8429
1 // author : Boris Kolpackov <boris@kolpackov.net>
3 #ifndef ACE_RMCAST_PROTOCOL_H
4 #define ACE_RMCAST_PROTOCOL_H
6 #include <memory>
7 #include "ace/Bound_Ptr.h"
9 #include "ace/Vector_T.h"
10 #include "ace/Hash_Map_Manager.h"
12 #include "ace/CDR_Stream.h"
13 #include "ace/CDR_Size.h"
15 #include "ace/INET_Addr.h"
16 #include "ace/Null_Mutex.h"
18 #include "ace/OS_NS_string.h"
19 #include "ace/OS_NS_stdlib.h"
21 #include "Bits.h"
23 namespace ACE_RMCast
25 // Basic types.
27 typedef ACE_CDR::UShort u16;
28 typedef ACE_CDR::ULong u32;
29 typedef ACE_CDR::ULongLong u64;
31 // Protocol parameters
34 unsigned short const max_service_size = 60; // service profiles (Part, SN,
35 // etc), sizes plus message size.
40 typedef ACE_INET_Addr Address;
42 struct AddressHasher
44 unsigned long
45 operator() (Address const& a) const
47 unsigned long port (a.get_port_number ());
48 unsigned long ip (a.get_ip_address ());
50 port <<= sizeof (unsigned long) - sizeof (unsigned short);
52 return port ^ ip;
56 //@@ Provide stream<< (Address const&)
59 typedef ACE_OutputCDR ostream;
60 typedef ACE_SizeCDR sstream;
61 typedef ACE_InputCDR istream;
63 struct Profile;
65 typedef
66 ACE_Strong_Bound_Ptr<Profile, Mutex>
67 Profile_ptr;
69 struct Profile
71 public:
72 class Header
74 public:
75 Header (u16 id, u16 size)
76 : id_ (id), size_ (size)
80 Header (istream& is)
82 (void) (is >> id_ >> size_);
85 public:
86 u16
87 id () const
89 return id_;
92 u16
93 size () const
95 return size_;
98 protected:
99 void
100 size (u16 s)
102 size_ = s;
105 friend struct Profile;
107 private:
108 u16 id_;
109 u16 size_;
112 public:
113 virtual
114 ~Profile ()
118 Profile_ptr
119 clone ()
121 Profile_ptr p (clone_ ());
122 return p;
125 protected:
126 Profile (u16 id)
127 : header_ (id, 0)
131 Profile (Header const& h)
132 : header_ (h)
136 virtual Profile_ptr
137 clone_ () = 0;
139 private:
140 Profile&
141 operator= (Profile const&) = delete;
143 public:
145 id () const
147 return header_.id ();
151 size () const
153 return header_.size ();
156 protected:
157 void
158 size (u16 s)
160 header_.size (s);
164 calculate_size ()
166 sstream ss;
168 serialize_body (ss);
170 return static_cast<u16> (ss.total_length ());
173 public:
174 virtual void
175 serialize_body (ostream&) const = 0;
177 virtual void
178 serialize_body (sstream&) const = 0;
180 friend
181 ostream&
182 operator<< (ostream& os, Profile const& p);
184 friend
185 sstream&
186 operator<< (sstream& ss, Profile const& p);
188 private:
189 Header header_;
192 inline
193 ostream&
194 operator<< (ostream& os, Profile::Header const& hdr)
196 os << hdr.id ();
197 os << hdr.size ();
199 return os;
202 inline
203 sstream&
204 operator<< (sstream& ss, Profile::Header const& hdr)
206 ss << hdr.id ();
207 ss << hdr.size ();
209 return ss;
212 inline
213 ostream&
214 operator<< (ostream& os, Profile const& p)
216 os << p.header_;
217 p.serialize_body (os);
219 return os;
222 inline
223 sstream&
224 operator<< (sstream& ss, Profile const& p)
226 ss << p.header_;
227 p.serialize_body (ss);
229 return ss;
236 class Message;
238 typedef
239 ACE_Strong_Bound_Ptr<Message, Mutex>
240 Message_ptr;
242 class Message
244 typedef
245 ACE_Hash_Map_Manager<u16, Profile_ptr, ACE_Null_Mutex>
246 Profiles;
248 public:
249 Message ()
250 : profiles_ (4)
254 Message_ptr
255 clone ()
257 Message_ptr cloned (new Message (*this));
258 return cloned;
261 protected:
262 Message (Message const& m)
263 : profiles_ (4)
265 for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ())
267 // Shallow copy of profiles. This implies that profiles are not
268 // modified as they go up/down the stack.
270 profiles_.bind ((*i).ext_id_, (*i).int_id_);
274 private:
275 Message&
276 operator= (Message const&);
278 public:
279 bool
280 add (Profile_ptr p)
282 u16 id (p->id ());
284 if (profiles_.find (id) == 0)
286 return false;
289 profiles_.bind (id, p);
291 return true;
294 void
295 replace (Profile_ptr p)
297 profiles_.rebind (p->id (), p);
300 void
301 remove (u16 id)
303 profiles_.unbind (id);
306 Profile const*
307 find (u16 id) const
309 Profiles::ENTRY* e = 0;
311 if (profiles_.find (id, e) == -1) return 0;
313 return e->int_id_.get ();
316 typedef
317 Profiles::const_iterator
318 ProfileIterator;
320 ProfileIterator
321 begin () const
323 return ProfileIterator (profiles_);
326 public:
327 size_t
328 size () const
330 sstream ss;
332 u32 s (0);
334 ss << s;
336 for (Profiles::const_iterator i (profiles_); !i.done (); i.advance ())
338 ss << *((*i).int_id_);
341 return ss.total_length ();
344 friend
345 ostream&
346 operator<< (ostream& os, Message const& m)
348 u32 s (m.size ());
350 os << s;
352 for (Profiles::const_iterator i (m.profiles_); !i.done (); i.advance ())
354 os << *((*i).int_id_);
357 return os;
360 private:
361 Profiles profiles_;
364 typedef ACE_Vector<Message_ptr, ACE_VECTOR_DEFAULT_SIZE> Messages;
369 struct From;
371 typedef
372 ACE_Strong_Bound_Ptr<From, Mutex>
373 From_ptr;
375 struct From : Profile
377 static u16 const id;
379 public:
380 From (Header const& h, istream& is)
381 : Profile (h)
383 u32 addr;
384 u16 port;
386 is >> addr;
387 is >> port;
389 address_ = Address (port, addr);
392 From (Address const& addr)
393 : Profile (id), address_ (addr)
395 size (calculate_size ());
398 From_ptr
399 clone ()
401 return From_ptr (clone_ ());
404 protected:
405 virtual Profile_ptr
406 clone_ ()
408 Profile_ptr p (new From (*this));
409 return p;
412 From (From const& from)
413 : Profile (from),
414 address_ (from.address_)
418 public:
419 Address const&
420 address () const
422 return address_;
425 public:
426 virtual void
427 serialize_body (ostream& os) const
429 u32 addr (address_.get_ip_address ());
430 u16 port (address_.get_port_number ());
432 os << addr;
433 os << port;
436 virtual void
437 serialize_body (sstream& ss) const
439 u32 addr (0);
440 u16 port (0);
442 ss << addr;
443 ss << port;
446 private:
447 Address address_;
454 struct To;
456 typedef
457 ACE_Strong_Bound_Ptr<To, Mutex>
458 To_ptr;
460 struct To : Profile
462 static u16 const id;
464 public:
465 To (Header const& h, istream& is)
466 : Profile (h)
468 u32 addr;
469 u16 port;
471 is >> addr;
472 is >> port;
474 address_ = Address (port, addr);
477 To (Address const& addr)
478 : Profile (id), address_ (addr)
480 size (calculate_size ());
483 To_ptr
484 clone ()
486 return To_ptr (clone_ ());
489 protected:
490 virtual Profile_ptr
491 clone_ ()
493 Profile_ptr p (new To (*this));
494 return p;
497 To (To const& to)
498 : Profile (to),
499 address_ (to.address_)
503 public:
504 Address const&
505 address () const
507 return address_;
510 public:
511 virtual void
512 serialize_body (ostream& os) const
514 u32 addr (address_.get_ip_address ());
515 u16 port (address_.get_port_number ());
517 os << addr;
518 os << port;
521 virtual void
522 serialize_body (sstream& ss) const
524 u32 addr (0);
525 u16 port (0);
527 ss << addr;
528 ss << port;
531 private:
532 Address address_;
539 struct Data;
541 typedef
542 ACE_Strong_Bound_Ptr<Data, Mutex>
543 Data_ptr;
545 struct Data : Profile
547 static u16 const id;
549 public:
550 virtual
551 ~Data ()
553 if (buf_)
554 operator delete (buf_);
557 Data (Header const& h, istream& is)
558 : Profile (h),
559 buf_ (0),
560 size_ (h.size ()),
561 capacity_ (size_)
563 if (size_)
565 buf_ = reinterpret_cast<char*> (operator new (capacity_));
566 is.read_char_array (buf_, size_);
570 Data (void const* buf, size_t s, size_t capacity = 0)
571 : Profile (id),
572 buf_ (0),
573 size_ (s),
574 capacity_ (capacity < size_ ? size_ : capacity)
576 if (size_)
578 buf_ = reinterpret_cast<char*> (operator new (capacity_));
579 ACE_OS::memcpy (buf_, buf, size_);
582 Profile::size (calculate_size ());
585 Data_ptr
586 clone ()
588 return Data_ptr (clone_ ());
591 protected:
592 virtual Profile_ptr
593 clone_ ()
595 Profile_ptr p (new Data (*this));
596 return p;
599 Data (Data const& d)
600 : Profile (d),
601 buf_ (0),
602 size_ (d.size_),
603 capacity_ (d.capacity_)
605 if (size_)
607 buf_ = reinterpret_cast<char*> (operator new (capacity_));
608 ACE_OS::memcpy (buf_, d.buf_, size_);
611 Profile::size (calculate_size ());
614 public:
615 char const*
616 buf () const
618 return buf_;
621 char*
622 buf ()
624 return buf_;
627 size_t
628 size () const
630 return size_;
633 void
634 size (size_t s)
636 if (s > capacity_)
637 ACE_OS::abort ();
639 size_ = s;
641 Profile::size (calculate_size ());
644 size_t
645 capacity () const
647 return capacity_;
650 public:
651 virtual void
652 serialize_body (ostream& os) const
654 os.write_char_array (buf_, size_);
657 virtual void
658 serialize_body (sstream& ss) const
660 ss.write_char_array (buf_, size_);
663 private:
664 char* buf_;
665 size_t size_;
666 size_t capacity_;
673 struct SN;
675 typedef
676 ACE_Strong_Bound_Ptr<SN, Mutex>
677 SN_ptr;
679 struct SN : Profile
681 static u16 const id;
683 public:
684 SN (Header const& h, istream& is)
685 : Profile (h)
687 is >> n_;
690 SN (u64 n)
691 : Profile (id), n_ (n)
693 size (calculate_size ());
696 SN_ptr
697 clone ()
699 return SN_ptr (clone_ ());
702 protected:
703 virtual Profile_ptr
704 clone_ ()
706 Profile_ptr p (new SN (*this));
707 return p;
710 SN (SN const& sn)
711 : Profile (sn),
712 n_ (sn.n_)
716 public:
718 num () const
720 return n_;
723 public:
724 virtual void
725 serialize_body (ostream& os) const
727 os << n_;
730 virtual void
731 serialize_body (sstream& ss) const
733 ss << n_;
736 private:
737 u64 n_;
744 class NAK;
746 typedef
747 ACE_Strong_Bound_Ptr<NAK, Mutex>
748 NAK_ptr;
750 class NAK : public Profile
752 public:
753 static u16 const id;
755 typedef ACE_Vector<u64, ACE_VECTOR_DEFAULT_SIZE> SerialNumbers;
756 typedef SerialNumbers::Iterator iterator;
758 NAK (Header const& h, istream& is)
759 : Profile (h)
761 u64 sn (0);
762 u32 addr (0);
763 u16 port (0);
765 sstream ss;
767 ss << sn;
768 size_t sn_size (ss.total_length ());
770 ss.reset ();
772 ss << addr;
773 ss << port;
775 size_t addr_size (ss.total_length ());
778 is >> addr;
779 is >> port;
781 // num_of_sns = (size - addr_size) / sn_size
783 for (unsigned long i (0); i < ((h.size () - addr_size) / sn_size); ++i)
785 is >> sn;
786 sns_.push_back (sn);
790 address_ = Address (port, addr);
793 NAK (Address const& src)
794 : Profile (id), address_ (src)
796 size (calculate_size ());
799 NAK_ptr
800 clone ()
802 return NAK_ptr (clone_ ());
805 protected:
806 virtual Profile_ptr
807 clone_ ()
809 Profile_ptr p (new NAK (*this));
810 return p;
813 NAK (NAK const& nak)
814 : Profile (nak),
815 address_ (nak.address_),
816 sns_ (nak.sns_)
820 public:
821 void
822 add (u64 sn)
824 sns_.push_back (sn);
825 size (calculate_size ());
828 public:
829 Address const&
830 address () const
832 return address_;
836 iterator
837 begin () /* const */
839 return iterator (sns_);
843 iterator
844 end () const
846 return sns_.end ();
850 size_t
851 count () const
853 return sns_.size ();
856 public:
857 // Count max number of elements that will fit into NAK profile
858 // with size <= max_size.
860 static u32
861 max_count (u32 max_size)
863 u32 n (0);
865 sstream ss;
867 Profile::Header hdr (0, 0);
868 ss << hdr;
870 u32 addr (0);
871 u16 port (0);
872 ss << addr;
873 ss << port;
875 while (true)
877 u64 sn (0);
878 ss << sn;
880 if (ss.total_length () <= max_size)
881 ++n;
883 if (ss.total_length () >= max_size)
884 break;
887 return n;
890 public:
891 virtual void
892 serialize_body (ostream& os) const
894 NAK& this_ = const_cast<NAK&> (*this); // Don't put in ROM.
896 u32 addr (address_.get_ip_address ());
897 u16 port (address_.get_port_number ());
899 os << addr;
900 os << port;
902 // Stone age iteration.
904 for (iterator i (this_.begin ()); !i.done (); i.advance ())
906 u64* psn = 0;
907 i.next (psn);
908 os << *psn;
912 virtual void
913 serialize_body (sstream& ss) const
915 NAK& this_ = const_cast<NAK&> (*this); // Don't put in ROM.
917 u32 addr (0);
918 u16 port (0);
920 ss << addr;
921 ss << port;
923 // Stone age iteration.
925 for (iterator i (this_.begin ()); !i.done (); i.advance ())
927 u64 sn (0);
928 ss << sn;
932 private:
933 Address address_;
934 SerialNumbers sns_;
940 struct NRTM;
942 typedef
943 ACE_Strong_Bound_Ptr<NRTM, Mutex>
944 NRTM_ptr;
946 struct NRTM : Profile
948 static u16 const id;
950 public:
951 NRTM (Header const& h, istream& is)
952 : Profile (h), map_ (10)
954 u32 addr (0);
955 u16 port (0);
956 u64 sn (0);
958 sstream ss;
960 ss << sn;
961 ss << addr;
962 ss << port;
964 size_t block_size (ss.total_length ());
967 // num_of_blocks = size / block_size
969 for (size_t i (0); i < (h.size () / block_size); ++i)
971 is >> sn;
972 is >> addr;
973 is >> port;
975 map_.bind (Address (port, addr), sn);
979 NRTM ()
980 : Profile (id), map_ (10)
982 size (calculate_size ());
985 NRTM_ptr
986 clone ()
988 return NRTM_ptr (clone_ ());
991 protected:
992 virtual Profile_ptr
993 clone_ ()
995 Profile_ptr p (new NRTM (*this));
996 return p;
999 NRTM (NRTM const& nrtm)
1000 : Profile (nrtm)
1002 for (Map::const_iterator i (nrtm.map_); !i.done (); i.advance ())
1004 map_.bind ((*i).ext_id_, (*i).int_id_);
1008 public:
1009 void
1010 insert (Address const& addr, u64 sn)
1012 map_.bind (addr, sn);
1014 size (calculate_size ());
1018 find (Address const& addr) const
1020 u64 sn = 0;
1022 if (map_.find (addr, sn) == -1) return 0;
1024 return sn;
1027 bool
1028 empty () const
1030 return map_.current_size () == 0;
1033 public:
1034 // Count max number of elements that will fit into NRTM profile
1035 // with size <= max_size.
1037 static u32
1038 max_count (u32 max_size)
1040 u32 n (0);
1042 sstream ss;
1044 Profile::Header hdr (0, 0);
1045 ss << hdr;
1047 while (true)
1049 u32 addr (0);
1050 u16 port (0);
1051 u64 sn (0);
1053 ss << sn;
1054 ss << addr;
1055 ss << port;
1057 if (ss.total_length () <= max_size)
1058 ++n;
1060 if (ss.total_length () >= max_size)
1061 break;
1064 return n;
1067 public:
1068 virtual void
1069 serialize_body (ostream& os) const
1071 for (Map::const_iterator i (map_), e (map_, 1); i != e; ++i)
1073 u32 addr ((*i).ext_id_.get_ip_address ());
1074 u16 port ((*i).ext_id_.get_port_number ());
1075 u64 sn ((*i).int_id_);
1077 os << sn;
1078 os << addr;
1079 os << port;
1083 virtual void
1084 serialize_body (sstream& ss) const
1086 for (Map::const_iterator i (map_), e (map_, 1); i != e; ++i)
1088 u32 addr (0);
1089 u16 port (0);
1090 u64 sn (0);
1092 ss << sn;
1093 ss << addr;
1094 ss << port;
1098 private:
1099 typedef
1100 ACE_Hash_Map_Manager_Ex<Address,
1101 u64,
1102 AddressHasher,
1103 ACE_Equal_To<Address>,
1104 ACE_Null_Mutex>
1105 Map;
1107 Map map_;
1114 struct NoData;
1116 typedef
1117 ACE_Strong_Bound_Ptr<NoData, Mutex>
1118 NoData_ptr;
1120 struct NoData : Profile
1122 static u16 const id;
1124 public:
1125 NoData (Header const& h, istream&)
1126 : Profile (h)
1130 NoData ()
1131 : Profile (id)
1133 Profile::size (0);
1136 NoData_ptr
1137 clone ()
1139 return NoData_ptr (clone_ ());
1142 protected:
1143 virtual Profile_ptr
1144 clone_ ()
1146 Profile_ptr p (new NoData (*this));
1147 return p;
1150 NoData (NoData const& no_data)
1151 : Profile (no_data)
1155 public:
1156 virtual void
1157 serialize_body (ostream&) const
1161 virtual void
1162 serialize_body (sstream&) const
1171 struct Part;
1173 typedef
1174 ACE_Strong_Bound_Ptr<Part, Mutex>
1175 Part_ptr;
1177 struct Part : Profile
1179 static u16 const id;
1181 public:
1182 Part (Header const& h, istream& is)
1183 : Profile (h)
1185 is >> num_;
1186 is >> of_;
1187 is >> total_size_;
1190 Part (u32 num, u32 of, u64 total_size)
1191 : Profile (id),
1192 num_ (num),
1193 of_ (of),
1194 total_size_ (total_size)
1196 size (calculate_size ());
1199 Part_ptr
1200 clone ()
1202 return Part_ptr (clone_ ());
1205 protected:
1206 virtual Profile_ptr
1207 clone_ ()
1209 Profile_ptr p (new Part (*this));
1210 return p;
1213 Part (Part const& part)
1214 : Profile (part),
1215 num_ (part.num_),
1216 of_ (part.of_),
1217 total_size_ (part.total_size_)
1221 public:
1223 num () const
1225 return num_;
1229 of () const
1231 return of_;
1235 total_size () const
1237 return total_size_;
1240 public:
1241 virtual void
1242 serialize_body (ostream& os) const
1244 os << num_;
1245 os << of_;
1246 os << total_size_;
1249 virtual void
1250 serialize_body (sstream& ss) const
1252 ss << num_;
1253 ss << of_;
1254 ss << total_size_;
1258 private:
1259 u32 num_;
1260 u32 of_;
1261 u64 total_size_;
1266 inline
1267 std::ostream&
1268 operator<< (std::ostream& os, ACE_RMCast::Address const& a)
1270 char buf[64];
1271 a.addr_to_string (buf, 64, 1);
1272 return os << buf;
1276 #endif // ACE_RMCAST_PROTOCOL_H