Abaco
Abaco computes the outputs of formulas from a stream of metric values as soon as all the required input values have been collected.
In a real-world scenario, values arrive asynchronously, with delays and out of order. Abaco can manage values that refer to different times.
Getting Started
Installation:
julia> using Pkg; Pkg.add("Abaco")
Usage:
using Abaco
# Initialize the abaco context with a span interval of 60 seconds and keep
# input values for up to 4 contiguous spans (ages).
abaco = abaco_init(interval=60, ages=4) do ts, node, formula_name, value, inputs
@info "[$ts][$node] function $formula_name=$value"
end
# Add the desired outputs in terms of the input variables x, y, z, v, w
outputs = ["xysum = x + y", "rsigma = x * exp(y-1)", "wsum = (x*w + z*v)"]
for formula_def in outputs
formula(abaco, formula_def)
end
# Start receiving some inputs values
# Normally ts is the UTC timestamp in seconds from epoch,
# but for simplicity assume that time starts from zero.
# The node AG101 sends the x value at timestamp 0.
ts = 0
node = "AG101"
ingest(abaco, ts, node, "x", 10)
# Time flows and about 1 minute later ...
# the node CE987 sends the y value at timestamp 65.
ts = 65
node = "CE987"
ingest(abaco, ts, node, "y", 10)
# Time flows and more than 1 minute later ...
# Finally the node AG101 sends the y value calculated at timestamp 0.
# At this instant the formulas that depend on x and y are computable
# for the node AG101 at timestamp 0.
ts = 0
node = "AG101"
ingest(abaco, ts, node, "y", 20)
[ Info: [0][AG101] function xysum=30
[ Info: [0][AG101] function rsigma=1.7848230096318724e9
# Now the variable x arrives from CE987, which makes some formulas computable.
# Note that the y timestamp is 65, the x timestamp is 101
# and the formulas timestamp is 60: the START_TIME of the span.
ts = 101
node = "CE987"
ingest(abaco, ts, node, "x", 10)
[ Info: [60][CE987] function xysum=20
[ Info: [60][CE987] function rsigma=81030.83927575384
When a handle is passed to abaco_init, the first argument of the formula callback is that user-defined object, such as a socket, a channel or any other communication endpoint.
For example:
using JSON3, Sockets
sock = connect(3001)
abaco = abaco_init(handle=sock, interval=900) do sock, ts, node, formula_name, value, inputs
@info "ts [$ts]: [$node] $formula_name = $value"
msg = JSON3.write(Dict(
"node" => node,
"age" => ts,
"formula" => formula_name,
"value" => value))
write(sock, msg*"\n")
end
Basic concepts walkthrough
A formula is a named math expression defined by a string.
A node is a domain of values. Each node has a unique name and may be tagged.
A formula may also be tagged: the tag binds the formula to the subset of nodes identified by the same tag.
# name expr
julia> formula(abaco, "my_formula", "x + y")
# tag
julia> formula(abaco, "my_formula", "x + y", "my_tag")
Ingested data has a name, a value and a timestamp, and is associated with a node.
julia> ingest(abaco, # abaco instance
ts, # timestamp
"node_unique_name", # node name
"x", # metric name
100) # value
Values may also be ingested using a Dict{String,Any} object, with the following rule:
ne and ts are reserved dictionary keys for the node name and the timestamp, whereas the other entries are metric values.
# A record with a single metric
julia> x_value = Dict(
"ne" => "my_network_element",
"ts" => 1642605647,
"x" => 1.5 # metric name => metric value
)
julia> y_value = Dict(
"ne" => "my_network_element",
"ts" => 1642605647,
"y" => 8.5
)
# A record with a batch of metrics:
julia> xyz_values = Dict(
"ne" => "my_network_element",
"ts" => 1642605647,
"x" => 1.5,
"y" => 25,
"z" => 999
)
julia> ingest(abaco, x_value)
julia> ingest(abaco, xyz_values)
Metric names are the names of the variables used by Abaco formulas.
A node may be seen as a data source or a data fusion namespace: all variables that belong to a node are the inputs of the formulas tagged with the same tag as the node.
As soon as all input variables have been collected and belong to the same time span, the formula result is calculated.
In the example above, as soon as both x and y are collected, my_formula is evaluated and the default onresult callback is triggered.
The default callback prints the result summary to the console.
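The collect-then-evaluate rule can be illustrated with a small toy model in plain Julia. ToyFormula, Store and toy_ingest! are hypothetical names invented for this sketch, not part of Abaco. The model groups values by node and span START_TIME and evaluates a formula only once all its inputs for that node and span are present. Tags, ages and emitone are deliberately left out.

```julia
# Toy model of the "evaluate when all inputs are collected" rule.
# Hypothetical names; not Abaco's implementation (tags, ages, emitone ignored).
struct ToyFormula
    name::String
    inputs::Vector{String}  # variables the expression needs, in call order
    f::Function             # computes the value from those inputs
end

# Collected values, keyed by (node, span START_TIME)
const Store = Dict{Tuple{String,Int},Dict{String,Float64}}

function toy_ingest!(store::Store, formulas, interval, ts, node, var, val)
    start = ts - mod(ts, interval)  # START_TIME of the span holding ts
    vals = get!(() -> Dict{String,Float64}(), store, (node, start))
    vals[var] = val
    results = Tuple{Int,String,String,Float64}[]
    for fd in formulas
        # evaluate only the formulas that use `var` and now have every input
        if var in fd.inputs && all(in(keys(vals)), fd.inputs)
            value = fd.f((vals[i] for i in fd.inputs)...)
            push!(results, (start, node, fd.name, value))
        end
    end
    return results
end

formulas = [ToyFormula("my_formula", ["x", "y"], (x, y) -> x + y)]
store = Store()
toy_ingest!(store, formulas, 900, 1642605647, "my_network_element", "x", 1.5)  # empty: y is missing
toy_ingest!(store, formulas, 900, 1642605647, "my_network_element", "y", 8.5)  # one result, value 10.0
```

Keying the store by (node, span START_TIME) is what lets late or out-of-order values land in the right bucket.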
julia> ingest(abaco, x_value)
julia> ingest(abaco, y_value)
my_formula(ts:1642605647, ne:my_network_element) = 10.0
A node may have a parent node: parent-child relationships and tags are powerful tools that enable aggregation and statistical functions:
# name tag
julia> aggregator = node(abaco, "my_region", "region")
# parent name tag
julia> node(abaco, aggregator, "city_1", "city")
julia> node(abaco, aggregator, "city_2", "city")
# tag name aggregator expr
julia> formula("region", "my_formula", "mean(city.x)")
time span
A time span includes all the timestamps in a time interval:
span = { t ∈ N | START_TIME <= t < END_TIME }, where END_TIME = START_TIME + interval
Timestamps t are integer values with second granularity.
For example, suppose that a data collection system uses a 15-minute span interval: an hour is then divided into 4 spans, and from 10:00 to 11:00 of a given day you have:
- span1 = { t ∈ [10:00:00, 10:15:00) }
- span2 = { t ∈ [10:15:00, 10:30:00) }
- span3 = { t ∈ [10:30:00, 10:45:00) }
- span4 = { t ∈ [10:45:00, 11:00:00) }
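The four spans above can be reproduced with Julia's standard Dates library, assuming timestamps are handled as DateTime values: flooring a time to a multiple of Minute(15) yields the START_TIME of the span that contains it. This is a plain-Julia illustration, not a call into Abaco, and the date used is arbitrary.

```julia
using Dates

interval = Minute(15)
hour_start = DateTime(2022, 1, 19, 10, 0, 0)  # arbitrary day, 10:00:00

# START_TIME of span1..span4 between 10:00 and 11:00
starts = [hour_start + k * interval for k in 0:3]

# Any timestamp maps to the START_TIME of the span that contains it
span_of(t) = floor(t, interval)

t = DateTime(2022, 1, 19, 10, 37, 12)
println(span_of(t))  # the START_TIME of span3
```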
By convention the span interval is identified by its START_TIME.
The formula value computed from inputs with timestamps included into the span [START_TIME, END_TIME) has timestamp equal to START_TIME.
The width of the span interval is an abaco setting, user-defined at startup.
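For integer epoch timestamps the same convention reduces to subtracting the remainder modulo the interval. The API reference lists Abaco.span(ts, interval) for this purpose. The hypothetical span_start and span_end below are a standalone sketch of that arithmetic, checked against the Getting Started example where interval = 60.

```julia
# Hypothetical stand-ins for the START_TIME / END_TIME computation (not Abaco's code).
# mod() with a positive interval is never negative, so this floors ts to a span boundary.
span_start(ts::Integer, interval::Integer) = ts - mod(ts, interval)

# END_TIME is exclusive: the span is [START_TIME, START_TIME + interval)
span_end(ts::Integer, interval::Integer) = span_start(ts, interval) + interval

for ts in (0, 65, 101)
    println(ts, " -> span starting at ", span_start(ts, 60))
end
```

Both 65 and 101 fall in [60, 120), which is why the CE987 results in Getting Started carry timestamp 60.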
ages
The number of ages defines how many consecutive time spans are managed.
For example, with a time span of 15 minutes, set ages to 4 if your network devices may send data with a maximum delay of one hour. A received value whose timestamp is 4 or more spans away from the latest span is discarded.
The number of ages is an abaco setting, user-defined at startup.
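The discard rule can be written as span arithmetic. In this sketch span_start, span_distance and accepted are hypothetical helpers, the latest span is passed in explicitly, and the rule is read literally as a distance measured in whole spans.

```julia
# Hypothetical sketch of the documented ages rule; not Abaco's implementation.
span_start(ts, interval) = ts - mod(ts, interval)

# Whole spans between the span containing ts and the latest span.
# Values newer than the latest span give a negative distance; a real system
# would treat them as opening a new latest span.
span_distance(ts, latest_start, interval) =
    div(latest_start - span_start(ts, interval), interval)

# Keep a value only if it lies fewer than `ages` spans behind the latest one.
accepted(ts, latest_start; interval=900, ages=4) =
    span_distance(ts, latest_start, interval) < ages

latest = 3600  # START_TIME of the latest span, e.g. 01:00:00 with 15-minute spans
for ts in (3599, 2700, 900, 899, 0)
    println(ts, " => ", accepted(ts, latest) ? "kept" : "discarded")
end
```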
This is the minimal background theory; the examples in this document should help clarify the Abaco mechanics.
API
- Abaco.Context
- Abaco.EvalError
- Abaco.Snap
- Abaco.SnapsSetting
- Abaco.ValueNotFound
- Abaco.WrongFormula
- Abaco.abaco_init
- Abaco.add_formulas
- Abaco.dependents
- Abaco.formula
- Abaco.get_values
- Abaco.getsnap
- Abaco.ingest
- Abaco.ingest!
- Abaco.last_point
- Abaco.last_value
- Abaco.lastspan
- Abaco.nowts
- Abaco.snap_add
- Abaco.span
Abaco.Context — Type
The abaco registry.
Abaco.EvalError — Type
Formula evaluation failure.
ingest throws EvalError when a formula evaluation fails at runtime, for example because of a wrong number of method arguments:
formula(abaco, "myformula = div(x, y, z)")
ingest(abaco, ts, ne, Dict("x"=>10, "y"=>1, "z"=>1))
Abaco.Snap — Type
Maintains the state of the abaco.
Before adding formulas and values an abaco MonoContext must be initialized by abaco_init.
Abaco.SnapsSetting — Type
The settings of snapshots.
Before adding formulas and values the SnapsSetting must be initialized by abaco_init.
Abaco.ValueNotFound — Type
Attempt to get a value with an invalid index.
Abaco.WrongFormula — Type
Wrong formula definition.
formula throws WrongFormula when a formula is malformed, for example:
formula(abaco, "myformula = x + ")
Abaco.abaco_init — Method
abaco_init(onresult; handle=nothing, interval::Int=900, ages::Int=4, emitone=true)::Context
Initialize the abaco context:
- onresult: callback function that is called each time a formula value is computed.
- handle: user-defined object. If handle is defined, it is passed as the first argument of onresult. Defaults to nothing.
- interval: the span interval in seconds. Defaults to 900 seconds (15 minutes). If interval is equal to -1, there is just one infinite time span.
- ages: the number of active rops (time spans) managed by the abaco. Defaults to 4. If interval is equal to -1, ages is not applicable because it loses meaning.
- emitone: if true, emits at most one formula value for each time span; otherwise emits a new result at every new input. Defaults to true.
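The handle and emitone options can be modeled with a small dispatcher in plain Julia. ToyEmitter and emit! are hypothetical names, and keying "at most once" by (span ts, node, formula) is an assumption about how emitone scopes its deduplication. The sketch only shows the documented behavior: a defined handle becomes the first callback argument, and with emitone a formula is reported at most once per span.

```julia
# Hypothetical dispatcher modelling the documented handle and emitone options;
# ToyEmitter and emit! are illustrative names, not part of Abaco.
struct ToyEmitter
    onresult::Function
    handle::Any
    emitone::Bool
    emitted::Set{Tuple{Int,String,String}}  # (span ts, node, formula) already emitted
end

ToyEmitter(onresult; handle=nothing, emitone=true) =
    ToyEmitter(onresult, handle, emitone, Set{Tuple{Int,String,String}}())

function emit!(e::ToyEmitter, ts, node, formula_name, value, inputs)
    key = (ts, node, formula_name)
    # with emitone, a formula is reported at most once per node and span
    e.emitone && key in e.emitted && return false
    push!(e.emitted, key)
    if e.handle === nothing
        e.onresult(ts, node, formula_name, value, inputs)
    else
        # a defined handle is passed as the first callback argument
        e.onresult(e.handle, ts, node, formula_name, value, inputs)
    end
    return true
end

buf = IOBuffer()
e = ToyEmitter((io, ts, node, name, value, inputs) -> println(io, "[$ts][$node] $name=$value"); handle=buf)
emit!(e, 60, "CE987", "xysum", 20.0, nothing)  # written into buf
emit!(e, 60, "CE987", "xysum", 21.0, nothing)  # skipped: already emitted for this span
```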
Example 1: defining an onresult callback that uses a handle object.
using Abaco, Sockets
# the handle object is a socket
sock = connect(3001)
function onresult(handle, ts, ne, formula_name, value, inputs)
# build a pkt message from ts, ne, ...
pkt = ...
write(handle, pkt)
end
abaco = abaco_init(onresult, handle=sock)
Example 2: defining an onresult callback that doesn't use a handle object.
function onresult(ts, ne, formula_name, value, inputs)
@info "[$ts][$ne] function $formula_name=$value"
end
abaco = abaco_init(onresult)
Abaco.add_formulas — Method
Abaco.dependents — Method
dependents(abaco::Context, tag::String, var::String)
Returns the list of expressions that depend on var.
Abaco.formula — Method
formula(setting::SnapsSetting, name, expression)
Add the formula name defined by expression: a mathematical expression like x + y*w.
Abaco.formula — Method
formula(setting::SnapsSetting, formula_def::String)
Add a formula, with formula_def formatted as "formula_name = expression", where expression is a mathematical expression, like x + y*w.
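The "formula_name = expression" format implies a split at the first equals sign. The hypothetical split_formula_def below sketches that step in plain Julia. It is not Abaco's parser and does not reproduce Abaco's own validation, which raises WrongFormula.

```julia
# Hypothetical helper, not Abaco's parser: splits "formula_name = expression".
function split_formula_def(formula_def::AbstractString)
    parts = split(formula_def, "="; limit=2)  # split at the first '=' only
    length(parts) == 2 ||
        throw(ArgumentError("expected \"name = expression\", got: $formula_def"))
    name, expr = strip(parts[1]), strip(parts[2])
    (isempty(name) || isempty(expr)) &&
        throw(ArgumentError("empty name or expression in: $formula_def"))
    return String(name), String(expr)
end

for fdef in ["xysum = x + y", "rsigma = x * exp(y-1)", "wsum = (x*w + z*v)"]
    println(split_formula_def(fdef))
end
```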
Abaco.get_values — Method
get_values(abaco, ne::String, var::String)::Dict{Int,Float64}
Returns the time-ordered sequence of var values for the ne node.
The returned dictionary is ordered by descending time, most recent value first. The number of entries is at most equal to the value of ages.
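Julia does not guarantee iteration order for a plain Dict, so a caller that needs the most-recent-first sequence may prefer to sort explicitly. In this sketch the dictionary holds made-up numbers standing in for a get_values result.

```julia
# Made-up stand-in for a get_values(abaco, ne, var) result: span ts => value
vals = Dict(0 => 10.0, 900 => 12.5, 1800 => 11.0)

# Sort the (ts => value) pairs by timestamp, most recent first
recent_first = sort(collect(vals); by=first, rev=true)

for (ts, v) in recent_first
    println(ts, " => ", v)
end
```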
Abaco.getsnap — Method
getsnap(abaco::Context, ts, ne)
Returns the ne node snapshot relative to timestamp ts.
Abaco.ingest! — Method
ingest!(abaco, payload)
Adds the input variables included in the payload dictionary.
The payload dictionary must contain the keys ts and ne plus any number of other keys, which are managed as input variables.
This function modifies the payload dictionary: ne and ts keys are popped out.
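The documented side effect can be mimicked with pop! in plain Julia. split_payload! is a hypothetical helper, not part of Abaco. It removes the reserved "ts" and "ne" keys and leaves only the metric entries in the same dictionary. In this sketch a missing reserved key raises a KeyError.

```julia
# Hypothetical helper mirroring the documented ingest! side effect:
# the reserved "ts" and "ne" keys are popped, the rest are input variables.
function split_payload!(payload::AbstractDict)
    ts = pop!(payload, "ts")  # KeyError if "ts" is absent
    ne = pop!(payload, "ne")  # KeyError if "ne" is absent
    return ts, ne, payload    # payload now holds only metric entries
end

payload = Dict{String,Any}("ts" => 1642605647, "ne" => "trento.castello", "x" => 23.2, "y" => 100)
ts, ne, vars = split_payload!(payload)
println(ne, " @ ", ts, ": ", sort(collect(keys(vars))))
```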
payload = Dict(
"ts" => nowts(),
"ne" => "trento.castello",
"x" => 23.2,
"y" => 100
)
ingest!(abaco, payload)
Abaco.ingest — Method
ingest(abaco, ts, ne, values)
Adds the input variables included in the dictionary values.
# now timestamp
ts = nowts()
# short name of network node
ne = "trento.castello"
values = Dict(
"x" => 23.2,
"y" => 100
)
ingest(abaco, ts, ne, values)
Abaco.ingest — Method
ingest(abaco, ts::Int, ne::String, var::String, val::Real)
Adds the input variable var with value val.
- ts: timestamp with seconds resolution
- ne: scope name
- var: variable name
- val: variable value
Abaco.last_point — Method
last_point(abaco, ne::String, var::String)::Union{Nothing, Missing, Tuple{Int,Float64}}
Returns the most recent value of the ne node metric var as a tuple (time, value).
If the node ne is unknown, returns nothing; if there are no values for metric var, returns missing.
Abaco.last_value — Method
last_value(abaco, ne::String, var::String)::Union{Nothing, Missing, Float64}
Returns the most recent value of the ne node metric var.
If the node ne is unknown, returns nothing; if there are no values for metric var, returns missing.
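last_value and last_point signal two different failure cases, nothing and missing, so a caller typically branches on both before using the result. describe_last is a hypothetical helper. Its third argument stands for whatever last_value(abaco, ne, var) returned.

```julia
# Hypothetical helper branching on the documented return type
# Union{Nothing, Missing, Float64} of last_value.
function describe_last(node, var, result)
    if result === nothing
        return "unknown node $node"               # ne not known to the abaco
    elseif result === missing
        return "no values for $var on $node"      # node known, metric never seen
    else
        return "$var on $node = $result"
    end
end

println(describe_last("AG101", "x", 10.0))
println(describe_last("AG101", "z", missing))
```

With last_point, the final branch receives a (time, value) tuple instead of a number.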
Abaco.lastspan — Function
lastspan(interval::Int64=900)::Int64
Returns the epoch start time of the last span. The last span is the most recent span that satisfies the condition span.endtime < now.
Abaco.nowts — Function
nowts()
Returns the current timestamp in seconds from epoch.
Abaco.snap_add — Method
snap_add(snap::Snap, ne, var::String, val::Real)
Adds the var value of the ne node to the snap snapshot.
Abaco.span — Function
span(ts::Int64, interval::Int64=900)
Returns the start_time of the time interval that contains ts.