00001
00002
00003
00004
00005
00006
00007
00008
00009
00010
00011
00012
00013
00014
00015
00016
00017
00018
00019
00020
00021 #ifndef TNT_JOB_H
00022 #define TNT_JOB_H
00023
00024 #include <deque>
00025 #include <tnt/httprequest.h>
00026 #include <tnt/httpparser.h>
00027 #include <cxxtools/thread.h>
00028 #include <cxxtools/refcounted.h>
00029 #include <cxxtools/smartptr.h>
00030
00056 namespace tnt
00057 {
00058 class Tntnet;
00059
00061 class Job : public cxxtools::RefCounted
00062 {
00063 unsigned keepAliveCounter;
00064
00065 HttpRequest request;
00066 HttpRequest::Parser parser;
00067 time_t lastAccessTime;
00068
00069 cxxtools::Mutex mutex;
00070
00071 static unsigned socket_read_timeout;
00072 static unsigned socket_write_timeout;
00073 static unsigned keepalive_max;
00074 static unsigned socket_buffer_size;
00075
00076 public:
00077 explicit Job(Tntnet& app_)
00078 : keepAliveCounter(keepalive_max),
00079 request(app_),
00080 parser(request),
00081 lastAccessTime(0)
00082 { }
00083 virtual ~Job();
00084
00085 public:
00086 virtual std::iostream& getStream() = 0;
00087 virtual int getFd() const = 0;
00088 virtual void setRead() = 0;
00089 virtual void setWrite() = 0;
00090
00091 HttpRequest& getRequest() { return request; }
00092 HttpRequest::Parser& getParser() { return parser; }
00093
00094 unsigned decrementKeepAliveCounter()
00095 { return keepAliveCounter > 0 ? --keepAliveCounter : 0; }
00096 void clear();
00097 void touch() { time(&lastAccessTime); }
00098 int msecToTimeout(time_t currentTime) const;
00099
00100 static void setSocketReadTimeout(unsigned ms) { socket_read_timeout = ms; }
00101 static void setSocketWriteTimeout(unsigned ms) { socket_write_timeout = ms; }
00102 static void setKeepAliveMax(unsigned n) { keepalive_max = n; }
00103 static void setSocketBufferSize(unsigned b) { socket_buffer_size = b; }
00104
00105 static unsigned getSocketReadTimeout() { return socket_read_timeout; }
00106 static unsigned getSocketWriteTimeout() { return socket_write_timeout; }
00107 static unsigned getKeepAliveTimeout();
00108 static unsigned getKeepAliveMax() { return keepalive_max; }
00109 static unsigned getSocketBufferSize() { return socket_buffer_size; }
00110 };
00111
00113 class Jobqueue
00114 {
00115 public:
00116 typedef cxxtools::SmartPtr<Job> JobPtr;
00117
00118 cxxtools::Condition noWaitThreads;
00119
00120 private:
00121 std::deque<JobPtr> jobs;
00122 cxxtools::Mutex mutex;
00123 cxxtools::Condition notEmpty;
00124 cxxtools::Condition notFull;
00125 unsigned waitThreads;
00126 unsigned capacity;
00127
00128 public:
00129 explicit Jobqueue(unsigned capacity_ = 1000)
00130 : waitThreads(0),
00131 capacity(capacity_)
00132 { }
00133
00134 void put(JobPtr j, bool force = false);
00135 JobPtr get();
00136
00137 void setCapacity(unsigned c)
00138 { capacity = c; }
00139 unsigned getCapacity() const
00140 { return capacity; }
00141 unsigned getWaitThreadCount() const
00142 { return waitThreads; }
00143 bool empty() const
00144 { return jobs.empty(); }
00145 };
00146 }
00147
00148 #endif // TNT_JOB_H
00149