1 #ifndef MAVLINKPROTOBUFMANAGER_HPP
2 #define MAVLINKPROTOBUFMANAGER_HPP
5 #include <google/protobuf/message.h>
10 #include <common/mavlink.h>
11 #include <mavlink_types.h>
12 #include <pixhawk/pixhawk.pb.h>
15 class ProtobufManager
{
18 : mRegisteredTypeCount(0)
21 , kExtendedHeaderSize(MAVLINK_EXTENDED_HEADER_LEN
)
22 , kExtendedPayloadMaxSize(MAVLINK_MAX_EXTENDED_PAYLOAD_LEN
)
26 std::tr1::shared_ptr
<px::GLOverlay
> msg(new px::GLOverlay
);
30 // register ObstacleList
32 std::tr1::shared_ptr
<px::ObstacleList
> msg(new px::ObstacleList
);
36 // register ObstacleMap
38 std::tr1::shared_ptr
<px::ObstacleMap
> msg(new px::ObstacleMap
);
44 std::tr1::shared_ptr
<px::Path
> msg(new px::Path
);
48 // register PointCloudXYZI
50 std::tr1::shared_ptr
<px::PointCloudXYZI
> msg(new px::PointCloudXYZI
);
54 // register PointCloudXYZRGB
56 std::tr1::shared_ptr
<px::PointCloudXYZRGB
> msg(new px::PointCloudXYZRGB
);
62 std::tr1::shared_ptr
<px::RGBDImage
> msg(new px::RGBDImage
);
67 mStreamID
= rand() + 1;
70 bool fragmentMessage(uint8_t system_id
, uint8_t component_id
,
71 uint8_t target_system
, uint8_t target_component
,
72 const google::protobuf::Message
& protobuf_msg
,
73 std::vector
<mavlink_extended_message_t
> & fragments
) const
75 TypeMap::const_iterator it
= mTypeMap
.find(protobuf_msg
.GetTypeName());
77 if (it
== mTypeMap
.end()) {
78 std::cout
<< "# WARNING: Protobuf message with type "
79 << protobuf_msg
.GetTypeName() << " is not registered."
84 uint8_t typecode
= it
->second
;
86 std::string data
= protobuf_msg
.SerializeAsString();
88 int fragmentCount
= (protobuf_msg
.ByteSize() + kExtendedPayloadMaxSize
- 1) / kExtendedPayloadMaxSize
;
89 unsigned int offset
= 0;
91 for (int i
= 0; i
< fragmentCount
; ++i
) {
92 mavlink_extended_message_t fragment
;
94 // write extended header data
95 uint8_t *payload
= reinterpret_cast<uint8_t *>(fragment
.base_msg
.payload64
);
96 unsigned int length
= 0;
99 if (i
< fragmentCount
- 1) {
100 length
= kExtendedPayloadMaxSize
;
103 length
= protobuf_msg
.ByteSize() - kExtendedPayloadMaxSize
* (fragmentCount
- 1);
106 memcpy(payload
, &target_system
, 1);
107 memcpy(payload
+ 1, &target_component
, 1);
108 memcpy(payload
+ 2, &typecode
, 1);
109 memcpy(payload
+ 3, &length
, 4);
110 memcpy(payload
+ 7, &mStreamID
, 2);
111 memcpy(payload
+ 9, &offset
, 4);
112 memcpy(payload
+ 13, &flags
, 1);
114 fragment
.base_msg
.msgid
= MAVLINK_MSG_ID_EXTENDED_MESSAGE
;
115 mavlink_finalize_message(&fragment
.base_msg
, system_id
, component_id
, kExtendedHeaderSize
, 0);
117 // write extended payload data
118 fragment
.extended_payload_len
= length
;
119 memcpy(fragment
.extended_payload
, &data
[offset
], length
);
121 fragments
.push_back(fragment
);
126 std::cerr
<< "# INFO: Split extended message with size "
127 << protobuf_msg
.ByteSize() << " into "
128 << fragmentCount
<< " fragments." << std::endl
;
134 bool cacheFragment(mavlink_extended_message_t
& msg
)
136 if (!validFragment(msg
)) {
138 std::cerr
<< "# WARNING: Message is not a valid fragment. "
139 << "Dropping message..." << std::endl
;
144 // read extended header
145 uint8_t *payload
= reinterpret_cast<uint8_t *>(msg
.base_msg
.payload64
);
146 uint8_t typecode
= 0;
147 unsigned int length
= 0;
148 unsigned short streamID
= 0;
149 unsigned int offset
= 0;
152 memcpy(&typecode
, payload
+ 2, 1);
153 memcpy(&length
, payload
+ 3, 4);
154 memcpy(&streamID
, payload
+ 7, 2);
155 memcpy(&offset
, payload
+ 9, 4);
156 memcpy(&flags
, payload
+ 13, 1);
158 if (typecode
>= mTypeMap
.size()) {
159 std::cout
<< "# WARNING: Protobuf message with type code "
160 << static_cast<int>(typecode
) << " is not registered." << std::endl
;
164 bool reassemble
= false;
166 FragmentQueue::iterator it
= mFragmentQueue
.find(streamID
);
167 if (it
== mFragmentQueue
.end()) {
169 mFragmentQueue
[streamID
].push_back(msg
);
171 if ((flags
& 0x1) != 0x1) {
176 std::cerr
<< "# INFO: Added fragment to new queue."
181 std::cerr
<< "# WARNING: Message is not a valid fragment. "
182 << "Dropping message..." << std::endl
;
186 std::deque
<mavlink_extended_message_t
> & queue
= it
->second
;
190 queue
.push_back(msg
);
192 if ((flags
& 0x1) != 0x1) {
197 std::cerr
<< "# WARNING: Message is not a valid fragment. "
198 << "Dropping message..." << std::endl
;
202 if (fragmentDataSize(queue
.back()) + fragmentOffset(queue
.back()) != offset
) {
204 std::cerr
<< "# WARNING: Previous fragment(s) have been lost. "
205 << "Dropping message and clearing queue..." << std::endl
;
209 queue
.push_back(msg
);
211 if ((flags
& 0x1) != 0x1) {
219 std::deque
<mavlink_extended_message_t
> & queue
= mFragmentQueue
[streamID
];
222 for (size_t i
= 0; i
< queue
.size(); ++i
) {
223 mavlink_extended_message_t
& mavlink_msg
= queue
.at(i
);
225 data
.append(reinterpret_cast<char *>(&mavlink_msg
.extended_payload
[0]),
226 static_cast<size_t>(mavlink_msg
.extended_payload_len
));
229 mMessages
.at(typecode
)->ParseFromString(data
);
231 mMessageAvailable
.at(typecode
) = true;
236 std::cerr
<< "# INFO: Reassembled fragments for message with typename "
237 << mMessages
.at(typecode
)->GetTypeName() << " and size "
238 << mMessages
.at(typecode
)->ByteSize()
246 bool getMessage(std::tr1::shared_ptr
<google::protobuf::Message
> & msg
)
248 for (size_t i
= 0; i
< mMessageAvailable
.size(); ++i
) {
249 if (mMessageAvailable
.at(i
)) {
250 msg
= mMessages
.at(i
);
251 mMessageAvailable
.at(i
) = false;
261 void registerType(const std::tr1::shared_ptr
<google::protobuf::Message
> & msg
)
263 mTypeMap
[msg
->GetTypeName()] = mRegisteredTypeCount
;
264 ++mRegisteredTypeCount
;
265 mMessages
.push_back(msg
);
266 mMessageAvailable
.push_back(false);
269 bool validFragment(const mavlink_extended_message_t
& msg
) const
271 if (msg
.base_msg
.magic
!= MAVLINK_STX
||
272 msg
.base_msg
.len
!= kExtendedHeaderSize
||
273 msg
.base_msg
.msgid
!= MAVLINK_MSG_ID_EXTENDED_MESSAGE
) {
278 checksum
= crc_calculate(reinterpret_cast<const uint8_t *>(&msg
.base_msg
.len
), MAVLINK_CORE_HEADER_LEN
);
279 crc_accumulate_buffer(&checksum
, reinterpret_cast<const char *>(&msg
.base_msg
.payload64
), kExtendedHeaderSize
);
280 #if MAVLINK_CRC_EXTRA
281 static const uint8_t mavlink_message_crcs
[256] = MAVLINK_MESSAGE_CRCS
;
282 crc_accumulate(mavlink_message_crcs
[msg
.base_msg
.msgid
], &checksum
);
285 if (mavlink_ck_a(&(msg
.base_msg
)) != (uint8_t)(checksum
& 0xFF) &&
286 mavlink_ck_b(&(msg
.base_msg
)) != (uint8_t)(checksum
>> 8)) {
293 unsigned int fragmentDataSize(const mavlink_extended_message_t
& msg
) const
295 const uint8_t *payload
= reinterpret_cast<const uint8_t *>(msg
.base_msg
.payload64
);
297 return *(reinterpret_cast<const unsigned int *>(payload
+ 3));
300 unsigned int fragmentOffset(const mavlink_extended_message_t
& msg
) const
302 const uint8_t *payload
= reinterpret_cast<const uint8_t *>(msg
.base_msg
.payload64
);
304 return *(reinterpret_cast<const unsigned int *>(payload
+ 9));
307 int mRegisteredTypeCount
;
308 unsigned short mStreamID
;
311 typedef std::map
<std::string
, uint8_t> TypeMap
;
313 std::vector
< std::tr1::shared_ptr
<google::protobuf::Message
> > mMessages
;
314 std::vector
<bool> mMessageAvailable
;
316 typedef std::map
<unsigned short, std::deque
<mavlink_extended_message_t
> > FragmentQueue
;
317 FragmentQueue mFragmentQueue
;
319 const int kExtendedHeaderSize
;
321 * Extended header structure
322 * =========================
323 * byte 0 - target_system
324 * byte 1 - target_component
325 * byte 2 - extended message id (type code)
326 * bytes 3-6 - extended payload size in bytes
327 * byte 7-8 - stream ID
328 * byte 9-12 - fragment offset
329 * byte 13 - fragment flags (bit 0 - 1=more fragments, 0=last fragment)
332 const int kExtendedPayloadMaxSize
;
336 #endif // ifndef MAVLINKPROTOBUFMANAGER_HPP