@ -114,18 +114,23 @@ func (sm *StreamManager) Stop() {
func ( sm * StreamManager ) connect ( ) error {
func ( sm * StreamManager ) connect ( ) error {
if sm . client != nil {
if sm . client != nil {
if c , ok := sm . client . ( * Client ) ; ok {
var scs * SyncConnState
if c . CurrentState . getState ( ) == StateDisconnected {
if client , ok := sm . client . ( * Client ) ; ok {
sm . Metrics = initMetrics ( )
scs = & client . CurrentState
err := c . Connect ( )
}
if err != nil {
if component , ok := sm . client . ( * Component ) ; ok {
return err
scs = & component . CurrentState
}
}
if sm . PostConnect != nil {
if scs != nil && scs . getState ( ) == StateDisconnected {
sm . PostConnect ( sm . client )
sm . Metrics = initMetrics ( )
}
err := sm . client . Connect ( )
return nil
if err != nil {
return err
}
if sm . PostConnect != nil {
sm . PostConnect ( sm . client )
}
}
return nil
}
}
}
}
return errors . New ( "client is not disconnected" )
return errors . New ( "client is not disconnected" )