I have an inner class that implement runnable and it makes HTTP requests and then calls the function of outer class to handle the response. The intended behavior is to append all the responses to a List of Objects.
Sample code:
public class Status {
private ExecutorService executor;
CloseableHttpClient httpClient;
List<String> results = new LinkedList<>();
public Status() {
executor = Executors.newFixedThreadPool(5);
httpClient = HttpClients.createDefault();
}
public handleInput(Entity entity) {
String result = EntityUtils.toString(httpEntity);
results.add(result);
}
private class Action implements Runnable {
@Override
public void run() {
try {
//do some http calls
// response = httpClient.execute(httpPost);
handleInput(response.getEntity())
} catch (SomeException e) {
lastResult = false;
}
}
}}
During my testing, I didn't face any issue but I would like to know if it is a thread safe operation to add results from multiple threads to the same linkedlist and if not what is the best design for such scenarios.
No, manipulating a non-thread-safe List
from across threads is not thread-safe.
Synchronized
You could make your handleInput
method synchronized
as commented by Bodewes. Of course, that method becomes a potential bottleneck, as only one thread at a time can be calling a synchronized method. Perhaps not an issue in your case, but be aware.
Another option is to replace your LinkedList
with a thread-safe collection. For example, CopyOnWriteArrayList
, CopyOnWriteArraySet
, or ConcurrentSkipListSet
.
Callable
& Future
But I suggest you not mingle your multi-threaded production of Entity
objects with collecting and processing those objects. Each task assigned to a background thread should “mind its own business” as much as possible. Having the tasks share a list is entangling them across threads needlessly; that kind of entangling (shared resources) should be avoided wherever possible.
Change your task from being a Runnable
to be a Callable
so as to return a value. That return value will be each Entity
produced.
As you submit each Callable
to the executor service, you get back a Future
object. Collect those objects. Through each of those objects you can access the result of each task’s work.
Wait for the executor service to finish all submitted tasks. Then inspect the results of each Future
.
By using Future
objects to collect the results produced by the background threads, and only processing those results after they are all done, we have eliminated the need to make your results
collection thread-safe. The original thread collects those results into a list, rather than each thread adding to the list.
Notice how in this example code below we have no resources shared across the tasks running in background threads. Each task does its own thing, reporting its own result via its particular Future
object. The tasks no longer access a shared List
.
By the way, notice that this example code does not keep the executor service in a member field as does the code in the Question. In our situation here, an executor service should be (1) instantiated, (2) utilized, and (3) shut down all in one action. You must shut down your executor service when no longer needed (or when your app exits). Otherwise its backing thread pool may run indefinitely, like a zombie 🧟♂️.
package work.basil.threading;
import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.*;
public class Status
{
record Entity( UUID uuid , Instant instant ) { }
public List < String > process ()
{
ExecutorService executorService = Executors.newFixedThreadPool( 5 );
List < WebCallTask > tasks = List.of();
try
{
tasks = List.of(
new WebCallTask( new URI( "http://www.Google.com/" ) ) ,
new WebCallTask( new URI( "http://www.DuckDuckGo.com/" ) ) ,
new WebCallTask( new URI( "http://www.Adoptium.net/" ) )
);
} catch ( URISyntaxException e )
{
e.printStackTrace();
}
List < Future < Entity > > futures = List.of();
try { futures = executorService.invokeAll( tasks ); } catch ( InterruptedException e ) { e.printStackTrace(); }
executorService.shutdown();
try { executorService.awaitTermination( 2 , TimeUnit.MINUTES ); } catch ( InterruptedException e ) { e.printStackTrace(); }
List < String > results = new ArrayList <>( tasks.size() );
for ( Future < Entity > future : futures )
{
try
{
Entity entity = future.get();
String result = this.handleInput( entity );
results.add( result );
} catch ( InterruptedException e )
{
e.printStackTrace();
} catch ( ExecutionException e )
{
e.printStackTrace();
}
}
return results;
}
public String handleInput ( Entity entity )
{
if ( Objects.isNull( entity ) ) return "Not Available.";
return entity.toString();
}
private class WebCallTask implements Callable < Entity >
{
private URI uri;
public WebCallTask ( URI uri )
{
this.uri = uri;
}
@Override
public Entity call ()
{
Entity entity = null;
try
{
// Perform some http calls.
// response = httpClient.execute(httpPost);
// Pretend to wait on network call by sleeping.
System.out.println( "Thread: " + Thread.currentThread().getId() + " is sleeping, to pretend doing network call. " + Instant.now() );
try { Thread.sleep( Duration.ofSeconds( ThreadLocalRandom.current().nextInt( 3 , 11 ) ).toMillis() ); } catch ( InterruptedException e ) { e.printStackTrace(); }
entity = new Entity( UUID.randomUUID() , Instant.now() );
System.out.println( "Thread: " + Thread.currentThread().getId() + " produced an `Entity` object. Task done. " + Instant.now() );
} catch ( Exception e ) // In your real code, you would be catching networking errors related to your networkcall.
{
e.printStackTrace();
} finally
{
return entity; // May return `null` as a legitimate value. In real work I would use `Optional< Entity >` here to signal that `null` is a possible and legitimate value. But let's not overcomplicate this example code.
}
}
}
public static void main ( String[] args )
{
System.out.println( "Thread: " + Thread.currentThread().getId() + " is starting demo. " + Instant.now() );
Status statusApp = new Status();
List < String > output = statusApp.process();
System.out.println( "output = " + output );
System.out.println( "Thread: " + Thread.currentThread().getId() + " is ending demo. " + Instant.now() );
}
}
When run.
Thread: 1 is starting demo. 2021-10-09T03:58:41.269177Z
Thread: 15 is sleeping, to pretend doing network call. 2021-10-09T03:58:41.286424Z
Thread: 16 is sleeping, to pretend doing network call. 2021-10-09T03:58:41.286828Z
Thread: 17 is sleeping, to pretend doing network call. 2021-10-09T03:58:41.288108Z
Thread: 16 produced an `Entity` object. Task done. 2021-10-09T03:58:44.323703Z
Thread: 15 produced an `Entity` object. Task done. 2021-10-09T03:58:46.294364Z
Thread: 17 produced an `Entity` object. Task done. 2021-10-09T03:58:46.294269Z
output = [Entity[uuid=04d73a52-79ec-4a61-becb-ce056d3aa9fa, instant=2021-10-09T03:58:46.294359Z], Entity[uuid=cc5a7266-4101-41bb-b806-8b29b77a82d0, instant=2021-10-09T03:58:44.323688Z], Entity[uuid=3cc24ad9-3ea1-4a24-98d0-c3df4bf161b6, instant=2021-10-09T03:58:46.294254Z]]
Thread: 1 is ending demo. 2021-10-09T03:58:46.321313Z