Github 바로가기


Writes

InfluxDB Go client는 writing 방법으로 non-blocking 방식과 blocking 방식을 지원한다.

Non-blocking

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
    "github.com/influxdata/influxdb-client-go/v2/domain"
)

// main looks up the "mobigen" organization, creates the "rand-buck" bucket in
// it, and writes 100 points with random field values through the non-blocking
// write API. It stops early if the organization or the bucket is unavailable,
// because the later calls dereference both values.
func main() {
    client := influxdb2.NewClientWithOptions("http://localhost:8086", "${my-token}",
        influxdb2.DefaultOptions().SetBatchSize(20))
    // Ensures background processes finish, also on the early-return paths below.
    defer client.Close()
    ctx := context.Background()

    org, err := client.OrganizationAPI().FindOrganizationByName(ctx, "mobigen")
    if err != nil {
        // org must not be used after a failed lookup.
        fmt.Printf("ERROR. Cannot find organization: %v\n", err)
        return
    }

    bucketsAPI := client.BucketsAPI()
    bucket, err := bucketsAPI.CreateBucketWithName(ctx, org, "rand-buck", domain.RetentionRule{EverySeconds: 3600 * 12})
    if err != nil {
        // bucket must not be used after a failed creation.
        fmt.Printf("Error. Cannot create bucket: %v\n", err)
        return
    }

    // Get non-blocking write client
    writeAPI := client.WriteAPI(org.Name, bucket.Name)
    // Read and log errors; the channel is obtained before the first write.
    errorsCh := writeAPI.Errors()
    go func() {
        for err := range errorsCh {
            fmt.Printf("write error: %s\n", err.Error())
        }
    }()

    // write some points
    for i := 0; i < 100; i++ {
        // create point
        p := influxdb2.NewPoint(
            "rand-buck",
            map[string]string{
                "contID":   fmt.Sprintf("contID_%v", i),
                "contName": fmt.Sprintf("contName_%v", i),
                "vendor":   "mobigen",
            },
            map[string]interface{}{
                "utime":    rand.Float64(),
                "stime":    rand.Float64(),
                "cutime":   rand.Float64(),
                "cstime":   rand.Float64(),
                "rxByte":   rand.Float64(),
                "rxPacket": rand.Float64(),
                "txByte":   rand.Float64(),
                "txPacket": rand.Float64(),
                "vmsize":   rand.Float64(),
                "vmrss":    rand.Float64(),
                "rssfile":  rand.Float64(),
            },
            time.Now())
        // write asynchronously
        writeAPI.WritePoint(p)
    }
    // Force all unwritten data to be sent before the deferred Close runs
    writeAPI.Flush()
}

주요 함수

NewPoint 함수는 형식에 맞추어 데이터를 쓰면 된다.

// NewPoint is the constructor the examples in this document call to build one
// data point. Its arguments are, in order: the measurement name, a map of tag
// keys to string values, a map of field keys to values of any type, and the
// point's timestamp (the examples pass time.Now()).
// NOTE(review): this excerpt omits the return type and the body.
func NewPoint(
	measurement string,
	tags map[string]string,
	fields map[string]interface{},
	ts time.Time,
)

Errors() 메소드는 비동기 write 중 발생한 error를 읽을 수 있는 채널을 리턴하며, non-blocking(비동기) 방식에서만 사용 가능하다. 이 채널은 write를 시작하기 전에 받아 두고, 아래처럼 별도의 goroutine에서 계속 읽어 주어야 한다. (채널을 읽지 않으면 write 프로세스가 block되기 때문)

// Read and log errors
errorsCh := writeAPI.Errors()
go func(){
    for err := range errorsCh {
        fmt.Printf("write error: %s\n", err.Error())
    }
}()


Blocking

Blocking 방식 writing은 batch size를 따로 지정하지 않으며, 한 번의 호출에 전달한 point 집합(set of points)이 하나의 batch로 write된다.

package main

import (
    "context"
    "fmt"
    "math/rand"
    "time"

    "github.com/influxdata/influxdb-client-go/v2"
)

// main writes 100 sample "system" points to bucket "my-bucket" of organization
// "my-org" through the blocking write API, one point per call, and panics on
// the first write error.
func main() {
    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient("http://localhost:8086", "my-token")
    // Ensures background processes finish; deferred so it also runs when the
    // write loop panics.
    defer client.Close()
    // Get blocking write client
    writeAPI := client.WriteAPIBlocking("my-org", "my-bucket")
    // One context is shared by all writes.
    ctx := context.Background()
    // write some points
    for i := 0; i < 100; i++ {
        // create data point
        p := influxdb2.NewPoint(
            "system",
            map[string]string{
                "id":       fmt.Sprintf("rack_%v", i%10),
                "vendor":   "AWS",
                "hostname": fmt.Sprintf("host_%v", i%100),
            },
            map[string]interface{}{
                "temperature": rand.Float64() * 80.0,
                "disk_free":   rand.Float64() * 1000.0,
                "disk_total":  (i/10 + 1) * 1000000,
                "mem_total":   (i/100 + 1) * 10000000,
                "mem_free":    rand.Uint64(),
            },
            time.Now())
        // write synchronously
        if err := writeAPI.WritePoint(ctx, p); err != nil {
            panic(err)
        }
    }
}



Query

package main

import (
    "context"
    "fmt"

    "github.com/influxdata/influxdb-client-go/v2"
)

// main queries the last 8 hours of the "rand-buck" bucket, pivots the "utime"
// and "stime" fields of the "rand-buck" measurement into columns, and prints
// their sum together with the record time and the "contName" value.
func main() {
    org := "mobigen"
    token := "${token}"
    url := "http://localhost:8086"

    // Create a new client using an InfluxDB server base URL and an authentication token
    client := influxdb2.NewClient(url, token)
    // Ensures background processes finish, also on the early-return path below.
    defer client.Close()
    // Get query client
    queryAPI := client.QueryAPI(org)
    // get QueryTableResult
    // The field test is parenthesized because Flux binds `and` tighter than
    // `or`; without the parentheses, "stime" rows of every measurement match.
    result, err := queryAPI.Query(context.Background(), `from(bucket:"rand-buck")
	|> range(start:-8h)
	|> filter(fn:(r) =>
		r._measurement == "rand-buck" and
		(r._field == "utime" or r._field == "stime"))
    |> pivot(rowKey:["_time"], columnKey:["_field"], valueColumn: "_value")
    |> map(fn: (r) => ({ r with _value: r.utime + r.stime}))
	|> yield(name: "_results")`)
    if err != nil {
        fmt.Printf("ERROR. Cannot serve query result: %v\n", err)
        return
    }

    // Iterate over query response
    for result.Next() {
        // Access data
        record := result.Record()
        fmt.Printf("Time: %v\n", record.Time())
        fmt.Printf("ContainerName: %v   |   ", record.ValueByKey("contName"))
        fmt.Printf("utime + stime: %v\n", record.Value())
    }
    // check for an error raised while reading the response
    if result.Err() != nil {
        fmt.Printf("query parsing error: %s\n", result.Err().Error())
    }
}

출력 결과

[root@node01 hb]# ./influx-goclient
Time: 2021-03-24 01:46:13.295213661 +0000 UTC
ContainerName: name0   |   utime + stime: 1.545169376024632
Time: 2021-03-24 01:46:13.295373905 +0000 UTC
ContainerName: name1   |   utime + stime: 1.0279038335724717
Time: 2021-03-24 01:46:13.295725902 +0000 UTC
ContainerName: name10   |   utime + stime: 1.0135963801679202
Time: 2021-03-24 01:46:13.295754915 +0000 UTC
ContainerName: name11   |   utime + stime: 0.6264307459330203
Time: 2021-03-24 01:46:13.295776496 +0000 UTC
ContainerName: name12   |   utime + stime: 1.5148849208782624
Time: 2021-03-24 01:46:13.29579993 +0000 UTC
ContainerName: name13   |   utime + stime: 0.11503004905526187


Result Data 접근

result.Record().${아래 요소}


| 함수 | Parameter | Output type | 설명 |
| --- | --- | --- | --- |
| Field | | string | output table의 field 값 리턴 |
| Measurement | | string | output table의 measurement 값 리턴 |
| Start, Stop, Time | | time.Time | 해당 데이터의 조회 시작/조회 끝/입력 시간 |
| Value | | interface{} | Field의 Value값 리턴, _value column의 값이 존재하지 않을 시 nil 리턴 |
| Values | | map[string]interface{} | 기본 output table 전체 값 리턴 |
| ValueByKey | Key값(string) | interface{} | Values()의 리턴값에서 indexing |


Field       m func() string 
Measurement m func() string
Start       m func() time.Time
Stop        m func() time.Time
String      m func() string
Table       m func() int
Time        m func() time.Time
Value       m func() interface{}
Values      m func() map[string]interface{}
ValueByKey  m func(key string) interface{}