Web Technologies to do notifications
Imagine you want to develop a mini real-time chat application using 2 web clients, connected to a web server.
There are several possibilities to receive updates:
Old-school: explicit Reload button...
Long polling with timer to auto-refresh
WebSocket
Server-Sent-Event
WebSocket
WebSocket is the most famous, but is a little overkill.
It is protocol negociation between the browser and the server to change the Http connection to a TCP connection,
and re-use it as a dual-channel binary communication.
On a LAN it works great… On a WAN with HTTP Firewalls, Http Proxies and Http LoadBalancer (Stateless Non Session Sticky), it is much more difficult.
SSE : Server-Sent-Event
Server-Sent-Event are much simpler.
It is a regular Http GET request with media type “text-stream”, which will read a stream of events as single line text,
each line starting as “data: …”, “id: “, “event: “, or “retry: “.
Every “data: “ is interpreted as a message event, which trigger a client-side javascript callback method.
var eventSource = new EventSource ( "/chat/room/Default/sse" );
eventSource . onmessage = function ( event ) {
var eventData = JSON . parse ( event . data );
console . log ( "received event data" , eventData );
};
Typical streams of events for a chat:
event: chat
data: {"id":"123", "from":"bobby", "msg":"Hello John", "chatRoom":"Default", "time":"02:33:48" }
id: 124
data: {"id":"124", "from":"john", "msg":"Hi Bob", "chatRoom":"Default", "time": "02:40:10" }
id: 125
data: {"id":"125", "from": "bobby", "msg":"How are you?", "chatRoom":"Default", "time": "02:40:15" }
Looking at network level, it is plain-old Http protocol: it keep the Http socket connection opened for a while, and read results until deconnection.
The server will close its connection, typically after 30s (default on tomcat for example),
and the client simply re-open a new one to possibly another server,
and re-establish a query passing the “last-event-id” Http header attribute to get newly emitted events since the last one already received.
To test it, simply launch a curl command
curl -H 'Accept: text/event-stream' http://localhost:8080/chat/room/Default/sse
data: { "id" : "1" , ..}
data: { "id" : "2" , ..}
data: { "id" : "3" , ..}
..
... ommited: receive all ( recent) known events from server
..
data: { "id" :"123" , .. }
... waiting reading more
... connection closed after 30s
So more interrestingly, add the header “last-event-id”, supposing you have already listen up to id “123”:
curl -H 'Accept: text/event-stream' -H 'last-event-id: 123' http://localhost:8080/chat/room/Default/sse
data: { "id" : "124" , ..}
data: { "id" : "125" , ..}
... waiting reading more
... connection closed after 30s
Server implementation using springboot Spring4 : SseEmitter
@RestController
@RequestMapping ( "/app" )
public class MyRestController {
private static final Logger LOG = LoggerFactory . getLogger ( MyRestController . class );
@Autowired
private ChatHistoryService chatHistoryService ;
...
@GetMapping ( path = "/chat/room/{chatRoom}/sseSpring4" , produces = "text/event-stream" )
public SseEmitter subscribeChatMessages (
@PathVariable ( "chatRoom" ) String chatRoom ,
@RequestHeader ( name = "last-event-id" , required = false ) String lastEventId
) {
LOG . info ( "subscribeMessagesSpring4 lastEventId:" + lastEventId );
ChatRoomEntry chatRoomEntry = chatHistoryService . getChatRoom ( chatRoom );
return chatRoomEntry . subscribe ( lastEventId );
}
// handle normal "Async timeout", to avoid logging warn messages every 30s per client...
@ExceptionHandler ( value = AsyncRequestTimeoutException . class )
public String asyncTimeout ( AsyncRequestTimeoutException e ){
return null ; // "SSE timeout..OK";
}
}
...
public class ChatRoomEntry {
private List < SseEmitter > emitters = new ArrayList <>();
@Override
public void onPostMessage ( ChatMessageEntry msg ) {
if ( emitters . isEmpty ()) {
return ;
}
SseEventBuilder evtBuilder = SseEmitter . event ()
. id ( Integer . toString ( msg . id ))
. name ( "chat" )
. data ( msg );
for ( SseEmitter emitter : emitters ) {
try {
emitter . send ( evtBuilder );
} catch ( IOException ex ) {
LOG . error ( "Failed to send msg to emitter" , ex );
}
}
}
public SseEmitter subscribe ( String lastEventIdText ) {
SseEmitter emitter = new SseEmitter ();
Integer lastId = lastEventIdText != null ? Integer . parseInt ( lastEventIdText ) : null ;
List < ChatMessageEntry > replayMsgs = ( lastId != null )?
chatRoom . listMessagesSinceLastId ( lastId ) : chatRoom . listMessages ();
for ( ChatMessageEntry msg : replayMsgs ) {
if ( lastId != null && msg . id <= lastId ) {
continue ;
}
try {
emitter . send ( msg );
} catch ( IOException ex ) {
LOG . error ( "Failed to re-send msg to emitter, ex:" , ex . getMessage () + " => complete with error ... remove,disconnect" );
emitter . completeWithError ( ex );
}
}
// note: should synchronised replayMessage ... add()!
emitters . add ( emitter );
emitter . onCompletion (() -> {
LOG . debug ( "onCompletion -> remove emitter" ); // log as DEBUG... logged every 30s per client!
emitters . remove ( emitter );
});
emitter . onTimeout (() -> {
LOG . debug ( "onTimeout -> remove emitter" ); // log as DEBUG... logged every 30s per client!
emitters . remove ( emitter );
});
return emitter ;
}
Notice that you can change the configuration timeout which is 30s by default: edit file config/application.yaml
spring :
mvc :
async :
# default: 30 seconds... should be higher..
# for test only= > 15s!
request-timeout : 15000
Server implementation using springboot Spring5 : Flux<ServerSentEvent<..>>
Spring5 is currently in SNAPSHOT as of 2017-05, you need to use maven SNAPSHOT versions (+maven SNAPSHOTS repositories).
See from https://start.spring.io/ , by choosing the top-level right combo-box version “with springboot version:2.0.0.BUILD-SNAPSHOT” instead of “1.5.3”
<parent>
<groupId> org.springframework.boot</groupId>
<artifactId> spring-boot-starter-parent</artifactId>
<version> 2.0.0.BUILD-SNAPSHOT</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
..
<dependencies>
..
<dependency>
<groupId> org.springframework.boot</groupId>
<artifactId> spring-boot-starter-webflux</artifactId>
</dependency>
Then the implementation is much shorter
@RestController
@RequestMapping ( "/app" )
public class MyRestController {
private static final Logger LOG = LoggerFactory . getLogger ( MyRestController . class );
@Autowired
private ChatHistoryService chatHistoryService ;
...
@GetMapping ( path = "/chat/room/{chatRoom}/sseSpring5" , produces = "text/event-stream" )
public Flux < ServerSentEvent < ChatMessageEntry >> subscribeChatMessages_spring5 (
@PathVariable ( "chatRoom" ) String chatRoom ,
@RequestHeader ( name = "last-event-id" , required = false ) String lastEventId
) {
ChatRoomEntry chatRoomEntry = chatHistoryService . getChatRoom ( chatRoom );
if ( chatRoomEntry == null ) {
return null ;
}
LOG . info ( "subscribeMessagesSpring5 lastEventId:" + lastEventId );
return chatRoomEntry . subscribe ( lastEventId );
}
...
public class ChatRoomEntry {
private ReplayProcessor < ServerSentEvent < ChatMessageEntry >> replayProcessor =
ReplayProcessor .< ServerSentEvent < ChatMessageEntry >> create ( 100 );
@Override
public void onPostMessage ( ChatMessageEntry msg ) {
ServerSentEvent < ChatMessageEntry > event = ServerSentEvent . builder ( msg )
. event ( "chat" )
. id ( Integer . toString ( msg . id )). build ();
replayProcessor . onNext ( event );
}
public Flux < ServerSentEvent < ChatMessageEntry >> subscribe ( String lastEventId ) {
Integer lastId = ( lastEventId != null )? Integer . parseInt ( lastEventId ) : null ;
return replayProcessor . filter ( x -> lastId == null || x . data (). get (). id > lastId );
}
}
Implementation on client-side using AngularJS
The code on client-side is really small.
Here is a “rich” debugging screen with options to show connection status,
choose between spring4 and spring5 server implementation to call,
and format messages.
'use strict' ;
angular . module ( 'myapp' , [ ])
. controller ( 'MyController' , function ( $scope , $http ) {
var self = this ;
self . useImpl = "spring5" ;
self . from = "me" ;
self . chatRoom = "Default" ;
self . msgToSend = "" ;
self . formattedMessages = '' ;
self . eventSource = null ;
self . lastEventId = 0 ;
self . connStatus = '' ;
self . onInit = function () {
// $scope.on("destroy", function() { self.onDispose(); });
self . connectEventSource ();
};
self . onDispose = function () {
console . log ( "onDispose" );
self . eventSource . close ();
};
self . connectEventSource = function () {
console . log ( "subscribe chatRoom: " + self . chatRoom + " useImpl:" + self . useImpl + " lastEventId:" + self . lastEventId );
self . eventSource = new EventSource ( '/app/chat/room/' + self . chatRoom + '/subscribeMessagesSpring' + ( self . useImpl === 'spring4' ? '4' : '5' ),
{ id : self . lastEventId });
if ( self . lastEventId > 0 ) {
self . eventSource . id = self . lastEventId ;
}
self . eventSource . addEventListener ( 'open' , function ( e ) {
console . log ( "onopen" , e );
self . connStatus = 'connected' ;
$scope . $apply ();
}, false );
self . eventSource . addEventListener ( 'error' , function ( e ) {
if ( e . eventPhase == EventSource . CLOSED ) {
console . log ( 'connection closed (..reconnect)' , e );
self . connStatus = 'connection closed (..auto reconnect in 3s)' ;
} else {
console . log ( "onerror" , e );
self . connStatus = 'error \ n' ;
}
$scope . $apply ();
}, false );
self . eventSource . addEventListener ( 'message' , function ( e ) {
console . log ( "onmessage" , e );
self . handleMessageEvent ( e );
$scope . $apply ();
}, false );
self . eventSource . addEventListener ( "chat" , function ( e ) {
// never called? .. cf onmessage !
console . log ( "on event 'chat'" , e );
self . handleMessageEvent ( e );
$scope . $apply ();
});
}
self . handleMessageEvent = function ( e ) {
var msg = JSON . parse ( e . data );
self . lastEventId = msg . id ;
self . eventSource . id = self . lastEventId ;
self . addFormattedEvent ( msg . id , new Date ( msg . date ), msg . from , msg . msg );
};
self . onClickSendMsg = function () {
console . log ( "send" );
var msg = self . msgToSend ;
// self.addFormattedEvent(new Date(), self.from, msg);
var req = { onBehalfOf : self . from , msg };
self . sending = true ;
$http . post ( "/app/chat/room/" + self . chatRoom , req )
. then ( function () {
self . sending = false ;
}, function ( err ) {
self . addFormattedEvent ( - 1 , new Date (), self . from , "Failed to send " + msg + ":" + err );
self . sending = false ;
});
};
self . addFormattedEvent = function ( id , date , from , msg ) {
self . formattedMessages += "[" + id + "] " + date . getHours () + ":" + date . getMinutes () + ":" + date . getSeconds () + " " +
(( self . from === from )? "" : "(" + from + ") " ) +
msg + " \ n \ n" ;
}
self . onClickReconnect = function () {
self . formattedMessages += "** Click RECONNECT **" ;
self . eventSource . close ();
self . connectEventSource ();
};
self . onInit ();
});
Testing It...
Run it: start main class from springboot within your IDE (or mvn spring-boot:run)
Testing using web client: open 2 browsers on url
http://localhost:8081/chat-angularjs/index.html
Testing using curl shell commands:
testing SSE GET request (with timeout after 30s)
Test posting message directly from curl command (with verbose mode):
Details analysis of timed-events logs
Here are commented logs from both client and server:
Logs on server:
fr.an.tests.MyRestController : msg #1: "(BOT) server start"
// at server startup, msg#1 is published (no client connected yet)
// on server-side, List of recent msg: [ msg#1 ],
// msg sequence number incremented to 2
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:null
// <= initial connection from client "bob"
// not "last-event-id" known from client => client receive all recent msgs from server: [ msg#0 ]
// client1 as now "last-event-id: 1"
// after 30s, client1 GET request is timeouting
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:1
// reconnect from client1 => re-susbscrive to events, using "last-event-id: 1"
// no new messages since #1 => no events sent, connection pending
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:1
// again .. GET Timeout + reconnect
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:null
// <= initial connection with last-event-id from client2 "john"
// not "last-event-id" known from client2 => client receive all recent msgs from server: [ msg#0 ]
// client2 as now "last-event-id: 1"
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:1
// timeout+reconnect from client (1 or 2?)
fr.an.tests.MyRestController : receive msg chatRoom:Default from:john msg: Hello bob
// POST msg#2 from john => published to both connected client1 & client2
// client1 connected => status: received msgs:[msg#1, msg#2] last-event-id: 2
// client2 connected => status: received msgs:[msg#1, msg#2] last-event-id: 2
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:2
// client1 timeout .. reconnect after 3s using "last-event-id: 2"
// no new messages received
// client2 timeout ... will reconnect next in 3s..
fr.an.tests.MyRestController : receive msg chatRoom:Default from:bob msg: Hi john
// POST msg #3 to client1 (client2 currently disconnected)
/// status:
// client1: connected, received msgs:[msg#1, msg#2, msg#3] last-event-id: 3
// client2: disconnected, received msgs:[msg#1, msg#2] last-event-id: 2
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:2
// reconnection from client2 using last-event-id: 2
// msg #3 was NOT received during 3s re-connection delay => client2 received missing msg #3
// status
// client1: connected, received msgs:[msg#1, msg#2, msg#3] last-event-id: 3
// client2: connected, received msgs:[msg#1, msg#2, msg#3] last-event-id: 3
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:3
// reconnection from client2...msg #3 already received
fr.an.tests.MyRestController : receive msg chatRoom:Default from:john msg: How are you ?
fr.an.tests.MyRestController : receive msg chatRoom:Default from:bob msg: fine
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:4
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:5
fr.an.tests.MyRestController : subscribeMessagesSpring5 lastEventId:5
Logs from client-side using chrome dev tools:
Conclusion
Server-Sent-Event works great, and are really simple !
SpringBoot and new Spring5 Reactor are awesome.
The full repository code is available here:
https://github.com/Arnaud-Nauwynck/test-snippets/tree/master/test-spring-reactive-web