TRY ME

Try Valo for free

We want to show you something amazing.

We'll send you a link to download a fully functional Valo copy to play with.



Great! Check your email and enjoy Valo



Apologies, we seem to be having a problem processing your input, please try again

Streams API

A stream URI uniquely identifies a stream of data and a schema declares the shape and format of the data. A stream can be optionally stored in a repository.

A stream URI has format /streams/<tenant>/<collection>/<name> where collection and name is defined by the user when creating the stream. If the collection does not exist it will be automatically created. The term system is a reserved word as it is used internally. Only GET operations are allowed on system streams.

Get the collections

GET /streams/:tenant/

Gets all the available stream collections.

Example request:

GET /streams/demo HTTP/1.1

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
    "instances": ["infrastructure", "applications", "tradeinfo"]
}
Status Codes:

Get the streams

GET /streams/:tenant/:collection

Gets all the registered stream definitions.

Example request:

GET /streams/demo/infrastructure HTTP/1.1

Example response:

HTTP/1.1 200 OK
Content-Type: application/json

{
    "streams": ["memory", "cpu", "network"]
}
Status Codes:

Put a stream

PUT /streams/:tenant/:collection/:streamName

Creates a new stream definition based on the specified schema.

By default the stream is created as a non-persisted stream. If the stream should be persisted a repository mapping should be created.

Note

/streams/:tenant/system is reserved for internal use.

Example request:

PUT /streams/demo/infrastructure/cpu HTTP/1.1

{
    "schema" : {
        "version": "1.0",
        "topDef" : {
            "type" : "record",
            "properties" : {
                "host"   : { "type" : "string" },
                "user"   : { "type" : "double" },
                "kernal" : { "type" : "double" },
                "timestamp" : { "type" : "datetime" },
                "os" : {
                   "type" : "record",
                   "properties" : {
                     "name"   : { "type" : "string" },
                     "version"   : { "type" : "string" }
                   }
                }
            }
        }
    }
}
Status Codes:

See the stream overview for details about the stream definition document.

Post data to a stream

POST /streams/:tenant/:collection/:streamName

Posts new stream events to an existing stream.

An empty 200 OK response indicates all payloads were written to the stream successfully. If a failure occurs, an object is returned indicated the index of the payloads that have failed in the original array and the reason for the failure.

Example request:

POST /streams/demo/infrastructure/cpu HTTP/1.1

[
    {
        "host"   : "ABC123",
        "user"   : 20,
        "kernal" : 30
    },
    {
        "host"   : "ABC156",
        "user"   : 30,
        "kernal" : 25
    }
]

Example response:

HTTP/1.1 200 OK

Example response when a failure occurs:

HTTP/1.1 500 Internal Error
Content-Type: application/json

{
    "1": {
        "msg": "Invalid stream configuration. [...]"
    },
    "56": {
        "msg": "Unable to store data in repository. [...]"
    }
}
Status Codes:

Get a stream

GET /streams/:tenant/:collection/:streamName

Gets the stream definition.

Example request:

GET /streams/demo/infrastructure/cpu HTTP/1.1

Example response:

HTTP/1.1 200 OK
Content-Type: application/json
Valo-Config-Version: Y2BgYOALzsxLz0kNS8zJ98tPSWWAAEYAvp3ZUxs=

{
    "schema" : {
        "version": "1.0",
        "topDef" : {
            "type" : "record",
            "properties" : {
                "host"   : { "type" : "string" },
                "user"   : { "type" : "double" },
                "kernel" : { "type" : "double" }
            }
        }
    }
}
Status Codes:

Put a taxonomy

PUT /streams/:tenant/:collection/:streamName/taxonomy

Sets the taxonomy definition.

Example request:

PUT /streams/demo/infrastructure/cpu/taxonomy HTTP/1.1

{
    "taxonomy" : {
      "nodes": [
        { "name" : "OS", "expression" : [ "os.name", "os.version" ] },
        { "name" : "date", "expression" : [ "year(timestamp)", "month(timestamp)", "day(timestamp)" ] }
      ]
    }
}
Status Codes:

Get a taxonomy

GET /streams/:tenant/:collection/:streamName/taxonomy

Gets the taxonomy definition.

Example request:

GET /streams/demo/infrastructure/cpu/taxonomy HTTP/1.1

Example request:

HTTP/1.1 200 OK
Content-Type: application/json
Valo-Config-Version: Y2BgYOALzsxLz0kNS8zJ98tPSWWAAEYAvp3ZUxs=

{
    "taxonomy" : {
       "nodes": [
          { "name" : "OS", "expression" : [ "os.name", "os.version" ] },
          { "name" : "date", "expression" : [ "year(timestamp)", "month(timestamp)", "day(timestamp)" ] }
       ]
    }
}
Status Codes:

Put tags

PUT /streams/:tenant/:collection/:streamName/tags

Sets the tags definition. The following restrictions apply to javascript functions:

  • The function must be called ‘tag’.
  • The return type must be an array of strings.

Example request:

PUT /streams/demo/infrastructure/cpu/tags HTTP/1.1

{
    "tags": [
      {
        "field" : "level",
        "lang" :  "javascript",
        "enabled" : true,
        "script" : "function tag(payload) {
                     if(payload.msg.contains('broken')) return ['error']
                   }"
      }
    ]
}
Status Codes:

Get tags

GET /streams/:tenant/:collection/:streamName/tags

Gets the tags definition.

Example request:

GET /streams/demo/infrastructure/cpu/tags HTTP/1.1

Example request:

HTTP/1.1 200 Ok
Content-Type: application/json

{
    "tags": [
      {
        "field" : "level",
        "lang" :  "javascript",
        "enabled" : true,
        "script" : "function tag(payload) {
                     if(payload.msg.contains('broken')) return ['error']
                   }"
      }
    ]
}
Status Codes:

Execute the tag function

POST /streams/:tenant/:collection/:streamName/tags

Applies the tagging functions to the posted payload and echo’s the modified payload. No stream data is modified.

This is useful for testing the tagging functions.

Example request:

POST /streams/demo/infrastructure/cpu/tags HTTP/1.1

{
    "msg"   : "Something is broken"
}

Example response:

HTTP/1.1 200 OK

{
    "msg"   : "Something is broken",
    "level" : ["error"]
}
Status Codes:

Put the repository mapping

PUT /streams/:tenant/:collection/:streamName/repository

Creates a repository mapping for the stream. The mapping has the following format:

{
  "name" : "<repo name>",
  "config" : "<repo specific configuration (if any)"
}

Where name refers to a registered repository and config is a valid repository config. The following repositories are supported;

name config
ssr config
tsr None as yet

Example request:

PUT /streams/demo/infrastructure/cpu/repository HTTP/1.1

{
    "name"   : "ssr",
    "config" : {
        "defaultStringAnalyzer" : "StandardAnalyzer"
    }
}
Status Codes:

Get the repository mapping

GET /streams/:tenant/:collection/:streamName/repository

Gets the repository mapping for the stream.

Example request:

GET /streams/demo/infrastructure/cpu/repository HTTP/1.1

Example response:

HTTP/1.1 200 Ok
Content-Type: application/json

{
    "name"   : "ssr",
    "config" : {
        "defaultStringAnalyzer" : "StandardAnalyzer"
    }
}
Status Codes:

Put the distribution policy

PUT /streams/:tenant/:collection/:streamName/policy

Specifies a distribution policy for the stream.

The policy has several formats, please refer to Distribution Policy for a full discussion.

Example request:

PUT /streams/demo/infrastructure/cpu/policy HTTP/1.1

{
    "replicas": 3,
    "ring_size": 128,
    "hash_payload": { "bytes": 128 }
}
Status Codes:

Get the distribution policy

GET /streams/:tenant/:collection/:streamName/policy

Gets the distribution policy for the stream.

Example request:

GET /streams/demo/infrastructure/cpu/policy HTTP/1.1

Example response:

HTTP/1.1 200 Ok
Content-Type: application/json

{
    "replicas": 3,
    "ring_size": 128,
    "hash_payload": { "bytes": 128 }
}
Status Codes: