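// 日志滚存+日志打印控制台+选举+多任务(一个常驻任务、一个leader执行任务)
// (Log rotation + console logging + leader election + multiple tasks: one always-on task and one leader-only task.)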

package main

import (
	"context"
	"fmt"
	"io"
	"os"
	"os/signal"
	"syscall"
	"time"

	"github.com/labstack/gommon/log"
	"gopkg.in/natefinch/lumberjack.v2"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/tools/leaderelection"
	"k8s.io/client-go/tools/leaderelection/resourcelock"
)

// Identity of the Lease object used as the leader-election lock.
const (
	leaderLockName      = "alertmanager-webhook-leader-lock"
	leaderLockNamespace = "monitoring"
)

// Election timing, passed to leaderelection.LeaderElectionConfig by the
// election loop (reElectDelay is used by the loop itself).
const (
	// leaseDuration is the maximum validity of the leader identity, i.e. the
	// term of the Lease lock.
	leaseDuration = 15 * time.Second

	// renewDeadline is the leader's renewal timeout: the window within which
	// it must manage to renew its identity.
	renewDeadline = 10 * time.Second

	// retryPeriod is the retry interval (the most flexible knob; it controls how
	// quickly the cluster converges on a leader). It applies to every non-leader
	// instance and to a leader whose renewal is failing:
	//   1. Followers query the Lease lock in Kubernetes every retryPeriod and try
	//      to acquire it.
	//   2. The leader, within the renewDeadline window, sends a Lease renewal
	//      request every retryPeriod until one succeeds.
	retryPeriod = 2 * time.Second

	// reElectDelay is the pause after losing leadership before campaigning again.
	reElectDelay = 1 * time.Second
)

// logFilePath is the rotating log file configured in initLogger.
const logFilePath = "/file/go/src/test/log/leader-election.log"

// k8sClient is the process-wide Kubernetes client; main assigns it once,
// before the Lease lock is built.
var k8sClient kubernetes.Interface

// localIdentity is this instance's election identity ("<hostname>-<pid>");
// main assigns it once, before the Lease lock is built.
var localIdentity string

// main wires the program together: logging, the Kubernetes client, signal
// handling, the always-on task and finally the leader-election loop, which
// blocks until the root context is cancelled.
func main() {
	// 1. Logging (stdout + rotating file).
	initLogger()
	log.Info("✅ 程序启动,初始化LeaderElection组件")

	// 2. Kubernetes client. $KUBECONFIG overrides the developer default; when
	// neither is available, initK8sClient falls back to in-cluster config.
	var err error
	k8sClient, err = initK8sClient(resolveKubeconfig())
	if err != nil {
		// Fatalf logs and terminates the process, so no explicit os.Exit follows.
		log.Fatalf("❌ K8s客户端初始化失败:%v", err)
	}
	log.Info("✅ K8s客户端初始化成功")

	// 3. Root context, cancelled by the signal handler.
	rootCtx, rootCancel := context.WithCancel(context.Background())
	defer rootCancel()
	registerSignalHandler(rootCancel)

	// 4. Election identity, unique per process: "<hostname>-<pid>".
	localIdentity = fmt.Sprintf("%s-%d", getHostName(), os.Getpid())
	log.Info("✅ 实例唯一标识:", localIdentity)

	// 5. Always-on task; started before the election, so it runs on every
	// instance regardless of leadership.
	go defaultLogic()
	log.Info("✅ 常驻逻辑defaultLogic已启动")

	// 6. Leader election; the loop only returns once rootCtx is cancelled.
	log.Info("✅ 启动Leader选举循环,等待竞选...")
	lock := newLeaseLock()
	startLeaderElectionLoop(rootCtx, lock)
	log.Info("✅ Leader选举循环已退出,程序退出")
}

// defaultKubeconfigPath is the developer-machine kubeconfig, used only when
// $KUBECONFIG is unset and the file actually exists.
const defaultKubeconfigPath = "/home/lyx/.kube/config"

// resolveKubeconfig returns the kubeconfig path to hand to initK8sClient:
// $KUBECONFIG if set, else defaultKubeconfigPath if it exists, else "" (which
// makes initK8sClient use the in-cluster config).
func resolveKubeconfig() string {
	if p := os.Getenv("KUBECONFIG"); p != "" {
		return p
	}
	if _, err := os.Stat(defaultKubeconfigPath); err == nil {
		return defaultKubeconfigPath
	}
	return ""
}

// ========== Core framework functions ==========

// initLogger points the package-level gommon logger at both stdout and a
// lumberjack-rotated file at logFilePath, at INFO level with the
// "[lyx-leader]" prefix.
//
// Rotation policy (lumberjack units): rotate at 200 MB, keep at most 4 old
// files, drop files older than 7 days, no compression, local-time timestamps
// in backup file names.
func initLogger() {
	rotating := &lumberjack.Logger{
		Filename:   logFilePath,
		MaxSize:    200,
		MaxBackups: 4,
		MaxAge:     7,
		Compress:   false,
		LocalTime:  true,
	}
	log.SetOutput(io.MultiWriter(os.Stdout, rotating))
	log.SetLevel(log.INFO)
	log.SetPrefix("[lyx-leader]")
}

// initK8sClient builds a Kubernetes clientset that identifies itself with
// the "lyx-leader" user agent.
//
// Config source, in order of precedence:
//  1. the kubeconfig argument, if non-empty;
//  2. the KUBECONFIG environment variable;
//  3. the in-cluster config (rest.InClusterConfig).
//
// Every returned error is wrapped with the step that failed.
func initK8sClient(kubeconfig string) (kubernetes.Interface, error) {
	config, err := loadRestConfig(kubeconfig)
	if err != nil {
		return nil, err
	}
	client, err := kubernetes.NewForConfig(rest.AddUserAgent(config, "lyx-leader"))
	if err != nil {
		return nil, fmt.Errorf("创建KubeClient失败: %w", err)
	}
	return client, nil
}

// loadRestConfig resolves the REST config for initK8sClient from an explicit
// kubeconfig path, $KUBECONFIG, or the in-cluster environment.
func loadRestConfig(kubeconfig string) (*rest.Config, error) {
	if kubeconfig == "" {
		kubeconfig = os.Getenv("KUBECONFIG")
	}

	if kubeconfig == "" {
		log.Infof("Using in-cluster kube client config")
		config, err := rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("加载in-cluster配置失败: %w", err)
		}
		return config, nil
	}

	log.Infof("Loading kube client config from path %q", kubeconfig)
	config, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return nil, fmt.Errorf("加载kubeconfig %q 失败: %w", kubeconfig, err)
	}
	return config, nil
}

// registerSignalHandler installs a handler for SIGINT, SIGTERM and SIGQUIT.
//
// The first signal cancels the root context via cancelFunc and nothing more.
// The election loop then stops (releasing the Lease, since the loop sets
// ReleaseOnCancel), and main returns normally with its defers run. Exiting
// after a fixed short sleep, as before, could kill the process before the
// Lease release reached the API server.
//
// The handler forces exit with status 1 only as a safety net: on a second
// signal, or when shutdown takes longer than the grace period.
func registerSignalHandler(cancelFunc context.CancelFunc) {
	// Upper bound on a graceful shutdown before the process is forced down.
	const shutdownGracePeriod = 10 * time.Second

	sigCh := make(chan os.Signal, 2)
	signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT)
	go func() {
		<-sigCh
		log.Warn("⚠️ 收到退出信号(Ctrl+C/Kill),开始优雅关停...")
		cancelFunc()

		// Normally main returns, and the process exits, before either case fires.
		select {
		case <-sigCh:
			log.Warn("⚠️ 再次收到退出信号,强制退出进程")
		case <-time.After(shutdownGracePeriod):
			log.Warn("⚠️ 优雅关停超时,强制退出进程")
		}
		os.Exit(1)
	}()
}

// startLeaderElectionLoop runs leader election on lock repeatedly until
// rootCtx is cancelled; it only returns at that point.
//
// Each leaderelection.RunOrDie call blocks while this instance is campaigning
// or leading. When it returns without rootCtx being done, leadership was lost.
// The loop then waits reElectDelay and campaigns again, so the process can
// regain leadership without restarting.
func startLeaderElectionLoop(rootCtx context.Context, lock *resourcelock.LeaseLock) {
	// The configuration is identical for every round, so build it once.
	// ReleaseOnCancel hands the Lease back on shutdown so another instance can
	// take over without waiting for it to expire.
	cfg := leaderelection.LeaderElectionConfig{
		Lock:            lock,
		ReleaseOnCancel: true,
		LeaseDuration:   leaseDuration,
		RenewDeadline:   renewDeadline,
		RetryPeriod:     retryPeriod,
		Callbacks: leaderelection.LeaderCallbacks{
			OnStartedLeading: onStartedLeading,
			OnStoppedLeading: onStoppedLeading,
		},
	}

	for {
		leaderelection.RunOrDie(rootCtx, cfg)

		// RunOrDie also returns on shutdown; don't report that as a lost election.
		if rootCtx.Err() != nil {
			return
		}
		// %v already renders a Duration with its unit (e.g. "1s").
		log.Infof("⚠️ 丢失Leader身份,重新竞选(间隔%v)", reElectDelay)

		select {
		case <-rootCtx.Done():
			return
		case <-time.After(reElectDelay):
		}
	}
}

// newLeaseLock builds the Lease lock used for leader election:
//   - the Lease is leaderLockNamespace/leaderLockName;
//   - it is accessed through k8sClient's CoordinationV1 client;
//   - localIdentity is the holder identity.
//
// k8sClient and localIdentity must already be set when this is called.
func newLeaseLock() *resourcelock.LeaseLock {
	meta := metav1.ObjectMeta{
		Namespace: leaderLockNamespace,
		Name:      leaderLockName,
	}
	cfg := resourcelock.ResourceLockConfig{Identity: localIdentity}
	return &resourcelock.LeaseLock{
		LeaseMeta:  meta,
		Client:     k8sClient.CoordinationV1(),
		LockConfig: cfg,
	}
}

// defaultLogic is the always-on task. main starts it before the election, so
// it runs on every instance regardless of leadership. It logs once every
// 10 seconds.
//
// It takes no stop signal: ticker.C is never closed, so the loop lasts for
// the lifetime of the process.
func defaultLogic() {
	log.Info("通用常驻逻辑defaultLogic启动")
	ticker := time.NewTicker(10 * time.Second)
	defer ticker.Stop()

	// Ranging over the channel replaces the former single-case select
	// (staticcheck S1000); behavior is unchanged.
	for range ticker.C {
		log.Info("defaultLogic - 执行通用任务")
	}
}

// onStartedLeading is the OnStartedLeading callback. It runs Leaderlogic every
// 5 seconds until ctx, the leader-term context supplied by the elector, is
// cancelled. It then stops its ticker and returns.
func onStartedLeading(ctx context.Context) {
	log.Info("成功当选Leader,启动专属任务...")
	ticker := time.NewTicker(5 * time.Second)
	defer func() {
		ticker.Stop()
		log.Warn("⚠️ Leader专属任务已停止")
	}()
	for {
		select {
		case <-ctx.Done():
			log.Info("✅ Leader任务上下文已取消,正常退出")
			return
		case <-ticker.C:
			// When a tick and the cancellation are ready together, select picks
			// one at random. Re-check so leader-only work never starts after
			// the leader term has ended.
			if ctx.Err() != nil {
				log.Info("✅ Leader任务上下文已取消,正常退出")
				return
			}
			Leaderlogic(ctx)
		}
	}
}
// Leaderlogic is the leader-only task, invoked by onStartedLeading on every
// tick. It is currently a placeholder that only logs.
//
// ctx is the leader-term context passed in by onStartedLeading. Longer-running
// work added here should watch it and stop early once it is cancelled.
func Leaderlogic(ctx context.Context) {
	const msg = "🔍 Leader专属逻辑 - 执行检查任务"
	log.Info(msg)
}

// onStoppedLeading is the OnStoppedLeading callback, the place for cleanup
// after leadership ends. For now it only logs.
//
// NOTE(review): client-go appears to invoke this whenever a RunOrDie round
// ends, possibly including shutdown of an instance that never became leader.
// Confirm whether the "lost leadership" wording should be made conditional.
func onStoppedLeading() {
	msg := "❌ 已丢失Leader身份,释放Leader资源"
	log.Warn(msg)
}

// ========== Utility functions ==========

// getHostName returns the machine's hostname. It falls back to "unknown-host"
// when the hostname cannot be read or is empty, so the election identity built
// from it is never blank.
func getHostName() string {
	hostname, err := os.Hostname()
	switch {
	case err != nil:
		// Warnf with an explicit separator; the Sprint-style Warn glued the
		// error text directly onto the message.
		log.Warnf("获取主机名失败,使用默认标识: %v", err)
		return "unknown-host"
	case hostname == "":
		log.Warn("获取主机名为空,使用默认标识")
		return "unknown-host"
	}
	return hostname
}