You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

186 lines
7.8 KiB

## Copyright (C) 2008-2012 Kjell Braden <>
## Copyright (C) 2019 Pavel R. <pd at narayana dot im>
# This file is part of Gajim OTR Plugin.
# Gajim OTR Plugin is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published
# by the Free Software Foundation; version 3 only.
# This software is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# GNU General Public License for more details.
# You can always obtain full license text at <>.
name = 'OTR'
zeroconf = None
ERROR = None
OTR = {
'MMS': 1000,
'ALLOW_V1': False,
'ALLOW_V2': True,
'SEND_TAG': True,
import os
from collections import namedtuple
from nbxmpp.structs import StanzaHandler
from nbxmpp.protocol import Message, JID
from nbxmpp.const import MessageType
from gajim.common import app
from gajim.common import configpaths
from import NetworkEvent
from gajim.common.const import EncryptionData
from gajim.common.modules.base import BaseModule
from gajim.plugins.plugins_i18n import _
from otrplugin.keystore import Keystore
import potr as otr
ERROR = 'Unable to import python-otr (python3-potr)'
# otr channel prototype #
class OTRChannel(otr.context.Context):
global OTR
# init OTR channel here
def __init__(self, account, peer):
super(OTRChannel, self).__init__(account, peer)
self.peer = peer
self.resend = []
self.defaultQuery = account.getDefaultQueryMessage(self.getPolicy)
self.trustName = peer.getBare()
self.println = lambda line: account.otrmodule.get_control(peer).conv_textview.print_conversation_line(line, 'status', '', None)
# send message to jabber network
def inject(self, msg, appdata=None):
stanza = Message(to=self.peer, body=msg.decode(), typ='chat')
stanza.setThread(appdata.get('thread')) if appdata and appdata.get('thread') else None
# we can catch state change here
def setState(self, newstate):
state, self.state = self.state, newstate
if self.getCurrentTrust() is None: # new fingerprint
self.println("OTR: new fingerprint received [%s]" % self.getCurrentKey())
if newstate == otr.context.STATE_ENCRYPTED and state != newstate: # channel established
self.println("OTR: %s encrypted conversation started [%s]" % (self.getCurrentTrust() and 'trusted' or '**untrusted**', self.getCurrentKey()) )
elif newstate == otr.context.STATE_FINISHED and state != newstate: # channel closed
self.println("OTR: encrypted conversation closed")
def getPolicy(self, key):
return OTR['POLICY'][key] if key in OTR['POLICY'] else None
# otr account prototype
class OTRAccount(otr.context.Account):
global OTR
contextclass = OTRChannel
# init otrfor gajim acct
def __init__(self, otrmodule):
super(OTRAccount, self).__init__(otrmodule.jid, OTR['PROTOCOL'], OTR['MMS'])
self.jid = otrmodule.jid
self.keystore = otrmodule.keystore
self.otrmodule = otrmodule
def loadPrivkey(self):
my = self.keystore.load({'jid': str(self.jid)})
return otr.crypt.PK.parsePrivateKey(bytes.fromhex(my.privatekey))[0] if my and my.privatekey else None
def savePrivkey(self):
return{'jid': self.jid, 'privatekey': self.getPrivkey().serializePrivateKey().hex()})
def loadTrusts(self):
for c in self.keystore.load(): self.setTrust(c.jid, c.fingerprint,
def saveTrusts(self):
for jid, keys in self.trusts.items():
for fingerprint, trust in keys.items():{'jid': jid, 'fingerprint': fingerprint, 'trust': trust})
# Module name
class OTRModule(BaseModule):
def __init__(self, con):
BaseModule.__init__(self, con)
self.handlers = [
StanzaHandler(name='message', callback=self.receive_message, priority=9),
self.jid = self._con.get_own_jid() # JID = self._con # XMPP stream
self.keystore = Keystore(os.path.join(configpaths.get('MY_DATA'), 'otr_' + self.jid.getBare())) # Key storage
self.otr = OTRAccount(self) # OTR object
self.channels = {}
self.controls = {}
# get chat control for <peer>
def get_control(self, peer):
return self.controls.setdefault(peer, app.interface.msg_win_mgr.get_control(peer.getBare(), self._account))
# get otr channel for <peer>
def get_channel(self, peer):
self.channels.get(peer) and self.channels[peer].state == otr.context.STATE_FINISHED and self.channels.pop(peer).disconnect()
return self.channels.setdefault(peer, self.otr.getContext(peer))
# receive and decrypt message
def receive_message(self, con, stanza, message):
if message.type != MessageType.CHAT or not stanza.getBody() or stanza.getBody().encode().find(otr.proto.OTRTAG) == -1: return # it is not OTR message
if message.is_mam_message: return stanza.setBody('') # it is OTR message from archive, we can not decrypt it
self._log.debug('got otr message: %s' % stanza) # everything is fine, we can try to decrypt it
channel = self.get_channel(stanza.getFrom())
text, tlvs = channel.receiveMessage(stanza.getBody().encode(), {'thread': stanza.getThread()})
stanza.setBody(text and text.decode() or '')
message.encrypted = EncryptionData({'name':'OTR'})
except otr.context.UnencryptedMessage:
channel.println("OTR: received plain message [%s]" % stanza.getBody())
self._log.error('** got plain text over encrypted channel ** %s' % stanza.getBody())
except otr.context.ErrorReceived as e:
channel.println("OTR: received error [%s]" % e)
self._log.error('** otr error ** %s' % e)
except otr.crypt.InvalidParameterError:
channel.println("OTR: received unreadable message (session expired?)")
self._log.error('** unreadable message **')
except otr.context.NotEncryptedError:
channel.println("OTR: session lost")
self._log.error('** otr channel lost **')
channel.resend and channel.state == otr.context.STATE_ENCRYPTED and channel.sendMessage(otr.context.FRAGMENT_SEND_ALL, **(channel.resend.pop())) # resend any spooled messages
# encrypt and send message
def encrypt_message(self, obj, callback):
self._log.warning('sending otr message: %s ' % obj.msg_iq)
peer = obj.msg_iq.getTo()
channel = self.get_channel(peer)
session = obj.session or
message = obj.message.encode()
encrypted = channel.sendMessage(otr.context.FRAGMENT_SEND_ALL_BUT_LAST, message, appdata = {'thread': session.thread_id}) or b''
encrypted == channel.defaultQuery and channel.resend.append({'msg': message, 'appdata': {'thread': session.thread_id}})
except otr.context.NotEncryptedError:
self._log.warning('message was not sent.')
obj.encrypted = 'OTR'
obj.additional_data['encrypted'] = {'name': 'OTR'}
def get_instance(*args, **kwargs):
return OTRModule(*args, **kwargs), name