Not a member of Pastebin yet?
Sign Up,
it unlocks many cool features!
- package main
- import (
- "bufio"
- "encoding/binary"
- "flag"
- // "fmt"
- "io"
- "log"
- "net"
- "os"
- "path"
- "sync"
- "time"
- "github.com/golang/protobuf/proto"
- "github.com/midbel/mud"
- "github.com/midbel/mud/hadock"
- "github.com/midbel/toml"
- "cmd/hdk2udp/internal/pvalue"
- "cmd/hdk2udp/internal/yamcs"
- )
// conn is a UDP connection that remembers the address it was dialed with so
// that the socket can be re-opened after a permanent write failure.
type conn struct {
	net.Conn
	addr string // remote address given to net.Dial; reused when re-dialing
}

// Write sends bs on the underlying connection on a best-effort basis.
//
// When the underlying write succeeds, its byte count is returned. When it
// fails, the failure is deliberately hidden from the caller: Write reports
// len(bs) and a nil error. If the failure is a net.Error that is not
// temporary, a fresh UDP socket is dialed to c.addr; when that succeeds the
// old socket is closed and replaced, and bs is sent once more on the new
// socket so that the datagram that triggered the reconnection is not lost.
func (c *conn) Write(bs []byte) (int, error) {
	n, err := c.Conn.Write(bs)
	if err == nil {
		return n, nil
	}
	if nerr, ok := err.(net.Error); ok && !nerr.Temporary() {
		fresh, derr := net.Dial("udp", c.addr)
		if derr != nil {
			// Could not re-dial: keep the current socket and try again on
			// the next call.
			return len(bs), nil
		}
		c.Conn.Close()
		c.Conn = fresh
		// Single best-effort retry; a failure here is ignored like any
		// other write failure.
		_, _ = c.Conn.Write(bs)
	}
	return len(bs), nil
}
// group is one entry of a channel's "groups" list in the TOML configuration:
// a UDP address paired with the name of a network interface.
type group struct {
	Addr string `toml:"addr"`      // UDP address, read from the "addr" key
	Ifi  string `toml:"interface"` // interface name, read from the "interface" key
}
- type channel struct {
- Link string `toml:"address"`
- Name string `toml:"namespace"`
- Groups []group `toml:"groups"`
- }
- func (c channel) Run() error {
- w, err := connect(c.Link)
- if err != nil {
- return err
- }
- q := make(chan *hadock.Message, 100)
- defer close(q)
- go sendTo(q, w, c.Name)
- var wg sync.WaitGroup
- for _, g := range c.Groups {
- c, err := subscribe(g.Addr, g.Ifi)
- if err != nil {
- return err
- }
- wg.Add(1)
- go func(c net.Conn) {
- defer func() {
- c.Close()
- wg.Done()
- }()
- r := bufio.NewReader(c)
- for {
- m, err := decodeMessage(r)
- if err != nil {
- log.Println(err)
- continue
- }
- q <- m
- }
- }(c)
- }
- wg.Wait()
- return nil
- }
- func main() {
- flag.Parse()
- f, err := os.Open(flag.Arg(0))
- if err != nil {
- log.Fatalln(err)
- }
- defer f.Close()
- c := struct {
- Channels []channel `toml:"channel"`
- }{}
- if err := toml.NewDecoder(f).Decode(&c); err != nil {
- log.Fatalln(err)
- }
- var wg sync.WaitGroup
- for _, c := range c.Channels {
- go func(c channel) {
- defer wg.Done()
- if err := c.Run(); err != nil {
- log.Println(err)
- }
- }(c)
- }
- wg.Wait()
- }
- // func mainBis() {
- // flag.Parse()
- // f, err := os.Open(flag.Arg(0))
- // if err != nil {
- // log.Fatalln(err)
- // }
- // defer f.Close()
- //
- // cfg := struct {
- // Link string `toml:"datalink"`
- // Name string `toml:"namespace"`
- // Groups []string `toml:"groups"`
- // }{}
- // if err := toml.NewDecoder(f).Decode(&cfg); err != nil {
- // log.Fatalln(err)
- // }
- // w, err := connect(cfg.Link)
- // if err != nil {
- // log.Fatalln(err)
- // }
- //
- // q := make(chan *hadock.Message, 100)
- // defer close(q)
- // go sendTo(q, w, cfg.Name)
- //
- // var wg sync.WaitGroup
- // for _, g := range cfg.Groups {
- // c, err := subscribe(g, "")
- // if err != nil {
- // log.Println(err)
- // continue
- // }
- // wg.Add(1)
- // go func(c net.Conn) {
- // defer func() {
- // c.Close()
- // wg.Done()
- // }()
- // r := bufio.NewReader(c)
- // for {
- // m, err := decodeMessage(r)
- // if err != nil {
- // log.Println(err)
- // continue
- // }
- // q <- m
- // }
- // }(c)
- // }
- // wg.Wait()
- // }
- func sendTo(queue <-chan *hadock.Message, w net.Conn, n string) {
- defer w.Close()
- var i uint16
- for m := range queue {
- i++
- bs, err := marshal(m, n, int32(i))
- if err != nil {
- log.Println(err)
- continue
- }
- if _, err := w.Write(bs); err != nil {
- log.Println(err)
- }
- }
- }
- func marshal(m *hadock.Message, n string, i int32) ([]byte, error) {
- w := time.Now().UTC().Unix()
- pd := &pvalue.ParameterData{
- Group: &n,
- GenerationTime: &w,
- SeqNum: &i,
- }
- t := m.Timestamp
- pd.Parameter = append(pd.Parameter, marshalParameter(n, "origin", w, t, m.Origin))
- pd.Parameter = append(pd.Parameter, marshalParameter(n, "sequence", w, t, m.Sequence))
- pd.Parameter = append(pd.Parameter, marshalParameter(n, "instance", w, t, m.Instance))
- pd.Parameter = append(pd.Parameter, marshalParameter(n, "channel", w, t, m.Channel))
- pd.Parameter = append(pd.Parameter, marshalParameter(n, "realtime", w, t, m.Realtime))
- pd.Parameter = append(pd.Parameter, marshalParameter(n, "count", w, t, m.Count))
- pd.Parameter = append(pd.Parameter, marshalParameter(n, "elapsed", w, t, m.Elapsed))
- pd.Parameter = append(pd.Parameter, marshalParameter(n, "timestamp", w, t, m.Timestamp))
- pd.Parameter = append(pd.Parameter, marshalParameter(n, "reference", w, t, m.Reference))
- return proto.Marshal(pd)
- }
- func marshalParameter(ns, n string, w, t int64, v interface{}) *pvalue.ParameterValue {
- var x yamcs.Value
- switch v := v.(type) {
- case string:
- t := yamcs.Value_STRING
- x = yamcs.Value{
- Type: &t,
- StringValue: &v,
- }
- case uint32:
- t := yamcs.Value_UINT32
- x = yamcs.Value{
- Type: &t,
- Uint32Value: &v,
- }
- case mud.Channel:
- t := yamcs.Value_SINT32
- d := int32(v)
- x = yamcs.Value{
- Type: &t,
- Sint32Value: &d,
- }
- case int32:
- t := yamcs.Value_SINT32
- x = yamcs.Value{
- Type: &t,
- Sint32Value: &v,
- }
- case time.Duration:
- t := yamcs.Value_SINT64
- d := int64(v)
- x = yamcs.Value{
- Type: &t,
- Sint64Value: &d,
- }
- case int64:
- t := yamcs.Value_SINT64
- x = yamcs.Value{
- Type: &t,
- Sint64Value: &v,
- }
- case bool:
- t := yamcs.Value_BOOLEAN
- x = yamcs.Value{
- Type: &t,
- BooleanValue: &v,
- }
- }
- //ns = fmt.Sprintf("/%s/%s", ns, n)
- ns = path.Clean(path.Join(ns, n))
- status := pvalue.AcquisitionStatus_ACQUIRED
- uid := yamcs.NamedObjectId{
- Name: &n,
- Namespace: &ns,
- }
- return &pvalue.ParameterValue{
- Id: &uid,
- EngValue: &x,
- AcquisitionStatus: &status,
- GenerationTime: &w,
- AcquisitionTime: &t,
- }
- }
- func connect(a string) (net.Conn, error) {
- c, err := net.Dial("udp", a)
- if err != nil {
- return nil, err
- }
- return &conn{c, a}, nil
- }
// subscribe opens a multicast UDP listener on the group address a.
//
// i names the network interface to listen on. An empty i passes a nil
// interface to net.ListenMulticastUDP. A non-empty i that cannot be resolved
// is reported as an error rather than silently ignored.
//
// On failure the returned net.Conn is a true nil interface value.
func subscribe(a, i string) (net.Conn, error) {
	addr, err := net.ResolveUDPAddr("udp", a)
	if err != nil {
		return nil, err
	}
	var ifi *net.Interface
	if i != "" {
		if ifi, err = net.InterfaceByName(i); err != nil {
			return nil, err
		}
	}
	c, err := net.ListenMulticastUDP("udp", ifi, addr)
	if err != nil {
		// Return an untyped nil: handing back the nil *net.UDPConn directly
		// would produce a non-nil net.Conn interface.
		return nil, err
	}
	return c, nil
}
- func decodeMessage(r io.Reader) (*hadock.Message, error) {
- var m hadock.Message
- m.Origin, _ = readString(r)
- binary.Read(r, binary.BigEndian, &m.Sequence)
- binary.Read(r, binary.BigEndian, &m.Instance)
- binary.Read(r, binary.BigEndian, &m.Channel)
- binary.Read(r, binary.BigEndian, &m.Realtime)
- binary.Read(r, binary.BigEndian, &m.Count)
- binary.Read(r, binary.BigEndian, &m.Elapsed)
- binary.Read(r, binary.BigEndian, &m.Timestamp)
- m.Reference, _ = readString(r)
- return &m, nil
- }
// readString reads a length-prefixed string from r: a big-endian uint16
// byte count followed by that many bytes.
//
// The payload is read with io.ReadFull, so a reader that delivers the bytes
// in several short reads still yields the complete string, and a zero-length
// string never touches the reader. A stream that ends inside the payload is
// reported as io.ErrUnexpectedEOF.
func readString(r io.Reader) (string, error) {
	var z uint16
	if err := binary.Read(r, binary.BigEndian, &z); err != nil {
		return "", err
	}
	bs := make([]byte, int(z))
	if _, err := io.ReadFull(r, bs); err != nil {
		return "", err
	}
	return string(bs), nil
}
Add Comment
Please, Sign In to add comment