/* * Copyright (c) 2002-2007 Niels Provos <provos@citi.umich.edu> * Copyright (c) 2007-2012 Niels Provos, Nick Mathewson * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions * are met: * 1. Redistributions of source code must retain the above copyright * notice, this list of conditions and the following disclaimer. * 2. Redistributions in binary form must reproduce the above copyright * notice, this list of conditions and the following disclaimer in the * documentation and/or other materials provided with the distribution. * 3. The name of the author may not be used to endorse or promote products * derived from this software without specific prior written permission. * * THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR * IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES * OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. * IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY DIRECT, INDIRECT, * INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT * NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
/**
 * Sometimes a bufferevent implementation can overrun the high watermark
 * (OpenSSL is one example). In that case, if the read callback does not
 * consume enough data to bring the input buffer back under the watermark,
 * the read callback will never be called again (because reading stays
 * suspended).
 *
 * To avoid this we schedule the read callback again here, but only
 * from the user callback, to avoid multiple scheduling:
 * - when the data had been added to it
 * - when the data had been drained from it (user specified read callback)
*/ staticvoid bufferevent_inbuf_wm_check(struct bufferevent *bev)
{ if (!bev->wm_read.high) return; if (!(bev->enabled & EV_READ)) return; if (evbuffer_get_length(bev->input) < bev->wm_read.high) return;
/* Callback to implement watermarks on the input buffer. Only enabled
* if the watermark is set. */ staticvoid
bufferevent_inbuf_wm_cb(struct evbuffer *buf, conststruct evbuffer_cb_info *cbinfo, void *arg)
{ struct bufferevent *bufev = arg;
size_t size;
size = evbuffer_get_length(buf);
if (size >= bufev->wm_read.high)
bufferevent_wm_suspend_read(bufev); else
bufferevent_wm_unsuspend_read(bufev);
}
if ((bufev_private->eventcb_pending & BEV_EVENT_CONNECTED) &&
bufev->errorcb) { /* The "connected" happened before any reads or writes, so
send it first. */
bufferevent_event_cb errorcb = bufev->errorcb; void *cbarg = bufev->cbarg;
bufev_private->eventcb_pending &= ~BEV_EVENT_CONNECTED;
UNLOCKED(errorcb(bufev, BEV_EVENT_CONNECTED, cbarg));
} if (bufev_private->readcb_pending && bufev->readcb) {
bufferevent_data_cb readcb = bufev->readcb; void *cbarg = bufev->cbarg;
bufev_private->readcb_pending = 0;
UNLOCKED(readcb(bufev, cbarg));
bufferevent_inbuf_wm_check(bufev);
} if (bufev_private->writecb_pending && bufev->writecb) {
bufferevent_data_cb writecb = bufev->writecb; void *cbarg = bufev->cbarg;
bufev_private->writecb_pending = 0;
UNLOCKED(writecb(bufev, cbarg));
} if (bufev_private->eventcb_pending && bufev->errorcb) {
bufferevent_event_cb errorcb = bufev->errorcb; void *cbarg = bufev->cbarg; short what = bufev_private->eventcb_pending; int err = bufev_private->errno_pending;
bufev_private->eventcb_pending = 0;
bufev_private->errno_pending = 0;
EVUTIL_SET_SOCKET_ERROR(err);
UNLOCKED(errorcb(bufev,what,cbarg));
}
bufferevent_decref_and_unlock_(bufev); #undef UNLOCKED
}
/*
 * Schedule the deferred callback of the bufferevent_private 'bevp' on the
 * event base of its bufferevent.  When event_deferred_cb_schedule_()
 * returns nonzero, an extra reference is taken on the bufferevent.
 *
 * Note that 'bevp' is evaluated more than once.
 */
#define SCHEDULE_DEFERRED(bevp)						\
	do {								\
		if (event_deferred_cb_schedule_(			\
			(bevp)->bev.ev_base,				\
			&(bevp)->deferred)) {				\
			bufferevent_incref_(&(bevp)->bev);		\
		}							\
	} while (0)
/*
 * Run the user's read callback for 'bufev', or arrange for it to run later.
 *
 * The caller must hold the bufferevent lock and a reference.  Nothing
 * happens when no read callback is installed.  If BEV_OPT_DEFER_CALLBACKS
 * is set either on the bufferevent or in 'options', the callback is marked
 * pending and the deferred callback is scheduled; otherwise the callback is
 * invoked right away, followed by bufferevent_inbuf_wm_check().
 */
void
bufferevent_run_readcb_(struct bufferevent *bufev, int options)
{
	struct bufferevent_private *priv = BEV_UPCAST(bufev);
	int deferred;

	if (!bufev->readcb)
		return;

	deferred = ((priv->options | options) & BEV_OPT_DEFER_CALLBACKS) != 0;
	if (!deferred) {
		bufev->readcb(bufev, bufev->cbarg);
		bufferevent_inbuf_wm_check(bufev);
		return;
	}

	priv->readcb_pending = 1;
	SCHEDULE_DEFERRED(priv);
}
/*
 * Run the user's write callback for 'bufev', or arrange for it to run
 * later.
 *
 * The caller must hold the bufferevent lock and a reference.  Nothing
 * happens when no write callback is installed.  If BEV_OPT_DEFER_CALLBACKS
 * is set either on the bufferevent or in 'options', the callback is marked
 * pending and the deferred callback is scheduled; otherwise the callback is
 * invoked right away.
 */
void
bufferevent_run_writecb_(struct bufferevent *bufev, int options)
{
	struct bufferevent_private *priv = BEV_UPCAST(bufev);
	int deferred;

	if (!bufev->writecb)
		return;

	deferred = ((priv->options | options) & BEV_OPT_DEFER_CALLBACKS) != 0;
	if (!deferred) {
		bufev->writecb(bufev, bufev->cbarg);
		return;
	}

	priv->writecb_pending = 1;
	SCHEDULE_DEFERRED(priv);
}
/*
 * Trigger callbacks for 'iotype' on 'bufev' by calling
 * bufferevent_trigger_nolock_() between bufferevent_incref_and_lock_() and
 * bufferevent_decref_and_unlock_().
 *
 * Only the bits of 'options' that are part of BEV_TRIG_ALL_OPTS are
 * forwarded.
 */
void
bufferevent_trigger(struct bufferevent *bufev, short iotype, int options)
{
	const int trig_opts = options & BEV_TRIG_ALL_OPTS;

	bufferevent_incref_and_lock_(bufev);
	bufferevent_trigger_nolock_(bufev, iotype, trig_opts);
	bufferevent_decref_and_unlock_(bufev);
}
/*
 * Deliver the event flags 'what' to the user's event callback for 'bufev',
 * or arrange for them to be delivered later.
 *
 * The caller must hold the bufferevent lock and a reference.  Nothing
 * happens when no event callback is installed.  If BEV_OPT_DEFER_CALLBACKS
 * is set either on the bufferevent or in 'options', 'what' is OR-ed into
 * the pending event flags, the current socket error is saved, and the
 * deferred callback is scheduled; otherwise the callback is invoked right
 * away.
 */
void
bufferevent_run_eventcb_(struct bufferevent *bufev, short what, int options)
{
	struct bufferevent_private *priv = BEV_UPCAST(bufev);
	int deferred;

	if (!bufev->errorcb)
		return;

	deferred = ((priv->options | options) & BEV_OPT_DEFER_CALLBACKS) != 0;
	if (!deferred) {
		bufev->errorcb(bufev, what, bufev->cbarg);
		return;
	}

	priv->eventcb_pending |= what;
	/* Capture the socket error now so it can be restored when the
	 * deferred callback eventually runs. */
	priv->errno_pending = EVUTIL_SOCKET_ERROR();
	SCHEDULE_DEFERRED(priv);
}
/*
 * Trigger the event callback of 'bufev' with the flags in 'what' by calling
 * bufferevent_run_eventcb_() between bufferevent_incref_and_lock_() and
 * bufferevent_decref_and_unlock_().
 *
 * Only the bits of 'options' that are part of BEV_TRIG_ALL_OPTS are
 * forwarded.
 */
void
bufferevent_trigger_event(struct bufferevent *bufev, short what, int options)
{
	const int trig_opts = options & BEV_TRIG_ALL_OPTS;

	bufferevent_incref_and_lock_(bufev);
	bufferevent_run_eventcb_(bufev, what, trig_opts);
	bufferevent_decref_and_unlock_(bufev);
}
if (bufferevent_ratelim_init_(bufev_private)) goto err;
/* * Set to EV_WRITE so that using bufferevent_write is going to * trigger a callback. Reading needs to be explicitly enabled * because otherwise no data will be available.
*/
bufev->enabled = EV_WRITE;
int
bufferevent_enable(struct bufferevent *bufev, short event)
{ struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); short impl_events = event; int r = 0;
bufferevent_incref_and_lock_(bufev); if (bufev_private->read_suspended)
impl_events &= ~EV_READ; if (bufev_private->write_suspended)
impl_events &= ~EV_WRITE;
bufev->enabled |= event;
if (impl_events && bufev->be_ops->enable(bufev, impl_events) < 0)
r = -1; if (r)
event_debug(("%s: cannot enable 0x%hx on %p", __func__, event, bufev));
if (highmark) { /* There is now a new high-water mark for read. enable the callback if needed, and see if we should
suspend/bufferevent_wm_unsuspend. */
if (evbuffer_get_length(bufev->input) >= highmark)
bufferevent_wm_suspend_read(bufev); elseif (evbuffer_get_length(bufev->input) < highmark)
bufferevent_wm_unsuspend_read(bufev);
} else { /* There is now no high-water mark for read. */ if (bufev_private->read_watermarks_cb)
evbuffer_cb_clear_flags(bufev->input,
bufev_private->read_watermarks_cb,
EVBUFFER_CB_ENABLED);
bufferevent_wm_unsuspend_read(bufev);
}
}
BEV_UNLOCK(bufev);
}
/*
 * Read back the watermarks of 'bufev' for one direction.
 *
 * events   - must be exactly EV_WRITE or exactly EV_READ
 * lowmark  - if non-NULL, receives the low watermark
 * highmark - if non-NULL, receives the high watermark
 *
 * Returns 0 on success, or -1 when 'events' is neither EV_WRITE nor
 * EV_READ (in which case nothing is stored and no lock is taken).
 */
int
bufferevent_getwatermark(struct bufferevent *bufev, short events,
    size_t *lowmark, size_t *highmark)
{
	int rv = -1;

	if (events == EV_WRITE) {
		BEV_LOCK(bufev);
		if (lowmark != NULL)
			*lowmark = bufev->wm_write.low;
		if (highmark != NULL)
			*highmark = bufev->wm_write.high;
		BEV_UNLOCK(bufev);
		rv = 0;
	} else if (events == EV_READ) {
		BEV_LOCK(bufev);
		if (lowmark != NULL)
			*lowmark = bufev->wm_read.low;
		if (highmark != NULL)
			*highmark = bufev->wm_read.high;
		BEV_UNLOCK(bufev);
		rv = 0;
	}

	return rv;
}
/*
 * Ask the backend of 'bufev' to flush, forwarding 'iotype' and 'mode' to
 * its 'flush' operation while holding the bufferevent lock.
 *
 * Returns the value of the backend's flush operation, or -1 when the
 * backend provides none.
 */
int
bufferevent_flush(struct bufferevent *bufev,
    short iotype,
    enum bufferevent_flush_mode mode)
{
	int status;

	BEV_LOCK(bufev);
	status = bufev->be_ops->flush
	    ? bufev->be_ops->flush(bufev, iotype, mode)
	    : -1;
	BEV_UNLOCK(bufev);

	return status;
}
/* Clean up the shared info */ if (bufev->be_ops->destruct)
bufev->be_ops->destruct(bufev);
/* XXX what happens if refcnt for these buffers is > 1? * The buffers can share a lock with this bufferevent object,
* but the lock might be destroyed below. */ /* evbuffer will free the callbacks */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
if (bufev_private->rate_limiting) { if (bufev_private->rate_limiting->group)
bufferevent_remove_from_rate_limit_group_internal_(bufev,0);
mm_free(bufev_private->rate_limiting);
bufev_private->rate_limiting = NULL;
}
BEV_UNLOCK(bufev);
if (bufev_private->own_lock)
EVTHREAD_FREE_LOCK(bufev_private->lock,
EVTHREAD_LOCKTYPE_RECURSIVE);
/* Free the actual allocated memory. */
mm_free(((char*)bufev) - bufev->be_ops->mem_offset);
/* Release the reference to underlying now that we no longer need the * reference to it. We wait this long mainly in case our lock is * shared with underlying. * * The 'destruct' function will also drop a reference to underlying * if BEV_OPT_CLOSE_ON_FREE is set. * * XXX Should we/can we just refcount evbuffer/bufferevent locks? * It would probably save us some headaches.
*/ if (underlying)
bufferevent_decref_(underlying);
}
/*
 * Take the lock of 'bufev' and hand over to
 * bufferevent_decref_and_unlock_(), returning that function's result.
 * No explicit unlock is done here; the lock is left to the callee.
 */
int
bufferevent_decref(struct bufferevent *bufev)
{
	int rv;

	BEV_LOCK(bufev);
	rv = bufferevent_decref_and_unlock_(bufev);

	return rv;
}
/* XXX: now that this function is public, we might want to * - return the count from this function * - create a new function to atomically grab the current refcount
*/
BEV_LOCK(bufev);
++bufev_private->refcnt;
BEV_UNLOCK(bufev);
}
if (underlying && !BEV_UPCAST(underlying)->lock)
bufferevent_enable_locking_(underlying, lock);
return 0; #endif
}
/*
 * Ask the backend of 'bev' to use the socket 'fd', via its 'ctrl'
 * operation with BEV_CTRL_SET_FD, while holding the bufferevent lock.
 *
 * Returns the result of the ctrl operation, or -1 when the backend has no
 * ctrl operation.  A nonzero result is reported through event_debug().
 */
int
bufferevent_setfd(struct bufferevent *bev, evutil_socket_t fd)
{
	union bufferevent_ctrl_data ctrl_arg;
	int rc = -1;

	ctrl_arg.fd = fd;

	BEV_LOCK(bev);
	if (bev->be_ops->ctrl != NULL)
		rc = bev->be_ops->ctrl(bev, BEV_CTRL_SET_FD, &ctrl_arg);
	if (rc != 0)
		event_debug(("%s: cannot set fd for %p to "EV_SOCK_FMT, __func__, bev, fd));
	BEV_UNLOCK(bev);

	return rc;
}
evutil_socket_t
bufferevent_getfd(struct bufferevent *bev)
{ union bufferevent_ctrl_data d; int res = -1;
d.fd = -1;
BEV_LOCK(bev); if (bev->be_ops->ctrl)
res = bev->be_ops->ctrl(bev, BEV_CTRL_GET_FD, &d); if (res)
event_debug(("%s: cannot get fd for %p", __func__, bev));
BEV_UNLOCK(bev); return (res<0) ? -1 : d.fd;
}
/*
 * Re-apply the configured read/write timeouts of 'bev' to its read and
 * write events, but only for events that are currently pending.
 *
 * For each pending event: if the matching timeout is set, the event is
 * re-added with that timeout through bufferevent_add_event_(); if it is
 * not set, the event's timer is removed with event_remove_timer().
 *
 * Returns 0 on success, or -1 if any re-add failed.  Both directions are
 * always processed, even after a failure.
 */
int
bufferevent_generic_adj_existing_timeouts_(struct bufferevent *bev)
{
	int status = 0;

	if (event_pending(&bev->ev_read, EV_READ, NULL)) {
		if (!evutil_timerisset(&bev->timeout_read)) {
			event_remove_timer(&bev->ev_read);
		} else if (bufferevent_add_event_(&bev->ev_read,
			&bev->timeout_read) < 0) {
			status = -1;
		}
	}

	if (event_pending(&bev->ev_write, EV_WRITE, NULL)) {
		if (!evutil_timerisset(&bev->timeout_write)) {
			event_remove_timer(&bev->ev_write);
		} else if (bufferevent_add_event_(&bev->ev_write,
			&bev->timeout_write) < 0) {
			status = -1;
		}
	}

	return status;
}
/*
 * Add 'ev' with event_add().  A timeval for which evutil_timerisset() is
 * false is passed to event_add() as NULL; otherwise 'tv' is passed through
 * unchanged.
 *
 * Returns the result of event_add().
 */
int
bufferevent_add_event_(struct event *ev, const struct timeval *tv)
{
	const struct timeval *timeout = evutil_timerisset(tv) ? tv : NULL;

	return event_add(ev, timeout);
}
/* For use by user programs only; internally, we should be calling
   either bufferevent_incref_and_lock_(), or BEV_LOCK.

   This function simply forwards 'bev' to bufferevent_incref_and_lock_().
   NOTE(review): judging by the callee's name, this takes a reference in
   addition to the lock -- confirm against its definition. */
void
bufferevent_lock(struct bufferevent *bev)
{
	bufferevent_incref_and_lock_(bev);
}
Die Informationen auf dieser Webseite wurden
nach bestem Wissen sorgfältig zusammengestellt. Es wird jedoch weder Vollständigkeit noch Richtigkeit
noch Qualität der bereitgestellten Informationen zugesichert.
Bemerkung:
Die farbliche Syntaxdarstellung und die Messung sind noch experimentell.