Skip to content

Instantly share code, notes, and snippets.

@DevJohnC
Created December 3, 2013 04:28
Show Gist options
  • Select an option

  • Save DevJohnC/7763868 to your computer and use it in GitHub Desktop.

Select an option

Save DevJohnC/7763868 to your computer and use it in GitHub Desktop.
Prototype code for streaming video to libomxplayer
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using FragLabs.Adjutant.Network;
using FragLabs.Adjutant.Network.Interfaces;
using FragLabs.Adjutant.Network.Sockets;
namespace OmxTest
{
class NetworkOmxStream : IDisposable
{
private readonly string _filename;
private readonly OmxApi.ReadCallback _read;
private readonly OmxApi.SeekCallback _seek;
private IntPtr _omxPlayer;
private IntPtr _ffmpegBuffer;
private int _ffmpegBufferSize;
private StreamBuffer _streamBuffer;
private int _streamBufferSize;
private IClient _client;
private int _streamId;
private long _fileSize;
private long _position;
private ManualResetEventSlim _mre;
private byte[] _transferBuffer;
private CancellationTokenSource _readCancellationSource;
private CancellationToken _readCancellationToken;
private Task _readTask;
public NetworkOmxStream(IPAddress server, string filename)
{
_streamId = 1;
_filename = filename;
_ffmpegBufferSize = 1024 * 128; // 128KB of buffer for ffmpeg/omx to work with
_ffmpegBuffer = Marshal.AllocHGlobal(_ffmpegBufferSize);
_transferBuffer = new byte[_ffmpegBufferSize];
_streamBufferSize = 1024 * 2048; // 2MB of buffer for the data stream
_streamBuffer = new StreamBuffer(_streamBufferSize);
_read = new OmxApi.ReadCallback(ReadCallback);
_seek = new OmxApi.SeekCallback(SeekCallback);
_mre = new ManualResetEventSlim(false);
_client = new RconClient(new IPEndPoint(server, 21233));
_client.ConnectComplete += (sender, args) => GetFileInfo();
_client.Connect();
}
~NetworkOmxStream()
{
Dispose();
}
private void GetFileInfo()
{
_client.Send(new Message
{
IsFromServer = false,
IsResponse = false,
Words = new byte[][]
{
new[]{(byte)MessageType.FileInfo},
Encoding.UTF8.GetBytes(_filename)
}
}, (client, message) =>
{
var exists = BitConverter.ToBoolean(message.Words[0], 0);
if (exists)
{
_fileSize = BitConverter.ToInt64(message.Words[1], 0);
Console.WriteLine("Filesize: {0} bytes", _fileSize);
_omxPlayer = OmxApi.omx_create(_ffmpegBuffer, _ffmpegBufferSize, _ffmpegBuffer,
_read, _seek);
StartReading();
Task.Factory.StartNew(() =>
{
if (OmxApi.omx_probe_format(_omxPlayer, "") == IntPtr.Zero)
throw new Exception("Couldn't determine file format");
OmxApi.omx_play(_omxPlayer);
});
}
else
Console.WriteLine("FILE NOT FOUND");
});
}
public void Dispose()
{
if (_client != null)
{
_client.Send(new Message
{
IsFromServer = false,
IsResponse = false,
Words = new[]
{
new[]{(byte)MessageType.FileStream},
Encoding.UTF8.GetBytes(_filename),
new[] {(byte)Operation.Close}
}
});
_client.Disconnect();
_client.Dispose();
_client = null;
}
if (_omxPlayer != IntPtr.Zero)
{
OmxApi.omx_destroy(_omxPlayer);
_omxPlayer = IntPtr.Zero;
}
if (_ffmpegBuffer != IntPtr.Zero)
{
//Marshal.FreeHGlobal(_unmanagedBuffer);
_ffmpegBuffer = IntPtr.Zero;
}
}
private void StartReading()
{
_client.Send(new Message
{
IsFromServer = false,
IsResponse = false,
Words = new[]
{
new[] {(byte) MessageType.FileStream},
Encoding.UTF8.GetBytes(_filename),
new[] {(byte) Operation.Read},
BitConverter.GetBytes(_streamId),
BitConverter.GetBytes(_fileSize)
}
}, (client, message) =>
{
_readCancellationSource = new CancellationTokenSource();
_readCancellationToken = _readCancellationSource.Token;
_readTask = Task.Factory.StartNew(ReadThread, _readCancellationToken);
});
}
private void ReadThread()
{
var streamId = _streamId;
var socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect(new IPEndPoint(_client.RemoteIP, 32122 + streamId));
var buffer = new byte[1024 * 128];
while (!_readCancellationToken.IsCancellationRequested)
{
if (socket.Poll(10000, SelectMode.SelectError)) break;
if (socket.Poll(10000, SelectMode.SelectRead))
{
var read = socket.Receive(buffer, 0, buffer.Length, SocketFlags.None);
if (read == 0)
break;
var pos = 0;
while (read > 0)
{
if (_readCancellationToken.IsCancellationRequested)
break;
var space = _streamBuffer.Size - _streamBuffer.AvailableBytes;
if (space == 0)
{
Thread.Sleep(TimeSpan.FromMilliseconds(10));
continue;
}
if (space > read)
space = read;
lock (_streamBuffer)
{
if (_readCancellationToken.IsCancellationRequested)
break;
read -= space;
_streamBuffer.Write(buffer, pos, space);
pos += space;
}
}
}
}
socket.Close();
}
private int ReadCallback(IntPtr opaque, IntPtr buffer, int bytes)
{
lock (this)
{
// wait for buffer to have enough data
var readCount = bytes;
if (readCount > _ffmpegBufferSize)
readCount = _ffmpegBufferSize;
if (readCount > _fileSize - _position)
readCount = Convert.ToInt32(_fileSize - _position);
_streamBuffer.WaitForData(readCount);
unsafe
{
lock (_streamBuffer)
{
try
{
_streamBuffer.Read(_transferBuffer, 0, readCount);
}
catch (Exception ex)
{
Console.WriteLine("READ: Exception - {0}", ex);
throw;
}
}
var bytePtr = (byte*)buffer.ToPointer();
for (var i = 0; i < readCount; i++)
bytePtr[i] = _transferBuffer[i];
}
_position += readCount;
return readCount;
}
}
private long SeekCallback(IntPtr opaque, long offset, int whence)
{
lock (this)
{
if (whence == (int)Seek.Size)
{
return _fileSize;
}
_readCancellationSource.Cancel();
_readTask.Wait();
_readCancellationSource.Dispose();
_streamId++;
_streamBuffer.Reset();
var seekOrigin = SeekOrigin.Begin;
if (whence == (int)Seek.Cur)
seekOrigin = SeekOrigin.Current;
else if (whence == (int)Seek.End)
seekOrigin = SeekOrigin.End;
if (seekOrigin == SeekOrigin.Current)
{
Console.WriteLine("Seeking relative to current");
}
if (whence > 2 || whence < 0)
return -1;
var ret = 0L;
_mre.Reset();
_client.Send(new Message
{
IsFromServer = false,
IsResponse = false,
Words = new[]
{
new[] {(byte) MessageType.FileStream},
Encoding.UTF8.GetBytes(_filename),
new[] {(byte) Operation.Seek},
new[] {(byte) seekOrigin},
BitConverter.GetBytes(offset)
}
}, (client, message) =>
{
ret = BitConverter.ToInt64(message.Words[0], 0);
_position = ret;
StartReading();
_mre.Set();
});
_mre.Wait();
return ret;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment