kafEEne #2: Kafka & Concurrency Utilities

Yet another post in the kafEEne series: this time, it’s Java EE Concurrency Utilities in action along with Kafka. As usual, the code is on GitHub, and the README explains how to get it up and running using Docker

Setup

As mentioned, the application is Docker-based:

  • It uses Zookeeper, Kafka and TomEE (airhacks) Docker images
  • For ease of development and demo, the entire application can be run as a single unit using Docker Compose

Kafka

We have the following setup for Kafka:

  • One topic with three partitions (configured to be auto-created)
  • Automatic offset commit is disabled
  • Kafka is linked to the Zookeeper container, of course
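
For reference, here is a minimal sketch of the consumer-side settings implied by this setup, built as a plain java.util.Properties object (the form KafkaConsumer accepts). The property names are standard Kafka client settings; the helper name, the broker address, the group id and the use of String deserializers are assumptions for illustration only. Auto-creating the topic with three partitions is a broker-side concern (e.g. auto.create.topics.enable and num.partitions), so it does not appear here.

```java
import java.util.Properties;

// Hypothetical helper (not from the project): builds the consumer configuration described above.
final class ConsumerConfigSketch {
    private ConsumerConfigSketch() {}

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("group.id", groupId);
        // Offsets are committed explicitly by the application, never in the background
        props.setProperty("enable.auto.commit", "false");
        // Assumption: keys and values are plain strings
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }
}
```

Something like new KafkaConsumer<String, String>(ConsumerConfigSketch.consumerProps("kafka:9092", "kafeene-group")) would then create the consumer.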

Consumer

  • The Consumer is the meat of the application
  • It’s a Runnable whose run method polls the Kafka cluster (with a configurable timeout, 20 seconds by default)
  • It also doubles as a ManagedTask (from the Java EE Concurrency Utilities)
  • It provides a ManagedTaskListener implementation whose lifecycle callbacks are used to commit offsets to Kafka (since automatic offset commit is disabled)
  • The Consumer is triggered by a @Singleton bean which submits it to the ManagedExecutorService thread pool
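
ManagedTaskListener only exists inside a Java EE container, but the idea (run the poll loop as a task, then commit offsets from a callback that fires once the task is done) can be sketched in plain Java SE using ThreadPoolExecutor.afterExecute, its closest standard analogue. PollTask and CommittingExecutor are hypothetical names, and the "commit" simply copies offsets into a map where the real code would call KafkaConsumer.commitSync(...).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the Consumer task: a real run() would poll Kafka;
// here it just reports the offsets it pretends to have processed.
final class PollTask implements Runnable {
    private final Map<Integer, Long> fetched;   // partition -> next offset to read
    final Map<Integer, Long> offsetsToCommit = new ConcurrentHashMap<>();

    PollTask(Map<Integer, Long> fetched) {
        this.fetched = fetched;
    }

    @Override
    public void run() {
        offsetsToCommit.putAll(fetched);
    }
}

// Plain Java SE analogue of ManagedTaskListener.taskDone(...): afterExecute is
// invoked on the worker thread after run() returns (t != null if run() threw).
final class CommittingExecutor extends ThreadPoolExecutor {
    final Map<Integer, Long> committed = new ConcurrentHashMap<>();

    CommittingExecutor() {
        super(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        // Use execute(), not submit(): with submit() r is a FutureTask wrapper and
        // exceptions are captured in the Future instead of arriving here as t.
        if (t == null && r instanceof PollTask) {
            committed.putAll(((PollTask) r).offsetsToCommit);   // "commitSync" stand-in
        }
    }
}
```

Committing only when t is null mirrors the intent of the listener approach: offsets move forward only after a poll loop finished without error.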

Producer

  • It’s a Runnable
  • It is triggered by a @Singleton bean and runs within the ManagedScheduledExecutorService thread pool
  • It waits for a configurable interval (3 seconds by default) before producing the next record
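
ManagedScheduledExecutorService extends java.util.concurrent.ScheduledExecutorService, so scheduling the producer looks the same as in Java SE. The sketch below uses scheduleWithFixedDelay to wait a configurable delay between records; the post does not say whether the actual Producer uses this call or sleeps inside its own loop, so treat it as one possible wiring. RecordProducer is a hypothetical name and its "send" only appends to a list.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in Producer: a Runnable that "sends" one record per run.
final class RecordProducer implements Runnable {
    final List<String> sent = new CopyOnWriteArrayList<>();
    private final AtomicInteger counter = new AtomicInteger();

    @Override
    public void run() {
        // A real producer would call KafkaProducer.send(new ProducerRecord<>(...)) here
        sent.add("record-" + counter.incrementAndGet());
    }

    // Runs the producer repeatedly, waiting delayMillis between the end of one
    // run and the start of the next (3000 would match the default in the post).
    static ScheduledFuture<?> start(ScheduledExecutorService ses, RecordProducer p, long delayMillis) {
        return ses.scheduleWithFixedDelay(p, 0, delayMillis, TimeUnit.MILLISECONDS);
    }
}
```

Inside the container, the @Singleton bean would pass its injected ManagedScheduledExecutorService as ses.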

Checking the results

[Screenshot: kafka-javaee-conc-utils-snapshot]

Your results should look similar to this. Here is the gist

The snapshot above shows data from three (consumer) poll loops:

  • The producer pushed one record
  • Each loop fetched one record (that’s just a coincidence, not by design); polling is completely independent of the producer
  • After each loop (task), the offset commit process was triggered. This example has three partitions, and the offset for each partition is printed to the console
  • If you look carefully, the committed offset for a given loop lines up with the data that was consumed (note: the committed offset points to the next record to read, i.e. one more than the offset of the last consumed record)
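
The off-by-one rule is easy to get wrong, so here is the arithmetic as a small plain Java sketch: for each partition, the offset to commit is the highest offset consumed plus one. OffsetMath and Consumed are hypothetical names; with the real client the resulting map would be turned into a Map<TopicPartition, OffsetAndMetadata> and passed to KafkaConsumer.commitSync.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class OffsetMath {
    private OffsetMath() {}

    // Hypothetical record type: only what matters for offset bookkeeping.
    static final class Consumed {
        final int partition;
        final long offset;

        Consumed(int partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Kafka's convention: the committed offset is the position of the NEXT record
    // to read, i.e. (highest offset consumed in that partition) + 1.
    static Map<Integer, Long> offsetsToCommit(List<Consumed> records) {
        Map<Integer, Long> result = new TreeMap<>();
        for (Consumed r : records) {
            result.merge(r.partition, r.offset + 1, Math::max);
        }
        return result;
    }
}
```

For example, after consuming offsets 41 and 42 from partition 0, the value committed for partition 0 is 43.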

Further reading

Cheers!

Posted in Java EE

Consume from Kafka topics using Message Driven Beans

Thanks to the recently released Payara Kafka Connector, you can now consume messages from Kafka using Message Driven Beans!

Here is a Docker-based example to try it out. Quick overview:

Refer to the README for how to run this

Cheers!

Posted in Java EE

kafEEne #1: Websocket & Kafka

Say hello to kafEEne! A blog series showing Kafka and Java EE examples

This one is about Kafka and the (Java EE) Websocket API. The code is on GitHub, and the README explains how to get it up and running using Docker. Although the focus is on Websocket, a few other Java EE specs are used as well: EJB, CDI and a bit of JSON-B (part of Java EE 8)

[Screenshot: dashboard]

Websocket based dashboard

Consumer

  • Kafka Consumer is a @Stateless EJB
  • Initialized in the @PostConstruct callback
  • The consumer logic is executed within a while loop controlled by an (atomic) boolean flag which can potentially be mutated by the container. It sends the event data in the form of a custom Payload object using CDI events
  • Consumer shutdown is handled in a @PreDestroy method, which invokes the consumer’s wakeup method to break out of the while loop
  • A simple (one-time) EJB timer job triggers the Consumer process; this is just for simplicity. Exposing the consumer startup through, say, a REST endpoint would work just as well
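
Stripped of Kafka and the EJB container, the loop described above is a flag-controlled polling loop. This plain Java sketch models it under stated assumptions: a BlockingQueue stands in for the topic, and a java.util.function.Consumer stands in for firing the CDI event. LoopingConsumer is a hypothetical name, not the project’s class.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Java SE model of the consumer loop: the queue stands in for the Kafka topic and
// the sink stands in for CDI's Event<Payload>.fire(...).
final class LoopingConsumer implements Runnable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final BlockingQueue<String> source;
    private final Consumer<String> sink;

    LoopingConsumer(BlockingQueue<String> source, Consumer<String> sink) {
        this.source = source;
        this.sink = sink;
    }

    @Override
    public void run() {
        while (running.get()) {
            try {
                // Like poll(timeout): returns null if nothing arrives in time
                String value = source.poll(100, TimeUnit.MILLISECONDS);
                if (value != null) {
                    sink.accept(value);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Counterpart of the @PreDestroy hook: flip the flag so the loop exits.
    // With Kafka you would also call consumer.wakeup(), which makes a blocked
    // poll() throw WakeupException instead of waiting for the timeout.
    void shutdown() {
        running.set(false);
    }
}
```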

See Consumer for more details

Websocket endpoint

A server-side Websocket endpoint which, to be honest, does very little. A client connects to it with the topic it is interested in as part of the URL

  • The @OnOpen callback adds the Websocket Session to the set of tracked sessions and stores the topic in the Session’s user properties (a Map)
  • The @OnClose callback removes the Session from memory

Broadcasting

  • It is enabled by a CDI observer method (@Observes) for the Payload object
  • It simply loops over all the Sessions and sends the key-value pair (received from the Kafka topic) to each Session which registered for that topic
  • It uses the JSON-B API to convert the Payload POJO to JSON
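
The routing rule in the broadcast step can be sketched without a WebSocket container. FakeSession is a hypothetical stand-in exposing only a user-properties map and an outbox, and the JSON is built by hand where the post uses JSON-B (Jsonb.toJson); this illustrates the filtering logic, not the actual KafkaWebsocketEndpoint code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for javax.websocket.Session: just the parts used here.
final class FakeSession {
    final Map<String, Object> userProperties = new HashMap<>();
    final List<String> outbox = new ArrayList<>();   // what getBasicRemote().sendText(...) would send
}

final class Broadcaster {
    private Broadcaster() {}

    // Hypothetical payload: the topic plus the key-value pair read from Kafka.
    static final class Payload {
        final String topic, key, value;

        Payload(String topic, String key, String value) {
            this.topic = topic;
            this.key = key;
            this.value = value;
        }
    }

    // Equivalent of the @OnOpen step: remember which topic this client asked for.
    static void register(FakeSession s, String topic) {
        s.userProperties.put("topic", topic);
    }

    // Equivalent of the CDI observer: forward the payload only to sessions
    // registered for its topic. Hand-built JSON, no escaping: illustration only.
    static void broadcast(Iterable<FakeSession> sessions, Payload p) {
        String json = "{\"key\":\"" + p.key + "\",\"value\":\"" + p.value + "\"}";
        for (FakeSession s : sessions) {
            if (p.topic.equals(s.userProperties.get("topic"))) {
                s.outbox.add(json);
            }
        }
    }
}
```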

See KafkaWebsocketEndpoint for more details

Connected clients

  • A @Singleton EJB maintains a list of connected clients
  • It provides a synchronized wrapper over the collection of Websocket Session objects
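
A plain Java sketch of such a holder follows, assuming a Collections.synchronizedSet underneath (the post does not say which collection Peers wraps). The subtle point is iteration: a synchronized collection must be locked manually while iterating, so the sketch hands out a copy taken under the lock. The generic parameter S stands in for javax.websocket.Session.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical plain-Java version of a "Peers" holder.
final class Peers<S> {
    private final Set<S> sessions = Collections.synchronizedSet(new LinkedHashSet<>());

    void add(S session) {
        sessions.add(session);
    }

    void remove(S session) {
        sessions.remove(session);
    }

    // Iterating a synchronizedSet must happen while holding its lock; copying
    // under the lock lets callers broadcast without blocking add/remove.
    List<S> snapshot() {
        synchronized (sessions) {
            return new ArrayList<>(sessions);
        }
    }
}
```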

See Peers for more details

Websocket Client

A simple JavaScript-based front end. See index.html for more details. I am not a front end guy, so please pardon me!

Producer application

It’s a bare-bones Java producer application which pushes random data to a couple of Kafka topics

Further reading

Cheers!

Posted in Java EE

Redis CDI example….

Here is an example of a CDI producer for Redis using Jedis, a Java client for Redis. Check out the project README to get it up and running. Below is a quick summary

Use of CDI Qualifier

Two types of CDI producers have been demonstrated

  1. non-qualified: no additional qualifiers for simply obtaining a standalone Jedis connection
  2. qualified: uses a custom CDI Qualifier (@FromJedisPool) to get the connection from the Jedis pool

Scoping

  1. @RequestScoped for the pooled producer: the Jedis connection is returned to the pool once the request (e.g. the method invocation serving it) is complete
  2. @Dependent for the basic producer: the lifetime of the Jedis connection object is bound to the component into which it has been injected

Common stuff

  1. In both cases, the producer (CDI) bean itself is @ApplicationScoped, while the producer methods are scoped differently
  2. A corresponding @Disposes method performs the clean up (i.e. closes the connection or returns it to the pool)
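
The producer/disposer lifecycle can be modelled without a CDI container. In the sketch below, Scope plays the container’s role for one context: it calls the producer at most once and calls the disposer when the scope closes. Conn, ConnPool and Scope are hypothetical stand-ins (Conn for a Jedis connection, ConnPool for a JedisPool); none of this is CDI or Jedis API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical connection type standing in for a Jedis connection.
final class Conn {
    boolean closed;
}

// Hypothetical minimal pool: borrow() hands out an idle connection or creates one.
final class ConnPool {
    private final Deque<Conn> idle = new ArrayDeque<>();

    synchronized Conn borrow() {
        Conn c = idle.poll();
        return c != null ? c : new Conn();
    }

    synchronized void giveBack(Conn c) {
        idle.push(c);
    }

    synchronized int idleCount() {
        return idle.size();
    }
}

// Models what the container does for a scoped producer: produce at most once
// per scope, then run the disposer (the @Disposes method) when the scope ends.
final class Scope<T> implements AutoCloseable {
    private final Supplier<T> producer;
    private final Consumer<T> disposer;
    private T instance;

    Scope(Supplier<T> producer, Consumer<T> disposer) {
        this.producer = producer;
        this.disposer = disposer;
    }

    T get() {
        if (instance == null) {
            instance = producer.get();
        }
        return instance;
    }

    @Override
    public void close() {
        if (instance != null) {
            disposer.accept(instance);
        }
        instance = null;
    }
}
```

For the pooled variant, the disposer returns the connection to the pool (new Scope<>(pool::borrow, pool::giveBack)); for the basic variant, it closes a standalone connection. In the real code the methods would carry @Produces (plus @RequestScoped and @FromJedisPool for the pooled one) and a @Disposes parameter instead.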

Further reading

Cheers!

Posted in Java EE, NoSQL

Handling ‘state’ in Java WebSocket applications

By and large, there are two kinds of state in a WebSocket application

  • User/client specific: related to a connected user/Session, e.g. user ID, list of subscriptions, last message received etc.
  • Global: state which is relevant across the application and which all connected users/Sessions may be able to use

User specific state

This can be handled using the getUserProperties method on the Session object, which exposes a Map in which you can store anything (Object type) against a String key

Global state

There are multiple options here as well. Please note that these are scoped to a specific Endpoint:

  • getUserProperties in EndpointConfig exposes the same Map interface as the one in Session. Since the WebSocket runtime creates a single EndpointConfig instance per Endpoint, it can be used as a global state store

  • Another option is to encapsulate some of the common/global logic in a custom Configurator implementation which can be accessed and used within the endpoint logic
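
A container-free sketch of the two kinds of state: each client gets its own map, like Session.getUserProperties(), while the endpoint holds one shared map, like EndpointConfig.getUserProperties() as described above. Because every session can touch the shared map concurrently, the sketch uses a ConcurrentHashMap; EndpointState and ClientSession are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical model of the two maps discussed above.
final class EndpointState {
    // Like EndpointConfig.getUserProperties(): one map per endpoint, shared by
    // every session, so it must tolerate concurrent access.
    final Map<String, Object> global = new ConcurrentHashMap<>();

    // Like Session.getUserProperties(): one map per connected client.
    static final class ClientSession {
        final Map<String, Object> userProperties = new HashMap<>();
    }

    void onOpen(ClientSession s, String userId) {
        s.userProperties.put("userId", userId);                              // user-specific
        global.merge("connected", 1, (a, b) -> (Integer) a + (Integer) b);   // shared counter
    }
}
```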

Further reading

Cheers!

Posted in Java, Java EE

Using CDI with Java EE Concurrency Utilities

This blog post explores the use of CDI along with Java EE Concurrency Utilities, specifically using CDI beans as managed tasks. Here is the sample application on GitHub

Let’s begin with a quick overview

Java EE Concurrency Utilities provides APIs and constructs to manage concurrency within Java EE applications. Many Java EE components have specific concurrency semantics, e.g. EJBs, JAX-RS resources, WebSocket endpoints etc. Writing components with custom concurrency properties was traditionally difficult, since starting unmanaged threads in a Java EE container was forbidden, i.e. one could not leverage the Java SE concurrency libraries. With Concurrency Utilities, Java EE applications have access to managed versions of their Java SE counterparts, namely:

  • ManagedExecutorService
  • ManagedScheduledExecutorService
  • ManagedThreadFactory

.. and a few other APIs as well, but the ones above are the Java EE equivalents of the Java SE concurrency APIs

Tasks as CDI beans

Both ManagedExecutorService and ManagedScheduledExecutorService can accept tasks to execute (in a container-managed thread pool) in the form of Runnable and Callable instances. The good thing is that these tasks can be CDI beans as well. Points worth noting:

  • these CDI beans can be injected into other components and can themselves inject other beans
  • the scope of CDI beans which can be used as tasks is restricted to @ApplicationScoped and @Dependent (for details, read section 2.3.2.1 of the specification)

Here is a summary of ..

.. what’s going on in the application. For more details, refer to the README and explore the code

  • Tasks are POSTED via a REST interface, and the client gets back an HTTP 202 (Accepted) response along with a task ID
  • the BackgroundTask (CDI bean) is executed in a background thread by the ManagedExecutorService. It is injected (@Inject), and a different instance is created on every invocation since the CDI bean is marked @Dependent and the JAX-RS resource is created afresh for each client request
  • the status is stored in a @Singleton EJB (TaskStore), which is injected into the BackgroundTask CDI bean
  • status of each task (in progress, failed, completed) can be tracked via a REST interface by querying against a task ID
  • one can also get the status of all tasks
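
The flow above can be sketched in plain Java, with an ExecutorService standing in for the ManagedExecutorService and an ordinary class for the TaskStore EJB. The Status values mirror the states listed above; the method names are hypothetical, and the REST layer is reduced to the returned task ID.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;

// Hypothetical container-free version of the TaskStore / BackgroundTask flow.
final class TaskStore {
    enum Status { IN_PROGRESS, FAILED, COMPLETED }

    private final Map<String, Status> statuses = new ConcurrentHashMap<>();

    void put(String id, Status s) {
        statuses.put(id, s);
    }

    Status get(String id) {
        return statuses.get(id);
    }

    Map<String, Status> all() {
        return Map.copyOf(statuses);
    }

    // What the POST handler does: register the task, hand it to the executor and
    // return the ID right away (the HTTP 202 response would carry it).
    String submit(ExecutorService executor, Runnable work) {
        String id = UUID.randomUUID().toString();
        put(id, Status.IN_PROGRESS);   // recorded before the task can possibly finish
        executor.execute(() -> {
            try {
                work.run();
                put(id, Status.COMPLETED);
            } catch (RuntimeException e) {
                put(id, Status.FAILED);
            }
        });
        return id;
    }
}
```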

Further reading

Cheers!

Posted in Java, Java EE

Trending Meetup groups with Redis and Java EE

This is an application which displays the currently trending Meetup groups based on their (live) RSVPs feed. It’s built using Java EE 7 (uses WebSocket client & server APIs, Singleton EJB timers and CDI events to wire things up) and Redis

To run

  • get the code
  • start Redis
  • change Redis connection details here and here
  • mvn clean install
  • deploy the WAR file in any Java EE 7 (or above) compliant container
  • access the application in your browser e.g. http://localhost:8080/meetup-trending/

You should see something like this. The score represents how often a group occurs in the RSVPs (i.e. its popularity)

[Screenshot: meetup-trending-groups]
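
The scoring itself is plain frequency counting. The sketch below keeps a count per group name and returns the top N; it is not the application’s Redis code. In Redis this kind of ranking maps naturally onto a sorted set (ZINCRBY to bump a score, ZREVRANGE to read the leaders), but whether the application stores scores exactly that way is an assumption. Trending is a hypothetical name.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical in-memory scorer; not thread-safe, assumes a single feed thread.
final class Trending {
    private final Map<String, Long> scores = new HashMap<>();

    // Called once per RSVP: the group's score is how often it has appeared.
    void onRsvp(String groupName) {
        scores.merge(groupName, 1L, Long::sum);
    }

    // Highest score first; ties broken alphabetically to keep the output stable.
    List<String> top(int n) {
        return scores.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
                        .thenComparing(Map.Entry.<String, Long>comparingByKey()))
                .limit(n)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}
```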

 

Cheers!

Posted in Java, Java EE

Github: WebSocket applications…

I recently pushed a couple of WebSocket samples to GitHub. Both are based around the notion of a chat service, the canonical WebSocket example! They were originally meant as additional material for the Java WebSocket API Handbook, to help readers understand API usage in practice and get hands-on by tinkering with the code

Cheers!

Posted in Java EE

Quick tip: managing Stateful EJBs in WebSocket endpoints

@Stateful EJBs can be injected into WebSocket endpoints (this is supported by the WebSocket specification). By default there is a one-to-one association between a WebSocket client and its endpoint instance, and hence with the injected Stateful EJB instance, which makes the EJB a good candidate for storing client-specific state. It offers richer semantics than the simple java.util.Map exposed by the getUserProperties method in javax.websocket.Session

But what happens to the EJB when …

… the WebSocket session terminates (from either the client or the server side)? The Stateful instance can still linger in memory until it is destroyed

  • Its removal can be tuned by using the @StatefulTimeout annotation (but why wait?), or
  • you can choose to passivate it by using passivationCapable=true with @Stateful (but why passivate the EJB when the WebSocket connection itself is closed?)

The Tip

Implement a @Remove annotated method in the Stateful EJB and call it from the @OnClose callback method in your WebSocket server endpoint implementation. This ensures that the EJB is removed from memory immediately rather than depending on other factors

Further reading

Posted in Java, Java EE

WebSocket endpoint as Singleton EJB

By default …

… a WebSocket implementation creates a new (server) endpoint instance per client. In case you need a single instance, you can implement this using a custom ServerEndpointConfig.Configurator (by overriding the getEndpointInstance method)

The catch: you might have to sacrifice some of the (Java EE) platform related services like dependency injection and interceptors

More details here

Alternate solution ?

A similar behavior can be achieved by decorating the WebSocket endpoint with @Singleton

The caveat is that this approach is not a standard feature outlined by the spec (although injection of EJBs and support for interceptors are clearly mentioned in Sec 7.1 of the Java WebSocket spec)

Concurrency semantics ?

In case of a @Singleton, all the clients will interact with the one-and-only server endpoint instance. Here is a quick summary of how the EJB as well as WebSocket threading semantics are applied

  • The Singleton bean’s default WRITE lock ensures single-threaded access across all connected clients
  • If thread-safety is not a concern (e.g. your logic does not deal with client-specific data/state) and the single-threaded access model proves to be a bottleneck, override the default behavior by switching to a READ lock, which allows concurrent threads to access the methods (unless a WRITE lock is already in effect)
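
Container-managed concurrency for a @Singleton behaves like a read/write lock, so java.util.concurrent.locks.ReentrantReadWriteLock makes a reasonable plain Java analogue: the default @Lock(WRITE) corresponds to the write lock and @Lock(READ) to the read lock. This illustrates the semantics only, not how any container implements them; SingletonLike is a hypothetical name.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical Java SE analogue of a @Singleton endpoint's locking behaviour.
final class SingletonLike {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private int messagesSeen;

    // Default WRITE semantics: one caller at a time across ALL clients.
    void onMessageWrite() {
        lock.writeLock().lock();
        try {
            messagesSeen++;
        } finally {
            lock.writeLock().unlock();
        }
    }

    // READ semantics: many callers at once, but none while a writer holds the lock.
    int readCount() {
        lock.readLock().lock();
        try {
            return messagesSeen;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Exposed only so the behaviour can be demonstrated.
    ReentrantReadWriteLock lockForDemo() {
        return lock;
    }
}
```

While one thread holds the read lock, another thread can still acquire it, but an attempt to take the write lock fails until all readers are gone.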

Note: The above mentioned semantics are with respect to ALL the WebSocket clients. From the point of view of a single client, the default strategy of one thread at a time, per endpoint instance per client continues to apply (more details here)

Further reading..

Cheers!

Posted in Java, Java EE