169 lines
5.9 KiB
Python
169 lines
5.9 KiB
Python
import sys
|
|
import time
|
|
import socket
|
|
import unittest
|
|
|
|
import pyzor
|
|
import pyzor.client
|
|
import pyzor.account
|
|
import pyzor.message
|
|
|
|
def make_MockSocket(response, request):
|
|
"""Create a MockSocket class that will append requests to
|
|
the specified `request` list and return the specified `response`
|
|
"""
|
|
class MockSocket():
|
|
def __init__(self, *args, **kwargs):
|
|
pass
|
|
def settimeout(self, timeout):
|
|
pass
|
|
def recvfrom(self, packetsize):
|
|
return response, ("127.0.0.1", 24441)
|
|
def sendto(self, data, flag, address):
|
|
request.append(data)
|
|
return MockSocket
|
|
|
|
def make_MockThreadId(thread):
|
|
"""Creates a MockThreadId class that will generate
|
|
the specified thread number.
|
|
"""
|
|
class MockThreadId(int):
|
|
def __new__(cls, i):
|
|
return int.__new__(cls, i)
|
|
@classmethod
|
|
def generate(cls):
|
|
return thread
|
|
|
|
def in_ok_range(self):
|
|
return True
|
|
return MockThreadId
|
|
|
|
def mock_sign_msg(hash_key, timestamp, msg):
|
|
return "TestSig"
|
|
|
|
def mock_hash_key(user_key, user):
|
|
return None
|
|
|
|
class ClientTest(unittest.TestCase):
|
|
|
|
def setUp(self):
|
|
unittest.TestCase.setUp(self)
|
|
self.real_sg = pyzor.account.sign_msg
|
|
pyzor.account.sign_msg = mock_sign_msg
|
|
self.real_hk = pyzor.account.hash_key
|
|
pyzor.account.hash_key = mock_hash_key
|
|
self.thread = 33715
|
|
|
|
# the response the mock socket will send
|
|
self.response = "Code: 200\nDiag: OK\nPV: 2.1\nThread: 33715\n\n"
|
|
# the requests send by the client will be stored here
|
|
self.request = []
|
|
# the expected request that the client should send
|
|
self.expected = {"Thread": str(self.thread),
|
|
"PV": str(pyzor.proto_version),
|
|
"User": "anonymous",
|
|
"Time": str(int(time.time())),
|
|
"Sig": "TestSig"}
|
|
|
|
def tearDown(self):
|
|
unittest.TestCase.tearDown(self)
|
|
pyzor.account.sign_msg = self.real_sg
|
|
pyzor.account.hash_key = self.real_hk
|
|
|
|
def check_request(self, request):
|
|
"""Check if the request sent by the client is equal
|
|
to the expected one.
|
|
"""
|
|
req = {}
|
|
request = request.decode("utf8").replace("\n\n", "\n")
|
|
for line in request.splitlines():
|
|
key = line.split(":")[0].strip()
|
|
value = line.split(":")[1].strip()
|
|
req[key] = value
|
|
self.assertEqual(req, self.expected)
|
|
|
|
def check_client(self, accounts, method, *args, **kwargs):
|
|
"""Tests if the request and response are sent
|
|
and read correctly by the client.
|
|
"""
|
|
real_socket = socket.socket
|
|
socket.socket = make_MockSocket(self.response.encode("utf8"),
|
|
self.request)
|
|
|
|
real_ThreadId = pyzor.message.ThreadId
|
|
pyzor.message.ThreadId = make_MockThreadId(self.thread)
|
|
client = pyzor.client.Client(accounts)
|
|
try:
|
|
response = getattr(client, method)(*args, **kwargs)
|
|
self.assertEqual(str(response), self.response)
|
|
self.check_request(self.request[0])
|
|
finally:
|
|
socket.socket = real_socket
|
|
pyzor.message.ThreadId = real_ThreadId
|
|
return client
|
|
|
|
def test_ping(self):
|
|
"""Test the client ping request"""
|
|
self.expected["Op"] = "ping"
|
|
self.check_client(None, "ping")
|
|
|
|
def test_pong(self):
|
|
"""Test the client pong request"""
|
|
digest = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.expected["Op"] = "pong"
|
|
self.expected["Op-Digest"] = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.check_client(None, "pong", digest)
|
|
|
|
def test_check(self):
|
|
"""Test the client check request"""
|
|
digest = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.expected["Op"] = "check"
|
|
self.expected["Op-Digest"] = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.check_client(None, "check", digest)
|
|
|
|
def test_info(self):
|
|
"""Test the client info request"""
|
|
digest = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.expected["Op"] = "info"
|
|
self.expected["Op-Digest"] = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.check_client(None, "info", digest)
|
|
|
|
def test_report(self):
|
|
"""Test the client report request"""
|
|
digest = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.expected["Op"] = "report"
|
|
self.expected["Op-Digest"] = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.expected["Op-Spec"] = "20,3,60,3"
|
|
self.check_client(None, "report", digest)
|
|
|
|
def test_whitelist(self):
|
|
"""Test the client whitelist request"""
|
|
digest = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.expected["Op"] = "whitelist"
|
|
self.expected["Op-Digest"] = "2aedaac999d71421c9ee49b9d81f627a7bc570aa"
|
|
self.expected["Op-Spec"] = "20,3,60,3"
|
|
self.check_client(None, "whitelist", digest)
|
|
|
|
def test_handle_account(self):
|
|
"""Test client handling accounts"""
|
|
test_account = pyzor.account.Account("TestUser", "TestKey", "TestSalt")
|
|
self.expected["Op"] = "ping"
|
|
self.expected["User"] = "TestUser"
|
|
self.check_client({("public.pyzor.org", 24441): test_account}, "ping")
|
|
|
|
def test_handle_invalid_thread(self):
|
|
"""Test invalid thread id"""
|
|
self.thread += 20
|
|
self.expected["Op"] = "ping"
|
|
self.assertRaises(pyzor.ProtocolError, self.check_client, None, "ping")
|
|
|
|
def suite():
|
|
"""Gather all the tests from this module in a test suite."""
|
|
test_suite = unittest.TestSuite()
|
|
test_suite.addTest(unittest.makeSuite(ClientTest))
|
|
return test_suite
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|
|
|