# -*- coding: utf-8 -*-
import logging
import mimetypes
import os
import requests
from matrix_client.client import MatrixClient
from matrix_client.client import MatrixRequestError
from requests import ConnectionError
from .helper import download_file
from .helper import pprint_json
logger = logging.getLogger('matrigram')
[docs]class MatrigramClient(object):
def __init__(self, server, tb, username):
self.client = MatrixClient(server)
self.tb = tb
self.token = None
self.server = server
self.username = username
self.focus_room_id = None
self.msg_type_router = {
'm.image': self.forward_image_to_tb,
'm.audio': self.forward_voice_to_tb,
'm.video': self.forward_video_to_tb,
'm.emote': self.forward_emote_to_tb,
}
self.room_listener_uid = None
self.ephemeral_listener_uid = None
def login(self, username, password):
try:
self.token = self.client.login_with_password(username, password)
logger.info('token = %s', self.token)
rooms = self.get_rooms_aliases()
if rooms:
# set focus room to "first" room
self.set_focus_room(rooms.keys()[0])
self.client.add_invite_listener(self.on_invite_event)
self.client.add_leave_listener(self.on_leave_event)
self.client.start_listener_thread()
return True, "OK"
except MatrixRequestError:
return False, "Failed to login"
except ConnectionError:
return False, "Server is offline"
def logout(self):
self.client.logout()
def on_event(self, _, event):
logger.debug('entered with message %s', pprint_json(event))
sender = event['sender'].split(':')[0].encode('utf-8')
# Prevent messages loopback
if sender.startswith(u'@{}'.format(self.username)):
return
if event['type'] == "m.room.message":
msgtype = event['content']['msgtype']
if msgtype != 'm.text':
callback = self.msg_type_router.get(msgtype)
if callback:
callback(event)
return
content = event['content']['body'].encode('utf-8')
self.tb.send_message(sender, content, self)
elif event['type'] == "m.room.topic":
topic = event['content']['topic'].encode('utf-8')
self.tb.send_topic(sender, topic, self)
def on_ephemeral_event(self, _, ee):
logger.debug(pprint_json(ee))
if ee['type'] == 'm.typing':
if ee['content']['user_ids']:
self.tb.start_typing_thread(self)
else:
self.tb.stop_typing_thread(self)
def on_leave_event(self, room_id, le):
logger.debug(pprint_json(le))
if le['timeline']['events'][0]['sender'] != le['timeline']['events'][0]['state_key']:
self.tb.send_kick(self._room_id_to_alias(room_id), self)
def on_invite_event(self, _, ie):
logger.debug('invite event %s', pprint_json(ie))
room_name = None
for event in ie['events']:
if event['type'] == 'm.room.name':
room_name = event['content']['name']
if room_name:
self.tb.send_invite(self, self._room_id_to_alias(room_name))
def join_room(self, room_id_or_alias):
try:
self.client.join_room(room_id_or_alias)
self.set_focus_room(room_id_or_alias)
return True
except MatrixRequestError:
return False
def leave_room(self, room_id_or_alias):
room = self.get_room_obj(room_id_or_alias)
if not room:
logger.error('cant find room')
return False
if self.focus_room_id == room.room_id:
rooms = self.get_rooms_aliases()
room_id = self._room_alias_to_id(room_id_or_alias)
del rooms[room_id]
new_focus_room = rooms.keys()[0] if rooms else None
self.set_focus_room(new_focus_room)
return room.leave()
def set_focus_room(self, room_id_or_alias):
if self._room_alias_to_id(room_id_or_alias) == self.focus_room_id:
return
# remove current focus room listeners
if self.focus_room_id is not None:
room_obj = self.get_room_obj(self.focus_room_id)
room_obj.remove_listener(self.room_listener_uid)
self.room_listener_uid = None
room_obj.remove_ephemeral_listener(self.ephemeral_listener_uid)
self.ephemeral_listener_uid = None
logger.info("remove focus room %s", self.focus_room_id)
self.focus_room_id = None
# set new room on focus
if room_id_or_alias is not None:
self.focus_room_id = self._room_alias_to_id(room_id_or_alias)
room_obj = self.get_room_obj(self.focus_room_id)
self.room_listener_uid = room_obj.add_listener(self.on_event)
self.ephemeral_listener_uid = room_obj.add_ephemeral_listener(self.on_ephemeral_event)
logger.info("set focus room to %s", self.focus_room_id)
def get_focus_room_alias(self):
return self._room_id_to_alias(self.focus_room_id)
def have_focus_room(self):
return self.focus_room_id is not None
def get_members(self):
room_obj = self.get_room_obj(self.focus_room_id)
rtn = room_obj.get_joined_members()
return [member['displayname'] for _, member in rtn.items() if member.get('displayname')]
def set_name(self, name):
user = self.client.get_user(self.client.user_id)
user.set_display_name(name)
def emote(self, body):
room_obj = self.get_room_obj(self.focus_room_id)
room_obj.send_emote(body)
def create_room(self, alias, is_public=False, invitees=()):
try:
room_obj = self.client.create_room(alias, is_public, invitees)
room_obj.update_aliases()
logger.debug('room_id %s', room_obj.room_id)
logger.debug('room_alias %s', room_obj.aliases[0])
return (room_obj.room_id, room_obj.aliases[0])
except MatrixRequestError:
logger.error('error creating room')
return None, None
def backfill_previous_messages(self, limit=10):
room_obj = self.get_room_obj(self.focus_room_id)
room_obj.backfill_previous_messages(limit=limit)
def get_rooms_aliases(self):
# returns a dict with id: room obj
rooms = self._get_rooms_updated()
if not rooms:
return rooms
logger.debug("rooms got from server are %s", rooms)
# return dict with id: list of aliases or id (if no alias exists)
return {key: val.aliases if val.aliases else [key] for (key, val) in rooms.items()}
[docs] def get_room_obj(self, room_id_or_alias):
"""Get room object of specific id or alias.
Args:
room_id_or_alias (str): Room's id or alias.
Returns (Room): Room object corresponding to room_id_or_alias.
"""
rooms = self._get_rooms_updated()
room_id = self._room_alias_to_id(room_id_or_alias)
return rooms.get(room_id)
def send_message(self, msg):
room_obj = self.get_room_obj(self.focus_room_id)
if not room_obj:
logger.error('cant find room')
else:
room_obj.send_text(msg)
def send_photo(self, path):
with open(path, 'rb') as f:
mxcurl = self.client.upload(f.read(), mimetypes.guess_type(path)[0])
room_obj = self.get_room_obj(self.focus_room_id)
if not room_obj:
logger.error('cant find room')
else:
room_obj.send_image(mxcurl, os.path.split(path)[1])
def send_voice(self, path):
with open(path, 'rb') as f:
mxcurl = self.client.upload(f.read(), mimetypes.guess_type(path)[0])
room_obj = self.get_room_obj(self.focus_room_id)
room_obj.send_audio(mxcurl, os.path.split(path)[1])
def send_video(self, path):
with open(path, 'rb') as f:
mxcurl = self.client.upload(f.read(), mimetypes.guess_type(path)[0])
room_obj = self.get_room_obj(self.focus_room_id)
room_obj.send_video(mxcurl, os.path.split(path)[1])
def discover_rooms(self):
res = requests.get('{}/_matrix/client/r0/publicRooms?limit=20'.format(self.server))
res_json = res.json()
room_objs = res_json['chunk']
return [room['aliases'][0] for room in room_objs if room.get('aliases')]
def download_from_event(self, event):
mxcurl = event['content']['url']
link = self.client.api.get_download_url(mxcurl)
media_id = mxcurl.split('/')[3]
media_type = event['content']['info']['mimetype'].split('/')[1]
path = os.path.join(self.tb.config['media_dir'], '{}.{}'.format(media_id, media_type))
download_file(link, path)
return path
def forward_image_to_tb(self, event):
sender = event['sender'].split(':')[0].encode('utf-8')
path = self.download_from_event(event)
self.tb.send_photo(sender, path, self)
def forward_voice_to_tb(self, event):
sender = event['sender'].split(':')[0].encode('utf-8')
path = self.download_from_event(event)
self.tb.send_voice(sender, path, self)
def forward_video_to_tb(self, event):
sender = event['sender'].split(':')[0].encode('utf-8')
path = self.download_from_event(event)
self.tb.send_video(sender, path, self)
def forward_emote_to_tb(self, event):
sender = event['sender'].split(':')[0].encode('utf-8')
content = event['content']['body'].encode('utf-8')
self.tb.send_emote(sender, content, self)
[docs] def _room_id_to_alias(self, id):
"""Convert room id to alias.
Args:
id (str): Room id.
Returns (str): Room alias.
"""
if id is None:
return None
if id.startswith('#'):
return id
rooms = self.get_rooms_aliases()
if id in rooms:
return rooms[id][0]
else:
return None
[docs] def _room_alias_to_id(self, alias):
"""Convert room alias to id.
Args:
alias (str): Room alias.
Returns (str): Room id.
"""
if alias is None:
return None
if not alias.startswith('#'):
return alias
return self.client.api.get_room_id(alias)
[docs] def _get_rooms_updated(self):
"""Return rooms dictionary with updated aliases.
Returns (dict): Return rooms dictionary with updated aliases.
"""
rooms = self.client.get_rooms()
for room in rooms.values():
room.update_aliases()
return rooms