androidflutterstream-builderflutter-streambuilder

Bad state: Stream has already been listened to compression progress listener


Hey I'm trying to upload videos from my app, when I first start the app and do the upload it is all fine but on the second upload it gives me an error

Bad state: Stream has already been listed to

Original Problem: The error occurs during video compression, where the stream controlling the compression progress is causing issues. I have a _compressVideo function that is responsible for compressing a video using the VideoCompress library. It subscribes to the compression progress updates through a stream, but when the video compression process is initiated multiple times, the error "Bad state: Stream has already been listened to" appears.

Code Overview: In my UploadVideoController, I cancel any previous compression subscriptions, create a new StreamController to handle progress updates and listen to this stream in the _compressVideo function. However, the error still occurs, indicating that the stream is being listened to more than once.

Issue: Even though I close the previous stream controller and cancel the subscription, the stream from VideoCompress.compressProgress$ might still be holding a listener, or it's not properly cleaned up between successive compression tasks. This is causing the error when trying to listen to the stream again.

How can I ensure the stream is cleaned up correctly, preventing this error on multiple compression attempts?

this is my upload video controller

import 'dart:async';
import 'dart:io';
import 'package:cloud_firestore/cloud_firestore.dart';
import 'package:firebase_storage/firebase_storage.dart';
import 'package:get/get.dart';
import 'package:video_compress/video_compress.dart';
import 'package:viwaad_1/constants.dart';
import 'package:viwaad_1/models/videos.dart';

class UploadVideoContoller extends GetxController {
  var compressionProgress = 0.0.obs; // Progress observable
  var uploadProgress = 0.0.obs;

  // StreamController and StreamSubscription to manage compression progress
  StreamController<double>? _progressController;
  StreamSubscription? _progressSubscription;

  // Method to handle video compression
  Future<File?> _compressVideo(String videoPath) async {
    try {
      // Clean up any previous compression progress
      await _cancelPreviousCompressionProgress();
  
      // Set logging level (optional)
      VideoCompress.setLogLevel(0);
      compressionProgress.value = 0.0;

      // Create a new StreamController for the new compression task
      _progressController = StreamController<double>();

      // Start video compression
      final compressTask = VideoCompress.compressVideo(
        videoPath,
        quality: VideoQuality.MediumQuality,
        deleteOrigin: false,
      );

      // Listen for progress updates
      VideoCompress.compressProgress$.subscribe((progress) {
        // Only feed progress to controller if it's valid
        if (_progressController != null && !_progressController!.isClosed) {
          _progressController!.add(progress / 100); // Normalize progress to 0-1 range
        }
      });

      // Listen to the progress updates stream
      _progressSubscription = _progressController!.stream.listen((progress) {
        compressionProgress.value = progress; // Update progress observable
      });

      // Wait for the compression task to complete
      final compressedVideo = await compressTask;

      // Close the progress controller stream
      await _progressController!.close();
        return compressedVideo?.file;
    } catch (e) {
      // Handle any errors that occur during compression
      Get.snackbar(
        'Compression Error',
        'An error occurred during video compression: $e',
        snackPosition: SnackPosition.TOP,
        duration: Duration(seconds: 3),
      );
      return null;
    }
  }

  // Method to clean up previous progress stream and subscription
  Future<void> _cancelPreviousCompressionProgress() async {
    if (_progressSubscription != null) {
      await _progressSubscription!.cancel();
      _progressSubscription = null;
    }
    if (_progressController != null) {
      await _progressController!.close();
      _progressController = null;
    }
  }

  // Main method to upload video and associated metadata
  Future<void> uploadVideo(String songName, String caption, String videoPath) async {
    try {
      // Clean up any previous progress before starting new upload
      await _cancelPreviousCompressionProgress();
      String videoUrl = await _uploadVideoToStorage("Video $len", videoPath);
      String profilePhoto = (userDoc.data() as Map<String, dynamic>)['profilePhoto'];

      // Create video metadata object
      Video video = Video(
        uid: uid,
        id: "Video $len",
        likes: [],
        commentCount: 0,
        shareCount: 0,
        songName: songName,
        caption: caption,
        videoUrl: videoUrl,
        thumbnail: thumbnail,
        profilePhoto: profilePhoto,
      );
    } catch (e) {
      //error display
    }
  }

  @override
  void onClose() {
    // Ensure that previous compression progress is cleaned up when the controller is disposed
    _cancelPreviousCompressionProgress();
    super.onClose();
  }
}

and this is the upload screen

import 'dart:io';
import 'package:flutter/material.dart';
import 'package:get/get.dart';
import 'package:video_player/video_player.dart';
import 'package:viwaad_1/controllers/upload_video_contoller.dart';
import 'package:viwaad_1/views/widgets/text_input_field.dart';

class ConfirmScreen extends StatefulWidget {
  final File videoFile;
  final String videoPath;

  const ConfirmScreen({Key? key, required this.videoFile, required this.videoPath}) : super(key: key);

  @override
  _ConfirmScreenState createState() => _ConfirmScreenState();
}

class _ConfirmScreenState extends State<ConfirmScreen> {
  late VideoPlayerController _controller;
  final TextEditingController _songController = TextEditingController();
  final TextEditingController _captionController = TextEditingController();
  final UploadVideoContoller _uploadVideoContoller = Get.put(UploadVideoContoller(), permanent: false);
  bool _isUploading = false;

  @override
  void initState() {
    super.initState();
    // Initialize VideoPlayerController
    _controller = VideoPlayerController.file(widget.videoFile)
      ..initialize().then((_) {
        setState(() {
          _controller.play();
          _controller.setVolume(1);
          _controller.setLooping(true);
        });
      });
  }

  @override
  void dispose() {
    _controller.dispose(); // Dispose the video controller
    _uploadVideoContoller.onClose(); // Ensure the controller handles cleanup
    super.dispose();
  }

  void startUpload() {
    setState(() {
      _isUploading = true;
      _uploadVideoContoller.compressionProgress.value = 0.0;
      _uploadVideoContoller.uploadProgress.value = 0.0;
    });

    _uploadVideoContoller.uploadVideo(
      _songController.text,
      _captionController.text,
      widget.videoPath,
    ).then((_) {
      setState(() {
        _isUploading = false;
      });
      // Navigate back after Snackbar duration
      Future.delayed(const Duration(seconds: 1), () {
        if (mounted) {
          Navigator.of(context).pop();
        }
      });
    }).catchError((e) {
      setState(() {
        _isUploading = false;
      });

      Get.snackbar(
        'Upload Failed',
        'Error: $e',
        snackPosition: SnackPosition.TOP,
        duration: const Duration(seconds: 3),
      );
    });
  }

this is a screen before that I don't know if there is a problem here but I'm just providing extra information

import 'dart:io';
import 'package:flutter/material.dart';
import 'package:image_picker/image_picker.dart';
import 'package:viwaad_1/views/screens/cofirm_image_screen.dart';
import 'package:viwaad_1/views/screens/confirm_screen.dart';
import 'package:viwaad_1/views/widgets/icons_add_screen.dart';

class AddScreen extends StatelessWidget {
  const AddScreen({super.key});

  pickVideo(ImageSource src, BuildContext context) async {
  final video = await ImagePicker().pickVideo(source: src);
  if (video != null) {
    // Close the dialog first before navigating to the next screen
    Navigator.of(context).pop();

    // Navigate to the confirm screen after picking the video
    Navigator.of(context).push(MaterialPageRoute(
      builder: (context) => ConfirmScreen(
        videoFile: File(video.path),
        videoPath: video.path,
      ),
    ));
  }
}

  showVideoOptionsDialog(BuildContext context) {
  return showDialog(
    context: context,
    builder: (context) => SimpleDialog(
      children: [
        SimpleDialogOption(
          onPressed: () => pickVideo(ImageSource.gallery, context),
          child: Row(
            children: const [
              Icon(Icons.image),
              Padding(
                padding: EdgeInsets.all(8.0),
                child: Text('Gallery', style: TextStyle(fontSize: 20)),
              )
            ],
          ),
        ),
        SimpleDialogOption(
          onPressed: () => pickVideo(ImageSource.camera, context),
          child: Row(
            children: const [
              Icon(Icons.camera_alt),
              Padding(
                padding: EdgeInsets.all(8.0),
                child: Text('Camera', style: TextStyle(fontSize: 20)),
              )
            ],
          ),
        ),
        SimpleDialogOption(
          onPressed: () => Navigator.of(context).pop(),
          child: Row(
            children: const [
              Icon(Icons.cancel),
              Padding(
                padding: EdgeInsets.all(8.0),
                child: Text('Cancel', style: TextStyle(fontSize: 20)),
              )
            ],
          ),
        )
      ],
    ),
  );
}

Solution

  • Since _progressSubscription is nullable, you can rather use the null-aware assignment operator.

    _progressSubscription ??= StreamController<double>();
    

    Even if you cancel _progressSubscription or close the StreamController, the original VideoCompress.compressProgress$ stream remains subscribed once you listen to it, which I believe leads to the issue.

    Subscription? videoCompressSubscription;
    
    videoCompressSubscription ??= VideoCompress.compressProgress$.subscribe((progress) {
            if (_progressController != null && !_progressController!.isClosed) {
              _progressController!.add(progress / 100);
            }
    });
    

    then later you can unsubscribe

    videoCompressSubscription?.unsubscribe();
    

    The issue you have is that for some reason your UploadVideoContoller is being triggered multiple times and Dart's Stream objects can only be listened to once unless they're explicitly marked as a broadcast stream. So your last option maybe to use a broadcast controller which allows for multiple listening.

    _progressController = StreamController<double>.broadcast();