Менеджер соединений
ConnectionManager обрабатывает фактическую передачу данных между входящими и исходящими соединениями. Он устанавливает соединение с удалённым узлом, настраивает двунаправленное копирование и управляет жизненным циклом соединения.
Исходный код: route/conn.go
Структура
type ConnectionManager struct {
logger logger.ContextLogger
access sync.Mutex
connections list.List[io.Closer] // tracked active connections
}Поток TCP-соединения (NewConnection)
func (m *ConnectionManager) NewConnection(ctx, this N.Dialer, conn net.Conn, metadata, onClose) {
// 1. Dial remote
if len(metadata.DestinationAddresses) > 0 || metadata.Destination.IsIP() {
remoteConn, err = dialer.DialSerialNetwork(ctx, this, "tcp",
metadata.Destination, metadata.DestinationAddresses,
metadata.NetworkStrategy, metadata.NetworkType,
metadata.FallbackNetworkType, metadata.FallbackDelay)
} else {
remoteConn, err = this.DialContext(ctx, "tcp", metadata.Destination)
}
// 2. Report handshake success (for protocols that need it)
N.ReportConnHandshakeSuccess(conn, remoteConn)
// 3. Apply TLS fragmentation if requested
if metadata.TLSFragment || metadata.TLSRecordFragment {
remoteConn = tf.NewConn(remoteConn, ctx, ...)
}
// 4. Kick handshake (send early data)
m.kickWriteHandshake(ctx, conn, remoteConn, false, &done, onClose)
m.kickWriteHandshake(ctx, remoteConn, conn, true, &done, onClose)
// 5. Bidirectional copy
go m.connectionCopy(ctx, conn, remoteConn, false, &done, onClose)
go m.connectionCopy(ctx, remoteConn, conn, true, &done, onClose)
}Инициирование рукопожатия
Некоторые протоколы (например, прокси-протоколы с отложенным рукопожатием) требуют записи первых данных до полного установления соединения. kickWriteHandshake обрабатывает это:
func (m *ConnectionManager) kickWriteHandshake(ctx, source, destination, direction, done, onClose) bool {
if !N.NeedHandshakeForWrite(destination) {
return false // no handshake needed
}
// Try to read cached data from source
if cachedReader, ok := sourceReader.(N.CachedReader); ok {
cachedBuffer = cachedReader.ReadCached()
}
if cachedBuffer != nil {
// Write cached data to trigger handshake
_, err = destinationWriter.Write(cachedBuffer.Bytes())
} else {
// Write empty to trigger handshake
destination.SetWriteDeadline(time.Now().Add(C.ReadPayloadTimeout))
_, err = destinationWriter.Write(nil)
}
// ...
}Это позволяет отправлять ранние данные (например, TLS ClientHello) вместе с рукопожатием прокси-протокола, уменьшая количество циклов обмена.
Двунаправленное копирование
func (m *ConnectionManager) connectionCopy(ctx, source, destination, direction, done, onClose) {
_, err := bufio.CopyWithIncreateBuffer(destination, source,
bufio.DefaultIncreaseBufferAfter, bufio.DefaultBatchSize)
if err != nil {
common.Close(source, destination)
} else if duplexDst, isDuplex := destination.(N.WriteCloser); isDuplex {
duplexDst.CloseWrite() // half-close for graceful shutdown
} else {
destination.Close()
}
// done is atomic — first goroutine to finish sets it
if done.Swap(true) {
// Second goroutine: call onClose and close both
if onClose != nil { onClose(err) }
common.Close(source, destination)
}
}Ключевые особенности поведения:
- Используется
bufio.CopyWithIncreateBufferдля адаптивного размера буфера - Поддерживается полузакрытие (FIN) через
N.WriteCloser atomic.Boolгарантирует, чтоonCloseвызывается ровно один раз- Отдельное логирование направлений загрузки и выгрузки
Поток UDP-соединения (NewPacketConnection)
func (m *ConnectionManager) NewPacketConnection(ctx, this, conn, metadata, onClose) {
if metadata.UDPConnect {
// Connected UDP: dial to specific destination
remoteConn, err = this.DialContext(ctx, "udp", metadata.Destination)
remotePacketConn = bufio.NewUnbindPacketConn(remoteConn)
} else {
// Unconnected UDP: listen for packets
remotePacketConn, destinationAddress, err = this.ListenPacket(ctx, metadata.Destination)
}
// NAT handling: translate addresses if resolved IP differs from domain
if destinationAddress.IsValid() {
remotePacketConn = bufio.NewNATPacketConn(remotePacketConn, destination, originDestination)
}
// UDP timeout (protocol-aware)
if udpTimeout > 0 {
ctx, conn = canceler.NewPacketConn(ctx, conn, udpTimeout)
}
// Bidirectional packet copy
go m.packetConnectionCopy(ctx, conn, destination, false, &done, onClose)
go m.packetConnectionCopy(ctx, destination, conn, true, &done, onClose)
}Тайм-аут UDP
Тайм-аут UDP определяется в порядке приоритета:
metadata.UDPTimeout(установлен действием правила)C.ProtocolTimeouts[protocol](специфичный для протокола, например DNS = 10 сек)- Тайм-аут по умолчанию
NAT PacketConn
Когда DNS разрешает домен в IP-адрес, удалённый сокет использует IP. Но клиент ожидает ответы от исходного домена. bufio.NewNATPacketConn транслирует адреса:
Client → conn.ReadPacket() → {dest: example.com:443}
↓ NAT translate
Remote → remoteConn.WritePacket() → {dest: 1.2.3.4:443}
↓ response
Remote → remoteConn.ReadPacket() → {from: 1.2.3.4:443}
↓ NAT translate back
Client → conn.WritePacket() → {from: example.com:443}Отслеживание соединений
ConnectionManager отслеживает все активные соединения для мониторинга и очистки:
func (m *ConnectionManager) TrackConn(conn net.Conn) net.Conn {
element := m.connections.PushBack(conn)
return &trackedConn{Conn: conn, manager: m, element: element}
}
// trackedConn removes itself from the list on Close()
func (c *trackedConn) Close() error {
c.manager.connections.Remove(c.element)
return c.Conn.Close()
}CloseAll() вызывается при завершении работы для закрытия всех активных соединений.
Последовательное подключение
Когда доступно несколько адресов назначения (из DNS-разрешения), dialer.DialSerialNetwork пробует их по порядку:
// Tries each address, respecting network strategy (prefer cellular, etc.)
func DialSerialNetwork(ctx, dialer, network, destination, addresses,
strategy, networkType, fallbackType, fallbackDelay) (net.Conn, error)Это интегрируется с системой сетевых стратегий для устройств с несколькими интерфейсами (мобильные).