feature: basic management service implementation (#44)

* feat: basic management service implementation [FAILING TESTS]

* test: fix healthcheck test

* test: #39 add peer registration endpoint test

* feat: #39 add setup key handling

* feat: #39 add peer management store persistence

* refactor: extract config read/write to the utility package

* refactor: move file contents copy to the utility package

* refactor: use Accounts instead of Users in the Store

* feature: add management server Docker file

* refactor: introduce datadir instead of config

* chore: use filepath.Join to concat filepaths instead of string concat

* refactor: move stop channel to the root

* refactor: move stop channel to the root

* review: fix PR review notes

Co-authored-by: braginini <hello@wiretrustee.com>
This commit is contained in:
Mikhail Bragin 2021-07-17 14:38:59 +02:00 committed by GitHub
parent dd50f495ab
commit 4587f7686e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 980 additions and 17 deletions

View File

@ -40,7 +40,7 @@ dockers:
- image_templates:
- wiretrustee/wiretrustee:signal-{{ .Version }}-amd64
goarch: amd64
use_buildx: true
use: buildx
dockerfile: Dockerfile
build_flag_templates:
- "--platform=linux/amd64"
@ -53,7 +53,7 @@ dockers:
- image_templates:
- wiretrustee/wiretrustee:signal-{{ .Version }}-arm64v8
goarch: arm64
use_buildx: true
use: buildx
dockerfile: Dockerfile
build_flag_templates:
- "--platform=linux/arm64"
@ -63,6 +63,32 @@ dockers:
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates:
- wiretrustee/wiretrustee:management-{{ .Version }}-amd64
goarch: amd64
use: buildx
dockerfile: management/Dockerfile
build_flag_templates:
- "--platform=linux/arm64"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.title={{.ProjectName}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com"
- image_templates:
- wiretrustee/wiretrustee:management-{{ .Version }}-arm64v8
goarch: arm64
use: buildx
dockerfile: management/Dockerfile
build_flag_templates:
- "--platform=linux/arm64"
- "--label=org.opencontainers.image.created={{.Date}}"
- "--label=org.opencontainers.image.title={{.ProjectName}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=org.opencontainers.image.revision={{.FullCommit}}"
- "--label=org.opencontainers.image.version={{.Version}}"
- "--label=maintainer=wiretrustee@wiretrustee.com"
docker_manifests:
- name_template: wiretrustee/wiretrustee:signal-{{ .Version }}
@ -73,4 +99,14 @@ docker_manifests:
- name_template: wiretrustee/wiretrustee:signal-latest
image_templates:
- wiretrustee/wiretrustee:signal-{{ .Version }}-arm64v8
- wiretrustee/wiretrustee:signal-{{ .Version }}-amd64
- wiretrustee/wiretrustee:signal-{{ .Version }}-amd64
- name_template: wiretrustee/wiretrustee:management-{{ .Version }}
image_templates:
- wiretrustee/wiretrustee:management-{{ .Version }}-arm64v8
- wiretrustee/wiretrustee:management-{{ .Version }}-amd64
- name_template: wiretrustee/wiretrustee:management-latest
image_templates:
- wiretrustee/wiretrustee:management-{{ .Version }}-arm64v8
- wiretrustee/wiretrustee:management-{{ .Version }}-amd64

61
cmd/management.go Normal file
View File

@ -0,0 +1,61 @@
package cmd
import (
"flag"
"fmt"
log "github.com/sirupsen/logrus"
"github.com/spf13/cobra"
mgmt "github.com/wiretrustee/wiretrustee/management"
mgmtProto "github.com/wiretrustee/wiretrustee/management/proto"
"google.golang.org/grpc"
"net"
)
var (
mgmtPort int
mgmtDataDir string
mgmtCmd = &cobra.Command{
Use: "management",
Short: "start Wiretrustee Management Server",
Run: func(cmd *cobra.Command, args []string) {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", mgmtPort))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
defer grpcServer.Stop()
server, err := mgmt.NewServer(mgmtDataDir)
if err != nil {
log.Fatalf("failed creating new server: %v", err)
panic(err)
}
mgmtProto.RegisterManagementServiceServer(grpcServer, server)
log.Printf("started server: localhost:%v", mgmtPort)
go func() {
if err = grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
SetupCloseHandler()
<-stopCh
log.Println("Receive signal to stop running Management server")
},
}
)
func init() {
mgmtCmd.PersistentFlags().IntVar(&mgmtPort, "port", 33073, "Server port to listen on (e.g. 33073)")
mgmtCmd.PersistentFlags().StringVar(&mgmtDataDir, "datadir", "/data", "Server data directory location (e.g. /data")
}

View File

@ -24,6 +24,9 @@ var (
Short: "",
Long: "",
}
// Execution control channel for stopCh signal
stopCh chan int
)
// Execute executes the root command.
@ -31,6 +34,9 @@ func Execute() error {
return rootCmd.Execute()
}
func init() {
stopCh = make(chan int)
defaultConfigPath = "/etc/wiretrustee/config.json"
if runtime.GOOS == "windows" {
defaultConfigPath = os.Getenv("PROGRAMDATA") + "\\Wiretrustee\\" + "config.json"
@ -41,6 +47,7 @@ func init() {
rootCmd.AddCommand(addPeerCmd)
rootCmd.AddCommand(upCmd)
rootCmd.AddCommand(signalCmd)
rootCmd.AddCommand(mgmtCmd)
rootCmd.AddCommand(serviceCmd)
serviceCmd.AddCommand(runCmd, startCmd, stopCmd, restartCmd) // service control commands are subcommands of service
serviceCmd.AddCommand(installCmd, uninstallCmd) // service installer commands are subcommands of service
@ -53,7 +60,7 @@ func SetupCloseHandler() {
go func() {
for range c {
fmt.Println("\r- Ctrl+C pressed in Terminal")
stopUP <- 0
stopCh <- 0
}
}()
}

View File

@ -13,7 +13,7 @@ func (p *program) Start(s service.Service) error {
}
func (p *program) Stop(s service.Service) error {
stopUP <- 1
stopCh <- 1
return nil
}

View File

@ -12,7 +12,7 @@ import (
)
var (
port int
signalPort int
signalCmd = &cobra.Command{
Use: "signal",
@ -20,7 +20,7 @@ var (
Run: func(cmd *cobra.Command, args []string) {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", signalPort))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
@ -31,7 +31,7 @@ var (
var opts []grpc.ServerOption
grpcServer := grpc.NewServer(opts...)
sigProto.RegisterSignalExchangeServer(grpcServer, sig.NewServer())
log.Printf("started server: localhost:%v", port)
log.Printf("started server: localhost:%v", signalPort)
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
@ -43,5 +43,5 @@ var (
)
func init() {
signalCmd.PersistentFlags().IntVar(&port, "port", 10000, "Server port to listen on (e.g. 10000)")
signalCmd.PersistentFlags().IntVar(&signalPort, "port", 10000, "Server port to listen on (e.g. 10000)")
}

View File

@ -48,15 +48,8 @@ var (
//signalClient.WaitConnected()
SetupCloseHandler()
<-stopUP
<-stopCh
log.Println("Receive signal to stop running")
},
}
)
// Execution control channel for stopUP signal
var stopUP chan int
func init() {
stopUP = make(chan int)
}

4
management/Dockerfile Normal file
View File

@ -0,0 +1,4 @@
FROM gcr.io/distroless/base:debug
ENTRYPOINT [ "/go/bin/wiretrustee","management"]
CMD ["--log-level","DEBUG", "--datadir", "/data"]
COPY wiretrustee /go/bin/wiretrustee

View File

@ -0,0 +1,13 @@
package management_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"testing"
)
func TestManagement(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Management Service Suite")
}

View File

@ -0,0 +1,164 @@
package management_test
import (
"context"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
log "github.com/sirupsen/logrus"
mgmt "github.com/wiretrustee/wiretrustee/management"
mgmtProto "github.com/wiretrustee/wiretrustee/management/proto"
"github.com/wiretrustee/wiretrustee/util"
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
"io/ioutil"
"net"
"os"
"path/filepath"
"strings"
"time"
)
var _ = Describe("Client", func() {
var (
addr string
server *grpc.Server
tmpDir string
dataDir string
)
BeforeEach(func() {
var err error
dataDir, err = ioutil.TempDir("", "wiretrustee_mgmt_test_tmp_*")
Expect(err).NotTo(HaveOccurred())
err = util.CopyFileContents("testdata/store.json", filepath.Join(dataDir, "store.json"))
Expect(err).NotTo(HaveOccurred())
var listener net.Listener
server, listener = startServer(dataDir)
addr = listener.Addr().String()
})
AfterEach(func() {
server.Stop()
err := os.RemoveAll(tmpDir)
Expect(err).NotTo(HaveOccurred())
})
Describe("Service health", func() {
Context("when it has been started", func() {
It("should be ok", func() {
client := createRawClient(addr)
healthy, err := client.IsHealthy(context.TODO(), &mgmtProto.Empty{})
Expect(healthy).ToNot(BeNil())
Expect(err).To(BeNil())
})
})
})
Describe("Registration", func() {
Context("of a new peer without a valid setup key", func() {
It("should fail", func() {
key, _ := wgtypes.GenerateKey()
setupKey := "invalid_setup_key"
client := createRawClient(addr)
resp, err := client.RegisterPeer(context.TODO(), &mgmtProto.RegisterPeerRequest{
Key: key.PublicKey().String(),
SetupKey: setupKey,
})
Expect(err).To(HaveOccurred())
Expect(resp).To(BeNil())
})
})
})
Describe("Registration", func() {
Context("of a new peer with a valid setup key", func() {
It("should be successful", func() {
key, _ := wgtypes.GenerateKey()
setupKey := "A2C8E62B-38F5-4553-B31E-DD66C696CEBB" //present in the testdata/store.json file
client := createRawClient(addr)
resp, err := client.RegisterPeer(context.TODO(), &mgmtProto.RegisterPeerRequest{
Key: key.PublicKey().String(),
SetupKey: setupKey,
})
Expect(err).NotTo(HaveOccurred())
Expect(resp).ToNot(BeNil())
})
})
})
Describe("Registration", func() {
Context("of a new peer with a valid setup key", func() {
It("should be persisted to a file", func() {
key, _ := wgtypes.GenerateKey()
setupKey := "A2C8E62B-38F5-4553-B31E-DD66C696CEBB" //present in the testdata/store.json file
client := createRawClient(addr)
_, err := client.RegisterPeer(context.TODO(), &mgmtProto.RegisterPeerRequest{
Key: key.PublicKey().String(),
SetupKey: setupKey,
})
Expect(err).NotTo(HaveOccurred())
store, err := util.ReadJson(filepath.Join(dataDir, "store.json"), &mgmt.Store{})
Expect(err).NotTo(HaveOccurred())
Expect(store.(*mgmt.Store)).NotTo(BeNil())
user := store.(*mgmt.Store).Accounts["bf1c8084-ba50-4ce7-9439-34653001fc3b"]
Expect(user.Peers[key.PublicKey().String()]).NotTo(BeNil())
Expect(user.SetupKeys[strings.ToLower(setupKey)]).NotTo(BeNil())
})
})
})
})
func createRawClient(addr string) mgmtProto.ManagementServiceClient {
ctx := context.Background()
conn, err := grpc.DialContext(ctx, addr, grpc.WithInsecure(),
grpc.WithBlock(),
grpc.WithKeepaliveParams(keepalive.ClientParameters{
Time: 3 * time.Second,
Timeout: 2 * time.Second,
}))
if err != nil {
Fail("failed creating raw signal client")
}
return mgmtProto.NewManagementServiceClient(conn)
}
func startServer(dataDir string) (*grpc.Server, net.Listener) {
lis, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
s := grpc.NewServer()
server, err := mgmt.NewServer(dataDir)
if err != nil {
panic(err)
}
mgmtProto.RegisterManagementServiceServer(s, server)
go func() {
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}()
return s, lis
}

View File

@ -0,0 +1,283 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: management.proto
package proto
import (
context "context"
fmt "fmt"
proto "github.com/golang/protobuf/proto"
_ "github.com/golang/protobuf/protoc-gen-go/descriptor"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type RegisterPeerRequest struct {
// Wireguard public key
Key string `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"`
// Pre-authorized setup key
SetupKey string `protobuf:"bytes,2,opt,name=setupKey,proto3" json:"setupKey,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *RegisterPeerRequest) Reset() { *m = RegisterPeerRequest{} }
func (m *RegisterPeerRequest) String() string { return proto.CompactTextString(m) }
func (*RegisterPeerRequest) ProtoMessage() {}
func (*RegisterPeerRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_edc174f991dc0a25, []int{0}
}
func (m *RegisterPeerRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RegisterPeerRequest.Unmarshal(m, b)
}
func (m *RegisterPeerRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_RegisterPeerRequest.Marshal(b, m, deterministic)
}
func (m *RegisterPeerRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_RegisterPeerRequest.Merge(m, src)
}
func (m *RegisterPeerRequest) XXX_Size() int {
return xxx_messageInfo_RegisterPeerRequest.Size(m)
}
func (m *RegisterPeerRequest) XXX_DiscardUnknown() {
xxx_messageInfo_RegisterPeerRequest.DiscardUnknown(m)
}
var xxx_messageInfo_RegisterPeerRequest proto.InternalMessageInfo
func (m *RegisterPeerRequest) GetKey() string {
if m != nil {
return m.Key
}
return ""
}
func (m *RegisterPeerRequest) GetSetupKey() string {
if m != nil {
return m.SetupKey
}
return ""
}
type RegisterPeerResponse struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *RegisterPeerResponse) Reset() { *m = RegisterPeerResponse{} }
func (m *RegisterPeerResponse) String() string { return proto.CompactTextString(m) }
func (*RegisterPeerResponse) ProtoMessage() {}
func (*RegisterPeerResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_edc174f991dc0a25, []int{1}
}
func (m *RegisterPeerResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_RegisterPeerResponse.Unmarshal(m, b)
}
func (m *RegisterPeerResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_RegisterPeerResponse.Marshal(b, m, deterministic)
}
func (m *RegisterPeerResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_RegisterPeerResponse.Merge(m, src)
}
func (m *RegisterPeerResponse) XXX_Size() int {
return xxx_messageInfo_RegisterPeerResponse.Size(m)
}
func (m *RegisterPeerResponse) XXX_DiscardUnknown() {
xxx_messageInfo_RegisterPeerResponse.DiscardUnknown(m)
}
var xxx_messageInfo_RegisterPeerResponse proto.InternalMessageInfo
type Empty struct {
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int32 `json:"-"`
}
func (m *Empty) Reset() { *m = Empty{} }
func (m *Empty) String() string { return proto.CompactTextString(m) }
func (*Empty) ProtoMessage() {}
func (*Empty) Descriptor() ([]byte, []int) {
return fileDescriptor_edc174f991dc0a25, []int{2}
}
func (m *Empty) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_Empty.Unmarshal(m, b)
}
func (m *Empty) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_Empty.Marshal(b, m, deterministic)
}
func (m *Empty) XXX_Merge(src proto.Message) {
xxx_messageInfo_Empty.Merge(m, src)
}
func (m *Empty) XXX_Size() int {
return xxx_messageInfo_Empty.Size(m)
}
func (m *Empty) XXX_DiscardUnknown() {
xxx_messageInfo_Empty.DiscardUnknown(m)
}
var xxx_messageInfo_Empty proto.InternalMessageInfo
func init() {
proto.RegisterType((*RegisterPeerRequest)(nil), "management.RegisterPeerRequest")
proto.RegisterType((*RegisterPeerResponse)(nil), "management.RegisterPeerResponse")
proto.RegisterType((*Empty)(nil), "management.Empty")
}
func init() {
proto.RegisterFile("management.proto", fileDescriptor_edc174f991dc0a25)
}
var fileDescriptor_edc174f991dc0a25 = []byte{
// 224 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x4d, 0xcc, 0x4b,
0x4c, 0x4f, 0xcd, 0x4d, 0xcd, 0x2b, 0xd1, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x42, 0x88,
0x48, 0x29, 0xa4, 0xe7, 0xe7, 0xa7, 0xe7, 0xa4, 0xea, 0x83, 0x65, 0x92, 0x4a, 0xd3, 0xf4, 0x53,
0x52, 0x8b, 0x93, 0x8b, 0x32, 0x0b, 0x4a, 0xf2, 0x8b, 0x20, 0xaa, 0x95, 0x9c, 0xb9, 0x84, 0x83,
0x52, 0xd3, 0x33, 0x8b, 0x4b, 0x52, 0x8b, 0x02, 0x52, 0x53, 0x8b, 0x82, 0x52, 0x0b, 0x4b, 0x53,
0x8b, 0x4b, 0x84, 0x04, 0xb8, 0x98, 0xb3, 0x53, 0x2b, 0x25, 0x18, 0x15, 0x18, 0x35, 0x38, 0x83,
0x40, 0x4c, 0x21, 0x29, 0x2e, 0x8e, 0xe2, 0xd4, 0x92, 0xd2, 0x02, 0xef, 0xd4, 0x4a, 0x09, 0x26,
0xb0, 0x30, 0x9c, 0xaf, 0x24, 0xc6, 0x25, 0x82, 0x6a, 0x48, 0x71, 0x41, 0x7e, 0x5e, 0x71, 0xaa,
0x12, 0x3b, 0x17, 0xab, 0x6b, 0x6e, 0x41, 0x49, 0xa5, 0xd1, 0x5c, 0x46, 0x2e, 0x41, 0x5f, 0xb8,
0xb3, 0x82, 0x53, 0x8b, 0xca, 0x32, 0x93, 0x53, 0x85, 0x82, 0xb9, 0x78, 0x90, 0xb5, 0x09, 0xc9,
0xeb, 0x21, 0x79, 0x06, 0x8b, 0xab, 0xa4, 0x14, 0x70, 0x2b, 0x80, 0xda, 0xc8, 0x20, 0x64, 0xcc,
0xc5, 0x99, 0x59, 0xec, 0x91, 0x9a, 0x98, 0x53, 0x92, 0x51, 0x29, 0x24, 0x88, 0xac, 0x01, 0xec,
0x14, 0x29, 0x4c, 0x21, 0x25, 0x06, 0x27, 0xce, 0x28, 0x76, 0x3d, 0x6b, 0x48, 0x20, 0xb1, 0x81,
0x29, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x55, 0x7e, 0x24, 0xf8, 0x59, 0x01, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConnInterface
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion6
// ManagementServiceClient is the client API for ManagementService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type ManagementServiceClient interface {
RegisterPeer(ctx context.Context, in *RegisterPeerRequest, opts ...grpc.CallOption) (*RegisterPeerResponse, error)
// health check endpoint
IsHealthy(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error)
}
type managementServiceClient struct {
cc grpc.ClientConnInterface
}
func NewManagementServiceClient(cc grpc.ClientConnInterface) ManagementServiceClient {
return &managementServiceClient{cc}
}
func (c *managementServiceClient) RegisterPeer(ctx context.Context, in *RegisterPeerRequest, opts ...grpc.CallOption) (*RegisterPeerResponse, error) {
out := new(RegisterPeerResponse)
err := c.cc.Invoke(ctx, "/management.ManagementService/RegisterPeer", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *managementServiceClient) IsHealthy(ctx context.Context, in *Empty, opts ...grpc.CallOption) (*Empty, error) {
out := new(Empty)
err := c.cc.Invoke(ctx, "/management.ManagementService/isHealthy", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// ManagementServiceServer is the server API for ManagementService service.
type ManagementServiceServer interface {
RegisterPeer(context.Context, *RegisterPeerRequest) (*RegisterPeerResponse, error)
// health check endpoint
IsHealthy(context.Context, *Empty) (*Empty, error)
}
// UnimplementedManagementServiceServer can be embedded to have forward compatible implementations.
type UnimplementedManagementServiceServer struct {
}
func (*UnimplementedManagementServiceServer) RegisterPeer(ctx context.Context, req *RegisterPeerRequest) (*RegisterPeerResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method RegisterPeer not implemented")
}
func (*UnimplementedManagementServiceServer) IsHealthy(ctx context.Context, req *Empty) (*Empty, error) {
return nil, status.Errorf(codes.Unimplemented, "method IsHealthy not implemented")
}
func RegisterManagementServiceServer(s *grpc.Server, srv ManagementServiceServer) {
s.RegisterService(&_ManagementService_serviceDesc, srv)
}
func _ManagementService_RegisterPeer_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(RegisterPeerRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagementServiceServer).RegisterPeer(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/management.ManagementService/RegisterPeer",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagementServiceServer).RegisterPeer(ctx, req.(*RegisterPeerRequest))
}
return interceptor(ctx, in, info, handler)
}
func _ManagementService_IsHealthy_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(Empty)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(ManagementServiceServer).IsHealthy(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/management.ManagementService/IsHealthy",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(ManagementServiceServer).IsHealthy(ctx, req.(*Empty))
}
return interceptor(ctx, in, info, handler)
}
var _ManagementService_serviceDesc = grpc.ServiceDesc{
ServiceName: "management.ManagementService",
HandlerType: (*ManagementServiceServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "RegisterPeer",
Handler: _ManagementService_RegisterPeer_Handler,
},
{
MethodName: "isHealthy",
Handler: _ManagementService_IsHealthy_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "management.proto",
}

View File

@ -0,0 +1,31 @@
syntax = "proto3";
import "google/protobuf/descriptor.proto";
option go_package = ".;proto";
package management;
service ManagementService {
rpc RegisterPeer(RegisterPeerRequest) returns (RegisterPeerResponse) {}
// health check endpoint
rpc isHealthy(Empty) returns (Empty) {}
}
message RegisterPeerRequest {
// Wireguard public key
string key = 1;
// Pre-authorized setup key
string setupKey = 2;
}
message RegisterPeerResponse {
}
message Empty {
}

39
management/server.go Normal file
View File

@ -0,0 +1,39 @@
package management
import (
"context"
"github.com/wiretrustee/wiretrustee/management/proto"
"google.golang.org/grpc/status"
)
// Server an instance of a Management server
type Server struct {
Store *Store
}
// NewServer creates a new Management server
func NewServer(dataDir string) (*Server, error) {
store, err := NewStore(dataDir)
if err != nil {
return nil, err
}
return &Server{
Store: store,
}, nil
}
// RegisterPeer adds a peer to the Store. Returns 404 in case the provided setup key doesn't exist
func (s *Server) RegisterPeer(ctx context.Context, req *proto.RegisterPeerRequest) (*proto.RegisterPeerResponse, error) {
err := s.Store.AddPeer(req.SetupKey, req.Key)
if err != nil {
return &proto.RegisterPeerResponse{}, status.Errorf(404, "provided setup key doesn't exists")
}
return &proto.RegisterPeerResponse{}, nil
}
// IsHealthy indicates whether the service is healthy
func (s *Server) IsHealthy(ctx context.Context, req *proto.Empty) (*proto.Empty, error) {
return &proto.Empty{}, nil
}

120
management/store.go Normal file
View File

@ -0,0 +1,120 @@
package management
import (
"fmt"
"github.com/wiretrustee/wiretrustee/util"
"os"
"path/filepath"
"strings"
"sync"
)
// storeFileName Store file name. Stored in the datadir
const storeFileName = "store.json"
// Account represents a unique account of the system
type Account struct {
Id string
SetupKeys map[string]*SetupKey
Peers map[string]*Peer
}
// SetupKey represents a pre-authorized key used to register machines (peers)
// One key might have multiple machines
type SetupKey struct {
Key string
}
// Peer represents a machine connected to the network.
// The Peer is a Wireguard peer identified by a public key
type Peer struct {
// Wireguard public key
Key string
// A setup key this peer was registered with
SetupKey *SetupKey
}
// Store represents an account storage
type Store struct {
Accounts map[string]*Account
// mutex to synchronise Store read/write operations
mux sync.Mutex `json:"-"`
storeFile string `json:"-"`
}
// NewStore restores a store from the file located in the datadir
func NewStore(dataDir string) (*Store, error) {
return restore(filepath.Join(dataDir, storeFileName))
}
// restore restores the state of the store from the file.
// Creates a new empty store file if doesn't exist
func restore(file string) (*Store, error) {
if _, err := os.Stat(file); os.IsNotExist(err) {
// create a new Store if previously didn't exist (e.g. first run)
s := &Store{
Accounts: make(map[string]*Account),
mux: sync.Mutex{},
storeFile: file,
}
err = s.persist(file)
if err != nil {
return nil, err
}
return s, nil
}
read, err := util.ReadJson(file, &Store{})
if err != nil {
return nil, err
}
read.(*Store).storeFile = file
return read.(*Store), nil
}
// persist persists account data to a file
// It is recommended to call it with locking Store,mux
func (s *Store) persist(file string) error {
return util.WriteJson(file, s)
}
// AddPeer adds peer to the store and associates it with a Account and a SetupKey. Returns related Account
// Each Account has a list of pre-authorised SetupKey and if no Account has a given key nil will be returned, meaning the key is invalid
func (s *Store) AddPeer(setupKey string, peerKey string) error {
s.mux.Lock()
defer s.mux.Unlock()
for _, u := range s.Accounts {
for _, key := range u.SetupKeys {
if key.Key == strings.ToLower(setupKey) {
u.Peers[peerKey] = &Peer{Key: peerKey, SetupKey: key}
err := s.persist(s.storeFile)
if err != nil {
return err
}
return nil
}
}
}
return fmt.Errorf("invalid setup key")
}
// AddAccount adds new account to the store.
func (s *Store) AddAccount(account *Account) error {
s.mux.Lock()
defer s.mux.Unlock()
// todo will override, handle existing keys
s.Accounts[account.Id] = account
err := s.persist(s.storeFile)
if err != nil {
return err
}
return nil
}

20
management/testdata/store.json vendored Normal file
View File

@ -0,0 +1,20 @@
{
"Accounts": {
"bf1c8084-ba50-4ce7-9439-34653001fc3b": {
"Id": "bf1c8084-ba50-4ce7-9439-34653001fc3b",
"SetupKeys": {
"a2c8e62b-38f5-4553-b31e-dd66c696cebb": {
"Key": "a2c8e62b-38f5-4553-b31e-dd66c696cebb"
}
},
"Peers": {
"/znMkP3yvi0T/ho+RSMBohXZSPtucVYnb66BcuJ5oRU=": {
"Key": "/znMkP3yvi0T/ho+RSMBohXZSPtucVYnb66BcuJ5oRU=",
"SetupKey": {
"Key": "a2c8e62b-38f5-4553-b31e-dd66c696cebb"
}
}
}
}
}
}

79
util/file.go Normal file
View File

@ -0,0 +1,79 @@
package util
import (
"encoding/json"
"io"
"io/ioutil"
"os"
"path/filepath"
)
// WriteJson writes JSON config object to a file creating parent directories if required
// The output JSON is pretty-formatted
func WriteJson(file string, obj interface{}) error {
configDir := filepath.Dir(file)
err := os.MkdirAll(configDir, 0750)
if err != nil {
return err
}
// make it pretty
bs, err := json.MarshalIndent(obj, "", " ")
if err != nil {
return err
}
err = ioutil.WriteFile(file, bs, 0600)
if err != nil {
return err
}
return nil
}
// ReadJson reads JSON config file and maps to a provided interface
func ReadJson(file string, res interface{}) (interface{}, error) {
f, err := os.Open(file)
if err != nil {
return nil, err
}
defer f.Close()
bs, err := ioutil.ReadAll(f)
if err != nil {
return nil, err
}
err = json.Unmarshal(bs, &res)
if err != nil {
return nil, err
}
return res, nil
}
// CopyFileContents copies contents of the given src file to the dst file
func CopyFileContents(src, dst string) (err error) {
in, err := os.Open(src)
if err != nil {
return
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return
}
defer func() {
cErr := out.Close()
if err == nil {
err = cErr
}
}()
if _, err = io.Copy(out, in); err != nil {
return
}
err = out.Sync()
return
}

100
util/file_test.go Normal file
View File

@ -0,0 +1,100 @@
package util_test
import (
"crypto/md5"
"encoding/hex"
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"github.com/wiretrustee/wiretrustee/util"
"io"
"io/ioutil"
"os"
)
var _ = Describe("Client", func() {
var (
tmpDir string
)
type TestConfig struct {
SomeMap map[string]string
SomeArray []string
SomeField int
}
BeforeEach(func() {
var err error
tmpDir, err = ioutil.TempDir("", "wiretrustee_util_test_tmp_*")
Expect(err).NotTo(HaveOccurred())
})
AfterEach(func() {
err := os.RemoveAll(tmpDir)
Expect(err).NotTo(HaveOccurred())
})
Describe("Config", func() {
Context("in JSON format", func() {
It("should be written and read successfully", func() {
m := make(map[string]string)
m["key1"] = "value1"
m["key2"] = "value2"
arr := []string{"value1", "value2"}
written := &TestConfig{
SomeMap: m,
SomeArray: arr,
SomeField: 99,
}
err := util.WriteJson(tmpDir+"/testconfig.json", written)
Expect(err).NotTo(HaveOccurred())
read, err := util.ReadJson(tmpDir+"/testconfig.json", &TestConfig{})
Expect(err).NotTo(HaveOccurred())
Expect(read).NotTo(BeNil())
Expect(read.(*TestConfig).SomeMap["key1"]).To(BeEquivalentTo(written.SomeMap["key1"]))
Expect(read.(*TestConfig).SomeMap["key2"]).To(BeEquivalentTo(written.SomeMap["key2"]))
Expect(read.(*TestConfig).SomeArray).To(ContainElements(arr))
Expect(read.(*TestConfig).SomeField).To(BeEquivalentTo(written.SomeField))
})
})
})
Describe("Copying file contents", func() {
Context("from one file to another", func() {
It("should be successful", func() {
src := tmpDir + "/copytest_src"
dst := tmpDir + "/copytest_dst"
err := util.WriteJson(src, []string{"1", "2", "3"})
Expect(err).NotTo(HaveOccurred())
err = util.CopyFileContents(src, dst)
Expect(err).NotTo(HaveOccurred())
hashSrc := md5.New()
hashDst := md5.New()
srcFile, err := os.Open(src)
Expect(err).NotTo(HaveOccurred())
dstFile, err := os.Open(dst)
Expect(err).NotTo(HaveOccurred())
_, err = io.Copy(hashSrc, srcFile)
Expect(err).NotTo(HaveOccurred())
_, err = io.Copy(hashDst, dstFile)
Expect(err).NotTo(HaveOccurred())
Expect(hex.EncodeToString(hashSrc.Sum(nil)[:16])).To(BeEquivalentTo(hex.EncodeToString(hashDst.Sum(nil)[:16])))
})
})
})
})

13
util/util_suite_test.go Normal file
View File

@ -0,0 +1,13 @@
package util_test
import (
. "github.com/onsi/ginkgo"
. "github.com/onsi/gomega"
"testing"
)
func TestManagement(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "Util Service Suite")
}