javaproducer-consumer

Java how to avoid using Thread.sleep() in a loop


From my main I am starting two threads called producer and consumer. Both contains while(true) loop. Producer loop is UDP Server hence it does not require sleep. My problem is in the Consumer loop. Consumer loop remove the objects from the linked queue and pass it on to a function for further processing. From what researched it is not a good practice to use thread sleep in a loop as at times O/S will not release at end of set time. If I remove thread sleep when the application is ideal it drags CPU to 20 to 30%.

class Producer implements Runnable {
    private DatagramSocket dsocket;
    FError fer = new FError();

    int port =1548;
    ConcurrentLinkedQueue<String> queue;

    Producer(ConcurrentLinkedQueue<String> queue){
        this.queue = queue; 
    }

    @Override
    public void run() {

        try {

            // Create a socket to listen on the port.
            dsocket = new DatagramSocket(port);
            // Create a buffer to read datagrams into.
            byte[] buffer = new byte[30000];
            // Create a packet to receive data into the buffer
            DatagramPacket packet = new DatagramPacket(buffer,
            buffer.length);

            while (true) {
                try {

                   // Wait to receive a datagram
                    dsocket.receive(packet);
                    //Convert the contents to a string,
                    String msg = new String(buffer, 0, packet.getLength());

                    int ltr = msg.length();
                     // System.out.println("MSG =" + msg);

                    if(ltr>4)
                    {

                        SimpleDateFormat sdfDate = new SimpleDateFormat  ("yyyy-MM-dd HH:mm:ss");//dd/MM/yyyy

                        Date now = new Date();
                        String strDate = sdfDate.format(now);

                        //System.out.println(strDate);

                        queue.add(msg + "&" + strDate);

                     // System.out.println("MSG =" + msg);
                    }

                  // Reset the length of the packet before reusing it.
                   packet.setLength(buffer.length);

                } catch (IOException e) {
                    fer.felog("svr class", "producer", "producer thread",e.getClass().getName() + ": " + e.getMessage());
                    dsocket.close();
                    break; 
                }
            }

        } catch (SocketException e) {
          fer.felog("svr class", "producer","Another App using the udp port " + port, e.getClass().getName() + ": " + e.getMessage()); 

        }

    }

}

class Consumer implements Runnable {

    String str;  
    ConcurrentLinkedQueue<String> queue;

    Consumer(ConcurrentLinkedQueue<String> queue) {
        this.queue = queue;  
    }

    @Override
    public void run() {

        while (true) {
            try {

                while ((str = queue.poll()) != null) {

                    call(str);  // do further processing

                   }
            } catch (IOException e) {
                ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
                break;
            }

            try {
                Thread.sleep(500);
            } catch (InterruptedException ex) {

                ferpt.felog("svr class", "consumer","sleep", ex.getClass().getName() + ": " + ex.getMessage());
            }

        }

    }

}

Solution

  • Instead of making Consumer extend Runnable you could change your code to incorporate a ScheduledExecutorService which runs the polling of the queue every half a second instead of making the thread sleep. An example of this would be

    public void schedule() {
        ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleAtFixedRate(() -> {
            String str;
            try {
                while ((str = queue.poll()) != null) {
                    call(str);  // do further processing
                }
            } catch (IOException e) {
                ferpt.felog("svr class", "consumer", "consumer thread", e.getClass().getName() + ": " + e.getMessage());
            }
        }, 0, 500, TimeUnit.MILLISECONDS);
    }