[Wallet] split the keypool in an internal and external part
[bitcoinplatinum.git] / src / zmq / zmqpublishnotifier.cpp
blobcaca1248a1ad937e148fd48ded7dab79c4bbf9c1
1 // Copyright (c) 2015-2016 The Bitcoin Core developers
2 // Distributed under the MIT software license, see the accompanying
3 // file COPYING or http://www.opensource.org/licenses/mit-license.php.
5 #include "chainparams.h"
6 #include "streams.h"
7 #include "zmqpublishnotifier.h"
8 #include "validation.h"
9 #include "util.h"
10 #include "rpc/server.h"
// Registry of initialized publish notifiers, keyed by their address.
// Initialize() inserts an (address, notifier) entry and Shutdown() erases it;
// notifiers that share an address reuse the socket of the first one registered.
static std::multimap<std::string, CZMQAbstractPublishNotifier*> mapPublishNotifiers;
// Topic names passed as the command (first part) of each published message.
// Both the pointer and the pointed-to characters are const, so a topic can
// neither be modified nor accidentally re-pointed at another string.
static const char* const MSG_HASHBLOCK = "hashblock";
static const char* const MSG_HASHTX    = "hashtx";
static const char* const MSG_RAWBLOCK  = "rawblock";
static const char* const MSG_RAWTX     = "rawtx";
19 // Internal function to send multipart message
20 static int zmq_send_multipart(void *sock, const void* data, size_t size, ...)
22 va_list args;
23 va_start(args, size);
25 while (1)
27 zmq_msg_t msg;
29 int rc = zmq_msg_init_size(&msg, size);
30 if (rc != 0)
32 zmqError("Unable to initialize ZMQ msg");
33 return -1;
36 void *buf = zmq_msg_data(&msg);
37 memcpy(buf, data, size);
39 data = va_arg(args, const void*);
41 rc = zmq_msg_send(&msg, sock, data ? ZMQ_SNDMORE : 0);
42 if (rc == -1)
44 zmqError("Unable to send ZMQ msg");
45 zmq_msg_close(&msg);
46 return -1;
49 zmq_msg_close(&msg);
51 if (!data)
52 break;
54 size = va_arg(args, size_t);
56 return 0;
59 bool CZMQAbstractPublishNotifier::Initialize(void *pcontext)
61 assert(!psocket);
63 // check if address is being used by other publish notifier
64 std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator i = mapPublishNotifiers.find(address);
66 if (i==mapPublishNotifiers.end())
68 psocket = zmq_socket(pcontext, ZMQ_PUB);
69 if (!psocket)
71 zmqError("Failed to create socket");
72 return false;
75 int rc = zmq_bind(psocket, address.c_str());
76 if (rc!=0)
78 zmqError("Failed to bind address");
79 zmq_close(psocket);
80 return false;
83 // register this notifier for the address, so it can be reused for other publish notifier
84 mapPublishNotifiers.insert(std::make_pair(address, this));
85 return true;
87 else
89 LogPrint("zmq", "zmq: Reusing socket for address %s\n", address);
91 psocket = i->second->psocket;
92 mapPublishNotifiers.insert(std::make_pair(address, this));
94 return true;
98 void CZMQAbstractPublishNotifier::Shutdown()
100 assert(psocket);
102 int count = mapPublishNotifiers.count(address);
104 // remove this notifier from the list of publishers using this address
105 typedef std::multimap<std::string, CZMQAbstractPublishNotifier*>::iterator iterator;
106 std::pair<iterator, iterator> iterpair = mapPublishNotifiers.equal_range(address);
108 for (iterator it = iterpair.first; it != iterpair.second; ++it)
110 if (it->second==this)
112 mapPublishNotifiers.erase(it);
113 break;
117 if (count == 1)
119 LogPrint("zmq", "Close socket at address %s\n", address);
120 int linger = 0;
121 zmq_setsockopt(psocket, ZMQ_LINGER, &linger, sizeof(linger));
122 zmq_close(psocket);
125 psocket = 0;
128 bool CZMQAbstractPublishNotifier::SendMessage(const char *command, const void* data, size_t size)
130 assert(psocket);
132 /* send three parts, command & data & a LE 4byte sequence number */
133 unsigned char msgseq[sizeof(uint32_t)];
134 WriteLE32(&msgseq[0], nSequence);
135 int rc = zmq_send_multipart(psocket, command, strlen(command), data, size, msgseq, (size_t)sizeof(uint32_t), (void*)0);
136 if (rc == -1)
137 return false;
139 /* increment memory only sequence number after sending */
140 nSequence++;
142 return true;
145 bool CZMQPublishHashBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
147 uint256 hash = pindex->GetBlockHash();
148 LogPrint("zmq", "zmq: Publish hashblock %s\n", hash.GetHex());
149 char data[32];
150 for (unsigned int i = 0; i < 32; i++)
151 data[31 - i] = hash.begin()[i];
152 return SendMessage(MSG_HASHBLOCK, data, 32);
155 bool CZMQPublishHashTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
157 uint256 hash = transaction.GetHash();
158 LogPrint("zmq", "zmq: Publish hashtx %s\n", hash.GetHex());
159 char data[32];
160 for (unsigned int i = 0; i < 32; i++)
161 data[31 - i] = hash.begin()[i];
162 return SendMessage(MSG_HASHTX, data, 32);
165 bool CZMQPublishRawBlockNotifier::NotifyBlock(const CBlockIndex *pindex)
167 LogPrint("zmq", "zmq: Publish rawblock %s\n", pindex->GetBlockHash().GetHex());
169 const Consensus::Params& consensusParams = Params().GetConsensus();
170 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
172 LOCK(cs_main);
173 CBlock block;
174 if(!ReadBlockFromDisk(block, pindex, consensusParams))
176 zmqError("Can't read block from disk");
177 return false;
180 ss << block;
183 return SendMessage(MSG_RAWBLOCK, &(*ss.begin()), ss.size());
186 bool CZMQPublishRawTransactionNotifier::NotifyTransaction(const CTransaction &transaction)
188 uint256 hash = transaction.GetHash();
189 LogPrint("zmq", "zmq: Publish rawtx %s\n", hash.GetHex());
190 CDataStream ss(SER_NETWORK, PROTOCOL_VERSION | RPCSerializationFlags());
191 ss << transaction;
192 return SendMessage(MSG_RAWTX, &(*ss.begin()), ss.size());