blob: ffb6c94761716ba2ad567e57031eae9b9c3713a3 [file] [log] [blame]
Serge Bazanski6b649f82021-05-24 15:09:25 +02001package kubenat
2
3import (
4 "bufio"
5 "bytes"
6 "context"
7 "fmt"
8 "io/ioutil"
9 "net"
10 "strconv"
11 "strings"
12
13 "github.com/golang/glog"
14)
15
16// translationReq is a request passed to the translationWorker.
17type translationReq struct {
18 t *Tuple4
19 res chan *translationResp
20}
21
// translationResp is the translation worker's answer to a translationReq,
// delivered over the request's res channel. It carries the source endpoint
// recorded in the request direction of the matching conntrack entry.
type translationResp struct {
	// localIP is the request-direction source address (the "src" field).
	localIP net.IP
	// localPort is the request-direction source port (the "sport" field).
	localPort uint16
}
28
29// reply sends a reply to the given translationReq based on a conntrackEntry,
30// sending nil if the entry is nil.
31func (r *translationReq) reply(e *conntrackEntry) {
32 if e == nil {
33 r.res <- nil
34 return
35 }
36 localPort, err := strconv.ParseUint(e.request["sport"], 10, 16)
37 if err != nil {
38 r.res <- nil
39 return
40 }
41 r.res <- &translationResp{
42 localIP: net.ParseIP(e.request["src"]),
43 localPort: uint16(localPort),
44 }
45}
46
47// translate performs a translationReq/translationResp exchange under a context
48// that can be used to time out the query.
49func (r *Resolver) translate(ctx context.Context, t *Tuple4) (*translationResp, error) {
50 resC := make(chan *translationResp, 1)
51 r.translationC <- &translationReq{
52 t: t,
53 res: resC,
54 }
55
56 select {
57 case <-ctx.Done():
58 return nil, ctx.Err()
59 case res := <-resC:
60 return res, nil
61 }
62}
63
// conntrackEntry is an entry parsed from /proc/net/nf_conntrack. The format is
// not well documented; the best resource I could find is:
// https://stackoverflow.com/questions/16034698/details-of-proc-net-ip-conntrack-and-proc-net-nf-conntrack
type conntrackEntry struct {
	// networkProtocol is currently always "ipv4".
	networkProtocol string
	// transmissionProtocol is the transport protocol, currently "tcp" or
	// "udp".
	transmissionProtocol string
	// invalidateTimeout is the numeric timeout column of the entry.
	// NOTE(review): presumably the seconds left until the kernel expires the
	// entry; confirm.
	invalidateTimeout int64

	// state is the TCP state column; it is empty for UDP entries.
	state string

	// request holds the first occurrence of each key=value column. For NAT,
	// these describe the connection as seen from the 'inside' of the NAT,
	// eg. the pod-originated connection. Keys that appear only once on the
	// line (after both tuples) also end up here.
	request map[string]string
	// response holds the second occurrence of each key=value column. For
	// NAT, these describe the connection as seen from the 'outside' of the
	// NAT, eg. the internet.
	response map[string]string
	// tags holds bracketed flags (eg. [ASSURED]) with the brackets stripped.
	tags map[string]bool
}
85
86// conntrackParseEntry parses a line from /proc/net/nf_conntrack into a conntrackEntry.
87func conntrackParseEntry(line string) (*conntrackEntry, error) {
88 entry := conntrackEntry{
89 request: make(map[string]string),
90 response: make(map[string]string),
91 tags: make(map[string]bool),
92 }
93
94 fields := strings.Fields(line)
95 if len(fields) < 5 {
96 // This should never happen unless the file format drastically
97 // changed. Don't bother to parse the rest, error early, and let
98 // someone debug this.
99 return nil, fmt.Errorf("invalid field count: %v", fields)
100 }
101 switch fields[0] {
102 case "ipv4":
103 if fields[1] != "2" {
104 return nil, fmt.Errorf("ipv4 with proto number %q, wanted 2", fields[1])
105 }
106 // TODO(q3k): support IPv6 when we get it on prod.
107 default:
108 return nil, nil
109 }
110 entry.networkProtocol = fields[0]
111
112 rest := fields[5:]
113 switch fields[2] {
114 case "tcp":
115 if fields[3] != "6" {
116 return nil, fmt.Errorf("tcp with proto number %q, wanted 6", fields[3])
117 }
118 if len(fields) < 6 {
119 return nil, fmt.Errorf("tcp with missing state field")
120 }
121 entry.state = fields[5]
122 rest = fields[6:]
123 case "udp":
124 if fields[3] != "17" {
125 return nil, fmt.Errorf("udp with proto number %q, wanted 17", fields[3])
126 }
127 default:
128 return nil, nil
129 }
130 entry.transmissionProtocol = fields[2]
131
132 invalidateTimeout, err := strconv.ParseInt(fields[4], 10, 64)
133 if err != nil {
134 return nil, fmt.Errorf("unparseable timeout %q", fields[4])
135 }
136 entry.invalidateTimeout = invalidateTimeout
137
138 for _, el := range rest {
139 parts := strings.Split(el, "=")
140 switch len(parts) {
141 case 1:
142 // This is a tag.
143 tag := parts[0]
144 // Ensure the tag starts and ends with [] (eg. [ASSURED].
145 if !strings.HasPrefix(tag, "[") || !strings.HasSuffix(tag, "]") {
146 continue
147 }
148 // Strip [ and ].
149 tag = tag[1:]
150 tag = tag[:len(tag)-1]
151 if _, ok := entry.tags[tag]; ok {
152 return nil, fmt.Errorf("repeated tag %q", tag)
153 }
154 entry.tags[tag] = true
155 case 2:
156 // This is a k/v field.
157 k := parts[0]
158 v := parts[1]
159 if _, ok := entry.request[k]; ok {
160 if _, ok := entry.response[k]; ok {
161 return nil, fmt.Errorf("field %q encountered more than twice", k)
162 } else {
163 entry.response[k] = v
164 }
165 } else {
166 entry.request[k] = v
167 }
168 default:
169 return nil, fmt.Errorf("unparseable column %q", el)
170 }
171 }
172 return &entry, nil
173
174}
175
176// conntrackParse parses the contents of a /proc/net/nf_conntrack file into
177// multiple entries. If the majority of the entries could not be parsed, an
178// error is returned.
179func conntrackParse(data []byte) ([]conntrackEntry, error) {
180 buf := bytes.NewBuffer(data)
181 scanner := bufio.NewScanner(buf)
182 var res []conntrackEntry
183 var errors []error
184 for scanner.Scan() {
185 line := strings.TrimSpace(scanner.Text())
186 if line == "" {
187 continue
188 }
189
190 entry, err := conntrackParseEntry(line)
191 if err != nil {
192 glog.Errorf("Error while parsing %q: %v", line, err)
193 errors = append(errors, err)
194 } else if entry != nil {
195 res = append(res, *entry)
196 }
197 }
198
199 if len(errors) == 0 || len(errors) < len(res) {
200 return res, nil
201 } else {
202 return nil, fmt.Errorf("encountered too many errors during conntrack parse, check logs; first error: %w", errors[0])
203 }
204}
205
206// contrackIndex is an index into a list of conntrackEntries. It allows lookup
207// by request/response k/v pairs.
208type conntrackIndex struct {
209 entries []conntrackEntry
210 // byRequest is a map from key to value to list of indixes into entries.
211 byRequest map[string]map[string][]int
212 // byResponse is a map from key to value to list of indixes into entries.
213 byResponse map[string]map[string][]int
214}
215
216// buildIndex builds a conntrackIndex from a list of conntrackEntries.
217func buildIndex(entries []conntrackEntry) *conntrackIndex {
218 ix := conntrackIndex{
219 entries: entries,
220 byRequest: make(map[string]map[string][]int),
221 byResponse: make(map[string]map[string][]int),
222 }
223 for i, entry := range ix.entries {
224 for k, v := range entry.request {
225 if _, ok := ix.byRequest[k]; !ok {
226 ix.byRequest[k] = make(map[string][]int)
227 }
228 ix.byRequest[k][v] = append(ix.byRequest[k][v], i)
229 }
230 for k, v := range entry.response {
231 if _, ok := ix.byResponse[k]; !ok {
232 ix.byResponse[k] = make(map[string][]int)
233 }
234 ix.byResponse[k][v] = append(ix.byResponse[k][v], i)
235 }
236 }
237 return &ix
238}
239
240// getByRequest returns conntrackEntries that match a given k/v pair in their
241// request fields.
242func (c *conntrackIndex) getByRequest(k, v string) []*conntrackEntry {
243 m, ok := c.byRequest[k]
244 if !ok {
245 return nil
246 }
247 ixes, ok := m[v]
248 if !ok {
249 return nil
250 }
251 res := make([]*conntrackEntry, len(ixes))
252 for i, ix := range ixes {
253 res[i] = &c.entries[ix]
254 }
255 return res
256}
257
258// getByResponse returns conntrackEntries that match a given k/v pair in their
259// response fields.
260func (c *conntrackIndex) getByResponse(k, v string) []*conntrackEntry {
261 m, ok := c.byResponse[k]
262 if !ok {
263 return nil
264 }
265 ixes, ok := m[v]
266 if !ok {
267 return nil
268 }
269 res := make([]*conntrackEntry, len(ixes))
270 for i, ix := range ixes {
271 res[i] = &c.entries[ix]
272 }
273 return res
274}
275
276// find returns a conntrackEntry corresponding to a TCP connection defined on
277// the 'outside' of the NAT by a 4-tuple, or nil if no such connection is
278// found.
279func (c *conntrackIndex) find(t *Tuple4) *conntrackEntry {
280 // TODO(q3k): support IPv6
281 if t.RemoteIP.To4() == nil || t.LocalIP.To4() == nil {
282 return nil
283 }
284 entries := c.getByResponse("src", t.RemoteIP.String())
285 for _, entry := range entries {
286 if entry.transmissionProtocol != "tcp" {
287 continue
288 }
289 if entry.response["sport"] != fmt.Sprintf("%d", t.RemotePort) {
290 continue
291 }
292 if entry.response["dst"] != t.LocalIP.String() {
293 continue
294 }
295 if entry.response["dport"] != fmt.Sprintf("%d", t.LocalPort) {
296 continue
297 }
298 return entry
299 }
300 return nil
301}
302
303// runTranslationWorker runs the conntrack 'translation worker'. It responds to
304// requests over translationC until ctx is canceled.
305func (r *Resolver) runTranslationWorker(ctx context.Context) {
306 var ix *conntrackIndex
307 readConntrack := func() {
308 var entries []conntrackEntry
309 data, err := ioutil.ReadFile(r.conntrackPath)
310 if err != nil {
311 glog.Errorf("Failed to read conntrack file: %v", err)
312 } else {
313 entries, err = conntrackParse(data)
314 if err != nil {
315 glog.Errorf("failed to parse conntrack entries: %v", err)
316 }
317 }
318 ix = buildIndex(entries)
319 }
320 readConntrack()
321
322 for {
323 select {
324 case req := <-r.translationC:
325 entry := ix.find(req.t)
326 if entry != nil {
327 req.reply(entry)
328 } else {
329 readConntrack()
330 entry = ix.find(req.t)
331 if entry != nil {
332 req.reply(entry)
333 } else {
334 req.reply(nil)
335 }
336 }
337 case <-ctx.Done():
338 return
339 }
340 }
341}