Calendar: add FT sprint
[tails/test.git] / features / scripts / otr-bot.py
blobe70fc79d980fe6196670d555c5ba02e56c3bba9c
1 #!/usr/bin/python3
2 import slixmpp
3 import potr
4 import logging
5 from argparse import ArgumentParser
class OtrContext(potr.context.Context):
    """OTR conversation context between the bot's account and one peer."""

    def __init__(self, account, peer):
        super().__init__(account, peer)

    def getPolicy(self, key):
        """Answer every policy query with True, whatever ``key`` is."""
        return True

    def inject(self, body, appdata=None):
        """Send ``body`` by filling in and dispatching a prepared stanza.

        ``appdata`` must be a mapping that provides:

        * ``"base_reply"``: the message stanza whose body gets replaced;
        * ``"send_raw_message_fn"``: a callable that sends that stanza.

        Despite the ``None`` default, a mapping is required; the default is
        presumably there only to mirror the signature of the method being
        overridden (TODO confirm against potr).
        """
        msg = appdata["base_reply"]
        # The rest of this file hands bytes to potr and gets bytes back.
        # Calling str() on bytes would produce the "b'...'" repr rather than
        # the text itself, so decode explicitly when we are given bytes.
        if isinstance(body, bytes):
            body = body.decode("utf-8")
        msg["body"] = str(body)
        appdata["send_raw_message_fn"](msg)
class BotAccount(potr.context.Account):
    """potr account for the bot; its private key is read from a file."""

    def __init__(self, jid, keyFilePath):
        # Protocol name is 'xmpp'; maximum message size is 10 KiB.
        super().__init__(jid, 'xmpp', 10 * 1024)
        self.keyFilePath = keyFilePath

    def loadPrivkey(self):
        """Read the key file and return the first item of the parse result."""
        with open(self.keyFilePath, 'rb') as key_stream:
            raw_key = key_stream.read()
        parsed = potr.crypt.PK.parsePrivateKey(raw_key)
        return parsed[0]
class OtrContextManager:
    """Keeps one OtrContext per peer for a single bot account."""

    def __init__(self, jid, keyFilePath):
        self.account = BotAccount(jid, keyFilePath)
        # Maps a peer to the OtrContext created for it.
        self.contexts = {}

    def start_context(self, other):
        """Return the context for ``other``, creating it on first use."""
        known = self.contexts
        if other in known:
            return known[other]
        fresh = OtrContext(self.account, other)
        known[other] = fresh
        return fresh

    def get_context_for_user(self, other):
        """Alias of :meth:`start_context`."""
        return self.start_context(other)
class OtrBot(slixmpp.ClientXMPP):
    """XMPP bot that replies "pong" to "ping", in 1:1 chats and in rooms.

    1:1 ("chat") messages are passed through an OTR context so that the bot
    can take part in OTR-encrypted conversations; groupchat messages are
    handled as plain text.
    """

    def __init__(self, account, password, otr_key_path,
                 rooms=None, connect_server=None, log_file=None):
        """Set up the client, its OTR state, event handlers and plugins.

        ``rooms`` is an iterable of rooms to join once the session has
        started; ``None`` (the default) means no rooms.  ``connect_server``
        is an optional ``host[:port]`` string used instead of the address
        derived from ``account``.  ``log_file`` is only stored here.
        """
        self.__connect_server = connect_server
        self.__password = password
        self.__log_file = log_file
        # A fresh list per instance instead of a shared mutable default.
        self.__rooms = rooms if rooms is not None else []
        super().__init__(account, password)
        self.__otr_manager = OtrContextManager(account, otr_key_path)
        self.send_raw_message_fn = self.raw_send
        self.__default_otr_appdata = {
            "send_raw_message_fn": self.send_raw_message_fn
        }
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("message", self.handle_message)
        self.register_plugin("xep_0045")  # Multi-User Chat
        self.register_plugin("xep_0394")  # Message Markup

    def __otr_appdata_for_msg(self, msg):
        """Return a copy of the default appdata with ``msg`` as base reply."""
        appdata = self.__default_otr_appdata.copy()
        appdata["base_reply"] = msg
        return appdata

    @staticmethod
    def _split_host_port(server, default_port):
        """Split a ``host[:port]`` string into a ``(host, port)`` tuple.

        Returns ``(server, default_port)`` unchanged when no usable port
        suffix is found.  A bracketed IPv6 literal followed by a port
        (``[::1]:5222``) has its brackets removed.
        """
        host, sep, port_text = server.rpartition(":")
        bracketed = host.startswith("[") and host.endswith("]")
        # An unbracketed host that still contains ":" is taken to be a bare
        # IPv6 literal, whose last group must not be mistaken for a port.
        if not sep or not host or (":" in host and not bracketed):
            return (server, default_port)
        try:
            port = int(port_text)
        except ValueError:
            return (server, default_port)
        if not 0 < port < 65536:
            return (server, default_port)
        if bracketed:
            host = host[1:-1]
        return (host, port)

    def connect(self):
        """Connect, using the configured connect server if there is one."""
        address = ()
        if self.__connect_server:
            address = self._split_host_port(self.__connect_server,
                                            self.default_port)
        super().connect(address)

    async def start(self, event):
        """Session-start handler: announce presence and join the rooms."""
        self.send_presence()
        await self.get_roster()
        for room in self.__rooms:
            self.join_room(room)

    def join_room(self, room):
        """Join ``room`` using the local part of the bound JID as nick."""
        self.plugin["xep_0045"].join_muc(room, self.boundjid.user)

    def raw_send(self, msg):
        """Send the stanza as-is, without going through OTR."""
        msg.send()

    def get_reply(self, command):
        """Return the reply text for ``command``, or None if there is none."""
        if command.strip() == "ping":
            return "pong"
        return None

    def handle_message(self, msg):
        """Message handler: decrypt if needed and answer known commands."""
        msg = self.decrypt(msg)
        reply = None
        if msg["type"] == "chat":
            # Messages whose HTML body starts with an OTR version marker are
            # not commands; ignore them.
            if msg["html"]["body"].startswith("<p>?OTRv"):
                return
            reply = self.get_reply(msg["body"])
        elif msg["type"] == "groupchat":
            # In rooms, commands are addressed as "<nick>: <command>".
            try:
                recipient, command = msg["body"].split(":", 1)
            except ValueError:
                recipient, command = None, msg["body"]
            # Skip our own messages and anything not addressed to us.
            if msg["mucnick"] == self.boundjid.user or \
               recipient != self.boundjid.user:
                return
            response = self.get_reply(command)
            if response:
                reply = "%s: %s" % (msg["mucnick"], response)
        else:
            return
        if reply:
            self.send_message(msg.reply(reply))

    # NOTE(review): this takes a prepared stanza, which looks different from
    # the keyword-based send_message of the base class -- confirm nothing
    # else relies on the base-class signature.
    def send_message(self, msg):
        """Send ``msg``, through OTR when the peer's context is encrypted."""
        otrctx = self.__otr_manager.get_context_for_user(msg["to"])
        if otrctx.state == potr.context.STATE_ENCRYPTED:
            otrctx.sendMessage(potr.context.FRAGMENT_SEND_ALL,
                               msg["body"].encode("utf-8"),
                               appdata=self.__otr_appdata_for_msg(msg))
        else:
            self.raw_send(msg)

    def decrypt(self, msg):
        """Return ``msg`` with its body replaced by the decrypted text.

        Groupchat messages are returned untouched.  For "chat" messages the
        body is fed to the peer's OTR context; bodies that turn out not to
        be encrypted are kept as they are.  When potr raises
        NotEncryptedError, authStartV2 is called on the context (presumably
        to start a key exchange -- TODO confirm) and the message is
        returned unchanged.
        """
        if msg["type"] == "groupchat":
            return msg
        otrctx = self.__otr_manager.get_context_for_user(msg["from"])
        if msg["type"] == "chat":
            # Built before the try block so that it is always available to
            # the NotEncryptedError handler below.
            appdata = self.__otr_appdata_for_msg(msg.reply())
            try:
                plaintext, tlvs = otrctx.receiveMessage(
                    msg["body"].encode("utf-8"),
                    appdata=appdata
                )
                if plaintext:
                    decrypted_body = plaintext.decode("utf-8")
                else:
                    decrypted_body = ""
                otrctx.processTLVs(tlvs)
            except potr.context.NotEncryptedError:
                otrctx.authStartV2(appdata=appdata)
                return msg
            except (potr.context.UnencryptedMessage,
                    potr.context.NotOTRMessage):
                decrypted_body = msg["body"]
        else:
            decrypted_body = msg["body"]
        msg["body"] = decrypted_body
        return msg
# Command-line entry point: parse the arguments, configure logging, then run
# the bot until it is interrupted.
if __name__ == '__main__':
    parser = ArgumentParser()
    parser.add_argument("account",
                        help="the user account, given as user@domain")
    parser.add_argument("password",
                        help="the user account's password")
    parser.add_argument("otr_key_path",
                        help="the path to the account's OTR key file")
    parser.add_argument("-c", "--connect-server", metavar='ADDRESS',
                        help="use a Connect Server, given as host[:port] " +
                        "(port defaults to 5222)")
    parser.add_argument("-j", "--auto-join", nargs='+', metavar='ROOMS',
                        help="auto-join multi-user chatrooms on start",
                        default=[])
    parser.add_argument("-l", "--log-file", metavar='LOGFILE',
                        help="Log to file instead of stderr")
    parser.add_argument("-d", "--debug",
                        help="enable debug logging",
                        action="store_const", dest="loglevel",
                        const=logging.DEBUG, default=logging.FATAL)
    args = parser.parse_args()
    # Honour --log-file: with filename=None (option not given) basicConfig
    # keeps its default of logging to stderr.
    logging.basicConfig(level=args.loglevel,
                        format='%(levelname)-8s %(message)s',
                        filename=args.log_file)
    otr_bot = OtrBot(args.account,
                     args.password,
                     args.otr_key_path,
                     rooms=args.auto_join,
                     connect_server=args.connect_server,
                     log_file=args.log_file)
    try:
        otr_bot.connect()
        otr_bot.process()
    except KeyboardInterrupt:
        otr_bot.disconnect()