REST RPC Async Netty Grpc Event-driven Reactive-Programming Futures

How to communicate between client and server

Communication Protocols

REST : Representational state transfer

Transfer of state via GET, PUT, PATCH, etc

RPC - Remote procedure call

Think in terms of actions using GET and PUT

Example

Operation RPC (operation) REST (resource)
Signup POST /signup POST /persons
Read a person GET /readPerson?personid=1234 GET /persons/1234
Read a person’s items list GET /readUsersItemsList?userid=1234 GET /persons/1234/items
Add an item to a person’s list POST /addItemToUsersItemsList POST /persons/1234/items
Update an item POST /modifyItem PUT /items/456
Delete an item POST /removeItem?itemId=456 DELETE /items/456

Communicate Synchronously or Async

Spring's RestTemplate

Spring's central class for synchronous client-side HTTP access. It simplifies communication with HTTP servers, and enforces RESTful principles.

Asynchronous

Javascript callbacks, promises, etc


// Say "Hello."
console.log("Hello.");
// Say "Goodbye" two seconds from now.
setTimeout(function() {
	console.log("Goodbye!"); 
}, 2000); 
// Say "Hello again!" 
console.log("Hello again!");
						

java.nio library, futures, threads, etc

							
CompletableFuture.supplyAsync(this::findReceiver)
.thenApply(this::sendMsg)
.thenAccept(this::notify);
							
						

How does asynchronous programming work

Observer Pattern/ Listeners/ Reactive programming

We create a task and subscribe ourselves as observers to the success/failure/error etc of the task.


When the task ends, methods(onSuccess(), onError()) of registered observers are called.

						
for( i = 0; i < observers.Count; i++) {
	observers[i].onSuccess("some data");
}
						
					
Combining RPC and async programming with gRPC

Blocking Call


// Obtains the feature at a given position.

rpc GetFeature(Point) returns (Feature) {}

Client


/**
* Blocking unary call example. 
  Calls getFeature and prints the response.
*/
public void getFeature(int lat, int lon) {
	info("*** GetFeature: lat={0} lon={1}", lat, lon);

	Point request = Point.newBuilder()
	.setLatitude(lat)
	.setLongitude(lon)
	.build();

	Feature feature = blockingStub.getFeature(request);
}

Server

receives an observer as if this was a non blocking call.


public void getFeature(Point Point, StreamObserver responseObserver) {
		responseObserver.onNext(getFeatureFor(point));
		responseObserver.onCompleted();
		}

Networking Library ensures the call is blocking.


ListenableFuture responseFuture = futureUnaryCall(call, param);

while(!responseFuture.isDone()) {
	try {
		executor.waitAndDrain();
	} catch (InterruptedException var8) {
		Thread.currentThread().interrupt();
		throw Status.CANCELLED.withDescription("Call was interrupted").withCause(var8).asRuntimeException();
	}
}

return getUnchecked(responseFuture);
	

Streaming Async call


// Accepts a stream of Points on a route being traversed, returning a
// RouteSummary when traversal is completed.
rpc RecordRoute(stream Point) returns (RouteSummary) {}

Client

Send an observer for response


StreamObserver requestObserver = asyncStub.recordRoute(responseObserver);

Send points on the received observer


for (int i = 0; i < numPoints; ++i) {
	int index = random.nextInt(features.size());
	Point point = features.get(index).getLocation();
	requestObserver.onNext(point);	
	// Sleep for a bit before sending the next one.
	Thread.sleep(random.nextInt(1000) + 500);
	}

Server


@Override
public StreamObserver recordRoute(final StreamObserver responseObserver) {
	return new StreamObserver() {
	int pointCount;
	int distance;
	Point previous;
	final long startTime = System.nanoTime();

	@Override
	public void onNext(Point point) {
		pointCount++;
		// For each point after the first, add the incremental distance from the previous point to
		// the total distance value.
		if (previous != null) {
		distance += calcDistance(previous, point);
		}
		previous = point;
	}

	@Override
	public void onCompleted() {
		long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
		responseObserver.onNext(RouteSummary.newBuilder().setPointCount(pointCount)
				.setDistance(distance)
				.setElapsedTime((int) seconds).build());
		responseObserver.onCompleted();
	}
	};
}

What did we learn ?

  • Which protocols can machines use to communicate : REST and RPC
  • Async programming/reactive programming is nothing but observer pattern underneath

We saw in previous examples that server received an observer which it calls methods on to send data/status/error

How can one machine calling methods on an object result to methods being called on another object sitting on another machine.

Or how is this observer java object shared?

Netty

gRPC uses netty under the hood.