مجموعات الصادرات: Selector وURLTest
تدير مجموعات الصادرات مجموعات من اتصالات الصادرات. يسمح Selector بالاختيار اليدوي، بينما يختار URLTest تلقائياً الصادر ذو أقل زمن استجابة بناءً على فحوصات صحية دورية.
المصدر: protocol/group/selector.go، protocol/group/urltest.go، common/interrupt/، common/urltest/
Selector
البنية
type Selector struct {
outbound.Adapter
ctx context.Context
outbound adapter.OutboundManager
connection adapter.ConnectionManager
logger logger.ContextLogger
tags []string
defaultTag string
outbounds map[string]adapter.Outbound
selected common.TypedValue[adapter.Outbound]
interruptGroup *interrupt.Group
interruptExternalConnections bool
}ينفذ Selector واجهات متعددة:
var (
_ adapter.OutboundGroup = (*Selector)(nil)
_ adapter.ConnectionHandlerEx = (*Selector)(nil)
_ adapter.PacketConnectionHandlerEx = (*Selector)(nil)
)التهيئة
func NewSelector(ctx, router, logger, tag, options) (adapter.Outbound, error) {
outbound := &Selector{
tags: options.Outbounds,
defaultTag: options.Default,
outbounds: make(map[string]adapter.Outbound),
interruptGroup: interrupt.NewGroup(),
interruptExternalConnections: options.InterruptExistConnections,
}
if len(outbound.tags) == 0 {
return nil, E.New("missing tags")
}
return outbound, nil
}البدء والاختيار
عند البدء، يتم حل الصادرات من الوسوم، ويتم تحديد الاختيار الأولي:
func (s *Selector) Start() error {
// 1. حل وسوم الصادرات إلى نسخ صادرات فعلية
for _, tag := range s.tags {
detour, _ := s.outbound.Outbound(tag)
s.outbounds[tag] = detour
}
// 2. محاولة استعادة الاختيار المخزن مؤقتاً
cacheFile := service.FromContext[adapter.CacheFile](s.ctx)
if cacheFile != nil {
selected := cacheFile.LoadSelected(s.Tag())
if detour, loaded := s.outbounds[selected]; loaded {
s.selected.Store(detour)
return nil
}
}
// 3. الرجوع إلى الوسم الافتراضي
if s.defaultTag != "" {
s.selected.Store(s.outbounds[s.defaultTag])
return nil
}
// 4. الرجوع إلى أول صادر
s.selected.Store(s.outbounds[s.tags[0]])
return nil
}الاختيار اليدوي
func (s *Selector) SelectOutbound(tag string) bool {
detour, loaded := s.outbounds[tag]
if !loaded {
return false
}
if s.selected.Swap(detour) == detour {
return true // مختار بالفعل، لا تغيير
}
// حفظ الاختيار في الذاكرة المؤقتة
cacheFile := service.FromContext[adapter.CacheFile](s.ctx)
if cacheFile != nil {
cacheFile.StoreSelected(s.Tag(), tag)
}
// مقاطعة الاتصالات الحالية
s.interruptGroup.Interrupt(s.interruptExternalConnections)
return true
}معالجة الاتصال
يفوض Selector إلى الصادر المختار:
func (s *Selector) DialContext(ctx, network, destination) (net.Conn, error) {
conn, _ := s.selected.Load().DialContext(ctx, network, destination)
return s.interruptGroup.NewConn(conn, interrupt.IsExternalConnectionFromContext(ctx)), nil
}للتوجيه القائم على المعالج (تجنب التغليف المزدوج):
func (s *Selector) NewConnectionEx(ctx, conn, metadata, onClose) {
ctx = interrupt.ContextWithIsExternalConnection(ctx)
selected := s.selected.Load()
if outboundHandler, isHandler := selected.(adapter.ConnectionHandlerEx); isHandler {
outboundHandler.NewConnectionEx(ctx, conn, metadata, onClose)
} else {
s.connection.NewConnection(ctx, selected, conn, metadata, onClose)
}
}الشبكة الديناميكية
تتغير الشبكة المعلنة لـ Selector بناءً على الصادر المختار:
func (s *Selector) Network() []string {
selected := s.selected.Load()
if selected == nil {
return []string{N.NetworkTCP, N.NetworkUDP}
}
return selected.Network()
}URLTest
البنية
type URLTest struct {
outbound.Adapter
ctx context.Context
router adapter.Router
outbound adapter.OutboundManager
connection adapter.ConnectionManager
logger log.ContextLogger
tags []string
link string
interval time.Duration
tolerance uint16
idleTimeout time.Duration
group *URLTestGroup
interruptExternalConnections bool
}URLTestGroup
يوجد المنطق الأساسي في URLTestGroup:
type URLTestGroup struct {
outbounds []adapter.Outbound
link string // الرابط للاختبار
interval time.Duration // فترة الفحص
tolerance uint16 // تحمل التأخير (مللي ثانية)
idleTimeout time.Duration // التوقف عن الفحص بعد الخمول
history adapter.URLTestHistoryStorage
checking atomic.Bool
selectedOutboundTCP adapter.Outbound
selectedOutboundUDP adapter.Outbound
interruptGroup *interrupt.Group
interruptExternalConnections bool
ticker *time.Ticker
lastActive common.TypedValue[time.Time]
}القيم الافتراضية
if interval == 0 {
interval = C.DefaultURLTestInterval
}
if tolerance == 0 {
tolerance = 50 // 50 مللي ثانية
}
if idleTimeout == 0 {
idleTimeout = C.DefaultURLTestIdleTimeout
}
if interval > idleTimeout {
return nil, E.New("interval must be less or equal than idle_timeout")
}اختيار منفصل لـ TCP/UDP
يحافظ URLTest على اختيارات منفصلة لـ TCP وUDP:
selectedOutboundTCP adapter.Outbound
selectedOutboundUDP adapter.Outboundهذا يسمح باختيار صادرات مختلفة لـ TCP وUDP بناءً على نتائج زمن الاستجابة الخاصة بكل منهما.
خوارزمية الاختيار
func (g *URLTestGroup) Select(network string) (adapter.Outbound, bool) {
var minDelay uint16
var minOutbound adapter.Outbound
// البدء بالصادر المختار حالياً
if g.selectedOutboundTCP != nil {
if history := g.history.LoadURLTestHistory(RealTag(g.selectedOutboundTCP)); history != nil {
minOutbound = g.selectedOutboundTCP
minDelay = history.Delay
}
}
// إيجاد صادر أفضل (يجب أن يتفوق على الحالي بالتحمل)
for _, detour := range g.outbounds {
if !common.Contains(detour.Network(), network) {
continue
}
history := g.history.LoadURLTestHistory(RealTag(detour))
if history == nil {
continue
}
if minDelay == 0 || minDelay > history.Delay+g.tolerance {
minDelay = history.Delay
minOutbound = detour
}
}
return minOutbound, minOutbound != nil
}يمنع التحمل التبديل المتكرر: يجب أن يكون الصادر الجديد أسرع بما لا يقل عن tolerance مللي ثانية من الحالي.
الفحص القائم على الخمول
تعمل فحوصات الصحة فقط عندما تكون المجموعة قيد الاستخدام النشط. تبدأ طريقة Touch() المؤقت عند أول استخدام:
func (g *URLTestGroup) Touch() {
g.access.Lock()
defer g.access.Unlock()
if g.ticker != nil {
g.lastActive.Store(time.Now())
return
}
g.ticker = time.NewTicker(g.interval)
go g.loopCheck()
}تتوقف حلقة الفحص عند الوصول إلى مهلة الخمول:
func (g *URLTestGroup) loopCheck() {
for {
select {
case <-g.close:
return
case <-g.ticker.C:
}
if time.Since(g.lastActive.Load()) > g.idleTimeout {
g.ticker.Stop()
g.ticker = nil
return
}
g.CheckOutbounds(false)
}
}اختبار URL
يتم تشغيل الاختبارات بشكل متزامن بحد أقصى 10 اختبارات متزامنة:
func (g *URLTestGroup) urlTest(ctx, force) (map[string]uint16, error) {
if g.checking.Swap(true) {
return result, nil // الفحص جارٍ بالفعل
}
defer g.checking.Store(false)
b, _ := batch.New(ctx, batch.WithConcurrencyNum[any](10))
for _, detour := range g.outbounds {
realTag := RealTag(detour)
if checked[realTag] { continue }
// تخطي إذا تم اختباره مؤخراً
history := g.history.LoadURLTestHistory(realTag)
if !force && history != nil && time.Since(history.Time) < g.interval {
continue
}
b.Go(realTag, func() (any, error) {
testCtx, cancel := context.WithTimeout(g.ctx, C.TCPTimeout)
defer cancel()
t, err := urltest.URLTest(testCtx, g.link, p)
if err != nil {
g.history.DeleteURLTestHistory(realTag)
} else {
g.history.StoreURLTestHistory(realTag, &adapter.URLTestHistory{
Time: time.Now(), Delay: t,
})
}
return nil, nil
})
}
b.Wait()
g.performUpdateCheck()
return result, nil
}RealTag
للمجموعات المتداخلة، يحل RealTag عبر طبقات المجموعة:
func RealTag(detour adapter.Outbound) string {
if group, isGroup := detour.(adapter.OutboundGroup); isGroup {
return group.Now()
}
return detour.Tag()
}فحص التحديث والمقاطعة
بعد الاختبار، يتم تحديث الصادر المختار ومقاطعة الاتصالات الحالية إذا تغير الاختيار:
func (g *URLTestGroup) performUpdateCheck() {
var updated bool
if outbound, exists := g.Select(N.NetworkTCP); outbound != nil && exists && outbound != g.selectedOutboundTCP {
updated = true
g.selectedOutboundTCP = outbound
}
if outbound, exists := g.Select(N.NetworkUDP); outbound != nil && exists && outbound != g.selectedOutboundUDP {
updated = true
g.selectedOutboundUDP = outbound
}
if updated {
g.interruptGroup.Interrupt(g.interruptExternalConnections)
}
}مجموعة المقاطعة
تدير interrupt.Group دورة حياة الاتصال لتغييرات المجموعة:
- عند تغيير اختيار المجموعة، يتم استدعاء
Interrupt() - يتم إغلاق جميع الاتصالات المغلفة بـ
interruptGroup.NewConn() - يتحكم
interruptExternalConnectionsفيما إذا كان يتم مقاطعة الاتصالات من مصادر خارجية (لم تبدأها هذه العملية) أيضاً
تتبع الاتصالات الخارجية:
func (s *URLTest) NewConnectionEx(ctx, conn, metadata, onClose) {
ctx = interrupt.ContextWithIsExternalConnection(ctx)
s.connection.NewConnection(ctx, s, conn, metadata, onClose)
}معالجة الأخطاء
عند فشل صادر مختار، يتم حذف سجله لفرض إعادة التقييم:
conn, err := outbound.DialContext(ctx, network, destination)
if err == nil {
return s.group.interruptGroup.NewConn(conn, ...), nil
}
s.logger.ErrorContext(ctx, err)
s.group.history.DeleteURLTestHistory(outbound.Tag())
return nil, errأمثلة على التكوين
Selector
{
"type": "selector",
"tag": "proxy",
"outbounds": ["server-a", "server-b", "server-c"],
"default": "server-a",
"interrupt_exist_connections": true
}URLTest
{
"type": "urltest",
"tag": "auto",
"outbounds": ["server-a", "server-b", "server-c"],
"url": "https://www.gstatic.com/generate_204",
"interval": "3m",
"tolerance": 50,
"idle_timeout": "30m",
"interrupt_exist_connections": true
}