148 lines
4.3 KiB
Python
Executable File
148 lines
4.3 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
# XMPP bot that tests mirror bot
|
|
import argparse
import asyncio
import configparser
import sys
import time

import slixmpp
|
|
|
|
CONFIG_PATH = "./testconfig.ini"

VERSION = '0.1.0'

# Read the test messages from testmsgs.txt (path is relative to the current
# working directory). readlines() keeps each line's trailing newline, so the
# stored messages include it. The context manager closes the file even if
# reading fails.
with open("testmsgs.txt") as ftestmsgs:
    testmsgs = ftestmsgs.readlines()

# One result table per direction. Each entry is a mutable list
# [message, time_sent, time_received]; both times start at 0, and a
# time_received of 0 means "not received yet".
# results1 tracks messages sent to room 1, results2 messages sent to room 2.
results1 = [[msg, 0, 0] for msg in testmsgs]
results2 = [[msg, 0, 0] for msg in testmsgs]
|
|
|
|
# Check if all sent messages have been received
|
|
def allmsgsrecv():
    """Tell whether every test message has been received in both directions.

    An entry of results1 / results2 counts as received once its third field
    (the receive time) is no longer 0.

    Returns:
        True if no entry in either table is still waiting, otherwise False.
    """
    return all(entry[2] != 0 for entry in results1 + results2)
|
|
|
|
# Print results and exit
|
|
def _print_direction(title, results):
    """Print the heading *title* followed by one report line per entry of *results*.

    Each entry is [message, time_sent, time_received]. The message is stripped
    of trailing whitespace in both branches so every report stays on one line.
    """
    print(title)
    for message, sent, received in results:
        text = message.rstrip()
        if received == 0:
            print(text + " : Not recieved")
        else:
            print(text + " : " + str(round(received - sent, 2)) + " secounds")


def printandexit():
    """Print the timing report for both directions and terminate the program.

    For every entry of results1 and results2, prints either the delay between
    sending and receiving (rounded to two decimals) or a "not received" note.

    Raises:
        SystemExit: always, after the report has been printed.
    """
    print("Results of test:")
    _print_direction("Room1 -> Room2:", results1)
    _print_direction("Room2 -> Room1:", results2)
    # sys.exit() instead of the interactive-only exit() helper from `site`.
    sys.exit()
|
|
|
|
def show_version():
    """Print the program name with its version, then the GPLv3+ licence notice."""
    banner = (
        "XMPP test bot " + VERSION,
        "License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>",
        "This is free software: you are free to change and redistribute it.",
        "There is NO WARRANTY, to the extent permitted by law.",
    )
    for line in banner:
        print(line)
|
|
|
|
|
|
class MUCBot(slixmpp.ClientXMPP):
    """XMPP client that posts the test messages to two MUC rooms and times them.

    After the session starts, the bot joins both rooms, sends every message
    of results1 to room 1 and every message of results2 to room 2, and stores
    the send time in each entry. Incoming groupchat messages whose body equals
    a stored message get their receive time stored; once allmsgsrecv() reports
    that nothing is pending, printandexit() prints the report and exits.
    """

    def __init__(self, jid, password, nick, room1, room2):
        """Create the client and register its event handlers.

        Args:
            jid: JID passed to slixmpp.ClientXMPP.
            password: password passed to slixmpp.ClientXMPP.
            nick: nickname used when joining both rooms.
            room1: address of the first MUC room.
            room2: address of the second MUC room.
        """
        slixmpp.ClientXMPP.__init__(self, jid, password)

        self.nick = nick
        self.room1 = room1
        self.room2 = room2
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("groupchat_message", self.muc_message)

    async def start(self, event):
        """Handle session_start: join both rooms and send all test messages.

        Args:
            event: the session_start event payload (unused).
        """
        await self.get_roster()
        self.send_presence()
        self.plugin['xep_0045'].join_muc(self.room1, self.nick)
        self.plugin['xep_0045'].join_muc(self.room2, self.nick)

        # Send test messages to both rooms, pausing 5 seconds in between.
        await self._send_all(self.room1, results1)
        await asyncio.sleep(5)
        await self._send_all(self.room2, results2)

    async def _send_all(self, room, results):
        """Send each message of *results* to *room*, recording the send time."""
        for entry in results:
            # asyncio.sleep() rather than time.sleep(): a blocking sleep would
            # stall the event loop, so no incoming stanza could be handled
            # (and timestamped) until every message had been sent.
            await asyncio.sleep(2)
            self.send_message(mto=room, mbody=entry[0], mtype='groupchat')
            entry[1] = time.time()

    @staticmethod
    def _mark_received(results, body, rtime):
        """Store *rtime* as receive time of every entry whose message is *body*."""
        for entry in results:
            if entry[0] == body:
                entry[2] = rtime

    # When a message is received, find it in the results and record the time

    def muc_message(self, msg):
        """Handle groupchat_message: record when a mirrored message arrives.

        A message arriving from room 1 is matched against results2 (messages
        this bot sent to room 2) and vice versa.

        Args:
            msg: the received groupchat message stanza.
        """
        rtime = time.time()

        # A MUC room reflects our own messages back to us. Both rooms are sent
        # the same texts, so without this guard such a reflection would be
        # mistaken for the mirrored copy of the other room's message.
        if msg['from'].resource == self.nick:
            return

        body = msg['body']
        sender = str(msg['from'])
        if self.room1 in sender:
            self._mark_received(results2, body, rtime)
        if self.room2 in sender:
            self._mark_received(results1, body, rtime)

        if allmsgsrecv():
            printandexit()
|
|
|
|
|
|
if __name__ == '__main__':
    # Script entry point: parse the command line, load the credentials from
    # CONFIG_PATH, then connect the test bot and process stanzas.
    argparser = argparse.ArgumentParser(description="XMPP bot that tests mirror bot")
    argparser.add_argument("--version", dest="version", help="print version information and exit", action="store_true")
    args = argparser.parse_args()

    ## Version argument
    if args.version:
        show_version()
        sys.exit()

    config = configparser.ConfigParser()
    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files actually read; fail clearly instead of hitting a
    # NoSectionError on the first lookup below.
    if not config.read(CONFIG_PATH):
        sys.exit("Could not read configuration file " + CONFIG_PATH)

    JID = config.get('credentials', 'JID')
    PASSWORD = config.get('credentials', 'PASSWORD')
    NICK = config.get('credentials', 'NICK')
    ROOM1 = config.get('credentials', 'ROOM1')
    ROOM2 = config.get('credentials', 'ROOM2')
    # HOST and PORT are optional; PORT is only used when HOST is given.
    HOST = config.get('credentials', 'HOST', fallback=None)
    PORT = config.get('credentials', 'PORT', fallback=5222)

    xmpp = MUCBot(JID, PASSWORD, NICK, ROOM1, ROOM2)
    xmpp.register_plugin('xep_0030') # Service Discovery
    xmpp.register_plugin('xep_0045') # Multi-User Chat
    xmpp.register_plugin('xep_0199') # XMPP Ping

    # Connect to the XMPP server and start processing XMPP stanzas.
    if HOST is not None:
        xmpp.connect(address=(HOST, int(PORT)))
    else:
        xmpp.connect()
    xmpp.process(forever=False)
|
|
|