gajim-otrplugin/otr.py

160 lines
7.3 KiB
Python
Raw Normal View History

## Copyright (C) 2008-2012 Kjell Braden <afflux@pentabarf.de>
## Copyright (C) 2019 Pavel R. <pd at narayana dot im>
2022-10-15 17:50:34 +00:00
## Copyright (C) 2022 Bohdan Horbeshko <bodqhrohro@gmail.com>
#
# This file is part of Gajim OTR Plugin.
#
# Gajim OTR Plugin is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
#
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You can always obtain full license text at <http://www.gnu.org/licenses/>.
import os
2022-10-06 00:57:18 +00:00
import string
import random
import itertools
import logging
from inspect import signature
from gajim.common import const, app, helpers, configpaths
2022-10-15 17:50:34 +00:00
from gajim.common.const import EncryptionData
from nbxmpp.protocol import Message, JID
2022-10-09 20:07:36 +00:00
import pathlib
import sys
# Make the plugin directory importable. Entries of sys.path must be strings:
# the import machinery silently ignores any other type (such as a
# pathlib.Path), so the resolved directory is converted with str().
sys.path.insert(0, str(pathlib.Path(__file__).parent.resolve()))
# The package name 'gajim-otrplugin' contains a dash and therefore cannot be
# used in an import statement; register it under an identifier-safe alias so
# that 'gajim_otrplugin.potr' can be imported below.
sys.modules['gajim_otrplugin'] = sys.modules['gajim-otrplugin']
from gajim_otrplugin.potr import context, crypt, proto
from .keystore import Keystore
# Prototype of an OTR channel (a secure conversation between the Gajim user (Alice) and a Gajim peer (Bob))
class OTRChannel(context.Context):
    """A single OTR conversation between the local account and one peer.

    Instances are created as ``OTRChannel(account, peer)``; the methods below
    read ``self.user`` (the owning account object) and ``self.peer`` (the
    remote JID), which are presumably set by ``context.Context`` from those
    constructor arguments -- TODO confirm against the bundled potr package.
    """

    def inject(self, msg, appdata=None):
        """Send raw OTR protocol data to the peer over XMPP.

        ``msg`` is decoded to text and used as the body of a ``chat`` message
        stanza. ``appdata``, when truthy, is used as the stanza thread id;
        otherwise a freshly generated random thread id is used.
        """
        stanza = Message(to=self.peer, body=msg.decode(), typ='chat')
        thread_id = appdata if appdata else self.generateThreadId()
        stanza.setThread(thread_id)
        self.user.stream.send_stanza(stanza)

    def setState(self, state=0):
        """Record a new channel state and report real transitions in the chat.

        Nothing is printed when ``state`` is falsy or equal to the current
        state; ``self.state`` is updated in every case.
        """
        is_transition = bool(state) and state != self.state
        if is_transition:
            if self.getCurrentTrust() is None:
                # First time this fingerprint is seen: store it as untrusted
                # (0). The "!= 0" comparison on the setter's result is kept
                # exactly as in the original short-circuit expression.
                if self.setCurrentTrust(0) != 0:
                    announcement = OTR.TRUSTED[None].format(fprint=self.getCurrentKey())
                    self.printl(announcement)
            template = OTR.STATUS[state]
            status_line = template.format(
                peer=self.peer,
                trust=OTR.TRUSTED[self.getCurrentTrust()],
                fprint=self.getCurrentKey(),
            )
            self.printl(status_line)
        self.state = state

    def printl(self, line):
        """Print ``line`` as an "OTR: ..." status line in the peer's chat window.

        Does nothing when the account has no chat control for the peer.
        """
        control = self.user.getControl(self.peer)
        if not control:
            return
        println = control.conv_textview.print_conversation_line
        if not println:
            return
        # Only pass the 'jid' keyword when the print function accepts it.
        extra = {}
        if 'jid' in signature(println).parameters:
            extra = {'jid': None}
        println("OTR: " + line, kind='status', name='', tim='', **extra)

    @staticmethod
    def getPolicy(policy):
        """Return the value of ``policy`` from ``OTR.DEFAULT_POLICY`` (``None`` if unknown)."""
        return OTR.DEFAULT_POLICY.get(policy)

    @staticmethod
    def generateThreadId():
        """Return a random thread id made of 32 ASCII letters."""
        alphabet = string.ascii_letters
        return ''.join(random.choice(alphabet) for _ in range(32))
# OTR instance for Gajim user (Alice)
class OTR(context.Account):
    """OTR account for the local Gajim user.

    Owns the key store, the table of known fingerprints and one
    ``OTRChannel`` per peer, and provides the ``decrypt``/``encrypt`` entry
    points for incoming stanzas and outgoing message events.
    """

    # Extra positional arguments handed to context.Account.__init__ after the
    # account name (presumably protocol name and maximum message size --
    # TODO confirm against the bundled potr package).
    PROTO = ('XMPP', 1024)
    # Name under which this encryption is reported on messages.
    ENCRYPTION_NAME = 'OTR'
    # Policy values looked up by OTRChannel.getPolicy().
    DEFAULT_POLICY = {
        'REQUIRE_ENCRYPTION': True,
        'ALLOW_V1': False,
        'ALLOW_V2': True,
        'SEND_TAG': True,
        'WHITESPACE_START_AKE': True,
        'ERROR_START_AKE': True,
    }
    # Texts for trust levels; the None entry announces a new fingerprint.
    TRUSTED = {None: "new fingerprint received: *{fprint}*", 0: "untrusted", 1: "trusted", 2: "authenticated"}
    # Status templates keyed by channel state or by exception *class*.
    STATUS = {
        context.STATE_PLAINTEXT: "(re-)starting encrypted conversation with {peer}..",
        context.STATE_ENCRYPTED: "{trust} encrypted conversation started (fingerprint: {fprint})",
        context.STATE_FINISHED: "encrypted conversation with {peer} closed (fingerprint: {fprint})",
        context.UnencryptedMessage: "this message is *not encrypted*: {msg}",
        context.NotEncryptedError: "unable to process message (channel lost)",
        context.ErrorReceived: "received error message: {err}",
        crypt.InvalidParameterError: "unable to decrypt message (key/signature mismatch)",
    }

    def __init__(self, account):
        """Bind to the Gajim ``account``, open its key store and load known trusts."""
        super(OTR, self).__init__(account, *OTR.PROTO)
        self.log = logging.getLogger('gajim.p.otr.otr')
        self.account = account
        self.stream = app.connections[account]
        self.jid = self.stream.get_own_jid()
        self.keystore = Keystore(os.path.join(configpaths.get('MY_DATA'), 'otr_' + self.jid.bare + '.db'))
        self.loadTrusts()

    @staticmethod
    def _statusText(exc, msg=""):
        """Return the status line describing exception ``exc``.

        ``OTR.STATUS`` is keyed by exception classes, so the template is found
        with ``isinstance`` rather than by indexing with the instance (which
        would raise ``KeyError``). ``{err}`` is filled from the ``error``
        attribute of the first exception argument when it has one, otherwise
        from that argument, otherwise from the exception itself.
        """
        template = "{err}"
        for key, text in OTR.STATUS.items():
            if isinstance(key, type) and isinstance(exc, key):
                template = text
                break
        detail = exc.args[0] if exc.args else exc
        return template.format(msg=msg, err=getattr(detail, 'error', detail))

    def getControl(self, peer):
        """Return the chat control for ``peer`` on this account."""
        return app.interface.msg_win_mgr.get_control(peer.getStripped(), self.account)

    def getContext(self, peer):
        """Return the OTR channel for ``peer``, creating it when needed.

        A channel found in the finished state is disconnected and replaced by
        a fresh one.
        """
        channel = self.ctxs.get(peer)
        if channel is not None and channel.state == context.STATE_FINISHED:
            # close dead channels
            self.ctxs.pop(peer).disconnect()
            channel = None
        if not channel:
            channel = OTRChannel(self, peer)
        self.ctxs[peer] = channel
        return channel

    def loadPrivkey(self):
        """Load the private key from the key store; return ``None`` if absent."""
        my = self.keystore.load(jid=str(self.jid))
        if not (my and my.privatekey):
            return None
        return crypt.PK.parsePrivateKey(bytes.fromhex(my.privatekey))[0] or None

    def savePrivkey(self):
        """Store the private key, hex-encoded, in the key store."""
        self.keystore.save(jid=str(self.jid), privatekey=self.getPrivkey().serializePrivateKey().hex())

    def loadTrusts(self):
        """Load every key store entry into the trust table."""
        for peer in self.keystore.load():
            self.setTrust(peer.jid, peer.fingerprint, peer.trust)

    def saveTrusts(self):
        """Write every known (peer, fingerprint, trust) triple to the key store."""
        for peer, fingerprints in self.trusts.items():
            for fingerprint, trust in fingerprints.items():
                self.keystore.save(jid=str(peer), fingerprint=fingerprint, trust=trust)

    def decrypt(self, stanza, properties):
        """Decrypt an incoming message ``stanza`` in place.

        On success the stanza body is replaced with the decrypted text (or an
        empty string) and ``properties.encrypted`` is set. Handled OTR errors
        are logged and reported in the chat window. In all cases a pending
        retransmission is sent when the channel asks for one.
        """
        peer = stanza.getFrom().new_as_bare()
        msgtxt = stanza.getBody()
        channel, ctl = self.getContext(peer), self.getControl(peer)
        try:
            # Fall back to a pair so the unpacking cannot fail on a falsy result.
            text, _tlvs = channel.receiveMessage(msgtxt.encode(), appdata=stanza.getThread()) or (None, [])
        except (context.UnencryptedMessage, context.NotEncryptedError, context.ErrorReceived, crypt.InvalidParameterError) as e:
            self.log.error("** got exception while decrypting message: %s", e)
            channel.printl(self._statusText(e, msgtxt))
        else:
            stanza.setBody(text.decode() if text else "")
            properties.encrypted = EncryptionData({'name': OTR.ENCRYPTION_NAME})
        finally:
            if channel.mayRetransmit and channel.state and ctl:
                channel.mayRetransmit = ctl.send_message(channel.lastMessage.decode())

    def encrypt(self, event, callback):
        """Encrypt the outgoing message ``event`` and pass it to ``callback``.

        XHTML messages are not encrypted here; their text is re-sent through
        the chat control instead. When the channel cannot encrypt, the error
        is logged and shown in the chat window and ``callback`` is not called.
        """
        peer = event.msg_iq.getTo().new_as_bare()
        channel, ctl = self.getContext(peer), event.control
        if event.xhtml:
            return ctl.send_message(event.message)  # skip xhtml messages
        try:
            encrypted = channel.sendMessage(context.FRAGMENT_SEND_ALL_BUT_LAST, event.message.encode(), appdata=event.msg_iq.getThread()) or b''
            # Show nothing locally when only the default OTR query is sent.
            is_query = encrypted == self.getDefaultQueryMessage(OTR.DEFAULT_POLICY.get)
            message = "" if is_query else (event.message or "")
        except context.NotEncryptedError as e:
            self.log.error("** got exception while encrypting message: %s", e)
            channel.printl(self._statusText(e))
        else:
            event.msg_iq.setBody(encrypted.decode())  # encrypted data goes here
            event.message = message  # message that will be displayed in our chat goes here
            # some mandatory encryption flags
            event.encrypted = OTR.ENCRYPTION_NAME
            event.additional_data["encrypted"] = {"name": OTR.ENCRYPTION_NAME}
            # NOTE(review): the original indentation is lost in this copy; the
            # callback is kept on the success path so a message is never
            # forwarded unencrypted after a failure -- confirm against upstream.
            callback(event)