[net][linux]: add netlink implementation for Linux connection stat

feature/add_linux_netlink
shirou 5 years ago
parent c141152a7b
commit 6dcd7d464b

@ -321,29 +321,34 @@ var TCPStatuses = map[string]string{
}
// netConnectionKindType describes one kind of socket that can be listed.
//
// family and sockType hold the syscall address-family and socket-type
// constants, netlinkProto is the IP protocol number passed to the netlink
// backend, and filename is the per-kind file name used by the /proc based
// backend. The numeric fields are uint8 so they can be handed to the netlink
// request without conversion; the stale uint32 declarations of family and
// sockType have been dropped because a struct cannot declare a field twice.
type netConnectionKindType struct {
	family       uint8
	sockType     uint8
	netlinkProto uint8
	filename     string
}
// Descriptors for the four inet socket kinds. Each one pairs an address
// family and socket type with the netlink protocol number and the per-kind
// file name.
var (
	// kindTCP4 is TCP over IPv4.
	kindTCP4 = netConnectionKindType{
		filename:     "tcp",
		family:       syscall.AF_INET,
		sockType:     syscall.SOCK_STREAM,
		netlinkProto: syscall.IPPROTO_TCP,
	}

	// kindTCP6 is TCP over IPv6.
	kindTCP6 = netConnectionKindType{
		filename:     "tcp6",
		family:       syscall.AF_INET6,
		sockType:     syscall.SOCK_STREAM,
		netlinkProto: syscall.IPPROTO_TCP,
	}

	// kindUDP4 is UDP over IPv4.
	kindUDP4 = netConnectionKindType{
		filename:     "udp",
		family:       syscall.AF_INET,
		sockType:     syscall.SOCK_DGRAM,
		netlinkProto: syscall.IPPROTO_UDP,
	}

	// kindUDP6 is UDP over IPv6.
	kindUDP6 = netConnectionKindType{
		filename:     "udp6",
		family:       syscall.AF_INET6,
		sockType:     syscall.SOCK_DGRAM,
		netlinkProto: syscall.IPPROTO_UDP,
	}
)
var kindUNIX = netConnectionKindType{
@ -372,8 +377,8 @@ type inodeMap struct {
type connTmp struct {
fd uint32
family uint32
sockType uint32
family uint8
sockType uint8
laddr Addr
raddr Addr
status string
@ -450,29 +455,6 @@ func ConnectionsPidMaxWithoutUidsWithContext(ctx context.Context, kind string, p
return connectionsPidMaxWithoutUidsWithContext(ctx, kind, pid, max, true)
}
// connectionsPidMaxWithoutUidsWithContext lists connections of the given kind
// by reading socket inodes under the host proc root.
//
// A pid of 0 collects inodes for all processes; any other pid restricts the
// lookup to that process and yields an empty, non-nil slice when the process
// has no inodes. An unknown kind is reported as an error. ctx is accepted for
// interface symmetry and is not consulted here.
func connectionsPidMaxWithoutUidsWithContext(ctx context.Context, kind string, pid int32, max int, skipUids bool) ([]ConnectionStat, error) {
	kinds, known := netConnectionKindMap[kind]
	if !known {
		return nil, fmt.Errorf("invalid kind, %s", kind)
	}

	var (
		procRoot = common.HostProc()
		inodes   map[string][]inodeMap
		err      error
	)
	switch pid {
	case 0:
		inodes, err = getProcInodesAll(procRoot, max)
	default:
		inodes, err = getProcInodes(procRoot, pid, max)
		// An empty result for a single pid is returned before the error is
		// inspected, so a lookup that fails with no inodes reads as "no
		// connections".
		if len(inodes) == 0 {
			return []ConnectionStat{}, nil
		}
	}
	if err != nil {
		return nil, fmt.Errorf("cound not get pid(s), %d: %s", pid, err)
	}

	return statsFromInodes(procRoot, pid, kinds, inodes, skipUids)
}
func statsFromInodes(root string, pid int32, tmap []netConnectionKindType, inodes map[string][]inodeMap, skipUids bool) ([]ConnectionStat, error) {
dupCheckMap := make(map[string]struct{})
var ret []ConnectionStat
@ -507,8 +489,8 @@ func statsFromInodes(root string, pid int32, tmap []netConnectionKindType, inode
conn := ConnectionStat{
Fd: c.fd,
Family: c.family,
Type: c.sockType,
Family: uint32(c.family),
Type: uint32(c.sockType),
Laddr: c.laddr,
Raddr: c.raddr,
Status: c.status,
@ -690,7 +672,7 @@ func getProcInodesAll(root string, max int) (map[string][]inodeMap, error) {
// ex:
// "0500000A:0016" -> "10.0.0.5", 22
// "0085002452100113070057A13F025401:0035" -> "2400:8500:1301:1052:a157:7:154:23f", 53
func decodeAddress(family uint32, src string) (Addr, error) {
func decodeAddress(family uint8, src string) (Addr, error) {
t := strings.Split(src, ":")
if len(t) != 2 {
return Addr{}, fmt.Errorf("does not contain port, %s", src)
@ -857,7 +839,7 @@ func processUnix(file string, kind netConnectionKindType, inodes map[string][]in
ret = append(ret, connTmp{
fd: pair.fd,
family: kind.family,
sockType: uint32(st),
sockType: uint8(st),
laddr: Addr{
IP: path,
},

@ -146,7 +146,7 @@ func TestDecodeAddress(t *testing.T) {
if len(src) > 13 {
family = syscall.AF_INET6
}
addr, err := decodeAddress(uint32(family), src)
addr, err := decodeAddress(uint8(family), src)
if dst.Error {
assert.NotNil(err, src)
} else {

@ -0,0 +1,81 @@
// +build linux
// +build netlink
package net
import (
"context"
"fmt"
"sync"
"syscall"
"github.com/shirou/gopsutil/internal/common"
"github.com/shirou/gopsutil/net/netlink"
)
// connectionsPidMaxWithoutUidsWithContext is the netlink-enabled variant of
// the connection lister.
//
// With pid == 0 the whole-system listing is delegated to connectionsNetLink.
// For a specific pid the process's socket inodes are read from the host proc
// root and resolved through statsFromInodes; a process without inodes yields
// an empty, non-nil slice. An unknown kind is reported as an error. ctx is
// not consulted here.
func connectionsPidMaxWithoutUidsWithContext(ctx context.Context, kind string, pid int32, max int, skipUids bool) ([]ConnectionStat, error) {
	kinds, known := netConnectionKindMap[kind]
	if !known {
		return nil, fmt.Errorf("invalid kind, %s", kind)
	}

	procRoot := common.HostProc()
	if pid == 0 {
		return connectionsNetLink(kinds)
	}

	inodes, err := getProcInodes(procRoot, pid, max)
	switch {
	case len(inodes) == 0:
		// Checked before the error on purpose: an empty result is treated as
		// "no connection for the pid" regardless of err.
		return []ConnectionStat{}, nil
	case err != nil:
		return nil, fmt.Errorf("cound not get pid(s), %d: %s", pid, err)
	}

	return statsFromInodes(procRoot, pid, kinds, inodes, skipUids)
}
// connectionsNetLink queries the kernel over netlink for every requested
// socket kind and returns the combined list of connections.
//
// One goroutine is started per kind; AF_UNIX kinds are skipped because the
// netlink backend does not handle them yet. Each goroutine writes only to its
// own slot of the result and error slices, so no two goroutines touch the same
// memory and no locking is needed. Previously all goroutines assigned to one
// shared error variable, which was a data race.
//
// Results are concatenated in the order of kinds. If any query failed, the
// error of the first failing kind (in kinds order) is returned together with
// the connections gathered from the kinds that succeeded.
func connectionsNetLink(kinds []netConnectionKindType) ([]ConnectionStat, error) {
	var wg sync.WaitGroup
	replies := make([][]ConnectionStat, len(kinds))
	errs := make([]error, len(kinds))

	for i, kind := range kinds {
		if kind.family == syscall.AF_UNIX { // TODO: Unix Domain
			continue
		}
		wg.Add(1)
		go func(slot int, k netConnectionKindType) {
			defer wg.Done()

			msgs, err := netlink.Connections(k.family, k.netlinkProto)
			if err != nil {
				errs[slot] = err
				return
			}

			conns := make([]ConnectionStat, len(msgs))
			for j, msg := range msgs {
				conns[j] = ConnectionStat{
					Family: uint32(msg.Family),
					Type:   uint32(k.sockType),
					Laddr:  Addr{msg.SrcIP().String(), uint32(msg.SrcPort())},
					Raddr:  Addr{msg.DstIP().String(), uint32(msg.DstPort())},
					Status: netlink.TCPState(msg.State).String(),
					Uids:   []int32{int32(msg.UID)},
					// TODO: Fd and Pid are not filled in by the netlink path.
				}
			}
			replies[slot] = conns
		}(i, kind)
	}
	wg.Wait()

	total := 0
	for _, r := range replies {
		total += len(r)
	}

	ret := make([]ConnectionStat, 0, total)
	var firstErr error
	for i := range kinds {
		ret = append(ret, replies[i]...)
		if firstErr == nil {
			firstErr = errs[i]
		}
	}
	return ret, firstErr
}

@ -0,0 +1,34 @@
// +build linux
// +build !netlink
package net
import (
"context"
"fmt"
"github.com/shirou/gopsutil/internal/common"
)
// connectionsPidMaxWithoutUidsWithContext is the default (non-netlink)
// connection lister: it always goes through the host proc root.
//
// A pid of 0 gathers socket inodes for every process; a non-zero pid gathers
// them for that process only and returns an empty, non-nil slice when none
// are found. An unknown kind is reported as an error. ctx is not consulted
// here.
func connectionsPidMaxWithoutUidsWithContext(ctx context.Context, kind string, pid int32, max int, skipUids bool) ([]ConnectionStat, error) {
	kinds, known := netConnectionKindMap[kind]
	if !known {
		return nil, fmt.Errorf("invalid kind, %s", kind)
	}

	procRoot := common.HostProc()

	var (
		inodes map[string][]inodeMap
		err    error
	)
	if pid != 0 {
		inodes, err = getProcInodes(procRoot, pid, max)
		// The emptiness check intentionally precedes the error check for the
		// single-pid case: no inodes means "no connections".
		if len(inodes) == 0 {
			return []ConnectionStat{}, nil
		}
	} else {
		inodes, err = getProcInodesAll(procRoot, max)
	}
	if err != nil {
		return nil, fmt.Errorf("cound not get pid(s), %d: %s", pid, err)
	}

	return statsFromInodes(procRoot, pid, kinds, inodes, skipUids)
}

@ -196,10 +196,12 @@ func TestNetConnections(t *testing.T) {
if len(v) == 0 {
t.Errorf("could not get NetConnections: %v", v)
}
for _, vv := range v {
if vv.Family == 0 {
t.Errorf("invalid NetConnections: %v", vv)
}
t.Log(vv)
}
}

@ -0,0 +1,21 @@
// +build linux
// This file is copied from elastic/gosigar
// https://github.com/elastic/gosigar/tree/master/sys/linux
package netlink
import (
"encoding/binary"
"unsafe"
)
// GetEndian reports the byte order of the machine the program runs on.
//
// It stores the value 1 in an int32 and inspects the first byte in memory:
// a non-zero first byte means the least significant byte comes first
// (little-endian), otherwise the host is big-endian.
func GetEndian() binary.ByteOrder {
	probe := int32(0x1)
	firstByte := (*[4]byte)(unsafe.Pointer(&probe))[0]
	if firstByte != 0 {
		return binary.LittleEndian
	}
	return binary.BigEndian
}

@ -0,0 +1,321 @@
// +build linux
// This file is copied and modified from elastic/gosigar
// https://github.com/elastic/gosigar/tree/master/sys/linux
//
// modified point:
// - delete NewInetDiagReq. gopsutil only support v2.
// - change connection state strings.
package netlink
import (
"bytes"
"encoding/binary"
"hash/fnv"
"io"
"net"
"os"
"syscall"
"unsafe"
"github.com/pkg/errors"
)
// Netlink request constants.
const (
	// AllTCPStates has every bit set; it is used as the States mask of a
	// request to ask for sockets in any TCP state.
	AllTCPStates = uint32(0xFFFFFFFF)

	// TCPDIAG_GETSOCK is the netlink message type for requesting TCP diag
	// data.
	// https://github.com/torvalds/linux/blob/v4.0/include/uapi/linux/inet_diag.h#L7
	TCPDIAG_GETSOCK = 18

	// SOCK_DIAG_BY_FAMILY is the netlink message type for requesting socket
	// diag data by family. It is the newer message type and is the one used
	// together with inet_diag_req_v2.
	// https://github.com/torvalds/linux/blob/v4.0/include/uapi/linux/sock_diag.h#L6
	SOCK_DIAG_BY_FAMILY = 20
)
// TCPState is the numeric state of a TCP connection as reported by the
// kernel.
type TCPState uint8

// TCP states, numbered from 1 in the same order as the kernel header.
// https://github.com/torvalds/linux/blob/5924bbecd0267d87c24110cbe2041b5075173a25/include/net/tcp_states.h#L16
const (
	_ TCPState = iota // 0 is not assigned to any state
	TCP_ESTABLISHED
	TCP_SYN_SENT
	TCP_SYN_RECV
	TCP_FIN_WAIT1
	TCP_FIN_WAIT2
	TCP_TIME_WAIT
	TCP_CLOSE
	TCP_CLOSE_WAIT
	TCP_LAST_ACK
	TCP_LISTEN
	TCP_CLOSING /* Now a valid state */
)

// tcpStateNames maps each known state to the string returned by String.
var tcpStateNames = map[TCPState]string{
	TCP_ESTABLISHED: "ESTABLISHED",
	TCP_SYN_SENT:    "SYN_SENT",
	TCP_SYN_RECV:    "SYN_RECV",
	TCP_FIN_WAIT1:   "FIN_WAIT1",
	TCP_FIN_WAIT2:   "FIN_WAIT2",
	TCP_TIME_WAIT:   "TIME_WAIT",
	TCP_CLOSE:       "CLOSE",
	TCP_CLOSE_WAIT:  "CLOSE_WAIT",
	TCP_LAST_ACK:    "LAST_ACK",
	TCP_LISTEN:      "LISTEN",
	TCP_CLOSING:     "CLOSING",
}

// String returns the name of the state, or "UNKNOWN" for a value that has no
// entry in tcpStateNames.
func (s TCPState) String() string {
	name, known := tcpStateNames[s]
	if !known {
		return "UNKNOWN"
	}
	return name
}
// Extension flags that can be OR-ed into the Ext field of an InetDiagReqV2
// request to ask for additional data.
//
// The kernel numbers the extensions 1, 2, 3, ... (INET_DIAG_MEMINFO = 1,
// INET_DIAG_INFO = 2, ...) and tests bit (1 << (number - 1)) of the request's
// extension mask. The flag for extension number n is therefore 1 << (n - 1):
// INET_DIAG_MEMINFO is 1, INET_DIAG_INFO is 2, and so on. The shift used to be
// 1 << n, which made every flag select the following extension instead.
//
// Note that Ext is a uint8, so only the flags up to INET_DIAG_SHUTDOWN (128)
// fit into it.
// https://github.com/torvalds/linux/blob/v4.0/include/uapi/linux/inet_diag.h#L103
const (
	INET_DIAG_NONE = 0

	// iota is 1 on this line, so the first flag is 1 << 0.
	INET_DIAG_MEMINFO = 1 << (iota - 1)
	INET_DIAG_INFO
	INET_DIAG_VEGASINFO
	INET_DIAG_CONG
	INET_DIAG_TOS
	INET_DIAG_TCLASS
	INET_DIAG_SKMEMINFO
	INET_DIAG_SHUTDOWN
	INET_DIAG_DCTCPINFO
	INET_DIAG_PROTOCOL /* response attribute only */
	INET_DIAG_SKV6ONLY
	INET_DIAG_LOCALS
	INET_DIAG_PEERS
	INET_DIAG_PAD
	INET_DIAG_MARK
)
// byteOrder is the host byte order, detected once at package initialisation.
// It is used when encoding requests to and decoding replies from the kernel.
var byteOrder = GetEndian()
// NetlinkInetDiag sends the given netlink request and parses the replies on
// the assumption that they are inet_diag_msgs.
//
// It is a convenience wrapper around NetlinkInetDiagWithBuf that supplies
// neither a read buffer nor a debug writer, so a temporary buffer is allocated
// for the call. Use NetlinkInetDiagWithBuf to provide your own buffer.
func NetlinkInetDiag(request syscall.NetlinkMessage) ([]*InetDiagMsg, error) {
	var (
		noReadBuf []byte
		noDump    io.Writer
	)
	return NetlinkInetDiagWithBuf(request, noReadBuf, noDump)
}
// NetlinkInetDiagWithBuf sends the given netlink request over a fresh
// NETLINK_INET_DIAG socket and parses the replies on the assumption that they
// are inet_diag_msgs.
//
// readBuf receives the raw data of every read from the socket and is reused
// between reads; when it is nil or empty a buffer of one memory page is
// allocated for this call. If resp is non-nil it is handed a copy of all bytes
// read, which is useful for debugging.
//
// Reading continues until an NLMSG_DONE message arrives. An NLMSG_ERROR
// message, a read shorter than a netlink header, or any socket, write or parse
// failure aborts the call with an error and no messages.
func NetlinkInetDiagWithBuf(request syscall.NetlinkMessage, readBuf []byte, resp io.Writer) ([]*InetDiagMsg, error) {
	sock, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW, syscall.NETLINK_INET_DIAG)
	if err != nil {
		return nil, err
	}
	defer syscall.Close(sock)

	dest := &syscall.SockaddrNetlink{Family: syscall.AF_NETLINK}
	if err = syscall.Sendto(sock, serialize(request), 0, dest); err != nil {
		return nil, err
	}

	if len(readBuf) == 0 {
		// Default size used in libnl.
		readBuf = make([]byte, os.Getpagesize())
	}

	var collected []*InetDiagMsg
	for {
		n, _, err := syscall.Recvfrom(sock, readBuf, 0)
		if err != nil {
			return nil, err
		}
		if n < syscall.NLMSG_HDRLEN {
			return nil, syscall.EINVAL
		}
		chunk := readBuf[:n]

		// Dump raw data for inspection purposes.
		if resp != nil {
			if _, err := resp.Write(chunk); err != nil {
				return nil, err
			}
		}

		parsed, err := syscall.ParseNetlinkMessage(chunk)
		if err != nil {
			return nil, err
		}

		for _, m := range parsed {
			switch m.Header.Type {
			case syscall.NLMSG_DONE:
				// End of the dump: everything gathered so far is the result.
				return collected, nil
			case syscall.NLMSG_ERROR:
				return nil, ParseNetlinkError(m.Data)
			}

			diag, err := ParseInetDiagMsg(m.Data)
			if err != nil {
				return nil, err
			}
			collected = append(collected, diag)
		}
	}
}
// serialize encodes a netlink message into the byte layout sent on the
// socket: a 16-byte header (length, type, flags, sequence number, pid) in
// host byte order followed by the payload.
//
// The header length is always recomputed from the payload size; whatever
// value the caller put into msg.Header.Len is ignored. msg is passed by value,
// so the caller's message is not modified.
func serialize(msg syscall.NetlinkMessage) []byte {
	hdr := msg.Header
	hdr.Len = uint32(syscall.SizeofNlMsghdr + len(msg.Data))

	out := make([]byte, hdr.Len)
	byteOrder.PutUint32(out[0:4], hdr.Len)
	byteOrder.PutUint16(out[4:6], hdr.Type)
	byteOrder.PutUint16(out[6:8], hdr.Flags)
	byteOrder.PutUint32(out[8:12], hdr.Seq)
	byteOrder.PutUint32(out[12:16], hdr.Pid)
	copy(out[16:], msg.Data)
	return out
}
// V2 Request
//
// sizeofInetDiagReqV2 is the in-memory size of InetDiagReqV2 in bytes. It is
// used by toWireFormat to size the encoding buffer.
var sizeofInetDiagReqV2 = int(unsafe.Sizeof(InetDiagReqV2{}))
// InetDiagReqV2 (inet_diag_req_v2) is used to request diagnostic data.
//
// The struct is written to the wire field by field with binary.Write (see
// toWireFormat), so the order and width of the fields are part of the request
// format and must not be changed.
// https://github.com/torvalds/linux/blob/v4.0/include/uapi/linux/inet_diag.h#L37
type InetDiagReqV2 struct {
// Family is the address family of the sockets to list.
Family uint8
// Protocol is the protocol of the sockets to list.
Protocol uint8
// Ext selects additional data via the INET_DIAG_* extension flags.
Ext uint8
// Pad is left at zero by NewInetDiagReqV2 (presumably alignment padding;
// confirm against the kernel header).
Pad uint8
// States is a mask of socket states to include; NewInetDiagReqV2 sets it to
// AllTCPStates.
States uint32
// ID is the socket identity; NewInetDiagReqV2 leaves it zeroed.
ID InetDiagSockID
}
// toWireFormat encodes the request with binary.Write in host byte order and
// returns the resulting bytes. The output buffer is pre-sized to
// sizeofInetDiagReqV2 so encoding does not need to grow it.
func (r InetDiagReqV2) toWireFormat() []byte {
	out := bytes.NewBuffer(make([]byte, 0, sizeofInetDiagReqV2))
	err := binary.Write(out, byteOrder, r)
	if err != nil {
		// This never returns an error.
		panic(err)
	}
	return out.Bytes()
}
// NewInetDiagReqV2 returns a new NetlinkMessage whose payload is an
// InetDiagReqV2. Callers should set their own sequence number in the returned
// message header.
func NewInetDiagReqV2(af uint8, proto uint8) syscall.NetlinkMessage {
hdr := syscall.NlMsghdr{
Type: uint16(SOCK_DIAG_BY_FAMILY),
Flags: uint16(syscall.NLM_F_DUMP | syscall.NLM_F_REQUEST),
Pid: uint32(0),
}
req := InetDiagReqV2{
Family: af,
Protocol: proto,
States: AllTCPStates,
}
return syscall.NetlinkMessage{Header: hdr, Data: req.toWireFormat()}
}
// Response messages.
// InetDiagMsg (inet_diag_msg) is the base info structure. It contains socket
// identity (addrs/ports/cookie) and the information shown by netstat.
//
// ParseInetDiagMsg fills this struct field by field with binary.Read, so the
// order and width of the fields mirror the reply layout and must not be
// changed.
// https://github.com/torvalds/linux/blob/v4.0/include/uapi/linux/inet_diag.h#L86
type InetDiagMsg struct {
Family uint8 // Address family.
State uint8 // TCP State
// Timer, Retrans and Expires are decoded as-is and not interpreted by this
// package (presumably timer/retransmit details; confirm against the kernel
// header).
Timer uint8
Retrans uint8
// ID holds the addresses, ports, interface and cookie of the socket.
ID InetDiagSockID
Expires uint32
RQueue uint32 // Recv-Q
WQueue uint32 // Send-Q
UID uint32 // Socket owner UID
Inode uint32 // Inode of socket.
}
// ParseInetDiagMsg decodes an InetDiagMsg from the start of b using the host
// byte order. Use it on the payload of a netlink response message.
//
// It returns a wrapped error when b cannot be decoded, for example when it is
// too short to hold a complete message.
func ParseInetDiagMsg(b []byte) (*InetDiagMsg, error) {
	var msg InetDiagMsg
	if err := binary.Read(bytes.NewReader(b), byteOrder, &msg); err != nil {
		return nil, errors.Wrap(err, "failed to unmarshal inet_diag_msg")
	}
	return &msg, nil
}
// SrcPort returns the source (local) port. The port is stored big-endian.
func (m InetDiagMsg) SrcPort() int {
	hi, lo := int(m.ID.SPort[0]), int(m.ID.SPort[1])
	return hi<<8 | lo
}

// DstPort returns the destination (remote) port. The port is stored
// big-endian.
func (m InetDiagMsg) DstPort() int {
	hi, lo := int(m.ID.DPort[0]), int(m.ID.DPort[1])
	return hi<<8 | lo
}

// SrcIP returns the source (local) IP.
func (m InetDiagMsg) SrcIP() net.IP {
	return ip(m.ID.Src, m.Family)
}

// DstIP returns the destination (remote) IP.
func (m InetDiagMsg) DstIP() net.IP {
	return ip(m.ID.Dst, m.Family)
}

// srcIPBytes returns the raw source address bytes, trimmed to the length that
// is meaningful for the message's address family.
func (m InetDiagMsg) srcIPBytes() []byte {
	return ipBytes(m.ID.Src, m.Family)
}

// dstIPBytes returns the raw destination address bytes, trimmed to the length
// that is meaningful for the message's address family.
func (m InetDiagMsg) dstIPBytes() []byte {
	return ipBytes(m.ID.Dst, m.Family)
}
// ip converts a 16-byte address buffer into a net.IP. For AF_INET only the
// first four bytes are used to build an IPv4 address; for any other family
// all sixteen bytes are returned as the address.
func ip(data [16]byte, af uint8) net.IP {
	if af != syscall.AF_INET {
		return net.IP(data[:])
	}
	a, b, c, d := data[0], data[1], data[2], data[3]
	return net.IPv4(a, b, c, d)
}
// ipBytes returns the meaningful prefix of a 16-byte address buffer: the first
// four bytes for AF_INET, all sixteen bytes for any other family.
func ipBytes(data [16]byte, af uint8) []byte {
	n := len(data)
	if af == syscall.AF_INET {
		n = 4
	}
	return data[:n]
}
// FastHash returns a 64-bit FNV-1 hash over the source address, destination
// address, source port and destination port, fed in that order.
//
// The addresses go through srcIPBytes/dstIPBytes so that for IPv4 only the
// four address bytes are hashed and any non-zero garbage in the rest of the
// buffer is left out.
func (m *InetDiagMsg) FastHash() uint64 {
	hasher := fnv.New64()
	parts := [][]byte{
		m.srcIPBytes(),
		m.dstIPBytes(),
		m.ID.SPort[:],
		m.ID.DPort[:],
	}
	for _, part := range parts {
		hasher.Write(part)
	}
	return hasher.Sum64()
}
// InetDiagSockID (inet_diag_sockid) contains the socket identity.
//
// It is embedded in both InetDiagReqV2 and InetDiagMsg, which are encoded and
// decoded field by field, so the order and width of the fields must not be
// changed.
// https://github.com/torvalds/linux/blob/v4.0/include/uapi/linux/inet_diag.h#L13
type InetDiagSockID struct {
SPort [2]byte // Source port (big-endian).
DPort [2]byte // Destination port (big-endian).
// Src and Dst are 16-byte buffers; for AF_INET only the first four bytes are
// used as the address.
Src [16]byte // Source IP
Dst [16]byte // Destination IP
// If is presumably the interface index and Cookie the kernel socket cookie;
// neither is interpreted by this package (confirm against the kernel header).
If uint32
Cookie [2]uint32
}

@ -0,0 +1,120 @@
// +build linux
// This file is copied and modified from elastic/gosigar
// https://github.com/elastic/gosigar/tree/master/sys/linux
//
// add Connections function
package netlink
import (
"errors"
)
// Connections asks the kernel for all sockets of address family af and
// protocol proto. It builds a v2 inet_diag dump request and returns the
// messages parsed from the replies.
func Connections(af uint8, proto uint8) ([]*InetDiagMsg, error) {
	return NetlinkInetDiag(NewInetDiagReqV2(af, proto))
}
// Netlink Error Code Handling

// ParseNetlinkError reads the error code from the data section of a
// syscall.NetlinkMessage of type NLMSG_ERROR and returns it as a NetlinkErrno.
//
// The first four bytes are read in host byte order and negated to obtain the
// code. If netlinkData is shorter than four bytes, an error describing the
// problem is returned instead.
//
// NOTE(review): the kernel is documented to report a negated errno here, while
// NetlinkErrno uses libnl's NLE_* numbering; the two numberings look
// different, so the resulting message may not match the kernel's error.
// Confirm before relying on the text.
func ParseNetlinkError(netlinkData []byte) error {
	if len(netlinkData) < 4 {
		return errors.New("received netlink error (data too short to read errno)")
	}
	raw := GetEndian().Uint32(netlinkData[:4])
	return NetlinkErrno(-raw)
}
// NetlinkErrno represents the error code contained in a netlink message of
// type NLMSG_ERROR. It implements the error interface.
type NetlinkErrno uint32

// Netlink error codes, numbered consecutively from NLE_SUCCESS = 0.
const (
	NLE_SUCCESS NetlinkErrno = iota
	NLE_FAILURE
	NLE_INTR
	NLE_BAD_SOCK
	NLE_AGAIN
	NLE_NOMEM
	NLE_EXIST
	NLE_INVAL
	NLE_RANGE
	NLE_MSGSIZE
	NLE_OPNOTSUPP
	NLE_AF_NOSUPPORT
	NLE_OBJ_NOTFOUND
	NLE_NOATTR
	NLE_MISSING_ATTR
	NLE_AF_MISMATCH
	NLE_SEQ_MISMATCH
	NLE_MSG_OVERFLOW
	NLE_MSG_TRUNC
	NLE_NOADDR
	NLE_SRCRT_NOSUPPORT
	NLE_MSG_TOOSHORT
	NLE_MSGTYPE_NOSUPPORT
	NLE_OBJ_MISMATCH
	NLE_NOCACHE
	NLE_BUSY
	NLE_PROTO_MISMATCH
	NLE_NOACCESS
	NLE_PERM
	NLE_PKTLOC_FILE
	NLE_PARSE_ERR
	NLE_NODEV
	NLE_IMMUTABLE
	NLE_DUMP_INTR
	NLE_ATTRSIZE
)

// netlinkErrorMsgs holds the human-readable text for every known code.
// https://github.com/thom311/libnl/blob/libnl3_2_28/lib/error.c
var netlinkErrorMsgs = map[NetlinkErrno]string{
	NLE_SUCCESS:           "Success",
	NLE_FAILURE:           "Unspecific failure",
	NLE_INTR:              "Interrupted system call",
	NLE_BAD_SOCK:          "Bad socket",
	NLE_AGAIN:             "Try again",
	NLE_NOMEM:             "Out of memory",
	NLE_EXIST:             "Object exists",
	NLE_INVAL:             "Invalid input data or parameter",
	NLE_RANGE:             "Input data out of range",
	NLE_MSGSIZE:           "Message size not sufficient",
	NLE_OPNOTSUPP:         "Operation not supported",
	NLE_AF_NOSUPPORT:      "Address family not supported",
	NLE_OBJ_NOTFOUND:      "Object not found",
	NLE_NOATTR:            "Attribute not available",
	NLE_MISSING_ATTR:      "Missing attribute",
	NLE_AF_MISMATCH:       "Address family mismatch",
	NLE_SEQ_MISMATCH:      "Message sequence number mismatch",
	NLE_MSG_OVERFLOW:      "Kernel reported message overflow",
	NLE_MSG_TRUNC:         "Kernel reported truncated message",
	NLE_NOADDR:            "Invalid address for specified address family",
	NLE_SRCRT_NOSUPPORT:   "Source based routing not supported",
	NLE_MSG_TOOSHORT:      "Netlink message is too short",
	NLE_MSGTYPE_NOSUPPORT: "Netlink message type is not supported",
	NLE_OBJ_MISMATCH:      "Object type does not match cache",
	NLE_NOCACHE:           "Unknown or invalid cache type",
	NLE_BUSY:              "Object busy",
	NLE_PROTO_MISMATCH:    "Protocol mismatch",
	NLE_NOACCESS:          "No Access",
	NLE_PERM:              "Operation not permitted",
	NLE_PKTLOC_FILE:       "Unable to open packet location file",
	NLE_PARSE_ERR:         "Unable to parse object",
	NLE_NODEV:             "No such device",
	NLE_IMMUTABLE:         "Immutable attribute",
	NLE_DUMP_INTR:         "Dump inconsistency detected, interrupted",
	NLE_ATTRSIZE:          "Attribute max length exceeded",
}

// Error returns the message registered for the code. A code without an entry
// in netlinkErrorMsgs is reported with the NLE_FAILURE text.
func (e NetlinkErrno) Error() string {
	msg, known := netlinkErrorMsgs[e]
	if !known {
		msg = netlinkErrorMsgs[NLE_FAILURE]
	}
	return msg
}

@ -0,0 +1,30 @@
package netlink
// This file is copied from elastic/gosigar
// https://github.com/elastic/gosigar/tree/master/sys/linux
import "net"
// SocketID identifies a single socket by its port pair, address pair,
// interface and cookie.
type SocketID struct {
	SourcePort, DestinationPort uint16
	Source, Destination         net.IP
	Interface                   uint32
	Cookie                      [2]uint32
}
// Socket represents a netlink socket: its family and state, its identity, and
// the queue, owner and inode figures that go with it.
type Socket struct {
	Family, State, Timer, Retrans uint8
	ID                            SocketID
	Expires, RQueue, WQueue       uint32
	UID, INode                    uint32
}
Loading…
Cancel
Save