Initial cache_peer standby=N implementation. No SSL peer support yet. The feature focus is to instantly provide a ready-to-use connection to a cooperating cache peer, virtually at all times. This is useful when connection establishment is "too slow" and/or when infrequent peer use prevents Squid from combating slow connection establishment with the regular idle connection pool. The feature is similar to Squid2 idle=N feature, but there are key differences: * Standby connections are available virtually at all times, while Squid2 unused "idle" connections are available only for a short time after a peer request. * All N standby connections are not opened at once, reducing the chance of the feature being mistaken for a DoS attack on a peer. * More consistent support for peers with multiple IP addresses (peer IPs are cycled through, just like during regular Squid request forwarding). Besides, "idle" is a poor choice of adjective for an unused connection pool name because the same term is used for used persistent connections, which have somewhat different properties, are stored in a different pool, may need distinct set of tuning options, etc. It is better to use a dedicated term for the new feature. The relationship between the max-conn limit and standby/idle connections is a complex one. After several rewrites and tests, Squid now obeys max-conn limit when opening new standby connections and accounts for standby connections when checking whether to allow peer use. This often works OK, but leads to standby guarantee violations when non-standby connections approach the limit. The alternative design where standby code ignores max-conn works better, but is really difficult to explain and advocate because an admin expects max-conn to cover all connections and because of the idle connections accounting and maintenance bugs. We may come back to this when the idle connections code is fixed. Fixed max-conn documentation and XXXed a peerHTTPOkay() bug (now in peerHasConnAvailable()) that results in max-conn limit preventing the use of a peer with idle persistent connections. Decided to use standby connections for non-retriable requests. Avoiding standby connections for POSTs and such would violate the main purpose of the feature: providing an instant ready-to-use connection. A user does not care whether it is waiting too long for a GET or POST request. Actually, a user may care more when their POST requests are delayed (because canceling and retrying them is often scary from the user point of view). The idea behind standby connections is that the admin is responsible for avoiding race conditions by properly configuring the peering Squids. If such proper configuration is not possible or the consequences of rare races (e.g., due to peer shutdown) are more severe than the consequences of slow requests, the admin should not use standby=N. This choice may become configurable in the future. TODO: Teach peer probing code to push successful probing connections into the standby pool (when enabled). Should be done as a followup project because of the differences in standby and probe connection opening code, especially when SSL peers are supported. Will require some discussion. A standby pool is using a full-blown PconnPool object for storage instead of the smaller IdleConnList, like the ICAP code does. The primary reasons for this design were: * A peer may have multiple addresses and those addresses may change. PconnPool has code to deal with multiple addresses while IdleConnList does not. I do not think this difference is really used in this implementation, but I did not want to face an unknown limitation. Note that ICAP does not support multiple ICAP server addresses. * PconnPool has reporting (and cache manager integration) code that we should eventually improve and report standby-specific stats. When this happens, PconnPool will probably become abstract and spawn two kids, one for pconn and one for standby pools. Seemingly unrelated changes triggered by standby=N addition: * Removed PconnPool from fde.h. We used to create immortal PconnPool objects. Now, standby pools are destroyed when their peer is destroyed. Sharing raw pointers to such pools is too dangerous. We could use smart pointers, but PconnPools do not really belong to such a low-level object like fde IMO. * Added FwdState::closeServerConnection() to encapsulate server connection closing code, including the new noteUses() maintenance. Also updated FwdState::serverClosed() to do the same maintenance. * Encapsulated commonly reused ToS and NfMark code into GetMarkingsToServer(). May need more work in FwdState where some similar-but-different code remains. * Close all connections in IdleConnList upon deletion. The old code did not care because we never deleted PconnPools (although I am not sure there were no bugs related to ICAP service pools which use IdleConnList directly and do get destroyed). * Fixed PconnPool::dumpHash(). It was listing the first entry twice because the code misused misnamed hash_next(). * Removed unnecessary hard-coded limit on the number of PconnPools. Use std::set for their storage. * Fixed very stale PconnPool::pop() documentation and polished its code. * Added RegisteredRunner::sync() method to use during Squid reconfiguration: The existing run() method and destructor are great for the initial configuration and final shutdown, but do not work well for reconfiguration when you do not want to completely destroy and then recreate the state. The sync() method (called via SyncRegistered) can be used for that. Eventually, the reconfiguration API should present the old "saved" config and the new "current" config to RegisteredRunners so that they can update their modules/features intelligently. For now, they just see the new config. === modified file 'src/CachePeer.h' --- src/CachePeer.h 2013-05-13 22:48:23 +0000 +++ src/CachePeer.h 2014-02-07 16:31:06 +0000 @@ -13,54 +13,57 @@ * incorporates software developed and/or copyrighted by other * sources; see the CREDITS file for full details. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "acl/forward.h" +#include "base/CbcPointer.h" #include "enums.h" #include "icp_opcode.h" #include "ip/Address.h" //TODO: remove, it is unconditionally defined and always used. #define PEER_MULTICAST_SIBLINGS 1 #if USE_SSL #include <openssl/ssl.h> #endif class CachePeerDomainList; class NeighborTypeDomainList; +class PconnPool; class PeerDigest; +class PeerPoolMgr; // currently a POD class CachePeer { public: u_int index; char *name; char *host; peer_t type; Ip::Address in_addr; struct { int pings_sent; int pings_acked; int fetches; int rtt; int ignored_replies; int n_keepalives_sent; int n_keepalives_recv; @@ -169,40 +172,46 @@ public: double load_multiplier; double load_factor; /* normalized weight value */ } carp; #if USE_AUTH struct { unsigned int hash; double load_multiplier; double load_factor; /* normalized weight value */ } userhash; #endif struct { unsigned int hash; double load_multiplier; double load_factor; /* normalized weight value */ } sourcehash; char *login; /* Proxy authorization */ time_t connect_timeout; int connect_fail_limit; int max_conn; + struct { + PconnPool *pool; ///< idle connection pool for this peer + CbcPointer<PeerPoolMgr> mgr; ///< pool manager + int limit; ///< the limit itself + bool waitingForClose; ///< a conn must close before we open a standby conn + } standby; ///< optional "cache_peer standby=limit" feature char *domain; /* Forced domain */ #if USE_SSL int use_ssl; char *sslcert; char *sslkey; int sslversion; char *ssloptions; char *sslcipher; char *sslcafile; char *sslcapath; char *sslcrlfile; char *sslflags; char *ssldomain; SSL_CTX *sslContext; SSL_SESSION *sslSession; #endif int front_end_https; int connection_auth; === modified file 'src/FwdState.cc' --- src/FwdState.cc 2013-08-15 22:09:07 +0000 +++ src/FwdState.cc 2014-02-06 18:06:52 +0000 @@ -48,91 +48,99 @@ #include "fd.h" #include "fde.h" #include "ftp.h" #include "FwdState.h" #include "globals.h" #include "gopher.h" #include "hier_code.h" #include "http.h" #include "HttpReply.h" #include "HttpRequest.h" #include "icmp/net_db.h" #include "internal.h" #include "ip/Intercept.h" #include "ip/QosConfig.h" #include "ip/tools.h" #include "MemObject.h" #include "mgr/Registration.h" #include "neighbors.h" #include "pconn.h" #include "PeerSelectState.h" +#include "PeerPoolMgr.h" #include "SquidConfig.h" #include "SquidTime.h" #include "Store.h" #include "StoreClient.h" #include "urn.h" #include "whois.h" #if USE_SSL #include "ssl/cert_validate_message.h" #include "ssl/Config.h" #include "ssl/helper.h" #include "ssl/support.h" #include "ssl/ErrorDetail.h" #include "ssl/ServerBump.h" #endif #if HAVE_ERRNO_H #include <errno.h> #endif static PSC fwdPeerSelectionCompleteWrapper; static CLCB fwdServerClosedWrapper; #if USE_SSL static PF fwdNegotiateSSLWrapper; #endif static CNCB fwdConnectDoneWrapper; static OBJH fwdStats; #define MAX_FWD_STATS_IDX 9 static int FwdReplyCodes[MAX_FWD_STATS_IDX + 1][Http::scInvalidHeader + 1]; -static PconnPool *fwdPconnPool = new PconnPool("server-side"); +static PconnPool *fwdPconnPool = new PconnPool("server-side", NULL); CBDATA_CLASS_INIT(FwdState); void FwdState::abort(void* d) { FwdState* fwd = (FwdState*)d; Pointer tmp = fwd; // Grab a temporary pointer to keep the object alive during our scope. if (Comm::IsConnOpen(fwd->serverConnection())) { - comm_remove_close_handler(fwd->serverConnection()->fd, fwdServerClosedWrapper, fwd); - debugs(17, 3, HERE << "store entry aborted; closing " << - fwd->serverConnection()); - fwd->serverConnection()->close(); + fwd->closeServerConnection("store entry aborted"); } else { debugs(17, 7, HERE << "store entry aborted; no connection to close"); } fwd->serverDestinations.clean(); fwd->self = NULL; } +/// stops monitoring server connection for closure and updates pconn stats +void +FwdState::closeServerConnection(const char *reason) +{ + debugs(17, 3, "because " << reason << "; " << serverConn); + comm_remove_close_handler(serverConn->fd, fwdServerClosedWrapper, this); + fwdPconnPool->noteUses(fd_table[serverConn->fd].pconn.uses); + serverConn->close(); +} + /**** PUBLIC INTERFACE ********************************************************/ FwdState::FwdState(const Comm::ConnectionPointer &client, StoreEntry * e, HttpRequest * r, const AccessLogEntryPointer &alp): al(alp) { debugs(17, 2, HERE << "Forwarding client request " << client << ", url=" << e->url() ); entry = e; clientConn = client; request = r; HTTPMSGLOCK(request); pconnRace = raceImpossible; start_t = squid_curtime; serverDestinations.reserve(Config.forward_max_tries); e->lock("FwdState"); EBIT_SET(e->flags, ENTRY_FWD_HDR_WAIT); } // Called once, right after object creation, when it is safe to set self void FwdState::start(Pointer aSelf) { @@ -254,45 +262,42 @@ FwdState::~FwdState() if (! flags.forward_completed) completed(); doneWithRetries(); HTTPMSGUNLOCK(request); delete err; entry->unregisterAbort(); entry->unlock("FwdState"); entry = NULL; if (calls.connector != NULL) { calls.connector->cancel("FwdState destructed"); calls.connector = NULL; } - if (Comm::IsConnOpen(serverConn)) { - comm_remove_close_handler(serverConnection()->fd, fwdServerClosedWrapper, this); - debugs(17, 3, HERE << "closing FD " << serverConnection()->fd); - serverConn->close(); - } + if (Comm::IsConnOpen(serverConn)) + closeServerConnection("~FwdState"); serverDestinations.clean(); debugs(17, 3, HERE << "FwdState destructor done"); } /** * This is the entry point for client-side to start forwarding * a transaction. It is a static method that may or may not * allocate a FwdState. */ void FwdState::Start(const Comm::ConnectionPointer &clientConn, StoreEntry *entry, HttpRequest *request, const AccessLogEntryPointer &al) { /** \note * client_addr == no_addr indicates this is an "internal" request * from peer_digest.c, asn.c, netdb.c, etc and should always * be allowed. yuck, I know. */ @@ -581,41 +586,44 @@ FwdState::checkRetry() * after a failure. If the request is not retriable then we don't * want to risk sending it on a persistent connection. Instead we'll * force it to go on a new HTTP connection. */ bool FwdState::checkRetriable() { // Optimize: A compliant proxy may retry PUTs, but Squid lacks the [rather // complicated] code required to protect the PUT request body from being // nibbled during the first try. Thus, Squid cannot retry some PUTs today. if (request->body_pipe != NULL) return false; // RFC2616 9.1 Safe and Idempotent Methods return (request->method.isHttpSafe() || request->method.isIdempotent()); } void FwdState::serverClosed(int fd) { - debugs(17, 2, HERE << "FD " << fd << " " << entry->url()); + debugs(17, 2, "FD " << fd << " " << entry->url() << " after " << + fd_table[fd].pconn.uses << " requests"); + if (serverConnection()->fd == fd) // should be, but not critical to assert + fwdPconnPool->noteUses(fd_table[fd].pconn.uses); retryOrBail(); } void FwdState::retryOrBail() { if (checkRetry()) { debugs(17, 3, HERE << "re-forwarding (" << n_tries << " tries, " << (squid_curtime - start_t) << " secs)"); // we should retry the same destination if it failed due to pconn race if (pconnRace == raceHappened) debugs(17, 4, HERE << "retrying the same destination"); else serverDestinations.shift(); // last one failed. try another. startConnectionOrFail(); return; } // TODO: should we call completed() here and move doneWithRetries there? doneWithRetries(); @@ -1131,114 +1139,103 @@ FwdState::connectStart() return; } // Pinned connection failure. debugs(17,2,HERE << "Pinned connection failed: " << pinned_connection); ErrorState *anErr = new ErrorState(ERR_ZERO_SIZE_OBJECT, Http::scServiceUnavailable, request); fail(anErr); self = NULL; // refcounted return; } // Use pconn to avoid opening a new connection. const char *host = NULL; if (!serverDestinations[0]->getPeer()) host = request->GetHost(); Comm::ConnectionPointer temp; // Avoid pconns after races so that the same client does not suffer twice. // This does not increase the total number of connections because we just // closed the connection that failed the race. And re-pinning assumes this. if (pconnRace != raceHappened) - temp = fwdPconnPool->pop(serverDestinations[0], host, checkRetriable()); + temp = pconnPop(serverDestinations[0], host); const bool openedPconn = Comm::IsConnOpen(temp); pconnRace = openedPconn ? racePossible : raceImpossible; // if we found an open persistent connection to use. use it. if (openedPconn) { serverConn = temp; flags.connected_okay = true; debugs(17, 3, HERE << "reusing pconn " << serverConnection()); ++n_tries; comm_add_close_handler(serverConnection()->fd, fwdServerClosedWrapper, this); /* Update server side TOS and Netfilter mark on the connection. */ if (Ip::Qos::TheConfig.isAclTosActive()) { const tos_t tos = GetTosToServer(request); Ip::Qos::setSockTos(temp, tos); } #if SO_MARK if (Ip::Qos::TheConfig.isAclNfmarkActive()) { const nfmark_t nfmark = GetNfmarkToServer(request); Ip::Qos::setSockNfmark(temp, nfmark); } #endif dispatch(); return; } // We will try to open a new connection, possibly to the same destination. // We reset serverDestinations[0] in case we are using it again because // ConnOpener modifies its destination argument. serverDestinations[0]->local.port(0); serverConn = NULL; #if URL_CHECKSUM_DEBUG entry->mem_obj->checkUrlChecksum(); #endif - /* Get the server side TOS and Netfilter mark to be set on the connection. */ - if (Ip::Qos::TheConfig.isAclTosActive()) { - serverDestinations[0]->tos = GetTosToServer(request); - } -#if SO_MARK && USE_LIBCAP - serverDestinations[0]->nfmark = GetNfmarkToServer(request); - debugs(17, 3, "fwdConnectStart: got outgoing addr " << serverDestinations[0]->local << ", tos " << int(serverDestinations[0]->tos) - << ", netfilter mark " << serverDestinations[0]->nfmark); -#else - serverDestinations[0]->nfmark = 0; - debugs(17, 3, "fwdConnectStart: got outgoing addr " << serverDestinations[0]->local << ", tos " << int(serverDestinations[0]->tos)); -#endif + GetMarkingsToServer(request, *serverDestinations[0]); calls.connector = commCbCall(17,3, "fwdConnectDoneWrapper", CommConnectCbPtrFun(fwdConnectDoneWrapper, this)); Comm::ConnOpener *cs = new Comm::ConnOpener(serverDestinations[0], calls.connector, ctimeout); if (host) cs->setHost(host); AsyncJob::Start(cs); } void FwdState::dispatch() { debugs(17, 3, HERE << clientConn << ": Fetching '" << RequestMethodStr(request->method) << " " << entry->url() << "'"); /* * Assert that server_fd is set. This is to guarantee that fwdState * is attached to something and will be deallocated when server_fd * is closed. */ assert(Comm::IsConnOpen(serverConn)); fd_note(serverConnection()->fd, entry->url()); - fd_table[serverConnection()->fd].noteUse(fwdPconnPool); + fd_table[serverConnection()->fd].noteUse(); /*assert(!EBIT_TEST(entry->flags, ENTRY_DISPATCHED)); */ assert(entry->ping_status != PING_WAITING); assert(entry->locked()); EBIT_SET(entry->flags, ENTRY_DISPATCHED); netdbPingSite(request->GetHost()); /* Retrieves remote server TOS or MARK value, and stores it as part of the * original client request FD object. It is later used to forward * remote server's TOS/MARK in the response to the client in case of a MISS. */ if (Ip::Qos::TheConfig.isHitNfmarkActive()) { if (Comm::IsConnOpen(clientConn) && Comm::IsConnOpen(serverConnection())) { fde * clientFde = &fd_table[clientConn->fd]; // XXX: move the fd_table access into Ip::Qos /* Get the netfilter mark for the connection */ Ip::Qos::getNfmarkFromServer(serverConnection(), clientFde); } @@ -1439,40 +1436,56 @@ FwdState::reforwardableStatus(const Http /* NOTREACHED */ } /** * Decide where details need to be gathered to correctly describe a persistent connection. * What is needed: * - the address/port details about this link * - domain name of server at other end of this link (either peer or requested host) */ void FwdState::pconnPush(Comm::ConnectionPointer &conn, const char *domain) { if (conn->getPeer()) { fwdPconnPool->push(conn, NULL); } else { fwdPconnPool->push(conn, domain); } } +Comm::ConnectionPointer +FwdState::pconnPop(const Comm::ConnectionPointer &dest, const char *domain) +{ + // always call shared pool first because we need to close an idle + // connection there if we have to use a standby connection. + Comm::ConnectionPointer conn = fwdPconnPool->pop(dest, domain, checkRetriable()); + if (!Comm::IsConnOpen(conn)) { + // either there was no pconn to pop or this is not a retriable xaction + if (CachePeer *peer = dest->getPeer()) { + if (peer->standby.pool) + conn = peer->standby.pool->pop(dest, domain, true); + } + } + return conn; // open, closed, or nil +} + void FwdState::initModule() { RegisterWithCacheManager(); } void FwdState::RegisterWithCacheManager(void) { Mgr::RegisterAction("forward", "Request Forwarding Statistics", fwdStats, 0, 1); } void FwdState::logReplyStatus(int tries, const Http::StatusCode status) { if (status > Http::scInvalidHeader) return; assert(tries >= 0); @@ -1564,20 +1577,37 @@ getOutgoingAddress(HttpRequest * request if (!l->aclList || ch.fastCheck(l->aclList) == ACCESS_ALLOWED) { conn->local = l->addr; return; } } } tos_t GetTosToServer(HttpRequest * request) { ACLFilledChecklist ch(NULL, request, NULL); return aclMapTOS(Ip::Qos::TheConfig.tosToServer, &ch); } nfmark_t GetNfmarkToServer(HttpRequest * request) { ACLFilledChecklist ch(NULL, request, NULL); return aclMapNfmark(Ip::Qos::TheConfig.nfmarkToServer, &ch); } + +void +GetMarkingsToServer(HttpRequest * request, Comm::Connection &conn) +{ + // Get the server side TOS and Netfilter mark to be set on the connection. + if (Ip::Qos::TheConfig.isAclTosActive()) { + conn.tos = GetTosToServer(request); + debugs(17, 3, "from " << conn.local << " tos " << int(conn.tos)); + } + +#if SO_MARK && USE_LIBCAP + conn.nfmark = GetNfmarkToServer(request); + debugs(17, 3, "from " << conn.local << " netfilter mark " << conn.nfmark); +#else + conn.nfmark = 0; +#endif +} === modified file 'src/FwdState.h' --- src/FwdState.h 2013-06-07 04:35:25 +0000 +++ src/FwdState.h 2014-02-06 05:44:58 +0000 @@ -1,129 +1,137 @@ #ifndef SQUID_FORWARD_H #define SQUID_FORWARD_H #include "base/Vector.h" #include "base/RefCount.h" #include "comm.h" #include "comm/Connection.h" #include "err_type.h" #include "fde.h" #include "http/StatusCode.h" #include "ip/Address.h" #if USE_SSL #include "ssl/support.h" #endif /* forward decls */ class AccessLogEntry; typedef RefCount<AccessLogEntry> AccessLogEntryPointer; +class PconnPool; +typedef RefCount<PconnPool> PconnPoolPointer; class ErrorState; class HttpRequest; #if USE_SSL namespace Ssl { class ErrorDetail; class CertValidationResponse; }; #endif /** * Returns the TOS value that we should be setting on the connection * to the server, based on the ACL. */ tos_t GetTosToServer(HttpRequest * request); /** * Returns the Netfilter mark value that we should be setting on the * connection to the server, based on the ACL. */ nfmark_t GetNfmarkToServer(HttpRequest * request); +/// Sets initial TOS value and Netfilter for the future outgoing connection. +void GetMarkingsToServer(HttpRequest * request, Comm::Connection &conn); + class HelperReply; class FwdState : public RefCountable { public: typedef RefCount<FwdState> Pointer; ~FwdState(); static void initModule(); /// Initiates request forwarding to a peer or origin server. static void Start(const Comm::ConnectionPointer &client, StoreEntry *, HttpRequest *, const AccessLogEntryPointer &alp); /// Same as Start() but no master xaction info (AccessLogEntry) available. static void fwdStart(const Comm::ConnectionPointer &client, StoreEntry *, HttpRequest *); /// This is the real beginning of server connection. Call it whenever /// the forwarding server destination has changed and a new one needs to be opened. /// Produces the cannot-forward error on fail if no better error exists. void startConnectionOrFail(); void fail(ErrorState *err); void unregister(Comm::ConnectionPointer &conn); void unregister(int fd); void complete(); void handleUnregisteredServerEnd(); int reforward(); bool reforwardableStatus(const Http::StatusCode s) const; void serverClosed(int fd); void connectStart(); void connectDone(const Comm::ConnectionPointer & conn, comm_err_t status, int xerrno); void connectTimeout(int fd); void initiateSSL(); void negotiateSSL(int fd); bool checkRetry(); bool checkRetriable(); void dispatch(); + Comm::ConnectionPointer pconnPop(const Comm::ConnectionPointer &dest, const char *domain); void pconnPush(Comm::ConnectionPointer & conn, const char *domain); bool dontRetry() { return flags.dont_retry; } void dontRetry(bool val) { flags.dont_retry = val; } /** return a ConnectionPointer to the current server connection (may or may not be open) */ Comm::ConnectionPointer const & serverConnection() const { return serverConn; }; #if USE_SSL /// Callback function called when squid receive message from cert validator helper static void sslCrtvdHandleReplyWrapper(void *data, Ssl::CertValidationResponse const &); /// Process response from cert validator helper void sslCrtvdHandleReply(Ssl::CertValidationResponse const &); /// Check SSL errors returned from cert validator against sslproxy_cert_error access list Ssl::CertErrors *sslCrtvdCheckForErrors(Ssl::CertValidationResponse const &, Ssl::ErrorDetail *&); #endif private: // hidden for safer management of self; use static fwdStart FwdState(const Comm::ConnectionPointer &client, StoreEntry *, HttpRequest *, const AccessLogEntryPointer &alp); void start(Pointer aSelf); #if STRICT_ORIGINAL_DST void selectPeerForIntercepted(); #endif static void logReplyStatus(int tries, const Http::StatusCode status); void doneWithRetries(); void completed(); void retryOrBail(); ErrorState *makeConnectingError(const err_type type) const; static void RegisterWithCacheManager(void); + void closeServerConnection(const char *reason); + public: StoreEntry *entry; HttpRequest *request; AccessLogEntryPointer al; ///< info for the future access.log entry static void abort(void*); private: Pointer self; ErrorState *err; Comm::ConnectionPointer clientConn; ///< a possibly open connection to the client. time_t start_t; int n_tries; // AsyncCalls which we set and may need cancelling. struct { AsyncCall::Pointer connector; ///< a call linking us to the ConnOpener producing serverConn. } calls; struct { === modified file 'src/Makefile.am' --- src/Makefile.am 2013-08-16 15:40:14 +0000 +++ src/Makefile.am 2014-02-06 18:13:24 +0000 @@ -443,40 +443,42 @@ squid_SOURCES = \ neighbors.h \ neighbors.cc \ Notes.cc \ Notes.h \ Packer.cc \ Packer.h \ Parsing.cc \ Parsing.h \ $(XPROF_STATS_SOURCE) \ pconn.cc \ pconn.h \ PeerDigest.h \ peer_digest.cc \ peer_proxy_negotiate_auth.h \ peer_proxy_negotiate_auth.cc \ peer_select.cc \ peer_sourcehash.h \ peer_sourcehash.cc \ peer_userhash.h \ peer_userhash.cc \ + PeerPoolMgr.h \ + PeerPoolMgr.cc \ PeerSelectState.h \ PingData.h \ protos.h \ redirect.h \ redirect.cc \ refresh.h \ refresh.cc \ RemovalPolicy.cc \ RemovalPolicy.h \ send-announce.h \ send-announce.cc \ $(SBUF_SOURCE) \ $(SNMP_SOURCE) \ SquidMath.h \ SquidMath.cc \ SquidNew.cc \ IoStats.h \ stat.h \ stat.cc \ StatCounters.h \ @@ -1506,40 +1508,42 @@ tests_testCacheManager_SOURCES = \ MemObject.cc \ mime.h \ mime.cc \ mime_header.h \ mime_header.cc \ neighbors.h \ neighbors.cc \ Notes.cc \ Notes.h \ Packer.cc \ Parsing.cc \ pconn.cc \ peer_digest.cc \ peer_proxy_negotiate_auth.h \ peer_proxy_negotiate_auth.cc \ peer_select.cc \ peer_sourcehash.h \ peer_sourcehash.cc \ peer_userhash.h \ peer_userhash.cc \ + PeerPoolMgr.h \ + PeerPoolMgr.cc \ redirect.h \ redirect.cc \ refresh.h \ refresh.cc \ RemovalPolicy.cc \ Server.cc \ $(SNMP_SOURCE) \ SquidMath.h \ SquidMath.cc \ IoStats.h \ stat.h \ stat.cc \ StatCounters.h \ StatCounters.cc \ StatHist.h \ StrList.h \ StrList.cc \ tests/stub_StatHist.cc \ stmem.cc \ repl_modules.h \ @@ -1886,40 +1890,42 @@ tests_testEvent_SOURCES = \ http.cc \ HttpBody.h \ HttpBody.cc \ HttpHeader.h \ HttpHeader.cc \ HttpHeaderFieldInfo.h \ HttpHeaderTools.h \ HttpHeaderTools.cc \ HttpHeaderFieldStat.h \ HttpHdrCc.h \ HttpHdrCc.cc \ HttpHdrCc.cci \ HttpHdrContRange.cc \ HttpHdrRange.cc \ HttpHdrSc.cc \ HttpHdrScTarget.cc \ HttpMsg.cc \ HttpParser.cc \ HttpParser.h \ HttpReply.cc \ + PeerPoolMgr.h \ + PeerPoolMgr.cc \ RequestFlags.h \ RequestFlags.cc \ HttpRequest.cc \ HttpRequestMethod.cc \ icp_v2.cc \ icp_v3.cc \ $(IPC_SOURCE) \ ipcache.cc \ int.h \ int.cc \ internal.h \ internal.cc \ SquidList.h \ SquidList.cc \ MasterXaction.cc \ MasterXaction.h \ Mem.h \ mem.cc \ mem_node.cc \ MemBlob.cc \ @@ -2137,40 +2143,42 @@ tests_testEventLoop_SOURCES = \ http.cc \ HttpBody.h \ HttpBody.cc \ HttpHeader.h \ HttpHeader.cc \ HttpHeaderFieldInfo.h \ HttpHeaderTools.h \ HttpHeaderTools.cc \ HttpHeaderFieldStat.h \ HttpHdrCc.h \ HttpHdrCc.cc \ HttpHdrCc.cci \ HttpHdrContRange.cc \ HttpHdrRange.cc \ HttpHdrSc.cc \ HttpHdrScTarget.cc \ HttpMsg.cc \ HttpParser.cc \ HttpParser.h \ HttpReply.cc \ + PeerPoolMgr.h \ + PeerPoolMgr.cc \ RequestFlags.h \ RequestFlags.cc \ HttpRequest.cc \ HttpRequestMethod.cc \ icp_v2.cc \ icp_v3.cc \ $(IPC_SOURCE) \ ipcache.cc \ int.h \ int.cc \ internal.h \ internal.cc \ SquidList.h \ SquidList.cc \ MasterXaction.cc \ MasterXaction.h \ MemBlob.cc \ MemBuf.cc \ MemObject.cc \ Mem.h \ @@ -2385,40 +2393,42 @@ tests_test_http_range_SOURCES = \ http.cc \ HttpBody.h \ HttpBody.cc \ HttpHeaderFieldStat.h \ HttpHdrCc.h \ HttpHdrCc.cc \ HttpHdrCc.cci \ HttpHdrContRange.cc \ HttpHdrRange.cc \ HttpHdrSc.cc \ HttpHdrScTarget.cc \ HttpHeader.h \ HttpHeader.cc \ HttpHeaderFieldInfo.h \ HttpHeaderTools.h \ HttpHeaderTools.cc \ HttpMsg.cc \ HttpParser.cc \ HttpParser.h \ HttpReply.cc \ + PeerPoolMgr.h \ + PeerPoolMgr.cc \ RequestFlags.h \ RequestFlags.cc \ HttpRequest.cc \ HttpRequestMethod.cc \ icp_v2.cc \ icp_v3.cc \ int.h \ int.cc \ internal.h \ internal.cc \ $(IPC_SOURCE) \ ipcache.cc \ SquidList.h \ SquidList.cc \ MasterXaction.cc \ MasterXaction.h \ MemBlob.cc \ MemBuf.cc \ MemObject.cc \ Mem.h \ @@ -2719,40 +2729,42 @@ tests_testHttpRequest_SOURCES = \ MemObject.cc \ mime.h \ mime.cc \ mime_header.h \ mime_header.cc \ neighbors.h \ neighbors.cc \ Notes.cc \ Notes.h \ Packer.cc \ Parsing.cc \ pconn.cc \ peer_digest.cc \ peer_proxy_negotiate_auth.h \ peer_proxy_negotiate_auth.cc \ peer_select.cc \ peer_sourcehash.h \ peer_sourcehash.cc \ peer_userhash.h \ peer_userhash.cc \ + PeerPoolMgr.h \ + PeerPoolMgr.cc \ redirect.h \ redirect.cc \ refresh.h \ refresh.cc \ RemovalPolicy.cc \ Server.cc \ $(SNMP_SOURCE) \ SquidMath.h \ SquidMath.cc \ IoStats.h \ stat.h \ stat.cc \ StatCounters.h \ StatCounters.cc \ StatHist.h \ StatHist.cc \ stmem.cc \ repl_modules.h \ store.cc \ store_client.cc \ @@ -3659,40 +3671,42 @@ tests_testURL_SOURCES = \ http.cc \ HttpBody.h \ HttpBody.cc \ HttpHeaderFieldStat.h \ HttpHdrCc.h \ HttpHdrCc.cc \ HttpHdrCc.cci \ HttpHdrContRange.cc \ HttpHdrRange.cc \ HttpHdrSc.cc \ HttpHdrScTarget.cc \ HttpHeader.h \ HttpHeader.cc \ HttpHeaderFieldInfo.h \ HttpHeaderTools.h \ HttpHeaderTools.cc \ HttpMsg.cc \ HttpParser.cc \ HttpParser.h \ HttpReply.cc \ + PeerPoolMgr.h \ + PeerPoolMgr.cc \ RequestFlags.h \ RequestFlags.cc \ HttpRequest.cc \ HttpRequestMethod.cc \ icp_v2.cc \ icp_v3.cc \ $(IPC_SOURCE) \ ipcache.cc \ int.h \ int.cc \ internal.h \ internal.cc \ SquidList.h \ SquidList.cc \ MasterXaction.cc \ MasterXaction.h \ multicast.h \ multicast.cc \ Mem.h \ mem.cc \ === added file 'src/PeerPoolMgr.cc' --- src/PeerPoolMgr.cc 1970-01-01 00:00:00 +0000 +++ src/PeerPoolMgr.cc 2014-02-07 16:55:24 +0000 @@ -0,0 +1,190 @@ +#include "squid.h" +#include "base/AsyncJobCalls.h" +#include "base/RunnersRegistry.h" +#include "CachePeer.h" +#include "comm/Connection.h" +#include "comm/ConnOpener.h" +#include "Debug.h" +#include "fd.h" +#include "FwdState.h" +#include "globals.h" +#include "neighbors.h" +#include "pconn.h" +#include "PeerPoolMgr.h" +#include "SquidConfig.h" + +CBDATA_CLASS_INIT(PeerPoolMgr); + +PeerPoolMgr::PeerPoolMgr(CachePeer *aPeer): AsyncJob("PeerPoolMgr"), + peer(cbdataReference(aPeer)), + opener(), + addrUsed(0) +{ +} + +PeerPoolMgr::~PeerPoolMgr() +{ + cbdataReferenceDone(peer); +} + +void +PeerPoolMgr::start() +{ + AsyncJob::start(); + checkpoint("peer initialized"); +} + +void +PeerPoolMgr::swanSong() +{ + AsyncJob::swanSong(); +} + +/// whether the peer is still out there and in a valid state we can safely use +bool +PeerPoolMgr::validPeer() const { + return peer && cbdataReferenceValid(peer) && peer->standby.pool; +} + +bool +PeerPoolMgr::doneAll() const +{ + return !(validPeer() && peer->standby.limit) && AsyncJob::doneAll(); +} + +/// Comm::ConnOpener calls this when done opening a connection for us +void +PeerPoolMgr::handleOpenedConnection(const CommConnectCbParams ¶ms) +{ + opener = NULL; + + if (!validPeer()) { + debugs(48, 3, "peer gone"); + if (params.conn != NULL) + params.conn->close(); + return; + } + + if (params.flag != COMM_OK) { + /* it might have been a timeout with a partially open link */ + if (params.conn != NULL) + params.conn->close(); + peerConnectFailed(peer); + checkpoint("conn opening failure"); // may retry + return; + } + + Must(params.conn != NULL); + + // TODO: Handle SSL peers. + + peer->standby.pool->push(params.conn, NULL /* domain */); + // push() will trigger a checkpoint() +} + +/// starts the process of opening a new standby connection (if possible) +void +PeerPoolMgr::openNewConnection() +{ + // KISS: Do nothing else when we are already doing something. + if (opener != NULL || shutting_down) { + debugs(48, 7, "busy: " << opener << '|' << shutting_down); + return; // there will be another checkpoint when we are done opening + } + + // Do not talk to a peer until it is ready. + if (!neighborUp(peer)) // provides debugging + return; // there will be another checkpoint when peer is up + + // Do not violate peer limits. + if (!peerCanOpenMore(peer)) { // provides debugging + peer->standby.waitingForClose = true; // may already be true + return; // there will be another checkpoint when a peer conn closes + } + + // Do not violate global restrictions. + if (fdUsageHigh()) { + debugs(48, 7, "overwhelmed"); + peer->standby.waitingForClose = true; // may already be true + // There will be another checkpoint when a peer conn closes OR when + // a future pop() fails due to an empty pool. See PconnPool::pop(). + return; + } + + peer->standby.waitingForClose = false; + + Comm::ConnectionPointer conn = new Comm::Connection; + Must(peer->n_addresses); // guaranteed by neighborUp() above + // cycle through all available IP addresses + conn->remote = peer->addresses[addrUsed++ % peer->n_addresses]; + conn->remote.port(peer->http_port); + conn->peerType = STANDBY_POOL; // should be reset by peerSelect() + conn->setPeer(peer); + getOutgoingAddress(NULL /* request */, conn); + GetMarkingsToServer(NULL /* request */, *conn); + + const int ctimeout = peer->connect_timeout > 0 ? + peer->connect_timeout : Config.Timeout.peer_connect; + typedef CommCbMemFunT<PeerPoolMgr, CommConnectCbParams> Dialer; + opener = JobCallback(48, 5, Dialer, this, PeerPoolMgr::handleOpenedConnection); + Comm::ConnOpener *cs = new Comm::ConnOpener(conn, opener, ctimeout); + AsyncJob::Start(cs); +} + +void +PeerPoolMgr::closeOldConnections(const int howMany) +{ + debugs(48, 8, howMany); + peer->standby.pool->closeN(howMany); +} + +void +PeerPoolMgr::checkpoint(const char *reason) +{ + if (!validPeer()) { + debugs(48, 3, reason << " and peer gone"); + return; // nothing to do after our owner dies; the job will quit + } + + const int count = peer->standby.pool->count(); + const int limit = peer->standby.limit; + debugs(48, 7, reason << " with " << count << " ? " << limit); + + if (count < limit) + openNewConnection(); + else if (count > limit) + closeOldConnections(count - limit); +} + +void +PeerPoolMgr::Checkpoint(const Pointer &mgr, const char *reason) +{ + CallJobHere1(48, 5, mgr, PeerPoolMgr, checkpoint, reason); +} + +/// launches PeerPoolMgrs for peers configured with standby.limit +class PeerPoolMgrsRr: public RegisteredRunner +{ +public: + /* RegisteredRunner API */ + virtual void run(const RunnerRegistry &r) { sync(r); } + virtual void sync(const RunnerRegistry &r); +}; + +RunnerRegistrationEntry(rrAfterConfig, PeerPoolMgrsRr); + +void +PeerPoolMgrsRr::sync(const RunnerRegistry &) +{ + for (CachePeer *p = Config.peers; p; p = p->next) { + // On reconfigure, Squid deletes the old config (and old peers in it), + // so should always be dealing with a brand new configuration. + assert(!p->standby.mgr); + assert(!p->standby.pool); + if (p->standby.limit) { + p->standby.mgr = new PeerPoolMgr(p); + p->standby.pool = new PconnPool(p->name, p->standby.mgr); + AsyncJob::Start(p->standby.mgr.get()); + } + } +} === added file 'src/PeerPoolMgr.h' --- src/PeerPoolMgr.h 1970-01-01 00:00:00 +0000 +++ src/PeerPoolMgr.h 2014-02-06 18:10:28 +0000 @@ -0,0 +1,44 @@ +#ifndef SQUID_PEERPOOLMGR_H +#define SQUID_PEERPOOLMGR_H + +#include "base/AsyncJob.h" +#include "comm/forward.h" + +class CachePeer; +class CommConnectCbParams; + +/// Maintains an fixed-size "standby" PconnPool for a single CachePeer. +class PeerPoolMgr: public AsyncJob +{ +public: + typedef CbcPointer<PeerPoolMgr> Pointer; + + // syncs mgr state whenever connection-related peer or pool state changes + static void Checkpoint(const Pointer &mgr, const char *reason); + + explicit PeerPoolMgr(CachePeer *aPeer); + virtual ~PeerPoolMgr(); + +protected: + /* AsyncJob API */ + virtual void start(); + virtual void swanSong(); + virtual bool doneAll() const; + + bool validPeer() const; + + void checkpoint(const char *reason); + void openNewConnection(); + void closeOldConnections(const int howMany); + + void handleOpenedConnection(const CommConnectCbParams ¶ms); + +private: + CachePeer *peer; ///< the owner of the pool we manage + AsyncCall::Pointer opener; ///< whether we are opening a connection + unsigned int addrUsed; ///< counter for cycling through peer addresses + + CBDATA_CLASS2(PeerPoolMgr); +}; + +#endif /* SQUID_PEERPOOLMGR_H */ === modified file 'src/adaptation/icap/ServiceRep.cc' --- src/adaptation/icap/ServiceRep.cc 2013-02-12 11:34:35 +0000 +++ src/adaptation/icap/ServiceRep.cc 2014-02-03 00:46:35 +0000 @@ -130,41 +130,41 @@ void Adaptation::Icap::ServiceRep::putCo debugs(93, 3, HERE << (sendReset ? "RST" : "FIN") << "-closing " << comment); // comm_close called from Connection::close will clear timeout // TODO: add "bool sendReset = false" to Connection::close()? if (sendReset) comm_reset_close(conn); else conn->close(); } Must(theBusyConns > 0); --theBusyConns; // a connection slot released. Check if there are waiters.... busyCheckpoint(); } // a wrapper to avoid exposing theIdleConns void Adaptation::Icap::ServiceRep::noteConnectionUse(const Comm::ConnectionPointer &conn) { Must(Comm::IsConnOpen(conn)); - fd_table[conn->fd].noteUse(NULL); // pconn re-use but not via PconnPool API + fd_table[conn->fd].noteUse(); // pconn re-use, albeit not via PconnPool API } void Adaptation::Icap::ServiceRep::noteConnectionFailed(const char *comment) { debugs(93, 3, HERE << "Connection failed: " << comment); --theBusyConns; } void Adaptation::Icap::ServiceRep::setMaxConnections() { if (cfg().maxConn >= 0) theMaxConnections = cfg().maxConn; else if (theOptions && theOptions->max_connections >= 0) theMaxConnections = theOptions->max_connections; else { theMaxConnections = -1; return; } if (::Config.workers > 1 ) === modified file 'src/base/RunnersRegistry.cc' --- src/base/RunnersRegistry.cc 2012-01-20 18:55:04 +0000 +++ src/base/RunnersRegistry.cc 2014-02-06 00:43:16 +0000 @@ -23,35 +23,45 @@ GetRunners(const RunnerRegistry ®istr } int RegisterRunner(const RunnerRegistry ®istryId, RegisteredRunner *rr) { Runners &runners = GetRunners(registryId); runners.push_back(rr); return runners.size(); } int ActivateRegistered(const RunnerRegistry ®istryId) { Runners &runners = GetRunners(registryId); typedef Runners::iterator RRI; for (RRI i = runners.begin(); i != runners.end(); ++i) (*i)->run(registryId); return runners.size(); } +int +SyncRegistered(const RunnerRegistry ®istryId) +{ + Runners &runners = GetRunners(registryId); + typedef Runners::iterator RRI; + for (RRI i = runners.begin(); i != runners.end(); ++i) + (*i)->sync(registryId); + return runners.size(); +} + void DeactivateRegistered(const RunnerRegistry ®istryId) { Runners &runners = GetRunners(registryId); while (!runners.empty()) { delete runners.back(); runners.pop_back(); } } bool UseThisStatic(const void *) { return true; } === modified file 'src/base/RunnersRegistry.h' --- src/base/RunnersRegistry.h 2012-08-28 13:00:30 +0000 +++ src/base/RunnersRegistry.h 2014-02-06 00:42:01 +0000 @@ -1,71 +1,80 @@ #ifndef SQUID_BASE_RUNNERSREGISTRY_H #define SQUID_BASE_RUNNERSREGISTRY_H /** * This API allows virtually any module to register with a well-known registry, - * be activated by some central processor at some registry-specific time, and + * be activated by some central processor at some registry-specific time, + * synced or refreshed at some registry-specific time(s), and finally * be deactiveated by some central processor at some registry-specific time. * * For example, main.cc may activate registered I/O modules after parsing - * squid.conf and deactivate them before exiting. + * squid.conf, sync them during reconfiguration events, and then finally + * deactivate them before exiting. * * A module in this context is code providing a functionality or service to the * rest of Squid, such as src/DiskIO/Blocking, src/fs/ufs, or Cache Manager. A - * module must declare a RegisteredRunner child class to implement activation and - * deactivation logic using the run() method and destructor, respectively. + * module must declare a RegisteredRunner child class to implement activation, + * refresh, and deactivation logic using the run() method, sync() method, + * and the destructor, respectively. * - * This API allows the registry to determine the right [de]activation time for - * each group of similar modules, without knowing any module specifics. + * This API allows the registry to determine the right [de]activation and + * refresh times for each group of similar modules, without knowing any + * module specifics. * */ /// well-known registries typedef enum { - /// Managed by main.cc. Activated after parsing squid.conf and + /// Managed by main.cc. Activated/synced after parsing squid.conf and /// deactivated before freeing configuration-related memory or exit()-ing. /// Meant for setting configuration options that depend on other /// configuration options and were not explicitly configured. rrFinalizeConfig, - /// Managed by main.cc. Activated after rrFinalizeConfig and + /// Managed by main.cc. Activated/synced after rrFinalizeConfig and /// deactivated before rrFinalizeConfig. Meant for announcing /// memory reservations before memory is allocated. rrClaimMemoryNeeds, - /// Managed by main.cc. Activated after rrClaimMemoryNeeds and + /// Managed by main.cc. Activated/synced after rrClaimMemoryNeeds and /// deactivated before rrClaimMemoryNeeds. Meant for activating /// modules and features based on the finalized configuration. rrAfterConfig, rrEnd ///< not a real registry, just a label to mark the end of enum } RunnerRegistry; /// a runnable registrant API class RegisteredRunner { public: - // called when this runner's registry is deactivated + // called once when this runner's registry is deactivated virtual ~RegisteredRunner() {} - // called when this runner's registry is activated + // called once when this runner's registry is activated virtual void run(const RunnerRegistry &r) = 0; + + // called whenever this runner's registry changes + virtual void sync(const RunnerRegistry &r) {} }; /// registers a given runner with the given registry and returns registry count int RegisterRunner(const RunnerRegistry ®istry, RegisteredRunner *rr); /// calls run() methods of all runners in the given registry int ActivateRegistered(const RunnerRegistry ®istry); +/// calls sync() methods of all runners in the given registry +int SyncRegistered(const RunnerRegistry ®istry); /// deletes all runners in the given registry void DeactivateRegistered(const RunnerRegistry ®istry); /// convenience function to "use" an otherwise unreferenced static variable bool UseThisStatic(const void *); /// convenience macro: register one RegisteredRunner kid as early as possible #define RunnerRegistrationEntry(Registry, Who) \ static const bool Who ## _RegisteredWith_ ## Registry = \ RegisterRunner(Registry, new Who) > 0 && \ UseThisStatic(& Who ## _RegisteredWith_ ## Registry); #endif /* SQUID_BASE_RUNNERSREGISTRY_H */ === modified file 'src/cache_cf.cc' --- src/cache_cf.cc 2013-08-16 14:48:30 +0000 +++ src/cache_cf.cc 2014-02-07 17:16:05 +0000 @@ -51,40 +51,41 @@ #include "DiskIO/DiskIOModule.h" #include "eui/Config.h" #include "ExternalACL.h" #include "format/Format.h" #include "globals.h" #include "HttpHeaderTools.h" #include "HttpRequestMethod.h" #include "ident/Config.h" #include "ip/Intercept.h" #include "ip/QosConfig.h" #include "ip/tools.h" #include "ipc/Kids.h" #include "log/Config.h" #include "log/CustomLog.h" #include "Mem.h" #include "MemBuf.h" #include "mgr/Registration.h" #include "NeighborTypeDomainList.h" #include "Parsing.h" #include "PeerDigest.h" +#include "PeerPoolMgr.h" #include "RefreshPattern.h" #include "rfc1738.h" #include "SquidConfig.h" #include "SquidString.h" #include "ssl/ProxyCerts.h" #include "Store.h" #include "StoreFileSystem.h" #include "SwapDir.h" #include "wordlist.h" #include "neighbors.h" #include "tools.h" /* wccp2 has its own conditional definitions */ #include "wccp2.h" #if USE_ADAPTATION #include "adaptation/Config.h" #endif #if ICAP_CLIENT #include "adaptation/icap/Config.h" #endif #if USE_ECAP @@ -2258,40 +2259,42 @@ parse_peer(CachePeer ** head) p->options.no_delay = true; #else debugs(0, DBG_CRITICAL, "WARNING: cache_peer option 'no-delay' requires --enable-delay-pools"); #endif } else if (!strncmp(token, "login=", 6)) { p->login = xstrdup(token + 6); rfc1738_unescape(p->login); } else if (!strncmp(token, "connect-timeout=", 16)) { p->connect_timeout = xatoi(token + 16); } else if (!strncmp(token, "connect-fail-limit=", 19)) { p->connect_fail_limit = xatoi(token + 19); #if USE_CACHE_DIGESTS } else if (!strncmp(token, "digest-url=", 11)) { p->digest_url = xstrdup(token + 11); #endif } else if (!strcmp(token, "allow-miss")) { p->options.allow_miss = true; } else if (!strncmp(token, "max-conn=", 9)) { p->max_conn = xatoi(token + 9); + } else if (!strncmp(token, "standby=", 8)) { + p->standby.limit = xatoi(token + 8); } else if (!strcmp(token, "originserver")) { p->options.originserver = true; } else if (!strncmp(token, "name=", 5)) { safe_free(p->name); if (token[5]) p->name = xstrdup(token + 5); } else if (!strncmp(token, "forceddomain=", 13)) { safe_free(p->domain); if (token[13]) p->domain = xstrdup(token + 13); #if USE_SSL } else if (strcmp(token, "ssl") == 0) { p->use_ssl = 1; } else if (strncmp(token, "sslcert=", 8) == 0) { safe_free(p->sslcert); p->sslcert = xstrdup(token + 8); @@ -2331,40 +2334,43 @@ parse_peer(CachePeer ** head) p->front_end_https = 2; } else if (strcmp(token, "connection-auth=off") == 0) { p->connection_auth = 0; } else if (strcmp(token, "connection-auth") == 0) { p->connection_auth = 1; } else if (strcmp(token, "connection-auth=on") == 0) { p->connection_auth = 1; } else if (strcmp(token, "connection-auth=auto") == 0) { p->connection_auth = 2; } else if (token[0] == '#') { // start of a text comment. stop reading this line. break; } else { debugs(3, DBG_PARSE_NOTE(DBG_IMPORTANT), "ERROR: Ignoring unknown cache_peer option '" << token << "'"); } } if (peerFindByName(p->name)) fatalf("ERROR: cache_peer %s specified twice\n", p->name); + if (p->max_conn > 0 && p->max_conn < p->standby.limit) + fatalf("parse_peer: cache_peer %s max-conn=%d is lower than its standby=%d\n", p->host, p->max_conn, p->standby.limit); + if (p->weight < 1) p->weight = 1; if (p->connect_fail_limit < 1) p->connect_fail_limit = 10; p->icp.version = ICP_VERSION_CURRENT; p->testing_now = false; #if USE_CACHE_DIGESTS if (!p->options.no_digest) { /* XXX This looks odd.. who has the original pointer * then? */ PeerDigest *pd = peerDigestCreate(p); p->digest = cbdataReference(pd); } @@ -2375,40 +2381,43 @@ parse_peer(CachePeer ** head) while (*head != NULL) head = &(*head)->next; *head = p; peerClearRRStart(); } static void free_peer(CachePeer ** P) { CachePeer *p; while ((p = *P) != NULL) { *P = p->next; #if USE_CACHE_DIGESTS cbdataReferenceDone(p->digest); #endif + // the mgr job will notice that its owner is gone and stop + PeerPoolMgr::Checkpoint(p->standby.mgr, "peer gone"); + delete p->standby.pool; cbdataFree(p); } Config.npeers = 0; } static void dump_cachemgrpasswd(StoreEntry * entry, const char *name, Mgr::ActionPasswordList * list) { wordlist *w; while (list != NULL) { if (strcmp(list->passwd, "none") && strcmp(list->passwd, "disable")) storeAppendPrintf(entry, "%s XXXXXXXXXX", name); else storeAppendPrintf(entry, "%s %s", name, list->passwd); for (w = list->actions; w != NULL; w = w->next) { storeAppendPrintf(entry, " %s", w->key); } === modified file 'src/cf.data.pre' --- src/cf.data.pre 2014-01-20 23:21:06 +0000 +++ src/cf.data.pre 2014-02-07 18:01:15 +0000 @@ -3005,52 +3005,93 @@ DOC_START Used for verifying the correctness of the received peer certificate. If not specified the peer hostname will be used. front-end-https Enable the "Front-End-Https: On" header needed when using Squid as a SSL frontend in front of Microsoft OWA. See MS KB document Q307347 for details on this header. If set to auto the header will only be added if the request is forwarded as a https:// URL. ==== GENERAL OPTIONS ==== connect-timeout=N A peer-specific connect timeout. Also see the peer_connect_timeout directive. connect-fail-limit=N How many times connecting to a peer must fail before - it is marked as down. Default is 10. + it is marked as down. Standby connection failures + count towards this limit. Default is 10. allow-miss Disable Squid's use of only-if-cached when forwarding requests to siblings. This is primarily useful when icp_hit_stale is used by the sibling. To extensive use of this option may result in forwarding loops, and you should avoid having two-way peerings with this option. For example to deny peer usage on requests from peer by denying cache_peer_access if the source is a peer. - max-conn=N Limit the amount of connections Squid may open to this - peer. see also + max-conn=N Limit the number of concurrent connections the Squid + may open to this peer, including already opened idle + and standby connections. There is no peer-specific + connection limit by default. + + A peer exceeding the limit is not used for new + requests unless a standby connection is available. + + max-conn limit works poorly when there is a relatively + large number of idle persistent connections with the + peer because the limiting code does not know that + Squid can often reuse some of those idle connections. + + standby=N Maintain a pool of N "hot standby" connections to an + UP peer, available for requests when no idle + persistent connection is available (or safe) to use. + By default and with zero N, no such pool is maintained. + N must not exceed the max-conn limit (if any). + + At start or after reconfiguration, Squid opens new TCP + standby connections until there are N connections + available and then replenishes the standby pool as + opened connections are used up for requests. A used + connection never goes back to the standby pool, but + may go to the regular idle persistent connection pool + shared by all peers and origin servers. + + Squid never opens multiple new standby connections + concurrently. This one-at-a-time approach minimizes + flooding-like effect on peers. Furthermore, just a few + standby connections should be sufficient in most cases + to supply most new requests with a ready-to-use + connection. + + Standby connections obey server_idle_pconn_timeout. + For the feature to work as intended, the peer must be + configured to accept and keep them open longer than + the idle timeout at the connecting Squid, to minimize + race conditions typical to idle used persistent + connections. Default request_timeout and + server_idle_pconn_timeout values ensure such a + configuration. name=xxx Unique name for the peer. Required if you have multiple peers on the same host but different ports. This name can be used in cache_peer_access and similar directives to dentify the peer. Can be used by outgoing access controls through the peername ACL type. no-tproxy Do not use the client-spoof TPROXY support when forwarding requests to this peer. Use normal address selection instead. This overrides the spoof_client_ip ACL. proxy-only objects fetched from the peer will not be stored locally. DOC_END NAME: cache_peer_domain cache_host_domain TYPE: hostdomain DEFAULT: none === modified file 'src/comm.cc' --- src/comm.cc 2013-06-03 14:05:16 +0000 +++ src/comm.cc 2014-02-03 00:15:17 +0000 @@ -1125,43 +1125,40 @@ _comm_close(int fd, char const *file, in Comm::SetSelect(fd, COMM_SELECT_WRITE, NULL, NULL, 0); COMMIO_FD_WRITECB(fd)->finish(COMM_ERR_CLOSING, errno); } if (COMMIO_FD_READCB(fd)->active()) { Comm::SetSelect(fd, COMM_SELECT_READ, NULL, NULL, 0); COMMIO_FD_READCB(fd)->finish(COMM_ERR_CLOSING, errno); } #if USE_DELAY_POOLS if (ClientInfo *clientInfo = F->clientInfo) { if (clientInfo->selectWaiting) { clientInfo->selectWaiting = false; // kick queue or it will get stuck as commWriteHandle is not called clientInfo->kickQuotaQueue(); } } #endif commCallCloseHandlers(fd); - if (F->pconn.uses && F->pconn.pool) - F->pconn.pool->noteUses(F->pconn.uses); - comm_empty_os_read_buffers(fd); AsyncCall::Pointer completeCall=commCbCall(5,4, "comm_close_complete", FdeCbPtrFun(comm_close_complete, NULL)); FdeCbParams &completeParams = GetCommParams<FdeCbParams>(completeCall); completeParams.fd = fd; // must use async call to wait for all callbacks // scheduled before comm_close() to finish ScheduleCallHere(completeCall); PROF_stop(comm_close); } /* Send a udp datagram to specified TO_ADDR. */ int comm_udp_sendto(int fd, const Ip::Address &to_addr, const void *buf, int len) { === modified file 'src/comm/Connection.cc' --- src/comm/Connection.cc 2013-07-15 15:47:00 +0000 +++ src/comm/Connection.cc 2014-02-07 16:57:50 +0000 @@ -1,26 +1,27 @@ #include "squid.h" #include "CachePeer.h" #include "cbdata.h" #include "comm.h" #include "comm/Connection.h" #include "fde.h" +#include "neighbors.h" #include "SquidTime.h" class CachePeer; bool Comm::IsConnOpen(const Comm::ConnectionPointer &conn) { return conn != NULL && conn->isOpen(); } Comm::Connection::Connection() : local(), remote(), peerType(HIER_NONE), fd(-1), tos(0), nfmark(0), flags(COMM_NONBLOCKING), peer_(NULL) { *rfc931 = 0; // quick init the head. the rest does not matter. @@ -49,41 +50,41 @@ Comm::Connection::copyDetails() const c->tos = tos; c->nfmark = nfmark; c->flags = flags; // ensure FD is not open in the new copy. c->fd = -1; // ensure we have a cbdata reference to peer_ not a straight ptr copy. c->peer_ = cbdataReference(getPeer()); return c; } void Comm::Connection::close() { if (isOpen()) { comm_close(fd); fd = -1; if (CachePeer *p=getPeer()) - -- p->stats.conn_open; + peerConnClosed(p); } } CachePeer * Comm::Connection::getPeer() const { if (cbdataReferenceValid(peer_)) return peer_; return NULL; } void Comm::Connection::setPeer(CachePeer *p) { /* set to self. nothing to do. */ if (getPeer() == p) return; cbdataReferenceDone(peer_); === modified file 'src/fde.cc' --- src/fde.cc 2013-06-03 14:05:16 +0000 +++ src/fde.cc 2014-02-03 00:13:26 +0000 @@ -105,25 +105,24 @@ fde::DumpStats (StoreEntry *dumpEntry) } } char const * fde::remoteAddr() const { LOCAL_ARRAY(char, buf, MAX_IPSTRLEN ); if (type != FD_SOCKET) return null_string; if ( *ipaddr ) snprintf( buf, MAX_IPSTRLEN, "%s:%d", ipaddr, (int)remote_port); else local_addr.toUrl(buf,MAX_IPSTRLEN); // toHostStr does not include port. return buf; } void -fde::noteUse(PconnPool *pool) +fde::noteUse() { ++ pconn.uses; - pconn.pool = pool; } === modified file 'src/fde.h' --- src/fde.h 2013-06-03 14:05:16 +0000 +++ src/fde.h 2014-02-06 05:39:02 +0000 @@ -25,109 +25,107 @@ * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #ifndef SQUID_FDE_H #define SQUID_FDE_H #include "comm.h" #include "defines.h" #include "ip/Address.h" #if USE_SSL #include <openssl/ssl.h> #endif #if USE_DELAY_POOLS class ClientInfo; #endif -class PconnPool; class dwrite_q; class _fde_disk { public: DWCB *wrt_handle; void *wrt_handle_data; dwrite_q *write_q; dwrite_q *write_q_tail; off_t offset; _fde_disk() { memset(this, 0, sizeof(_fde_disk)); } }; class fde { public: fde() { clear(); }; /// True if comm_close for this fd has been called bool closing() { return flags.close_request; } /* NOTE: memset is used on fdes today. 20030715 RBC */ static void DumpStats (StoreEntry *); char const *remoteAddr() const; void dumpStats (StoreEntry &, int); bool readPending(int); - void noteUse(PconnPool *); + void noteUse(); public: /// global table of FD and their state. static fde* Table; unsigned int type; unsigned short remote_port; Ip::Address local_addr; tos_t tosToServer; /**< The TOS value for packets going towards the server. See also tosFromServer. */ nfmark_t nfmarkToServer; /**< The netfilter mark for packets going towards the server. See also nfmarkFromServer. */ int sock_family; char ipaddr[MAX_IPSTRLEN]; /* dotted decimal address of peer */ char desc[FD_DESC_SZ]; struct _fde_flags { bool open; bool close_request; ///< true if file_ or comm_close has been called bool write_daemon; bool socket_eof; bool nolinger; bool nonblocking; bool ipc; bool called_connect; bool nodelay; bool close_on_exec; bool read_pending; //bool write_pending; //XXX seems not to be used bool transparent; } flags; int64_t bytes_read; int64_t bytes_written; struct { int uses; /* ie # req's over persistent conn */ - PconnPool *pool; } pconn; #if USE_DELAY_POOLS ClientInfo * clientInfo;/* pointer to client info used in client write limiter or NULL if not present */ #endif unsigned epoll_state; _fde_disk disk; PF *read_handler; void *read_data; PF *write_handler; void *write_data; AsyncCall::Pointer timeoutHandler; time_t timeout; time_t writeStart; void *lifetime_data; AsyncCall::Pointer closeHandler; AsyncCall::Pointer halfClosedReader; /// read handler for half-closed fds CommWriteStateData *wstate; /* State data for comm_write */ READ_HANDLER *read_method; @@ -150,41 +148,40 @@ public: server. See FwdState::dispatch(). Note that this differs to nfmarkToServer in that this is the value we *receive* from the, connection, whereas nfmarkToServer is the value to set on packets *leaving* Squid. */ private: /** Clear the fde class back to NULL equivalent. */ inline void clear() { type = 0; remote_port = 0; local_addr.setEmpty(); tosToServer = '\0'; nfmarkToServer = 0; sock_family = 0; memset(ipaddr, '\0', MAX_IPSTRLEN); memset(desc,'\0',FD_DESC_SZ); memset(&flags,0,sizeof(_fde_flags)); bytes_read = 0; bytes_written = 0; pconn.uses = 0; - pconn.pool = NULL; #if USE_DELAY_POOLS clientInfo = NULL; #endif epoll_state = 0; read_handler = NULL; read_data = NULL; write_handler = NULL; write_data = NULL; timeoutHandler = NULL; timeout = 0; writeStart = 0; lifetime_data = NULL; closeHandler = NULL; halfClosedReader = NULL; wstate = NULL; read_method = NULL; write_method = NULL; #if USE_SSL ssl = NULL; dynamicSslContext = NULL; === modified file 'src/hier_code.h' --- src/hier_code.h 2012-09-21 06:11:10 +0000 +++ src/hier_code.h 2014-02-02 17:35:47 +0000 @@ -9,28 +9,29 @@ typedef enum { DEFAULT_PARENT, SINGLE_PARENT, FIRSTUP_PARENT, FIRST_PARENT_MISS, CLOSEST_PARENT_MISS, CLOSEST_PARENT, CLOSEST_DIRECT, NO_DIRECT_FAIL, SOURCE_FASTEST, ROUNDROBIN_PARENT, #if USE_CACHE_DIGESTS CD_PARENT_HIT, CD_SIBLING_HIT, #endif CARP, ANY_OLD_PARENT, USERHASH_PARENT, SOURCEHASH_PARENT, PINNED, ORIGINAL_DST, + STANDBY_POOL, HIER_MAX } hier_code; extern const char *hier_code_str[]; inline hier_code operator++(hier_code &i) { return i = (hier_code)(1+(int)i); } #endif /* SQUID__HIER_CODE_H */ === modified file 'src/main.cc' --- src/main.cc 2013-08-16 14:48:30 +0000 +++ src/main.cc 2014-02-06 00:37:18 +0000 @@ -795,40 +795,44 @@ mainReconfigureFinish(void *) errorClean(); enter_suid(); /* root to read config file */ // we may have disabled the need for PURGE if (Config2.onoff.enable_purge) Config2.onoff.enable_purge = 2; // parse the config returns a count of errors encountered. const int oldWorkers = Config.workers; if ( parseConfigFile(ConfigFile) != 0) { // for now any errors are a fatal condition... self_destruct(); } if (oldWorkers != Config.workers) { debugs(1, DBG_CRITICAL, "WARNING: Changing 'workers' (from " << oldWorkers << " to " << Config.workers << ") is not supported and ignored"); Config.workers = oldWorkers; } + SyncRegistered(rrFinalizeConfig); + SyncRegistered(rrClaimMemoryNeeds); + SyncRegistered(rrAfterConfig); + if (IamPrimaryProcess()) CpuAffinityCheck(); CpuAffinityReconfigure(); setUmask(Config.umask); Mem::Report(); setEffectiveUser(); _db_init(Debug::cache_log, Debug::debugOptions); ipcache_restart(); /* clear stuck entries */ fqdncache_restart(); /* sigh, fqdncache too */ parseEtcHosts(); errorInitialize(); /* reload error pages */ accessLogInit(); #if USE_LOADABLE_MODULES LoadableModulesConfigure(Config.loadable_module_names); #endif #if USE_ADAPTATION bool enableAdaptation = false; === modified file 'src/neighbors.cc' --- src/neighbors.cc 2013-08-15 22:09:07 +0000 +++ src/neighbors.cc 2014-02-07 17:08:55 +0000 @@ -37,41 +37,43 @@ #include "CachePeer.h" #include "CachePeerDomainList.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" #include "event.h" #include "FwdState.h" #include "globals.h" #include "htcp.h" #include "HttpRequest.h" #include "icmp/net_db.h" #include "ICP.h" #include "int.h" #include "ip/Address.h" #include "ip/tools.h" #include "ipcache.h" #include "MemObject.h" #include "mgr/Registration.h" #include "multicast.h" #include "NeighborTypeDomainList.h" #include "neighbors.h" +#include "pconn.h" #include "PeerDigest.h" +#include "PeerPoolMgr.h" #include "PeerSelectState.h" #include "RequestFlags.h" #include "SquidConfig.h" #include "SquidMath.h" #include "SquidTime.h" #include "stat.h" #include "Store.h" #include "store_key_md5.h" #include "tools.h" #include "URL.h" /* count mcast group peers every 15 minutes */ #define MCAST_COUNT_RATE 900 bool peerAllowedToUse(const CachePeer *, HttpRequest *); static int peerWouldBePinged(const CachePeer *, HttpRequest *); static void neighborRemove(CachePeer *); static void neighborAlive(CachePeer *, const MemObject *, const icp_common_t *); #if USE_HTCP static void neighborAliveHtcp(CachePeer *, const MemObject *, const HtcpReplyData *); @@ -231,47 +233,80 @@ peerWouldBePinged(const CachePeer * p, H /* the case below seems strange, but can happen if the * URL host is on the other side of a firewall */ if (p->type == PEER_SIBLING) if (!request->flags.hierarchical) return 0; if (!peerAllowedToUse(p, request)) return 0; /* Ping dead peers every timeout interval */ if (squid_curtime - p->stats.last_query > Config.Timeout.deadPeer) return 1; if (!neighborUp(p)) return 0; return 1; } +bool +peerCanOpenMore(const CachePeer *p) +{ + const int effectiveLimit = p->max_conn <= 0 ? Squid_MaxFD : p->max_conn; + const int remaining = effectiveLimit - p->stats.conn_open; + debugs(15, 7, remaining << '=' << effectiveLimit << '-' << p->stats.conn_open); + return remaining > 0; +} + +bool +peerHasConnAvailable(const CachePeer *p) +{ + // Standby connections can be used without opening new connections. + const int standbys = p->standby.pool ? p->standby.pool->count() : 0; + + // XXX: Some idle pconns can be used without opening new connections. + // Complication: Idle pconns cannot be reused for some requests. + const int usableIdles = 0; + + const int available = standbys + usableIdles; + debugs(15, 7, available << '=' << standbys << '+' << usableIdles); + return available > 0; +} + +void +peerConnClosed(CachePeer *p) +{ + --p->stats.conn_open; + if (p->standby.waitingForClose && peerCanOpenMore(p)) { + p->standby.waitingForClose = false; + PeerPoolMgr::Checkpoint(p->standby.mgr, "conn closed"); + } +} + /* Return TRUE if it is okay to send an HTTP request to this CachePeer. */ int peerHTTPOkay(const CachePeer * p, HttpRequest * request) { - if (p->max_conn) - if (p->stats.conn_open >= p->max_conn) - return 0; + if (!peerCanOpenMore(p) && !peerHasConnAvailable(p)) + return 0; if (!peerAllowedToUse(p, request)) return 0; if (!neighborUp(p)) return 0; return 1; } int neighborsCount(HttpRequest * request) { CachePeer *p = NULL; int count = 0; for (p = Config.peers; p; p = p->next) if (peerWouldBePinged(p, request)) ++count; @@ -431,40 +466,42 @@ peerClearRRStart(void) */ void peerClearRR() { CachePeer *p = NULL; for (p = Config.peers; p; p = p->next) { p->rr_count = 0; } } /** * Perform all actions when a CachePeer is detected revived. */ void peerAlive(CachePeer *p) { if (p->stats.logged_state == PEER_DEAD && p->tcp_up) { debugs(15, DBG_IMPORTANT, "Detected REVIVED " << neighborTypeStr(p) << ": " << p->name); p->stats.logged_state = PEER_ALIVE; peerClearRR(); + if (p->standby.mgr.valid()) + PeerPoolMgr::Checkpoint(p->standby.mgr, "revived peer"); } p->stats.last_reply = squid_curtime; p->stats.probe_start = 0; } CachePeer * getDefaultParent(HttpRequest * request) { CachePeer *p = NULL; for (p = Config.peers; p; p = p->next) { if (neighborType(p, request) != PEER_PARENT) continue; if (!p->options.default_parent) continue; if (!peerHTTPOkay(p, request)) continue; @@ -1219,40 +1256,42 @@ peerDNSConfigure(const ipcache_addrs *ia for (j = 0; j < (int) ia->count && j < PEER_MAX_ADDRESSES; ++j) { p->addresses[j] = ia->in_addrs[j]; debugs(15, 2, "--> IP address #" << j << ": " << p->addresses[j]); ++ p->n_addresses; } p->in_addr.setEmpty(); p->in_addr = p->addresses[0]; p->in_addr.port(p->icp.port); if (p->type == PEER_MULTICAST) peerCountMcastPeersSchedule(p, 10); #if USE_ICMP if (p->type != PEER_MULTICAST) if (!p->options.no_netdb_exchange) eventAddIsh("netdbExchangeStart", netdbExchangeStart, p, 30.0, 1); #endif + if (p->standby.mgr.valid()) + PeerPoolMgr::Checkpoint(p->standby.mgr, "resolved peer"); } static void peerRefreshDNS(void *data) { CachePeer *p = NULL; if (eventFind(peerRefreshDNS, NULL)) eventDelete(peerRefreshDNS, NULL); if (!data && 0 == stat5minClientRequests()) { /* no recent client traffic, wait a bit */ eventAddIsh("peerRefreshDNS", peerRefreshDNS, NULL, 180.0, 1); return; } for (p = Config.peers; p; p = p->next) ipcache_nbgethostbyname(p->host, peerDNSConfigure, p); /* Reconfigure the peers every hour */ @@ -1549,40 +1588,42 @@ dump_peer_options(StoreEntry * sentry, C storeAppendPrintf(sentry, " connect-timeout=%d", (int) p->connect_timeout); if (p->connect_fail_limit != PEER_TCP_MAGIC_COUNT) storeAppendPrintf(sentry, " connect-fail-limit=%d", p->connect_fail_limit); #if USE_CACHE_DIGESTS if (p->digest_url) storeAppendPrintf(sentry, " digest-url=%s", p->digest_url); #endif if (p->options.allow_miss) storeAppendPrintf(sentry, " allow-miss"); if (p->options.no_tproxy) storeAppendPrintf(sentry, " no-tproxy"); if (p->max_conn > 0) storeAppendPrintf(sentry, " max-conn=%d", p->max_conn); + if (p->standby.limit > 0) + storeAppendPrintf(sentry, " standby=%d", p->standby.limit); if (p->options.originserver) storeAppendPrintf(sentry, " originserver"); if (p->domain) storeAppendPrintf(sentry, " forceddomain=%s", p->domain); if (p->connection_auth == 0) storeAppendPrintf(sentry, " connection-auth=off"); else if (p->connection_auth == 1) storeAppendPrintf(sentry, " connection-auth=on"); else if (p->connection_auth == 2) storeAppendPrintf(sentry, " connection-auth=auto"); storeAppendPrintf(sentry, "\n"); } static void dump_peers(StoreEntry * sentry, CachePeer * peers) { === modified file 'src/neighbors.h' --- src/neighbors.h 2012-10-04 00:23:44 +0000 +++ src/neighbors.h 2014-02-07 18:28:23 +0000 @@ -64,23 +64,30 @@ void neighborsHtcpClear(StoreEntry *, co CachePeer *peerFindByName(const char *); CachePeer *peerFindByNameAndPort(const char *, unsigned short); CachePeer *getDefaultParent(HttpRequest * request); CachePeer *getRoundRobinParent(HttpRequest * request); CachePeer *getWeightedRoundRobinParent(HttpRequest * request); void peerClearRRStart(void); void peerClearRR(void); lookup_t peerDigestLookup(CachePeer * p, HttpRequest * request); CachePeer *neighborsDigestSelect(HttpRequest * request); void peerNoteDigestLookup(HttpRequest * request, CachePeer * p, lookup_t lookup); void peerNoteDigestGone(CachePeer * p); int neighborUp(const CachePeer * e); CBDUNL peerDestroy; const char *neighborTypeStr(const CachePeer * e); peer_t neighborType(const CachePeer *, const HttpRequest *); void peerConnectFailed(CachePeer *); void peerConnectSucceded(CachePeer *); void dump_peer_options(StoreEntry *, CachePeer *); int peerHTTPOkay(const CachePeer *, HttpRequest *); +/// Whether we can open new connections to the peer (e.g., despite max-conn) +bool peerCanOpenMore(const CachePeer *p); +/// Whether the peer has idle or standby connections that can be used now +bool peerHasConnAvailable(const CachePeer *p); +/// Notifies peer of an associated connection closure. +void peerConnClosed(CachePeer *p); + CachePeer *whichPeer(const Ip::Address &from); #endif /* SQUID_NEIGHBORS_H_ */ === modified file 'src/pconn.cc' --- src/pconn.cc 2013-06-03 14:05:16 +0000 +++ src/pconn.cc 2014-02-07 18:02:40 +0000 @@ -14,73 +14,81 @@ * incorporates software developed and/or copyrighted by other * sources; see the CREDITS file for full details. * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" +#include "CachePeer.h" #include "comm.h" #include "comm/Connection.h" #include "fd.h" #include "fde.h" #include "globals.h" #include "mgr/Registration.h" +#include "neighbors.h" #include "pconn.h" +#include "PeerPoolMgr.h" #include "SquidConfig.h" #include "Store.h" #define PCONN_FDS_SZ 8 /* pconn set size, increase for better memcache hit rate */ //TODO: re-attach to MemPools. WAS: static MemAllocator *pconn_fds_pool = NULL; PconnModule * PconnModule::instance = NULL; CBDATA_CLASS_INIT(IdleConnList); /* ========== IdleConnList ============================================ */ IdleConnList::IdleConnList(const char *key, PconnPool *thePool) : capacity_(PCONN_FDS_SZ), size_(0), parent_(thePool) { hash.key = xstrdup(key); theList_ = new Comm::ConnectionPointer[capacity_]; // TODO: re-attach to MemPools. WAS: theList = (?? *)pconn_fds_pool->alloc(); } IdleConnList::~IdleConnList() { if (parent_) parent_->unlinkList(this); + if (size_) { + parent_ = NULL; // prevent reentrant notifications and deletions + closeN(size_); + } + delete[] theList_; xfree(hash.key); } /** Search the list. Matches by FD socket number. * Performed from the end of list where newest entries are. * * \retval <0 The connection is not listed * \retval >=0 The connection array index */ int IdleConnList::findIndexOf(const Comm::ConnectionPointer &conn) const { for (int index = size_ - 1; index >= 0; --index) { if (conn->fd == theList_[index]->fd) { debugs(48, 3, HERE << "found " << conn << " at index " << index); return index; } } @@ -275,40 +283,42 @@ IdleConnList::findUseable(const Comm::Co if (fd_table[theList_[i]->fd].timeoutHandler == NULL) continue; // finally, a match. pop and return it. Comm::ConnectionPointer result = theList_[i]; /* may delete this */ removeAt(i); clearHandlers(result); return result; } return Comm::ConnectionPointer(); } /* might delete list */ void IdleConnList::findAndClose(const Comm::ConnectionPointer &conn) { const int index = findIndexOf(conn); if (index >= 0) { + if (parent_) + parent_->notifyManager("idle conn closure"); /* might delete this */ removeAt(index); clearHandlers(conn); conn->close(); } } void IdleConnList::Read(const Comm::ConnectionPointer &conn, char *buf, size_t len, comm_err_t flag, int xerrno, void *data) { debugs(48, 3, HERE << len << " bytes from " << conn); if (flag == COMM_ERR_CLOSING) { debugs(48, 3, HERE << "COMM_ERR_CLOSING from " << conn); /* Bail out on COMM_ERR_CLOSING - may happen when shutdown aborts our idle FD */ return; } IdleConnList *list = (IdleConnList *) data; /* may delete list/data */ @@ -350,182 +360,224 @@ PconnPool::dumpHist(StoreEntry * e) cons "\treq/\n" "\tconn count\n" "\t---- ---------\n", descr); for (int i = 0; i < PCONN_HIST_SZ; ++i) { if (hist[i] == 0) continue; storeAppendPrintf(e, "\t%4d %9d\n", i, hist[i]); } } void PconnPool::dumpHash(StoreEntry *e) const { hash_table *hid = table; hash_first(hid); int i = 0; - for (hash_link *walker = hid->next; walker; walker = hash_next(hid)) { + for (hash_link *walker = hash_next(hid); walker; walker = hash_next(hid)) { storeAppendPrintf(e, "\t item %5d: %s\n", i, (char *)(walker->key)); ++i; } } /* ========== PconnPool PUBLIC FUNCTIONS ============================================ */ -PconnPool::PconnPool(const char *aDescr) : table(NULL), descr(aDescr), +PconnPool::PconnPool(const char *aDescr, const CbcPointer<PeerPoolMgr> &aMgr): + table(NULL), descr(aDescr), + mgr(aMgr), theCount(0) { int i; table = hash_create((HASHCMP *) strcmp, 229, hash_string); for (i = 0; i < PCONN_HIST_SZ; ++i) hist[i] = 0; PconnModule::GetInstance()->add(this); } +static void +DeleteIdleConnList(void *hashItem) +{ + delete reinterpret_cast<IdleConnList*>(hashItem); +} + PconnPool::~PconnPool() { - descr = NULL; + PconnModule::GetInstance()->remove(this); + hashFreeItems(table, &DeleteIdleConnList); hashFreeMemory(table); + descr = NULL; } void PconnPool::push(const Comm::ConnectionPointer &conn, const char *domain) { if (fdUsageHigh()) { debugs(48, 3, HERE << "Not many unused FDs"); conn->close(); return; } else if (shutting_down) { conn->close(); debugs(48, 3, HERE << "Squid is shutting down. Refusing to do anything"); return; } + // TODO: also close used pconns if we exceed peer max-conn limit const char *aKey = key(conn, domain); IdleConnList *list = (IdleConnList *) hash_lookup(table, aKey); if (list == NULL) { list = new IdleConnList(aKey, this); debugs(48, 3, HERE << "new IdleConnList for {" << hashKeyStr(&list->hash) << "}" ); hash_join(table, &list->hash); } else { debugs(48, 3, HERE << "found IdleConnList for {" << hashKeyStr(&list->hash) << "}" ); } list->push(conn); assert(!comm_has_incomplete_write(conn->fd)); LOCAL_ARRAY(char, desc, FD_DESC_SZ); snprintf(desc, FD_DESC_SZ, "Idle server: %s", aKey); fd_note(conn->fd, desc); debugs(48, 3, HERE << "pushed " << conn << " for " << aKey); + + // successful push notifications resume multi-connection opening sequence + notifyManager("push"); } Comm::ConnectionPointer -PconnPool::pop(const Comm::ConnectionPointer &destLink, const char *domain, bool isRetriable) +PconnPool::pop(const Comm::ConnectionPointer &dest, const char *domain, bool keepOpen) { - const char * aKey = key(destLink, domain); + + const char * aKey = key(dest, domain); IdleConnList *list = (IdleConnList *)hash_lookup(table, aKey); if (list == NULL) { debugs(48, 3, HERE << "lookup for key {" << aKey << "} failed."); + // failure notifications resume standby conn creation after fdUsageHigh + notifyManager("pop failure"); return Comm::ConnectionPointer(); } else { - debugs(48, 3, HERE << "found " << hashKeyStr(&list->hash) << (isRetriable?"(to use)":"(to kill)") ); + debugs(48, 3, HERE << "found " << hashKeyStr(&list->hash) << + (keepOpen ? " to use" : " to kill")); } /* may delete list */ - Comm::ConnectionPointer temp = list->findUseable(destLink); - if (!isRetriable && Comm::IsConnOpen(temp)) - temp->close(); + Comm::ConnectionPointer popped = list->findUseable(dest); + if (!keepOpen && Comm::IsConnOpen(popped)) + popped->close(); + + // successful pop notifications replenish standby connections pool + notifyManager("pop"); + return popped; +} - return temp; +void +PconnPool::notifyManager(const char *reason) +{ + if (mgr.valid()) + PeerPoolMgr::Checkpoint(mgr, reason); } void -PconnPool::closeN(int n, const Comm::ConnectionPointer &destLink, const char *domain) +PconnPool::closeN(int n) { - // TODO: optimize: we can probably do hash_lookup just once - for (int i = 0; i < n; ++i) - pop(destLink, domain, false); // may fail! + hash_table *hid = table; + hash_first(hid); + + // close N connections, one per list, to treat all lists "fairly" + for (int i = 0; i < n && count(); ++i) { + + hash_link *current = hash_next(hid); + if (!current) { + hash_first(hid); + current = hash_next(hid); + Must(current); // must have one because the count() was positive + } + + // may delete current + reinterpret_cast<IdleConnList*>(current)->closeN(1); + } } void PconnPool::unlinkList(IdleConnList *list) { theCount -= list->count(); assert(theCount >= 0); hash_remove_link(table, &list->hash); } void PconnPool::noteUses(int uses) { if (uses >= PCONN_HIST_SZ) uses = PCONN_HIST_SZ - 1; ++hist[uses]; } /* ========== PconnModule ============================================ */ /* * This simple class exists only for the cache manager */ -PconnModule::PconnModule() : pools(NULL), poolCount(0) +PconnModule::PconnModule(): pools() { - pools = (PconnPool **) xcalloc(MAX_NUM_PCONN_POOLS, sizeof(*pools)); -//TODO: re-link to MemPools. WAS: pconn_fds_pool = memPoolCreate("pconn_fds", PCONN_FDS_SZ * sizeof(int)); - debugs(48, DBG_CRITICAL, "persistent connection module initialized"); registerWithCacheManager(); } PconnModule * PconnModule::GetInstance() { if (instance == NULL) instance = new PconnModule; return instance; } void PconnModule::registerWithCacheManager(void) { Mgr::RegisterAction("pconn", "Persistent Connection Utilization Histograms", DumpWrapper, 0, 1); } void PconnModule::add(PconnPool *aPool) { - assert(poolCount < MAX_NUM_PCONN_POOLS); - *(pools+poolCount) = aPool; - ++poolCount; + pools.insert(aPool); } void -PconnModule::dump(StoreEntry *e) +PconnModule::remove(PconnPool *aPool) { - int i; + pools.erase(aPool); +} - for (i = 0; i < poolCount; ++i) { +void +PconnModule::dump(StoreEntry *e) +{ + typedef Pools::const_iterator PCI; + int i = 0; // TODO: Why number pools if they all have names? + for (PCI p = pools.begin(); p != pools.end(); ++p, ++i) { + // TODO: Let each pool dump itself the way it wants to. storeAppendPrintf(e, "\n Pool %d Stats\n", i); - (*(pools+i))->dumpHist(e); + (*p)->dumpHist(e); storeAppendPrintf(e, "\n Pool %d Hash Table\n",i); - (*(pools+i))->dumpHash(e); + (*p)->dumpHash(e); } } void PconnModule::DumpWrapper(StoreEntry *e) { PconnModule::GetInstance()->dump(e); } === modified file 'src/pconn.h' --- src/pconn.h 2012-10-04 11:10:17 +0000 +++ src/pconn.h 2014-02-06 05:40:00 +0000 @@ -1,43 +1,44 @@ #ifndef SQUID_PCONN_H #define SQUID_PCONN_H +#include "base/CbcPointer.h" +#include <set> + /** \defgroup PConnAPI Persistent Connection API \ingroup Component * \todo CLEANUP: Break multiple classes out of the generic pconn.h header */ class PconnPool; +class PeerPoolMgr; /* for CBDATA_CLASS2() macros */ #include "cbdata.h" /* for hash_link */ #include "hash.h" /* for IOCB */ #include "comm.h" /// \ingroup PConnAPI -#define MAX_NUM_PCONN_POOLS 10 - -/// \ingroup PConnAPI #define PCONN_HIST_SZ (1<<16) /** \ingroup PConnAPI * A list of connections currently open to a particular destination end-point. */ class IdleConnList { public: IdleConnList(const char *key, PconnPool *parent); ~IdleConnList(); /// Pass control of the connection to the idle list. void push(const Comm::ConnectionPointer &conn); /// get first conn which is not pending read fd. Comm::ConnectionPointer pop(); /** Search the list for a connection which matches the 'key' details * and pop it off the list. * The list is created based on remote IP:port hash. This further filters @@ -89,88 +90,95 @@ private: #include "ip/forward.h" class StoreEntry; class IdleConnLimit; /* for hash_table */ #include "hash.h" /** \ingroup PConnAPI * Manages idle persistent connections to a caller-defined set of * servers (e.g., all HTTP servers). Uses a collection of IdleConnLists * internally to list the individual open connections to each server. * Controls lists existence and limits the total number of * idle connections across the collection. */ class PconnPool { public: - PconnPool(const char *); + PconnPool(const char *aDescription, const CbcPointer<PeerPoolMgr> &aMgr); ~PconnPool(); void moduleInit(); void push(const Comm::ConnectionPointer &serverConn, const char *domain); /** - * Updates destLink to point at an existing open connection if available and retriable. - * Otherwise, return false. + * Returns either a pointer to a popped connection to dest or nil. + * Closes the connection before returning its pointer unless keepOpen. * - * We close available persistent connection if the caller transaction is not - * retriable to avoid having a growing number of open connections when many - * transactions create persistent connections but are not retriable. + * A caller with a non-retriable transaction should set keepOpen to false + * and call pop() anyway, even though the caller does not want a pconn. + * This forces us to close an available persistent connection, avoiding + * creating a growing number of open connections when many transactions + * create (and push) persistent connections but are not retriable and, + * hence, do not need to pop a connection. */ - Comm::ConnectionPointer pop(const Comm::ConnectionPointer &destLink, const char *domain, bool retriable); + Comm::ConnectionPointer pop(const Comm::ConnectionPointer &dest, const char *domain, bool keepOpen); void count(int uses); void dumpHist(StoreEntry *e) const; void dumpHash(StoreEntry *e) const; void unlinkList(IdleConnList *list); void noteUses(int uses); - void closeN(int n, const Comm::ConnectionPointer &destLink, const char *domain); + void closeN(int n); ///< closes any n connections int count() const { return theCount; } void noteConnectionAdded() { ++theCount; } void noteConnectionRemoved() { assert(theCount > 0); --theCount; } + // sends an async message to the pool manager, if any + void notifyManager(const char *reason); + private: static const char *key(const Comm::ConnectionPointer &destLink, const char *domain); int hist[PCONN_HIST_SZ]; hash_table *table; const char *descr; + CbcPointer<PeerPoolMgr> mgr; ///< optional pool manager (for notifications) int theCount; ///< the number of pooled connections }; class StoreEntry; class PconnPool; /** \ingroup PConnAPI * The global registry of persistent connection pools. */ class PconnModule { public: /** the module is a singleton until we have instance based cachemanager * management */ static PconnModule * GetInstance(); /** A thunk to the still C like CacheManager callback api. */ static void DumpWrapper(StoreEntry *e); PconnModule(); void registerWithCacheManager(void); void add(PconnPool *); + void remove(PconnPool *); ///< unregister and forget about this pool object OBJH dump; private: - PconnPool **pools; + typedef std::set<PconnPool*> Pools; ///< unordered PconnPool collection + Pools pools; ///< all live pools static PconnModule * instance; - - int poolCount; }; #endif /* SQUID_PCONN_H */ === modified file 'src/tests/stub_pconn.cc' --- src/tests/stub_pconn.cc 2012-01-20 18:55:04 +0000 +++ src/tests/stub_pconn.cc 2014-02-06 21:38:32 +0000 @@ -1,31 +1,31 @@ /* * STUB file for the pconn.cc API */ #include "squid.h" #include "pconn.h" #include "comm/Connection.h" #define STUB_API "pconn.cc" #include "tests/STUB.h" IdleConnList::IdleConnList(const char *key, PconnPool *parent) STUB IdleConnList::~IdleConnList() STUB void IdleConnList::push(const Comm::ConnectionPointer &conn) STUB Comm::ConnectionPointer IdleConnList::findUseable(const Comm::ConnectionPointer &key) STUB_RETVAL(Comm::ConnectionPointer()) void IdleConnList::clearHandlers(const Comm::ConnectionPointer &conn) STUB -PconnPool::PconnPool(const char *) STUB +PconnPool::PconnPool(const char *, const CbcPointer<PeerPoolMgr>&) STUB PconnPool::~PconnPool() STUB void PconnPool::moduleInit() STUB void PconnPool::push(const Comm::ConnectionPointer &serverConn, const char *domain) STUB Comm::ConnectionPointer PconnPool::pop(const Comm::ConnectionPointer &destLink, const char *domain, bool retriable) STUB_RETVAL(Comm::ConnectionPointer()) void PconnPool::count(int uses) STUB void PconnPool::noteUses(int) STUB void PconnPool::dumpHist(StoreEntry *e) const STUB void PconnPool::dumpHash(StoreEntry *e) const STUB void PconnPool::unlinkList(IdleConnList *list) STUB PconnModule * PconnModule::GetInstance() STUB_RETVAL(NULL) void PconnModule::DumpWrapper(StoreEntry *e) STUB PconnModule::PconnModule() STUB void PconnModule::registerWithCacheManager(void) STUB void PconnModule::add(PconnPool *) STUB void PconnModule::dump(StoreEntry *) STUB === modified file 'src/tunnel.cc' --- src/tunnel.cc 2013-08-16 14:48:30 +0000 +++ src/tunnel.cc 2014-02-06 21:54:26 +0000 @@ -26,40 +26,41 @@ * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111, USA. * */ #include "squid.h" #include "acl/FilledChecklist.h" #include "base/Vector.h" #include "CachePeer.h" #include "client_side_request.h" #include "client_side.h" #include "comm.h" #include "comm/Connection.h" #include "comm/ConnOpener.h" #include "comm/Write.h" #include "errorpage.h" #include "fde.h" +#include "FwdState.h" #include "http.h" #include "HttpRequest.h" #include "HttpStateFlags.h" #include "ip/QosConfig.h" #include "MemBuf.h" #include "PeerSelectState.h" #include "SquidConfig.h" #include "StatCounters.h" #include "tools.h" #if USE_DELAY_POOLS #include "DelayId.h" #endif #if USE_SSL #include "ssl/PeerConnector.h" #endif #if HAVE_LIMITS_H #include <limits.h> #endif #if HAVE_ERRNO_H @@ -776,49 +777,41 @@ tunnelErrorComplete(int fd/*const Comm:: if (Comm::IsConnOpen(tunnelState->client.conn)) tunnelState->client.conn->close(); if (Comm::IsConnOpen(tunnelState->server.conn)) tunnelState->server.conn->close(); } static void tunnelConnectDone(const Comm::ConnectionPointer &conn, comm_err_t status, int xerrno, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; if (status != COMM_OK) { debugs(26, 4, HERE << conn << ", comm failure recovery."); /* At this point only the TCP handshake has failed. no data has been passed. * we are allowed to re-try the TCP-level connection to alternate IPs for CONNECT. */ tunnelState->serverDestinations.shift(); if (status != COMM_TIMEOUT && tunnelState->serverDestinations.size() > 0) { /* Try another IP of this destination host */ - - if (Ip::Qos::TheConfig.isAclTosActive()) { - tunnelState->serverDestinations[0]->tos = GetTosToServer(tunnelState->request.getRaw()); - } - -#if SO_MARK && USE_LIBCAP - tunnelState->serverDestinations[0]->nfmark = GetNfmarkToServer(tunnelState->request.getRaw()); -#endif - + GetMarkingsToServer(tunnelState->request.getRaw(), *tunnelState->serverDestinations[0]); debugs(26, 4, HERE << "retry with : " << tunnelState->serverDestinations[0]); AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone", CommConnectCbPtrFun(tunnelConnectDone, tunnelState)); Comm::ConnOpener *cs = new Comm::ConnOpener(tunnelState->serverDestinations[0], call, Config.Timeout.connect); cs->setHost(tunnelState->url); AsyncJob::Start(cs); } else { debugs(26, 4, HERE << "terminate with error."); ErrorState *err = new ErrorState(ERR_CONNECT_FAIL, Http::scServiceUnavailable, tunnelState->request.getRaw()); *tunnelState->status_ptr = Http::scServiceUnavailable; err->xerrno = xerrno; // on timeout is this still: err->xerrno = ETIMEDOUT; err->port = conn->remote.port(); err->callback = tunnelErrorComplete; err->callback_data = tunnelState; errorSend(tunnelState->client.conn, err); } return; } #if USE_DELAY_POOLS @@ -836,43 +829,40 @@ tunnelConnectDone(const Comm::Connection debugs(26, 4, HERE << "determine post-connect handling pathway."); if (conn->getPeer()) { tunnelState->request->peer_login = conn->getPeer()->login; tunnelState->request->flags.proxying = !(conn->getPeer()->options.originserver); } else { tunnelState->request->peer_login = NULL; tunnelState->request->flags.proxying = false; } if (tunnelState->request->flags.proxying) tunnelState->connectToPeer(); else { tunnelConnected(conn, tunnelState); } AsyncCall::Pointer timeoutCall = commCbCall(5, 4, "tunnelTimeout", CommTimeoutCbPtrFun(tunnelTimeout, tunnelState)); commSetConnTimeout(conn, Config.Timeout.read, timeoutCall); } -tos_t GetTosToServer(HttpRequest * request); -nfmark_t GetNfmarkToServer(HttpRequest * request); - void tunnelStart(ClientHttpRequest * http, int64_t * size_ptr, int *status_ptr) { debugs(26, 3, HERE); /* Create state structure. */ TunnelStateData *tunnelState = NULL; ErrorState *err = NULL; HttpRequest *request = http->request; char *url = http->uri; /* * client_addr.isNoAddr() indicates this is an "internal" request * from peer_digest.c, asn.c, netdb.c, etc and should always * be allowed. yuck, I know. */ if (Config.accessList.miss && !request->client_addr.isNoAddr()) { /* * Check if this host is allowed to fetch MISSES from us (miss_access) * default is to allow. @@ -1029,47 +1019,41 @@ tunnelRelayConnectRequest(const Comm::Co } static void tunnelPeerSelectComplete(Comm::ConnectionList *peer_paths, ErrorState *err, void *data) { TunnelStateData *tunnelState = (TunnelStateData *)data; if (peer_paths == NULL || peer_paths->size() < 1) { debugs(26, 3, HERE << "No paths found. Aborting CONNECT"); if (!err) { err = new ErrorState(ERR_CANNOT_FORWARD, Http::scServiceUnavailable, tunnelState->request.getRaw()); } *tunnelState->status_ptr = err->httpStatus; err->callback = tunnelErrorComplete; err->callback_data = tunnelState; errorSend(tunnelState->client.conn, err); return; } delete err; - if (Ip::Qos::TheConfig.isAclTosActive()) { - tunnelState->serverDestinations[0]->tos = GetTosToServer(tunnelState->request.getRaw()); - } - -#if SO_MARK && USE_LIBCAP - tunnelState->serverDestinations[0]->nfmark = GetNfmarkToServer(tunnelState->request.getRaw()); -#endif + GetMarkingsToServer(tunnelState->request.getRaw(), *tunnelState->serverDestinations[0]); debugs(26, 3, HERE << "paths=" << peer_paths->size() << ", p[0]={" << (*peer_paths)[0] << "}, serverDest[0]={" << tunnelState->serverDestinations[0] << "}"); AsyncCall::Pointer call = commCbCall(26,3, "tunnelConnectDone", CommConnectCbPtrFun(tunnelConnectDone, tunnelState)); Comm::ConnOpener *cs = new Comm::ConnOpener(tunnelState->serverDestinations[0], call, Config.Timeout.connect); cs->setHost(tunnelState->url); AsyncJob::Start(cs); } CBDATA_CLASS_INIT(TunnelStateData); bool TunnelStateData::noConnections() const { return !Comm::IsConnOpen(server.conn) && !Comm::IsConnOpen(client.conn); } #if USE_DELAY_POOLS void