128 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
			
		
		
	
	
			128 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
		
			Executable File
		
	
	
	
	
| #!/usr/bin/env python3
 | |
| # XMPP bot that tests mirror bot
 | |
import argparse
import asyncio
import configparser
import sys
import time

import slixmpp
 | |
| 
 | |
| CONFIG_PATH = "./testconfig.ini"
 | |
| 
 | |
| VERSION='0.1.0'
 | |
| 
 | |
# Read the test messages from testmsgs.txt, one message per line.  Each line
# keeps its trailing newline, exactly as readlines() returns it.
with open("testmsgs.txt") as ftestmsgs:
    testmsgs = ftestmsgs.readlines()

# One [message, time_sent, time_received] record per test message.
# A time of 0 means the message has not been sent / received yet.
# results1 holds the messages sent to room 1, results2 those sent to room 2.
results1 = [[msg, 0, 0] for msg in testmsgs]
results2 = [[msg, 0, 0] for msg in testmsgs]
 | |
| 
 | |
def allmsgsrecv():
    """Tell whether every sent test message has been received.

    Each record in results1 and results2 is
    [message, time_sent, time_received]; a time_received of 0 means the
    message has not been seen yet.

    Returns:
        bool: False while at least one record still has a receive time of 0,
        True otherwise (including when both lists are empty).
    """
    return all(
        record[2] != 0
        for records in (results1, results2)
        for record in records
    )
 | |
| 
 | |
def printandexit():
    """Print how long every test message took to arrive, then exit.

    For each [message, time_sent, time_received] record in results1 and then
    results2, one line "<message> : <seconds> seconds" is printed, where
    <seconds> is time_received - time_sent rounded to two decimals.

    Raises:
        SystemExit: always, once the results have been printed.
    """
    print("Results of test:")
    for records in (results1, results2):
        for message, sent, received in records:
            # rstrip() removes the trailing newline kept from testmsgs.txt.
            elapsed = round(received - sent, 2)
            print(message.rstrip() + " : " + str(elapsed) + " seconds")
    sys.exit()
 | |
| 
 | |
def show_version():
    """Print the program name and version followed by the licence notice."""
    banner = (
        "XMPP test bot " + VERSION,
        "License GPLv3+: GNU GPL version 3 or later <https://gnu.org/licenses/gpl.html>",
        "This is free software: you are free to change and redistribute it.",
        "There is NO WARRANTY, to the extent permitted by law.",
    )
    for line in banner:
        print(line)
 | |
| 
 | |
| 
 | |
class MUCBot(slixmpp.ClientXMPP):
    """XMPP client that times how long test messages take to be mirrored.

    The bot joins two MUC rooms, posts every test message to room 1 and then
    to room 2, and records in results1/results2 when a message with the same
    body shows up in the other room.
    """

    def __init__(self, jid, password, nick, room1, room2):
        """Store the room settings and register the event handlers.

        Args:
            jid: JID the bot logs in with.
            password: Password for that JID.
            nick: Nickname the bot uses in both rooms.
            room1: Address of the first MUC room.
            room2: Address of the second MUC room.
        """
        super().__init__(jid, password)

        self.nick = nick
        self.room1 = room1
        self.room2 = room2
        self.add_event_handler("session_start", self.start)
        self.add_event_handler("groupchat_message", self.muc_message)

    async def start(self, event):
        """Join both rooms and post the test messages (session_start handler).

        Messages from results1 go to room 1 first; after a 5 second pause the
        messages from results2 go to room 2.

        Args:
            event: Event payload passed by slixmpp; not used.
        """
        await self.get_roster()
        self.send_presence()
        self.plugin['xep_0045'].join_muc(self.room1, self.nick)
        self.plugin['xep_0045'].join_muc(self.room2, self.nick)

        # Send test messages to both rooms
        await self._send_test_messages(self.room1, results1)
        await asyncio.sleep(5)
        await self._send_test_messages(self.room2, results2)

    async def _send_test_messages(self, room, records):
        """Post every message in records to room and store its send time."""
        for record in records:
            # asyncio.sleep() rather than time.sleep(): a blocking sleep would
            # stall the event loop, so no stanza could be sent or received
            # while waiting.
            await asyncio.sleep(2)
            self.send_message(mto=room, mbody="%s" % record[0], mtype='groupchat')
            record[1] = time.time()

    def muc_message(self, msg):
        """Record the arrival time of a test message (groupchat_message handler).

        A message seen in room 1 is matched against results2 (the messages
        that were sent to room 2) and vice versa.  Once every message has a
        receive time the results are printed and the program exits.

        Args:
            msg: The received groupchat message stanza.
        """
        rtime = time.time()

        # Both rooms are sent the same message bodies, so the room's echo of
        # our own message must not be counted as a copy from the other room.
        if msg['mucnick'] == self.nick:
            return

        sender = str(msg['from'])
        body = "%(body)s" % msg
        if self.room1 in sender:
            self._record_arrival(results2, body, rtime)
        if self.room2 in sender:
            self._record_arrival(results1, body, rtime)

        # Compare against False explicitly: the helper only guarantees an
        # explicit False while messages are still outstanding.
        if allmsgsrecv() is not False:
            printandexit()

    @staticmethod
    def _record_arrival(records, body, rtime):
        """Set the receive time of every record whose message equals body."""
        for record in records:
            if record[0] == body:
                record[2] = rtime
 | |
| 
 | |
| 
 | |
if __name__ == '__main__':
    argparser = argparse.ArgumentParser(description="XMPP bot that tests the mirror bot")
    argparser.add_argument("--version", dest="version", help="print version information and exit", action="store_true")
    args = argparser.parse_args()

    # --version: print the version banner and stop before reading the config.
    if args.version:
        show_version()
        sys.exit()

    # ConfigParser.read() silently skips files it cannot open and returns the
    # list of files it actually parsed, so an empty result means the config is
    # missing.  Fail with a clear message instead of a later NoSectionError.
    config = configparser.ConfigParser()
    if not config.read(CONFIG_PATH):
        sys.exit("Cannot read configuration file %s" % CONFIG_PATH)

    # All settings live in the [credentials] section.
    JID = config.get('credentials', 'JID')
    PASSWORD = config.get('credentials', 'PASSWORD')
    NICK = config.get('credentials', 'NICK')
    ROOM1 = config.get('credentials', 'ROOM1')
    ROOM2 = config.get('credentials', 'ROOM2')

    xmpp = MUCBot(JID, PASSWORD, NICK, ROOM1, ROOM2)
    xmpp.register_plugin('xep_0030')  # Service Discovery
    xmpp.register_plugin('xep_0045')  # Multi-User Chat
    xmpp.register_plugin('xep_0199')  # XMPP Ping

    # Connect to the XMPP server and start processing XMPP stanzas.
    # NOTE(review): the server address is hard-coded to localhost:25222.
    xmpp.connect(address=("localhost", 25222))
    xmpp.process(forever=False)
 | |
| 
 |