gRPC in action - Example using Java microservices

gRPC in action - Example using Java microservices

microservices

java

programming

technology

In this article, we will implement a Java based microservices solution with gRPC as the integration technology. The solution is a Movie Finder application that was introduced in my earlier blog, Think gRPC, when you are architecting modern microservices. Movie Finder is an application that provides a personalized movie recommendation based on the genre input received from the user. The application, on receiving the request, fetches the list of movies categorized by the genre, then matches it against the user preferences and finally passes it over a recommendation engine to suggest a movie response back to the user. A total of four microservices will be built for this solution. All interactions between the microservices will be through gRPC. As the focus is on gRPC here, we will not focus on business logic, data persistence or the client UI.

However, we will use this use case to illustrate all the four API types supported by gRPC:

  • Unary
  • Server streaming
  • Client streaming
  • Bidirectional streaming.

Solution View

Each microservice will be running a separate gRPC server. This is a design choice. The general recommended practice is to have dedicated servers sitting behind load balancers for each microservice.

Movie controller – An external service that publishes API for client/ UI to interact with the application

Movie store – Holds a database of movie records. This can be thought of as something like omdb.org

User Preferences – Keeps track of user activities. For simplicity, we can assume that it keeps track of all the movies that the user has watched, wants to watch or does not want to watch.

Recommender – The component that holds all the logic for making a movie recommendation.

Project Setup

For simplicity, all the services are developed in a single Java project. Gradle is used as the build tool. I have used IntelliJ IDEA as the editor for running this example.

Pre requisites

  • Java 1.8 or above
  • Gradle 
  • IntelliJ Idea

Once the project is created, first step is to setup project dependencies in build.gradle. 

Grpc java documentation can be referred for these details. Build dependencies and protobuf code generation plugins should be configured as mentioned here. A snapshot of build.gradle used here is given below.

plugins {
    id 'java'
    id 'com.google.protobuf' version '0.8.14'
    id 'idea'
}
group 'com.nikhilm'
version '1.0-SNAPSHOT'
repositories {
    mavenCentral()
}
protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.12.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.34.1'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
        }
    }
}
dependencies {
    implementation 'io.grpc:grpc-netty-shaded:1.34.1'
    implementation 'io.grpc:grpc-protobuf:1.34.1'
    implementation 'io.grpc:grpc-stub:1.34.1'
    compileOnly 'org.apache.tomcat:annotations-api:6.0.53' // necessary for Java 9+
    testCompile group: 'junit', name: 'junit', version: '4.12'
}

Protocol Buffers

The interface contract for specifying the RPC definitions for each service would be defined using Protocol Buffers. Each microservice will have a .proto file defined here for this. Under src/main, create a directory named proto and then create the corresponding package structure as given below:

service construct will be used in protocol buffers to represent the rpc definition. The request and response data is represented using the message construct. The different data types and the rules and conventions to be followed for defining protocol buffers are too broad to be covered in this article. To know more about protocol buffers, please visit https://developers.google.com/protocol-buffers

common.proto

The message types, Genre and Movie that are reused in different proto definitions are organised in common.proto and imported in other definitions.

syntax = "proto3";
package common;
option java_package = "com.proto.common";
option java_multiple_files = true;

enum Genre {
  COMEDY = 0;
  ACTION = 1;
  THRILLER = 2;
  DRAMA = 3;
}
message Movie {
  string title = 1;
  float rating = 2;
  Genre genre = 3;
  string description = 4;
}

moviecontroller.proto

The unary service definition for getMovie() service is defined here.

syntax = "proto3";
package moviecontroller;
import "common/common.proto";
option java_package = "com.proto.moviecontroller";
option java_multiple_files = true;

message MovieRequest {
  string userid = 1;
  common.Genre genre = 2;
}
message MovieResponse {
  common.Movie movie = 1;
}
service MovieControllerService {
  // unary rpc call to retrieve a movie
  rpc getMovie(MovieRequest) returns (MovieResponse) {};
}

moviestore.proto

The server streaming rpc call for fetching a stream of movie as response is defined here. The first few lines for package import are excluded for brevity. Please refer them from moviecontroller.proto defined above and include them in all the .proto files accordingly.

message MovieStoreRequest {
  common.Genre genre = 1;
}
message MovieStoreResponse {
  common.Movie movie = 1;
}
service MovieStoreService {
  // server streaming rpc call to receive a stream of movies
rpc getMovies(MovieStoreRequest) returns (stream MovieStoreResponse) {};
}

recommender.proto

The client streaming rpc call for returning the recommended movie is defined here.

message RecommenderRequest {
  string userid = 1;
  common.Movie movie = 2;
}
message RecommenderResponse {
  common.Movie movie = 1;
}
service RecommenderService {
  // client streaming request that receives a stream of movies and recommends one
rpc getRecommendedMovie(stream RecommenderRequest) returns (RecommenderResponse) {};
}

userpreferences.proto

The bidirectional streaming call for receiving a stream of movies as input, matching against user preferences and responding with a stream of movies is defined here.

message UserPreferencesRequest {
  string userid = 1;
  common.Movie movie = 2;
}
message UserPreferencesResponse {
  common.Movie movie = 1;
}
service UserPreferencesService {
  // Bidirectional streaming rpc call to receive a stream of movies shortlisted based on user preferences
  rpc getShortlistedMovies(stream UserPreferencesRequest) returns (stream UserPreferencesResponse) {};
}

Code generation using Gradle

Next step is to generate the necessary gRPC code that will help us to start creating server and client code. For this, run the generateProto command in Gradle toolbar (Tasks->Other) in IntelliJ IDEA. Once this step is completed, all the gRPC generated code should be available inside the project directory.

gRPC key concepts for Java development

Before proceeding with the development of server and client side code, it will be quite helpful to review some important concepts with respect to gRPC development using Java.

Channel - A gRPC channel provides a connection to a gRPC server on a given host and port. Channels are used to create client stubs. The same channel can be used to connect to multiple services running on the same gRPC server.

Client stub - gRPC supports two types of client stubs. Blocking/ synchronous stubs and asynchronous stubs. newBlockingStub() is used to make blocking calls while newStub() is used for non blocking calls. We make references to both approaches in this example.

StreamObserver - Service implementations and clients use StreamObservers with onNext(), onError() and onCompleted() methods to receive and publish message using gRPC framework.

All service implementations extend <service name>Grpc.<service name>ImplBase classes that provide method signature to override. We will see all these details in the following section.

gRPC automatically generates the necessary builder classes thereby freeing developers from the task of writing boilerplate code.

Implement the gRPC servers

Now that we have a good understanding of the code generated by gRPC, we can proceed with service development. First step is to wire up the gRPC server and start it.

Here the MovieController gRPC server is configured to start on port 50051.

MovieControllerServer.java

public class MovieControllerServer {
    public static final int MOVIE_CONTROLLER_SERVICE_PORT = 50051;
    public static void main(String[] args) 
            throws IOException, InterruptedException {
        Server server = ServerBuilder.forPort(MOVIE_CONTROLLER_SERVICE_PORT)
                .addService(new MovieControllerServiceImpl())
                .build();
        server.start();
        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            server.shutdown();
            System.out.println("Successfully stopped the server");
        }));
        server.awaitTermination();
    }
}

MovieControllerServiceImpl class provides the service implementation for this microservice. We will reserve detailing this class for later as it is the key class that holds the integration logic for all the other services. Therefore, we will proceed to develop the other three services first.

Accordingly, use the above code and complete the server start up code for each of the other services - Movie Store, User Preferences and Recommender. Please make sure to provide a unique port number for each of the services. Next step is to implement the services and configure them in addService() of corresponding gRPC servers.

Implementing the services

MovieStoreServiceImpl.java - All gRPC service implementation classes will extend the gRPC generated base implementation class (*ServerImplBase). The method signature for the service method to be implemented is made available. As we can see, this is a server streaming implementation. The onNext() of StreamObserver<MovieStoreResponse> responseObserver is invoked to publish the stream of Movie responses. Since we are not using any backend for holding the movie details, a sample set is hand coded in the service class itself. Finally, the onCompleted() call is made to complete the stream.

public class MovieStoreServiceImpl extends
       MovieStoreServiceGrpc.MovieStoreServiceImplBase {
    @Override
    public void getMovies(MovieStoreRequest request,
                          StreamObserver<MovieStoreResponse> responseObserver) {
        List<Movie> movies = Arrays.asList(Movie.newBuilder()
                        .setTitle("No country for old men")
                        .setDescription("Western crime thriller")
                        .setRating(8.1f).setGenre(Genre.ACTION).build(),
                Movie.newBuilder().setTitle("Bourne Ultimatum")
                        .setDescription("Action thriller")
                        .setRating(8.0f).setGenre(Genre.ACTION).build(),
                Movie.newBuilder().setTitle("The taxi driver")
                        .setDescription("Pyschological thriller")
                        .setRating(8.2f).setGenre(Genre.THRILLER).build(),
                Movie.newBuilder().setTitle("The Hangover")
                        .setDescription("Hilarious ride")
                        .setRating(7.7f).setGenre(Genre.COMEDY).build(),
                Movie.newBuilder().setTitle("Raiders of the Lost Arc")
                        .setDescription("Expedition in search of the lost arc")
                        .setRating(8.4f)
                        .setGenre(Genre.ACTION).build(),
                Movie.newBuilder().setTitle("Cast Away")
                        .setDescription("survival story")
                        .setRating(7.8f).setGenre(Genre.DRAMA).build(),
                Movie.newBuilder().setTitle("Gladiator")
                        .setDescription("Period drama")
                        .setRating(8.5f).setGenre(Genre.DRAMA).build(),
                Movie.newBuilder().setTitle("Jaws")
                        .setDescription("Shark thrills")
                        .setRating(8.0f).setGenre(Genre.THRILLER).build(),
                Movie.newBuilder().setTitle("Inception")
                        .setDescription("Sci fi action")
                        .setRating(8.8f).setGenre(Genre.ACTION).build());
        movies.stream()
                .filter(movie -> movie.getGenre().equals(request.getGenre()))
                .collect(Collectors.toList())
                .forEach(movie -> {
                    responseObserver.onNext(MovieStoreResponse
                            .newBuilder().setMovie(movie).build());
                });
        responseObserver.onCompleted();
    }
}

UserPreferencesServiceImpl.java

This service implements the bidirectional streaming scenario that receives a stream of movies and responds with a shortlisted stream of movies. The shortlisting is performed by matching the incoming movies with the user preferences. The method signature has StreamObserver as argument as well as return type.

StreamObserver<UserPreferencesRequest> - used to receive stream of messages from the client using onNext(), onError() and onCompleted() calls

StreamObserver<UserPreferencesResponse> - used to respond with stream of messages back to the client using onNext(), onError() and onCompleted() calls

In the real world, the logic used for matching user preferences would be complex. It will involve tasks such as tracking user activities such as movies watched, bookmarked, rated, liked, disliked and so on. In this case, we will free ourselves of all such complexities and implement a rather trivial isEligible() method that uses a simple random calculation to mark an input movie as eligible or not.

public class UserPreferencesServiceImpl extends 
       UserPreferencesServiceGrpc.UserPreferencesServiceImplBase {
    @Override
    public StreamObserver<UserPreferencesRequest>
    getShortlistedMovies(StreamObserver<UserPreferencesResponse> responseObserver) {
        StreamObserver<UserPreferencesRequest> streamObserver =
                new StreamObserver<UserPreferencesRequest>() {
            @Override
            public void onNext(UserPreferencesRequest value) {
                if (isEligible(value.getMovie())) {
                    responseObserver.onNext(UserPreferencesResponse
                            .newBuilder().setMovie(value.getMovie()).build());
                }
            }
            @Override
            public void onError(Throwable t) {
                responseObserver.onError(Status.INTERNAL
                        .withDescription("Internal server error")
                        .asRuntimeException());
            }
            @Override
            public void onCompleted() {
                responseObserver.onCompleted();
            }
        };
        return streamObserver;
    }
    private boolean isEligible(Movie movie) {
        return (new SecureRandom().nextInt() % 4 != 0);
    }
}

RecommenderServiceImpl.java

This service implements the client streaming use case. The stream of movies that are shortlisted from User Preferences are accepted and a movie is picked as recommended and published back to the client. The method signature is similar to the above case, however, the onNext() of responseObserver is invoked only once to send a solitary message to the client. Recommender randomly picks a movie as the output. The implementation of getMovieForRecommendation() is given below.

public class RecommenderServiceImpl extends 
        RecommenderServiceGrpc.RecommenderServiceImplBase {
    @Override
    public StreamObserver<RecommenderRequest> 
    getRecommendedMovie(StreamObserver<RecommenderResponse> responseObserver) {
        StreamObserver<RecommenderRequest> streamObserver = 
                new StreamObserver<RecommenderRequest>() {
            List<Movie> movies = new ArrayList<>();
            @Override
            public void onNext(RecommenderRequest value) {
                movies.add(value.getMovie());
            }
            @Override
            public void onError(Throwable t) {
                responseObserver.onError(Status.INTERNAL
                        .withDescription("Internal server error")
                        .asRuntimeException());
            }
            @Override
            public void onCompleted() {
                if (movies.size() > 0) {
                    responseObserver.onNext(RecommenderResponse.newBuilder()
                            .setMovie(findMovieForRecommendation(movies))
                            .build());
                    responseObserver.onCompleted();
                } else {
                    responseObserver.onError(Status.NOT_FOUND
                            .withDescription("Sorry, found no movies to recommend!")
                            .asRuntimeException());
                }
            }
        };
        return streamObserver;
    }
    private Movie findMovieForRecommendation(List<Movie> movies) {
        int random = new SecureRandom().nextInt(movies.size());
        return movies.stream().skip(random).findAny().get();
    }
}

MovieControllerServiceImpl.java

Finally, we will proceed to implement the service method for Movie Controller. This is the key service which manages the gRPC integration with the other three services.

The service does the following

  • Accepts the Genre input and fetches list of movies from MovieStore service
  • Sends the stream of movie requests to UserPreferences service that sends back a shortlisted stream of movie messages as response.
  • Sends the shortlisted movie messages to Recommender service to recommend one of the movies
  • Responds to the MovieFinderClient request with the resulting movie returned by the Recommender service

Channels are established for connecting to the respective gRPC servers and then used to setup the client stubs for invoking the remote service calls. Please note that we have configured usePlainText() to deactivate TLS checking. gRPC by default, expects TLS configuration to be setup. For production environments, the TLS configuration settings should be provided in the channel configuration.

CountDownLatch is configured to ensure that the main thread waits until the asynchronous stream observers complete their executions.

public class MovieControllerServiceImpl extends
      MovieControllerServiceGrpc.MovieControllerServiceImplBase {
    public static final int MOVIES_SERVICE_PORT = 50052;
    public static final int USER_PREFERENCES_SERVICE_PORT = 50053;
    public static final int RECOMMENDER_SERVICE_PORT = 50054;
    @Override
    public void getMovie(MovieRequest request,
                         StreamObserver<MovieResponse> responseObserver) {
        String userId = request.getUserid();
        MovieStoreServiceGrpc.MovieStoreServiceBlockingStub movieStoreClient =
                MovieStoreServiceGrpc.newBlockingStub(getChannel(MOVIES_SERVICE_PORT));
        UserPreferencesServiceGrpc.UserPreferencesServiceStub userPreferencesClient =
                UserPreferencesServiceGrpc.newStub(getChannel(USER_PREFERENCES_SERVICE_PORT));
        RecommenderServiceGrpc.RecommenderServiceStub recommenderClient =
                RecommenderServiceGrpc.newStub(getChannel(RECOMMENDER_SERVICE_PORT));
        // set counter for the thread to wait until recommendation is received
        CountDownLatch latch = new CountDownLatch(1);
        StreamObserver<RecommenderRequest> recommenderRequestObserver =
                recommenderClient
                .getRecommendedMovie(new StreamObserver<RecommenderResponse>() {
                    @Override
                    public void onNext(RecommenderResponse value) {
                        responseObserver.onNext(MovieResponse
                                .newBuilder()
                                .setMovie(value.getMovie()).build());
                        System.out.println("Recommended movie " + value.getMovie());
                    }
                    @Override
                    public void onError(Throwable t) {
                        responseObserver.onError(t);
                        latch.countDown();
                    }
                    @Override
                    public void onCompleted() {
                        responseObserver.onCompleted();
                        latch.countDown();
                    }
                });
        StreamObserver<UserPreferencesRequest> streamObserver =
                userPreferencesClient.getShortlistedMovies
                        (new StreamObserver<UserPreferencesResponse>() {
                    @Override
                    public void onNext(UserPreferencesResponse value) {
                        recommenderRequestObserver
                                .onNext(RecommenderRequest.newBuilder()
                                .setUserid(userId)
                                .setMovie(value.getMovie()).build());
                    }
                    @Override
                    public void onError(Throwable t) {
                    }
                    @Override
                    public void onCompleted() {
                        recommenderRequestObserver.onCompleted();
                    }
                });
        movieStoreClient.getMovies(MovieStoreRequest.newBuilder()
                .setGenre(request.getGenre()).build())
                .forEachRemaining(response -> {
                    streamObserver.onNext(UserPreferencesRequest.newBuilder()
                            .setUserid(userId).setMovie(response.getMovie())
                            .build());
                });
        streamObserver.onCompleted();
        try {
            latch.await(3L, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    private ManagedChannel getChannel(int port) {
        return ManagedChannelBuilder.forAddress("localhost", port)
                .usePlaintext()
                .build();
    }
}

Microservices are ready - Time to review

We have completed the development of all the four microservices. Before proceeding ahead, take a pause and validate the following:

  • Service implementation classes are mapped correctly in all the corresponding gRPC servers.
  • Port mappings referred in gRPC client definitions in MovieControllerServiceImpl correspond to their respective server ports.

Implementing the Movie Finder client

Now, we will develop the client for invoking the moviecontroller service. The approach is similar to the one already followed in MovieControllerServiceImpl.

MovieFinderClient.java

public class MovieFinderClient {
    public static final int MOVIE_CONTROLLER_SERVICE_PORT = 50051;
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder
                .forAddress("localhost", MOVIE_CONTROLLER_SERVICE_PORT)
                .usePlaintext()
                .build();
         MovieControllerServiceGrpc.MovieControllerServiceBlockingStub 
                 movieFinderClient = MovieControllerServiceGrpc
                 .newBlockingStub(channel);
        try {
            MovieResponse movieResponse = movieFinderClient
                    .getMovie(MovieRequest.newBuilder()
                    .setGenre(Genre.ACTION)
                    .setUserid("abc")
                    .build());
            System.out.println("Recommended movie " + movieResponse.getMovie());
        } catch (StatusRuntimeException e) {
            System.out.println("Recommended movie not found!");
            e.printStackTrace();
        }
    }
}

Testing the application

Okay, we are done with the development of all the classes for this example. Now, it is time to test the setup.

All the gRPC servers should be started first. Easiest option would be to run the applications through the IDE. If everything work as expected, you should have four gRPC servers running and waiting for requests. Now, run the MovieFinderClient application. In this example, I have put ACTION as the Genre input for fetching the recommended movie.

Following sequence of steps will be executed.

  • MovieFinderClient invokes getMovie rpc call on MovieController with Genre input
  • MovieController accepts the Genre input and fetches list of movies from MovieStore service
  • MovieController sends the stream of movie requests to UserPreferences service that sends back a shortlisted stream of movie messages as response.
  • MovieController sends the shortlisted movie messages to Recommender service to recommend one of the movies
  • MovieController responds to the MovieFinderClient request with the resulting movie returned by the Recommender service

Snippet of output generated will be similar to the one below:


Conclusion

gRPC has emerged as a very popular alternative to RESTful API for microservices architectures. gRPC offers several benefits out of the box leveraging HTTP/2 under the hood for fast, efficient and network optimized integrations in distributed systems. It also helps increase developer productivity through contract first approach, server/ client boilerplate code generation and through the support for integrating polyglot microservices. gRPC also offers more freedom for developers to come up with flexible API definitions through client/ server and bidirectional streaming support.

This example is an attempt at showcasing the basic integration patterns that can be achieved in a typical java based microservices solution using gRPC.