I am trying to block the concurrent access to a method , and i didn t succeeded whatever i tried.I tried to sinchronize the method by class (by using synchronized(this) or synchronized at method level ) ,i even synchronized all methods involved , same result .
I have a connection to a websocket server, and from time to time , i reconnect . On the @ClientEndpoint class , @OnClose method , i am instantly reconnection the session.I expected that the method which is called from @OnClose event will wait until synchronized block from reconnectSession will end the execution.
When running this example ,you will probably see following logs :
init ws connections ended
RECCONECTING TO BINANCE WS STARTED
sessionWrapper instance com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa from thread Thread-21
RECCONECTING SESSION com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa started
session with id 0 closed with code NORMAL_CLOSURE with reason phrase
sessionWrapper instance com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa from thread Thread-21
RECCONECTING SESSION com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa started
RECCONECTING SESSION com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa ended
RECCONECTING SESSION com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa ended
RECCONECTING TO BINANCE WS ENDED
I expected to see
RECCONECTING SESSION com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa started
RECCONECTING SESSION com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa ended
and not the start twice.
I replicated the important part in this test example :
First, the main class :
package com.mycompany.binancereconnect;
import jakarta.websocket.ContainerProvider;
import jakarta.websocket.WebSocketContainer;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
/**
*
* @author utza
*/
public class TestSynchronization {
public List<WsSessionWrapper> wsSessions = new ArrayList<>();
public static String BASE_SPOT_SOCKET_URL = "wss://stream.binance.com:443/ws";
public void init() {
try {
for (WsSessionWrapper sessionWrapper : wsSessions) {
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
if (sessionWrapper.getType().equals("SPOT")) {
sessionWrapper.setSession(container.connectToServer(new BinanceSpotMessageReceiver(this),
new URI(
BASE_SPOT_SOCKET_URL)));
}
}
System.out.println(" init ws connections ended ");
} catch (Exception ex) {
ex.printStackTrace();
}
}
public void reconnectSession(WsSessionWrapper sessionWrapper) {
System.out.println("sessionWrapper instance " + sessionWrapper + " from thread " + Thread.currentThread().getName());
synchronized (sessionWrapper) {
if (sessionWrapper.allowedToReconnect()) {
try {
System.out.println("RECCONECTING SESSION " + sessionWrapper + " started ");
try {
if (sessionWrapper.getSession() != null) {
sessionWrapper.getSession().close();
}
} catch (Exception ex) {
ex.printStackTrace();
}
sessionWrapper.setSession(null);
WebSocketContainer container = ContainerProvider.getWebSocketContainer();
if (sessionWrapper.getType().equals("SPOT")) {
sessionWrapper.setSession(container.connectToServer(new BinanceSpotMessageReceiver(this),
new URI(
BASE_SPOT_SOCKET_URL)));
}
System.out.println("RECCONECTING SESSION " + sessionWrapper + " ended ");
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
public void fullReconnect() {
// aici;to also take into account no transactions on btc
System.out.println("RECCONECTING TO BINANCE WS STARTED ");
for (WsSessionWrapper sessionWrapper : wsSessions) {
reconnectSession(sessionWrapper);
}
System.out.println("RECCONECTING TO BINANCE WS ENDED ");
}
public void reconnectSessionBySessionId(String sessionId) {
for (WsSessionWrapper sessionWrapper : wsSessions) {
if (sessionWrapper.getSession() != null && sessionId.equals(sessionWrapper.getSession().getId())) {
reconnectSession(sessionWrapper);
}
}
}
public static void main(String[] args) throws InterruptedException {
TestSynchronization testSynchronization = new TestSynchronization();
WsSessionWrapper s1 = new WsSessionWrapper("SPOT");
s1.setId("1");
testSynchronization.wsSessions.add(s1);
testSynchronization.init();
Thread t1 = new Thread(() -> {
testSynchronization.fullReconnect();
});
t1.start();
Thread.sleep(100000);
}
}
Second , the messageReceiver class , where onclose i attempt to reconnect :
package com.mycompany.binancereconnect;
import jakarta.websocket.ClientEndpoint;
import jakarta.websocket.CloseReason;
import jakarta.websocket.OnClose;
import jakarta.websocket.OnMessage;
import jakarta.websocket.Session;
import java.io.IOException;
import java.util.Date;
/**
*
* @author utza
*/
@ClientEndpoint()
public class BinanceSpotMessageReceiver {
public BinanceSpotMessageReceiver(TestSynchronization socketService) {
this.socketService = socketService;
}
private TestSynchronization socketService;
private Date lastTimeTradeEventReceived = new Date();
@OnClose
public void onClose(Session session, CloseReason closeReason) {
if (closeReason != null) {
System.out.println(" session with id " + session.getId() + " closed with code " + closeReason.getCloseCode() + " with reason phrase " + closeReason.getReasonPhrase());
} else {
System.out.println(" session with id " + session.getId() + " closed with close reason null ");
}
socketService.reconnectSessionBySessionId(session.getId());
}
@OnMessage
public void onMessage(Session session,
String message) throws IOException {
System.out.println("received message from binance " + message);
}
}
And the last java class is the wrapper object :
package com.mycompany.binancereconnect;
import jakarta.websocket.Session;
import java.util.Calendar;
import java.util.Date;
/**
*
* @author utza
*/
public class WsSessionWrapper {
private String id;
private Session session;
private Date lastReconnectionTime;
private String type;
public WsSessionWrapper(String type) {
this.type = type;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Session getSession() {
return session;
}
public void setSession(Session session) {
this.session = session;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public Date getLastReconnectionTime() {
return lastReconnectionTime;
}
public void setLastReconnectionTime(Date lastReconnectionTime) {
this.lastReconnectionTime = lastReconnectionTime;
}
//is allowed to reconnect if 1 minute passed from last reconnection
public Boolean allowedToReconnect() {
if (lastReconnectionTime == null || addMinutes(lastReconnectionTime, 1).compareTo(new Date()) < 0) {
return true;
}
return false;
}
public static Date addMinutes(Date date, int minutes) {
Calendar cal = Calendar.getInstance();
cal.setTime(date);
cal.add(Calendar.MINUTE, minutes);
return cal.getTime();
}
}
You might also find usefull the dependencies :
<dependencies>
<dependency>
<groupId>jakarta.websocket</groupId>
<artifactId>jakarta.websocket-client-api</artifactId>
<version>2.1.0</version>
</dependency>
<dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-websocket</artifactId>
<version>10.1.7</version>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.10.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.java-websocket/Java-WebSocket -->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.1</version>
</dependency>
</dependencies>
Any help would be appreciated. Thanks in advance
I think what you're seeing is the onClose
method being executed when this line is reached in the reconnectSession
method:
sessionWrapper.getSession().close();
This creates a nested or reentrant call to reconnectSession
, not a concurrent one. The clue in the logs is that the same thread name is printed both times:
sessionWrapper instance com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa from thread Thread-21
sessionWrapper instance com.mycompany.binancereconnect.WsSessionWrapper@7c4b13aa from thread Thread-21
In Java, synchronized blocks are always reentrant — in other words, a thread that already holds a lock can reacquire the same lock. This is a good thing: if the nested call to reconnectSession
were to block instead, it would never become unblocked since the outer call would be unable to proceed.
One way to handle this might be to split reconnectSession
into two methods: one that disconnects the session, and another that connects it again. The method that reestablishes the connection would only be called from the onClose
handler, which in turn would be triggered when the method that disconnects the session calls sessionWrapper.getSession().close()
.