
API Quickstart

Valo is a hybrid real-time and historical analysis system for data streams, all wrapped up in a simple, easy-to-use API. To demonstrate this, we’ll build a small application that runs some analytics on incoming tweets about the Internet of Things.

To follow along at home, you’ll need to sign up for Twitter API keys at https://apps.twitter.com.

We’ll be talking to Valo using the REST API over HTTP, so we’ll need curl, Postman or a similar tool for making HTTP requests.

We’ll write the script in Node.js, although the Valo API is of course usable from any programming language. The following freely available npm (https://docs.npmjs.com) packages will keep it nice and simple:

npm install twitter
npm install request-json

Run Valo

Simply extract the ZIP archive and run the bin/valo script. This will start up a node, automatically creating a single-node cluster. You can tell when Valo is ready from the log:

16:13:38.268 INFO - All front ends have started

By default, the REST API is served over HTTP on port 8888 on localhost.

Create a stream

First up, we’ll need to create a stream that our tweets will flow through. Valo is a strongly-typed system, so each stream must have a schema defined. More information about the format of schema files is here, but for our example add the following to a file called schema.json:

{
    "schema": {
        "version": "0.0.1",
        "config": {},
        "topDef": {
            "type": "record",
            "properties": {
                "createdAt": { "type": "datetime" , "annotations": ["urn:itrs:default-timestamp"] },
                "text": { "type": "string" },
                "user": { "type": "string" }
            }
        }
    }
}
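Since Valo is strongly typed, events that don’t match the stream’s schema will be rejected. As a sanity check for this tutorial (this helper is not part of the Valo API, just client-side validation mirroring the three fields above), a payload can be checked before posting:

```javascript
// Client-side sanity check mirroring the schema above.
// Tutorial helper only; not part of the Valo API.
function matchesTweetSchema(payload) {
  return typeof payload.text === 'string' &&
         typeof payload.user === 'string' &&
         // "datetime" values are sent as ISO 8601 strings
         typeof payload.createdAt === 'string' &&
         !isNaN(Date.parse(payload.createdAt));
}

console.log(matchesTweetSchema({
  createdAt: new Date().toISOString(),
  text: 'hello #iot',
  user: 'alice'
})); // true

console.log(matchesTweetSchema({ text: 42 })); // false
```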

Valo has a rich type system, but for this example we’re just using a timestamp and textual data. To create the stream, simply issue an HTTP PUT request to Valo:

curl -H "Content-Type: application/json" -X PUT --data @schema.json http://localhost:8888/streams/demo/twitter/tweets

We will use the URI above to refer to the stream in queries later on. You can check that the stream has been created successfully with a simple GET request:

curl http://localhost:8888/streams/demo/twitter/tweets

{"schema":{"version":"0.0.1","config":{},"topDef":{"type":"record","properties":{"createdAt":{"type":"datetime","annotations":["urn:itrs:default-timestamp"]},"text":{"type":"string"},"user":{"type":"string"}}}}}

Streams are not persisted to storage by default. This is fine for real-time analytics but we’d like to do some historical queries too, so we configure the stream to be stored in a repository:

curl -H "Content-Type: application/json" -X PUT -d '{"name": "ssr"}' http://localhost:8888/streams/demo/twitter/tweets/repository

Publish to the stream

Now we’ll write a small script to use the Twitter Streaming API to receive tweets and send them into Valo. Publishing data to a stream is as simple as making an HTTP POST request to the stream we created earlier.

Save the following script as tweeter.js:

var Twitter = require('twitter');
var request = require('request-json');

var twitterClient = new Twitter({
  consumer_key: '<replace with your keys>',
  consumer_secret: '<replace with your keys>',
  access_token_key: '<replace with your keys>',
  access_token_secret: '<replace with your keys>'
});

var valoClient = request.createClient('http://localhost:8888/');
var valoStream = "streams/demo/twitter/tweets";

var filter = {
  track: 'iot'
}

twitterClient.stream('statuses/filter', filter, function(stream) {
  stream.on('data', function(tweet) {
    var payload = {
      createdAt: new Date(tweet.created_at).toISOString(),
      text: tweet.text,
      user: tweet.user.screen_name
    };

    valoClient.post(valoStream, payload, function(err, res, body) {
      if (err) {
        console.error("Failed to post to Valo: " + err);
        return;
      }
      console.log("Posted to Valo: " + JSON.stringify(payload));
    });
  });

  stream.on('error', function(error) {
    console.error("Twitter stream error: " + error);
  });
});

Run it from the command line and you will see the data being sent to Valo:

node tweeter.js

Posted to Valo: {"createdAt":"2015-12-06T16:27:12.000Z","text":"RT @HPE_Software: New #HPE #Edgeline #IoT Systems &amp; #Aruba Sensors expand IoT capabilities @ the edge: https://t.co/SUwunOzisj [news] #HPED…","user":"Zetabytelab"}
Posted to Valo: {"createdAt":"2015-12-06T16:27:12.000Z","text":"RT @OpeniotvaultT: Factories are about to get smarter. The machines that make everything from our phones to our sa.. https://t.co/duKSEu1zu…","user":"P2Open"}
Posted to Valo: {"createdAt":"2015-12-06T16:27:19.000Z","text":"RT @dirfooddrink: RT paologlc Saltiamo qualche numero:direi Agricoltura 4,0 x le analogie con #Industry40 e #IoT,#InternetOfThings (… https…","user":"ilmionetwork"}

Query from the stream

Queries to Valo are expressed in a simple query language. We are going to submit a real-time query that selects the user, text and creation time of tweets containing the word 'data':

from /streams/demo/twitter/tweets
where contains(text,'data')
select user, text, createdAt

Queries are submitted through a session created via the execution API. First, obtain a session:

curl -X POST http://localhost:8888/execution/demo/sessions

{"session":"/execution/demo/sessions/a6fe854e-9190-467b-bb40-827d842830b4"}
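The session path in the response is the base for the remaining execution calls. As a small illustration (plain client-side string handling, not part of the Valo API itself):

```javascript
// Build the remaining execution URLs from the session response.
// The session value here is the one returned above.
var response = {
  session: "/execution/demo/sessions/a6fe854e-9190-467b-bb40-827d842830b4"
};

var base = 'http://localhost:8888';
// URL for submitting queries to the session
var queriesUrl = base + response.session + '/queries';
// URL for starting a query with id "test_query"
var startUrl = base + response.session + '/queries/test_query/_start';

console.log(queriesUrl);
console.log(startUrl);
```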

Next we will submit the query to the session. Write the following to query.json:

{
  "id": "test_query",
  "body": "from /streams/demo/twitter/tweets where contains(text,'data') select user, text, createdAt"
}

Now add it to the session. The query does not start immediately; the response describes an output channel from which we will read the results:

curl -H "Content-Type: application/json" -X PUT --data @query.json http://localhost:8888/execution/demo/sessions/a6fe854e-9190-467b-bb40-827d842830b4/queries

[
    {
        "id": "test_query",
        "dependencies": [],
        "query":{
            "state": "initialised",
            "outputs": [
                {
                    "type": "OUTPUT_CHANNEL",
                    "id": "af98acbb-722a-4c3d-aacb-0135c8c314eb",
                    "outputUri": "/output/demo/6daccb6739b02bacda2d4c37428263d3/5264726f-e17b-4bb4-bc44-f45be8fdb44e",
                    "outputType": "UNBOUNDED",
                    "schema": {
                        "version": "",
                        "config": {"key": []},
                        "topDef": {
                            "type":"record",
                            "properties": {
                                "text": { "type":"string" },
                                "createdAt": { "type":"datetime", "annotations": ["urn:itrs:default-timestamp"]},
                                "user": { "type": "string" }
                            }
                        }
                    }
                }
            ]
        }
    }
]

As mentioned above, Valo is strongly-typed throughout the analysis pipeline. This includes the results of queries, in this case the same schema we used to create the stream.

As this is a real-time query, the results will continue indefinitely, hence the UNBOUNDED output type. In another terminal window, connect to the output channel:

curl http://localhost:8888/output/demo/6daccb6739b02bacda2d4c37428263d3/5264726f-e17b-4bb4-bc44-f45be8fdb44e

: Letsa' Go!

This indicates that we have connected to the channel. The only step left is to start the query:

curl -X PUT http://localhost:8888/execution/demo/sessions/a6fe854e-9190-467b-bb40-827d842830b4/queries/test_query/_start

You will then start to see results appear on the output channel:

data: {"type":"sos"}

data: {"type":"increment","items":[{"action":"add","data":{"user":"sreenath_s","numberOfTweets":1}}]}

data: {"type":"increment","items":[{"action":"add","data":{"user":"krishdtech","numberOfTweets":1}}]}

data: {"type":"increment","items":[{"action":"add","data":{"user":"punditas","numberOfTweets":1}}]}

data: {"type":"increment","items":[{"action":"add","data":{"user":"chimeraiot","numberOfTweets":1}}]}
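Each increment message carries a list of actions to apply to the result set. A client consuming the channel could fold these messages into a running tally, sketched below (it handles only the "add" action seen in the output above; other action types are ignored):

```javascript
// Fold "increment" messages from the output channel into a running tally
// of tweets per user. Only the "add" action shown above is handled.
function applyIncrement(tally, message) {
  if (message.type !== 'increment') return tally;
  message.items.forEach(function(item) {
    if (item.action === 'add') {
      var d = item.data;
      tally[d.user] = (tally[d.user] || 0) + d.numberOfTweets;
    }
  });
  return tally;
}

// Example: two messages for the same user accumulate.
var tally = {};
[
  {"type":"increment","items":[{"action":"add","data":{"user":"sreenath_s","numberOfTweets":1}}]},
  {"type":"increment","items":[{"action":"add","data":{"user":"sreenath_s","numberOfTweets":1}}]}
].forEach(function(m) { applyIncrement(tally, m); });

console.log(tally); // { sreenath_s: 2 }
```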

Congratulations! You’ve just run your first real-time query.

Further steps

Read the API Concepts to get a feel for the various APIs available.

Read the Query Language documentation to find out what operations and functions are available.

The beauty of Valo is that queries barely need to change when run against historical data instead of real-time data. To try it, simply repeat the steps above, adding the historical keyword: