shithub: rtmp

ref: 5745445bdf695dc69d3c15189e083432f2c1f42c
dir: /main.c/

View raw version
#include <u.h>
#include <libc.h>
#include <thread.h>
#include <bio.h>
#include "adts.h"
#include "ivf.h"
#include "rtmp.h"
#include "util.h"

typedef struct Conn Conn;

struct Conn {
	char *url;
	RTMP *r;
	ulong sid;
};

enum {
	Abufsz = 441*2*2, /* 1/100s */
};

int mainstacksize = 65536;
int debug = 0;

static Conn *cs;
static int ncs;
static uvlong ns₀, vms;
static int afd;
static Channel *ans₀;

static uvlong
ns2ms(uvlong z, uvlong ns)
{
	if(z != 0 && z != Zns₀)
		ns = z - ns₀ + ns;

	return ns / 1000000ULL;
}

static void
audioenc(void *aux)
{
	int nssent, fd, n;
	u8int *buf;
	uvlong ns;

	buf = emalloc(Abufsz);
	fd = *(int*)aux;
	nssent = 0;
	for(;;){
		if((n = readn(afd, buf, Abufsz)) < 1)
			break;
		if(nssent == 0){
			ns = nsec() - 10000000ULL;
			if(send(ans₀, &ns) != 1)
				break;
			nssent = 1;
		}
		if(write(fd, buf, n) != n)
			break;
	}
	chanclose(ans₀);
	close(afd);
	close(fd);
	
	threadexits(nil);
}

static void
audiosend(void *aux)
{
	ADTSFrame af;
	Biobuf *a;
	u64int ms;
	Conn *c;
	int i;

	if((a = Bfdopen(*(int*)aux, OREAD)) == nil)
		sysfatal("%r");
	memset(&af, 0, sizeof(af));
	if(recv(ans₀, &af.ns₀) != 1)
		sysfatal("no audio timestamp");
	for(;;){
		if(adtsread(a, &af) != 0)
			sysfatal("%r");
		if(af.sz == 0) /* eof */
			break;
		ms = ns2ms(af.ns₀, af.ns);

		for(c = cs, i = 0; i < ncs; i++, c++){
			if(rtmpdata(c->r, c->sid, ms, Taudio, af.buf, af.sz) != 0){
				fprint(2, "%s: %r\n", c->url);
				goto out;
			}
		}

		/* protect against overruns */
		if(vms+200 < ms)
			sleep(100);
	}

	/* FIXME properly close RTMP connection */
out:
	threadexitsall(nil);
}

static void
audio(void)
{
	static int p[4], pid;
	Dir d;

	nulldir(&d);
	d.length = Abufsz;
	pipe(p);
	pipe(p+2);
	dirfwstat(p[0], &d);
	if((pid = fork()) < 0)
		sysfatal("%r");
	if(pid == 0){
		close(afd);
		dup(p[0], 0); close(p[0]);
		dup(p[2], 1); close(p[2]);
		execl("/bin/audio/aacenc", "audio/aacenc", nil);
		sysfatal("aacenc: %r");
	}
	close(p[0]);
	close(p[2]);
	ans₀ = chancreate(sizeof(uvlong), 1);
	proccreate(audioenc, &p[1], mainstacksize);
	proccreate(audiosend, &p[3], mainstacksize);
}

static void
usage(void)
{
	fprint(2, "usage: %s [-a AUDIO] URL [URL...]\n", argv0);
	threadexitsall("usage");
}

void
threadmain(int argc, char **argv)
{
	IVFrame vf;
	u64int ms;
	Biobuf v;
	IVF ivf;
	Conn *c;
	int i;

	afd = open("/dev/zero", OREAD|OCEXEC);
	ARGBEGIN{
	case 'd':
		debug++;
		break;
	case 'a':
		if((afd = open(EARGF(usage()), OREAD|OCEXEC)) < 0)
			sysfatal("%r");
		break;
	default:
		usage();
	}ARGEND

	ncs = argc;
	if(ncs < 1)
		usage();
	ns₀ = nsec() - 10ULL*1000000000ULL; /* base, -10s */
	srand(time(nil));

	if(Binit(&v, 0, OREAD) != 0 || ivfopen(&v, &ivf) != 0)
		sysfatal("%r");
	if(strcmp(ivf.type, "AVC1") != 0)
		sysfatal("not H.264");

	cs = ecalloc(argc, sizeof(*cs));
	for(c = cs, i = 0; i < ncs; i++, c++){
		c->url = "rtmp://REDACTED";//FIXME the key has to be redacted argv[i];

		if((c->r = rtmpdial(argv[i])) == nil)
			sysfatal("%r");

		if(rtmpstream(c->r, &c->sid) != 0 ||
		   rtmppublish(c->r, c->sid, PubLive, nil) != 0 ||
		   rtmpmeta(c->r, c->sid, VcodecH264, ivf.w, ivf.h, afd >= 0 ? AcodecAAC : -1) != 0){
			sysfatal("%r");
		}
	}

	if(afd >= 0)
		audio();

	memset(&vf, 0, sizeof(vf));
	for(;;){
		if(ivfread(&ivf, &vf) != 0)
			sysfatal("%r");
		if(vf.sz == 0)
			break;
		ms = ns2ms(ivf.ns₀, vf.ns);
		vms = ms;
		for(c = cs, i = 0; i < ncs; i++, c++){
			if(rtmpdata(c->r, c->sid, ms, Tvideo, vf.buf, vf.sz) != 0){
				fprint(2, "%s: %r\n", c->url);
				goto out;
			}
		}
	}

	/* FIXME properly close RTMP connection */
out:
	close(afd);

	threadexitsall(nil);
}