lludp replace BeginReceiveFrom/AsyncEndReceive

UbitUmarov [2024-04-27 13:30:02]
lludp replace BeginReceiveFrom/AsyncEndReceive
Filename
OpenSim/Region/ClientStack/Linden/UDP/OpenSimUDPBase.cs
diff --git a/OpenSim/Region/ClientStack/Linden/UDP/OpenSimUDPBase.cs b/OpenSim/Region/ClientStack/Linden/UDP/OpenSimUDPBase.cs
index 74593d9..ea754ef 100644
--- a/OpenSim/Region/ClientStack/Linden/UDP/OpenSimUDPBase.cs
+++ b/OpenSim/Region/ClientStack/Linden/UDP/OpenSimUDPBase.cs
@@ -32,6 +32,8 @@ using log4net;
 using OpenSim.Framework;
 using OpenMetaverse;
 using OpenMetaverse.Packets;
+using System.Threading;
+using System.Threading.Tasks;

 namespace OpenSim.Region.ClientStack.LindenUDP
 {
@@ -72,6 +74,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP

         /// <summary>Returns true if the server is currently listening for inbound packets, otherwise false</summary>
         public bool IsRunningInbound { get; private set; }
+        public CancellationTokenSource InboundCancellationSource = new();
+

         /// <summary>Returns true if the server is currently sending outbound packets, otherwise false</summary>
         /// <remarks>If IsRunningOut = false, then any request to send a packet is simply dropped.</remarks>
@@ -240,10 +244,8 @@ namespace OpenSim.Region.ClientStack.LindenUDP

                 IsRunningInbound = true;

-                // kick off an async receive.  The Start() method will return, the
-                // actual receives will occur asynchronously and will be caught in
-                // AsyncEndRecieve().
-                AsyncBeginReceive();
+                // kick start the receiver tasks dance.
+                Task.Run(AsyncBeginReceive);
             }
         }

@@ -264,6 +266,7 @@ namespace OpenSim.Region.ClientStack.LindenUDP
                 m_log.DebugFormat("[UDPBASE]: Stopping inbound UDP loop");

                 IsRunningInbound = false;
+                InboundCancellationSource.Cancel();
                 m_udpSocket.Close();
                 m_udpSocket = null;
             }
@@ -276,119 +279,63 @@ namespace OpenSim.Region.ClientStack.LindenUDP
             IsRunningOutbound = false;
         }

-        private void AsyncBeginReceive()
+        private static IPEndPoint dummyIP = new IPEndPoint(IPAddress.Any, 0);
+        private static readonly ExpiringCacheOS<SocketAddress,EndPoint> IPEndpointsCache = new(300000);
+
+        private async void AsyncBeginReceive()
         {
+            SocketAddress workSktAddress = new(m_udpSocket.AddressFamily);
             while (IsRunningInbound)
             {
-                UDPPacketBuffer buf = GetNewUDPBuffer(new IPEndPoint(IPAddress.Any, 0)); // we need a fresh one here, for now at least
+                UDPPacketBuffer buf = GetNewUDPBuffer(null); // we need a fresh one here, for now at least
                 try
                 {
-                    // kick off an async read
-                    IAsyncResult iar = m_udpSocket.BeginReceiveFrom(
-                        buf.Data,
-                        0,
-                        buf.Data.Length,
-                        SocketFlags.None,
-                        ref buf.RemoteEndPoint,
-                        AsyncEndReceive,
-                        buf);
-
-                    if (!iar.CompletedSynchronously)
+                    int nbytes = await m_udpSocket.ReceiveFromAsync(buf.Data.AsMemory(), SocketFlags.None, workSktAddress, InboundCancellationSource.Token);
+                    if (!IsRunningInbound)
+                    {
+                        FreeUDPBuffer(buf);
                         return;
-                }
-                catch (SocketException e)
-                {
-                    if (e.SocketErrorCode == SocketError.ConnectionReset)
+                    }
+
+                    if (nbytes > 0)
                     {
-                        m_log.Warn("[UDPBASE]: SIO_UDP_CONNRESET was ignored, attempting to salvage the UDP listener on port " + m_udpPort);
+                        int startTick = Util.EnvironmentTickCount();
+                        if(!IPEndpointsCache.TryGetValue(workSktAddress, 300000, out EndPoint ep))
                         {
-                            try
-                            {
-                                IAsyncResult iar = m_udpSocket.BeginReceiveFrom(
-                                    buf.Data,
-                                    0,
-                                    buf.Data.Length,
-                                    SocketFlags.None,
-                                    ref buf.RemoteEndPoint,
-                                    AsyncEndReceive,
-                                    buf);
-
-                                if (!iar.CompletedSynchronously)
-                                    return;
-                            }
-                            catch (SocketException) { }
-                            catch (ObjectDisposedException) { return; }
+                            ep = dummyIP.Create(workSktAddress);
+                            IPEndpointsCache.AddOrUpdate(workSktAddress, ep, 300000);
                         }
-                        m_log.Warn("[UDPBASE]: Salvaged the UDP listener on port " + m_udpPort);
-                    }
-                }
-                catch (Exception e)
-                {
-                    m_log.Error(
-                        string.Format("[UDPBASE]: Error processing UDP begin receive {0}.  Exception  ", UdpReceives), e);
-                }
-            }
-        }

-        private void AsyncEndReceive(IAsyncResult iar)
-        {
-            if (IsRunningInbound)
-            {
-                bool sync = iar.CompletedSynchronously;
-                try
-                {
-                    // get the buffer that was created in AsyncBeginReceive
-                    // this is the received data
-                    UDPPacketBuffer buffer = (UDPPacketBuffer)iar.AsyncState;
-
-                    int startTick = Util.EnvironmentTickCount();
+                        buf.RemoteEndPoint = ep;

-                    // get the length of data actually read from the socket, store it with the
-                    // buffer
-                    buffer.DataLength = m_udpSocket.EndReceiveFrom(iar, ref buffer.RemoteEndPoint);
+                        buf.DataLength = nbytes;
+                        UdpReceives++;

+                        PacketReceived(buf);

-                    UdpReceives++;
-
-                    // call the abstract method PacketReceived(), passing the buffer that
-                    // has just been filled from the socket read.
-                    PacketReceived(buffer);
-
-                    // If more than one thread can be calling AsyncEndReceive() at once (e.g. if m_asyncPacketHandler)
-                    // then a particular stat may be inaccurate due to a race condition.  We won't worry about this
-                    // since this should be rare and  won't cause a runtime problem.
-                    if (m_currentReceiveTimeSamples >= s_receiveTimeSamples)
-                    {
-                        AverageReceiveTicksForLastSamplePeriod
-                            = (float)m_receiveTicksInCurrentSamplePeriod / s_receiveTimeSamples;
+                        if (m_currentReceiveTimeSamples >= s_receiveTimeSamples)
+                        {
+                            AverageReceiveTicksForLastSamplePeriod
+                                = (float)m_receiveTicksInCurrentSamplePeriod / s_receiveTimeSamples;

-                        m_receiveTicksInCurrentSamplePeriod = 0;
-                        m_currentReceiveTimeSamples = 0;
+                            m_receiveTicksInCurrentSamplePeriod = 0;
+                            m_currentReceiveTimeSamples = 0;
+                        }
+                        else
+                        {
+                            m_receiveTicksInCurrentSamplePeriod += Util.EnvironmentTickCountSubtract(startTick);
+                            m_currentReceiveTimeSamples++;
+                        }
                     }
                     else
-                    {
-                        m_receiveTicksInCurrentSamplePeriod += Util.EnvironmentTickCountSubtract(startTick);
-                        m_currentReceiveTimeSamples++;
-                    }
+                        FreeUDPBuffer(buf);
                 }
-                catch (SocketException se)
+                catch (OperationCanceledException)
                 {
-                    m_log.Error(
-                        string.Format(
-                            "[UDPBASE]: Error processing UDP end receive {0}, socket error code {1}.  Exception  ",
-                            UdpReceives, se.ErrorCode),
-                        se);
                 }
-                catch(ObjectDisposedException) { }
                 catch (Exception e)
                 {
-                    m_log.Error(
-                        string.Format("[UDPBASE]: Error processing UDP end receive {0}.  Exception  ", UdpReceives), e);
-                }
-                finally
-                {
-                    if (IsRunningInbound && !sync)
-                        AsyncBeginReceive();
+                    m_log.Error($"[UDPBASE]: Error processing UDP receiveFrom. Exception ", e);
                 }
             }
         }
ViewGit