Guest User

Untitled

a guest
Jan 18th, 2018
113
0
Never
Not a member of Pastebin yet? Sign Up, it unlocks many cool features!
text 6.46 KB | None | 0 0
  1. package main
  2.  
  3. import (
  4. "bufio"
  5. "encoding/binary"
  6. "flag"
  7. // "fmt"
  8. "io"
  9. "log"
  10. "net"
  11. "os"
  12. "path"
  13. "sync"
  14. "time"
  15.  
  16. "github.com/golang/protobuf/proto"
  17. "github.com/midbel/mud"
  18. "github.com/midbel/mud/hadock"
  19. "github.com/midbel/toml"
  20.  
  21. "cmd/hdk2udp/internal/pvalue"
  22. "cmd/hdk2udp/internal/yamcs"
  23. )
  24.  
  25. type conn struct {
  26. net.Conn
  27. addr string
  28. }
  29.  
  30. func (c *conn) Write(bs []byte) (int, error) {
  31. n, err := c.Conn.Write(bs)
  32. if err == nil {
  33. return n, err
  34. }
  35. if err, ok := err.(net.Error); ok && !err.Temporary() {
  36. s, err := net.Dial("udp", c.addr)
  37. if err != nil {
  38. return len(bs), nil
  39. }
  40. c.Conn.Close()
  41. c.Conn = s
  42. }
  43. return len(bs), nil
  44. }
  45.  
  46. type group struct {
  47. Addr string `toml:"addr"`
  48. Ifi string `toml:"interface"`
  49. }
  50.  
  51. type channel struct {
  52. Link string `toml:"address"`
  53. Name string `toml:"namespace"`
  54. Groups []group `toml:"groups"`
  55. }
  56.  
  57. func (c channel) Run() error {
  58. w, err := connect(c.Link)
  59. if err != nil {
  60. return err
  61. }
  62. q := make(chan *hadock.Message, 100)
  63. defer close(q)
  64. go sendTo(q, w, c.Name)
  65.  
  66. var wg sync.WaitGroup
  67. for _, g := range c.Groups {
  68. c, err := subscribe(g.Addr, g.Ifi)
  69. if err != nil {
  70. return err
  71. }
  72. wg.Add(1)
  73. go func(c net.Conn) {
  74. defer func() {
  75. c.Close()
  76. wg.Done()
  77. }()
  78. r := bufio.NewReader(c)
  79. for {
  80. m, err := decodeMessage(r)
  81. if err != nil {
  82. log.Println(err)
  83. continue
  84. }
  85. q <- m
  86. }
  87. }(c)
  88. }
  89. wg.Wait()
  90. return nil
  91. }
  92.  
  93. func main() {
  94. flag.Parse()
  95. f, err := os.Open(flag.Arg(0))
  96. if err != nil {
  97. log.Fatalln(err)
  98. }
  99. defer f.Close()
  100.  
  101. c := struct {
  102. Channels []channel `toml:"channel"`
  103. }{}
  104. if err := toml.NewDecoder(f).Decode(&c); err != nil {
  105. log.Fatalln(err)
  106. }
  107. var wg sync.WaitGroup
  108. for _, c := range c.Channels {
  109. go func(c channel) {
  110. defer wg.Done()
  111. if err := c.Run(); err != nil {
  112. log.Println(err)
  113. }
  114. }(c)
  115. }
  116. wg.Wait()
  117. }
  118.  
  119. // func mainBis() {
  120. // flag.Parse()
  121. // f, err := os.Open(flag.Arg(0))
  122. // if err != nil {
  123. // log.Fatalln(err)
  124. // }
  125. // defer f.Close()
  126. //
  127. // cfg := struct {
  128. // Link string `toml:"datalink"`
  129. // Name string `toml:"namespace"`
  130. // Groups []string `toml:"groups"`
  131. // }{}
  132. // if err := toml.NewDecoder(f).Decode(&cfg); err != nil {
  133. // log.Fatalln(err)
  134. // }
  135. // w, err := connect(cfg.Link)
  136. // if err != nil {
  137. // log.Fatalln(err)
  138. // }
  139. //
  140. // q := make(chan *hadock.Message, 100)
  141. // defer close(q)
  142. // go sendTo(q, w, cfg.Name)
  143. //
  144. // var wg sync.WaitGroup
  145. // for _, g := range cfg.Groups {
  146. // c, err := subscribe(g, "")
  147. // if err != nil {
  148. // log.Println(err)
  149. // continue
  150. // }
  151. // wg.Add(1)
  152. // go func(c net.Conn) {
  153. // defer func() {
  154. // c.Close()
  155. // wg.Done()
  156. // }()
  157. // r := bufio.NewReader(c)
  158. // for {
  159. // m, err := decodeMessage(r)
  160. // if err != nil {
  161. // log.Println(err)
  162. // continue
  163. // }
  164. // q <- m
  165. // }
  166. // }(c)
  167. // }
  168. // wg.Wait()
  169. // }
  170.  
  171. func sendTo(queue <-chan *hadock.Message, w net.Conn, n string) {
  172. defer w.Close()
  173.  
  174. var i uint16
  175. for m := range queue {
  176. i++
  177. bs, err := marshal(m, n, int32(i))
  178. if err != nil {
  179. log.Println(err)
  180. continue
  181. }
  182. if _, err := w.Write(bs); err != nil {
  183. log.Println(err)
  184. }
  185. }
  186. }
  187.  
  188. func marshal(m *hadock.Message, n string, i int32) ([]byte, error) {
  189. w := time.Now().UTC().Unix()
  190. pd := &pvalue.ParameterData{
  191. Group: &n,
  192. GenerationTime: &w,
  193. SeqNum: &i,
  194. }
  195. t := m.Timestamp
  196. pd.Parameter = append(pd.Parameter, marshalParameter(n, "origin", w, t, m.Origin))
  197. pd.Parameter = append(pd.Parameter, marshalParameter(n, "sequence", w, t, m.Sequence))
  198. pd.Parameter = append(pd.Parameter, marshalParameter(n, "instance", w, t, m.Instance))
  199. pd.Parameter = append(pd.Parameter, marshalParameter(n, "channel", w, t, m.Channel))
  200. pd.Parameter = append(pd.Parameter, marshalParameter(n, "realtime", w, t, m.Realtime))
  201. pd.Parameter = append(pd.Parameter, marshalParameter(n, "count", w, t, m.Count))
  202. pd.Parameter = append(pd.Parameter, marshalParameter(n, "elapsed", w, t, m.Elapsed))
  203. pd.Parameter = append(pd.Parameter, marshalParameter(n, "timestamp", w, t, m.Timestamp))
  204. pd.Parameter = append(pd.Parameter, marshalParameter(n, "reference", w, t, m.Reference))
  205.  
  206. return proto.Marshal(pd)
  207. }
  208.  
  209. func marshalParameter(ns, n string, w, t int64, v interface{}) *pvalue.ParameterValue {
  210. var x yamcs.Value
  211.  
  212. switch v := v.(type) {
  213. case string:
  214. t := yamcs.Value_STRING
  215. x = yamcs.Value{
  216. Type: &t,
  217. StringValue: &v,
  218. }
  219. case uint32:
  220. t := yamcs.Value_UINT32
  221. x = yamcs.Value{
  222. Type: &t,
  223. Uint32Value: &v,
  224. }
  225. case mud.Channel:
  226. t := yamcs.Value_SINT32
  227. d := int32(v)
  228. x = yamcs.Value{
  229. Type: &t,
  230. Sint32Value: &d,
  231. }
  232. case int32:
  233. t := yamcs.Value_SINT32
  234. x = yamcs.Value{
  235. Type: &t,
  236. Sint32Value: &v,
  237. }
  238. case time.Duration:
  239. t := yamcs.Value_SINT64
  240. d := int64(v)
  241. x = yamcs.Value{
  242. Type: &t,
  243. Sint64Value: &d,
  244. }
  245. case int64:
  246. t := yamcs.Value_SINT64
  247. x = yamcs.Value{
  248. Type: &t,
  249. Sint64Value: &v,
  250. }
  251. case bool:
  252. t := yamcs.Value_BOOLEAN
  253. x = yamcs.Value{
  254. Type: &t,
  255. BooleanValue: &v,
  256. }
  257. }
  258. //ns = fmt.Sprintf("/%s/%s", ns, n)
  259. ns = path.Clean(path.Join(ns, n))
  260. status := pvalue.AcquisitionStatus_ACQUIRED
  261. uid := yamcs.NamedObjectId{
  262. Name: &n,
  263. Namespace: &ns,
  264. }
  265. return &pvalue.ParameterValue{
  266. Id: &uid,
  267. EngValue: &x,
  268. AcquisitionStatus: &status,
  269. GenerationTime: &w,
  270. AcquisitionTime: &t,
  271. }
  272. }
  273.  
  274. func connect(a string) (net.Conn, error) {
  275. c, err := net.Dial("udp", a)
  276. if err != nil {
  277. return nil, err
  278. }
  279. return &conn{c, a}, nil
  280. }
  281.  
  282. func subscribe(a, i string) (net.Conn, error) {
  283. addr, err := net.ResolveUDPAddr("udp", a)
  284. if err != nil {
  285. return nil, err
  286. }
  287. var ifi *net.Interface
  288. if i, err := net.InterfaceByName(i); err == nil {
  289. ifi = i
  290. }
  291. return net.ListenMulticastUDP("udp", ifi, addr)
  292. }
  293.  
  294. func decodeMessage(r io.Reader) (*hadock.Message, error) {
  295. var m hadock.Message
  296.  
  297. m.Origin, _ = readString(r)
  298. binary.Read(r, binary.BigEndian, &m.Sequence)
  299. binary.Read(r, binary.BigEndian, &m.Instance)
  300. binary.Read(r, binary.BigEndian, &m.Channel)
  301. binary.Read(r, binary.BigEndian, &m.Realtime)
  302. binary.Read(r, binary.BigEndian, &m.Count)
  303. binary.Read(r, binary.BigEndian, &m.Elapsed)
  304. binary.Read(r, binary.BigEndian, &m.Timestamp)
  305. m.Reference, _ = readString(r)
  306.  
  307. return &m, nil
  308. }
  309.  
  310. func readString(r io.Reader) (string, error) {
  311. var z uint16
  312. if err := binary.Read(r, binary.BigEndian, &z); err != nil {
  313. return "", err
  314. }
  315. bs := make([]byte, int(z))
  316. if _, err := r.Read(bs); err != nil {
  317. return "", err
  318. }
  319. return string(bs), nil
  320. }
Add Comment
Please, Sign In to add comment