-
Notifications
You must be signed in to change notification settings - Fork 873
/
Copy pathArchiverContext.swift
199 lines (172 loc) · 6.59 KB
/
ArchiverContext.swift
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
//
// ArchiverContext.swift
// Telegram
//
// Created by Mikhail Filimonov on 23/10/2018.
// Copyright © 2018 Telegram. All rights reserved.
//
import Zip
import SwiftSignalKit
/// State of an archiving job as delivered to observers.
enum ArchiveStatus: Equatable {
    /// No archiver has reported anything yet (initial value of a status context).
    case none
    /// An archiver exists but has not published progress yet (initial value of its status promise).
    case waiting
    /// Zipping is under way; the payload is the progress value handed over by the zip callback.
    case progress(Double)
    /// Zipping finished; the payload is the URL of the produced archive.
    case done(URL)
    /// Zipping failed with the given error.
    case fail(ZipError)
}
/// Identifies an item that can be archived; used as a dictionary key by the archiver context.
enum ArchiveSource: Hashable {
    /// A local file resource to be zipped.
    case resource(LocalFileArchiveMediaResource)

    /// Two sources are equal when their underlying resources report equality via `isEqual(to:)`.
    static func == (lhs: ArchiveSource, rhs: ArchiveSource) -> Bool {
        switch (lhs, rhs) {
        case (.resource(let left), .resource(let right)):
            return left.isEqual(to: right)
        }
    }

    /// Hash derived from the resource identifier.
    var hashValue: Int {
        switch self {
        case .resource(let resource):
            return resource.id.hashValue
        }
    }

    /// Identifier used to build a per-source destination file name.
    var uniqueId: Int64 {
        switch self {
        case let .resource(resource):
            return resource.randomId
        }
    }

    /// Location of the zip file produced for this source, inside the temporary directory.
    var destinationURL: URL {
        let fileName = "tarchive-\(uniqueId).zip"
        return URL(fileURLWithPath: NSTemporaryDirectory() + fileName)
    }

    /// File URLs that should go into the archive.
    ///
    /// When the resource path contains the `tg_temp_archive_` marker, the path is listed as a
    /// directory (hidden files skipped) and its entries are returned. If the listing fails, or the
    /// marker is absent, the result is the resource path itself as a single URL.
    var contents: [URL] {
        switch self {
        case .resource(let resource):
            let location = URL(fileURLWithPath: resource.path)
            guard resource.path.contains("tg_temp_archive_") else {
                return [location]
            }
            let listing = try? FileManager.default.contentsOfDirectory(
                at: location,
                includingPropertiesForKeys: nil,
                options: .skipsHiddenFiles
            )
            return listing ?? [location]
        }
    }
}
/// Runs one zip job for an `ArchiveSource` and publishes its state as an `ArchiveStatus` signal.
private final class Archiver {
    /// Backing promise for `statusSignal`; starts at `.waiting`.
    private let status: ValuePromise<ArchiveStatus> = ValuePromise(.waiting, ignoreRepeated: true)

    /// Stream of status updates for this job.
    var statusSignal: Signal<ArchiveStatus, NoError> {
        return status.get()
    }

    /// Where the resulting zip file is written (taken from `source.destinationURL`).
    let destination: URL
    private let source: ArchiveSource
    private let queue: Queue

    /// - Parameters:
    ///   - source: What to archive.
    ///   - queue: Queue on which the zip work is scheduled.
    init(source: ArchiveSource, queue: Queue) {
        self.queue = queue
        self.source = source
        self.destination = source.destinationURL
    }

    /// Schedules the zip job on `queue`.
    ///
    /// The job always ends by publishing a terminal status: `.done` on success, `.fail` otherwise.
    /// If the archiver has been deallocated by the time the job runs, nothing is done.
    ///
    /// - Parameter cancelToken: Passed through to `Zip.zipFiles` as its `cancel` argument.
    func start(cancelToken: @escaping () -> Bool) {
        let destination = self.destination
        let source = self.source
        queue.async { [weak status] in
            guard let status = status else { return }

            let contents = source.contents
            guard !contents.isEmpty else {
                // Nothing to archive (e.g. an empty directory). Report a failure instead of
                // leaving observers on `.waiting` forever.
                status.set(.fail(.fileNotFound))
                return
            }

            do {
                try Zip.zipFiles(paths: contents, zipFilePath: destination, password: nil, compression: ZipCompression.BestCompression, progress: { progress in
                    status.set(.progress(progress))
                }, cancel: cancelToken)
                status.set(.done(destination))
            } catch let error as ZipError {
                status.set(.fail(error))
            } catch {
                // `ArchiveStatus.fail` can only carry a `ZipError`; map anything else to the
                // generic zip failure so the error is not silently dropped.
                status.set(.fail(.zipFail))
            }
        }
    }
}
// TODO: cancel in-flight archiving when the user logs out.
/// Per-source bookkeeping: the last known status and the callbacks to notify when it changes.
private final class ArchiveStatusContext {
    /// Callbacks invoked with the current status.
    let subscribers: Bag<(ArchiveStatus) -> Void> = Bag()
    /// Most recent status recorded for the source; `.none` until an archiver reports something.
    var status = ArchiveStatus.none
}
/// Keeps track of archiving jobs keyed by `ArchiveSource` and fans their status out to observers.
///
/// Mutable state is read and written on the private `queue`; the zip work itself is scheduled on
/// `archiveQueue`.
class ArchiverContext {
    /// Cancellation flag owned by a single archiver run. Accessed only on `ArchiverContext.queue`.
    ///
    /// A per-run flag (rather than a per-source token) ensures that calling `remove` for a source
    /// with no running job cannot leave behind a stale token that would abort a later run.
    private final class Cancellation {
        var isCancelled = false
    }

    var statuses: [ArchiveSource: ArchiveStatus] = [:]
    private let queue = Queue(name: "ArchiverContext")
    private var contexts: [ArchiveSource: Archiver] = [:]
    private let archiveQueue: Queue = Queue.concurrentDefaultQueue()
    private var statusContexts: [ArchiveSource: ArchiveStatusContext] = [:]
    private var statusesDisposable: [ArchiveSource: Disposable] = [:]
    private var cancellations: [ArchiveSource: Cancellation] = [:]

    init() {
    }

    deinit {
        self.queue.sync {
            self.contexts.removeAll()
            for disposable in self.statusesDisposable.values {
                disposable.dispose()
            }
            // Nobody can observe the remaining jobs any more; ask them to stop.
            for cancellation in self.cancellations.values {
                cancellation.isCancelled = true
            }
        }
    }

    /// Forgets the archiver registered for `source`, stops forwarding its status and requests
    /// cancellation of its zip job if one was started.
    ///
    /// - Parameter source: The source whose job should be dropped.
    func remove(_ source: ArchiveSource) {
        queue.async {
            self.contexts.removeValue(forKey: source)
            if let disposable = self.statusesDisposable.removeValue(forKey: source) {
                disposable.dispose()
            }
            self.statuses.removeValue(forKey: source)
            if let cancellation = self.cancellations.removeValue(forKey: source) {
                cancellation.isCancelled = true
            }
        }
    }

    /// Observes the archiving status of `source`.
    ///
    /// On subscription the subscriber is registered in the source's status context. If an archiver
    /// is already registered, or `startIfNeeded` is `false`, the current status is pushed to every
    /// registered subscriber. Otherwise a new archiver is created and started, and each status it
    /// reports is stored and pushed to all registered subscribers.
    ///
    /// - Parameters:
    ///   - source: What to archive / observe.
    ///   - startIfNeeded: Whether to start a new archiver when none is registered for `source`.
    /// - Returns: A signal of status updates, run on the context's queue. Disposing it unregisters
    ///   the subscriber.
    func archive(_ source: ArchiveSource, startIfNeeded: Bool = false) -> Signal<ArchiveStatus, NoError> {
        let queue = self.queue
        return Signal { [weak self] subscriber in
            guard let `self` = self else { return EmptyDisposable }

            let statusContext: ArchiveStatusContext
            if let existing = self.statusContexts[source] {
                statusContext = existing
            } else {
                statusContext = ArchiveStatusContext()
                self.statusContexts[source] = statusContext
            }

            let index = statusContext.subscribers.add({ status in
                subscriber.putNext(status)
            })

            if self.contexts[source] != nil || !startIfNeeded {
                // Either a job is already registered or we must not start one: just replay the
                // current status.
                for notify in statusContext.subscribers.copyItems() {
                    notify(statusContext.status)
                }
            } else {
                let archiver = Archiver(source: source, queue: self.archiveQueue)
                let cancellation = Cancellation()
                self.contexts[source] = archiver
                self.cancellations[source] = cancellation

                self.statusesDisposable[source] = (archiver.statusSignal |> deliverOn(queue)).start(next: { status in
                    statusContext.status = status
                    for notify in statusContext.subscribers.copyItems() {
                        notify(statusContext.status)
                    }
                }, completed: {
                    // Completion is forwarded only to the subscriber that triggered the start.
                    subscriber.putCompletion()
                })

                // The token captures only the flag and the queue, so it does not keep the context
                // alive for the duration of the zip job.
                archiver.start(cancelToken: {
                    var cancelled: Bool = false
                    queue.sync {
                        cancelled = cancellation.isCancelled
                    }
                    return cancelled
                })
            }

            return ActionDisposable {
                self.queue.async {
                    if let current = self.statusContexts[source] {
                        current.subscribers.remove(index)
                    }
                }
            }
        } |> runOn(queue)
    }
}