pw.io.logstash
write(table, endpoint, n_retries=0, retry_policy=<pathway.io.http._common.RetryPolicy object>, connect_timeout_ms=None, request_timeout_ms=None, *, name=None, sort_by=None)
sourceSends the stream of updates from the table to HTTP input https://www.elastic.co/guide/en/logstash/current/plugins-inputs-http.html of Logstash. The data is sent in the format of flat JSON objects, with two extra fields for time and diff.
- Parameters
- table (
Table
) – table to be tracked; - endpoint (
str
) – Logstash endpoint, accepting entries; - n_retries (
int
) – number of retries in case of failure; - retry_policy (
RetryPolicy
) – policy of delays or backoffs for the retries; - connect_timeout_ms (
int
|None
) – connection timeout, specified in milliseconds. In cas it’s None, no restrictions on connection duration will be applied; - request_timeout_ms (
int
|None
) – request timeout, specified in milliseconds. In case it’s None, no restrictions on request duration will be applied. - name (
str
|None
) – A unique name for the connector. If provided, this name will be used in logs and monitoring dashboards. - sort_by (
Optional
[Iterable
[ColumnReference
]]) – If specified, the output will be sorted in ascending order based on the values of the given columns within each minibatch. When multiple columns are provided, the corresponding value tuples will be compared lexicographically.
- table (
Example:
Suppose that we need to send the stream of updates to locally installed Logstash. For example, you can use docker-elk https://github.com/deviantony/docker-elk repository in order to get the ELK stack up and running at your local machine in a few minutes.
If Logstash stack is installed, you need to configure the input pipeline. The simplest possible way to do this, is to add the following lines in the input plugins list:
http {
port => 8012
}
The port is specified for the sake of example and can be changed. Further, we will use 8012 for clarity.
Now, with the pipeline configured, you can stream the changed into Logstash as simple as:
pw.io.logstash.write(table, "http://localhost:8012")