Introduction to Vert.x

Toby Crawley

toby@tcrawley.org

What is Vert.x?

  • Asynchronous polyglot application platform
  • JVM based
  • Similar to Node.js, but not a clone
  • Thread safe (pretend you're the only one)

Polyglot

  • Java
  • JRuby
  • Jython
  • JavaScript/CoffeeScript (Rhino/DynJS (in-progress))
  • Groovy
  • Scala (in-progress)
  • Clojure (in-progress)
  • All with a similar API

Provides

  • Async TCP/HTTP/HTTPS/Websocket clients
  • Async TCP/HTTP/HTTPS/Websocket servers
  • Distributed event bus
  • Timers

Why Async?

  • What?
  • Back to work!
  • Thousands of concurrent connections
  • Speed

415,000 requests/sec

Java Example

HttpClient client = vertx.createHttpClient()
    .setHost("foo.com")
    .setPort(8080);

// Create a request with a response handler    
client.get("/some-path/",
           new Handler<HttpClientResponse>() {
               public void handle(HttpClientResponse resp) {
                   log.info("Got a response: " + resp.statusCode());
               }
           })
    .end();

Ruby Example

client = Vertx::HttpClient.new
client.host = 'foo.com'
client.port = 8080

# Create a request with a response handler    
client.get('/some-path/') { |resp|
  puts "got response #{resp.status_code}" 
}.end

Clojure Example

(require '[vertx.http :as http])

(-> (http/client {:host "foo.com"
                  :port 8080})
    (http/request :GET "/some-path/"
      (fn [resp]
        (println "got response" (http/status-code resp))))
    http/end)

Verticles

  • The unit of deployment
  • Many in one (any language)

Verticles

Start with a verticle:

vertx run my_app.js -conf cfg.json -instances 3

Deploy one programmatically:

vertx.deploy_verticle('my_app.clj') do |err, deployment_id|
  puts 'deploy successful!' unless err
end

Event/Reactor Loop

event-loop.png

Event/Reactor Loop(s)

  • Defaults to one/processor
  • Verticle tied to one loop (so one thread)
  • Thread safety

DON'T BLOCK THE EVENT LOOP

  • Thread.sleep()
  • Object.wait()
  • CountDownLatch.await() or any other blocking operating from java.util.concurrent
  • while(true) {puts "King of the world!"}
  • Long-lived computationally intensive operations
  • Blocking operations that might take some time to complete (e.g. DB query)

Worker Verticles

  • Can block
  • Uses thread pool
  • Verticle still called by one thread at a time

Modules

  • The unit of distribution
  • Reusable, shareable artifacts (zip files)
  • Can contain verticles
  • Can depend on other modules

Timers

  • Do something in the future
  • One-shot & periodic

Timers

(require '[vertx.core :as v])

(v/timer 5000
  (println "I'm 5 seconds in the future!"))

(v/periodic 2000
  (println "I SHOUT EVERY 2 SECONDS!!!"))

(println "I'm printed before either of those yahoos")

Eventbus

  • Message passing between components
  • Scalars & JSON
  • Inter/intra verticle/cluster nodes
  • Point-to-point
    • Request/respond
  • Broadcast

Javascript Example

var eb = vertx.eventBus;

eb.registerHandler('global.notifications', 
                   function(message) {
                       console.log('Message rcvd: ' + message);
                   });

eb.registerHandler('some.address', 
                   function(message, replyFn) {
                       console.log('Message rcvd: ' + message);
                       replyFn('Roger that');
                   });

// publish to all handlers
eb.publish("global.notifications", {some: "value"});

// send to one handler and handle the reply
eb.send("some.address", "ahoyhoy",
       function(message) {
           console.log("Reply: " + message);
       });

Clojure Example

(require '[vertx.eventbus :as eb])

(eb/register-handler "global.notifications"
  (fn [m]
    (println "Message rcvd:" m)))

;; A replying handler
(eb/register-handler "some.address"
  (fn [m]
    (println "Message rcvd:" m)
    (eb/reply "Roger that")))

;; publish to all handlers
(eb/publish "global.notifications" {:some "value"})

;; send to one handler and handle the reply
(eb/send "some.adress" "ahoyhoy"
         (partial println "Reply:"))

Clustering

  • Distributed event bus
  • Efficient distribution
vertx run my_app.py -cluster &
vertx run my_app.py -cluster

To the browser

  • Browser is a peer in the eventbus
  • SockJS
  • Live demo at the end

SharedData

  • Share maps or sets between verticles
  • Only store immutable things

SharedData

hash = Vertx::SharedData.get_hash('demo.myhash')
hash['some-key'] = 'some-value'

And then, in a different verticle:

hash = Vertx::SharedData.get_hash('demo.myhash')
puts "value of some-key is #{hash['some-key']}"

"Callback Hell"

hell.jpg http://www.flickr.com/photos/googly/8138508

"Callback Hell"

HttpClient client = vertx.createHttpClient()
    .setHost("foo.com")
    .setPort(8080);

client.get("/some-path/",
  new Handler<HttpClientResponse>() {
    public void handle(HttpClientResponse resp) {
      resp.bodyHandler(new Handler<Buffer>() {
        public void handle(Buffer buf) {
          vertx.eventBus().send("some.worker", buf,
            new Handler<Message<String>>() {
              public void handle(Message<String> m) {
                if ("ok".equals(m.body())) {
                  client.post("/a-ok/", 
                    new Handler<HttpClientResponse>() {
                      public void handle(HttpClientResponse resp) {                 
                        //do more!
                      }
                    }).end();
                }
              }
           });
        }
      });
    }
  }).end();

"Callback Hell"

  • Don't use Java :)
  • Use RxJava

"Callback Hell"

HttpClient client = vertx.createHttpClient()
    .setHost("foo.com")
    .setPort(8080);

RxHttpClient rxclient = new RxHttpClient(client);
RxEventBus rxbus = new RxEventBus(vertx.eventBus());

rxclient.get("/some-path/")
  .mapMany(downloadBody())
  .subscribe(new Action1<Buffer>() {
    public void call (Buffer buf) {
      rxbus.send("some.worker", buf)
        .mapMany(new Func1<RxMessage<String>>, Observable<RxMessage<String>>>() {
          public Observable<RxMessage<String>> call(RxMessage<String> m) {
            if ("ok".equals(m.body())) {
              rxclient.post("/a-ok/")
                .subscribe(new Action1<HttpClientResponse>() {
                   public void call (HttpClientResponse resp) {
                     //do more!
                   }
                 });
            }
          }
        });
    }
  });

Drawbacks

  • D.B.T.E.L.
  • Callback Hades
  • Standard packaging systems
  • Lack of frameworks

Live Demo

Resources