1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 #include "chainparams.h"
7 #include "zmqpublishnotifier.h"
8 #include "validation.h"
10 #include "rpc/server.h"
12 static std::multimap
<std::string
, CZMQAbstractPublishNotifier
*> mapPublishNotifiers
;
14 static const char *MSG_HASHBLOCK
= "hashblock";
15 static const char *MSG_HASHTX
= "hashtx";
16 static const char *MSG_RAWBLOCK
= "rawblock";
17 static const char *MSG_RAWTX
= "rawtx";
19 // Internal function to send multipart message
20 static int zmq_send_multipart(void *sock
, const void* data
, size_t size
, ...)
29 int rc
= zmq_msg_init_size(&msg
, size
);
32 zmqError("Unable to initialize ZMQ msg");
36 void *buf
= zmq_msg_data(&msg
);
37 memcpy(buf
, data
, size
);
39 data
= va_arg(args
, const void*);
41 rc
= zmq_msg_send(&msg
, sock
, data
? ZMQ_SNDMORE
: 0);
44 zmqError("Unable to send ZMQ msg");
54 size
= va_arg(args
, size_t);
59 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext
)
63 // check if address is being used by other publish notifier
64 std::multimap
<std::string
, CZMQAbstractPublishNotifier
*>::iterator i
= mapPublishNotifiers
.find(address
);
66 if (i
==mapPublishNotifiers
.end())
68 psocket
= zmq_socket(pcontext
, ZMQ_PUB
);
71 zmqError("Failed to create socket");
75 int rc
= zmq_bind(psocket
, address
.c_str());
78 zmqError("Failed to bind address");
83 // register this notifier for the address, so it can be reused for other publish notifier
84 mapPublishNotifiers
.insert(std::make_pair(address
, this));
89 LogPrint("zmq", "zmq: Reusing socket for address %s\n", address
);
91 psocket
= i
->second
->psocket
;
92 mapPublishNotifiers
.insert(std::make_pair(address
, this));
98 void CZMQAbstractPublishNotifier::Shutdown()
102 int count
= mapPublishNotifiers
.count(address
);
104 // remove this notifier from the list of publishers using this address
105 typedef std::multimap
<std::string
, CZMQAbstractPublishNotifier
*>::iterator iterator
;
106 std::pair
<iterator
, iterator
> iterpair
= mapPublishNotifiers
.equal_range(address
);
108 for (iterator it
= iterpair
.first
; it
!= iterpair
.second
; ++it
)
110 if (it
->second
==this)
112 mapPublishNotifiers
.erase(it
);
119 LogPrint("zmq", "Close socket at address %s\n", address
);
121 zmq_setsockopt(psocket
, ZMQ_LINGER
, &linger
, sizeof(linger
));
128 bool CZMQAbstractPublishNotifier::SendMessage(const char *command
, const void* data
, size_t size
)
132 /* send three parts, command & data & a LE 4byte sequence number */
133 unsigned char msgseq
[sizeof(uint32_t)];
134 WriteLE32(&msgseq
[0], nSequence
);
135 int rc
= zmq_send_multipart(psocket
, command
, strlen(command
), data
, size
, msgseq
, (size_t)sizeof(uint32_t), (void*)0);
139 /* increment memory only sequence number after sending */
145 bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex
*pindex
)
147 uint256 hash
= pindex
->GetBlockHash();
148 LogPrint("zmq", "zmq: Publish hashblock %s\n", hash
.GetHex());
150 for (unsigned int i
= 0; i
< 32; i
++)
151 data
[31 - i
] = hash
.begin()[i
];
152 return SendMessage(MSG_HASHBLOCK
, data
, 32);
155 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction
&transaction
)
157 uint256 hash
= transaction
.GetHash();
158 LogPrint("zmq", "zmq: Publish hashtx %s\n", hash
.GetHex());
160 for (unsigned int i
= 0; i
< 32; i
++)
161 data
[31 - i
] = hash
.begin()[i
];
162 return SendMessage(MSG_HASHTX
, data
, 32);
165 bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex
*pindex
)
167 LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex
->GetBlockHash().GetHex());
169 const Consensus::Params
& consensusParams
= Params().GetConsensus();
170 CDataStream
ss(SER_NETWORK
, PROTOCOL_VERSION
| RPCSerializationFlags());
174 if(!ReadBlockFromDisk(block
, pindex
, consensusParams
))
176 zmqError("Can't read block from disk");
183 return SendMessage(MSG_RAWBLOCK
, &(*ss
.begin()), ss
.size());
186 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction
&transaction
)
188 uint256 hash
= transaction
.GetHash();
189 LogPrint("zmq", "zmq: Publish rawtx %s\n", hash
.GetHex());
190 CDataStream
ss(SER_NETWORK
, PROTOCOL_VERSION
| RPCSerializationFlags());
192 return SendMessage(MSG_RAWTX
, &(*ss
.begin()), ss
.size());