ref: e4f321314eb6c3d0059cc7b88790a5e6ae2a1bb4
parent: 3774672b4c45fab2920e37e6bdf03d067b89c571
author: Timothy B. Terriberry <tterribe@xiph.org>
date: Sat Sep 22 19:12:41 EDT 2012
Estimate http connection latency and bandwidth. This gives us a better idea when to re-use a connection.
--- a/src/http.c
+++ b/src/http.c
@@ -159,10 +159,10 @@
#if defined(OP_ENABLE_HTTP)
# include <sys/types.h>
# include <sys/socket.h>
-# include <sys/time.h>
+# include <sys/timeb.h>
# include <arpa/inet.h>
+# include <netinet/in.h>
# include <fcntl.h>
-# include <netinet/in.h>
# include <netdb.h>
# include <poll.h>
# include <unistd.h>
@@ -443,8 +443,17 @@
struct OpusHTTPConn{
/*The current position indicator for this connection.*/
opus_int64 pos;
+ /*The SSL connection, if this is https (NULL otherwise).*/
SSL *ssl_conn;
+ /*The next connection in either the LRU or free list.*/
OpusHTTPConn *next;
+ /*The last time we blocked for reading (initially, when the connect began).*/
+ struct timeb read_time;
+ /*The number of bytes we've read since the last time we blocked.*/
+ opus_int64 read_bytes;
+ /*The estimated throughput of this connection, in bytes/s (0 until measured).*/
+ opus_int64 read_rate;
+ /*The socket we're reading from.*/
int fd;
};
@@ -482,6 +491,8 @@
/*The connection we're currently reading from.
This can be -1 if no connection is active.*/
int cur_conni;
+ /*The estimated time required to open a new connection, in milliseconds.*/
+ opus_int32 connect_rate;
/*Information about the address we connected to.*/
struct addrinfo addr_info;
/*The address we connected to.*/
@@ -609,7 +620,7 @@
# define OP_NPROTOS (2)
static int op_http_connect(OpusHTTPStream *_stream,OpusHTTPConn *_conn,
- struct addrinfo *_addrs){
+ struct addrinfo *_addrs,struct timeb *_start_time){
struct addrinfo *addr;
struct addrinfo *addrs[OP_NPROTOS];
struct pollfd fds[OP_NPROTOS];
@@ -640,6 +651,11 @@
nprotos++;
}
}
+ ret=ftime(_start_time);
+ OP_ASSERT(!ret);
+ *&_conn->read_time=*_start_time;
+ _conn->read_bytes=0;
+ _conn->read_rate=0;
/*Try to start a connection to each protocol.*/
for(pi=0;pi<nprotos;pi++){
ai_family=addrs[pi]->ai_family;
@@ -866,6 +882,36 @@
return 0;
}
+static opus_int32 op_time_diff_ms(const struct timeb *_end,
+ const struct timeb *_start){ /*Returns _end-_start in milliseconds.*/
+ opus_int64 dtime; /*Difference of the whole-second fields.*/
+ dtime=_end->time-_start->time;
+ OP_ASSERT(_end->millitm<1000);
+ OP_ASSERT(_start->millitm<1000);
+ if(OP_UNLIKELY(dtime>(0x7FFFFFFF-1000)/1000))return 0x7FFFFFFF; /*Saturate.*/
+ if(OP_UNLIKELY(dtime<(-0x7FFFFFFF+999)/1000))return -0x7FFFFFFF-1; /*Saturate.*/
+ return (opus_int32)dtime*1000+_end->millitm-_start->millitm; /*Fits in 32 bits.*/
+}
+
+/*Update the read rate for this connection, and start a new measurement interval.*/
+static void op_http_conn_read_rate_update(OpusHTTPConn *_conn){
+ struct timeb read_time;
+ opus_int32 read_delta_ms;
+ opus_int64 read_delta_bytes;
+ opus_int64 read_rate;
+ int ret;
+ ret=ftime(&read_time); /*Sample the current time.*/
+ OP_ASSERT(!ret);
+ read_delta_ms=op_time_diff_ms(&read_time,&_conn->read_time); /*Time since last update.*/
+ read_delta_bytes=_conn->read_bytes;
+ read_rate=_conn->read_rate;
+ read_delta_ms=OP_MAX(read_delta_ms,1); /*Keep the divisor below positive.*/
+ read_rate+=read_delta_bytes*1000/read_delta_ms-read_rate+4>>3; /*Move 1/8 of the way to the new bytes/s sample, rounded.*/
+ *&_conn->read_time=*&read_time; /*The next interval starts now.*/
+ _conn->read_bytes=0;
+ _conn->read_rate=read_rate;
+}
+
/*Tries to read from the given connection.
[out] _buf: Returns the data read.
_size: The size of the buffer.
@@ -875,9 +921,10 @@
struct pollfd fd;
SSL *ssl_conn;
ptrdiff_t nread;
+ ptrdiff_t nread_unblocked;
fd.fd=_conn->fd;
ssl_conn=_conn->ssl_conn;
- nread=0;
+ nread=nread_unblocked=0;
do{
int err;
if(ssl_conn!=NULL){
@@ -887,6 +934,7 @@
/*Read some data.
Keep going to see if there's more.*/
nread+=ret;
+ nread_unblocked+=ret;
continue;
}
/*Connection closed.*/
@@ -907,6 +955,7 @@
/*Read some data.
Keep going to see if there's more.*/
nread+=ret;
+ nread_unblocked+=ret;
continue;
}
/*If we already read some data, return it right now.*/
@@ -915,11 +964,15 @@
if(err!=EAGAIN&&err!=EWOULDBLOCK)return 0;
fd.events=POLLIN;
}
+ _conn->read_bytes+=nread_unblocked;
+ op_http_conn_read_rate_update(_conn);
+ nread_unblocked=0;
if(!_block)break;
/*Need to wait to get any data at all.*/
if(poll(&fd,1,-1)==-1)return 0;
}
while(nread<_size);
+ _conn->read_bytes+=nread_unblocked;
return nread;
}
@@ -1169,11 +1222,13 @@
ret=op_parse_url(&_stream->url,_url);
if(OP_UNLIKELY(ret<0))return ret;
for(nredirs=0;nredirs<OP_REDIRECT_LIMIT;nredirs++){
- char response[OP_RESPONSE_SIZE_MAX];
- char *next;
- char *status_code;
- const char *host;
- unsigned port;
+ struct timeb start_time;
+ struct timeb end_time;
+ char response[OP_RESPONSE_SIZE_MAX];
+ char *next;
+ char *status_code;
+ const char *host;
+ unsigned port;
if(_proxy_host==NULL){
host=_stream->url.host;
port=_stream->url.port;
@@ -1217,7 +1272,7 @@
addrs=op_resolve(host,port);
if(OP_UNLIKELY(addrs==NULL))return OP_FALSE;
}
- ret=op_http_connect(_stream,_stream->conns+0,addrs);
+ ret=op_http_connect(_stream,_stream->conns+0,addrs,&start_time);
if(addrs!=&_stream->addr_info)freeaddrinfo(addrs);
if(OP_UNLIKELY(ret<0))return ret;
/*Build the request to send.*/
@@ -1269,6 +1324,8 @@
ret=op_http_conn_read_response(_stream->conns+0,
response,sizeof(response)/sizeof(*response));
if(OP_UNLIKELY(ret<0))return ret;
+ ret=ftime(&end_time);
+ OP_ASSERT(!ret);
next=op_http_parse_status_line(&status_code,response);
if(next==NULL)return OP_FALSE;
if(status_code[0]=='2'){
@@ -1358,6 +1415,8 @@
_stream->content_length=content_length;
_stream->conns[0].pos=0;
_stream->cur_conni=0;
+ _stream->connect_rate=op_time_diff_ms(&end_time,&start_time);
+ _stream->connect_rate=OP_MAX(_stream->connect_rate,1);
/*The URL has been successfully opened.*/
return 0;
}
@@ -1415,12 +1474,16 @@
static int op_http_conn_open_pos(OpusHTTPStream *_stream,
OpusHTTPConn *_conn,opus_int64 _pos){
- char response[OP_RESPONSE_SIZE_MAX];
- char *next;
- char *status_code;
- opus_int64 range_length;
- int ret;
- ret=op_http_connect(_stream,_conn,&_stream->addr_info);
+ struct timeb start_time;
+ struct timeb end_time;
+ char response[OP_RESPONSE_SIZE_MAX];
+ char *next;
+ char *status_code;
+ opus_int64 range_length;
+ opus_int32 connect_rate;
+ opus_int32 connect_time;
+ int ret;
+ ret=op_http_connect(_stream,_conn,&_stream->addr_info,&start_time);
if(OP_UNLIKELY(ret<0))return ret;
/*Build the request to send.*/
_stream->request.nbuf=_stream->request_tail;
@@ -1433,6 +1496,8 @@
ret=op_http_conn_read_response(_conn,
response,sizeof(response)/sizeof(*response));
if(OP_UNLIKELY(ret<0))return ret;
+ ret=ftime(&end_time);
+ OP_ASSERT(!ret);
next=op_http_parse_status_line(&status_code,response);
if(next==NULL)return OP_FALSE;
/*We _need_ a 206 Partial Content response.*/
@@ -1474,7 +1539,12 @@
_conn->pos=_pos;
_stream->cur_conni=_conn-_stream->conns;
OP_ASSERT(_stream->cur_conni>=0&&_stream->cur_conni<OP_NCONNS_MAX);
- /*The connection has been successfully opened.*/
+ /*The connection has been successfully opened.
+ Update the connection time estimate.*/
+ connect_time=op_time_diff_ms(&end_time,&start_time);
+ connect_rate=_stream->connect_rate;
+ connect_rate+=OP_MAX(connect_time,1)-connect_rate+8>>4;
+ _stream->connect_rate=connect_rate;
return 0;
}
@@ -1542,12 +1612,12 @@
return nread;
}
-/*To this will need to be larger than OP_CHUNK_SIZE to be useful.*/
-# define OP_READAHEAD_THRESH (128*1024)
+# define OP_READAHEAD_THRESH_MIN (64*1024)
/*16 kB is the largest size OpenSSL will return at once.*/
# define OP_READAHEAD_CHUNK_SIZE (16*1024)
static int op_http_stream_seek(void *_stream,opus_int64 _offset,int _whence){
+ struct timeb seek_time;
OpusHTTPStream *stream;
OpusHTTPConn *conn;
OpusHTTPConn *prev;
@@ -1582,6 +1652,15 @@
}break;
default:return -1;
}
+ /*Mark when we deactivated the active connection.*/
+ if(ci>=0){
+ op_http_conn_read_rate_update(stream->conns+ci);
+ *&seek_time=*&stream->conns[ci].read_time;
+ }
+ else{
+ ret=ftime(&seek_time);
+ OP_ASSERT(!ret);
+ }
/*If we seeked past the end of the stream, just disable the active
connection.*/
if(pos>=content_length){
@@ -1595,11 +1674,24 @@
conn=stream->lru_head;
while(conn!=NULL){
opus_int64 conn_pos;
- /*TODO: Estimate connection open time and current throughput, and compute
- the read-ahead threshold accordingly.*/
- /*TODO: Expire connections aggressively to avoid server timeouts.*/
+ opus_int64 read_ahead_thresh;
+ /*If this connection has been dormant too long, close it.
+ This is to prevent us from hitting server/firewall timeouts.*/
+ if(op_time_diff_ms(&seek_time,&conn->read_time)>5*1000){
+ *pnext=conn->next;
+ conn->next=stream->lru_head;
+ stream->lru_head=conn;
+ op_http_conn_close(stream,conn);
+ conn=*pnext;
+ continue;
+ }
+ /*Dividing by 512 instead of 1000 scales this by nearly 2, biasing towards
+ connection re-use (and roughly compensating for the ability of the TCP
+ window to open up on long reads).*/
+ read_ahead_thresh=OP_MAX(OP_READAHEAD_THRESH_MIN,
+ stream->connect_rate*conn->read_rate>>9);
conn_pos=conn->pos;
- if(pos-OP_READAHEAD_THRESH<=conn_pos&&conn_pos<=pos){
+ if(pos-read_ahead_thresh<=conn_pos&&conn_pos<=pos){
/*Found a suitable connection to re-use.*/
*pnext=conn->next;
conn->next=stream->lru_head;
--- a/src/opusfile.c
+++ b/src/opusfile.c
@@ -2008,7 +2008,7 @@
OP_ASSERT(!ret);
/*Take a (pretty decent) guess.*/
bisect=begin+op_rescale64(diff,diff2,end-begin)-OP_CHUNK_SIZE;
- if(bisect<begin+OP_CHUNK_SIZE)bisect=begin;
+ if(bisect-OP_CHUNK_SIZE<begin)bisect=begin;
}
ret=op_seek_helper(_of,bisect);
if(OP_UNLIKELY(ret<0))return ret;