00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021
00022
00023
00024
00025
00026
00027
00028
00029
00030 #ifndef TNT_JOB_H
00031 #define TNT_JOB_H
00032
00033 #include <deque>
00034 #include <tnt/httprequest.h>
00035 #include <tnt/httpparser.h>
00036 #include <cxxtools/mutex.h>
00037 #include <cxxtools/condition.h>
00038 #include <cxxtools/refcounted.h>
00039 #include <cxxtools/smartptr.h>
00040
00066 namespace tnt
00067 {
00068 class Tntnet;
00069
00071 class Job : public cxxtools::AtomicRefCounted
00072 {
00073 unsigned keepAliveCounter;
00074
00075 HttpRequest request;
00076 HttpRequest::Parser parser;
00077 time_t lastAccessTime;
00078
00079 static unsigned socket_read_timeout;
00080 static unsigned socket_write_timeout;
00081 static unsigned keepalive_max;
00082 static unsigned socket_buffer_size;
00083
00084 public:
00085 explicit Job(Tntnet& app_, const SocketIf* socketIf_ = 0)
00086 : keepAliveCounter(keepalive_max),
00087 request(app_, socketIf_),
00088 parser(request),
00089 lastAccessTime(0)
00090 { }
00091 virtual ~Job();
00092
00093 virtual std::iostream& getStream() = 0;
00094 virtual int getFd() const = 0;
00095 virtual void setRead() = 0;
00096 virtual void setWrite() = 0;
00097
00098 HttpRequest& getRequest() { return request; }
00099 HttpRequest::Parser& getParser() { return parser; }
00100
00101 unsigned decrementKeepAliveCounter()
00102 { return keepAliveCounter > 0 ? --keepAliveCounter : 0; }
00103 void clear();
00104 void touch() { time(&lastAccessTime); }
00105 int msecToTimeout(time_t currentTime) const;
00106
00107 static void setSocketReadTimeout(unsigned ms) { socket_read_timeout = ms; }
00108 static void setSocketWriteTimeout(unsigned ms) { socket_write_timeout = ms; }
00109 static void setKeepAliveMax(unsigned n) { keepalive_max = n; }
00110 static void setSocketBufferSize(unsigned b) { socket_buffer_size = b; }
00111
00112 static unsigned getSocketReadTimeout() { return socket_read_timeout; }
00113 static unsigned getSocketWriteTimeout() { return socket_write_timeout; }
00114 static unsigned getKeepAliveTimeout();
00115 static unsigned getKeepAliveMax() { return keepalive_max; }
00116 static unsigned getSocketBufferSize() { return socket_buffer_size; }
00117 };
00118
00120 class Jobqueue
00121 {
00122 public:
00123 typedef cxxtools::SmartPtr<Job> JobPtr;
00124
00125 cxxtools::Condition noWaitThreads;
00126
00127 private:
00128 std::deque<JobPtr> jobs;
00129 cxxtools::Mutex mutex;
00130 cxxtools::Condition notEmpty;
00131 cxxtools::Condition notFull;
00132 unsigned waitThreads;
00133 unsigned capacity;
00134
00135 public:
00136 explicit Jobqueue(unsigned capacity_ = 1000)
00137 : waitThreads(0),
00138 capacity(capacity_)
00139 { }
00140
00141 void put(JobPtr j, bool force = false);
00142 JobPtr get();
00143
00144 void setCapacity(unsigned c)
00145 { capacity = c; }
00146 unsigned getCapacity() const
00147 { return capacity; }
00148 unsigned getWaitThreadCount() const
00149 { return waitThreads; }
00150 bool empty() const
00151 { return jobs.empty(); }
00152 };
00153 }
00154
00155 #endif // TNT_JOB_H
00156