//Copyright (c) 2007-2008 Henrik Schröder, Oliver Kofoed Pedersen
//Permission is hereby granted, free of charge, to any person
//obtaining a copy of this software and associated documentation
//files (the "Software"), to deal in the Software without
//restriction, including without limitation the rights to use,
//copy, modify, merge, publish, distribute, sublicense, and/or sell
//copies of the Software, and to permit persons to whom the
//Software is furnished to do so, subject to the following
//conditions:
//The above copyright notice and this permission notice shall be
//included in all copies or substantial portions of the Software.
//THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
//EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
//OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
//NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
//HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
//WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
//FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
//OTHER DEALINGS IN THE SOFTWARE.
using System.Net.Sockets;
using System.Net;
using System.IO;
using System;
using System.Threading;
using System.Collections.Generic;
using System.Text;
namespace Apollo.Common.Cache
{
///
/// The PooledSocket class encapsulates a socket connection to a specified memcached server.
/// It contains a buffered stream for communication, and methods for sending and retrieving
/// data from the memcached server, as well as general memcached error checking.
///
internal class PooledSocket : IDisposable
{
private SocketPool socketPool;
private Socket socket;
private Stream stream;
public readonly DateTime Created;
public PooledSocket(SocketPool socketPool, IPEndPoint endPoint, int sendReceiveTimeout)
{
this.socketPool = socketPool;
Created = DateTime.Now;
//Set up the socket.
socket = new Socket(endPoint.AddressFamily, SocketType.Stream, ProtocolType.Tcp);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.SendTimeout, sendReceiveTimeout);
socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReceiveTimeout, sendReceiveTimeout);
socket.ReceiveTimeout = sendReceiveTimeout;
socket.SendTimeout = sendReceiveTimeout;
//Do not use Nagle's Algorithm
socket.NoDelay = true;
//Establish connection
socket.Connect(endPoint);
//Wraps two layers of streams around the socket for communication.
stream = new BufferedStream(new NetworkStream(socket, false));
}
///
/// Disposing of a PooledSocket object in any way causes it to be returned to its SocketPool.
///
public void Dispose()
{
socketPool.Return(this);
}
///
/// This method closes the underlying stream and socket.
///
public void Close()
{
if (stream != null)
{
try { stream.Close(); }
catch (Exception e)
{
Console.WriteLine("Error closing stream: " + socketPool.Host);
}
stream = null;
}
if (socket != null)
{
try { socket.Shutdown(SocketShutdown.Both); }
catch (Exception e)
{
Console.WriteLine("Error shutting down socket: " + socketPool.Host);
}
try { socket.Close(); }
catch (Exception e)
{
Console.WriteLine("Error closing socket: " + socketPool.Host);
}
socket = null;
}
}
///
/// Checks if the underlying socket and stream is connected and available.
///
public bool IsAlive
{
get { return socket != null && socket.Connected && stream.CanRead; }
}
///
/// Writes a string to the socket encoded in UTF8 format.
///
public void Write(string str)
{
Write(Encoding.UTF8.GetBytes(str));
}
///
/// Writes an array of bytes to the socket and flushes the stream.
///
public void Write(byte[] bytes)
{
stream.Write(bytes, 0, bytes.Length);
stream.Flush();
}
///
/// Reads from the socket until the sequence '\r\n' is encountered,
/// and returns everything up to but not including that sequence as a UTF8-encoded string
///
public string ReadLine()
{
MemoryStream buffer = new MemoryStream();
int b;
bool gotReturn = false;
while ((b = stream.ReadByte()) != -1)
{
if (gotReturn)
{
if (b == 10)
{
break;
}
else
{
buffer.WriteByte(13);
gotReturn = false;
}
}
if (b == 13)
{
gotReturn = true;
}
else
{
buffer.WriteByte((byte)b);
}
}
return Encoding.UTF8.GetString(buffer.GetBuffer());
}
///
/// Reads a response line from the socket, checks for general memcached errors, and returns the line.
/// If an error is encountered, this method will throw an exception.
///
public string ReadResponse()
{
string response = ReadLine();
if (String.IsNullOrEmpty(response))
{
throw new Exception("Received empty response.");
}
if (response.StartsWith("ERROR")
|| response.StartsWith("CLIENT_ERROR")
|| response.StartsWith("SERVER_ERROR"))
{
throw new Exception("Server returned " + response);
}
return response;
}
///
/// Fills the given byte array with data from the socket.
///
public void Read(byte[] bytes)
{
if (bytes == null)
{
return;
}
int readBytes = 0;
while (readBytes < bytes.Length)
{
readBytes += stream.Read(bytes, readBytes, (bytes.Length - readBytes));
}
}
///
/// Reads from the socket until the sequence '\r\n' is encountered.
///
public void SkipUntilEndOfLine()
{
int b;
bool gotReturn = false;
while ((b = stream.ReadByte()) != -1)
{
if (gotReturn)
{
if (b == 10)
{
break;
}
else
{
gotReturn = false;
}
}
if (b == 13)
{
gotReturn = true;
}
}
}
///
/// Resets this PooledSocket by making sure the incoming buffer of the socket is empty.
/// If there was any leftover data, this method return true.
///
public bool Reset()
{
if (socket.Available > 0)
{
byte[] b = new byte[socket.Available];
Read(b);
return true;
}
return false;
}
}
internal delegate T UseSocket(PooledSocket socket);
internal delegate void UseSocket(PooledSocket socket);
///
/// The ServerPool encapsulates a collection of memcached servers and the associated SocketPool objects.
/// This class contains the server-selection logic, and contains methods for executing a block of code on
/// a socket from the server corresponding to a given key.
///
internal class ServerPool
{
//Expose the socket pools.
private SocketPool[] hostList;
internal SocketPool[] HostList { get { return hostList; } }
private Dictionary hostDictionary;
private uint[] hostKeys;
//Internal configuration properties
private int sendReceiveTimeout = 2000;
private uint maxPoolSize = 10;
private uint minPoolSize = 5;
private TimeSpan socketRecycleAge = TimeSpan.FromMinutes(30);
internal int SendReceiveTimeout { get { return sendReceiveTimeout; } set { sendReceiveTimeout = value; } }
internal uint MaxPoolSize { get { return maxPoolSize; } set { maxPoolSize = value; } }
internal uint MinPoolSize { get { return minPoolSize; } set { minPoolSize = value; } }
internal TimeSpan SocketRecycleAge { get { return socketRecycleAge; } set { socketRecycleAge = value; } }
///
/// Internal constructor. This method takes the array of hosts and sets up an internal list of socketpools.
///
internal ServerPool(string[] hosts)
{
hostDictionary = new Dictionary();
List pools = new List();
List keys = new List();
foreach (string host in hosts)
{
//Create pool
SocketPool pool = new SocketPool(this, host.Trim());
//Create 250 keys for this pool, store each key in the hostDictionary, as well as in the list of keys.
for (int i = 0; i < 250; i++)
{
uint key = (uint)i;
if (!hostDictionary.ContainsKey(key))
{
hostDictionary[key] = pool;
keys.Add(key);
}
}
pools.Add(pool);
}
//Hostlist should contain the list of all pools that has been created.
hostList = pools.ToArray();
//Hostkeys should contain the list of all key for all pools that have been created.
//This array forms the server key continuum that we use to lookup which server a
//given item key hash should be assigned to.
keys.Sort();
hostKeys = keys.ToArray();
}
///
/// Given an item key hash, this method returns the serverpool which is closest on the server key continuum.
///
internal SocketPool GetSocketPool(uint hash)
{
//Quick return if we only have one host.
if (hostList.Length == 1)
{
return hostList[0];
}
//New "ketama" host selection.
int i = Array.BinarySearch(hostKeys, hash);
//If not exact match...
if (i < 0)
{
//Get the index of the first item bigger than the one searched for.
i = ~i;
//If i is bigger than the last index, it was bigger than the last item = use the first item.
if (i >= hostKeys.Length)
{
i = 0;
}
}
return hostDictionary[hostKeys[i]];
}
internal SocketPool GetSocketPool(string host)
{
return Array.Find(HostList, delegate(SocketPool socketPool) { return socketPool.Host == host; });
}
///
/// This method executes the given delegate on a socket from the server that corresponds to the given hash.
/// If anything causes an error, the given defaultValue will be returned instead.
/// This method takes care of disposing the socket properly once the delegate has executed.
///
internal T Execute(uint hash, T defaultValue, UseSocket use)
{
return Execute(GetSocketPool(hash), defaultValue, use);
}
internal T Execute(SocketPool pool, T defaultValue, UseSocket use)
{
PooledSocket sock = null;
try
{
//Acquire a socket
sock = pool.Acquire();
//Use the socket as a parameter to the delegate and return its result.
if (sock != null)
{
return use(sock);
}
}
catch (Exception e)
{
Console.WriteLine("Error in Execute: " + pool.Host);
//Socket is probably broken
if (sock != null)
{
sock.Close();
}
}
finally
{
if (sock != null)
{
sock.Dispose();
}
}
return defaultValue;
}
internal void Execute(SocketPool pool, UseSocket use)
{
PooledSocket sock = null;
try
{
//Acquire a socket
sock = pool.Acquire();
//Use the socket as a parameter to the delegate and return its result.
if (sock != null)
{
use(sock);
}
}
catch (Exception e)
{
Console.WriteLine("Error in Execute: " + pool.Host);
//Socket is probably broken
if (sock != null)
{
sock.Close();
}
}
finally
{
if (sock != null)
{
sock.Dispose();
}
}
}
///
/// This method executes the given delegate on all servers.
///
internal void ExecuteAll(UseSocket use)
{
foreach (SocketPool socketPool in hostList)
{
Execute(socketPool, use);
}
}
}
///
/// The SocketPool encapsulates the list of PooledSockets against one specific host, and contains methods for
/// acquiring or returning PooledSockets.
///
internal class SocketPool
{
///
/// If the host stops responding, we mark it as dead for this amount of seconds,
/// and we double this for each consecutive failed retry. If the host comes alive
/// again, we reset this to 1 again.
///
private int deadEndPointSecondsUntilRetry = 1;
private const int maxDeadEndPointSecondsUntilRetry = 60 * 10; //10 minutes
private ServerPool owner;
private IPEndPoint endPoint;
private Queue queue;
//Debug variables and properties
private int newsockets = 0;
private int failednewsockets = 0;
private int reusedsockets = 0;
private int deadsocketsinpool = 0;
private int deadsocketsonreturn = 0;
private int dirtysocketsonreturn = 0;
private int acquired = 0;
public int NewSockets { get { return newsockets; } }
public int FailedNewSockets { get { return failednewsockets; } }
public int ReusedSockets { get { return reusedsockets; } }
public int DeadSocketsInPool { get { return deadsocketsinpool; } }
public int DeadSocketsOnReturn { get { return deadsocketsonreturn; } }
public int DirtySocketsOnReturn { get { return dirtysocketsonreturn; } }
public int Acquired { get { return acquired; } }
public int Poolsize { get { return queue.Count; } }
//Public variables and properties
public readonly string Host;
private bool isEndPointDead = false;
public bool IsEndPointDead { get { return isEndPointDead; } }
private DateTime deadEndPointRetryTime;
public DateTime DeadEndPointRetryTime { get { return deadEndPointRetryTime; } }
internal SocketPool(ServerPool owner, string host)
{
Host = host;
this.owner = owner;
endPoint = getEndPoint(host);
queue = new Queue();
}
///
/// This method parses the given string into an IPEndPoint.
/// If the string is malformed in some way, or if the host cannot be resolved, this method will throw an exception.
///
private static IPEndPoint getEndPoint(string host)
{
//Parse port, default to 11211.
int port = 11211;
if (host.Contains(":"))
{
string[] split = host.Split(new char[] { ':' });
if (!Int32.TryParse(split[1], out port))
{
throw new ArgumentException("Unable to parse host: " + host);
}
host = split[0];
}
//Parse host string.
IPAddress address;
if (IPAddress.TryParse(host, out address))
{
//host string successfully resolved as an IP address.
}
else
{
//See if we can resolve it as a hostname
try
{
address = Dns.GetHostEntry(host).AddressList[0];
}
catch (Exception e)
{
Console.WriteLine("Unable to resolve host: " + host);
return null;
}
}
return new IPEndPoint(address, port);
}
///
/// Gets a socket from the pool.
/// If there are no free sockets, a new one will be created. If something goes
/// wrong while creating the new socket, this pool's endpoint will be marked as dead
/// and all subsequent calls to this method will return null until the retry interval
/// has passed.
///
internal PooledSocket Acquire()
{
//Do we have free sockets in the pool?
//if so - return the first working one.
//if not - create a new one.
Interlocked.Increment(ref acquired);
lock (queue)
{
while (queue.Count > 0)
{
PooledSocket socket = queue.Dequeue();
if (socket != null && socket.IsAlive)
{
Interlocked.Increment(ref reusedsockets);
return socket;
}
Interlocked.Increment(ref deadsocketsinpool);
}
}
Interlocked.Increment(ref newsockets);
//If we know the endpoint is dead, check if it is time for a retry, otherwise return null.
if (isEndPointDead)
{
if (DateTime.Now > deadEndPointRetryTime)
{
//Retry
isEndPointDead = false;
}
else
{
//Still dead
return null;
}
}
//Try to create a new socket. On failure, mark endpoint as dead and return null.
try
{
PooledSocket socket = new PooledSocket(this, endPoint, owner.SendReceiveTimeout);
//Reset retry timer on success.
deadEndPointSecondsUntilRetry = 1;
return socket;
}
catch (Exception e)
{
Interlocked.Increment(ref failednewsockets);
Console.WriteLine("Error connecting to: " + endPoint.Address);
//Mark endpoint as dead
isEndPointDead = true;
//Retry in 2 minutes
deadEndPointRetryTime = DateTime.Now.AddSeconds(deadEndPointSecondsUntilRetry);
if (deadEndPointSecondsUntilRetry < maxDeadEndPointSecondsUntilRetry)
{
deadEndPointSecondsUntilRetry = deadEndPointSecondsUntilRetry * 2; //Double retry interval until next time
}
return null;
}
}
///
/// Returns a socket to the pool.
/// If the socket is dead, it will be destroyed.
/// If there are more than MaxPoolSize sockets in the pool, it will be destroyed.
/// If there are less than MinPoolSize sockets in the pool, it will always be put back.
/// If there are something inbetween those values, the age of the socket is checked.
/// If it is older than the SocketRrecycleAge, it is destroyed, otherwise it will be
/// put back in the pool.
///
internal void Return(PooledSocket socket)
{
//If the socket is dead, destroy it.
if (!socket.IsAlive)
{
Interlocked.Increment(ref deadsocketsonreturn);
socket.Close();
}
else
{
//Clean up socket
if (socket.Reset())
{
Interlocked.Increment(ref dirtysocketsonreturn);
}
//Check pool size.
if (queue.Count >= owner.MaxPoolSize)
{
//If the pool is full, destroy the socket.
socket.Close();
}
else if (queue.Count > owner.MinPoolSize && DateTime.Now - socket.Created > owner.SocketRecycleAge)
{
//If we have more than the minimum amount of sockets, but less than the max, and the socket is older than the recycle age, we destroy it.
socket.Close();
}
else
{
//Put the socket back in the pool.
lock (queue)
{
queue.Enqueue(socket);
}
}
}
}
}
}