import {defer} from 'rxjs/observable/defer';
import {Observable} from 'rxjs/Observable';

/** Example
import {from} from 'rxjs/observable/from';

from([1, 2, 3])
    .pipe(doOnSubscribe(() => console.log('subscribed to stream')))
    .subscribe(x => console.log(x), null, () => console.log('completed'));
*/

export function doOnSubscribe<T>(onSubscribe: () => void): (source: Observable<T>) =>  Observable<T> {
    return function inner(source: Observable<T>): Observable<T> {
        return defer(() => {
          onSubscribe();

          return source;
        });
    };
}