Web Technologies to do notifications

Imagine you want to develop a mini real-time chat application using 2 web clients, connected to a web server.
There are several possibilities to receive updates:

  • Old-school: explicit Reload button...
  • Long polling with timer to auto-refresh
  • WebSocket
  • Server-Sent-Event

WebSocket

WebSocket is the most famous, but is a little overkill.
It is protocol negociation between the browser and the server to change the Http connection to a TCP connection, and re-use it as a dual-channel binary communication.
On a LAN it works great… On a WAN with HTTP Firewalls, Http Proxies and Http LoadBalancer (Stateless Non Session Sticky), it is much more difficult.

SSE : Server-Sent-Event

Server-Sent-Event are much simpler.
It is a regular Http GET request with media type “text-stream”, which will read a stream of events as single line text, each line starting as “data: …”, “id: “, “event: “, or “retry: “.
Every “data: “ is interpreted as a message event, which trigger a client-side javascript callback method.

var eventSource = new EventSource("/chat/room/Default/sse");

eventSource.onmessage = function(event) {
    var eventData = JSON.parse(event.data);
    console.log("received event data", eventData);
};

Typical streams of events for a chat:

event: chat
data: {"id":"123", "from":"bobby", "msg":"Hello John", "chatRoom":"Default", "time":"02:33:48" }

id: 124
data: {"id":"124", "from":"john", "msg":"Hi Bob", "chatRoom":"Default", "time": "02:40:10" }

id: 125
data: {"id":"125", "from": "bobby", "msg":"How are you?", "chatRoom":"Default", "time": "02:40:15" }

Looking at network level, it is plain-old Http protocol: it keep the Http socket connection opened for a while, and read results until deconnection.
The server will close its connection, typically after 30s (default on tomcat for example), and the client simply re-open a new one to possibly another server, and re-establish a query passing the “last-event-id” Http header attribute to get newly emitted events since the last one already received.

To test it, simply launch a curl command

curl -H 'Accept: text/event-stream' http://localhost:8080/chat/room/Default/sse

data: {"id": "1", ..}
data: {"id": "2", ..}
data: {"id": "3", ..}
.. 
... ommited: receive all (recent) known events from server
..
data: {"id":"123", .. }

... waiting reading more
... connection closed after 30s

So more interrestingly, add the header “last-event-id”, supposing you have already listen up to id “123”:

curl -H 'Accept: text/event-stream' -H 'last-event-id: 123' http://localhost:8080/chat/room/Default/sse

data: {"id": "124", ..}
data: {"id": "125", ..}

... waiting reading more
... connection closed after 30s

Server implementation using springboot Spring4 : SseEmitter

@RestController
@RequestMapping("/app")
public class MyRestController {
	
	private static final Logger LOG = LoggerFactory.getLogger(MyRestController.class);
	
	@Autowired
	private ChatHistoryService chatHistoryService;
	
	... 
	
	@GetMapping(path = "/chat/room/{chatRoom}/sseSpring4", produces = "text/event-stream")
	public SseEmitter subscribeChatMessages(
	 		@PathVariable("chatRoom") String chatRoom,
	   		@RequestHeader(name="last-event-id", required=false) String lastEventId
	   		) {
	    LOG.info("subscribeMessagesSpring4 lastEventId:" + lastEventId);
		ChatRoomEntry chatRoomEntry = chatHistoryService.getChatRoom(chatRoom);
	    return chatRoomEntry.subscribe(lastEventId);
	}
	
	// handle normal "Async timeout", to avoid logging warn messages every 30s per client...
	@ExceptionHandler(value = AsyncRequestTimeoutException.class)  
    public String asyncTimeout(AsyncRequestTimeoutException e){  
        return null; // "SSE timeout..OK";  
    }
	
}

... 
public class ChatRoomEntry {

	private List<SseEmitter> emitters = new ArrayList<>();

	@Override
	public void onPostMessage(ChatMessageEntry msg) {
		if (emitters.isEmpty()) {
			return;
		}
		SseEventBuilder evtBuilder = SseEmitter.event()
				.id(Integer.toString(msg.id))
				.name("chat")
				.data(msg);
		for(SseEmitter emitter : emitters) {
			try {
				emitter.send(evtBuilder);
			} catch (IOException ex) {
				LOG.error("Failed to send msg to emitter", ex);
			}
		}
	}
	
	public SseEmitter subscribe(String lastEventIdText) {
		SseEmitter emitter = new SseEmitter();
        Integer lastId = lastEventIdText != null? Integer.parseInt(lastEventIdText) : null;
		List<ChatMessageEntry> replayMsgs = (lastId != null)?  
				chatRoom.listMessagesSinceLastId(lastId) : chatRoom.listMessages();
		for(ChatMessageEntry msg : replayMsgs) {
			if (lastId != null && msg.id <= lastId) {
				continue;
			}
			try {
				emitter.send(msg);
			} catch (IOException ex) {
				LOG.error("Failed to re-send msg to emitter, ex:", ex.getMessage() + " => complete with error ... remove,disconnect");
				emitter.completeWithError(ex);
			}
		}
		
        // note: should synchronised replayMessage ... add()!
        emitters.add(emitter);
        
        emitter.onCompletion(() -> {
        	LOG.debug("onCompletion -> remove emitter"); // log as DEBUG... logged every 30s per client! 
        	emitters.remove(emitter);
        });

        emitter.onTimeout(() -> {
        	LOG.debug("onTimeout -> remove emitter");  // log as DEBUG... logged every 30s per client!
        	emitters.remove(emitter);
        });

        return emitter;
	}

Notice that you can change the configuration timeout which is 30s by default: edit file config/application.yaml

spring:
  mvc:
    async:
      # default: 30 seconds... should be higher..
      # for test only= > 15s!
      request-timeout: 15000

Server implementation using springboot Spring5 : Flux<ServerSentEvent<..>>

Spring5 is currently in SNAPSHOT as of 2017-05, you need to use maven SNAPSHOT versions (+maven SNAPSHOTS repositories). See from https://start.spring.io/, by choosing the top-level right combo-box version “with springboot version:2.0.0.BUILD-SNAPSHOT” instead of “1.5.3”

    <parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.0.0.BUILD-SNAPSHOT</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>
..
	<dependencies>
		..
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-webflux</artifactId>
		</dependency>
	

Then the implementation is much shorter

@RestController
@RequestMapping("/app")
public class MyRestController {
	
	private static final Logger LOG = LoggerFactory.getLogger(MyRestController.class);
	
	@Autowired
	private ChatHistoryService chatHistoryService;
	
	... 

	@GetMapping(path = "/chat/room/{chatRoom}/sseSpring5", produces = "text/event-stream")
    public Flux<ServerSentEvent<ChatMessageEntry>> subscribeChatMessages_spring5(
    		@PathVariable("chatRoom") String chatRoom,
    		@RequestHeader(name="last-event-id", required=false) String lastEventId
    		) {
        ChatRoomEntry chatRoomEntry = chatHistoryService.getChatRoom(chatRoom);
        if (chatRoomEntry == null) {
        	return null;
        }
        LOG.info("subscribeMessagesSpring5 lastEventId:" + lastEventId);
        return chatRoomEntry.subscribe(lastEventId);
    }


... 
public class ChatRoomEntry {

	private ReplayProcessor<ServerSentEvent<ChatMessageEntry>> replayProcessor = 
		ReplayProcessor.<ServerSentEvent<ChatMessageEntry>>create(100);
	
	@Override
	public void onPostMessage(ChatMessageEntry msg) {
		ServerSentEvent<ChatMessageEntry> event = ServerSentEvent.builder(msg)
				.event("chat")
				.id(Integer.toString(msg.id)).build();
		replayProcessor.onNext(event);
	}

	public Flux<ServerSentEvent<ChatMessageEntry>> subscribe(String lastEventId) {
		Integer lastId = (lastEventId != null)? Integer.parseInt(lastEventId) : null;
		return replayProcessor.filter(x -> lastId == null || x.data().get().id > lastId);
	}
}

Implementation on client-side using AngularJS

The code on client-side is really small.

Here is a “rich” debugging screen with options to show connection status, choose between spring4 and spring5 server implementation to call, and format messages.

'use strict';
angular.module('myapp', [ ])
.controller('MyController', function($scope, $http) {
    var self = this;

    self.useImpl = "spring5";
    self.from = "me";
    self.chatRoom = "Default";
    self.msgToSend = "";
    self.formattedMessages = '';
    
    self.eventSource = null;
    self.lastEventId = 0;
    self.connStatus = '';
    
    self.onInit = function() {
        // $scope.on("destroy", function() { self.onDispose(); });
        self.connectEventSource();
    };
    
    self.onDispose = function() {
        console.log("onDispose");
        self.eventSource.close();
    };
    
    self.connectEventSource = function() {
        console.log("subscribe chatRoom: " + self.chatRoom + " useImpl:" + self.useImpl + " lastEventId:" + self.lastEventId);
        self.eventSource = new EventSource('/app/chat/room/' + self.chatRoom + '/subscribeMessagesSpring' + (self.useImpl === 'spring4'? '4' : '5'),
                { id: self.lastEventId });
        
        if (self.lastEventId > 0) {
            self.eventSource.id = self.lastEventId;
        }
        
        self.eventSource.addEventListener('open', function(e) {
            console.log("onopen", e);
            self.connStatus = 'connected';
            $scope.$apply();
        }, false);

        self.eventSource.addEventListener('error', function(e) {
            if (e.eventPhase == EventSource.CLOSED) {
              console.log('connection closed (..reconnect)', e);
              self.connStatus = 'connection closed (..auto reconnect in 3s)';
            } else {
              console.log("onerror", e);
              self.connStatus = 'error\n';
            }
            $scope.$apply();
          }, false);
        
        self.eventSource.addEventListener('message', function(e) {
            console.log("onmessage", e);
            self.handleMessageEvent(e);
            $scope.$apply();
        }, false);
        
        self.eventSource.addEventListener("chat", function(e) {
            // never called? .. cf onmessage !
            console.log("on event 'chat'", e);
            self.handleMessageEvent(e);
            $scope.$apply();
        });
    }

    self.handleMessageEvent = function(e) {
        var msg = JSON.parse(e.data);
        self.lastEventId = msg.id;
        self.eventSource.id = self.lastEventId;
        self.addFormattedEvent(msg.id, new Date(msg.date), msg.from, msg.msg);
    };
    
    self.onClickSendMsg = function () {
        console.log("send");
        var msg = self.msgToSend;
        // self.addFormattedEvent(new Date(), self.from, msg);
        
        var req = { onBehalfOf: self.from, msg };
        self.sending = true;
        $http.post("/app/chat/room/" + self.chatRoom, req)
        .then(function() {
          self.sending = false;
        }, function(err) {
          self.addFormattedEvent(-1, new Date(), self.from, "Failed to send " + msg + ":" + err);
          self.sending = false;
        });
    };
    
    self.addFormattedEvent = function(id, date, from, msg) {
       self.formattedMessages += "[" + id + "] " + date.getHours() + ":" + date.getMinutes() + ":" + date.getSeconds() + " " +  
           ((self.from === from)? "" : "(" + from + ") ") + 
           msg + "\n\n";
    }
    
    self.onClickReconnect = function() {
        self.formattedMessages += "** Click RECONNECT **";
        self.eventSource.close();
        self.connectEventSource();
    };
    
    self.onInit();
});

Testing It...

Run it: start main class from springboot within your IDE (or mvn spring-boot:run)

Testing using web client: open 2 browsers on url http://localhost:8081/chat-angularjs/index.html


Testing using curl shell commands:

testing SSE GET request (with timeout after 30s)

Test posting message directly from curl command (with verbose mode):

Details analysis of timed-events logs

Here are commented logs from both client and server:

Logs on server:

fr.an.tests.MyRestController             : msg #1: "(BOT) server start"
  // at server startup, msg#1 is published (no client connected yet)
  // on server-side, List of recent msg: [ msg#1 ], 
  // msg sequence number incremented to 2  

fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:null  
  // <= initial connection from client "bob"
  // not "last-event-id" known from client => client receive all recent msgs from server: [ msg#0 ]
  // client1 as now "last-event-id: 1" 

  // after 30s, client1 GET request is timeouting
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:1     
  // reconnect from client1 => re-susbscrive to events, using "last-event-id: 1"
  // no new messages since #1 => no events sent, connection pending
  
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:1     
  // again .. GET Timeout + reconnect
  
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:null
  // <= initial connection with last-event-id from client2 "john"
  // not "last-event-id" known from client2 => client receive all recent msgs from server: [ msg#0 ]
  // client2 as now "last-event-id: 1" 
  
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:1
  // timeout+reconnect from client (1 or 2?)

fr.an.tests.MyRestController             : receive msg chatRoom:Default from:john msg: Hello bob
  // POST msg#2 from john => published to both connected client1 & client2 
  // client1 connected => status:  received msgs:[msg#1, msg#2]  last-event-id: 2
  // client2 connected => status:  received msgs:[msg#1, msg#2]  last-event-id: 2
  
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:2
  // client1 timeout .. reconnect after 3s using "last-event-id: 2"
  // no new messages received
  
  // client2 timeout ... will reconnect next in 3s..
  
fr.an.tests.MyRestController             : receive msg chatRoom:Default from:bob msg: Hi john
  // POST msg #3 to client1   (client2 currently disconnected)
  
  /// status:
  // client1: connected,    received msgs:[msg#1, msg#2, msg#3]  last-event-id: 3
  // client2: disconnected, received msgs:[msg#1, msg#2]  last-event-id: 2
  
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:2
  // reconnection from client2 using last-event-id: 2
  // msg #3 was NOT received during 3s re-connection delay => client2 received missing msg #3
  // status
  // client1: connected, received msgs:[msg#1, msg#2, msg#3]  last-event-id: 3
  // client2: connected, received msgs:[msg#1, msg#2, msg#3]  last-event-id: 3
   
   
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:3
  // reconnection from client2...msg #3 already received

fr.an.tests.MyRestController             : receive msg chatRoom:Default from:john msg: How are you ?
fr.an.tests.MyRestController             : receive msg chatRoom:Default from:bob msg: fine
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:4
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:5
fr.an.tests.MyRestController             : subscribeMessagesSpring5 lastEventId:5

Logs from client-side using chrome dev tools:

Conclusion

Server-Sent-Event works great, and are really simple !

SpringBoot and new Spring5 Reactor are awesome.

The full repository code is available here: https://github.com/Arnaud-Nauwynck/test-snippets/tree/master/test-spring-reactive-web