Tags: python, multithreading, sockets, client-server, python-multithreading

Custom ThreadPool and Request Queue


I am currently working on implementing a toy store server using Python, employing socket connections and a custom thread pool for handling concurrent client requests. However, I'm facing some challenges with managing concurrent requests efficiently.

Server Component: The server listens for incoming client requests over sockets. It takes the name of a toy as an argument and returns the dollar price of the item if it's in stock. If the item is not found, it returns -1, and if the item is found but not in stock, it returns 0.

A custom thread pool is implemented to handle multiple client connections concurrently.

Client Component:

The client connects to the server using a socket connection and issues random toy requests.

I've implemented a custom thread pool to manage client requests, but I'm unsure if it's handling concurrent requests optimally. The server does not receive any further requests after the first client request.

Server code:

import socket
import json
from threading import Thread, Lock
from collections import deque

class Server():
    """Toy-store price server: a listening socket, an inventory, and worker threads.

    A request is a JSON object of the form {"query": <toy name>}; the reply is
    the text form of the value returned by get_item().
    """

    def __init__(self, host, port, num_threads):
        """Build the inventory, create the worker threads, and start listening.

        host, port: address the listening socket is bound to.
        num_threads: number of worker threads created here (started in run()).
        """
        # Inventory: toy name -> remaining quantity and unit cost.
        self.items = {
        "tux": {
            "qty": 100,
            "cost": 25.99
        },
        "whale": {
            "qty": 100,
            "cost": 19.99
        }
    }
        self.lock = Lock()  # held by get_item() while it reads/updates self.items
        # Holds [socket, cost] pairs: appended by add_request(), popped by serve_request().
        self.request_queue = deque([])
        self.thread_pool = [Thread(target=self.serve_request) for _ in range(num_threads)]
        self.s = socket.socket()
        self.s.bind((host, port))
        print("socket binded to port", port)
        self.s.listen(5)
        print("socket is listening")

    def get_item(self, item):
        """Return the price of item and take one unit out of stock.

        Returns -1 when the item is unknown, 0 when it is out of stock, and
        otherwise its cost after decrementing the quantity.
        """
        with self.lock:
            if item not in self.items:
                return -1
            if self.items[item]['qty'] == 0:
                return 0
            self.items[item]['qty'] -= 1
            return self.items[item]['cost']

    def add_request(self, req):
        """Read one JSON query from socket req, price it, and queue the reply.

        The blocking recv() and the inventory lookup both happen on the calling
        thread; only the send is left to the workers.
        """
        data = req.recv(4096)
        data = json.loads(data.decode('utf-8'))
        cost = self.get_item(data['query'])
        self.request_queue.append([req, cost])

    def serve_request(self):
        """Worker loop: pop queued [socket, cost] pairs and send the cost back."""
        # NOTE(review): this loop polls the deque without blocking or sleeping,
        # so every worker spins while the queue is empty.
        # NOTE(review): the emptiness check and popleft() are separate steps with
        # no lock around them; with several workers, popleft() may find the deque
        # already emptied by another thread -- confirm.
        while True:
            if self.request_queue:
                req, cost = self.request_queue.popleft()
                print("cost: ", cost)
                req.send(str(cost).encode('utf-8'))

    def run(self):
        """Accept a connection, start the workers, and keep reading requests from it."""
        # NOTE(review): accept() is called exactly once, before the loop, so every
        # iteration below reads from this same connection and no further client
        # is ever accepted.
        req, addr = self.s.accept()
        for thread in self.thread_pool:
            thread.start()
        while True:
            self.add_request(req)
            print("request_queue: ", self.request_queue)

# Script entry point: listen on localhost and create 100 worker threads.
host = "127.0.0.1"
port = 12345
server = Server(host, port, 100)
server.run()  # run() contains an endless loop

Client code:

import socket
import json
import random
def main():
    """Connect once to the toy server, then request random toys forever.

    Every iteration sends a JSON query over the same connection and prints the
    server's reply.
    """
    address = ("127.0.0.1", 12345)
    toys = ["tux", "whale"]

    s = socket.socket()
    s.connect(address)

    while True:
        choice = random.choice(toys)
        payload = json.dumps({"query": str(choice)})
        print("requesting: ", choice)
        s.send(payload.encode('utf-8'))
        reply = s.recv(4096)
        print("Server replied: {}".format(str(reply.decode('utf-8'))))


if __name__ == "__main__":
    main()

Solution

  • Let's look at the client first. Once the server has processed a request and returned a response, it "forgets" the client's socket, so the client needs to create a new socket for each request, closing the previous socket before opening the next one.

    Client Code

    import socket
    import json
    import random
    
    def main(host="127.0.0.1", port=12345, num_requests=5):
        """Send a fixed number of toy-price queries, one connection per query.

        Args:
            host: Address of the server to connect to.
            port: TCP port of the server.
            num_requests: Number of requests to issue before returning.
        """
        toys = ["tux", "whale"]
        for _ in range(num_requests):
            # Create the socket inside the ``with`` so that it is closed even
            # when connect() fails.
            with socket.socket() as s:
                s.connect((host, port))
                choice = random.choice(toys)
                serialized_message = json.dumps({"query": choice})
                print("requesting: ", choice)
                # sendall() keeps writing until the whole payload is out;
                # send() is allowed to write only part of it.
                s.sendall(serialized_message.encode('utf-8'))
                data = s.recv(4096)
                print("Server replied: {}".format(data.decode('utf-8')))

    if __name__ == "__main__":
        main()
    

    As far as the server goes, it is not handling requests correctly because you are only issuing req, addr = self.s.accept() once. Function run should be:

        def run(self):
            """Start the workers, then accept one connection per request."""
            for thread in self.thread_pool:
                thread.start()
            while True:
                # accept() now sits inside the loop, so each pass takes a new client.
                req, addr = self.s.accept()
                # NOTE(review): add_request() only queues [req, cost]; the reply is
                # sent later by serve_request() on a worker thread. Leaving this
                # `with` closes req, possibly before that send has happened --
                # confirm the ordering.
                with req:  # So that the client socket is closed automatically
                    self.add_request(req)
                    print("request_queue: ", self.request_queue)
    

    You also have all your request-handling server threads spinning in a busy-wait loop, burning CPU while they check for new requests. You should have used a threading.Condition instance to wait for a non-empty queue. But it is far simpler to just use a multithreading pool. Note that I have also allowed for termination of the server by hitting the Enter key.

    Server Code

    import socket
    import json
    from threading import Lock
    from multiprocessing.pool import ThreadPool
    
    class Server():
        """Toy-store price server that serves each connection on a pool thread.

        Protocol: the client sends one JSON object ``{"query": <toy name>}`` and
        receives the text form of get_item()'s result; the connection is then
        closed by the server.
        """

        def __init__(self, host, port, num_threads):
            """Create the inventory and the worker pool.

            Args:
                host: Address the listening socket will be bound to.
                port: TCP port the listening socket will be bound to.
                num_threads: Number of threads available for serving requests.
            """
            # Keep the address on the instance; server() must not depend on
            # module-level globals that happen to share these names.
            self.host = host
            self.port = port
            # Inventory: toy name -> remaining quantity and unit cost.
            self.items = {
                "tux": {"qty": 100, "cost": 25.99},
                "whale": {"qty": 100, "cost": 19.99},
            }
            self.lock = Lock()
            # run() parks the accept loop on one pool thread for the lifetime of
            # the server, so add one to keep num_threads free for requests
            # (otherwise num_threads=1 would never serve anybody).
            self.thread_pool = ThreadPool(num_threads + 1)

        @staticmethod
        def _report_error(exc):
            """Print an exception raised inside a pool task instead of losing it."""
            print("server error:", repr(exc))

        def get_item(self, item):
            """Return the price of item and take one unit out of stock.

            Returns:
                -1 if the item is unknown, 0 if it is out of stock, otherwise
                its cost (the quantity is decremented in that case).
            """
            with self.lock:
                if item not in self.items:
                    return -1
                if self.items[item]['qty'] == 0:
                    return 0
                self.items[item]['qty'] -= 1
                return self.items[item]['cost']

        def serve_request(self, client_socket):
            """Read one query from client_socket, reply with its price, close it.

            Raises:
                ValueError: the payload is not valid UTF-8 JSON.
                KeyError: the JSON object has no "query" key.
            """
            # Enter the context manager before touching the socket so that it is
            # closed even when the request is empty or malformed.
            with client_socket:
                data = client_socket.recv(4096)
                data = json.loads(data.decode('utf-8'))
                cost = self.get_item(data['query'])
                print("cost: ", cost)
                # sendall() writes the whole reply; send() may stop short.
                client_socket.sendall(str(cost).encode('utf-8'))

        def server(self):
            """Bind, listen, and hand every accepted connection to the pool."""
            self.s = socket.socket()
            with self.s as server_socket:
                server_socket.bind((self.host, self.port))
                print("socket binded to port", self.port)
                server_socket.listen(5)
                print("socket is listening")

                while True:
                    client_socket, _ = server_socket.accept()
                    self.thread_pool.apply_async(
                        self.serve_request,
                        args=(client_socket,),
                        error_callback=self._report_error,
                    )
                    print('request queued')

        def run(self):
            """Serve until the user presses Enter, then shut the pool down."""
            # Run actual server in the pool so that we can wait for Enter key.
            # The error callback surfaces failures such as a port already in
            # use, which would otherwise be discarded silently.
            self.thread_pool.apply_async(self.server, error_callback=self._report_error)
            input('Hit Enter to terminate the server...\n\n')
            # Kill all outstanding requests
            print('Terminating...')
            self.thread_pool.terminate()
    
    
    # Script entry point: serve on localhost with num_threads=20.
    host = "127.0.0.1"
    port = 12345
    server = Server(host, port, 20)
    server.run()  # returns once Enter has been pressed
    

    Update

    If you do not want to use a multithreading pool, then your original code should be modified to use a Condition instance so that no CPU cycles are wasted checking an empty queue:

    Server Code

    import socket
    import json
    from threading import Thread, Lock, Condition
    from collections import deque
    
    class Server():
        """Toy-store price server with a hand-rolled pool of worker threads.

        The accept loop appends client sockets to a deque; workers sleep on a
        Condition until a socket is queued or shutdown is requested.

        Protocol: the client sends one JSON object ``{"query": <toy name>}`` and
        receives the text form of get_item()'s result; the connection is then
        closed by the server.
        """

        def __init__(self, host, port, num_threads):
            """Create the inventory, the request queue, and the worker threads.

            Args:
                host: Address the listening socket will be bound to.
                port: TCP port the listening socket will be bound to.
                num_threads: Number of worker threads (started in run()).
            """
            # Keep the address on the instance; server() must not depend on
            # module-level globals that happen to share these names.
            self.host = host
            self.port = port
            # Inventory: toy name -> remaining quantity and unit cost.
            self.items = {
                "tux": {"qty": 100, "cost": 25.99},
                "whale": {"qty": 100, "cost": 19.99},
            }
            self.lock = Lock()
            self.request_queue = deque()
            self.thread_pool = [Thread(target=self.serve_request) for _ in range(num_threads)]
            self.running = True
            # Guards request_queue and running; workers wait on it.
            self.request_queue_condition = Condition()

        def get_item(self, item):
            """Return the price of item and take one unit out of stock.

            Returns:
                -1 if the item is unknown, 0 if it is out of stock, otherwise
                its cost (the quantity is decremented in that case).
            """
            with self.lock:
                if item not in self.items:
                    return -1
                if self.items[item]['qty'] == 0:
                    return 0
                self.items[item]['qty'] -= 1
                return self.items[item]['cost']

        def _handle_client(self, client_socket):
            """Serve one query on client_socket and always close the socket."""
            with client_socket:
                try:
                    data = client_socket.recv(4096)
                    data = json.loads(data.decode('utf-8'))
                    cost = self.get_item(data['query'])
                    print("cost: ", cost)
                    # sendall() writes the whole reply; send() may stop short.
                    client_socket.sendall(str(cost).encode('utf-8'))
                except (OSError, ValueError, KeyError, TypeError) as exc:
                    # A malformed request or a dropped connection must not take
                    # the worker thread down with it; report and move on.
                    print("request failed:", repr(exc))

        def serve_request(self):
            """Worker loop: serve queued connections until shutdown is requested."""
            while True:
                with self.request_queue_condition:
                    # Wait for at least one request on queue or we are no longer running:
                    self.request_queue_condition.wait_for(
                        lambda: not self.running or self.request_queue
                    )
                    if not self.running:
                        return
                    client_socket = self.request_queue.popleft()

                # Socket I/O happens outside the condition so that other workers
                # are not blocked while this one talks to its client.
                self._handle_client(client_socket)

        def server(self):
            """Bind, listen, and queue every accepted connection for the workers."""
            self.s = socket.socket()
            with self.s as server_socket:
                server_socket.bind((self.host, self.port))
                print("socket binded to port", self.port)
                server_socket.listen(5)
                print("socket is listening")

                while True:
                    client_socket, _ = server_socket.accept()
                    with self.request_queue_condition:
                        # Checked under the condition so that nothing can be
                        # queued after run() has flagged the shutdown.
                        if not self.running:
                            client_socket.close()
                            return
                        self.request_queue.append(client_socket)
                        self.request_queue_condition.notify(1)  # Wake up one thread
                    print('request queued')

        def run(self):
            """Serve until the user presses Enter, then shut down cleanly."""
            for thread in self.thread_pool:
                thread.start()

            # Run the accept loop on its own daemon thread so that this thread
            # can wait for the Enter key:
            Thread(target=self.server, daemon=True).start()
            input('Hit Enter to terminate the server...\n\n')
            print('Terminating...')
            with self.request_queue_condition:
                self.running = False
                self.request_queue_condition.notify_all()

            # Wait for requests that are already being served to finish.
            for thread in self.thread_pool:
                thread.join()

            # Connections that were queued but never picked up are closed
            # rather than left dangling.
            with self.request_queue_condition:
                while self.request_queue:
                    self.request_queue.popleft().close()
    
    # Script entry point: serve on localhost with num_threads=20.
    host = "127.0.0.1"
    port = 12345
    server = Server(host, port, 20)
    server.run()  # waits for the Enter key before shutting down
    

    Client Code

    For a better test, we are multithreading this:

    import socket
    import json
    import random
    from multiprocessing.pool import ThreadPool
    
    def make_request(host="127.0.0.1", port=12345):
        """Send one random toy query on a fresh connection and print the reply.

        Args:
            host: Address of the server to connect to.
            port: TCP port of the server.
        """
        # Create the socket inside the ``with`` so that it is closed even when
        # connect() fails.
        with socket.socket() as s:
            s.connect((host, port))
            toys = ["tux", "whale"]
            choice = random.choice(toys)
            serialized_message = json.dumps({"query": choice})
            print("requesting: ", choice)
            # sendall() keeps writing until the whole payload is out; send() is
            # allowed to write only part of it.
            s.sendall(serialized_message.encode('utf-8'))
            data = s.recv(4096)
            print("Server replied: {}".format(data.decode('utf-8')))
    
    def main():
        """Fire five concurrent requests and wait for all of them to finish.

        Raises:
            Exception: whatever the first failing request raised (for example
                ConnectionRefusedError when the server is not running).
        """
        pool = ThreadPool(5)
        results = [pool.apply_async(make_request) for _ in range(5)]
        # Wait for all tasks to complete:
        pool.close()
        pool.join()
        # apply_async() stores a task's exception in its AsyncResult; get()
        # re-raises it. Without this, a failed request would go unnoticed.
        for result in results:
            result.get()

    if __name__ == "__main__":
        main()
    

    Prints:

    requesting:  tux
    requesting:  whale
    requesting:  whale
    requesting:  whale
    requesting:  tux
    Server replied: 25.99
    Server replied: 19.99
    Server replied: 25.99
    Server replied: 19.99
    Server replied: 19.99